<a href="https://colab.research.google.com/github/anusha-tikarya/Hexa_Project/blob/Week3/Week3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
#Install and Set Up PySpark in Colab
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=4cfcdbc494682b2d0c4f327fdccd091945209eee1f0e05c2e0e60abad2239191
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


In [None]:
from datetime import datetime

from pyspark.sql import SparkSession
# NOTE: `sum` here is pyspark.sql.functions.sum and shadows Python's builtin sum()
# for the rest of the notebook.
from pyspark.sql.functions import col, sum, count
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DoubleType, TimestampType


In [None]:
# Simulate and Load Real-Time Data

# Reuse the active Spark session if there is one, otherwise start a new one
spark = SparkSession.builder.appName("EcommerceRealTimeProcessing").getOrCreate()

# Schema for order data:
# (order_id, product_id, customer_id, quantity, order_amount, order_date)
# order_amount is a DoubleType: a 32-bit FloatType cannot represent values such as
# 139.99 closely enough, and the error shows up in aggregated totals
# (e.g. 1119.9200286865234 instead of ~1119.92).
# NOTE(review): for exact currency arithmetic a DecimalType would be stricter still.
schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("order_amount", DoubleType(), True),
    StructField("order_date", TimestampType(), True)
])

# Initial batch of orders; order_date values are datetime objects so they match
# the TimestampType column without any string parsing
order_data = [
    (1001, 1, 2001, 2, 139.99, datetime(2023, 9, 27, 12, 30, 0)),
    (1002, 2, 2002, 1, 934.99, datetime(2023, 9, 27, 13, 45, 0)),
    (1003, 3, 2003, 1, 543.99, datetime(2023, 9, 27, 14, 15, 0)),
    (1004, 1, 2001, 3, 419.97, datetime(2023, 9, 27, 15, 0, 0)),
    (1005, 2, 2003, 2, 1869.98, datetime(2023, 9, 27, 16, 5, 0)),
]

# Create a DataFrame from the list of orders using the explicit schema
df_orders = spark.createDataFrame(order_data, schema)

# Show the data
df_orders.show()


+--------+----------+-----------+--------+------------+-------------------+
|order_id|product_id|customer_id|quantity|order_amount|         order_date|
+--------+----------+-----------+--------+------------+-------------------+
|    1001|         1|       2001|       2|      139.99|2023-09-27 12:30:00|
|    1002|         2|       2002|       1|      934.99|2023-09-27 13:45:00|
|    1003|         3|       2003|       1|      543.99|2023-09-27 14:15:00|
|    1004|         1|       2001|       3|      419.97|2023-09-27 15:00:00|
|    1005|         2|       2003|       2|     1869.98|2023-09-27 16:05:00|
+--------+----------+-----------+--------+------------+-------------------+



In [None]:
# Process Data Using PySpark
# Per-product summary of df_orders: total revenue (sum of order_amount) and the
# number of orders. `sum` and `count` are the pyspark.sql.functions column
# aggregates imported at the top of the notebook, not Python builtins.
product_sales = (
    df_orders
    .groupBy("product_id")
    .agg(
        sum("order_amount").alias("total_sales"),
        count("order_id").alias("order_count"),
    )
)

# Display the aggregated table
product_sales.show()


+----------+------------------+-----------+
|product_id|       total_sales|order_count|
+----------+------------------+-----------+
|         1|1119.9200286865234|          3|
|         2| 2804.969970703125|          2|
|         3|  1087.97998046875|          2|
+----------+------------------+-----------+



In [None]:
# Real-Time Streaming Simulation

# A second batch of orders, standing in for data that arrives later
new_order_data = [
    (1006, 3, 2002, 1, 543.99, datetime(2023, 9, 27, 17, 0, 0)),
    (1007, 1, 2001, 4, 559.96, datetime(2023, 9, 27, 17, 30, 0)),
]

# Build the new batch with the same schema as the original orders
new_df_orders = spark.createDataFrame(new_order_data, schema)

# Combine the batches into a NEW DataFrame rather than rebinding df_orders.
# Rebinding (df_orders = df_orders.union(...)) made this cell non-idempotent:
# every re-run appended the batch again, and re-running earlier cells afterwards
# silently reported totals that already included the new orders.
df_orders_updated = df_orders.union(new_df_orders)

# Perform the same per-product aggregation on the combined data
updated_product_sales = df_orders_updated.groupBy("product_id").agg(
    sum("order_amount").alias("total_sales"),
    count("order_id").alias("order_count")
)

# Show the updated result
updated_product_sales.show()


+----------+------------------+-----------+
|product_id|       total_sales|order_count|
+----------+------------------+-----------+
|         1|1119.9200286865234|          3|
|         2| 2804.969970703125|          2|
|         3|  1087.97998046875|          2|
+----------+------------------+-----------+



In [None]:
# Write the results to CSV. Spark writes the output as a directory at this path,
# and with the default save mode it raises if the path already exists, so
# re-running this cell used to fail. mode="overwrite" replaces the previous output
# and makes the cell safe to re-run.
updated_product_sales.write.csv("/content/product_sales.csv", header=True, mode="overwrite")

# Or, write to the console (in real-time streaming, you'd use .writeStream)
updated_product_sales.show()


+----------+------------------+-----------+
|product_id|       total_sales|order_count|
+----------+------------------+-----------+
|         1|1119.9200286865234|          3|
|         2| 2804.969970703125|          2|
|         3|  1087.97998046875|          2|
+----------+------------------+-----------+

