In [1]:
# Install PySpark
!pip install pyspark

# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("RetailPipeline").getOrCreate()

# Folder on Google Drive holding the input CSVs (adjust based on your Drive).
# Must end with "/" because file names are appended to it directly.
base_path = "/content/drive/MyDrive/Retail-Data-Pipeline/"


def read_csv_table(file_name, folder=base_path):
    """Load one CSV file as a Spark DataFrame.

    Args:
        file_name: Name of the file inside ``folder`` (e.g. "sales.csv").
        folder: Directory prefix the file name is appended to; defaults to
            ``base_path``.

    Returns:
        A DataFrame whose column names come from the file's header row and
        whose column types are inferred by Spark (``inferSchema``), which makes
        Spark scan the data an extra time.
    """
    return (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(folder + file_name)
    )


sales_df = read_csv_table("sales.csv")
customers_df = read_csv_table("customers.csv")
products_df = read_csv_table("products.csv")

# Show data
for preview_df in (sales_df, customers_df, products_df):
    preview_df.show()


+-------+-----------+----------+--------+-----+----------+
|sale_id|customer_id|product_id|quantity|price| sale_date|
+-------+-----------+----------+--------+-----+----------+
|      1|        101|       201|       2|  500|2023-01-01|
|      2|        102|       202|       1| 1500|2023-01-02|
|      3|        103|       203|       5|  200|2023-01-03|
|      4|        101|       204|       3|  300|2023-01-04|
|      5|        104|       205|       2| 1000|2023-01-05|
|      6|        105|       206|       4|  250|2023-01-06|
|      7|        102|       202|       1| 1500|2023-01-07|
|      8|        101|       203|       2|  200|2023-01-08|
|      9|        106|       204|       3|  300|2023-01-09|
|     10|        107|       201|       2|  500|2023-01-10|
+-------+-----------+----------+--------+-----+----------+

+-----------+-----+------+
|customer_id| name|region|
+-----------+-----+------+
|        101| Amit| North|
|        102|Sneha| South|
|        103|Rahul|  East|
|        10

In [3]:
from pyspark.sql.functions import col, expr

# Attach customer attributes to every sale. A left join keeps each sale row even
# when its customer_id has no match; the customer columns are then null.
# NOTE(review): assumes customer_id / product_id are unique in their lookup
# tables; duplicate keys there would multiply sale rows.
sales_with_customers = sales_df.join(customers_df, on="customer_id", how="left")

full_data = (
    sales_with_customers
    # Same left-join treatment for product attributes.
    .join(products_df, on="product_id", how="left")
    # Line value of each sale: total_amount = quantity * price
    .withColumn("total_amount", col("quantity") * col("price"))
)

# Columns to preview, in report order.
report_columns = [
    "sale_id", "name", "region", "product_name", "category",
    "quantity", "price", "total_amount", "sale_date",
]
full_data.select(*report_columns).show()


+-------+-----+------+------------+-----------+--------+-----+------------+----------+
|sale_id| name|region|product_name|   category|quantity|price|total_amount| sale_date|
+-------+-----+------+------------+-----------+--------+-----+------------+----------+
|      1| Amit| North|      Laptop|Electronics|       2|  500|        1000|2023-01-01|
|      2|Sneha| South|       Phone|Electronics|       1| 1500|        1500|2023-01-02|
|      3|Rahul|  East|     Charger|Accessories|       5|  200|        1000|2023-01-03|
|      4| Amit| North|       Mouse|Accessories|       3|  300|         900|2023-01-04|
|      5|Priya|  West|      Tablet|Electronics|       2| 1000|        2000|2023-01-05|
|      6| John| South|    Keyboard|Accessories|       4|  250|        1000|2023-01-06|
|      7|Sneha| South|       Phone|Electronics|       1| 1500|        1500|2023-01-07|
|      8| Amit| North|     Charger|Accessories|       2|  200|         400|2023-01-08|
|      9| Sara| North|       Mouse|Accessor

In [4]:
# Save as CSV.
# Spark treats output_path as a *directory* and writes one part-*.csv file per
# partition inside it (plus bookkeeping files), despite the ".csv" name.
# coalesce(1) collapses the data to a single partition so the folder holds
# exactly one CSV part file. Only appropriate for small outputs like this one:
# every row is written by a single task.
output_path = base_path + "processed_sales.csv"
(
    full_data
    .coalesce(1)
    .write
    .option("header", "true")
    .mode("overwrite")
    .csv(output_path)
)
