<a href="https://colab.research.google.com/github/Devilaryan007/BigDataProject/blob/main/my_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [7]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ReadCSV").getOrCreate()
df_cust = spark.read.option("header", True).csv("customer_dim.csv")
df_cust.show()
df_cust.printSchema()


+-----------+------------+----------+--------------+
|customer_id|        name|   contact|loyalty_points|
+-----------+------------+----------+--------------+
|       C001|Anita Sharma|9876543210|           110|
|       C002| Rahul Mehra|9811122233|            76|
|       C003|  Sourav Roy|8918762107|            12|
+-----------+------------+----------+--------------+

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- loyalty_points: string (nullable = true)



In [3]:
df_prod = spark.read.option("header", True).csv("product_dim.csv")
df_prod.show()

+----------+------------+------------+-----+----------+
|product_id|        name|    category|price|  supplier|
+----------+------------+------------+-----+----------+
|       101|Basmati Rice|      Grains|   90|  AgroMart|
|       102|  Toothpaste|PersonalCare|   45|DailyNeeds|
|       103| Cooking Oil|  Essentials|  110|  PureDrop|
+----------+------------+------------+-----+----------+



In [4]:
df_inv = spark.read.option("header", True).csv("inventory_dim.csv")
df_inv.show()

+----------+-----------+-----------------+
|product_id|stock_level|reorder_threshold|
+----------+-----------+-----------------+
|       101|         18|               10|
|       102|          5|               15|
|       103|         10|                8|
+----------+-----------+-----------------+



In [26]:
df_sales = spark.read.option("header", True).csv("sales.csv")
df_sales.show()

+-------+---------+-----------+----------+--------+------------+
|sale_id|     date|customer_id|product_id|quantity|total_amount|
+-------+---------+-----------+----------+--------+------------+
|   S001|6/30/2025|       C001|       101|      10|         180|
|   S002|6/30/2025|       C002|       103|       2|         110|
+-------+---------+-----------+----------+--------+------------+



In [11]:
df_cust=df_cust.withColumn("contact",df_cust["contact"].cast("int"))
df_cust=df_cust.withColumn("loyalty_points",df_cust["loyalty_points"].cast("int"))
df_cust.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- contact: integer (nullable = true)
 |-- loyalty_points: integer (nullable = true)



In [12]:
df_prod=df_prod.withColumn("price",df_prod["price"].cast("int")).withColumn("product_id",df_prod["product_id"].cast("int"))
df_prod.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- supplier: string (nullable = true)



In [13]:
df_inv=df_inv.withColumn("product_id",df_inv["product_id"].cast("int")).withColumn("stock_level",df_inv["stock_level"].cast("int")).withColumn("reorder_threshold",df_inv["reorder_threshold"].cast("int"))
df_inv.printSchema()

root
 |-- product_id: integer (nullable = true)
 |-- stock_level: integer (nullable = true)
 |-- reorder_threshold: integer (nullable = true)



In [28]:
from pyspark.sql.functions import to_date, to_timestamp
from pyspark.sql.types import IntegerType, DoubleType

df_sales = df_sales.withColumn("date", to_timestamp("date", "M/d/yyyy").cast("date"))
df_sales = df_sales.withColumn("quantity", df_sales["quantity"].cast(IntegerType()))
df_sales = df_sales.withColumn("total_amount", df_sales["total_amount"].cast(DoubleType()))

df_sales.printSchema()
df_sales.show()

root
 |-- sale_id: string (nullable = true)
 |-- date: date (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- total_amount: double (nullable = true)

+-------+----------+-----------+----------+--------+------------+
|sale_id|      date|customer_id|product_id|quantity|total_amount|
+-------+----------+-----------+----------+--------+------------+
|   S001|2025-06-30|       C001|       101|      10|       180.0|
|   S002|2025-06-30|       C002|       103|       2|       110.0|
+-------+----------+-----------+----------+--------+------------+



In [32]:
df_customer_sales=df_cust.join(df_sales,df_cust.customer_id==df_sales.customer_id,"left")
df_customer_sales.show()

+-----------+------------+-------+--------------+-------+----------+-----------+----------+--------+------------+
|customer_id|        name|contact|loyalty_points|sale_id|      date|customer_id|product_id|quantity|total_amount|
+-----------+------------+-------+--------------+-------+----------+-----------+----------+--------+------------+
|       C001|Anita Sharma|   NULL|           110|   S001|2025-06-30|       C001|       101|      10|       180.0|
|       C002| Rahul Mehra|   NULL|            76|   S002|2025-06-30|       C002|       103|       2|       110.0|
|       C003|  Sourav Roy|   NULL|            12|   NULL|      NULL|       NULL|      NULL|    NULL|        NULL|
+-----------+------------+-------+--------------+-------+----------+-----------+----------+--------+------------+



In [33]:
df_product_sales=df_prod.join(df_sales,df_sales.product_id==df_prod.product_id,"inner")
df_product_sales.show()

+----------+------------+----------+-----+--------+-------+----------+-----------+----------+--------+------------+
|product_id|        name|  category|price|supplier|sale_id|      date|customer_id|product_id|quantity|total_amount|
+----------+------------+----------+-----+--------+-------+----------+-----------+----------+--------+------------+
|       101|Basmati Rice|    Grains|   90|AgroMart|   S001|2025-06-30|       C001|       101|      10|       180.0|
|       103| Cooking Oil|Essentials|  110|PureDrop|   S002|2025-06-30|       C002|       103|       2|       110.0|
+----------+------------+----------+-----+--------+-------+----------+-----------+----------+--------+------------+



In [35]:
df_inv_sales=df_inv.join(df_sales,df_sales.product_id==df_inv.product_id,"left")
df_inv_sales.show()

+----------+-----------+-----------------+-------+----------+-----------+----------+--------+------------+
|product_id|stock_level|reorder_threshold|sale_id|      date|customer_id|product_id|quantity|total_amount|
+----------+-----------+-----------------+-------+----------+-----------+----------+--------+------------+
|       101|         18|               10|   S001|2025-06-30|       C001|       101|      10|       180.0|
|       102|          5|               15|   NULL|      NULL|       NULL|      NULL|    NULL|        NULL|
|       103|         10|                8|   S002|2025-06-30|       C002|       103|       2|       110.0|
+----------+-----------+-----------------+-------+----------+-----------+----------+--------+------------+



In [36]:
df_full=df_sales.join(df_cust,df_cust.customer_id==df_sales.customer_id,"left").join(df_prod,df_prod.product_id==df_sales.product_id,"left").join(df_inv,df_inv.product_id==df_sales.product_id,"left")
df_full.show()

+-------+----------+-----------+----------+--------+------------+-----------+------------+-------+--------------+----------+------------+----------+-----+--------+----------+-----------+-----------------+
|sale_id|      date|customer_id|product_id|quantity|total_amount|customer_id|        name|contact|loyalty_points|product_id|        name|  category|price|supplier|product_id|stock_level|reorder_threshold|
+-------+----------+-----------+----------+--------+------------+-----------+------------+-------+--------------+----------+------------+----------+-----+--------+----------+-----------+-----------------+
|   S001|2025-06-30|       C001|       101|      10|       180.0|       C001|Anita Sharma|   NULL|           110|       101|Basmati Rice|    Grains|   90|AgroMart|       101|         18|               10|
|   S002|2025-06-30|       C002|       103|       2|       110.0|       C002| Rahul Mehra|   NULL|            76|       103| Cooking Oil|Essentials|  110|PureDrop|       103|      