In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("question3").getOrCreate()

In [2]:
from pyspark.sql.functions import col, translate, explode


product_views = spark.read.json(r"data\product-views.json")
orders = spark.read.json(r"data\orders.json")
product_category_map = spark.read.csv(r"data\product-category-map.csv", header=True)

product_views = product_views.withColumn("event", col("event")) \
            .withColumn("messageid", col("messageid")) \
            .withColumn("userid", col("userid")) \
            .withColumn("productid", translate(col("properties").cast("string"), "[]","")) \
            .withColumn("source", translate(col("context").cast("string"), "[]", "")) \
            .drop("context").drop("properties")

orders = orders.select(orders.event,
                       orders.messageid,
                       orders.userid,
                       explode(orders.lineitems).alias("lineitems"),
                       orders.orderid) \
                .withColumn("lineitems", translate(col("lineitems").cast("string"), "[]", ""))

product_views.createOrReplaceTempView("product_views")
orders.createOrReplaceTempView("orders")
product_category_map.createOrReplaceTempView("product_category_map")

In [39]:
from pyspark.sql.functions import regexp_replace

spark.sql("""
WITH new_orders
AS (select length(event), length(messageid), length(userid),
		regexp_replace(substr(lineitems , 0, instr(lineitems, ',')), ",", "") as productid,
		substr(lineitems , instr(lineitems, ',')+2, length(lineitems)) as quantity,
		length(orderid)
	from orders o)
select * from new_orders
""").show(truncate=False)

+-------------+-----------------+--------------+-----------+--------+-------------------------------+
|length(event)|length(messageid)|length(userid)|productid  |quantity|length(CAST(orderid AS STRING))|
+-------------+-----------------+--------------+-----------+--------+-------------------------------+
|10           |36               |8             |product-784|3       |5                              |
|10           |36               |8             |product-173|1       |5                              |
|10           |36               |8             |product-424|1       |5                              |
|10           |36               |8             |product-393|3       |5                              |
|10           |36               |7             |product-369|3       |5                              |
|10           |36               |8             |product-430|1       |5                              |
|10           |36               |7             |product-166|3       |5            

In [40]:
spark.sql("""
select q1.productid, q1.total_boughts, q2.total_views, cast(q1.total_boughts as float)/cast(q2.total_views as float) as conversion_rate from
	(WITH new_orders
	AS (select event, messageid, userid,
			regexp_replace(substr(lineitems , 0, instr(lineitems, ',')), ",", "") as productid,
			substr(lineitems , instr(lineitems, ',')+2, length(lineitems)) as quantity,
			orderid
		from orders o)
	select no.productid, sum(quantity) as total_boughts from new_orders no
	group by no.productid) q1
	inner join 
	(select pv.productid, count(*) total_views from product_views pv
	group by pv.productid) q2
	on q1.productid=q2.productid;
""").show(5)

+-----------+-------------+-----------+-------------------+
|  productid|total_boughts|total_views|    conversion_rate|
+-----------+-------------+-----------+-------------------+
|  product-9|         96.0|        216| 0.4444444444444444|
| product-25|        110.0|        249|0.44176706827309237|
|product-270|         77.0|        130| 0.5923076923076923|
|product-774|         26.0|         53|0.49056603773584906|
|product-435|         26.0|         68|0.38235294117647056|
+-----------+-------------+-----------+-------------------+
only showing top 5 rows



In [43]:
spark.sql("""
select q4.categoryid, sum(q3.total_boughts), sum(q3.total_views), sum(cast(q3.total_boughts as float))/sum(cast(q3.total_views as float)) as conversion_rate from 
	(select q1.productid, q1.total_boughts, q2.total_views, cast(q1.total_boughts as float)/cast(q2.total_views as float) as conversion_rate from
		(WITH new_orders
		AS (select event, messageid, userid,
				regexp_replace(substr(lineitems , 0, instr(lineitems, ',')), ",", "") as productid,
				substr(lineitems , instr(lineitems, ',')+2, length(lineitems)) as quantity,
				orderid
			from orders o)
		select no.productid, sum(quantity) as total_boughts from new_orders no
		group by no.productid) q1
		inner join 
		(select pv.productid, count(*) total_views from product_views pv
		group by pv.productid) q2
		on q1.productid=q2.productid) q3
	inner join
	(select * from product_category_map pcm) q4
	on q3.productid=q4.productid
group by q4.categoryid
order by q4.categoryid
""").show(truncate=False)

+-----------+------------------+----------------+-------------------+
|categoryid |sum(total_boughts)|sum(total_views)|conversion_rate    |
+-----------+------------------+----------------+-------------------+
|category-1 |4267.0            |9158            |0.46593142607556237|
|category-10|2472.0            |5536            |0.44653179190751446|
|category-11|1119.0            |2565            |0.43625730994152045|
|category-12|1337.0            |2919            |0.4580335731414868 |
|category-13|1641.0            |3774            |0.43481717011128773|
|category-14|1854.0            |4159            |0.4457802356335658 |
|category-15|1419.0            |3035            |0.4675453047775947 |
|category-16|1518.0            |3296            |0.46055825242718446|
|category-17|1291.0            |2709            |0.4765596160944998 |
|category-18|1458.0            |2985            |0.4884422110552764 |
|category-19|1813.0            |3874            |0.46799173980382036|
|category-2 |4852.0 