
# **Task 3 - Exploring Big Data**

Take these steps to complete this task:

Using a PySpark DataFrame, apply the following transformations to the synthetic transaction file and output the results to a local file:

1. Project only the records where `status = authorized` AND `card_present_flag = 0`.
2. Split the `long_lat` and `merchant_long_lat` fields into `long`, `lat` and `merch_long`, `merch_lat` fields.
3. Output the data as a CSV file.


## Install Spark and PySpark

In [None]:
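# Install Java 8 and download Spark 3.1.2 (prebuilt for Hadoop 2.7)

!apt-get update
# Spark 3.1.x runs on Java 8 or 11
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
# Extract the archive (without -v, so every file name is not printed)
!tar xzf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark==1.3.0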

In [2]:
# Point PySpark at the Java and Spark installations

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [3]:
# Install PySpark 

!pip install pyspark==3.1.2

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark==3.1.2
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880769 sha256=bc1f41f4bcb1ada18c0d3d93af7c1701a3cfbf0c8b8374af504c724d130aa1fe
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [None]:
# Check the CPUs available on Google Colab (local[*] runs one worker thread per logical core)

!cat /proc/cpuinfo

## Load Data

In [32]:
# Import libraries

import pandas as pd 

In [33]:
# Load the Drive Helper and mount

from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [9]:
# Define the dataset folder on Google Drive

filePath = '/content/drive/MyDrive/ANZ Virtual Internship/Dataset/'

In [34]:
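# Read the Excel file with pandas (Spark has no built-in .xlsx reader)

# Not named `f`: that name is reused later as the alias for pyspark.sql.functions
excelFile = filePath + 'ANZ synthesised transaction dataset.xlsx'
data = pd.read_excel(excelFile)

data.head(2)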

| | status | card_present_flag | bpay_biller_code | account | currency | long_lat | txn_description | merchant_id | merchant_code | first_name | ... | age | merchant_suburb | merchant_state | extraction | amount | transaction_id | country | customer_id | merchant_long_lat | movement |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | authorized | 1.0 | | ACC-1598451071 | AUD | 153.41 -27.95 | POS | 81c48296-73be-44a7-befa-d053f48ce7cd | | Diana | ... | 26 | Ashmore | QLD | 2018-08-01T01:01:15.000+0000 | 16.25 | a623070bfead4541a6b0fff8a09e706c | Australia | CUS-2487424745 | 153.38 -27.99 | debit |
| 1 | authorized | 0.0 | | ACC-1598451071 | AUD | 153.41 -27.95 | SALES-POS | 830a451c-316e-4a6a-bf25-e37caedca49e | | Diana | ... | 26 | Sydney | NSW | 2018-08-01T01:13:45.000+0000 | 14.19 | 13270a2a902145da9db4c951e04b51b9 | Australia | CUS-2487424745 | 151.21 -33.87 | debit |


In [35]:
data.isnull().sum()

status                   0
card_present_flag     4326
bpay_biller_code     11158
account                  0
currency                 0
long_lat                 0
txn_description          0
merchant_id           4326
merchant_code        11160
first_name               0
balance                  0
date                     0
gender                   0
age                      0
merchant_suburb       4326
merchant_state        4326
extraction               0
amount                   0
transaction_id           0
country                  0
customer_id              0
merchant_long_lat     4326
movement                 0
dtype: int64

In [12]:
data.columns

Index(['status', 'card_present_flag', 'bpay_biller_code', 'account',
       'currency', 'long_lat', 'txn_description', 'merchant_id',
       'merchant_code', 'first_name', 'balance', 'date', 'gender', 'age',
       'merchant_suburb', 'merchant_state', 'extraction', 'amount',
       'transaction_id', 'country', 'customer_id', 'merchant_long_lat',
       'movement'],
      dtype='object')

## 1. Using PySpark 

### 1.1 Define the Schema and Initialize the Spark Session

In [58]:
from pyspark.sql.types import (FloatType, IntegerType, StringType,
                               StructField, StructType)

# Explicit schema: one field per pandas column, in the same order as data.columns
mySchema = StructType([
    StructField("status", StringType(), True),
    StructField("card_present_flag", FloatType(), True),
    StructField("bpay_biller_code", StringType(), True),
    StructField("account", StringType(), True),
    StructField("currency", StringType(), True),
    StructField("long_lat", StringType(), True),
    StructField("txn_description", StringType(), True),
    StructField("merchant_id", StringType(), True),
    StructField("merchant_code", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("balance", FloatType(), True),
    StructField("date", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("merchant_suburb", StringType(), True),
    StructField("merchant_state", StringType(), True),
    StructField("extraction", StringType(), True),
    StructField("amount", FloatType(), True),
    StructField("transaction_id", StringType(), True),
    StructField("country", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("merchant_long_lat", StringType(), True),
    StructField("movement", StringType(), True),
])

In [59]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

In [60]:
dt = spark.createDataFrame(data, schema=mySchema)
dt.show(10)

+----------+-----------------+----------------+--------------+--------+-------------+---------------+--------------------+-------------+----------+-------+--------------------+------+---+---------------+--------------+--------------------+------+--------------------+---------+--------------+-----------------+--------+
|    status|card_present_flag|bpay_biller_code|       account|currency|     long_lat|txn_description|         merchant_id|merchant_code|first_name|balance|                date|gender|age|merchant_suburb|merchant_state|          extraction|amount|      transaction_id|  country|   customer_id|merchant_long_lat|movement|
+----------+-----------------+----------------+--------------+--------+-------------+---------------+--------------------+-------------+----------+-------+--------------------+------+---+---------------+--------------+--------------------+------+--------------------+---------+--------------+-----------------+--------+
|authorized|              1.0|          

In [61]:
# Missing values in the string columns arrive from pandas as the literal string 'NaN'; replace them with null
dt = dt.replace('NaN', None)

In [40]:
# Preview
dt.show(5)

+----------+-----------------+----------------+--------------+--------+-------------+---------------+--------------------+-------------+----------+-------+--------------------+------+---+---------------+--------------+--------------------+------+--------------------+---------+--------------+-----------------+--------+
|    status|card_present_flag|bpay_biller_code|       account|currency|     long_lat|txn_description|         merchant_id|merchant_code|first_name|balance|                date|gender|age|merchant_suburb|merchant_state|          extraction|amount|      transaction_id|  country|   customer_id|merchant_long_lat|movement|
+----------+-----------------+----------------+--------------+--------+-------------+---------------+--------------------+-------------+----------+-------+--------------------+------+---+---------------+--------------+--------------------+------+--------------------+---------+--------------+-----------------+--------+
|authorized|              1.0|          

In [41]:
# Count the non-null values in each column
dt.summary("count").show()

+-------+------+-----------------+----------------+-------+--------+--------+---------------+-----------+-------------+----------+-------+-----+------+-----+---------------+--------------+----------+------+--------------+-------+-----------+-----------------+--------+
|summary|status|card_present_flag|bpay_biller_code|account|currency|long_lat|txn_description|merchant_id|merchant_code|first_name|balance| date|gender|  age|merchant_suburb|merchant_state|extraction|amount|transaction_id|country|customer_id|merchant_long_lat|movement|
+-------+------+-----------------+----------------+-------+--------+--------+---------------+-----------+-------------+----------+-------+-----+------+-----+---------------+--------------+----------+------+--------------+-------+-----------+-----------------+--------+
|  count| 12043|            12043|             885|  12043|   12043|   12043|          12043|       7717|          883|     12043|  12043|12043| 12043|12043|           7717|          7717|     

In [42]:
# Check the schema again
dt.printSchema()

root
 |-- status: string (nullable = true)
 |-- card_present_flag: float (nullable = true)
 |-- bpay_biller_code: string (nullable = true)
 |-- account: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- long_lat: string (nullable = true)
 |-- txn_description: string (nullable = true)
 |-- merchant_id: string (nullable = true)
 |-- merchant_code: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- balance: float (nullable = true)
 |-- date: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- merchant_suburb: string (nullable = true)
 |-- merchant_state: string (nullable = true)
 |-- extraction: string (nullable = true)
 |-- amount: float (nullable = true)
 |-- transaction_id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- merchant_long_lat: string (nullable = true)
 |-- movement: string (nullable = true)



In [20]:
# Show the number of rows and columns
print((dt.count(), len(dt.columns)))

(12043, 23)


### 1.2 Transform Data

In [62]:
# Keep only rows where status == 'authorized' AND card_present_flag == 0
dt_filter = dt.where((dt['status'] == 'authorized') & (dt['card_present_flag'] == 0.0))
dt_filter.show(5)

+----------+-----------------+----------------+--------------+--------+-------------+---------------+--------------------+-------------+----------+--------+--------------------+------+---+---------------+--------------+--------------------+------+--------------------+---------+--------------+-----------------+--------+
|    status|card_present_flag|bpay_biller_code|       account|currency|     long_lat|txn_description|         merchant_id|merchant_code|first_name| balance|                date|gender|age|merchant_suburb|merchant_state|          extraction|amount|      transaction_id|  country|   customer_id|merchant_long_lat|movement|
+----------+-----------------+----------------+--------------+--------+-------------+---------------+--------------------+-------------+----------+--------+--------------------+------+---+---------------+--------------+--------------------+------+--------------------+---------+--------------+-----------------+--------+
|authorized|              0.0|       
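
Because the full file is also loaded in the pandas DataFrame `data`, the Spark filter can be cross-checked by applying the same condition in pandas. A minimal sketch with a hypothetical helper `authorized_card_not_present`, shown on a toy frame rather than the ANZ data:

```python
import pandas as pd


def authorized_card_not_present(pdf: pd.DataFrame) -> pd.DataFrame:
    """Rows with status == 'authorized' and card_present_flag == 0 (NaN flags are excluded)."""
    mask = (pdf["status"] == "authorized") & (pdf["card_present_flag"] == 0)
    return pdf[mask]


toy = pd.DataFrame({
    "status": ["authorized", "authorized", "posted", "authorized"],
    "card_present_flag": [0.0, 1.0, 0.0, float("nan")],
})
print(len(authorized_card_not_present(toy)))  # → 1
```

With the real data, `len(authorized_card_not_present(data))` should equal `dt_filter.count()`.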

In [65]:
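# Split long_lat ("long lat") into separate long and lat columns

import pyspark.sql.functions as f

split_cols = f.split(dt_filter['long_lat'], ' ')
df1 = dt_filter.withColumn('long', split_cols.getItem(0)) \
        .withColumn('lat', split_cols.getItem(1))

# drop() returns a new DataFrame: long_lat is hidden only in this preview, df1 still keeps it
df1.drop('long_lat').show(2)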

+----------+-----------------+----------------+--------------+--------+---------------+--------------------+-------------+----------+-------+--------------------+------+---+---------------+--------------+--------------------+------+--------------------+---------+--------------+-----------------+--------+------+------+
|    status|card_present_flag|bpay_biller_code|       account|currency|txn_description|         merchant_id|merchant_code|first_name|balance|                date|gender|age|merchant_suburb|merchant_state|          extraction|amount|      transaction_id|  country|   customer_id|merchant_long_lat|movement|  long|   lat|
+----------+-----------------+----------------+--------------+--------+---------------+--------------------+-------------+----------+-------+--------------------+------+---+---------------+--------------+--------------------+------+--------------------+---------+--------------+-----------------+--------+------+------+
|authorized|              0.0|          

In [67]:
# Split merchant_long_lat into separate merch_long and merch_lat columns

split_cols_mer = f.split(df1['merchant_long_lat'], ' ')
df = df1.withColumn('merch_long', split_cols_mer.getItem(0)) \
    .withColumn('merch_lat', split_cols_mer.getItem(1))

# As above, merchant_long_lat is dropped only from this preview; df still contains it
df.drop('merchant_long_lat').show(2)

+----------+-----------------+----------------+--------------+--------+-------------+---------------+--------------------+-------------+----------+-------+--------------------+------+---+---------------+--------------+--------------------+------+--------------------+---------+--------------+--------+------+------+----------+---------+
|    status|card_present_flag|bpay_biller_code|       account|currency|     long_lat|txn_description|         merchant_id|merchant_code|first_name|balance|                date|gender|age|merchant_suburb|merchant_state|          extraction|amount|      transaction_id|  country|   customer_id|movement|  long|   lat|merch_long|merch_lat|
+----------+-----------------+----------------+--------------+--------+-------------+---------------+--------------------+-------------+----------+-------+--------------------+------+---+---------------+--------------+--------------------+------+--------------------+---------+--------------+--------+------+------+----------+

### 1.3 Save to a CSV File

In [None]:
# Save as partitioned CSV: Spark writes a directory named anz_task3.csv with one part file per partition
df.write.csv('anz_task3.csv', header= True)

In [70]:
# Coalesce to a single partition so the output directory contains only one part file
df.coalesce(1).write.csv('anz_task3_allparts.csv', header= True)