In [21]:
!pip install findspark

Defaulting to user installation because normal site-packages is not writeable


In [22]:
import findspark
findspark.init()

# Start PySpark Session

In [23]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MySparkApp").getOrCreate()

# Load the Raw dataset into a DataFrame

In [22]:
data_path = "purchase.csv"
purchase_df = spark.read.csv(data_path, header=True, inferSchema=True)

purchase_df.show(5)




+-------------------+-------------------+-------------------+-------------------+--------------------+-------+------+-------------------+
|         event_time|           order_id|         product_id|        category_id|       category_code|  brand| price|            user_id|
+-------------------+-------------------+-------------------+-------------------+--------------------+-------+------+-------------------+
|2020-04-24 17:20:39|2294359932054536986|1515966223509089906|2268105426648170900|  electronics.tablet|samsung|162.01|1515915625441993984|
|2020-04-24 17:20:39|2294359932054536986|1515966223509089906|2268105426648170900|  electronics.tablet|samsung|162.01|1515915625441993984|
|2020-04-24 20:07:43|2294444024058086220|2273948319057183658|2268105430162997728|electronics.audio...| huawei| 77.52|1515915625447879434|
|2020-04-24 20:07:43|2294444024058086220|2273948319057183658|2268105430162997728|electronics.audio...| huawei| 77.52|1515915625447879434|
|2020-04-25 00:46:21|2294584263154

                                                                                

# Datatypes of each column

In [64]:
purchase_df.dtypes

[('event_time', 'timestamp'),
 ('order_id', 'bigint'),
 ('product_id', 'bigint'),
 ('category_id', 'bigint'),
 ('category_code', 'string'),
 ('brand', 'string'),
 ('price', 'double'),
 ('user_id', 'bigint')]

# Count number of rows

In [23]:
row_cnt = purchase_df.count()
print(row_cnt)


2633521


# Drop dulplicate rows

In [24]:
purchase_df = purchase_df.dropDuplicates()

In [25]:
row_cnt = purchase_df.count()
print(row_cnt)

24/02/16 22:32:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:04 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:06 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:06 WARN RowBasedKeyValueBatch: Calling spill() on

2632846


                                                                                

# Find the minimum and maximum values of event_time

In [26]:
from pyspark.sql.functions import min, max

min = purchase_df.select(min("event_time")).first()[0]
max = purchase_df.select(max("event_time")).first()[0]

print("Minimum event_time:", min)
print("Maximum event_time:", max)


[Stage 48:>                                                         (0 + 8) / 8]

Minimum event_time: 1970-01-01 06:03:40
Maximum event_time: 2020-11-21 15:40:30


                                                                                

# Extract the year from the event_time column and count distinct years

In [27]:
from pyspark.sql.functions import year

dist_yrs_cnt = purchase_df.select(year("event_time").alias("year")).distinct().count()

print("Count of distinct years:", dist_yrs_cnt)




Count of distinct years: 2


                                                                                

# Count records with year 2020

In [28]:
from pyspark.sql.functions import year

rc = purchase_df.filter(year("event_time") == 2020).count()

print("Number of records with year 2020:", rc)


24/02/16 22:32:27 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:27 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:27 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:27 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:28 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:30 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:30 WARN RowBasedKeyValueBatch: Calling spill() on

Number of records with year 2020: 2613215


                                                                                

# Count records with year 1970

In [29]:
from pyspark.sql.functions import year

rc = purchase_df.filter(year("event_time") == 1970).count()

print("Number of records with year 1970:", rc)




Number of records with year 1970: 19631


                                                                                

# Keep records of year 2020 only

In [30]:
df_2020 = purchase_df.filter(year("event_time") != 1970)

In [31]:
df_2020.count()

24/02/16 22:32:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:42 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:43 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:45 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:32:45 WARN RowBasedKeyValueBatch: Calling spill() on

2613215

# Describe the DataFrame with null value counts


In [32]:
from pyspark.sql import functions as F
null_cnt = [F.sum(F.col(column).isNull().cast("integer")).alias(column) for column in df_2020.columns]

null_cnt_df = df_2020.agg(*null_cnt)

null_cnt_df.show()

24/02/16 22:33:02 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:33:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:33:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:33:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:33:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:33:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:33:03 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:33:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/02/16 22:33:05 WARN RowBasedKeyValueBatch: Calling spill() on

+----------+--------+----------+-----------+-------------+------+------+-------+
|event_time|order_id|product_id|category_id|category_code| brand| price|user_id|
+----------+--------+----------+-----------+-------------+------+------+-------+
|         0|       0|         0|     427875|       607074|501294|427875|2051027|
+----------+--------+----------+-----------+-------------+------+------+-------+



                                                                                

# Drop records containing null price

In [33]:
df_price = df_2020.na.drop(subset=["price"])

In [34]:
df_price.count()

24/02/16 22:33:18 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                

2185340

# Drop records having brand value as null

In [35]:
df_brand = df_price.filter(df_price["brand"].isNotNull())

In [36]:
df_brand.count()

                                                                                

2073432

In [37]:
from pyspark.sql import functions as F
null_cnt = [F.sum(F.col(column).isNull().cast("integer")).alias(column) for column in df_brand.columns]

null_cnt_df = df_brand.agg(*null_cnt)

null_cnt_df.show()



+----------+--------+----------+-----------+-------------+-----+-----+-------+
|event_time|order_id|product_id|category_id|category_code|brand|price|user_id|
+----------+--------+----------+-----------+-------------+-----+-----+-------+
|         0|       0|         0|          0|       552013|    0|    0|1538367|
+----------+--------+----------+-----------+-------------+-----+-----+-------+



                                                                                

# Remove records where user IDs are null

In [25]:
# df_user = df_brand.filter(df_brand["user_id"].isNotNull())

In [26]:
# df_user.count()

                                                                                

535699

# Count the distinct values in the "user_id" column


In [38]:
dc = df_brand.select("user_id").distinct().count()

print("Number of distinct values in the 'user_id' column:", dc)




Number of distinct values in the 'user_id' column: 228970


                                                                                

# Check Null values in dataframe

In [40]:
from pyspark.sql import functions as F
null_cnt = [F.sum(F.col(column).isNull().cast("integer")).alias(column) for column in df_brand.columns]

null_cnt_df = df_brand.agg(*null_cnt)

null_cnt_df.show()



+----------+--------+----------+-----------+-------------+-----+-----+-------+
|event_time|order_id|product_id|category_id|category_code|brand|price|user_id|
+----------+--------+----------+-----------+-------------+-----+-----+-------+
|         0|       0|         0|          0|       552013|    0|    0|1538367|
+----------+--------+----------+-----------+-------------+-----+-----+-------+



                                                                                

# Assign "other" where category_code is null

In [41]:
df_cat = df_brand.fillna("other", subset=["category_code"])

In [42]:
from pyspark.sql import functions as F
null_cnt = [F.sum(F.col(column).isNull().cast("integer")).alias(column) for column in df_cat.columns]

null_cnt_df = df_cat.agg(*null_cnt)

null_cnt_df.show()



+----------+--------+----------+-----------+-------------+-----+-----+-------+
|event_time|order_id|product_id|category_id|category_code|brand|price|user_id|
+----------+--------+----------+-----------+-------------+-----+-----+-------+
|         0|       0|         0|          0|            0|    0|    0|1538367|
+----------+--------+----------+-----------+-------------+-----+-----+-------+



                                                                                

# Distinct event times 

In [43]:
dc = df_cat.select("event_time").distinct().count()

print("Number of distinct values in the 'event_time' column:", dc)




Number of distinct values in the 'event_time' column: 1265835


                                                                                

# Distinct Order IDs

In [44]:
dc = df_cat.select("order_id").distinct().count()

print("Number of distinct values in the 'order_id' column:", dc)




Number of distinct values in the 'order_id' column: 1367525


                                                                                

# Check inconsistent records where multiple duplicate order_id have multiple unique event_time

In [45]:
from pyspark.sql import functions as F

# Group by order_id and count the distinct event_time values
incon_rec_cnt = df_cat.groupby("order_id").agg(F.countDistinct("event_time").alias("unique_event_times"))

# Filter for records where the count of unique event_time values is greater than 1
incon_rec_cnt = incon_rec_cnt.filter(incon_rec_cnt["unique_event_times"] > 1)

# Count the number of inconsistent records
num_incon_rec = incon_rec_cnt.count()

print("Number of inconsistent records:", num_incon_rec)




Number of inconsistent records: 0


                                                                                

In [62]:
df_cat.show()



+-------------------+-------------------+-------------------+-------------------+--------------------+-------+------+-------------------+
|         event_time|           order_id|         product_id|        category_id|       category_code|  brand| price|            user_id|
+-------------------+-------------------+-------------------+-------------------+--------------------+-------+------+-------------------+
|2020-04-29 20:11:49|2298069964415828136|1515966223509122874|2268105407933187062|computers.periphe...|     hp|152.52|1515915625443027224|
|2020-04-29 23:42:11|2298175846491357353|1515966223509122666|2268105430162997728|electronics.audio...|samsung|  8.08|1515915625445938216|
|2020-04-30 18:01:51|2298729326712980173|1515966223509089265|2360741866917331945|appliances.enviro...|   beko|231.46|1515915625446617606|
|2020-04-30 19:27:36|2298772487720140990|1515966223509335414|2268105430162997728|electronics.audio...|  razer|104.14|1515915625452727322|
|2020-04-30 21:37:48|2298838016631

                                                                                

# Count rows where brand is 'none'

In [63]:
count_none_brand_rows = df_cat.filter(df_cat["brand"] == "none").count()

print("Number of rows where brand is 'none':", count_none_brand_rows)




Number of rows where brand is 'none': 13386


                                                                                

# Drop them

In [66]:
df_none = df_cat.filter(df_cat["brand"] != "none")
df_none.count()

                                                                                

2060046

# count the distinct records before the first dot '.' in category_code 

In [68]:
from pyspark.sql.functions import split, col

# Extract the category before the first dot '.'
df_with_category = df_none.withColumn("category_before_dot", split(col("category_code"), "\\.")[0])

# Count the distinct records before the first dot '.'
cnt = df_with_category.select("category_before_dot").distinct().count()

print("Number of distinct records before the first dot '.':", cnt)




Number of distinct records before the first dot '.': 14


                                                                                

# Display the distinct category names before the first dot '.'

In [69]:
from pyspark.sql.functions import split, col

# Extract the category before the first dot '.'
df_with_category = df_none.withColumn("category_before_dot", split(col("category_code"), "\\.")[0])

# Select and display the distinct names of records before the first dot '.'
cat_names = df_with_category.select("category_before_dot").distinct().collect()
cat_names = [row["category_before_dot"] for row in cat_names]

print("Distinct names of records before the first dot '.' in the 'category_code' column:")
for name in cat_names:
    print(name)


[Stage 314:>                                                        (0 + 8) / 8]

Distinct names of records before the first dot '.' in the 'category_code' column:
medicine
computers
auto
stationery
sport
other
apparel
appliances
country_yard
furniture
accessories
kids
electronics
construction


                                                                                

# Count Records that start with 'string'

In [70]:
from pyspark.sql.functions import col

cnt = df_none.filter(col("category_code").startswith('medicine')).count()

print(cnt)




3346


                                                                                

In [71]:
from pyspark.sql.functions import col

cnt = df_none.filter(col("category_code").startswith('computers')).count()

print(cnt)




204806


                                                                                

In [72]:
from pyspark.sql.functions import col

cnt = df_none.filter(col("category_code").startswith('auto')).count()

print(cnt)




3228


                                                                                

In [73]:
from pyspark.sql.functions import col

cnt = df_none.filter(col("category_code").startswith('stationery')).count()

print(cnt)




38662


                                                                                

In [74]:
from pyspark.sql.functions import col

cnt = df_none.filter(col("category_code").startswith('sport')).count()

print(cnt)




1794


                                                                                

In [82]:
from pyspark.sql.functions import col

cnt = df_none.filter(col("category_code").startswith('other')).count()

print(cnt)




547318


                                                                                

In [75]:
from pyspark.sql.functions import col

cnt = df_none.filter(col("category_code").startswith('apparel')).count()

print(cnt)




7599


                                                                                

In [76]:
from pyspark.sql.functions import col

cnt = df_none.filter(col("category_code").startswith('appliances')).count()

print(cnt)




584895


                                                                                

In [77]:
from pyspark.sql.functions import col

cnt = df_none.filter(col("category_code").startswith('country_yard')).count()

print(cnt)




303


                                                                                

In [61]:
from pyspark.sql.functions import col

cnt = df_none.filter(col("category_code").startswith('furniture')).count()

print(cnt)




104568


                                                                                

In [78]:
from pyspark.sql.functions import col

cnt = df_none.filter(col("category_code").startswith('accessories')).count()

print(cnt)




13029


                                                                                

In [79]:
from pyspark.sql.functions import col

cnt = df_none.filter(col("category_code").startswith('kids')).count()

print(cnt)




4906


                                                                                

In [80]:
from pyspark.sql.functions import col

cnt = df_none.filter(col("category_code").startswith('electronics')).count()

print(cnt)




536865


                                                                                

In [81]:
from pyspark.sql.functions import col

cnt = df_none.filter(col("category_code").startswith('construction')).count()

print(cnt)




12105


                                                                                

# Split the category_code column on dot '.' and create new columns

In [90]:
from pyspark.sql.functions import split

split_col = split(df_none['category_code'], '\.')

df_dot = df_none.withColumn('cat_1', split_col.getItem(0))
df_dot = df_dot.withColumn('cat_2', split_col.getItem(1))
df_dot = df_dot.withColumn('cat_3', split_col.getItem(2))

df_dot.show()




+-------------------+-------------------+-------------------+-------------------+--------------------+-------+------+-------------------+-----------+-----------+---------------+
|         event_time|           order_id|         product_id|        category_id|       category_code|  brand| price|            user_id|      cat_1|      cat_2|          cat_3|
+-------------------+-------------------+-------------------+-------------------+--------------------+-------+------+-------------------+-----------+-----------+---------------+
|2020-04-29 20:11:49|2298069964415828136|1515966223509122874|2268105407933187062|computers.periphe...|     hp|152.52|1515915625443027224|  computers|peripherals|        printer|
|2020-04-29 23:42:11|2298175846491357353|1515966223509122666|2268105430162997728|electronics.audio...|samsung|  8.08|1515915625445938216|electronics|      audio|      headphone|
|2020-04-30 18:01:51|2298729326712980173|1515966223509089265|2360741866917331945|appliances.enviro...|   beko|

                                                                                

# Drop category_code column

In [92]:
df_nocc = df_dot.drop("category_code")
df_nocc.show()



+-------------------+-------------------+-------------------+-------------------+-------+------+-------------------+-----------+-----------+---------------+
|         event_time|           order_id|         product_id|        category_id|  brand| price|            user_id|      cat_1|      cat_2|          cat_3|
+-------------------+-------------------+-------------------+-------------------+-------+------+-------------------+-----------+-----------+---------------+
|2020-04-29 20:11:49|2298069964415828136|1515966223509122874|2268105407933187062|     hp|152.52|1515915625443027224|  computers|peripherals|        printer|
|2020-04-29 23:42:11|2298175846491357353|1515966223509122666|2268105430162997728|samsung|  8.08|1515915625445938216|electronics|      audio|      headphone|
|2020-04-30 18:01:51|2298729326712980173|1515966223509089265|2360741866917331945|   beko|231.46|1515915625446617606| appliances|environment|air_conditioner|
|2020-04-30 19:27:36|2298772487720140990|15159662235093354

                                                                                

In [93]:
from pyspark.sql import functions as F
null_cnt = [F.sum(F.col(column).isNull().cast("integer")).alias(column) for column in df_nocc.columns]

null_cnt_df = df_nocc.agg(*null_cnt)

null_cnt_df.show()



+----------+--------+----------+-----------+-----+-----+-------+-----+------+-------+
|event_time|order_id|product_id|category_id|brand|price|user_id|cat_1| cat_2|  cat_3|
+----------+--------+----------+-----------+-----+-----+-------+-----+------+-------+
|         0|       0|         0|          0|    0|    0|1527981|    0|547318|1139814|
+----------+--------+----------+-----------+-----+-----+-------+-----+------+-------+



                                                                                

# Assign "other" to null values in cat_2 and cat_3 columns

In [94]:
df_nocc = df_nocc.fillna("other", subset=["cat_2", "cat_3"])

In [95]:
from pyspark.sql import functions as F
null_cnt = [F.sum(F.col(column).isNull().cast("integer")).alias(column) for column in df_nocc.columns]

null_cnt_df = df_nocc.agg(*null_cnt)

null_cnt_df.show()



+----------+--------+----------+-----------+-----+-----+-------+-----+-----+-----+
|event_time|order_id|product_id|category_id|brand|price|user_id|cat_1|cat_2|cat_3|
+----------+--------+----------+-----------+-----+-----+-------+-----+-----+-----+
|         0|       0|         0|          0|    0|    0|1527981|    0|    0|    0|
+----------+--------+----------+-----------+-----+-----+-------+-----+-----+-----+



                                                                                

# Distinct categories in cat 1,2,3 column

In [96]:
cnt = df_nocc.select("cat_1").distinct().count()

print(cnt)



14


                                                                                

In [97]:
cnt = df_nocc.select("cat_2").distinct().count()

print(cnt)



59


                                                                                

In [98]:
cnt = df_nocc.select("cat_3").distinct().count()

print(cnt)

[Stage 443:>                                                        (0 + 8) / 8]

80


                                                                                

# Count rows where price is 0

In [110]:
cnt = df_nocc.filter(df_nocc["price"] == 0).count()

print("Number of records where price is 0:", cnt)



Number of records where price is 0: 103


                                                                                

# Download clean data file

In [101]:
output_path = "noCC"
df_nocc.coalesce(1).write.csv(output_path, header=True)


                                                                                

# Load file

In [109]:
data_path = "nocc.csv"
df_nocc = spark.read.csv(data_path, header=True, inferSchema=True)


                                                                                

# Replace zeros with the average price

In [116]:
from pyspark.sql.functions import avg, when

# Calculate the average price (excluding zeros)
avg_price = df_nocc.filter(df_nocc["price"] != 0).agg(avg("price")).collect()[0][0]

# Replace
df_avgp = df_nocc.withColumn("price", when(df_nocc["price"] == 0, avg_price).otherwise(df_nocc["price"]))


                                                                                

# check zero's count

In [117]:
cnt = df_avgp.filter(df_avgp["price"] == 0).count()

print("Number of records where price is 0:", cnt)

Number of records where price is 0: 0


                                                                                

# Calculate the total amount spent

In [119]:
from pyspark.sql.functions import col, sum

total_amount = df_avgp.select(sum(col("price"))).collect()[0][0]

print("Total amount spent:", total_amount)


Total amount spent: 330064703.276678


                                                                                

# Max spent by user

In [120]:
from pyspark.sql.functions import max

max = df_avgp.agg(max("price")).collect()[0][0]

print("Max spent:", max)



Max spent: 50925.9


                                                                                

# Min and Mean spent 

In [121]:
from pyspark.sql.functions import min, mean

statistics = df_avgp.agg(min("price"), mean("price")).collect()[0]

min = statistics[0]
mean = statistics[1]

print("Min spent:", min)
print("Mean spent:", mean)




Min spent: 0.02
Mean spent: 160.2220063419351


                                                                                

# DON'T IGNORE THISSS

# Download and load file

In [122]:
output_path = "avgPrice"
df_avgp.coalesce(1).write.csv(output_path, header=True)


                                                                                

In [33]:
data_path = "avgp.csv"
df_avgp = spark.read.csv(data_path, header=True, inferSchema=True)


                                                                                

In [30]:
df_avgp.count()

2060046

# Count occurrences of "other" in the cat_1 column

In [38]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

cnt = df_avgp.filter(col("cat_1") == "other").count()

print("Count of 'other' in cat_1 column:", cnt)


Count of 'other' in cat_1 column: 547318


                                                                                

# Assign "miscellaneous" where the value is "other" in the cat1 column

In [39]:
from pyspark.sql.functions import col, when
df_avgp = df_avgp.withColumn("cat_1", when(col("cat_1") == "other", "miscellaneous").otherwise(col("cat_1")))

In [40]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

cnt = df_avgp.filter(col("cat_1") == "miscellaneous").count()

print("Count of 'miscellaneous' in cat_1 column:", cnt)


Count of 'miscellaneous' in cat_1 column: 547318


                                                                                

# Do same for cat_2 and cat_3

In [41]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

cnt = df_avgp.filter(col("cat_2") == "other").count()

print("Count of 'other' in cat_2 column:", cnt)


[Stage 83:>                                                         (0 + 8) / 8]

Count of 'other' in cat_2 column: 547318


                                                                                

In [42]:
from pyspark.sql.functions import col, when
df_avgp = df_avgp.withColumn("cat_2", when(col("cat_2") == "other", "miscellaneous").otherwise(col("cat_2")))

In [43]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

cnt = df_avgp.filter(col("cat_2") == "miscellaneous").count()

print("Count of 'miscellaneous' in cat_2 column:", cnt)




Count of 'miscellaneous' in cat_2 column: 547318


                                                                                

# cat_3

In [44]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

cnt = df_avgp.filter(col("cat_3") == "other").count()

print("Count of 'other' in cat_3 column:", cnt)


[Stage 89:>                                                         (0 + 8) / 8]

Count of 'other' in cat_3 column: 1139814


                                                                                

In [45]:
from pyspark.sql.functions import col, when
df_avgp = df_avgp.withColumn("cat_3", when(col("cat_3") == "other", "miscellaneous").otherwise(col("cat_3")))

In [46]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

cnt = df_avgp.filter(col("cat_3") == "miscellaneous").count()

print("Count of 'miscellaneous' in cat_3 column:", cnt)




Count of 'miscellaneous' in cat_3 column: 1139814


                                                                                

# Count distinct user IDs

In [47]:
df_avgp.select("user_id").distinct().count()

                                                                                

228415

# Check null user ID count in dataframe

In [48]:
from pyspark.sql import functions as F
null_cnt = [F.sum(F.col(column).isNull().cast("integer")).alias(column) for column in df_avgp.columns]

null_cnt_df = df_avgp.agg(*null_cnt)

null_cnt_df.show()


[Stage 101:>                                                        (0 + 8) / 8]

+----------+--------+----------+-----------+-----+-----+-------+-----+-----+-----+
|event_time|order_id|product_id|category_id|brand|price|user_id|cat_1|cat_2|cat_3|
+----------+--------+----------+-----------+-----+-----+-------+-----+-----+-----+
|         0|       0|         0|          0|    0|    0|1527981|    0|    0|    0|
+----------+--------+----------+-----------+-----+-----+-------+-----+-----+-----+



                                                                                

In [4]:
output_path = "Misc"
df_avgp.coalesce(1).write.csv(output_path, header=True)


NameError: name 'df_avgp' is not defined

In [24]:
data_path = "misc.csv"
df_misc = spark.read.csv(data_path, header=True, inferSchema=True)


                                                                                

In [25]:
df_misc.count()

2060046

In [26]:
df_misc.select("user_id").distinct().count()

                                                                                

228415

In [31]:
from pyspark.sql.functions import collect_set

# Step 1: Group the DataFrame by user_id and collect the set of user IDs
uniquser = df_misc.groupBy().agg(collect_set("user_id").alias("unique_user_ids")).first()["unique_user_ids"]

# Show the list of unique user IDs
# print("Unique User IDs:", unique_user_ids)
len(uniquser)





                                                                                

228414

In [34]:
# Step 1: Select the cat_1 column and extract distinct values
distinct_categories = df_misc.select("cat_1").distinct()

# Step 2: Convert the DataFrame of distinct categories into a list
uniqcat = [row.cat_1 for row in distinct_categories.collect()]

len(uniqcat)



                                                                                

14

In [36]:
# from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("DistinctUsersByCategory") \
    .getOrCreate()

# Sample data - replace this with your actual DataFrame
# df_data = spark.createDataFrame([...], ["user_id", "cat_1", ...])

# Step 1: Group the DataFrame by cat_1 and count the distinct user IDs
distinct_users_count_by_category = df_misc.groupBy("cat_1").agg(countDistinct("user_id").alias("unique_user_count"))

# Step 2: Collect the results as a list of tuples containing category and unique user count
categories_with_user_counts = distinct_users_count_by_category.rdd.map(lambda x: (x.cat_1, x.unique_user_count)).collect()

# Print the category and corresponding count of unique user IDs
for category, user_count in categories_with_user_counts:
    print("Category:", category)
    print("No. of Unique User IDs:", user_count)
    print()

# Stop SparkSession





Category: medicine
No. of Unique User IDs: 968

Category: computers
No. of Unique User IDs: 41920

Category: auto
No. of Unique User IDs: 1081

Category: stationery
No. of Unique User IDs: 6084

Category: sport
No. of Unique User IDs: 594

Category: apparel
No. of Unique User IDs: 1689

Category: appliances
No. of Unique User IDs: 85986

Category: country_yard
No. of Unique User IDs: 158

Category: furniture
No. of Unique User IDs: 14221

Category: accessories
No. of Unique User IDs: 2581

Category: kids
No. of Unique User IDs: 1645

Category: electronics
No. of Unique User IDs: 97271

Category: construction
No. of Unique User IDs: 3107

Category: miscellaneous
No. of Unique User IDs: 67841



                                                                                

In [41]:
from pyspark.sql.functions import udf
from random import choice
from pyspark.sql.types import StringType

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("AssignNewUserID") \
    .getOrCreate()


# Convert the list of tuples to a dictionary for efficient lookup
category_user_dict = dict(categories_with_unique_users)

# Define a UDF to assign a random user ID based on the category
@udf(StringType())
def assign_new_user_id(category):
    if category in category_user_dict:
        return str(choice(category_user_dict[category]))
    else:
        return None

# Add the new column new_user_id and assign values using the UDF
df_misc = df_misc.withColumn("new_user_id", assign_new_user_id(df_misc["cat_1"]))

# Show the DataFrame with the new column
df_misc.show()

# Stop SparkSession
# spark.stop()


[Stage 85:>                                                         (0 + 1) / 1]

+-------------------+-------------------+-------------------+-------------------+-------+------+-------------------+-----------+-----------+---------------+-------------------+
|         event_time|           order_id|         product_id|        category_id|  brand| price|            user_id|      cat_1|      cat_2|          cat_3|        new_user_id|
+-------------------+-------------------+-------------------+-------------------+-------+------+-------------------+-----------+-----------+---------------+-------------------+
|2020-04-29 20:11:49|2298069964415828136|1515966223509122874|2268105407933187062|     hp|152.52|1515915625443027224|  computers|peripherals|        printer|1515915625509647001|
|2020-04-29 23:42:11|2298175846491357353|1515966223509122666|2268105430162997728|samsung|  8.08|1515915625445938216|electronics|      audio|      headphone|1515915625511889093|
|2020-04-30 18:01:51|2298729326712980173|1515966223509089265|2360741866917331945|   beko|231.46|1515915625446617606

                                                                                

In [42]:
from pyspark.sql import functions as F
null_cnt = [F.sum(F.col(column).isNull().cast("integer")).alias(column) for column in df_misc.columns]

null_cnt_df = df_misc.agg(*null_cnt)

null_cnt_df.show()




+----------+--------+----------+-----------+-----+-----+-------+-----+-----+-----+-----------+
|event_time|order_id|product_id|category_id|brand|price|user_id|cat_1|cat_2|cat_3|new_user_id|
+----------+--------+----------+-----------+-----+-----+-------+-----+-----+-----+-----------+
|         0|       0|         0|          0|    0|    0|1527981|    0|    0|    0|          0|
+----------+--------+----------+-----------+-----+-----+-------+-----+-----+-----+-----------+



                                                                                

In [44]:
# from pyspark.sql import SparkSession
from pyspark.sql.functions import countDistinct

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("DistinctUsersByCategory") \
    .getOrCreate()

# Sample data - replace this with your actual DataFrame
# df_data = spark.createDataFrame([...], ["user_id", "cat_1", ...])

# Step 1: Group the DataFrame by cat_1 and count the distinct user IDs
distinct_users_count_by_category = df_misc.groupBy("cat_1").agg(countDistinct("new_user_id").alias("unique_user_count"))

# Step 2: Collect the results as a list of tuples containing category and unique user count
categories_with_user_counts = distinct_users_count_by_category.rdd.map(lambda x: (x.cat_1, x.unique_user_count)).collect()

# Print the category and corresponding count of unique user IDs
for category, user_count in categories_with_user_counts:
    print("Category:", category)
    print("No. of Unique User IDs:", user_count)
    print()

# Stop SparkSession





Category: medicine
No. of Unique User IDs: 944

Category: computers
No. of Unique User IDs: 40550

Category: auto
No. of Unique User IDs: 1023

Category: stationery
No. of Unique User IDs: 6066

Category: sport
No. of Unique User IDs: 572

Category: apparel
No. of Unique User IDs: 1672

Category: appliances
No. of Unique User IDs: 80818

Category: country_yard
No. of Unique User IDs: 135

Category: furniture
No. of Unique User IDs: 14184

Category: accessories
No. of Unique User IDs: 2565

Category: kids
No. of Unique User IDs: 1561

Category: electronics
No. of Unique User IDs: 89913

Category: construction
No. of Unique User IDs: 3035

Category: miscellaneous
No. of Unique User IDs: 64115



                                                                                

In [45]:
df_misc.select("user_id").distinct().count()

                                                                                

228415

In [46]:
df_misc.select("new_user_id").distinct().count()

                                                                                

218524

In [47]:
df_filnul= df_misc

In [48]:
output_path = "FillNullUser"
df_filnul.coalesce(1).write.csv(output_path, header=True)


                                                                                

In [50]:
df_nonull = df_filnul.drop("user_id")

In [51]:
df_nonull = df_nonull.withColumnRenamed("new_user_id", "user_id")


In [52]:
df_nonull.show()

+-------------------+-------------------+-------------------+-------------------+-------+------+-----------+-----------+---------------+-------------------+
|         event_time|           order_id|         product_id|        category_id|  brand| price|      cat_1|      cat_2|          cat_3|            user_id|
+-------------------+-------------------+-------------------+-------------------+-------+------+-----------+-----------+---------------+-------------------+
|2020-04-29 20:11:49|2298069964415828136|1515966223509122874|2268105407933187062|     hp|152.52|  computers|peripherals|        printer|1515915625509647001|
|2020-04-29 23:42:11|2298175846491357353|1515966223509122666|2268105430162997728|samsung|  8.08|electronics|      audio|      headphone|1515915625511889093|
|2020-04-30 18:01:51|2298729326712980173|1515966223509089265|2360741866917331945|   beko|231.46| appliances|environment|air_conditioner|1515915625510823948|
|2020-04-30 19:27:36|2298772487720140990|15159662235093354

In [53]:
from pyspark.sql import functions as F
null_cnt = [F.sum(F.col(column).isNull().cast("integer")).alias(column) for column in df_nonull.columns]

null_cnt_df = df_nonull.agg(*null_cnt)

null_cnt_df.show()




+----------+--------+----------+-----------+-----+-----+-----+-----+-----+-------+
|event_time|order_id|product_id|category_id|brand|price|cat_1|cat_2|cat_3|user_id|
+----------+--------+----------+-----------+-----+-----+-----+-----+-----+-------+
|         0|       0|         0|          0|    0|    0|    0|    0|    0|      0|
+----------+--------+----------+-----------+-----+-----+-----+-----+-----+-------+



                                                                                

In [54]:
output_path = "NoNullData"
df_nonull.coalesce(1).write.csv(output_path, header=True)


                                                                                

In [57]:
df=df_nonull

In [69]:
df.select("brand").distinct().count()

                                                                                

907

In [71]:
from pyspark.sql import functions as F

# Assuming df_data is your DataFrame
total_price = df.agg(F.sum("price")).collect()[0][0]

print("Total Price:", total_price)




Total Price: 330064703.276678


                                                                                