In [2]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m17.8 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m10.1 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25h  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812366 sha256=82faf4f43ef13c44e548fcbd64406c9eca9c93b173b4bcf97713a349f1ca0176
  Stored in directory: /Users/aravindh/Library/Caches/pip/wheels/9d/29/ee/3a756632ca3f0a6870933bac1c9db6e4af2c068f019aba0ee1
Successfully built pyspark
Installing collected pack

In [1]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, sum, min, max
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

In [2]:
import sys
sys.path.append("/path/to/spark/python")
sys.path.append("/path/to/spark/python/lib/py4j-<version>-src.zip")

In [3]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("BankingAnalysis") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/17 07:17:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Define the schema for the dataset
schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("surname", StringType(), True),
    StructField("credit_score", IntegerType(), True),
    StructField("geography", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("tenure", IntegerType(), True),
    StructField("balance", DoubleType(), True),
    StructField("num_of_products", IntegerType(), True),
    StructField("has_credit_card", IntegerType(), True),
    StructField("estimated_salary", DoubleType(), True),
    StructField("exited", IntegerType(), True)
])

In [5]:
#Read the CSV file into a DataFrame
df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .schema(schema) \
    .load("/Users/aravindh/Desktop/Data Engineering/Azure Mini Project/springboard-pyspark-project/pyspark-project/credit card.csv")

In [6]:
# Create a BankingAnalysis class
class BankingAnalysis:
    def __init__(self, dataframe):
        self.df = dataframe
    
    def total_customers(self):
        return self.df.count()
    
    def customers_by_geography(self):
        return self.df.groupBy("geography").agg(count("*").alias("num_customers"))
    
    def customers_by_gender(self):
        return self.df.groupBy("gender").agg(count("*").alias("num_customers"))
    
    def avg_age_by_geography(self):
        return self.df.groupBy("geography").agg(avg("age").alias("avg_age"))
    
    def avg_balance_by_geography(self):
        return self.df.groupBy("geography").agg(avg("balance").alias("avg_balance"))
    
    def min_max_tenure(self):
        return self.df.agg(min("tenure").alias("min_tenure"), max("tenure").alias("max_tenure"))
    
    def num_credit_card_holders(self):
        return self.df.filter(col("has_credit_card") == 1).count()
    
    def avg_salary_by_gender(self):
        return self.df.groupBy("gender").agg(avg("estimated_salary").alias("avg_salary"))
    
    def num_exited_customers(self):
        return self.df.filter(col("exited") == 1).count()
    
    def total_balance(self):
        return self.df.agg(sum("balance").alias("total_balance")).collect()[0][0]

In [7]:
# Create an instance of the BankingAnalysis class
analysis = BankingAnalysis(df)

In [8]:
# Perform analysis and print the results
print("Total number of customers:", analysis.total_customers())
print("Number of customers by geography:")
analysis.customers_by_geography().show()
print("Number of customers by gender:")
analysis.customers_by_gender().show()
print("Average age by geography:")
analysis.avg_age_by_geography().show()
print("Average balance by geography:")
analysis.avg_balance_by_geography().show()
print("Minimum and maximum tenure:", analysis.min_max_tenure().collect())
print("Number of credit card holders:", analysis.num_credit_card_holders())
print("Average salary by gender:")
analysis.avg_salary_by_gender().show()
print("Number of customers who have exited:", analysis.num_exited_customers())
print("Total balance across all customers:", analysis.total_balance())

Total number of customers: 10000
Number of customers by geography:


24/09/17 07:17:08 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: CreditScore
 Schema: geography
Expected: geography but found: CreditScore
CSV file: file:///Users/aravindh/Desktop/Data%20Engineering/Azure%20Mini%20Project/springboard-pyspark-project/pyspark-project/credit%20card.csv


+---------+-------------+
|geography|num_customers|
+---------+-------------+
|      829|            8|
|      675|           37|
|      691|           34|
|      467|            4|
|      800|           10|
|      451|            5|
|      666|           38|
|      591|           31|
|      447|            4|
|      574|           21|
|      475|            6|
|      718|           38|
|      613|           42|
|      577|           34|
|      581|           38|
|      544|           25|
|      747|           22|
|      740|           19|
|      647|           31|
|      711|           39|
+---------+-------------+
only showing top 20 rows

Number of customers by gender:
+-------+-------------+
| gender|num_customers|
+-------+-------------+
|Germany|         2509|
| France|         5014|
|  Spain|         2477|
+-------+-------------+

Average age by geography:


24/09/17 07:17:09 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Geography
 Schema: gender
Expected: gender but found: Geography
CSV file: file:///Users/aravindh/Desktop/Data%20Engineering/Azure%20Mini%20Project/springboard-pyspark-project/pyspark-project/credit%20card.csv
24/09/17 07:17:09 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: CreditScore, Gender
 Schema: geography, age
Expected: geography but found: CreditScore
CSV file: file:///Users/aravindh/Desktop/Data%20Engineering/Azure%20Mini%20Project/springboard-pyspark-project/pyspark-project/credit%20card.csv


CodeCache: size=131072Kb used=22384Kb max_used=22384Kb free=108687Kb
 bounds [0x00000001068c4000, 0x0000000107ec4000, 0x000000010e8c4000]
 total_blobs=8898 nmethods=7961 adapters=850
 compilation: disabled (not enough contiguous free space left)
+---------+-------+
|geography|avg_age|
+---------+-------+
|      829|   NULL|
|      675|   NULL|
|      691|   NULL|
|      467|   NULL|
|      800|   NULL|
|      451|   NULL|
|      666|   NULL|
|      591|   NULL|
|      447|   NULL|
|      574|   NULL|
|      475|   NULL|
|      718|   NULL|
|      613|   NULL|
|      577|   NULL|
|      581|   NULL|
|      544|   NULL|
|      747|   NULL|
|      740|   NULL|
|      647|   NULL|
|      711|   NULL|
+---------+-------+
only showing top 20 rows

Average balance by geography:


24/09/17 07:17:10 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: CreditScore, Tenure
 Schema: geography, balance
Expected: geography but found: CreditScore
CSV file: file:///Users/aravindh/Desktop/Data%20Engineering/Azure%20Mini%20Project/springboard-pyspark-project/pyspark-project/credit%20card.csv


+---------+------------------+
|geography|       avg_balance|
+---------+------------------+
|      829|              6.25|
|      675| 5.216216216216216|
|      691| 5.617647058823529|
|      467|               6.5|
|      800|               4.5|
|      451|               6.6|
|      666|4.7105263157894735|
|      591| 5.419354838709677|
|      447|               5.0|
|      574| 4.333333333333333|
|      475| 5.333333333333333|
|      718|5.2894736842105265|
|      613| 5.761904761904762|
|      577| 4.823529411764706|
|      581| 4.026315789473684|
|      544|              5.24|
|      747| 5.454545454545454|
|      740| 5.473684210526316|
|      647| 4.903225806451613|
|      711| 5.051282051282051|
+---------+------------------+
only showing top 20 rows

Minimum and maximum tenure: [Row(min_tenure=18, max_tenure=92)]


24/09/17 07:17:10 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Age
 Schema: tenure
Expected: tenure but found: Age
CSV file: file:///Users/aravindh/Desktop/Data%20Engineering/Azure%20Mini%20Project/springboard-pyspark-project/pyspark-project/credit%20card.csv
24/09/17 07:17:10 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: NumOfProducts
 Schema: has_credit_card
Expected: has_credit_card but found: NumOfProducts
CSV file: file:///Users/aravindh/Desktop/Data%20Engineering/Azure%20Mini%20Project/springboard-pyspark-project/pyspark-project/credit%20card.csv


Number of credit card holders: 5084
Average salary by gender:
+-------+-------------------+
| gender|         avg_salary|
+-------+-------------------+
|Germany|0.49740932642487046|
| France| 0.5167530913442362|
|  Spain| 0.5296729915220024|
+-------+-------------------+



24/09/17 07:17:10 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Geography, IsActiveMember
 Schema: gender, estimated_salary
Expected: gender but found: Geography
CSV file: file:///Users/aravindh/Desktop/Data%20Engineering/Azure%20Mini%20Project/springboard-pyspark-project/pyspark-project/credit%20card.csv
24/09/17 07:17:10 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: EstimatedSalary
 Schema: exited
Expected: exited but found: EstimatedSalary
CSV file: file:///Users/aravindh/Desktop/Data%20Engineering/Azure%20Mini%20Project/springboard-pyspark-project/pyspark-project/credit%20card.csv


Number of customers who have exited: 0
Total balance across all customers: 50128.0


24/09/17 07:17:10 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Tenure
 Schema: balance
Expected: balance but found: Tenure
CSV file: file:///Users/aravindh/Desktop/Data%20Engineering/Azure%20Mini%20Project/springboard-pyspark-project/pyspark-project/credit%20card.csv


In [12]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("PrintLoanDataset") \
    .getOrCreate()

# Read the CSV file into a DataFrame
df = spark.read.csv("/Users/aravindh/Desktop/Data Engineering/Azure Mini Project/springboard-pyspark-project/pyspark-project/loan.csv", header=True, inferSchema=True)

In [11]:
# Print the first 5 lines of the DataFrame
df.show(5)

+-----------+---+------+------------+--------------+-----------+------+-----------+-------------+-------------+-----------+-------+------------+----------------+------------------+
|Customer_ID|Age|Gender|  Occupation|Marital Status|Family Size|Income|Expenditure|Use Frequency|Loan Category|Loan Amount|Overdue| Debt Record| Returned Cheque| Dishonour of Bill|
+-----------+---+------+------------+--------------+-----------+------+-----------+-------------+-------------+-----------+-------+------------+----------------+------------------+
|    IB14001| 30|  MALE|BANK MANAGER|        SINGLE|          4| 50000|      22199|            6|      HOUSING| 10,00,000 |      5|      42,898|               6|                 9|
|    IB14008| 44|  MALE|   PROFESSOR|       MARRIED|          6| 51000|      19999|            4|     SHOPPING|     50,000|      3|      33,999|               1|                 5|
|    IB14012| 30|FEMALE|     DENTIST|        SINGLE|          3| 58450|      27675|            

In [13]:
# Print the number of rows in the loan dataset
print("Number of rows:", df.count())

Number of rows: 500


In [14]:
# Print the count of distinct records in the loan dataset
print("Number of distinct records:", df.distinct().count())

Number of distinct records: 500


In [15]:
# Find the number of loans in each category
df.groupBy("Loan Category").count().show()

+------------------+-----+
|     Loan Category|count|
+------------------+-----+
|           HOUSING|   67|
|        TRAVELLING|   53|
|       BOOK STORES|    7|
|       AGRICULTURE|   12|
|         GOLD LOAN|   77|
|  EDUCATIONAL LOAN|   20|
|        AUTOMOBILE|   60|
|          BUSINESS|   24|
|COMPUTER SOFTWARES|   35|
|           DINNING|   14|
|          SHOPPING|   35|
|       RESTAURANTS|   41|
|       ELECTRONICS|   14|
|          BUILDING|    7|
|        RESTAURANT|   20|
|   HOME APPLIANCES|   14|
+------------------+-----+



In [16]:
# Find the number of people who have taken more than 1 lack loan
count = df.filter(df["Loan Amount"] > 100000).count()
print("Number of people with loan amount greater than 1 lack:", count)

Number of people with loan amount greater than 1 lack: 0


In [17]:
# Find the number of people with income greater than 60000 rupees
count = df.filter(df["Income"] > 60000).count()
print("Number of people with income greater than 60000 rupees:", count)

Number of people with income greater than 60000 rupees: 198


In [55]:
count = df.filter((df[" Returned Cheque"] >= 2) & (df["Income"] < 50000)).count()
print("Number of people with 2 or more returned cheques and income less than 50000:", count)

Number of people with 2 or more returned cheques and income less than 50000: 137


In [56]:
# Find the number of people with 2 or more returned cheques and are single
count = df.filter((df[" Returned Cheque"] >= 2) & (df["Marital Status"] == "SINGLE")).count()
print("Number of people with 2 or more returned cheques and are single:", count)

Number of people with 2 or more returned cheques and are single: 111


In [23]:
# Find the number of people with expenditure over 50000 a month
count = df.filter(df["Expenditure"] > 50000).count()
print("Number of people with expenditure over 50000 a month:", count)

Number of people with expenditure over 50000 a month: 6


In [26]:
credit_card_df = spark.read.csv("/Users/aravindh/Desktop/Data Engineering/Azure Mini Project/springboard-pyspark-project/pyspark-project/credit card.csv", header=True, inferSchema=True)

In [27]:
credit_card_df.printSchema()

root
 |-- RowNumber: integer (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- Surname: string (nullable = true)
 |-- CreditScore: integer (nullable = true)
 |-- Geography: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Tenure: integer (nullable = true)
 |-- Balance: double (nullable = true)
 |-- NumOfProducts: integer (nullable = true)
 |-- IsActiveMember: integer (nullable = true)
 |-- EstimatedSalary: double (nullable = true)
 |-- Exited: integer (nullable = true)



In [28]:
print("Number of columns:", len(credit_card_df.columns))

Number of columns: 13


In [29]:
print("Number of rows:", credit_card_df.count())

Number of rows: 10000


In [30]:
print("Number of distinct records:", credit_card_df.distinct().count())

Number of distinct records: 10000


In [31]:
credit_card_df.show(5)

+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+--------------+---------------+------+
|RowNumber|CustomerId| Surname|CreditScore|Geography|Gender|Age|Tenure|  Balance|NumOfProducts|IsActiveMember|EstimatedSalary|Exited|
+---------+----------+--------+-----------+---------+------+---+------+---------+-------------+--------------+---------------+------+
|        1|  15634602|Hargrave|        619|   France|Female| 42|     2|      0.0|            1|             1|      101348.88|     1|
|        2|  15647311|    Hill|        608|    Spain|Female| 41|     1| 83807.86|            1|             1|      112542.58|     0|
|        3|  15619304|    Onio|        502|   France|Female| 42|     8| 159660.8|            3|             0|      113931.57|     1|
|        4|  15701354|    Boni|        699|   France|Female| 39|     1|      0.0|            2|             0|       93826.63|     0|
|        5|  15737888|Mitchell|        850|    Spain|Female| 4

In [35]:
eligible_members = credit_card_df.filter((col("Age") >= 18) & (col("EstimatedSalary") > 30000)).count()
print("Number of members eligible for credit card:", eligible_members)

Number of members eligible for credit card: 8522


In [39]:
eligible_active_members = credit_card_df.filter(
    (col("Age") >= 18) & 
    (col("EstimatedSalary") > 30000) & 
    (col("IsActiveMember") == 1)
).count()
print("Number of members eligible and active in the bank:", eligible_active_members)

Number of members eligible and active in the bank: 4379


In [37]:
users_salary_gt_100k_exited = credit_card_df.filter((col("EstimatedSalary") > 100000) & (col("Exited") == 1)).count()
print("Number of credit card users with Estimated Salary greater than 100000 and have exited the card:", users_salary_gt_100k_exited)

Number of credit card users with Estimated Salary greater than 100000 and have exited the card: 1044


In [38]:
users_salary_lt_100k_multiple_products = credit_card_df.filter((col("EstimatedSalary") < 100000) & (col("NumOfProducts") > 1)).count()
print("Number of credit card users with Estimated Salary less than 100000 and have more than 1 products:", users_salary_lt_100k_multiple_products)

Number of credit card users with Estimated Salary less than 100000 and have more than 1 products: 2432


In [40]:
# Load the transaction dataset
transaction_df = spark.read.csv("/Users/aravindh/Desktop/Data Engineering/Azure Mini Project/springboard-pyspark-project/pyspark-project/txn.csv", header=True, inferSchema=True)

In [44]:
# Find the maximum withdrawal amount for each account
max_withdrawal_df = transaction_df.groupBy("Account No").agg(max(" WITHDRAWAL AMT ").alias("max_withdrawal_amount"))
# Show the result
max_withdrawal_df.show()

+-------------+---------------------+
|   Account No|max_withdrawal_amount|
+-------------+---------------------+
|409000438611'|                2.4E8|
|     1196711'|        4.594475464E8|
|     1196428'|                1.5E8|
|409000493210'|                1.5E7|
|409000611074'|             912000.0|
|409000425051'|               3.54E8|
|409000405747'|                1.7E8|
|409000493201'|            2500000.0|
|409000438620'|                4.0E8|
|409000362497'|        1.413662392E8|
+-------------+---------------------+



In [49]:
# Find the maximum deposit amount of an account
max_deposit_df = transaction_df.groupBy("Account No").agg(max(" DEPOSIT AMT ").alias("max_deposit_amount"))
max_deposit_df.show()

+-------------+------------------+
|   Account No|max_deposit_amount|
+-------------+------------------+
|409000438611'|          1.7025E8|
|     1196711'|             5.0E8|
|     1196428'|     2.119594422E8|
|409000493210'|             1.5E7|
|409000611074'|         3000000.0|
|409000425051'|             1.5E7|
|409000405747'|           2.021E8|
|409000493201'|         1000000.0|
|409000438620'|           5.448E8|
|409000362497'|             2.0E8|
+-------------+------------------+



In [46]:
# Find the minimum deposit amount of an account
min_deposit_df = transaction_df.groupBy("Account No").agg(min(" DEPOSIT AMT ").alias("min_deposit_amount"))
min_deposit_df.show()

+-------------+------------------+
|   Account No|min_deposit_amount|
+-------------+------------------+
|409000438611'|              0.03|
|     1196711'|              1.01|
|     1196428'|               1.0|
|409000493210'|              0.01|
|409000611074'|            1320.0|
|409000425051'|               1.0|
|409000405747'|             500.0|
|409000493201'|               0.9|
|409000438620'|              0.07|
|409000362497'|              0.03|
+-------------+------------------+



In [47]:
# Calculate the sum of balance in every bank account
balance_sum_df = transaction_df.groupBy("Account No").agg(sum("BALANCE AMT").alias("total_balance"))
balance_sum_df.show()

+-------------+--------------------+
|   Account No|       total_balance|
+-------------+--------------------+
|409000438611'|-2.49486577068339...|
|     1196711'|-1.60476498101275E13|
|     1196428'| -8.1418498130721E13|
|409000493210'|-3.27584952132095...|
|409000611074'|       1.615533622E9|
|409000425051'|-3.77211841164998...|
|409000405747'|-2.43108047067000...|
|409000493201'|1.0420831829499985E9|
|409000438620'|-7.12291867951358...|
|409000362497'| -5.2860004792808E13|
+-------------+--------------------+



In [54]:
from pyspark.sql.functions import col, to_date, count

# Find the number of transactions on each date
transaction_count_df = transaction_df.withColumn("transaction_date", to_date("VALUE DATE", "dd-MM-yyyy")) \
                                      .groupBy("transaction_date") \
                                      .agg(count("*").alias("transaction_count"))
transaction_count_df.show()

[Stage 96:>                                                         (0 + 2) / 2]

+----------------+-----------------+
|transaction_date|transaction_count|
+----------------+-----------------+
|            NULL|           116201|
+----------------+-----------------+



                                                                                

In [53]:
# List of customers with withdrawal amount more than 1 lakh
high_withdrawal_customers_df = transaction_df.filter(col(" WITHDRAWAL AMT ") > 100000) \
                                              .select("Account No", " WITHDRAWAL AMT ")
high_withdrawal_customers_df.show()

+-------------+----------------+
|   Account No| WITHDRAWAL AMT |
+-------------+----------------+
|409000611074'|        133900.0|
|409000611074'|        195800.0|
|409000611074'|        143800.0|
|409000611074'|        331650.0|
|409000611074'|        129000.0|
|409000611074'|        230013.0|
|409000611074'|        367900.0|
|409000611074'|        108000.0|
|409000611074'|        141000.0|
|409000611074'|        206000.0|
|409000611074'|        242300.0|
|409000611074'|        113250.0|
|409000611074'|        206900.0|
|409000611074'|        276000.0|
|409000611074'|        171000.0|
|409000611074'|        189800.0|
|409000611074'|        271323.0|
|409000611074'|        200600.0|
|409000611074'|        176900.0|
|409000611074'|        150050.0|
+-------------+----------------+
only showing top 20 rows

