<a href="https://colab.research.google.com/github/Sunnykumar1554/Ad-s-/blob/main/Big_Data_Analysis_on_Global_E_Commerce_Transaction_Using_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Task-4 Big Data Analytics using Apache Spark

Part 1: Load Dataset

In [1]:
!pip install pyspark openpyxl




In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType, DateType

import pandas as pd

spark = SparkSession.builder \
    .appName("OnlineRetailII_Analysis") \
    .getOrCreate()

spark


Upload the data to Google Colab

In [3]:
from google.colab import files
uploaded = files.upload()  # choose online_retail_II.xlsx


Saving online_retail_II.xlsx to online_retail_II.xlsx


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Read the data with pandas (it is converted to a Spark DataFrame further down)

In [None]:
excel_path = "/content/online_retail_II.xlsx"

# Read first sheet (or specify sheet_name if needed)
pdf_raw = pd.read_excel(excel_path)

pdf_raw.head()


Display first 10 rows,
print schema,
count total records

1. How many rows does the dataset contain?


# 525461

2. What issues do you observe in the data?

In [None]:
df_raw = spark.createDataFrame(pdf_raw)
df_raw.printSchema()
df_raw.show(10, truncate=False)

row_count = df_raw.count()
print("Total rows in raw dataset:", row_count)


Part 2: Data Cleaning using Spark (core Big Data skill)

1 Remove rows where:

*   CustomerID is null
*   Description is null



In [None]:
df_clean = df_raw.dropna(subset=["Customer ID", "Description"])


2 Convert:

*   InvoiceDate to Date type
*   UnitPrice to Float



In [None]:
# If InvoiceDate is a string with date + time, we convert it to DateType
df_clean = df_clean.withColumn(
    "InvoiceDate",
    F.to_date("InvoiceDate")  # if fails, we will parse format explicitly
)


In [None]:
# Alternative robust version:
df_clean = df_clean.withColumn(
    "InvoiceDate_str",
    F.col("InvoiceDate").cast("string")
)

df_clean = df_clean.withColumn(
    "InvoiceDate",
    F.to_date("InvoiceDate_str", "yyyy-MM-dd")  # change if your format is different
).drop("InvoiceDate_str")
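
The note about changing the format matters because a date pattern has to match the text exactly. As a rough plain-Python analogy (not Spark code: `datetime.strptime` uses %-codes, while Spark's `to_date` uses patterns such as yyyy-MM-dd HH:mm:ss, and Spark's behaviour on a mismatch depends on version and settings), the sketch below parses one made-up timestamp string with a matching and a non-matching pattern. The sample value and the helper name `parse_or_none` are assumptions for illustration.

```python
from datetime import date, datetime
from typing import Optional


def parse_or_none(text: str, pattern: str) -> Optional[date]:
    """Return the parsed date, or None when the pattern does not match."""
    try:
        return datetime.strptime(text, pattern).date()
    except ValueError:
        return None


# Made-up value that carries both a date part and a time part.
sample = "2009-12-01 07:45:00"

full_match = parse_or_none(sample, "%Y-%m-%d %H:%M:%S")
date_only = parse_or_none(sample, "%Y-%m-%d")

print(full_match)  # the date part, parsed with the full pattern
print(date_only)   # None: the trailing time text is not covered by the pattern
```

If the string still contains a time part, the pattern has to cover it, or the time has to be stripped before parsing.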


In [None]:
df_clean = df_clean.withColumn(
    "Price",
    F.col("Price").cast(FloatType())
)


3 Remove negative Quantity values

In [None]:
df_clean = df_clean.filter(F.col("Quantity") > 0)


4 Create new column:

In [None]:
df_clean = df_clean.withColumn(
    "TotalAmount",
    F.col("Quantity") * F.col("Price")
)


TotalAmount = Quantity * UnitPrice (the column is named Price in this dataset)

5 Show cleaned DataFrame

Print updated record count = 407695

In [None]:
df_clean.show(10, truncate=False)

clean_count = df_clean.count()
print("Rows after cleaning:", clean_count)
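
To see what the cleaning rules do, they can be traced by hand on a few made-up rows. This is a plain-Python sketch of the same rules, not the Spark pipeline; the rows and the function name `clean_rows` are invented for illustration.

```python
def clean_rows(rows):
    """Drop rows with missing values or non-positive quantity, then add total_amount."""
    cleaned = []
    for row in rows:
        # Rule 1: customer ID and description must both be present.
        if row["customer_id"] is None or row["description"] is None:
            continue
        # Rule 3: keep only rows with Quantity > 0 (same condition as the filter above).
        if row["quantity"] <= 0:
            continue
        # Rule 4: TotalAmount = Quantity * Price.
        cleaned.append({**row, "total_amount": row["quantity"] * row["price"]})
    return cleaned


# Made-up sample rows, one per case.
rows = [
    {"customer_id": 1, "description": "MUG", "quantity": 12, "price": 2.5},
    {"customer_id": None, "description": "MUG", "quantity": 3, "price": 2.5},
    {"customer_id": 2, "description": None, "quantity": 1, "price": 4.0},
    {"customer_id": 3, "description": "LAMP", "quantity": -2, "price": 4.0},
]

cleaned = clean_rows(rows)
print(len(cleaned), cleaned[0]["total_amount"])
```

Only the first row survives, which mirrors how the record count drops after cleaning.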


Part 3: Business Analysis Tasks

Task (A) - Top 10 Selling Products

*   Product descriptions with highest total sales value



In [None]:
top_products = df_clean.groupBy("Description") \
    .agg(F.sum("TotalAmount").alias("total_sales")) \
    .orderBy(F.desc("total_sales")) \
    .limit(10)

top_products.show(truncate=False)


Task (B) - Country-wise Revenue

*   Total revenue per country
*   Sort in descending order



In [None]:
country_revenue = df_clean.groupBy("Country") \
    .agg(F.sum("TotalAmount").alias("total_revenue")) \
    .orderBy(F.desc("total_revenue"))

country_revenue.show(truncate=False)


Task (C) – Customer Purchase Behaviour

*   Top 10 customers by total spending



In [None]:
top_customers = df_clean.groupBy("Customer ID") \
    .agg(F.sum("TotalAmount").alias("total_spent")) \
    .orderBy(F.desc("total_spent")) \
    .limit(10)

top_customers.show(truncate=False)


Task (D) – Monthly Sales Trend

*   Extract month from InvoiceDate and calculate total monthly revenue.
*   Students must display results in tabular form.





In [None]:
df_monthly = df_clean.withColumn("Year", F.year("InvoiceDate")) \
                     .withColumn("Month", F.month("InvoiceDate"))

monthly_sales = df_monthly.groupBy("Year", "Month") \
    .agg(F.sum("TotalAmount").alias("monthly_revenue")) \
    .orderBy("Year", "Month")

monthly_sales.show(truncate=False)
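
The query groups by Year and Month together rather than by Month alone. A plain-Python sketch with made-up invoices shows why: when the data spans more than one year, grouping by the month number alone merges the same calendar month from different years. The dates and amounts below are invented.

```python
from collections import defaultdict
from datetime import date

# Made-up (invoice date, amount) pairs spanning two Decembers.
invoices = [
    (date(2009, 12, 1), 100.0),
    (date(2010, 1, 15), 40.0),
    (date(2010, 12, 3), 60.0),
]

by_year_month = defaultdict(float)
by_month_only = defaultdict(float)
for invoice_date, amount in invoices:
    by_year_month[(invoice_date.year, invoice_date.month)] += amount
    by_month_only[invoice_date.month] += amount

# One line per (year, month), in chronological order.
for key in sorted(by_year_month):
    print(key, by_year_month[key])

# Grouping by month alone adds both Decembers together.
print("December, all years merged:", by_month_only[12])
```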


In [None]:
monthly_pdf = monthly_sales.toPandas()
monthly_pdf.head()


PART 4: Big Data Performance Comparison

Normal Approach (Pandas)

In [None]:
import time

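# Note: the timer starts before the Excel read, so the measured time
# includes file loading as well as the cleaning and aggregation.
start = time.time()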

# Read Excel again with Pandas
pdf = pd.read_excel(excel_path)

# Basic cleaning similar to Spark
pdf = pdf.dropna(subset=["Customer ID", "Description"])
pdf = pdf[pdf["Quantity"] > 0]

pdf["TotalAmount"] = pdf["Quantity"] * pdf["Price"]

country_rev_pd = pdf.groupby("Country")["TotalAmount"].sum().sort_values(ascending=False)

end = time.time()
pandas_time = end - start

print("Pandas aggregation time (seconds):", pandas_time)
country_rev_pd.head()


Spark Approach

In [None]:
start = time.time()

country_rev_spark = df_clean.groupBy("Country") \
    .agg(F.sum("TotalAmount").alias("total_revenue")) \
    .orderBy(F.desc("total_revenue"))

country_rev_spark.show(10, truncate=False)

end = time.time()
spark_time = end - start

print("Spark aggregation time (seconds):", spark_time)
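
Both timing cells repeat the same start/stop pattern. A small helper can keep the two measurements comparable by timing only the work passed to it. This is a sketch: the name `timed` is hypothetical, and `time.perf_counter` is swapped in for `time.time` because it is the standard-library clock intended for measuring short durations.

```python
import time
from typing import Any, Callable, Tuple


def timed(work: Callable[[], Any]) -> Tuple[Any, float]:
    """Run work() once and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = work()
    elapsed = time.perf_counter() - start
    return result, elapsed


# Stand-in workload; in the notebook this could be a lambda wrapping
# the Pandas groupby or the Spark show() call.
total, seconds = timed(lambda: sum(range(1_000_000)))
print("result:", total, "elapsed seconds:", round(seconds, 4))
```

Wrapping only the aggregation in `timed` would leave file loading out of the measurement on both sides.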


Which was faster and why?

In my case Spark was faster. Two factors mainly influence this result:

*   I am running this program in Google Colab, so the hosted runtime and the internet connection affect the measured time.
*   This implementation uses neither a distributed file system nor parallel processing across a cluster, which are the core mechanisms of Big Data tools such as Spark. Because of this, Pandas may need less processing time in some cases.



What would happen if data grows to 10 million rows?

When the data grows to 10 million rows, Spark should generally run faster, while Pandas has to fit the whole dataset into the memory of a single machine.

This is because Spark uses lazy evaluation.
When you perform transformations like:

filter, withColumn, dropna, groupBy

Spark does not execute them immediately.

It waits until you run an action, such as:

show()

count()

collect()
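
The difference between transformations and actions can be imitated in plain Python with generators. This is only an analogy, not Spark code: building the generator pipeline does no work, and the rows are processed only when something consumes it. The list `log` and the function names are made up for the demonstration.

```python
log = []  # records when each row is actually read


def read_rows():
    """Yield made-up quantities, logging each read."""
    for quantity in [5, -1, 3]:
        log.append(f"read {quantity}")
        yield quantity


def keep_positive(rows):
    """Lazy filter, comparable in spirit to a transformation."""
    for quantity in rows:
        if quantity > 0:
            yield quantity


# "Transformations": the pipeline is only described, nothing is read yet.
pipeline = keep_positive(read_rows())
steps_before_action = len(log)

# "Action": consuming the pipeline triggers the work.
result = list(pipeline)
steps_after_action = len(log)

print(steps_before_action, steps_after_action, result)
```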



PART 5: Optimization Challenge (Advanced Thinking)

Students must apply:



*   .cache() to cleaned dataset




In [None]:
import time

start = time.time()

country_rev_no_cache = df_clean.groupBy("Country") \
    .agg(F.sum("TotalAmount").alias("total_revenue")) \
    .orderBy(F.desc("total_revenue"))

country_rev_no_cache.show(10, truncate=False)

end = time.time()
no_cache_time = end - start

print("Time without cache:", no_cache_time)




*   Re-run one aggregation



In [None]:
# Cache
df_clean_cached = df_clean.cache()

# Trigger cache load
df_clean_cached.count()

# Aggregation with cache
start = time.time()

country_rev_cache = df_clean_cached.groupBy("Country") \
    .agg(F.sum("TotalAmount").alias("total_revenue")) \
    .orderBy(F.desc("total_revenue"))

country_rev_cache.show(10, truncate=False)

end = time.time()
cache_time = end - start

print("Time with cache:", cache_time)




*   Compare time before and after caching



Time without cache aggregation = 3.14...
Time with cache aggregation = 0.723...



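Why did caching reduce execution time?


Caching reduced execution time because Spark uses lazy evaluation. When you perform transformations like filter, withColumn, dropna, groupBy,


Spark does not execute them immediately.

It waits until you run an action, such as:

*   show()
*   count()
*   collect()

Without caching, every action recomputes the whole chain of cleaning transformations. After .cache() and the first action, the cleaned DataFrame is kept in memory, so the second aggregation skips that work.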
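
The effect of caching can be imitated in plain Python. This is an analogy, not Spark's implementation: an uncached pipeline redoes the cleaning for every aggregation, while a cached one stores the cleaned rows once and reuses them. The class name, the counter and the sample values are invented.

```python
class CleaningPipeline:
    """Toy pipeline that counts how often the cleaning step really runs."""

    def __init__(self, raw):
        self.raw = raw
        self.clean_runs = 0
        self._cached = None

    def _clean(self):
        self.clean_runs += 1
        return [q for q in self.raw if q > 0]

    def cache(self):
        # Spark's .cache() is itself lazy and fills on the first action;
        # this toy version materializes eagerly to keep the sketch short.
        self._cached = self._clean()
        return self

    def total(self):
        rows = self._cached if self._cached is not None else self._clean()
        return sum(rows)


uncached = CleaningPipeline([5, -1, 3])
uncached.total()
uncached.total()

cached = CleaningPipeline([5, -1, 3]).cache()
cached.total()
cached.total()

# Number of times the cleaning step ran in each case.
print(uncached.clean_runs, cached.clean_runs)
```

The uncached pipeline cleans the data once per aggregation, while the cached one cleans it a single time.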


