In [None]:
pip install pyspark

In [None]:
# Import inside the kernel so the reported version is the one this notebook uses;
# a `python3` subprocess may resolve to a different interpreter/environment.
import pyspark

print(pyspark.__version__)

<h2>Import Necessary Libraries</h2>

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
from pyspark.sql.functions import date_format
from pyspark.sql.functions import count
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import year
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.functions import explode, col
from pyspark.sql.functions import sum as spark_sum, count

In [None]:
from pyspark.sql import SparkSession

# Build the session used by every Spark cell below.
spark = (
    SparkSession.builder
    .appName("Ecommerce Big Data Visualization")
    .getOrCreate()
)

spark


<h2>Load data (into Notebook Cell)</h2>

In [None]:
# Read the raw transactions JSON into a Spark DataFrame.
transactions = spark.read.json("/project/data_raw/transactions.json")

# Inspect the inferred schema and a handful of untruncated rows.
transactions.printSchema()
transactions.show(n=5, truncate=False)


<h2>Convert a small sample to Pandas</h2>

In [None]:
# Bring at most 20 rows to the driver as a Pandas frame for a quick look.
pdf_transactions = (
    transactions
    .limit(20)
    .toPandas()
)
pdf_transactions


<h2>Overall Transactions status</h2>

In [None]:
# Keep only the transaction-level columns used by the analyses that follow.
transactions_df = transactions.select(
    [
        "transaction_id",
        "user_id",
        "timestamp",
        "total",
        "payment_method",
        "status",
    ]
)

transactions_df.show(n=10, truncate=False)


<h2>Number of Completed, Shipped, Processed, and Delivered Transactions</h2>

In [None]:
# Number of transactions per status, largest group first.
status_counts = (
    transactions_df
    .groupBy("status")
    .count()  # yields a column named "count"
    .orderBy("count", ascending=False)
)

status_counts.show()


In [None]:
# Collect the per-status counts into Pandas for plotting.
pdf_status = (
    status_counts
    .toPandas()
)
pdf_status


**Plot the status counts (converted to Pandas above)**

In [None]:
# Bar chart: one bar per transaction status, height = number of transactions.
fig, ax = plt.subplots(figsize=(8, 5))
ax.bar(pdf_status["status"], pdf_status["count"])

ax.set_title("Transaction Status Distribution")
ax.set_xlabel("Transaction Status")
ax.set_ylabel("Number of Transactions")
ax.grid(axis="y")

fig.tight_layout()
plt.show()


The above visualization illustrates the overall number of completed, shipped, processed, and delivered transactions.

<h2>Ensure the timestamp is usable</h2>

In [None]:
# Add a parsed copy of the "timestamp" column next to the original value.
transactions_ts = transactions.withColumn("timestamp_ts", to_timestamp("timestamp"))

# Show raw and parsed values side by side to confirm the parse worked.
transactions_ts.select(["timestamp", "timestamp_ts"]).show(n=5, truncate=False)


<h2>DAILY Revenue Trend Visualization</h2>

In [None]:
from pyspark.sql.functions import to_date, sum as spark_sum

# Sum of "total" per calendar date, in chronological order.
daily_revenue = (
    transactions_ts
    .withColumn("date", to_date("timestamp_ts"))
    .groupBy("date")
    .agg(spark_sum("total").alias("daily_revenue"))
    .orderBy("date")
)

daily_revenue.show(n=10)


In [None]:
# Collect the daily revenue aggregate into Pandas for plotting.
pdf_daily = daily_revenue.toPandas()

# Preview the frame just created (the plotting cell reads `pdf_daily`);
# the previous `pdf_daily_revenue` name was never defined and raised NameError.
pdf_daily.head()

**Plot Daily Revenue**

In [None]:
# Line chart of revenue per day, one marker per date.
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(pdf_daily["date"], pdf_daily["daily_revenue"], marker="o")

ax.set_title("Daily Revenue Trend")
ax.set_xlabel("Date")
ax.set_ylabel("Revenue")
plt.xticks(rotation=45)  # tilt the date labels
ax.grid(True)

fig.tight_layout()
plt.show()


**Interpretation:** The above visualization shows how total revenue changes from day to day, making short-term spikes and dips in sales visible. The results support monitoring of day-level performance and decisions about the timing of promotions and operations.

<h2>MONTHLY Revenue Trend Visualization</h2>

In [None]:
# Sum of "total" per "yyyy-MM" month key, ordered by that key.
monthly_revenue = (
    transactions_ts
    .withColumn("month", date_format("timestamp_ts", "yyyy-MM"))
    .groupBy("month")
    .agg(spark_sum("total").alias("monthly_revenue"))
    .orderBy("month")
)

monthly_revenue.show()


In [None]:
# Collect the monthly aggregate into Pandas and preview the first rows.
pdf_monthly = monthly_revenue.toPandas()
pdf_monthly.head(5)

In [None]:
# Line chart of revenue per month, one marker per month.
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(pdf_monthly["month"], pdf_monthly["monthly_revenue"], marker="o")

ax.set_title("Monthly Revenue Trend")
ax.set_xlabel("Month")
ax.set_ylabel("Revenue")
plt.xticks(rotation=45)  # tilt the month labels
ax.grid(True)

fig.tight_layout()
plt.show()


**Interpretation:** The monthly revenue trend smooths daily fluctuations and reveals broader sales patterns, supporting seasonal and performance analysis.

<h2>YEARLY Revenue Trend visualization</h2>

In [None]:
# Sum of "total" per calendar year, in ascending year order.
yearly_revenue = (
    transactions_ts
    .withColumn("year", year("timestamp_ts"))
    .groupBy("year")
    .agg(spark_sum("total").alias("yearly_revenue"))
    .orderBy("year")
)

yearly_revenue.show()


In [None]:
# Collect the yearly aggregate into Pandas and preview the first rows.
pdf_yearly = yearly_revenue.toPandas()
pdf_yearly.head(5)

In [None]:
# Bar chart of revenue per year; years are cast to str so they are drawn as
# discrete labels rather than positions on a numeric axis.
fig, ax = plt.subplots(figsize=(8, 5))
ax.bar(pdf_yearly["year"].astype(str), pdf_yearly["yearly_revenue"])

ax.set_title("Yearly Revenue Trend")
ax.set_xlabel("Year")
ax.set_ylabel("Revenue")
ax.grid(axis="y")

fig.tight_layout()
plt.show()


**Interpretation:** The yearly revenue trend highlights long-term business growth and overall performance, which is useful for strategic planning and forecasting.

<h2>Revenue by Payment Method</h2>

In [None]:
# Sum of "total" per payment method, highest revenue first.
payment_revenue = (
    transactions
    .groupBy("payment_method")
    .agg(spark_sum("total").alias("total_revenue"))
    .orderBy("total_revenue", ascending=False)
)

payment_revenue.show()


In [None]:
# Collect the per-payment-method aggregate into Pandas and preview it.
pdf_payment = payment_revenue.toPandas()
pdf_payment.head(5)

In [None]:
# Bar chart: one bar per payment method, height = total revenue.
fig, ax = plt.subplots(figsize=(8, 5))
ax.bar(pdf_payment["payment_method"], pdf_payment["total_revenue"])

ax.set_title("Revenue by Payment Method")
ax.set_xlabel("Payment Method")
ax.set_ylabel("Total Revenue")
ax.grid(axis="y")

fig.tight_layout()
plt.show()


**Interpretation:** This visualization shows how revenue is distributed across different payment methods, helping identify the most preferred and profitable payment channels.

<h2>Top Products by Revenue</h2>

**Explode items**

In [None]:
# Turn each element of the "items" array into its own row, exposed as "item".
items_df = transactions.select(explode("items").alias("item"))

items_df.show(n=5, truncate=False)

**Extract product fields from the exploded items**

In [None]:
# Flatten the fields of "item" that the product analysis needs;
# "subtotal" is exposed under the name "revenue".
products_df = items_df.select(
    [
        col("item.product_id").alias("product_id"),
        col("item.quantity").alias("quantity"),
        col("item.subtotal").alias("revenue"),
    ]
)

products_df.show(n=5)


**Aggregate revenue and quantity by product**

In [None]:
# Total revenue and total quantity per product_id, highest revenue first.
top_products = (
    products_df
    .groupBy("product_id")
    .agg(
        spark_sum("revenue").alias("total_revenue"),
        spark_sum("quantity").alias("total_quantity"),
    )
    .orderBy("total_revenue", ascending=False)
)

top_products.show(n=10)

In [None]:
# Only the 10 highest-revenue products are collected to the driver.
pdf_products = (
    top_products
    .limit(10)
    .toPandas()
)
pdf_products.head(5)

<h2>Visualization (Top 10 Products)</h2>

In [None]:
# Bar chart: one bar per product in pdf_products, height = total revenue.
fig, ax = plt.subplots(figsize=(10, 5))
ax.bar(pdf_products["product_id"], pdf_products["total_revenue"])

ax.set_title("Top 10 Products by Revenue")
ax.set_xlabel("Product ID")
ax.set_ylabel("Revenue")
plt.xticks(rotation=45)  # tilt the product-id labels
ax.grid(axis="y")

fig.tight_layout()
plt.show()

**Interpretation:** The top products by revenue represent the most profitable items on the platform and can guide inventory planning and marketing strategies.

<h2>Aggregate spending per customer (Spark)</h2>

In [None]:
# Per user: summed "total" and number of transaction rows, biggest spenders first.
customer_spending = (
    transactions
    .groupBy("user_id")
    .agg(
        spark_sum("total").alias("total_spent"),
        count("*").alias("num_transactions"),
    )
    .orderBy("total_spent", ascending=False)
)

customer_spending.show(n=10)


In [None]:
# Collect the per-customer aggregate into Pandas and preview the first rows.
pdf_customers = customer_spending.toPandas()
pdf_customers.head(5)


In [None]:
# Histogram of per-customer total spending using 50 bins.
fig, ax = plt.subplots(figsize=(10, 5))
ax.hist(pdf_customers["total_spent"], bins=50)

ax.set_title("Distribution of Customer Total Spending")
ax.set_xlabel("Total Spending")
ax.set_ylabel("Number of Customers")
ax.grid(True)

fig.tight_layout()
plt.show()


**Interpretation:** The histogram shows that most customers fall into lower spending ranges, while a smaller group of high-value customers contributes disproportionately to total revenue, reflecting typical e-commerce purchasing behavior.

<h2>Customer Segmentation (Low / Medium / High spenders)</h2>

In [None]:
# Split customers into three quantile-based bins of total_spent and label
# them from lowest to highest.
pdf_customers["spending_segment"] = pd.qcut(
    pdf_customers["total_spent"], q=3, labels=["Low", "Medium", "High"]
)

# Number of customers that landed in each segment.
pdf_customers["spending_segment"].value_counts()


**Visualize segments**

In [None]:
#Visualize segments
segment_counts = pdf_customers["spending_segment"].value_counts()

plt.figure(figsize=(6,5))
segment_counts.plot(kind="bar")

plt.title("Customer Spending Segments")
plt.xlabel("Spending Segment")
plt.ylabel("Number of Customers")
plt.grid(axis="y")
plt.tight_layout()
plt.show()


**Interpretation:** Customers can be grouped into low, medium, and high spending segments. High-value customers, though fewer in number, represent a critical segment for revenue optimization and targeted marketing strategies.

<h2>Relationship: Transactions vs Spending</h2>

In [None]:
#Behavioral insight.
plt.figure(figsize=(8,5))
plt.scatter(
    pdf_customers["num_transactions"],
    pdf_customers["total_spent"],
    alpha=0.5
)

plt.title("Customer Transactions vs Total Spending")
plt.xlabel("Number of Transactions")
plt.ylabel("Total Spending")
plt.grid(True)
plt.tight_layout()
plt.show()

**Interpretation:** The scatter plot reveals a positive relationship between transaction frequency and total spending, indicating that repeat customers tend to generate higher revenue.

<h2>CONVERSION FUNNEL ANALYSIS</h2>

**First, confirm the data path (run this cell)**

In [None]:
import os

# List the entries of the raw-data directory to check which files are present
# before reading the sessions data.
os.listdir("/project/data_raw")


**Create the sessions Spark DataFrame**

In [None]:
# Read every file matching sessions_*.json into a single Spark DataFrame.
sessions = spark.read.json("/project/data_raw/sessions_*.json")

# Inspect the inferred schema and a handful of untruncated rows.
sessions.printSchema()
sessions.show(n=5, truncate=False)

In [None]:
from pyspark.sql.functions import count

# Number of sessions for each conversion_status value.
session_funnel = (
    sessions
    .groupBy("conversion_status")
    .agg(count("*").alias("session_count"))
)

session_funnel.show()


**How effectively do user sessions convert into purchases?**

In [None]:
from pyspark.sql.functions import count

# Same aggregation as the previous cell: sessions per conversion_status,
# with the count column named "session_count".
session_funnel = (
    sessions
    .groupBy("conversion_status")
    .count()
    .withColumnRenamed("count", "session_count")
)

session_funnel.show()
