In [None]:
# pip install pyspark findspark notebook

## Import the libraries

In [1]:
import findspark
import os

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@17"
os.environ["SPARK_HOME"] = "/opt/homebrew/Cellar/apache-spark/4.0.1/libexec"

findspark.init()

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("EcommerceABTesting") \
    .master("local[*]") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

print("Spark version:", spark.version)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/10/02 18:16:58 WARN Utils: Your hostname, Anaghas-MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 10.0.0.162 instead (on interface en0)
25/10/02 18:16:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/02 18:16:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark version: 4.0.1


## 1. Load the data

In [3]:
df = spark.read.csv("data/2019-Nov.csv", header=True, inferSchema=True)
df.printSchema()



root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)



                                                                                

In [4]:
print("Data loaded:")
df.show(20)

Data loaded:
+-------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand| price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+--------+------+---------+--------------------+
|2019-10-31 20:00:00|      view|   1003461|2053013555631882655|electronics.smart...|  xiaomi|489.07|520088904|4d3b30da-a5e4-49d...|
|2019-10-31 20:00:00|      view|   5000088|2053013566100866035|appliances.sewing...|  janome|293.65|530496790|8e5f4f83-366c-4f7...|
|2019-10-31 20:00:01|      view|  17302664|2053013553853497655|                NULL|   creed| 28.31|561587266|755422e7-9040-477...|
|2019-10-31 20:00:01|      view|   3601530|2053013563810775923|appliances.kitche...|      lg|712.87|518085591|3bfb58cd-7892-48c...|
|2019-10-31 20:00:01|      view|   1004775|2053013555631882655|

In [5]:
df_all = spark.read.csv(
    ["data/2019-Oct.csv", "data/2019-Nov.csv"],
    header=True,
    inferSchema=True
)

                                                                                

In [6]:
print("Complete Data loaded:")
df_all.show(20)

Complete Data loaded:
+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|2019-09-30 20:00:00|      view|  44600062|2103807459595387724|                NULL|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|
|2019-09-30 20:00:00|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|   33.2|554748717|9333dfbd-b87a-470...|
|2019-09-30 20:00:01|      view|  17200506|2053013559792632471|furniture.living_...|    NULL|  543.1|519107250|566511c2-e2e3-422...|
|2019-09-30 20:00:01|      view|   1307067|2053013558920217191|  computers.notebook|  lenovo| 251.74|550050854|7c90fc70-0e80-459...|
|2019-09-30 20:00:04|      view|   1004237|2053

### Count the number of rows

In [7]:
print("Dataset size: ", df_all.count())



Dataset size:  109950743


                                                                                

## 2.Data Preprocessing

In [8]:
from pyspark.sql.functions import col, sum, count, when, rand, max as spark_max, to_date, coalesce, lit, max

#### Inspect missing values

In [9]:
print("Missing values in the dataset:")
df_all.select([
    sum(col(c).isNull().cast("int")).alias(c) for c in df_all.columns
]).show()

Missing values in the dataset:




+----------+----------+----------+-----------+-------------+--------+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|   brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+--------+-----+-------+------------+
|         0|         0|         0|          0|     35413780|15331243|    0|      0|          12|
+----------+----------+----------+-----------+-------------+--------+-----+-------+------------+



                                                                                

#### Handling missing values

In [10]:
df_all = df_all \
    .withColumn("category_code", coalesce(col("category_code"), lit("unknown"))) \
    .withColumn("brand", coalesce(col("brand"), lit("unknown")))

df_all = df_all.na.drop(subset=["price"])

In [11]:
df_all = df_all.na.drop(subset=["user_session"])

In [12]:
print("Missing values in the dataset after preprocessing:")
df_all.select([
    sum(col(c).isNull().cast("int")).alias(c) for c in df_all.columns
]).show()

Missing values in the dataset after preprocessing:




+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+
|event_time|event_type|product_id|category_id|category_code|brand|price|user_id|user_session|
+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+
|         0|         0|         0|          0|            0|    0|    0|      0|           0|
+----------+----------+----------+-----------+-------------+-----+-----+-------+------------+



                                                                                

In [13]:
print("Datafram size after handling the missing values:", df_all.count())



Datafram size after handling the missing values: 109950731


                                                                                

#### Filter Relevant Events

For A/B testing, user behavior funnel matters.

view --> cart --> purchase

In [14]:
df_all = df_all.filter(col("event_type").isin(["view", "cart", "purchase"]))

#### Extract Date

In [15]:
df_all = df_all.withColumn("event_date", to_date("event_time"))

#### Assign A/B Groups

In [16]:
user_groups = df_all.select("user_id").distinct() \
    .withColumn("group", when(rand() < 0.5, "A").otherwise("B"))

df_all = df_all.join(user_groups, on="user_id", how="left")

#### Define Conversion

Conversion = at least one purchase in a session.

In [17]:
conversion = df_all.groupBy("user_id", "group", "event_date") \
    .agg(
        max(when(col("event_type") == "purchase", 1).otherwise(0)).alias("converted")
    )

#### Aggregate for Group-Level Metrics

In [18]:
summary = conversion.groupBy("group") \
    .agg(
        sum("converted").alias("conversions"),
        count("user_id").alias("total_users")
    )

summary = summary.withColumn("conversion_rate", col("conversions")/col("total_users"))
summary.show()



+-----+-----------+-----------+-------------------+
|group|conversions|total_users|    conversion_rate|
+-----+-----------+-----------+-------------------+
|    B|     583963|    7575349|0.07708727347083283|
|    A|     587907|    7587288|0.07748578938877765|
+-----+-----------+-----------+-------------------+



                                                                                

## 3.Statistical Hypothesis Testing

In [19]:
import pandas as pd
from scipy.stats import chi2_contingency
from statsmodels.stats.proportion import proportions_ztest

In [20]:
conversions_A = 348659
users_A = 2657570

conversions_B = 348811
users_B = 2659079

In [21]:
# Building contingency table for Chi-square test
data = [
    [conversions_A, users_A - conversions_A],  # Group A
    [conversions_B, users_B - conversions_B]   # Group B
]

In [22]:
table = pd.DataFrame(
    data,
    columns=["Converted", "Not Converted"],
    index=["Group A", "Group B"]
)

print("Contingency Table:")
print(table)

Contingency Table:
         Converted  Not Converted
Group A     348659        2308911
Group B     348811        2310268


In [23]:
# Chi-Square Test
chi2, p_chi, dof, expected = chi2_contingency(table)
print("\nChi-Square Test Results")
print("Chi2 Statistic:", chi2)
print("Degrees of Freedom:", dof)
print("p-value:", p_chi)


Chi-Square Test Results
Chi2 Statistic: 0.003335757486803909
Degrees of Freedom: 1
p-value: 0.9539429730990618


In [24]:
# Two-Proportion Z-Test
conversions = [conversions_A, conversions_B]
totals = [users_A, users_B]

z_stat, p_z = proportions_ztest(conversions, totals)
print("\nTwo-Proportion Z-Test Results")
print("Z-Statistic:", z_stat)
print("p-value:", p_z)


Two-Proportion Z-Test Results
Z-Statistic: 0.059040634436065685
p-value: 0.952919742943726


In [25]:
# Interpretation
alpha = 0.05
if p_z < alpha:
    print("Conclusion: Reject H0 → Conversion rates differ significantly between groups.")
else:
    print("Conclusion: Fail to Reject H0 → No significant difference between groups.")

Conclusion: Fail to Reject H0 → No significant difference between groups.


### **Interpretation:**

- The observed conversion rates for Group A (\~13.12%) and Group B (\~13.18%) are statistically indistinguishable. This is expected, because the user are split randomly into A/B groups without introducing an actual “new feature".
- If a product change (e.g., new checkout feature) is introduced to Group B, then on running the same analysis,it would be possible to test if that feature truly lifted conversions.

## Saving the data

In [26]:
# Group-level summary (already works fine)
summary_pd = summary.toPandas()
summary_pd.to_csv("outputs/group_summary.csv", index=False)

# Daily conversion summary
daily_summary = conversion.groupBy("event_date", "group") \
    .agg(
        sum("converted").alias("conversions"),
        count("user_id").alias("total_users")
    ) \
    .withColumn("conversion_rate", col("conversions")/col("total_users"))

daily_summary_pd = daily_summary.toPandas()
daily_summary_pd.to_csv("outputs/daily_summary.csv", index=False)

print("CSV files exported successfully!")



CSV files exported successfully!


                                                                                