## Pranav Spark Notebook (with comments)

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Initialize Spark Session
spark = (
    SparkSession.builder
    .appName("E-Commerce Analysis")
    .getOrCreate()
)

# Read the CSV file: the first row supplies column names, types are inferred
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("e_commerce.csv")
)

# 1. Basic: Display schema
df.printSchema()

# 2. Basic: Show first 10 rows
df.show(n=10)

# 3. Basic: Count total number of rows
total_rows = df.count()
print(f"Total number of rows: {total_rows}")

# 4. Medium: Get basic statistics of numeric columns
numeric_view = df.select(["quantity", "price"])
numeric_view.describe().show()


In [None]:
# 5. Medium: Count distinct customers
distinct_customers = df.dropDuplicates(["customer_id"]).count()
print(f"Number of distinct customers: {distinct_customers}")

# Revenue of a single row; reused by the aggregations below
line_revenue = col("quantity") * col("price")

# 6. Medium: Top 5 selling products
(
    df.groupBy("product_name")
    .agg(sum("quantity").alias("total_quantity"))
    .orderBy(col("total_quantity").desc())
    .show(5)
)

# 7. Medium: Total revenue by category
(
    df.groupBy("category")
    .agg(sum(line_revenue).alias("total_revenue"))
    .orderBy(col("total_revenue").desc())
    .show()
)

# 8. Advanced: Daily revenue trend
(
    df.withColumn("date", to_date("transaction_date"))
    .groupBy("date")
    .agg(sum(line_revenue).alias("daily_revenue"))
    .orderBy("date")
    .show()
)

# 9. Advanced: Customer segmentation by total spend
# The window has no partitionBy, so ranks are global across all customers.
window_spec = Window.orderBy(col("total_spend").desc())
spend_rank = dense_rank().over(window_spec)

# Label each customer by the rank bucket it falls into.
segment_label = (
    when(col("rank") <= 100, "Top 100")
    .when(col("rank") <= 1000, "Top 1000")
    .otherwise("Regular")
)

customer_segments = (
    df.groupBy("customer_id")
    .agg(sum(col("quantity") * col("price")).alias("total_spend"))
    .withColumn("rank", spend_rank)
    .withColumn("segment", segment_label)
)
customer_segments.show()

# 10. Advanced: Product affinity analysis (products often bought together)
from itertools import combinations  # kept from the original cell; no longer used here

# One row per distinct (transaction, product) so repeated line items of the
# same product do not inflate the pair counts.
basket_items = df.select("transaction_id", "product_name").distinct()

# Self-join on the transaction to enumerate every pair of products bought
# together. The strict "<" keeps each unordered pair once and drops (A, A).
product_pairs = (
    basket_items.withColumnRenamed("product_name", "product_a")
    .join(
        basket_items.withColumnRenamed("product_name", "product_b"),
        "transaction_id",
    )
    .where(col("product_a") < col("product_b"))
)

# Show the 10 pairs that co-occur in the most transactions.
(
    product_pairs.groupBy("product_a", "product_b")
    .count()
    .orderBy(desc("count"))
    .show(10)
)

# 11. Basic: Most popular payment method
(
    df.groupBy("payment_method")
    .count()
    .orderBy(col("count").desc())
    .show()
)

# 12. Medium: Average order value
# Sum the line items of each transaction first, then average those totals.
order_totals = df.groupBy("transaction_id").agg(
    sum(col("quantity") * col("price")).alias("order_value")
)
avg_order_value = order_totals.agg(avg("order_value")).collect()[0][0]
print(f"Average Order Value: ${avg_order_value:.2f}")

# 13. Medium: Monthly sales trend
monthly_key = date_format("transaction_date", "yyyy-MM")
(
    df.withColumn("year_month", monthly_key)
    .groupBy("year_month")
    .agg(sum(col("quantity") * col("price")).alias("monthly_sales"))
    .sort("year_month")
    .show()
)

# 14. Advanced: Customer lifetime value (CLV)
# Annualised spend per customer: total_spend / (tenure in years), where
# tenure is the number of days between the first and last transaction.
clv = (
    df.groupBy("customer_id")
    .agg(
        sum(col("quantity") * col("price")).alias("total_spend"),
        datediff(max("transaction_date"), min("transaction_date")).alias("days_as_customer"),
    )
    # A customer whose purchases all fall on one day has a tenure of 0 days.
    # Dividing by that raises under ANSI SQL mode, so the annualised value is
    # only computed for a positive tenure and is left NULL otherwise.
    .withColumn(
        "clv",
        when(
            col("days_as_customer") > 0,
            col("total_spend") / (col("days_as_customer") / 365),
        ),
    )
    .orderBy(desc("clv"))
)
clv.show()

# 15. Advanced: Seasonal product analysis
(
    df.withColumn("month", month("transaction_date"))
    .groupBy("month", "category")
    .agg(sum("quantity").alias("total_quantity"))
    .orderBy(col("month").asc(), col("total_quantity").desc())
    .show()
)

# 16. Basic: Number of partitions
num_partitions = df.rdd.getNumPartitions()
print(f"Number of partitions: {num_partitions}")

# 17. Advanced: Repartition for better performance
# 18. Advanced: Cache the dataframe for repeated use
# cache() returns the same DataFrame, so both steps can be chained.
# Adjust the partition count based on your cluster size.
# NOTE: the queries below keep reading `df`, not `df_repartitioned`.
df_repartitioned = df.repartition(100).cache()

# 19. Medium: Use Spark SQL for complex queries
df.createOrReplaceTempView("e_commerce")
top_subcategories = spark.sql("""
  SELECT category, subcategory, SUM(quantity * price) as revenue
  FROM e_commerce
  GROUP BY category, subcategory
  ORDER BY revenue DESC
  LIMIT 10
""")
top_subcategories.show()

# 20. Advanced: Window functions for customer ranking
# Rank customers by spend within each category (ties share a rank).
window_spec = Window.partitionBy("category").orderBy(col("total_spend").desc())
spend_per_customer_category = df.groupBy("customer_id", "category").agg(
    sum(col("quantity") * col("price")).alias("total_spend")
)
customer_category_rank = spend_per_customer_category.withColumn(
    "rank", dense_rank().over(window_spec)
)
customer_category_rank.show()



In [None]:
# 21. Advanced: UDF for categorizing transaction time
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def categorize_time(timestamp):
    """Bucket a transaction timestamp into a part-of-day label.

    Args:
        timestamp: The ``transaction_date`` value of one row; only its
            ``hour`` attribute is read.

    Returns:
        "Morning" (hours 5-11), "Afternoon" (12-16), "Evening" (17-20) or
        "Night" (any other hour). Returns None when the value is null or
        carries no ``hour`` attribute (e.g. a date without a time part).
    """
    # Null cells reach a Python UDF as None; without this guard the whole
    # job fails with AttributeError on the first null row.
    hour = getattr(timestamp, "hour", None)
    if hour is None:
        return None
    if 5 <= hour < 12:
        return "Morning"
    if 12 <= hour < 17:
        return "Afternoon"
    if 17 <= hour < 21:
        return "Evening"
    return "Night"


# Count transactions per part of day, most frequent first.
df_with_time_category = df.withColumn("time_category", categorize_time(col("transaction_date")))
df_with_time_category.groupBy("time_category").count().orderBy(desc("count")).show()

# 22. Medium: Pivot table for category sales by month
# One row per category, one column per abbreviated month name.
month_label = date_format("transaction_date", "MMM")
pivoted_sales = (
    df.withColumn("month", month_label)
    .groupBy("category")
    .pivot("month")
    .agg(sum(col("quantity") * col("price")).alias("sales"))
    .orderBy("category")
)
pivoted_sales.show()

# 23. Advanced: Analyze data skew
# Row counts per category, largest first.
(
    df.groupBy("category")
    .count()
    .orderBy(col("count").desc())
    .show()
)

# 24. Check executor and cluster details
spark_conf = spark.sparkContext.getConf()
print(spark_conf.getAll())

# 25. Advanced: Use broadcast join for small lookup tables
# Assuming we have a small product_info dataframe
product_rows = [
    ("product_id1", "Brand A"),
    ("product_id2", "Brand B"),
]
product_info = spark.createDataFrame(product_rows, ["product_id", "brand"])

# Left join keeps every row of df, with or without a matching brand.
broadcast_df = broadcast(product_info)
df_with_brand = df.join(broadcast_df, on="product_id", how="left")

# 26. Advanced: Write optimized parquet files
# Write one sub-directory per category; an existing output is overwritten.
(
    df.write
    .partitionBy("category")
    .format("parquet")
    .mode("overwrite")
    .save("e_commerce_partitioned")
)

# 27. Advanced: Read optimized parquet files
df_optimized = spark.read.parquet("e_commerce_partitioned")

# Do NOT call spark.stop() here: the cells below still use `spark` and `df`,
# and every one of them would fail against a stopped session. The session is
# stopped once, by the spark.stop() at the end of the notebook.

In [None]:
# 28. Advanced: Cohort analysis for customer retention
from pyspark.sql.functions import (
    count,
    countDistinct,
    date_trunc,
    datediff,
    min,
    months_between,
)

# Month in which each transaction happened.
activity_month = date_trunc("month", col("transaction_date"))

# A customer's cohort is the month of their FIRST purchase, so it has to be
# computed over all of that customer's rows. Truncating the row's own date
# would make every row its own cohort and the month offset always 0.
customer_window = Window.partitionBy("customer_id")
df = df.withColumn("cohort_month", min(activity_month).over(customer_window))

# Both operands are month starts, so months_between yields a whole number.
df = df.withColumn(
    "months_since_first_purchase",
    months_between(activity_month, col("cohort_month")).cast("int"),
)

# Distinct active customers per cohort and per month offset.
cohort_analysis = (
    df.groupBy("cohort_month", "months_since_first_purchase")
    .agg(countDistinct("customer_id").alias("customer_count"))
    .orderBy("cohort_month", "months_since_first_purchase")
)

cohort_analysis.show()

# 29. Advanced: RFM (Recency, Frequency, Monetary) Analysis
from pyspark.sql.functions import max, datediff, count, countDistinct, sum

# The most recent transaction in the data is the reference point for recency.
current_date = df.agg(max("transaction_date")).collect()[0][0]

rfm = df.groupBy("customer_id").agg(
    # Recency: days between the customer's last purchase and the reference date.
    datediff(lit(current_date), max("transaction_date")).alias("recency"),
    # Frequency: number of orders. Other cells group several rows under one
    # transaction_id, so counting rows would count line items, not orders.
    countDistinct("transaction_id").alias("frequency"),
    # Monetary: total amount spent.
    sum(col("quantity") * col("price")).alias("monetary"),
)

rfm.show()

# 30. Medium: Calculate average time between purchases for each customer
from pyspark.sql.functions import lag, avg

# Order each customer's rows by date so lag() yields the previous purchase.
window_spec = Window.partitionBy("customer_id").orderBy("transaction_date")
previous_purchase = lag("transaction_date").over(window_spec)
gap_in_days = datediff(col("transaction_date"), col("prev_purchase_date"))

time_between_purchases = (
    df.withColumn("prev_purchase_date", previous_purchase)
    .withColumn("days_between_purchases", gap_in_days)
    .groupBy("customer_id")
    .agg(avg("days_between_purchases").alias("avg_days_between_purchases"))
)

time_between_purchases.show()

# 31. Advanced: Identify potential fraudulent transactions
from pyspark.sql.functions import stddev

# Value of a single row (quantity x price).
row_value = col("quantity") * col("price")

# Per-customer mean and standard deviation of the row value.
avg_order_value = df.groupBy("customer_id").agg(
    avg(row_value).alias("avg_order_value")
)
stddev_order_value = df.groupBy("customer_id").agg(
    stddev(row_value).alias("stddev_order_value")
)

# Flag rows more than three standard deviations above the customer's mean.
outlier_threshold = col("avg_order_value") + 3 * col("stddev_order_value")
potential_fraud = (
    df.join(avg_order_value, on="customer_id")
    .join(stddev_order_value, on="customer_id")
    .where(row_value > outlier_threshold)
)

potential_fraud.show()

# 32. Medium: Calculate running total of sales for each product
# Frame: every row of the product from its first one up to the current row.
product_timeline = Window.partitionBy("product_id").orderBy("transaction_date")
window_spec = product_timeline.rowsBetween(
    Window.unboundedPreceding, Window.currentRow
)

cumulative_sales = sum(col("quantity") * col("price")).over(window_spec)
running_total = df.withColumn("running_total", cumulative_sales)

running_total.show()

# 33. Advanced: Implement a recommendation system using ALS
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer

# Prepare data for ALS
als_data = df.select("customer_id", "product_id", "quantity")

# ALS only accepts numeric user/item ids within the integer range. The ids in
# this notebook look like strings (e.g. "product_id1"), so map every distinct
# id to a numeric index first. Rows with a null id are skipped.
id_indexer = StringIndexer(
    inputCols=["customer_id", "product_id"],
    outputCols=["customer_idx", "product_idx"],
    handleInvalid="skip",
)
als_data = id_indexer.fit(als_data).transform(als_data)

# Split data into training and test sets (seeded so reruns are comparable)
(training, test) = als_data.randomSplit([0.8, 0.2], seed=42)

# Build the recommendation model using ALS on the training data.
# coldStartStrategy="drop" removes test rows whose user or item was not seen
# in training, so the RMSE is not NaN.
als = ALS(maxIter=5, regParam=0.01, userCol="customer_idx", itemCol="product_idx",
          ratingCol="quantity", coldStartStrategy="drop")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="quantity", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

# Generate top 10 product recommendations for each user.
# The output is keyed by customer_idx / product_idx (the indexed ids).
userRecs = model.recommendForAllUsers(10)
userRecs.show()

# 34. Medium: Analyze cart abandonment rate
# A row counts as abandoned when it has no payment method recorded.
is_abandoned = when(col("payment_method").isNull(), 1).otherwise(0)

per_cart = df.groupBy("customer_id", "transaction_id").agg(
    sum(is_abandoned).alias("abandoned_items"),
    count("*").alias("total_items"),
)
cart_abandonment = per_cart.withColumn(
    "abandonment_rate", col("abandoned_items") / col("total_items")
)

cart_abandonment.show()

# 35. Advanced: Implement a custom Accumulator
from pyspark.accumulators import AccumulatorParam

class SetAccumulator(AccumulatorParam):
    """Accumulator parameter whose accumulated value is a Python set."""

    def zero(self, initialValue):
        """Return a new set holding the elements of ``initialValue``."""
        return {element for element in initialValue}

    def addInPlace(self, v1, v2):
        """Add the elements of ``v2`` to ``v1`` in place and return ``v1``."""
        v1.update(v2)
        return v1


# Starts empty; each row contributes a one-element set.
unique_products = spark.sparkContext.accumulator(set(), SetAccumulator())


def add_products(row):
    """Add the row's product_id to the ``unique_products`` accumulator."""
    single_product = {row.product_id}
    unique_products.add(single_product)


df.foreach(add_products)

product_total = len(unique_products.value)
print(f"Number of unique products: {product_total}")

# 36. Advanced: Use Spark Streaming to process real-time sales data
from pyspark.streaming import StreamingContext

# How long the demo stream runs before it is shut down again.
STREAM_TIMEOUT_SECONDS = 60

ssc = StreamingContext(spark.sparkContext, 1)  # 1-second batch interval

# Assume we have a stream of sales data coming in as "key,amount" lines
sales_stream = ssc.socketTextStream("localhost", 9999)


def _to_key_amount(fields):
    """Return [(key, amount)] for a well-formed record, else an empty list.

    A short or non-numeric record would otherwise raise inside the batch and
    fail the whole streaming job.
    """
    try:
        return [(fields[0], float(fields[1]))]
    except (IndexError, ValueError):
        return []


# Process the stream: split each line, keep valid records, sum per key
parsed_sales = sales_stream.map(lambda line: line.split(","))
revenue_stream = parsed_sales.flatMap(_to_key_amount) \
    .reduceByKey(lambda x, y: x + y)

revenue_stream.pprint()

ssc.start()
# awaitTermination() would block forever, so none of the code below would
# ever run. Wait for a bounded time, then stop only the streaming context and
# keep the SparkContext alive for the remaining cells.
ssc.awaitTerminationOrTimeout(STREAM_TIMEOUT_SECONDS)
ssc.stop(stopSparkContext=False, stopGraceFully=True)

# 37. Medium: Use Spark MLlib for customer churn prediction
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Prepare features
# NOTE(review): none of these feature columns, nor the "churn" label, is
# created anywhere in the visible notebook. This cell presumably expects a
# customer-level DataFrame that already has them; confirm before running.
feature_columns = ["total_spend", "frequency", "avg_order_value", "days_since_last_purchase"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(df)

# Split data into training and test sets (seeded so reruns are comparable)
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=42)

# Train a RandomForest model
rf = RandomForestClassifier(labelCol="churn", featuresCol="features", numTrees=10, seed=42)
model = rf.fit(trainingData)

# Make predictions
predictions = model.transform(testData)

# Evaluate model
# BinaryClassificationEvaluator reports area under the ROC curve by default,
# not accuracy; name the metric explicitly and label the output accordingly.
evaluator = BinaryClassificationEvaluator(labelCol="churn", metricName="areaUnderROC")
area_under_roc = evaluator.evaluate(predictions)
print(f"Area under ROC = {area_under_roc}")

# 38. Advanced: Implement custom Partitioner for optimized joins
from pyspark import RDD
from pyspark.rdd import portable_hash


class CustomPartitioner:
    """Callable that maps a key to a partition index.

    The top-level ``pyspark`` package does not export a ``Partitioner`` base
    class, and ``RDD.partitionBy`` expects a plain callable taking a key, so
    this is a standalone class made callable through ``__call__``.
    """

    def __init__(self, partitions):
        """Store the number of partitions that keys are spread over."""
        self.partitions = partitions

    def getPartition(self, key):
        """Return the partition index for ``key``.

        Uses ``portable_hash``, the default partition function of
        ``RDD.partitionBy``, instead of the builtin ``hash``: string hashes
        from the builtin are randomised per Python process, which would send
        the same key to different partitions on different workers.
        """
        return portable_hash(key) % self.partitions

    def numPartitions(self):
        """Return the configured number of partitions."""
        return self.partitions

    def __call__(self, key):
        """Let an instance be passed directly as ``partitionFunc``."""
        return self.getPartition(key)


# Use custom partitioner: key every row by customer_id, then partition by key
rdd = df.rdd.map(lambda row: (row.customer_id, row))
partitioned_rdd = rdd.partitionBy(100, CustomPartitioner(100))

# 39. Medium: Analyze customer lifetime value (CLV) distribution
from pyspark.sql.functions import percentile_approx

# percentile_approx takes the percentages as a plain Python list. Wrapping
# floats in array() fails, because array() only accepts columns.
clv_percentages = [0.25, 0.5, 0.75, 0.9, 0.95, 0.99]
clv_labels = [
    "25th percentile",
    "Median",
    "75th percentile",
    "90th percentile",
    "95th percentile",
    "99th percentile",
]

clv_distribution = (
    df.groupBy("customer_id")
    .agg(sum(col("quantity") * col("price")).alias("clv"))
    .select(percentile_approx("clv", clv_percentages, 10000).alias("percentiles"))
    .first()
)

print("CLV Distribution:")
# `percentiles` is NULL when there is no data; print only the header then.
for label, value in zip(clv_labels, clv_distribution.percentiles or []):
    print(f"{label}: {value}")



In [None]:
# 40. Advanced: Implement custom UDF with Pandas UDF for better performance
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd

@pandas_udf("double", PandasUDFType.SCALAR)
def calculate_discount(quantity: pd.Series, price: pd.Series) -> pd.Series:
    """Return the discount rate for each row of a batch.

    Args:
        quantity: Quantities of one batch of rows.
        price: Unit prices of the same rows.

    Returns:
        A Series with 0.1 where quantity * price exceeds 1000, else 0.05.
    """
    # The original body used `np` without any visible import of numpy,
    # which raises NameError on the first batch.
    import numpy as np

    order_total = quantity * price
    rates = np.where(order_total > 1000, 0.1, 0.05)
    # Return a Series (as annotated) rather than a bare ndarray.
    return pd.Series(rates, index=order_total.index)


df_with_discount = df.withColumn("discount", calculate_discount(col("quantity"), col("price")))
df_with_discount.show()

# 41. Medium: Analyze product return rate
returns = spark.read.csv("returns.csv", header=True, inferSchema=True)

# Left join keeps sales without a return; those rows have a null return_reason.
sales_with_returns = df.join(returns, on="transaction_id", how="left")
returned_flag = when(col("return_reason").isNotNull(), True)

return_rate = (
    sales_with_returns.groupBy("product_id")
    .agg(
        count(returned_flag).alias("returns"),
        count("*").alias("total_sales"),
    )
    .withColumn("return_rate", col("returns") / col("total_sales"))
)

return_rate.orderBy(col("return_rate").desc()).show()

# 42. Advanced: Implement A/B testing analysis
ab_test = spark.read.csv("ab_test.csv", header=True, inferSchema=True)

# Mean, spread and sample size of the conversion rate per test group.
ab_results = ab_test.groupBy("test_group").agg(
    avg("conversion_rate").alias("avg_conversion_rate"),
    stddev("conversion_rate").alias("stddev_conversion_rate"),
    count("*").alias("sample_size"),
)

ab_results.show()

# 43. Medium: Analyze customer acquisition cost
marketing_spend = spark.read.csv("marketing_spend.csv", header=True, inferSchema=True)

# The original joined on "date" after selecting only customer_id, so the join
# key did not exist on the left side. Here a customer's acquisition date is
# the date of their first transaction.
new_customers_by_date = (
    df.groupBy("customer_id")
    .agg(to_date(min("transaction_date")).alias("date"))
    .groupBy("date")
    .agg(count("customer_id").alias("new_customers"))
)

# One row per (date, channel) so duplicate spend rows cannot multiply the
# customer counts in the join below.
# NOTE(review): assumes marketing_spend has date, channel and spend columns.
spend_by_date_channel = (
    marketing_spend.withColumn("date", to_date(col("date")))
    .groupBy("date", "channel")
    .agg(sum("spend").alias("spend"))
)

# NOTE(review): the data has no customer-to-channel attribution, so every
# channel with spend on a date is credited with all customers acquired that
# day. Confirm that this approximation is acceptable.
customer_acquisition = (
    spend_by_date_channel.join(new_customers_by_date, "date", "left")
    .groupBy("channel")
    .agg(
        sum(coalesce(col("new_customers"), lit(0))).alias("new_customers"),
        sum("spend").alias("total_spend"),
    )
    # Leave the cost NULL for channels that acquired nobody (no divide by zero).
    .withColumn(
        "acquisition_cost",
        when(col("new_customers") > 0, col("total_spend") / col("new_customers")),
    )
)

customer_acquisition.show()

# 44. Advanced: Implement custom Optimizer for Spark SQL
# PySpark ships no `pyspark.sql.optimizer` module, and a Python SparkSession
# has no `experimental` attribute, so the original import and registration
# both fail at runtime. Catalyst optimizer rules have to be written in
# Scala/Java and registered through the `spark.sql.extensions` configuration
# when the session is built.
class CustomOptimizer:
    """Python-side placeholder for a custom optimizer rule.

    It is not registered with Spark; it only documents the intended shape of
    a rule, namely a plan in and a plan out.
    """

    def apply(self, plan):
        """Return ``plan`` unchanged (no optimization logic implemented)."""
        # Implement custom optimization logic here
        return plan

# 45. Medium: Analyze product affinity using Market Basket Analysis
from pyspark.ml.fpm import FPGrowth

# One basket (set of distinct product ids) per transaction.
basket_items = collect_set("product_id").alias("items")
transactions = df.groupBy("transaction_id").agg(basket_items)

fp_growth = FPGrowth(
    itemsCol="items",
    minSupport=0.01,
    minConfidence=0.5,
)
model = fp_growth.fit(transactions)

# Frequent item sets first, then the rules derived from them.
model.freqItemsets.show()
model.associationRules.show()

# 46. Advanced: Implement custom Broadcast variable for lookup tables
# Collect the distinct (product_id, product_name) rows to the driver and
# broadcast them as a dict.
product_rows = df.select("product_id", "product_name").distinct().collect()
product_lookup = spark.sparkContext.broadcast(
    dict((row["product_id"], row["product_name"]) for row in product_rows)
)


def lookup_product_name(product_id):
    """Return the broadcast name for ``product_id``, or "Unknown" if absent."""
    names_by_id = product_lookup.value
    if product_id in names_by_id:
        return names_by_id[product_id]
    return "Unknown"


lookup_udf = udf(lookup_product_name)
resolved_name = lookup_udf(col("product_id"))
df_with_names = df.withColumn("product_name", resolved_name)
df_with_names.show()

# 47. Medium: Analyze seasonal trends in product categories
# Revenue per category and calendar month (1-12), ordered for reading.
calendar_month = month("transaction_date")
seasonal_trends = (
    df.withColumn("month", calendar_month)
    .groupBy("category", "month")
    .agg(sum(col("quantity") * col("price")).alias("revenue"))
    .sort("category", "month")
)

seasonal_trends.show()

# Final cell: release the Spark session.
spark.stop()