
In [5]:
!pip install pyspark




In [6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("SparkFlow_Ecommerce_Pipeline") \
    .master("local[*]") \
    .getOrCreate()

sc = spark.sparkContext

print("Spark Version:", spark.version)
print("Master:", sc.master)
print("App Name:", sc.appName)


Spark Version: 4.0.2
Master: local[*]
App Name: SparkFlow_Ecommerce_Pipeline


Environment Choice: Google Colab with PySpark

This project uses Google Colab with PySpark installed via pip. Spark runs in local mode (local[*]). In this mode the Driver and a single Executor share one JVM on the Colab machine, and local[*] gives that Executor one task thread per available CPU core. This setup is enough to develop and demonstrate Spark's architecture, transformations, Spark SQL, and distributed processing concepts without provisioning a cluster.

Spark Version: 4.0.2
Execution Mode: local[*]

What role does the Driver play?

The Driver is the control center of a Spark application.

It:

Creates the SparkSession

Runs the user program and turns its transformations into a DAG (Directed Acyclic Graph) of stages and tasks

Communicates with the Cluster Manager to request resources

Schedules tasks on Executors

Collects results from Executors when actions are triggered

In local mode (local[*]), the notebook's Python process, together with the JVM that PySpark launches for it, acts as the Driver.
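The Driver's role can be illustrated with a toy model in plain Python. This is not how Spark is implemented:

* `split_into_partitions` and `run_job` are hypothetical helpers.
* A `ThreadPoolExecutor` stands in for the executor's task threads.

The "driver" splits the input into partitions, schedules one task per partition, and collects the per-partition results. This mirrors the even-number count computed later in this notebook.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, num_partitions):
    """Hypothetical helper: split a list into contiguous, roughly equal chunks."""
    size, extra = divmod(len(data), num_partitions)
    parts, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < extra else 0)
        parts.append(data[start:end])
        start = end
    return parts


def run_job(data, task, num_partitions=4, num_threads=2):
    """Hypothetical toy 'driver': one task per partition, run on a thread pool."""
    partitions = split_into_partitions(data, num_partitions)
    # The pool plays the role of the executor's task threads.
    with ThreadPoolExecutor(max_workers=num_threads) as pool:
        # pool.map returns results in partition order; the driver collects them.
        return list(pool.map(task, partitions))


def count_evens(partition):
    """The 'task' shipped to each partition."""
    return sum(1 for x in partition if x % 2 == 0)


partial_counts = run_job(list(range(1, 1_000_000)), count_evens)
total = sum(partial_counts)  # final combine step on the "driver"
print(partial_counts, total)
```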

What are Executors, and why are they long-lived?

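Executors are worker processes that:

Execute tasks sent by the Driver

Store cached or persisted data in memory (or on disk)

Write and serve shuffle data during shuffle operations

Return task results to the Driver

They are long-lived, usually running for the whole lifetime of the application, because:

Starting and stopping processes repeatedly is expensive

Keeping executors alive lets cached intermediate results be reused across jobs

Reusing the same executors for many tasks reduces overhead and improves performance

In local mode, a single executor runs inside the Driver's JVM, and its tasks run as threads on the same machine.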

What is a Cluster Manager, and how does Spark interact with it?

A Cluster Manager allocates computing resources (CPU cores and memory) across the machines of a cluster.

Examples:

Standalone

YARN

Kubernetes

Databricks-managed clusters

The Spark Driver requests resources (CPU, memory) from the Cluster Manager.
The Cluster Manager launches Executors on available nodes. The Executors then register with the Driver, which sends tasks to them directly.

In this Colab setup, there is no external cluster manager; Spark runs in local mode.
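Spark chooses the cluster manager from the master URL passed to `.master()` or `spark-submit --master`. The helper below is hypothetical and not part of PySpark. It covers only the common master URL forms:

* `local`, `local[N]`, `local[*]` (optionally with a retry count, as in `local[N,F]`)
* `spark://HOST:PORT` (Standalone)
* `yarn`
* `k8s://HOST:PORT` (Kubernetes)

```python
import re


def describe_master(master: str) -> str:
    """Hypothetical helper: name the resource manager implied by a Spark master URL."""
    if master == "local":
        return "local mode, 1 thread"
    # local[N], local[*], and the variants with a retry count such as local[N,F]
    match = re.fullmatch(r"local\[(\*|\d+)(?:,\d+)?\]", master)
    if match:
        threads = "all cores" if match.group(1) == "*" else f"{match.group(1)} threads"
        return f"local mode, {threads}"
    if master.startswith("spark://"):
        return "Standalone cluster manager"
    if master == "yarn":
        return "YARN"
    if master.startswith("k8s://"):
        return "Kubernetes"
    raise ValueError(f"Unrecognised master URL: {master}")


for url in ["local[*]", "local[4]", "spark://host:7077", "yarn", "k8s://https://host:6443"]:
    print(f"{url:28} -> {describe_master(url)}")
```

In this notebook the master is `local[*]`, so no external cluster manager is involved.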

In [7]:
from pyspark.sql import functions as F

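# Transformations are lazy: these two lines only record a plan on the Driver.
data = spark.range(1, 1000000)  # ids 1..999,999 (end is exclusive)

transformed = data.filter(F.col("id") % 2 == 0)

print("Transformation defined. No execution yet.")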

# Now trigger execution
count_result = transformed.count()

print("Count of even numbers:", count_result)


Transformation defined. No execution yet.
Count of even numbers: 499999


In [9]:
import os

base_path = "/content/sparkflow_raw"
os.makedirs(base_path, exist_ok=True)


In [11]:
customers_data = """customer_id,first_name,last_name,email,signup_ts,country
C001,John,Doe,john@example.com,2023-01-10 10:00:00,Australia
C002,Jane,Smith,jane@example.com,2023-03-15 12:30:00,Australia
C003,Mike,Brown,mike@example.com,2022-11-05 09:15:00,USA
C004,Sarah,Wilson,sarah@example.com,2024-02-20 14:45:00,UK
"""

with open(f"{base_path}/customers.csv", "w") as f:
    f.write(customers_data)


In [12]:
products_data = """product_id,product_name,category,unit_price
P001,Laptop,Electronics,1200.50
P002,Headphones,Electronics,150.00
P003,Coffee Machine,Home Appliances,300.75
P004,Desk Chair,Furniture,220.40
"""

with open(f"{base_path}/products.csv", "w") as f:
    f.write(products_data)


In [13]:
orders_data = """
{"order_id":"O001","customer_id":"C001","product_id":"P001","quantity":"1","order_ts":"2023-02-01 11:00:00","platform":"Web"}
{"order_id":"O002","customer_id":"C002","product_id":"P002","quantity":"2","order_ts":"2023-04-01 15:20:00","platform":"Mobile"}
{"order_id":"O003","customer_id":"C001","product_id":"P003","quantity":"1","order_ts":"2023-05-10 09:00:00","platform":"Web"}
{"order_id":"O004","customer_id":"C003","product_id":"P004","quantity":"3","order_ts":"2022-12-15 16:40:00","platform":"Web"}
"""

with open(f"{base_path}/orders.json", "w") as f:
    f.write(orders_data)


In [14]:
from pyspark.sql.types import StructType, StructField, StringType

customers_schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("first_name", StringType(), True),
    StructField("last_name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("signup_ts", StringType(), True),
    StructField("country", StringType(), True),
])

products_schema = StructType([
    StructField("product_id", StringType(), True),
    StructField("product_name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("unit_price", StringType(), True),
])

orders_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("quantity", StringType(), True),
    StructField("order_ts", StringType(), True),
    StructField("platform", StringType(), True),
])


In [15]:
customers_raw = spark.read \
    .option("header", "true") \
    .schema(customers_schema) \
    .csv(f"{base_path}/customers.csv")

products_raw = spark.read \
    .option("header", "true") \
    .schema(products_schema) \
    .csv(f"{base_path}/products.csv")

orders_raw = spark.read \
    .schema(orders_schema) \
    .json(f"{base_path}/orders.json")


In [16]:
customers_raw.printSchema()
customers_raw.show()

products_raw.printSchema()
products_raw.show()

orders_raw.printSchema()
orders_raw.show()


root
 |-- customer_id: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- signup_ts: string (nullable = true)
 |-- country: string (nullable = true)

+-----------+----------+---------+-----------------+-------------------+---------+
|customer_id|first_name|last_name|            email|          signup_ts|  country|
+-----------+----------+---------+-----------------+-------------------+---------+
|       C001|      John|      Doe| john@example.com|2023-01-10 10:00:00|Australia|
|       C002|      Jane|    Smith| jane@example.com|2023-03-15 12:30:00|Australia|
|       C003|      Mike|    Brown| mike@example.com|2022-11-05 09:15:00|      USA|
|       C004|     Sarah|   Wilson|sarah@example.com|2024-02-20 14:45:00|       UK|
+-----------+----------+---------+-----------------+-------------------+---------+

root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = t

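Data was read with Spark's built-in DataFrame readers (spark.read.csv and spark.read.json)

Explicit schemas were defined to avoid the extra pass over the data that schema inference requires

The raw DataFrames are kept untransformed (every column is read as a string); cleaning happens in a later step

Spark splits each input into partitions automatically; a small file such as orders.json ends up in a single partition, as the next cell shows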

In [17]:
orders_rdd = orders_raw.rdd

print("Number of partitions:", orders_rdd.getNumPartitions())


Number of partitions: 1


In [18]:
def safe_int(x):
    """Convert x to int; return 0 for missing (None) or malformed values."""
    try:
        return int(x)
    except (TypeError, ValueError):
        return 0

quantity_rdd = orders_rdd.map(lambda row: safe_int(row["quantity"]))

quantity_rdd.collect()


[1, 2, 1, 3]

In [19]:
positive_qty_rdd = quantity_rdd.filter(lambda q: q > 0)

positive_qty_rdd.collect()


[1, 2, 1, 3]

In [20]:
total_quantity = positive_qty_rdd.reduce(lambda a, b: a + b)

print("Total quantity across all orders:", total_quantity)


Total quantity across all orders: 7


In [21]:
total_orders = orders_rdd.map(lambda row: 1).reduce(lambda a, b: a + b)

print("Total number of orders:", total_orders)


Total number of orders: 4


Task 4 – RDD Fundamentals

In this step, the orders DataFrame was converted into an RDD of Row objects using .rdd. This exposes Spark's low-level distributed abstraction.

The following RDD operations were demonstrated:

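map() – Transformation: applied a function to each record (extracting quantity).

filter() – Transformation: kept only the records matching a condition (positive quantities).

reduce() – Action: combined values within each partition, then across partitions, to compute totals. As an action, it triggers execution of the lazy map() and filter() steps.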
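RDD.reduce() combines values inside each partition and then merges the per-partition results on the Driver. For that reason, Spark's documentation requires the function passed to it to be associative and commutative. The plain-Python toy model below shows why. Note that `rdd_style_reduce` is a hypothetical helper, not a Spark API. With addition, every partitioning of the quantities gives the same total; with subtraction, the result changes.

```python
from functools import reduce


def rdd_style_reduce(partitions, op):
    """Hypothetical toy model of RDD.reduce: fold each partition, then merge the partials."""
    partials = [reduce(op, part) for part in partitions if part]  # per-partition step
    return reduce(op, partials)                                   # merge step on the "driver"


def add(a, b):
    return a + b


def sub(a, b):
    return a - b


one_partition = [[1, 2, 1, 3]]     # the four order quantities in one partition
two_partitions = [[1, 2], [1, 3]]  # the same values split differently

print(rdd_style_reduce(one_partition, add), rdd_style_reduce(two_partitions, add))
print(rdd_style_reduce(one_partition, sub), rdd_style_reduce(two_partitions, sub))
```

Addition is associative, so both layouts give 7. Subtraction is not associative, so the layout changes the answer: (1 - 2 - 1 - 3) versus (1 - 2) - (1 - 3).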

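RDDs provide fine-grained control over distributed data processing and allow arbitrary transformations. However, they cannot benefit from the Catalyst optimizer or the Tungsten execution engine used by Spark SQL and DataFrames. In PySpark there is an additional cost: each RDD lambda runs in a separate Python worker process, so every record must be serialized between the JVM and Python. As a result, RDD code is generally slower and more verbose than the equivalent DataFrame code.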

In [22]:
from pyspark.sql import functions as F

# Clean customers
customers = (
    customers_raw
    .filter(F.col("customer_id").isNotNull())
    .withColumn("signup_ts", F.to_timestamp("signup_ts"))
    .dropDuplicates(["customer_id"])
)

# Clean products
products = (
    products_raw
    .filter(F.col("product_id").isNotNull())
    .withColumn("unit_price", F.col("unit_price").cast("double"))
    .dropDuplicates(["product_id"])
)

# Clean orders
orders = (
    orders_raw
    .filter(F.col("order_id").isNotNull())
    .filter(F.col("customer_id").isNotNull())
    .filter(F.col("product_id").isNotNull())
    .withColumn("quantity", F.col("quantity").cast("int"))
    .withColumn("order_ts", F.to_timestamp("order_ts"))
    .filter(F.col("quantity") > 0)
    .dropDuplicates(["order_id"])
)


In [23]:
orders.printSchema()
orders.show()


root
 |-- order_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- order_ts: timestamp (nullable = true)
 |-- platform: string (nullable = true)

+--------+-----------+----------+--------+-------------------+--------+
|order_id|customer_id|product_id|quantity|           order_ts|platform|
+--------+-----------+----------+--------+-------------------+--------+
|    O001|       C001|      P001|       1|2023-02-01 11:00:00|     Web|
|    O002|       C002|      P002|       2|2023-04-01 15:20:00|  Mobile|
|    O003|       C001|      P003|       1|2023-05-10 09:00:00|     Web|
|    O004|       C003|      P004|       3|2022-12-15 16:40:00|     Web|
+--------+-----------+----------+--------+-------------------+--------+



In [24]:
fact_orders = (
    orders.alias("o")
    .join(products.alias("p"), on="product_id", how="left")
)


In [25]:
fact_orders = (
    fact_orders
    .withColumn("order_value", F.col("quantity") * F.col("unit_price"))
    .withColumn("is_high_value_order", F.col("order_value") >= 500)
    .withColumn("order_year", F.year("order_ts"))
)


In [26]:
fact_orders.show()


+----------+--------+-----------+--------+-------------------+--------+--------------+---------------+----------+-----------+-------------------+----------+
|product_id|order_id|customer_id|quantity|           order_ts|platform|  product_name|       category|unit_price|order_value|is_high_value_order|order_year|
+----------+--------+-----------+--------+-------------------+--------+--------------+---------------+----------+-----------+-------------------+----------+
|      P001|    O001|       C001|       1|2023-02-01 11:00:00|     Web|        Laptop|    Electronics|    1200.5|     1200.5|               true|      2023|
|      P002|    O002|       C002|       2|2023-04-01 15:20:00|  Mobile|    Headphones|    Electronics|     150.0|      300.0|              false|      2023|
|      P003|    O003|       C001|       1|2023-05-10 09:00:00|     Web|Coffee Machine|Home Appliances|    300.75|     300.75|              false|      2023|
|      P004|    O004|       C003|       3|2022-12-15 16:40

In [27]:
fact_orders.select("order_id", "customer_id", "order_value").show()


+--------+-----------+-----------+
|order_id|customer_id|order_value|
+--------+-----------+-----------+
|    O001|       C001|     1200.5|
|    O002|       C002|      300.0|
|    O003|       C001|     300.75|
|    O004|       C003|      661.2|
+--------+-----------+-----------+



In [28]:
high_value_orders = fact_orders.filter(F.col("is_high_value_order"))
high_value_orders.show()


+----------+--------+-----------+--------+-------------------+--------+------------+-----------+----------+-----------+-------------------+----------+
|product_id|order_id|customer_id|quantity|           order_ts|platform|product_name|   category|unit_price|order_value|is_high_value_order|order_year|
+----------+--------+-----------+--------+-------------------+--------+------------+-----------+----------+-----------+-------------------+----------+
|      P001|    O001|       C001|       1|2023-02-01 11:00:00|     Web|      Laptop|Electronics|    1200.5|     1200.5|               true|      2023|
|      P004|    O004|       C003|       3|2022-12-15 16:40:00|     Web|  Desk Chair|  Furniture|     220.4|      661.2|               true|      2022|
+----------+--------+-----------+--------+-------------------+--------+------------+-----------+----------+-----------+-------------------+----------+



In [29]:
yearly_revenue = (
    fact_orders
    .groupBy("order_year")
    .agg(
        F.sum("order_value").alias("total_revenue"),
        F.countDistinct("order_id").alias("order_count")
    )
    .orderBy("order_year")
)

yearly_revenue.show()


+----------+-------------+-----------+
|order_year|total_revenue|order_count|
+----------+-------------+-----------+
|      2022|        661.2|          1|
|      2023|      1801.25|          3|
+----------+-------------+-----------+



Task 5 – DataFrame Transformations

This stage converts raw datasets into cleaned analytical tables.

Data quality improvements included:

Removing null identifiers

Casting numeric fields (quantity, unit_price)

Converting timestamp fields to proper datetime types

Removing duplicate records

Orders were joined with products (a left join on product_id) to create a fact table. The following derived columns were added:

order_value = quantity × unit_price

is_high_value_order = Boolean flag, true when order_value ≥ 500

order_year = year extracted from order_ts

Core Spark operations demonstrated:

join()

select()

withColumn()

filter()

groupBy()

agg()

These transformations are planned by Spark SQL's Catalyst optimizer and executed by the Tungsten engine, which generally makes them more efficient than equivalent RDD operations.