In [1]:
import os
import pyspark
# Spark configuration for this notebook. It is left at the defaults, but kept
# as an explicit object so settings can be added in one place.
conf = pyspark.SparkConf()

# Obtain the session through the builder's getOrCreate() rather than calling
# the SparkContext / SparkSession constructors directly. Constructing a second
# SparkContext in the same kernel raises "Cannot run multiple SparkContexts at
# once", so the constructor-based version failed whenever this cell was re-run
# without restarting the kernel. getOrCreate() reuses a live session instead.
spark = pyspark.sql.SparkSession.builder.config(conf=conf).getOrCreate()
# Keep `sc` available, bound to the SparkContext that backs the session.
sc = spark.sparkContext
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/05/07 14:04:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Load the user-behaviour CSV (first row is the header, column types are
# inferred) and discard the 'user_geohash' column, which is not used below.
df = (
    spark.read
    .options(inferSchema="true", header="true")
    .csv("/Users/yxwen/Desktop/BigData/FinalProj/tianchi_mobile_recommend_train_user.csv")
    .drop("user_geohash")
)

# Drop every row that has a null in any of the remaining columns.
df1 = df.na.drop()

# Derive two columns from 'time' by splitting it on the space:
#   'date' - the part before the space, parsed as a date (yyyy-MM-dd)
#   'hour' - the part after the space, parsed as a timestamp (HH:mm:ss)
# NOTE(review): 'hour' is built from a time-of-day string only, so it is a
# timestamp without a meaningful calendar date; if a plain integer hour is
# what the analysis needs, confirm and consider hour("time") instead.
df_user = (
    df1
    .withColumn("date", to_date(split("time", " ").getItem(0), "yyyy-MM-dd"))
    .withColumn("hour", to_timestamp(split("time", " ").getItem(1), "HH:mm:ss"))
)
df_user.printSchema()




root
 |-- user_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- behavior_type: integer (nullable = true)
 |-- item_category: integer (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- date: date (nullable = true)
 |-- hour: timestamp (nullable = true)



                                                                                

In [7]:
""" Overview Count """
# Total number of distinct items
num_item = df_user.select("item_id").distinct().count()
print(num_item)
# Total number of distinct categories
num_categories = df_user.select("item_category").distinct().count()
print(num_categories)

# Count the rows for every behavior_type with ONE grouped aggregation.
# Running a separate filter(...).count() per behavior type launches four
# Spark jobs, each of which rescans the (uncached) source data; a single
# groupBy yields the same numbers in one pass.
behavior_counts = {
    row["behavior_type"]: row["count"]
    for row in df_user.groupBy("behavior_type").count().collect()
}

# A behavior type that never occurs is absent from the grouped result, so
# default to 0 - the same value a filter(...).count() would have returned.
# Total number of click (behavior_type == 1)
num_behavior_1 = behavior_counts.get(1, 0)
print(num_behavior_1)
# Total number of favorite (behavior_type == 2)
num_behavior_2 = behavior_counts.get(2, 0)
print(num_behavior_2)
# Total number of add to shopping cart (behavior_type == 3)
num_behavior_3 = behavior_counts.get(3, 0)
print(num_behavior_3)
# Total number of purchase (behavior_type == 4)
num_behavior_4 = behavior_counts.get(4, 0)
print(num_behavior_4)

                                                                                

2876947


                                                                                

8916


                                                                                

11550581


                                                                                

242556


                                                                                

343564




120205


                                                                                

In [None]:
""" What products do users want to find on Taobao? """
# --- Top 5 categories by number of clicks (behavior_type == 1) ---
df_filtered = df_user.where(df_user["behavior_type"] == 1)
df_grouped = (
    df_filtered
    .groupBy("item_category")
    .agg(count("*").alias("category_count"))
)
df_topCategory = df_grouped.sort(desc("category_count")).limit(5)
df_topCategory.show()

# --- Top 3 clicked items inside each of those categories ---

# Bring the five category ids back to the driver as a plain Python list.
top5_categories = [
    row["item_category"]
    for row in df_topCategory.select("item_category").collect()
]

# Restrict the click rows to those five categories.
df_filtered_top5 = df_filtered.where(col("item_category").isin(top5_categories))

# Number of clicks per (category, item) pair.
df_item_counts = (
    df_filtered_top5
    .groupBy("item_category", "item_id")
    .agg(count("*").alias("item_count"))
)

# Rank items inside each category, most-clicked first.
# NOTE(review): dense_rank() gives tied items the same rank, so a category can
# contribute more than three rows when click counts tie. If exactly three rows
# per category are required, row_number() with a tie-breaker would be needed -
# confirm which behaviour is intended.
w = Window.partitionBy("item_category").orderBy(desc("item_count"))
df_item_counts_ranked = df_item_counts.withColumn("rank", dense_rank().over(w))

# Keep ranks 1-3 and discard the helper column.
df_final = df_item_counts_ranked.where(col("rank") <= 3).drop("rank")

# Persist the result as JSON, replacing any previous output at this path.
df_final.write.json(
    "/Users/yxwen/Desktop/BigData/FinalProj/top3InCategory.json",
    mode="overwrite",
)


In [None]:
""" When users are used to buying products? """
# Purchase rows (behavior_type == 4) that belong to the top-5 categories
# collected earlier in `top5_categories`.
df_filtered_2 = (
    df_user
    .where(col("item_category").isin(top5_categories))
    .where(col("behavior_type") == 4)
)
df_filtered_2.printSchema()

# Purchases per (category, date), largest count first.
df_grouped = (
    df_filtered_2
    .groupBy("item_category", "date")
    .agg(count("*").alias("item_count"))
)
df_grouped_date = df_grouped.sort(col("item_count").desc())
df_grouped_date.show()

# Purchases per (category, hour), largest count first.
df_grouped = (
    df_filtered_2
    .groupBy("item_category", "hour")
    .agg(count("*").alias("item_count"))
)
df_grouped_time = df_grouped.sort(col("item_count").desc())
df_grouped_time.show()

In [21]:
""" Whether the products pushed by Taobao platform meet the needs of users """
# Purchase counts per category within df_filtered_2 (purchases restricted to
# the most-clicked categories), highest first, at most five rows.
deman = (
    df_filtered_2
    .groupBy("item_category")
    .agg(count("*").alias("category_count"))
)
top = deman.sort(desc("category_count")).limit(5)
top.show()

# Same ranking over ALL purchases (behavior_type == 4), regardless of category,
# so the two tables can be compared.
filtered = df_user.where(df_user["behavior_type"] == 4)
grouped = (
    filtered
    .groupBy("item_category")
    .agg(count("*").alias("category_count"))
)
topCategory = grouped.sort(desc("category_count")).limit(5)
topCategory.show()

                                                                                

+-------------+--------------+
|item_category|category_count|
+-------------+--------------+
|         1863|          2000|
|         6513|          1059|
|         5894|           958|
|         5027|           858|
|        13230|           841|
+-------------+--------------+



[Stage 124:>                                                        (0 + 8) / 8]

+-------------+--------------+
|item_category|category_count|
+-------------+--------------+
|         6344|          2208|
|         1863|          2000|
|         5232|          1611|
|         6977|          1324|
|         8877|          1072|
+-------------+--------------+



                                                                                

In [9]:
# Stop the Spark session now that the analysis is finished. Cells that use
# `spark` cannot be run again until the session-setup cell has been re-run.
spark.stop()