In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

#create session
spark = SparkSession.builder.appName('Financial Analysis').getOrCreate()

In [0]:
# s3 bucket credentials

AWS_ACCESS_KEY_ID='****************************'
AWS_SECRET_ACCESS_KEY='***************************'

In [0]:
spark.conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark.conf.set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
spark.conf.set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
spark.conf.set("fs.s3a.endpoint", "s3.amazonaws.com")

In [0]:
# Define the schema
finance_schema = StructType([
    StructField("Transaction ID", StringType(), False),
    StructField("Customer ID", StringType(), False),
    StructField("Transaction Amount", DoubleType(), False),
    StructField("Transaction Date", TimestampType(), False),
    StructField("Payment Method", StringType(), False),
    StructField("Product Category", StringType(), False),
    StructField("Quantity", IntegerType(), False),
    StructField("Customer Age", IntegerType(), False),
    StructField("Customer Location", StringType(), False),
    StructField("Device Used", StringType(), False),
    StructField("IP Address", StringType(), False),
    StructField("Shipping Address", StringType(), False),
    StructField("Billing Address", StringType(), False),
    StructField("Is Fraudulent", IntegerType(), False),
    StructField("Account Age Days", IntegerType(), False),
    StructField("Transaction Hour", IntegerType(), False)
])




In [0]:
s3_bucket = "s3://financial-pipeline/Financial_Dataset.csv"
financial_df = spark.read.schema(finance_schema).format('csv').option('header', 'true').option('delimiter', ',').load(s3_bucket)
financial_df.printSchema()
financial_df.show(2)


root
 |-- Transaction ID: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Transaction Amount: double (nullable = true)
 |-- Transaction Date: timestamp (nullable = true)
 |-- Payment Method: string (nullable = true)
 |-- Product Category: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Customer Age: integer (nullable = true)
 |-- Customer Location: string (nullable = true)
 |-- Device Used: string (nullable = true)
 |-- IP Address: string (nullable = true)
 |-- Shipping Address: string (nullable = true)
 |-- Billing Address: string (nullable = true)
 |-- Is Fraudulent: integer (nullable = true)
 |-- Account Age Days: integer (nullable = true)
 |-- Transaction Hour: integer (nullable = true)

+--------------------+--------------------+------------------+-------------------+--------------+----------------+--------+------------+-----------------+-----------+-------------+--------------------+---------------+-------------+----------------+---

In [0]:
from pyspark.sql.functions import avg, sum, count, dayofweek

In [0]:
# loading the data into a pyspark table
df = financial_df.createOrReplaceTempView("transactions")

# to view all pyspark table in our cluster
spark.catalog.listTables()

Out[8]: [Table(name='transactions', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

# To Answer some business Questions with our pyspark table


### 1. Distribution of transaction amounts across different customer age groups

In [0]:
# SQL Query
age_dist = spark.sql("""
    SELECT `transactions`.`Customer Age`, 
           COUNT(*) AS Transaction_Count, 
           AVG(`transactions`.`Transaction Amount`) AS Avg_Transaction_Amount, 
           SUM(`transactions`.`Transaction Amount`) AS Total_Transaction_Amount
    FROM transactions
    GROUP BY `transactions`.`Customer Age`
    ORDER BY `transactions`.`Customer Age`
""")
age_dist.show()


+------------+-----------------+----------------------+------------------------+
|Customer Age|Transaction_Count|Avg_Transaction_Amount|Total_Transaction_Amount|
+------------+-----------------+----------------------+------------------------+
|        null|            47268|    19.772996530422272|                467315.0|
|          -2|                1|                190.76|                  190.76|
|           0|                8|              192.6875|                  1541.5|
|           1|                4|                 53.19|                  212.76|
|           2|                4|    235.95250000000001|       943.8100000000001|
|           3|                5|               181.304|                  906.52|
|           4|                9|    145.53333333333333|                  1309.8|
|           5|               12|    267.19083333333333|                 3206.29|
|           6|               20|    374.79400000000004|       7495.880000000001|
|           7|              

### 2. Most popular product categories among different customer demographics (age, location)


In [0]:
# SQL Query
popular_categories = spark.sql("""
    SELECT `transactions`.`Customer Age`, `transactions`.`Customer Location`, `transactions`.`Product Category`, 
           COUNT(*) AS Category_Count
    FROM transactions
    GROUP BY `transactions`.`Customer Age`, `transactions`.`Customer Location`, `transactions`.`Product Category`
    ORDER BY Category_Count DESC
""")
popular_categories.show()


+------------+-----------------+----------------+--------------+
|Customer Age|Customer Location|Product Category|Category_Count|
+------------+-----------------+----------------+--------------+
|        null|             null|            null|         47268|
|          38|       Lewismouth|   home & garden|             2|
|          45|    South Matthew|   home & garden|             2|
|          25|    East Jonathan|     electronics|             2|
|          44|       Port Sarah|     electronics|             2|
|          45|  West Davidville|     electronics|             2|
|          38|       West James|   home & garden|             2|
|          29|      Lake Steven|   home & garden|             2|
|          37|        Johnville|     electronics|             2|
|          31|       Lake Brian|        clothing|             2|
|          41|   Alexandramouth|   home & garden|             2|
|          28|       Josephfort|        clothing|             2|
|          51|     West M

### 3. Most frequently used payment methods

In [0]:

# SQL Query
payment_methods = spark.sql("""
    SELECT `transactions`.`Payment Method` AS Payment_Method, 
           COUNT(*) AS Usage_Count
    FROM transactions
    WHERE `transactions`.`Payment Method` IS NOT NULL
    GROUP BY `transactions`.`Payment Method`
    ORDER BY Usage_Count DESC
""")
payment_methods.show(4)





+--------------+-----------+
|Payment_Method|Usage_Count|
+--------------+-----------+
|    debit card|       5952|
|   credit card|       5923|
|        PayPal|       5899|
| bank transfer|       5860|
+--------------+-----------+
only showing top 4 rows



### 4. Locations with the highest transaction volumes

In [0]:
# SQL Query

highest_volumes = spark.sql("""
    SELECT `transactions`.`Customer Location` AS Customer_Location, 
           COUNT(*) AS Transaction_Count
    FROM transactions
    WHERE `transactions`.`Customer Location` IS NOT NULL
    GROUP BY `transactions`.`Customer Location`
    ORDER BY Transaction_Count DESC
""")
highest_volumes.show()





+-----------------+-----------------+
|Customer_Location|Transaction_Count|
+-----------------+-----------------+
|    North Michael|               30|
|     East Michael|               24|
| West Christopher|               21|
|       East David|               20|
|     Lake Michael|               20|
|       Jamesmouth|               19|
|       Smithmouth|               18|
|      New Michael|               18|
|      North James|               17|
|        East John|               17|
|    South Michael|               17|
|       Port James|               16|
|     South Robert|               16|
|   South Jennifer|               16|
|     Port Michael|               15|
|     West Michael|               15|
|     Lake William|               15|
|        New David|               15|
|       West James|               15|
|      Lake Robert|               15|
+-----------------+-----------------+
only showing top 20 rows



### 5. Average transaction amount for each product category

In [0]:
# SQL Query

avg_trans_amount = spark.sql("""
    SELECT `transactions`.`Product Category` AS Product_Category, 
           AVG(`transactions`.`Transaction Amount`) AS Avg_Transaction_Amount
    FROM transactions
    WHERE `transactions`.`Product Category` IS NOT NULL
    GROUP BY `transactions`.`Product Category`
    ORDER BY Avg_Transaction_Amount DESC
""")

avg_trans_amount.show()


+----------------+----------------------+
|Product_Category|Avg_Transaction_Amount|
+----------------+----------------------+
|   home & garden|    234.90742791475174|
|        clothing|    228.69546499255145|
|    toys & games|     228.4037441860467|
|     electronics|    228.21673967986555|
| health & beauty|     226.5108713337618|
+----------------+----------------------+



### For some reasons I have to stop here as I didn't really achieve what I wanted to achieve. Spark dataframe added so many null values into my dataset but once it is loaded in pandas and excel, there are no null values.

### I had the intentions of adding automation in my pipeline but I am restricted because I used databrick community edition as at the time of creating this project. So yeah, I hope to come back to this project later in future after I must have added more knowledge on databricks. Thanks for sticking around.