In [1]:
#Installing pyspark using terminal


In [10]:
#Importing Libraries
import pandas as pd
from pyspark.sql import*
from pyspark.sql.functions import*
from pyspark.sql import SparkSession
import os
import psycopg2


In [5]:
#Java was downloaded same with postgres jdbc file
#Setting Java Environment
os.environ['JAVA_HOME'] = r'C:\java8'

In [11]:
#Initializing my spark Session
spark = SparkSession.builder\
        .appName("Nuga Bank")\
        .config("spark.jars","postgresql-42.7.3.jar") \
        .getOrCreate()

In [12]:
spark

In [29]:
#Extracting the data into spark
df = spark.read.csv(r'dataset\rawdata\nuga_bank_transactions.csv', header=True, inferSchema=True)

In [30]:
df.show()

+--------------------+------+----------------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+------+--------------+
|    Transaction_Date|Amount|Transaction_Type|    Customer_Name|    Customer_Address|       Customer_City|Customer_State|    Customer_Country|             Company|           Job_Title|               Email|       Phone_Number| Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|Gender|Marital_Status|
+--------------------+------+----------------+-----------------+--------------------+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+-------------------+-------

In [31]:
df.printSchema()

root
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)
 |-- Customer_Name: string (nullable = true)
 |-- Customer_Address: string (nullable = true)
 |-- Customer_City: string (nullable = true)
 |-- Customer_State: string (nullable = true)
 |-- Customer_Country: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Job_Title: string (nullable = true)
 |-- Email: string (nullable = true)
 |-- Phone_Number: string (nullable = true)
 |-- Credit_Card_Number: long (nullable = true)
 |-- IBAN: string (nullable = true)
 |-- Currency_Code: string (nullable = true)
 |-- Random_Number: double (nullable = true)
 |-- Category: string (nullable = true)
 |-- Group: string (nullable = true)
 |-- Is_Active: string (nullable = true)
 |-- Last_Updated: timestamp (nullable = true)
 |-- Description: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Marital_Status: string (nullable = true)

In [32]:
#Cleaning and Transformation
#Checking for null
for column in df.columns:
    print(column, 'Nulls: ', df.filter(df[column].isNull()).count())


Transaction_Date Nulls:  0
Amount Nulls:  0
Transaction_Type Nulls:  0
Customer_Name Nulls:  100425
Customer_Address Nulls:  100087
Customer_City Nulls:  100034
Customer_State Nulls:  100009
Customer_Country Nulls:  100672
Company Nulls:  100295
Job_Title Nulls:  99924
Email Nulls:  100043
Phone_Number Nulls:  100524
Credit_Card_Number Nulls:  100085
IBAN Nulls:  100300
Currency_Code Nulls:  99342
Random_Number Nulls:  99913
Category Nulls:  100332
Group Nulls:  100209
Is_Active Nulls:  100259
Last_Updated Nulls:  100321
Description Nulls:  100403
Gender Nulls:  99767
Marital_Status Nulls:  99904


In [33]:
#Checking summary statistics
df.describe().show()

+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+-------------------+--------------------+--------------------+-------------+-----------------+--------+------+---------+--------------------+------+--------------+
|summary|            Amount|Transaction_Type|Customer_Name|    Customer_Address|Customer_City|Customer_State|Customer_Country|      Company|         Job_Title|              Email|       Phone_Number|  Credit_Card_Number|                IBAN|Currency_Code|    Random_Number|Category| Group|Is_Active|         Description|Gender|Marital_Status|
+-------+------------------+----------------+-------------+--------------------+-------------+--------------+----------------+-------------+------------------+-------------------+-------------------+--------------------+--------------------+-------------+-----------------+--------+------+---------+---------------

In [35]:
#Filling up Nulls
df1 = df.fillna({
    'Customer_Name': 'Unknown',
    'Customer_Address': 'Unknown',
    'Customer_City': 'Unknown',
    'Customer_State': 'Unknown',
    'Customer_Country': 'Unknown',
    'Company': 'Unknown',
    'Job_Title': 'Unknown',
    'Email': 'Unknown',
    'Phone_Number': 'Unknown',
    'Credit_Card_Number': 0,
    'IBAN': 'Unknown',
    'Currency_Code': 'Unknown',
    'Random_Number': 0.0,
    'Category': 'Unknown',
    'Group': 'Unknown',
    'Is_Active': 'Unknown',
    'Description': 'Unknown',
    'Gender': 'Unknown',
    'Marital_Status': 'Unknown',
     })


In [36]:
#Dropping missing values for Last_Updated
df1 = df1.na.drop(subset=['Last_Updated'])

In [37]:
#ReChecking for null
for column in df1.columns:
    print(column, 'Nulls: ', df1.filter(df1[column].isNull()).count())


Transaction_Date Nulls:  0
Amount Nulls:  0
Transaction_Type Nulls:  0
Customer_Name Nulls:  0
Customer_Address Nulls:  0
Customer_City Nulls:  0
Customer_State Nulls:  0
Customer_Country Nulls:  0
Company Nulls:  0
Job_Title Nulls:  0
Email Nulls:  0
Phone_Number Nulls:  0
Credit_Card_Number Nulls:  0
IBAN Nulls:  0
Currency_Code Nulls:  0
Random_Number Nulls:  0
Category Nulls:  0
Group Nulls:  0
Is_Active Nulls:  0
Last_Updated Nulls:  0
Description Nulls:  0
Gender Nulls:  0
Marital_Status Nulls:  0


In [39]:
#Creating the tables,dataframes
#transaction table
transactions = df1.select('Transaction_Date',  'Amount','Transaction_Type')\
                  .withColumn('Transaction_ID', monotonically_increasing_id())\
                  .select('Transaction_ID','Transaction_Date',  'Amount','Transaction_Type')
transactions.show(5)

+--------------+--------------------+------+----------------+
|Transaction_ID|    Transaction_Date|Amount|Transaction_Type|
+--------------+--------------------+------+----------------+
|             0|2024-03-23 15:38:...| 34.76|      Withdrawal|
|             1|2024-04-22 19:15:...|163.92|      Withdrawal|
|             2|2024-04-12 19:46:...|386.32|      Withdrawal|
|             3|2024-04-17 15:29:...|407.15|         Deposit|
|             4|2024-02-10 01:51:...|161.31|         Deposit|
+--------------+--------------------+------+----------------+
only showing top 5 rows



In [41]:
#customers table
customers = df1.select('Customer_Name','Customer_Address','Customer_City','Customer_State','Customer_Country')\
                  .withColumn('Customer_ID', monotonically_increasing_id())\
                  .select('Customer_ID','Customer_Name','Customer_Address','Customer_City','Customer_State','Customer_Country')
customers.show(5)

+-----------+--------------+--------------------+------------------+--------------+--------------------+
|Customer_ID| Customer_Name|    Customer_Address|     Customer_City|Customer_State|    Customer_Country|
+-----------+--------------+--------------------+------------------+--------------+--------------------+
|          0|    James Neal|54912 Holmes Lodg...| West Keithborough|       Florida|                Togo|
|          1|   Thomas Long| 1133 Collin Passage|        Joshuabury|   Connecticut|Lao People's Demo...|
|          2|Ashley Shelton|5297 Johnson Port...|       North Maria|    New Jersey|              Bhutan|
|          3| James Rosario|56955 Moore Glens...|North Michellefurt|    New Mexico|             Iceland|
|          4|Miguel Leonard|262 Beck Expressw...|           Unknown| West Virginia|             Eritrea|
+-----------+--------------+--------------------+------------------+--------------+--------------------+
only showing top 5 rows



In [43]:
#employees table
employees = df1.select('Company','Job_Title','Email','Phone_Number','Gender','Marital_Status')\
                  .withColumn('Employee_ID', monotonically_increasing_id())\
                  .select('Employee_ID','Company','Job_Title','Email','Phone_Number','Gender','Marital_Status')
employees.show(5)

+-----------+--------------------+--------------------+--------------------+-------------------+-------+--------------+
|Employee_ID|             Company|           Job_Title|               Email|       Phone_Number| Gender|Marital_Status|
+-----------+--------------------+--------------------+--------------------+-------------------+-------+--------------+
|          0|Benson, Johnson a...|             Unknown|             Unknown|  493.720.6609x7545|  Other|      Divorced|
|          1|             Unknown|   Food technologist|michellelynch@exa...|      (497)554-3317| Female|       Married|
|          2|       Jones-Mueller|Database administ...| ljordan@example.org|      (534)769-3072|  Other|       Unknown|
|          3|       Vargas-Harris|Horticultural the...|parkerjames@examp...|+1-447-900-1320x257|Unknown|       Unknown|
|          4|Richardson, Gonza...|   Minerals surveyor| zweaver@example.net|            Unknown| Female|       Married|
+-----------+--------------------+------

In [45]:
#nuga_bank_fact
fact_table = df1.join(transactions,['Transaction_Date',  'Amount','Transaction_Type'],'inner')\
                .join(customers,['Customer_Name','Customer_Address','Customer_City','Customer_State','Customer_Country'])\
                .join(employees,['Company','Job_Title','Email','Phone_Number','Gender','Marital_Status'])\
                .select('Transaction_ID','Customer_ID','Employee_ID','Credit_Card_Number','IBAN','Currency_Code','Random_Number',\
                        'Category','Group','Is_Active','Last_Updated','Description')
fact_table.show(5)

+--------------+-----------+-----------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|Transaction_ID|Customer_ID|Employee_ID|Credit_Card_Number|                IBAN|Currency_Code|Random_Number|Category|Group|Is_Active|        Last_Updated|         Description|
+--------------+-----------+-----------+------------------+--------------------+-------------+-------------+--------+-----+---------+--------------------+--------------------+
|   34359813601|34359813601|34359813601|  4625846628264239|GB97NDWP370396356...|          BSD|       6117.0| Unknown|    X|      Yes|2023-02-14 08:41:...|Onto give past no...|
|    8589962116| 8589962116| 8589962116|    30167114834907|GB09TXTK261344054...|          PHP|          0.0|       C|    Z|      Yes|2022-11-03 23:14:...|Sport internation...|
|   85899356804|85899356804|85899356804|  3521456466352539|GB17MFSX600590853...|          JEP|       3308.0|       C|   

In [47]:
#Loading into Postgre SQL
#Creating a function
def get_db_connection():
    connection = psycopg2.connect (
        user = 'postgres',
        password = '',
        host = 'localhost',
        port = '5432',
        database = 'Nuga_Bank_PySpark'
    )
    return connection
#connecting to sql database
conn = get_db_connection()

In [69]:
transactions.printSchema()

root
 |-- Transaction_ID: long (nullable = false)
 |-- Transaction_Date: timestamp (nullable = true)
 |-- Amount: double (nullable = true)
 |-- Transaction_Type: string (nullable = true)



In [70]:
#Creating a function to create tables
def create_table():
    conn = get_db_connection()
    cursor = conn.cursor()
    create_table_query = '''
                        DROP TABLE IF EXISTS transactions;
                        DROP TABLE IF EXISTS customers;
                        DROP TABLE IF EXISTS employees;
                        DROP TABLE IF EXISTS fact_table;

                        CREATE TABLE transactions (
                        Transaction_ID BIGINT ,
                        Transaction_Date DATE,
                        Amount FLOAT,
                        Transaction_Type VARCHAR (10000)
                        );
                        
                        
                        CREATE TABLE customers (
                        Customer_ID BIGINT ,
                        Customer_Name VARCHAR (10000),
                        Customer_Address VARCHAR (10000),
                        Customer_City VARCHAR (10000),
                        Customer_State VARCHAR (10000),
                        Customer_Country VARCHAR (10000)
                        );

                        CREATE TABLE employees (
                        Employee_ID BIGINT ,
                        Company VARCHAR (10000),
                        Job_Title VARCHAR (10000),
                        Email VARCHAR (10000),
                        Phone_Number VARCHAR (10000),
                        Gender VARCHAR (10000),
                        Marital_Status VARCHAR (10000)
                        );

                        CREATE TABLE fact_table (
                        Transaction_ID BIGINT ,
                        Customer_ID BIGINT ,
                        Employee_ID BIGINT ,
                        Credit_Card_Number BIGINT ,
                        IBAN VARCHAR(1000) ,
                        Currency_Code VARCHAR(10000),
                        Random_Number FLOAT ,
                        Category VARCHAR(10000),
                        "Group" VARCHAR(10000),
                        Is_Active VARCHAR(10000),
                        Last_Updated DATE,
                        Description VARCHAR(10000)
                        );

                        
                        '''
    cursor.execute(create_table_query)
    conn.commit()
    cursor.close()
    conn.close()

In [71]:
create_table()

In [72]:
#building url for loading the data
url = "jdbc:postgresql://localhost:5432/Nuga_Bank_PySpark"
properties = {
    "user": "postgres",
    "password": "",
    "driver": "org.postgresql.Driver"
}
customers.write.jdbc(url=url,table="customers", mode="append",properties = properties)

In [73]:
employees.write.jdbc(url=url,table="employees", mode="append",properties = properties)

In [74]:
transactions.write.jdbc(url=url,table="transactions", mode="append",properties = properties)



In [75]:
fact_table.write.jdbc(url=url,table="fact_table", mode="append",properties = properties)

In [None]:
#Automating the process using.py