# PySpark/Azure Databricks Exercise
This notebook ingests raw data files into a Spark session from an Azure Databricks notebook environment. It demonstrates the methods PySpark provides for analysing tabular datasets (metadata, aggregations, etc.).

In [0]:
# findspark.init() runs before pyspark is imported; keep this order
# (presumably it makes the local Spark installation importable — confirm for this environment).
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
# Session handle used by every spark.read call in the notebook.
spark = SparkSession.builder.getOrCreate()
# NOTE(review): wildcard import puts every pyspark.sql.functions name into the
# notebook namespace; the cells below reach these functions through the `F` alias instead.
from pyspark.sql.functions import *

In [0]:
# Packages
import pyspark.sql.functions as F
import pandas as pd
import numpy as np
import os

In [0]:
# Constants
# Data directory (one level above the working directory) as an absolute file:// location
DATA_PATH = 'file://' + os.path.abspath('../data')
# One path per input CSV, all resolved against DATA_PATH
LOAN_PATH, CREDIT_PATH, TXN_PATH = (
    os.path.join(DATA_PATH, csv_name)
    for csv_name in ('loan.csv', 'credit card.csv', 'txn.csv')
)

# Helper functions
def clean_columns(df):
    """Return ``df`` with leading/trailing whitespace stripped from every column name.

    Parameters
    ----------
    df : Spark DataFrame whose column names may carry stray whitespace
        (for example headers read from a CSV file).

    Returns
    -------
    A new DataFrame with the same columns in the same order, under the
    stripped names. ``df`` itself is left untouched.
    """
    # Rename all columns positionally in a single toDF() call instead of
    # chaining one withColumnRenamed() per column: this adds one projection
    # to the plan rather than one per column, and a column that has already
    # been renamed cannot be matched again by a later rename step.
    return df.toDF(*[name.strip() for name in df.columns])

# LOAN DATASET #

In [0]:
# Read in the loan dataset and tidy its column names. Some numeric columns are
# recorded as strings with commas, so drop the commas and convert them to int.
loan_df = clean_columns(spark.read.csv(LOAN_PATH, header=True, inferSchema=True))
for comma_col in ('Loan Amount', 'Debt Record'):
    without_commas = F.regexp_replace(F.col(comma_col), ',', '')
    loan_df = loan_df.withColumn(comma_col, F.trim(without_commas).astype('int'))

In [0]:
# Print the schema of the loan dataset (column names, types and nullability)
loan_df.printSchema()

root
 |-- Customer_ID: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Occupation: string (nullable = true)
 |-- Marital Status: string (nullable = true)
 |-- Family Size: integer (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Expenditure: integer (nullable = true)
 |-- Use Frequency: integer (nullable = true)
 |-- Loan Category: string (nullable = true)
 |-- Loan Amount: integer (nullable = true)
 |-- Overdue: integer (nullable = true)
 |-- Debt Record: integer (nullable = true)
 |-- Returned Cheque: integer (nullable = true)
 |-- Dishonour of Bill: integer (nullable = true)



In [0]:
# Print the first 5 rows of the loan dataset
loan_df.show(5)

+-----------+---+------+------------+--------------+-----------+------+-----------+-------------+-------------+-----------+-------+-----------+---------------+-----------------+
|Customer_ID|Age|Gender|  Occupation|Marital Status|Family Size|Income|Expenditure|Use Frequency|Loan Category|Loan Amount|Overdue|Debt Record|Returned Cheque|Dishonour of Bill|
+-----------+---+------+------------+--------------+-----------+------+-----------+-------------+-------------+-----------+-------+-----------+---------------+-----------------+
|    IB14001| 30|  MALE|BANK MANAGER|        SINGLE|          4| 50000|      22199|            6|      HOUSING|    1000000|      5|      42898|              6|                9|
|    IB14008| 44|  MALE|   PROFESSOR|       MARRIED|          6| 51000|      19999|            4|     SHOPPING|      50000|      3|      33999|              1|                5|
|    IB14012| 30|FEMALE|     DENTIST|        SINGLE|          3| 58450|      27675|            5|   TRAVELLING

In [0]:
# Number of columns in the loan dataset (the bare expression is shown as the cell output)
len(loan_df.columns)

15

In [0]:
# Number of rows in the loan dataset
loan_df.count()

500

In [0]:
# Number of distinct rows in the loan dataset; it matches the plain row count when no row is an exact duplicate
loan_df.distinct().count()

500

In [0]:
# Count the loans in each category and list the largest categories first
(
    loan_df
    .groupBy('Loan Category')
    .agg(F.count(F.lit(1)).alias('Count'))
    .orderBy(F.desc('Count'))
    .show()
)

+------------------+-----+
|     Loan Category|Count|
+------------------+-----+
|         GOLD LOAN|   77|
|           HOUSING|   67|
|        AUTOMOBILE|   60|
|        TRAVELLING|   53|
|       RESTAURANTS|   41|
|          SHOPPING|   35|
|COMPUTER SOFTWARES|   35|
|          BUSINESS|   24|
|        RESTAURANT|   20|
|  EDUCATIONAL LOAN|   20|
|   HOME APPLIANCES|   14|
|       ELECTRONICS|   14|
|           DINNING|   14|
|       AGRICULTURE|   12|
|       BOOK STORES|    7|
|          BUILDING|    7|
+------------------+-----+



In [0]:
# Count the people who have taken a loan of more than 1 lakh (100,000)
loan_df.where(F.col('Loan Amount') > 100000).count()

450

In [0]:
# Count the people whose income is greater than 60000 rupees
loan_df.where(F.col('Income') > 60000).count()

198

In [0]:
# Count the people with at least 2 returned cheques and an income below 50000
loan_df.where(
    (F.col('Returned Cheque') >= 2)
    & (F.col('Income') < 50000)
).count()

137

In [0]:
# Count the people with at least 2 returned cheques whose marital status is SINGLE
loan_df.where(
    (F.col('Returned Cheque') >= 2)
    & (F.col('Marital Status') == 'SINGLE')
).count()

111

In [0]:
# Count the people whose expenditure is over 50000 a month
loan_df.where(F.col('Expenditure') > 50000).count()

6

# CREDIT CARD DATASET #

In [0]:
# Load the credit card dataset: the first row supplies the column names and column types are inferred
credit_df = spark.read.csv(CREDIT_PATH, header=True, inferSchema=True)

In [0]:
# Print the schema of the credit card dataset (column names, types and nullability)
credit_df.printSchema()

root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



In [0]:
# Number of columns in the credit card dataset (the bare expression is shown as the cell output)
len(credit_df.columns)

13

In [0]:
# Number of rows in the credit card dataset
credit_df.count()

10000

In [0]:
# Number of distinct rows in the credit card dataset; it matches the plain row count when no row is an exact duplicate
credit_df.distinct().count()

10000

In [0]:
# Print the first 5 rows of the credit card dataset
credit_df.show(5)

+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+--------------+---------------+------+
|RowNumber|CustomerId| Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+--------------+---------------+------+
|        1|  15634602|Hargrave|        619|   France|Female| 42|     2|      0.0|            1|             1|      101348.88|     1|
|        2|  15647311|    Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|             1|      112542.58|     0|
|        3|  15619304|    Onio|        502|   France|Female| 42|     8| 159660.8|            3|             0|      113931.57|     1|
|        4|  15701354|    Boni|        699|   France|Female| 39|     1|      0.0|            2|             0|       93826.63|     0|
|        5|  15737888|Mitchell|        850|    Spain|Female| 4

In [0]:
# NOTE(review): this cell has no description and runs the same credit-score > 700
# count as the eligibility cell that follows it; it looks like a leftover duplicate.
credit_df.filter(F.col('CreditScore')>700).count()

3116

In [0]:
# Count the members who are eligible for a credit card (assumed to mean a credit score above 700)
credit_df.where(F.col('CreditScore') > 700).count()

3116

In [0]:
# Find the number of members who are eligible (credit score > 700) and active in the bank.
# Both conditions are written out and combined with `&`, like the other compound
# filters in this notebook. Multiplying the score by the IsActiveMember flag gives
# the same answer only while the flag is exactly 0 or 1.
credit_df.filter((F.col('CreditScore')>700) & (F.col('IsActiveMember')==1)).count()

1637

In [0]:
# Show the credit card users whose geography is Spain
credit_df.where(F.col('Geography') == 'Spain').show()

+---------+----------+---------+-----------+---------+------+---+------+---------+-------------+--------------+---------------+------+
|RowNumber|CustomerId|  Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+---------+-----------+---------+------+---+------+---------+-------------+--------------+---------------+------+
|        2|  15647311|     Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|             1|      112542.58|     0|
|        5|  15737888| Mitchell|        850|    Spain|Female| 43|     2|125510.82|            1|             1|        79084.1|     0|
|        6|  15574012|      Chu|        645|    Spain|  Male| 44|     8|113755.78|            2|             0|      149756.71|     1|
|       12|  15737173|  Andrews|        497|    Spain|  Male| 24|     3|      0.0|            2|             0|       76390.01|     0|
|       15|  15600882|    Scott|        635|    Spain|F

In [0]:
# Find the credit card users with estimated salary greater than 100000 who have exited the card.
# Both conditions are written out and combined with `&`, like the other compound
# filters in this notebook. Multiplying the salary by the Exited flag gives the
# same answer only while the flag is exactly 0 or 1.
credit_df.filter((F.col('EstimatedSalary')>100000) & (F.col('Exited')==1)).count()


1044

In [0]:
# Count the credit card users with an estimated salary below 100000 who have more than 1 product
credit_df.where(
    (F.col('EstimatedSalary') < 100000)
    & (F.col('NumOfProducts') > 1)
).count()

2432

# TRANSACTION DATASET #

In [0]:
# Load the transaction dataset and strip stray whitespace from its column names
txn_df = clean_columns(
    spark.read.csv(TXN_PATH, header=True, inferSchema=True)
)

In [0]:
# Print the schema of the transaction dataset (column names, types and nullability)
txn_df.printSchema()

root
 |-- Account No: string (nullable = true)
 |-- TRANSACTION DETAILS: string (nullable = true)
 |-- VALUE DATE: string (nullable = true)
 |-- WITHDRAWAL AMT: double (nullable = true)
 |-- DEPOSIT AMT: double (nullable = true)
 |-- BALANCE AMT: double (nullable = true)



In [0]:
# Count of transactions on every account
txn_df.groupBy('Account No').count().show()

+-------------+-----+
|   Account No|count|
+-------------+-----+
|409000438611'| 4588|
|     1196711'|10536|
|     1196428'|48779|
|409000493210'| 6014|
|409000611074'| 1093|
|409000425051'|  802|
|409000405747'|   51|
|409000493201'| 1044|
|409000438620'|13454|
|409000362497'|29840|
+-------------+-----+



In [0]:
# Largest single withdrawal amount recorded on each account
(
    txn_df
    .groupBy('Account No')
    .agg(F.max('WITHDRAWAL AMT').alias('MaxWithdrawl'))
    .show()
)


+-------------+-------------+
|   Account No| MaxWithdrawl|
+-------------+-------------+
|409000438611'|        2.4E8|
|     1196711'|4.594475464E8|
|     1196428'|        1.5E8|
|409000493210'|        1.5E7|
|409000611074'|     912000.0|
|409000425051'|       3.54E8|
|409000405747'|        1.7E8|
|409000493201'|    2500000.0|
|409000438620'|        4.0E8|
|409000362497'|1.413662392E8|
+-------------+-------------+



In [0]:
# Smallest single withdrawal amount recorded on each account
(
    txn_df
    .groupBy('Account No')
    .agg(F.min('WITHDRAWAL AMT').alias('MinWithdrawl'))
    .show()
)

+-------------+------------+
|   Account No|MinWithdrawl|
+-------------+------------+
|409000438611'|         0.2|
|     1196711'|        0.25|
|     1196428'|        0.25|
|409000493210'|        0.01|
|409000611074'|       120.0|
|409000425051'|        1.25|
|409000405747'|        21.0|
|409000493201'|         2.1|
|409000438620'|        0.34|
|409000362497'|        0.97|
+-------------+------------+



In [0]:
# Largest single deposit amount recorded on each account
(
    txn_df
    .groupBy('Account No')
    .agg(F.max('DEPOSIT AMT').alias('MaxDeposit'))
    .show()
)

+-------------+-------------+
|   Account No|   MaxDeposit|
+-------------+-------------+
|409000438611'|     1.7025E8|
|     1196711'|        5.0E8|
|     1196428'|2.119594422E8|
|409000493210'|        1.5E7|
|409000611074'|    3000000.0|
|409000425051'|        1.5E7|
|409000405747'|      2.021E8|
|409000493201'|    1000000.0|
|409000438620'|      5.448E8|
|409000362497'|        2.0E8|
+-------------+-------------+



In [0]:
# Smallest single deposit amount recorded on each account
(
    txn_df
    .groupBy('Account No')
    .agg(F.min('DEPOSIT AMT').alias('MinDeposit'))
    .show()
)

+-------------+----------+
|   Account No|MinDeposit|
+-------------+----------+
|409000438611'|      0.03|
|     1196711'|      1.01|
|     1196428'|       1.0|
|409000493210'|      0.01|
|409000611074'|    1320.0|
|409000425051'|       1.0|
|409000405747'|     500.0|
|409000493201'|       0.9|
|409000438620'|      0.07|
|409000362497'|      0.03|
+-------------+----------+



In [0]:
# Sum of the BALANCE AMT column over all transactions of each bank account
(
    txn_df
    .groupBy('Account No')
    .agg(F.sum('BALANCE AMT').alias('SumBalance'))
    .show()
)

+-------------+--------------------+
|   Account No|          SumBalance|
+-------------+--------------------+
|409000438611'|-2.49486577068339...|
|     1196711'|-1.60476498101275E13|
|     1196428'| -8.1418498130721E13|
|409000493210'|-3.27584952132095...|
|409000611074'|       1.615533622E9|
|409000425051'|-3.77211841164998...|
|409000405747'|-2.43108047067000...|
|409000493201'|1.0420831829499985E9|
|409000438620'|-7.12291867951358...|
|409000362497'| -5.2860004792808E13|
+-------------+--------------------+



In [0]:
# Number of transactions on each value date
txn_df.groupBy('VALUE DATE').count().show()


+----------+-----+
|VALUE DATE|count|
+----------+-----+
| 23-Dec-16|  143|
|  7-Feb-19|   98|
| 21-Jul-15|   80|
|  9-Sep-15|   91|
| 17-Jan-15|   16|
| 18-Nov-17|   53|
| 21-Feb-18|   77|
| 20-Mar-18|   71|
| 19-Apr-18|   71|
| 21-Jun-16|   97|
| 17-Oct-17|  101|
|  3-Jan-18|   70|
|  8-Jun-18|  223|
| 15-Dec-18|   62|
|  8-Aug-16|   97|
| 17-Dec-16|   74|
|  3-Sep-15|   83|
| 21-Jan-16|   76|
|  4-May-18|   92|
|  7-Sep-17|   94|
+----------+-----+
only showing top 20 rows



In [0]:
# List the transactions with a withdrawal amount of more than 1 lakh (100,000)
txn_df.where(F.col('WITHDRAWAL AMT') > 100000).show()


+-------------+--------------------+----------+--------------+-----------+-----------+
|   Account No| TRANSACTION DETAILS|VALUE DATE|WITHDRAWAL AMT|DEPOSIT AMT|BALANCE AMT|
+-------------+--------------------+----------+--------------+-----------+-----------+
|409000611074'|INDO GIBL Indiafo...| 16-Aug-17|      133900.0|       NULL|  8366100.0|
|409000611074'|INDO GIBL Indiafo...| 16-Aug-17|      195800.0|       NULL|  8147300.0|
|409000611074'|INDO GIBL Indiafo...| 16-Aug-17|      143800.0|       NULL|  7781600.0|
|409000611074'|INDO GIBL Indiafo...| 16-Aug-17|      331650.0|       NULL|  7449950.0|
|409000611074'|INDO GIBL Indiafo...| 16-Aug-17|      129000.0|       NULL|  7320950.0|
|409000611074'|INDO GIBL Indiafo...| 16-Aug-17|      230013.0|       NULL|  7090937.0|
|409000611074'|INDO GIBL Indiafo...| 16-Aug-17|      367900.0|       NULL|  6723037.0|
|409000611074'|INDO GIBL Indiafo...| 16-Aug-17|      108000.0|       NULL|  6615037.0|
|409000611074'|INDO GIBL Indiafo...| 16-Aug