In [0]:
# Unmounting the existing mount point
dbutils.fs.unmount("/mnt/processedtransactions")

# mounting the blob storage
dbutils.fs.mount(
  source="wasbs://customer-transactions@baloyibank.blob.core.windows.net",
  mount_point="/mnt/processedtransactions",
  extra_configs={
    "fs.azure.account.key.baloyibank.blob.core.windows.net": "N24LzTMUtY6ZxJUUSSQEQOjlzsAe+3YxO/omEzwb9c0EcgbkExlUouTk9mVA85r+AStMazDLw=="
  }
)

/mnt/processedtransactions has been unmounted.


True

In [0]:
dbutils.fs.ls("/mnt/processedtransactions/processed data")

[FileInfo(path='dbfs:/mnt/processedtransactions/processed data/customer_transactions.csv', name='customer_transactions.csv', size=3292093, modificationTime=1729100842000)]

In [0]:
df = spark.read.csv("/mnt/processedtransactions/processed data", header=True, inferSchema=True)
display(df) 

In [0]:
# Dropping rows where any value in the important columns is null
df_cleaned = df.dropna(subset=["CustomerID", "TransactionAmount", "TransactionDate", "MerchantCategory", "Location"])

# Showing the first few rows of the cleaned data
df_cleaned.show(10)


+----------+-----------------+-------------------+----------------+-----------+
|CustomerID|TransactionAmount|    TransactionDate|MerchantCategory|   Location|
+----------+-----------------+-------------------+----------------+-----------+
|  CUST_214|           138.29|2024-09-22 17:45:39|   Entertainment|Los Angeles|
|   CUST_67|           295.08|2024-09-16 17:45:39|        Clothing|   New York|
|  CUST_250|           235.56|2024-09-13 17:45:39|     Electronics|Los Angeles|
|  CUST_233|            57.04|2024-09-15 17:45:39|        Clothing|    Phoenix|
|  CUST_149|           124.61|2024-10-05 17:45:39|         Grocery|    Chicago|
|   CUST_58|           178.88|2024-09-30 17:45:39|   Entertainment|    Chicago|
|  CUST_117|           299.58|2024-09-19 17:45:39|          Health|   New York|
|    CUST_4|           228.14|2024-09-21 17:45:39|     Electronics|    Houston|
|  CUST_155|           272.15|2024-09-19 17:45:39|   Entertainment|    Houston|
|   CUST_31|            282.9|2024-09-19

In [0]:
# Filter transactions where the TransactionAmount is greater than 100
df_filtered = df_cleaned.filter(df_cleaned["TransactionAmount"] > 100)

# Showing the filtered data
df_filtered.show(5)


+----------+-----------------+-------------------+----------------+-----------+
|CustomerID|TransactionAmount|    TransactionDate|MerchantCategory|   Location|
+----------+-----------------+-------------------+----------------+-----------+
|  CUST_214|           138.29|2024-09-22 17:45:39|   Entertainment|Los Angeles|
|   CUST_67|           295.08|2024-09-16 17:45:39|        Clothing|   New York|
|  CUST_250|           235.56|2024-09-13 17:45:39|     Electronics|Los Angeles|
|  CUST_149|           124.61|2024-10-05 17:45:39|         Grocery|    Chicago|
|   CUST_58|           178.88|2024-09-30 17:45:39|   Entertainment|    Chicago|
+----------+-----------------+-------------------+----------------+-----------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import sum

# Grouping by 'CustomerID' and calculating the total transaction amount per customer
df_total_spent = df_filtered.groupBy("CustomerID").agg(sum("TransactionAmount").alias("TotalSpent"))

# Showing the result
df_total_spent.show(5)


+----------+------------------+
|CustomerID|        TotalSpent|
+----------+------------------+
|  CUST_246|45950.439999999995|
|  CUST_285|41186.819999999985|
|   CUST_62|49076.420000000006|
|  CUST_159| 47264.50000000001|
|   CUST_25| 50174.17000000002|
+----------+------------------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import count

# Grouping by 'CustomerID' and count the number of transactions per customer
df_transaction_count = df_filtered.groupBy("CustomerID").agg(count("CustomerID").alias("TransactionCount"))

# Showing the result
df_transaction_count.show(5)


+----------+----------------+
|CustomerID|TransactionCount|
+----------+----------------+
|  CUST_246|             150|
|  CUST_285|             139|
|   CUST_62|             162|
|  CUST_159|             154|
|   CUST_25|             170|
+----------+----------------+
only showing top 5 rows



In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank

# Defining a window specification for ranking customers by total spending
windowSpec = Window.orderBy(df_total_spent["TotalSpent"].desc())

# Adding a 'Rank' column to rank customers by their total transaction amount
df_ranked = df_total_spent.withColumn("Rank", rank().over(windowSpec))

# Showing the ranked customers
df_ranked.show(5)


+----------+------------------+----+
|CustomerID|        TotalSpent|Rank|
+----------+------------------+----+
|   CUST_45| 58939.97000000002|   1|
|  CUST_229|58236.110000000066|   2|
|  CUST_248| 57890.63999999999|   3|
|   CUST_44|           57608.6|   4|
|  CUST_249|          57356.89|   5|
+----------+------------------+----+
only showing top 5 rows



In [0]:
merged_df = df_total_spent.join(df_transaction_count,on="CustomerID", how="inner")
merged_df.show(5)   

+----------+------------------+----------------+
|CustomerID|        TotalSpent|TransactionCount|
+----------+------------------+----------------+
|  CUST_246|45950.439999999995|             150|
|  CUST_285|41186.819999999985|             139|
|   CUST_62|49076.420000000006|             162|
|  CUST_159| 47264.50000000001|             154|
|   CUST_25| 50174.17000000002|             170|
+----------+------------------+----------------+
only showing top 5 rows



In [0]:
final_df = merged_df.join(df_filtered, on="CustomerID", how="inner")
final_df.show(5)

+----------+------------------+----------------+-----------------+-------------------+----------------+--------+
|CustomerID|        TotalSpent|TransactionCount|TransactionAmount|    TransactionDate|MerchantCategory|Location|
+----------+------------------+----------------+-----------------+-------------------+----------------+--------+
|  CUST_246|45950.439999999995|             150|           434.33|2024-10-02 17:45:40|   Entertainment| Phoenix|
|  CUST_285|41186.819999999985|             139|            432.5|2024-09-25 17:45:40|     Electronics|New York|
|   CUST_62|49076.420000000006|             162|           232.99|2024-10-06 17:45:40|          Health| Phoenix|
|  CUST_159| 47264.50000000001|             154|           399.11|2024-09-30 17:45:40|         Grocery| Phoenix|
|   CUST_25| 50174.17000000002|             170|           205.73|2024-09-09 17:45:40|   Entertainment| Phoenix|
+----------+------------------+----------------+-----------------+-------------------+----------

In [0]:
# Saved transformed DataFrame for further analysis
final_df.write.option("header", "true").mode("overwrite").csv("/mnt/processedtransactions/transformed data from databricks")