In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
                .appName("BatchAnalysis")\
                .config("spark.driver.memory", "12g")\
                .getOrCreate()

df = spark.read.csv("./datasets/2019-Nov-100.csv", header=True, inferSchema=True)


In [2]:
df.printSchema()
df.describe().show()

root
 |-- event_time: timestamp (nullable = true)
 |-- event_type: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- category_id: long (nullable = true)
 |-- category_code: string (nullable = true)
 |-- brand: string (nullable = true)
 |-- price: double (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- user_session: string (nullable = true)

+-------+----------+--------------------+--------------------+-------------------+--------+------------------+--------------------+--------------------+
|summary|event_type|          product_id|         category_id|      category_code|   brand|             price|             user_id|        user_session|
+-------+----------+--------------------+--------------------+-------------------+--------+------------------+--------------------+--------------------+
|  count|    200000|              200000|              200000|             133358|  169680|            200000|              200000|              200000|
|   mean

Phân tích tỉ lệ người dùng xem hàng rồi mua sản phẩm, người dùng xem không mua, người dùng thêm giỏ hàng

In [3]:
view_events = df.filter(df.event_type == "view")
cart_events = df.filter(df.event_type == "cart")
purchase_events = df.filter(df.event_type == "purchase")

In [4]:
condition = (view_events.user_id == purchase_events. user_id) & (view_events.product_id == purchase_events.product_id)
view_and_purchase = view_events.join(purchase_events,condition,"left_semi")\
                                .distinct()
view_and_purchase.show(10)

total_view = view_events.count()
users_view_and_purchase = view_and_purchase.count()
ratio_view_and_purchase = users_view_and_purchase / total_view
print(f"Tỉ lệ người dùng xem và mua sản phẩm: {ratio_view_and_purchase:.10f}")

+-------------------+----------+----------+-------------------+--------------------+-------+------+---------+--------------------+
|         event_time|event_type|product_id|        category_id|       category_code|  brand| price|  user_id|        user_session|
+-------------------+----------+----------+-------------------+--------------------+-------+------+---------+--------------------+
|2019-11-01 07:22:35|      view|   4804056|2053013554658804075|electronics.audio...|  apple|160.57|520214844|47f0b244-18c9-4bf...|
|2019-11-01 07:23:14|      view|   4804056|2053013554658804075|electronics.audio...|  apple|160.57|520214844|47f0b244-18c9-4bf...|
|2019-11-01 07:26:01|      view|   4804056|2053013554658804075|electronics.audio...|  apple|160.57|520214844|47f0b244-18c9-4bf...|
|2019-11-01 07:26:51|      view|   4804056|2053013554658804075|electronics.audio...|  apple|160.57|520214844|47f0b244-18c9-4bf...|
|2019-11-01 07:29:55|      view|   4000170|2053013566142809077|construction.tool...

thời gian chạy hết khoảng 1m

In [5]:
view_not_purchase = view_events.join(purchase_events, condition, "left_anti")\
                            .distinct()

users_view_not_purchase = view_not_purchase.count()
ratio_view_not_purchase = users_view_not_purchase / total_view
print(f"Tỉ lệ người dùng xem và không mua sản phẩm: {ratio_view_not_purchase:.10f}")

Tỉ lệ người dùng xem và không mua sản phẩm: 0.9512134720


In [6]:
condition2 = (view_events.user_id == cart_events. user_id) & (view_events.product_id == cart_events.product_id)
view_and_add_to_cart = view_events.join(cart_events,condition2,"inner")\
                                    .distinct()

users_view_and_add_to_cart = view_and_add_to_cart.count()
ratio_view_and_add_to_cart = users_view_and_add_to_cart / total_view
print(f"Tỉ lệ người dùng xem và thêm vào giỏ hàng: {ratio_view_and_add_to_cart:.10f}")

Tỉ lệ người dùng xem và thêm vào giỏ hàng: 0.0480642232


In [None]:
# Sử dụng Partition pruning, bucketing và caching
# Bucket dữ liệu
#Thích hợp khi cần join hoặc aggregation bảng
df.write \
    .bucketBy(10, "user_id", "product_id") \
    .sortBy("user_id", "product_id") \
    .mode("overwrite") \
    .format("parquet") \
    .option("path", "/datasets") \
    .saveAsTable("bucketed_product_user")


In [None]:
# Ghi dữ liệu với partition theo event_type
#Thích hợp khi cần đọc ghi dữ liệu cần filter
df.write \
    .partitionBy("event_type") \
    .mode("overwrite") \
    .parquet("./datasets/partitioned_event_type")


In [None]:
# Đọc dữ liệu với Partition Pruning
view_events_partition = spark.read.parquet("./datasets/partitioned_event_type").filter("event_type = 'view'")
purchase_events_partition = spark.read.parquet("./datasets/partitioned_event_type").filter("event_type = 'purchase'")


In [None]:

# Khi đọc dữ liệu từ bảng đã được bucket thì spark sẽ tự tối ưu join
view_events_bucketed = spark.read.table("bucketed_product_user").filter("event_type = 'view'").cache()
purchase_events_bucketed = spark.read.table("bucketed_product_user").filter("event_type = 'purchase'").cache()

condition = (view_events_bucketed.user_id == purchase_events_bucketed.user_id) & \
            (view_events_bucketed.product_id == purchase_events_bucketed.product_id)

view_and_purchase_bucketed = view_events_bucketed.join(purchase_events_bucketed, condition, "inner").distinct()

# Tính toán tỷ lệ
total_view = view_events_bucketed.count()
users_view_and_purchase = view_and_purchase_bucketed.count()
ratio_view_and_purchase = users_view_and_purchase / total_view

print(f"Tỉ lệ người dùng xem và mua sản phẩm: {ratio_view_and_purchase:.10f}")

Tỉ lệ người dùng xem và mua sản phẩm: 0.0912348219


thời gian chạy còn khoảng 20s

In [None]:
# xóa khỏi cache
view_events_bucketed.unpersist()
purchase_events_bucketed.unpersist()