In [None]:
import pyspark.sql as ps
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer
import pandas
inp_file_path: str = 'file:///D:/MAHE/5thSem/BDA2/data/2019-Sma.csv'
print(inp_file_path)

file:///D:/MAHE/5thSem/BDA2/data/2019-Oct.csv


In [69]:
spark: ps.SparkSession = ps.SparkSession.builder.master("yarn").appName("test").getOrCreate()
raw_data: ps.DataFrame = spark.read.csv(inp_file_path, header=True, inferSchema=True)

In [70]:
raw_data.show(5)
raw_data.summary()

+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|   brand|  price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+--------+-------+---------+--------------------+
|2019-10-01 05:30:00|      view|  44600062|2103807459595387724|                null|shiseido|  35.79|541312140|72d76fde-8bb3-4e0...|
|2019-10-01 05:30:00|      view|   3900821|2053013552326770905|appliances.enviro...|    aqua|   33.2|554748717|9333dfbd-b87a-470...|
|2019-10-01 05:30:01|      view|  17200506|2053013559792632471|furniture.living_...|    null|  543.1|519107250|566511c2-e2e3-422...|
|2019-10-01 05:30:01|      view|   1307067|2053013558920217191|  computers.notebook|  lenovo| 251.74|550050854|7c90fc70-0e80-459...|
|2019-10-01 05:30:04|      view|   1004237|2053013555631882655|electr

DataFrame[summary: string, event_type: string, product_id: string, category_id: string, category_code: string, brand: string, price: string, user_id: string, user_session: string]

In [71]:
str_ind = StringIndexer(inputCols=['event_type', 'category_code'], outputCols=['event_ind', 'category_ind'], handleInvalid='keep')
str_ind = str_ind.fit(raw_data)
eve_dict = {label: ind for ind, label in enumerate(str_ind.labelsArray[0])}
cat_dict = {label: ind for ind, label in enumerate(str_ind.labelsArray[1])}

df = str_ind.transform(raw_data)
df = df.drop(df.category_id, df.event_type, df.category_code). \
    withColumnsRenamed({'event_ind': 'event_type', 'category_ind': 'category_code'}). \
    withColumn('event_time', F.to_date(df.event_time))

In [72]:
print(eve_dict)
print(cat_dict)
df.show()
df.summary()

{'view': 0, 'cart': 1, 'purchase': 2}
{'electronics.smartphone': 0, 'electronics.clocks': 1, 'computers.notebook': 2, 'electronics.video.tv': 3, 'electronics.audio.headphone': 4, 'appliances.kitchen.refrigerators': 5, 'appliances.kitchen.washer': 6, 'appliances.environment.vacuum': 7, 'apparel.shoes': 8, 'auto.accessories.player': 9, 'computers.desktop': 10, 'apparel.shoes.keds': 11, 'furniture.bedroom.bed': 12, 'electronics.tablet': 13, 'electronics.audio.subwoofer': 14, 'furniture.living_room.cabinet': 15, 'construction.tools.drill': 16, 'electronics.telephone': 17, 'auto.accessories.videoregister': 18, 'kids.carriage': 19, 'furniture.living_room.sofa': 20, 'appliances.kitchen.blender': 21, 'appliances.kitchen.oven': 22, 'auto.accessories.alarm': 23, 'accessories.bag': 24, 'appliances.kitchen.kettle': 25, 'appliances.kitchen.microwave': 26, 'appliances.iron': 27, 'appliances.kitchen.meat_grinder': 28, 'appliances.environment.air_heater': 29, 'appliances.sewing_machine': 30, 'computer

DataFrame[summary: string, product_id: string, brand: string, price: string, user_id: string, user_session: string, event_type: string, category_code: string]

In [None]:
visits = df.groupBy(F.to_date(df.event_time).alias("event_time")). \
    agg(F.countDistinct("user_session").alias("Number_of_daily_visits"),
        F.countDistinct("user_id").alias("Number_of_daily_visitors"))

sales = df.where(df.event_type == eve_dict['purchase']). \
    withColumn("event_time", F.to_date(df.event_time)). \
    groupBy("event_time"). \
    agg(F.count("event_type").alias("number_of_daily_sales"),
        F.sum("price").alias("total_daily_sales"))

daily = visits.join(sales, on=["event_time"], how="left"). \
    withColumn("conversion_rate", F.col('number_of_daily_sales') / F.col('number_of_daily_visits'))
daily.show()


In [None]:
def extract_category_code(input_text: str):
    return input_text.split('.')

extract_category_udf = F.udf(extract_category_code)

In [None]:
spark.stop()