In [55]:
# import libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, countDistinct, sum as spark_sum, avg as spark_avg, max as spark_max
import os

In [56]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Spark assignment") \
    .getOrCreate()

In [57]:
# Read input files
order_details_df = spark.read.csv("Dataset/orderdetails.csv", header=True, inferSchema=True)
order_item_details_df = spark.read.csv("Dataset/orderitemdetails.csv", header=True, inferSchema=True)
accounts_df = spark.read.csv("Dataset/accounts.csv", header=True, inferSchema=True)
transactions_df = spark.read.csv("Dataset/transactions.csv", header=True, inferSchema=True)

In [58]:
#Create output directory if it doesn't exist
output_dir = "Output"
if not os.path.exists(output_dir):
    os.makedirs(output_dir)

USER SUMMARY

In [59]:
# 1. Users with most orders and their total amounts
user_order_amounts = order_details_df.join(order_item_details_df, "OrderID") \
    .groupBy("CustomerID").agg(countDistinct("OrderID").alias("NumOrders"),
                               spark_sum(col("UnitPrice") * col("Quantity")).alias("TotalAmount")) \
    .orderBy(col("NumOrders").desc())

# 2. User wise latest transactions and number of items
latest_transactions = order_details_df.join(order_item_details_df, "OrderID") \
    .groupBy("CustomerID").agg(spark_max("OrderDate").alias("LatestTransactionDate"),
                               count("*").alias("NumItems"))

# 3. User wise average item quantity for each order
avg_item_quantity = order_details_df.join(order_item_details_df, "OrderID") \
    .groupBy("CustomerID", "OrderID").agg(spark_avg("Quantity").alias("AvgItemQuantityPerOrder")) \
    .groupBy("CustomerID").agg(spark_avg("AvgItemQuantityPerOrder").alias("AvgItemQuantityPerOrder"))

combined_df = user_order_amounts.join(latest_transactions, "CustomerID", "outer") \
    .join(avg_item_quantity, "CustomerID", "outer")

# Save file
combined_df.write.csv("Output/Customer_summary.csv", header=True, mode="overwrite")
# combined_df.toPandas().to_csv("Output/Customer_summary.csv", index=False)
combined_df.show()


+----------+---------+------------------+---------------------+--------+-----------------------+
|CustomerID|NumOrders|       TotalAmount|LatestTransactionDate|NumItems|AvgItemQuantityPerOrder|
+----------+---------+------------------+---------------------+--------+-----------------------+
|        28|        1|1537.0800000000002|           2024-02-17|       5|                    6.2|
|       103|        1|           1792.43|           2023-09-12|       5|                    5.8|
|       137|        1|             936.3|           2024-02-14|       1|                   10.0|
|       168|        1| 994.1300000000001|           2024-04-16|       3|      7.333333333333333|
|       169|        1|           1438.98|           2023-07-29|       5|                    4.4|
|       190|        1|           1113.97|           2023-07-24|       4|                   4.25|
|       192|        1|235.82999999999998|           2023-05-28|       1|                    3.0|
|       211|        1|        

In [60]:
# Most bought Products
most_bought_products = order_item_details_df.groupBy("ProductID").agg(count("*").alias("TotalBought")) \
    .orderBy(col("TotalBought").desc()).limit(10)

# Monthly transactions
monthly_transactions = transactions_df.withColumn("Month", month("Date")) \
    .groupBy("Month").agg(count("*").alias("TotalTransactions"))

# Users with most orders and their total amounts
user_order_amounts = order_details_df.join(order_item_details_df, "OrderID") \
    .groupBy("CustomerID").agg(countDistinct("OrderID").alias("NumOrders"),
                               spark_sum(col("UnitPrice") * col("Quantity")).alias("TotalAmount")) \
    .orderBy(col("NumOrders").desc())

# User wise latest transactions and number of items
latest_transactions = order_details_df.join(order_item_details_df, "OrderID") \
    .groupBy("CustomerID").agg(spark_max("OrderDate").alias("LatestTransactionDate"),
                               count("*").alias("NumItems"))

# User wise average item quantity for each order
avg_item_quantity = order_details_df.join(order_item_details_df, "OrderID") \
    .groupBy("CustomerID", "OrderID").agg(spark_avg("Quantity").alias("AvgItemQuantityPerOrder")) \
    .groupBy("CustomerID").agg(spark_avg("AvgItemQuantityPerOrder").alias("AvgItemQuantityPerOrder"))

# ORDER SUMMARY
order_joined_df = order_details_df.join(order_item_details_df, "OrderID")
order_summary = order_joined_df.groupBy("OrderDate").agg(spark_sum("UnitPrice").alias("TotalAmount"),
                                                        spark_avg("Quantity").alias("AverageQuantity"))

In [61]:
# Save results
order_summary.write.csv("Output/order_summary.csv", header=True, mode="overwrite")
most_bought_products.write.csv("Output/most_bought_products.csv", header=True, mode="overwrite")
monthly_transactions.write.csv("Output/monthly_transactions.csv", header=True, mode="overwrite")
user_order_amounts.write.csv("Output/user_order_amounts.csv", header=True, mode="overwrite")
latest_transactions.write.csv("Output/latest_transactions.csv", header=True, mode="overwrite")
avg_item_quantity.write.csv("Output/avg_item_quantity.csv", header=True, mode="overwrite")

# Print results
print("Order Summary:")
order_summary.show()

print("Most Bought Products:")
most_bought_products.show()

print("Monthly Transactions:")
monthly_transactions.show()

print("Users with Most Orders and Their Total Amounts:")
user_order_amounts.show()

print("User wise Latest Transactions and Number of Items:")
latest_transactions.show()

print("User wise Average Item Quantity for Each Order:")
avg_item_quantity.show()


Order Summary:
+----------+------------------+------------------+
| OrderDate|       TotalAmount|   AverageQuantity|
+----------+------------------+------------------+
|2023-06-22|45266.359999999986| 5.595797280593325|
|2023-07-15|44048.859999999986| 5.520565552699229|
|2023-11-08| 43063.11000000001| 5.543640897755611|
|2023-05-22| 44429.02999999997| 5.708128078817734|
|2024-02-05|36269.189999999966| 5.275075987841945|
|2023-09-14| 48650.52000000002| 5.421926910299003|
|2023-11-22| 42240.67999999998| 5.590680100755668|
|2023-09-19|44741.259999999995|  5.65559655596556|
|2023-06-18| 41915.79000000003| 5.513797634691196|
|2024-01-07|47162.999999999956| 5.781286549707603|
|2023-06-23|38913.380000000005| 5.584397163120568|
|2023-12-10|          45971.96| 5.463592233009709|
|2024-04-20| 45048.85999999997|5.6610576923076925|
|2023-11-29|45619.510000000024| 5.630281690140845|
|2024-01-11| 48570.22999999998| 5.488610478359909|
|2023-11-25|45619.479999999996| 5.682033096926714|
|2023-09-27|4388

USER DEFINED FUNCTIONS TO BUCKETIZE TRANSACTIONS

In [62]:
# Define UDF to categorize transaction amounts
def categorize_spending(amount):
    if amount <= 500:
        return "Low"
    elif amount <= 1500:
        return "Medium"
    else:
        return "High"

# Register the UDF
categorize_spending_udf = udf(categorize_spending, StringType())

# Apply the UDF to create a new column with spending categories
transactions_with_categories = transactions_df.withColumn("SpendingCategory", categorize_spending_udf(col("Amount")))

transactions_with_categories.write.csv("Output/transactions_with_categories.csv", header=True, mode="overwrite")
transactions_with_categories.show()

# Stop Session
spark.stop()


+--------------------+-------------+--------+---------------+--------------------+----------------+
|       TransactionID|AccountNumber|  Amount|TransactionType|                Date|SpendingCategory|
+--------------------+-------------+--------+---------------+--------------------+----------------+
|2b56317a-14d8-458...|     84391309| 1018.79|     Withdrawal|2024-01-23 19:37:...|          Medium|
|d7a0257f-1d9b-425...|     87653661| 4462.85|        Deposit|2024-04-01 10:37:...|            High|
|ff95a821-4a45-4c7...|     43702950|-4295.97|        Deposit|2023-08-29 18:38:...|             Low|
|2620a0a6-5f04-403...|      7421744|-2159.54|        Deposit|2024-01-02 18:30:...|             Low|
|7146dda3-ba2f-474...|      5219279|-1404.86|     Withdrawal|2024-02-06 01:32:...|             Low|
|1b205073-41ee-4d0...|     75955253|  1178.2|        Deposit|2023-12-25 18:32:...|          Medium|
|69b31d62-b895-4d5...|       984593| -968.27|     Withdrawal|2023-11-24 04:42:...|             Low|
