In [None]:
!pip install pyspark
!pip install findspark

In [None]:
import findspark
findspark.init()  # run before importing pyspark
from pyspark.sql import SparkSession

# Build (or reuse) a local session with Hive support enabled.
spark = (
    SparkSession.builder
    .master("local[*]")
    .enableHiveSupport()
    .getOrCreate()
)

# Session-level SQL settings.
spark.conf.set("spark.sql.legacy.createHiveTableByDefault", False)
# Eager evaluation: property used to format output tables better.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
# Session time zone; SQL equivalent: SET TIME ZONE 'Asia/Shanghai';
spark.conf.set("spark.sql.session.timeZone", 'UTC+08:00')

In [None]:
import time
##  %time  next line
##  %%time  whole cell
# Import only the column functions the notebook calls. A star import of
# pyspark.sql.functions would rebind Python builtins such as sum, max, min and round.
from pyspark.sql.functions import col, from_unixtime, hour, to_date
from pyspark.sql.types import ByteType, IntegerType, DateType, StringType

In [None]:
# Load the raw behaviour log; no header or schema options are passed to the reader.
df = spark.read.csv("../input/userdata/UserBehavior.csv")
# Show the column names the reader assigned.
df.columns

## 1. Data Cleaning 

In [None]:
## 1.1 column name
# Give the five positional columns meaningful names.
df = df.toDF(
    "user_id",
    "item_id",
    "category_id",
    "behavior_type",
    "input_timestamp",
)

In [None]:
## 1.2 data format
## input_timestamp is Unix (epoch) time: seconds elapsed since 1970-01-01 00:00:00 UTC.
## Do not end the assignment with .show(): show() returns None, not a DataFrame.
df = df.withColumn("tt", from_unixtime(col("input_timestamp")))

In [None]:
# Split the formatted timestamp into a calendar date (d) and an hour of day (h).
df = df.withColumn("d", to_date("tt"))
df = df.withColumn("h", hour("tt"))
## df.withColumn("hour", substring("timestamp", 0, 13)).show()

In [None]:
# The raw epoch value and the intermediate timestamp are no longer needed once d and h exist.
df = df.drop("input_timestamp").drop("tt")

In [None]:
# Preview the first five rows after the timestamp columns were derived.
df.show(5)

In [None]:
## 1.2 Data Type
# Cast the three id columns to Integer and pin d / h to Date / Integer.
df = (
    df
    .withColumn("user_id", col("user_id").cast("Integer"))
    .withColumn("item_id", col("item_id").cast("Integer"))
    .withColumn("category_id", col("category_id").cast("Integer"))
    .withColumn("d", col("d").cast("Date"))
    .withColumn("h", col("h").cast("Integer"))
)

In [None]:
# Confirm the column types produced by the casts.
df.printSchema()

In [None]:
## 1.3 null value 

## df.filter(df.user_id.isNull() & df.item_id.isNull() & df.category_id.isNull() \
##         & df.category_id.isNull() & df.input_timestamp.isNull()).show()

## no null value 

In [None]:

## 1.4 repeated value 

## df.dropDuplicates()

## df.groupBy("user_id", "item_id", "category_id", "behavior_type", "input_timestamp").count()

## %time : long running time 
## explanation: repeated rows come from multiple devices


In [None]:
## df.count()
## 100150807

In [None]:
## 1.5 anomaly 

## df_anomaly_lower = df.filter(df.d < '2017-11-25')
## df_anomaly_higher = df.filter(df.d > '2017-12-03')

## df_anomaly_higher.count()
## 2428
## df_anomaly_lower.count()
## 53148

In [None]:
## higher_sample = df_anomaly_higher.sample(False, 0.1, seed=0)
## higher_sample
## lower_sample = df_anomaly_lower.sample(False, 0.1, seed=0)
## lower_sample

## sample with replacement (default False).

In [None]:
# Keep only events dated strictly after 2017-11-24 and strictly before 2017-12-04.
df = df.where(
    (col("d") > '2017-11-24')
    & (col("d") < '2017-12-04')
)

## df.count()
## 100095231

In [None]:
## df.withColumn("row_num", row_number().over(Window.partitionBy("Group") 
## row_number(): only used on partitionBy
## groupBy vs partitionBy

## df.count()
## df.distinct().count()
## output00 = df.select(countDistinct("user_id", "item_id", "category_id", "behavior_type", "input_timestamp"))

## BIG DATA
## https://www.mikulskibartosz.name/how-to-speed-up-pyspark/
## spark.default.parallelism 

## 2. Data Analysis

In [None]:
# Expose the cleaned DataFrame to Spark SQL as the temporary view "user_behavior".
# createOrReplaceTempView is the supported replacement for the deprecated registerTempTable.
df.createOrReplaceTempView("user_behavior")

In [None]:
# Schema of the DataFrame that backs the user_behavior view.
df.printSchema()

In [None]:
# Store a five-row sample of user_behavior as a table and inspect its schema.
# Drop any table left over from an earlier run first; CREATE TABLE fails if it already exists.
spark.sql(" DROP TABLE IF EXISTS temp_behavior ")
spark.sql(" CREATE TABLE temp_behavior (SELECT * FROM user_behavior LIMIT 5) ")
df_temp = spark.table("temp_behavior")
df_temp.printSchema()

In [None]:
# List the columns and types of the user_behavior view as Spark SQL sees them.
spark.sql(" DESCRIBE user_behavior ")

### 0.1    overview 

In [None]:
## index = df.count() 100095231
## df_out = df.select(countDistinct("user_id")) 987991
## df_out = df.select(countDistinct("item_id")) 4161138
## df_out = df.select(countDistinct("category_id")) 9437
## df.select(approx_count_distinct("item_id")).collect() 4117166
## df.select(countDistinct("user_id", "item_id", "category_id")) together 

In [None]:
## df_out

In [None]:
# Overview totals, hard-coded so the counting queries need not be re-run.
index = 100_095_231       # total number of rows
user_count = 987_991      # distinct users
item_count = 4_161_138    # distinct items
category_count = 9_437    # distinct categories

In [None]:
import pandas as pd 
import numpy as np

In [None]:
# One-row summary frame: row count plus distinct users, items and categories.
# Each value gets its own label. Previously "user_count" was repeated three times,
# which mislabelled the item and category totals and produced duplicate column names.
columns = ["index", "user_count", "item_count", "category_count"]
data = np.array((index, user_count, item_count, category_count)).reshape((1, 4))
df_output = pd.DataFrame(data, columns=columns)

In [None]:
# Write the one-row overview frame to CSV in the working directory.
df_output.to_csv('./01_overview.csv')

### 1.1 traffic_conversion 

In [None]:
# Total number of events per behavior type (pv / fav / cart / buy) over the whole log.
# Drop any table left over from an earlier run first; CREATE TABLE fails if it already exists.
spark.sql(" DROP TABLE IF EXISTS 11_traffic_conversion ")
spark.sql(" CREATE TABLE 11_traffic_conversion ( SELECT SUM(CASE WHEN behavior_type ='pv' THEN 1 ELSE 0 END) AS pv_count, \
SUM(CASE WHEN behavior_type ='fav' THEN 1 ELSE 0 END) AS fav_count, \
SUM(CASE WHEN behavior_type ='cart' THEN 1 ELSE 0 END) AS cart_count, \
SUM(CASE WHEN behavior_type ='buy' THEN 1 ELSE 0 END) AS buy_count \
FROM user_behavior ) ")

In [None]:
%%time
# Read the stored result back as a DataFrame; the bare name on the last line displays it.
df_output = spark.table("11_traffic_conversion")
df_output

In [None]:
%%time
# Convert the Spark result to a pandas DataFrame and write it to CSV.
df_output.toPandas().to_csv('./11_traffic_conversion.csv')

### 1.2 user_conversion 

In [None]:
# Per-user event counts for each behavior type (pv / fav / cart / buy).
# Drop any table left over from an earlier run first; CREATE TABLE fails if it already exists.
spark.sql(" DROP TABLE IF EXISTS 12_user_behavior_count ")
spark.sql(" CREATE TABLE 12_user_behavior_count ( SELECT user_id, \
SUM(CASE WHEN behavior_type ='pv' THEN 1 ELSE 0 END) AS pv_count, \
SUM(CASE WHEN behavior_type ='fav' THEN 1 ELSE 0 END) AS fav_count, \
SUM(CASE WHEN behavior_type ='cart' THEN 1 ELSE 0 END) AS cart_count, \
SUM(CASE WHEN behavior_type ='buy' THEN 1 ELSE 0 END) AS buy_count \
FROM user_behavior \
GROUP BY user_id) ")


In [None]:
# Number of users with at least one event of each behavior type.
# Reads 12_user_behavior_count, the per-user table created in the previous cell
# (the query previously referenced 12_user_behavior_type, which is never created).
# Drop any table left over from an earlier run first; CREATE TABLE fails if it already exists.
spark.sql(" DROP TABLE IF EXISTS 12_user_conversion ")
spark.sql(" CREATE TABLE 12_user_conversion ( select SUM(CASE WHEN  pv_count > 0 THEN 1 ELSE 0 END) AS pv_user_count, \
SUM(CASE WHEN  fav_count > 0 THEN 1 ELSE 0 END) AS fav_user_count, \
SUM(CASE WHEN  cart_count > 0 THEN 1 ELSE 0 END) AS cart_user_count, \
SUM(CASE WHEN  buy_count > 0 THEN 1 ELSE 0 END) AS buy_user_count \
from 12_user_behavior_count ) ")

In [None]:
# Read the user-conversion result back as a DataFrame; the bare name displays it.
df_output = spark.table("12_user_conversion")
df_output

In [None]:
# Convert the Spark result to a pandas DataFrame and write it to CSV.
df_output.toPandas().to_csv('./12_user_conversion.csv')

### 2.1 user_in_days 

In [None]:
# Per-day activity: distinct active users plus event counts for each behavior type.
# Drop any table left over from an earlier run first; CREATE TABLE fails if it already exists.
spark.sql(" DROP TABLE IF EXISTS 21_user_in_days ")
spark.sql(" CREATE TABLE 21_user_in_days (SELECT d, \
COUNT(DISTINCT (user_id)) AS user_count, \
SUM(CASE WHEN behavior_type ='pv' THEN 1 ELSE 0 END) AS pv_count, \
SUM(CASE WHEN behavior_type ='fav' THEN 1 ELSE 0 END) AS fav_count, \
SUM(CASE WHEN behavior_type ='cart' THEN 1 ELSE 0 END) AS cart_count, \
SUM(CASE WHEN behavior_type ='buy' THEN 1 ELSE 0 END) AS buy_count \
FROM user_behavior \
GROUP BY d \
ORDER BY d )")

In [None]:
# Read the per-day result back as a DataFrame; the bare name displays it.
df_output = spark.table("21_user_in_days")
df_output

In [None]:
# Convert the Spark result to a pandas DataFrame and write it to CSV.
df_output.toPandas().to_csv('./21_user_in_days.csv')

### 2.2 user_in_hours 

In [None]:
# Per-hour-of-day activity: distinct active users plus event counts for each behavior type.
# Drop any table left over from an earlier run first; CREATE TABLE fails if it already exists.
spark.sql(" DROP TABLE IF EXISTS 22_user_in_hours ")
spark.sql(" CREATE TABLE 22_user_in_hours  (SELECT h, \
COUNT(DISTINCT (user_id)) AS user_count, \
SUM(CASE WHEN behavior_type ='pv' THEN 1 ELSE 0 END) AS pv_count, \
SUM(CASE WHEN behavior_type ='fav' THEN 1 ELSE 0 END) AS fav_count, \
SUM(CASE WHEN behavior_type ='cart' THEN 1 ELSE 0 END) AS cart_count, \
SUM(CASE WHEN behavior_type ='buy' THEN 1 ELSE 0 END) AS buy_count \
FROM user_behavior \
GROUP BY h \
ORDER BY h )")

In [None]:
# Read the per-hour result back as a DataFrame; the bare name displays it.
df_output = spark.table("22_user_in_hours")
df_output

In [None]:
# Convert the Spark result to a pandas DataFrame and write it to CSV.
df_output.toPandas().to_csv('./22_user_in_hours.csv')

### 3.1 RFM Model

In [None]:
# Recency / frequency per buyer, computed over 'buy' events only:
#   recency   = days between 2017-12-03 and the user's latest purchase date
#   frequency = number of 'buy' rows for the user
# Drop any table left over from an earlier run first; CREATE TABLE fails if it already exists.
spark.sql(" DROP TABLE IF EXISTS 31_RF ")
spark.sql(" CREATE TABLE 31_RF (SELECT user_id, \
DATEDIFF('2017-12-03', max(d)) AS recency, \
COUNT(user_id) AS frequency \
FROM user_behavior \
WHERE behavior_type='buy' \
GROUP BY user_id \
ORDER BY recency) ")

In [None]:
# Bucket recency and frequency from 31_RF into ratings:
#   R_rating: recency > 3 -> 1, 2..3 -> 2, 0..1 -> 3, anything else -> 0
#   F_rating: frequency 0..1 -> 1, 2..3 -> 2, > 3 -> 3, anything else -> 0
# Drop any table left over from an earlier run first; CREATE TABLE fails if it already exists.
spark.sql(" DROP TABLE IF EXISTS 31_RF_rating ")
spark.sql(" CREATE TABLE 31_RF_rating (SELECT user_id, \
(CASE WHEN recency > 3 THEN 1 \
WHEN recency BETWEEN 2 and 3 THEN 2 \
WHEN recency BETWEEN 0 and 1 THEN 3 ELSE 0 END) AS R_rating, \
(CASE WHEN frequency BETWEEN 0 and 1 THEN 1 \
WHEN frequency BETWEEN 2 and 3 THEN 2 \
WHEN frequency >3 THEN 3 ELSE 0 END ) AS F_rating \
FROM 31_RF \
ORDER BY R_rating, F_rating) ")

In [None]:
## df_test_RF = spark.sql(" select * from 31_RF limit 20 ")
## df_test_RF_rating = spark.sql(" select * from 31_RF_rating limit 20 ")
## df_test_RF
## df_test_RF_rating

In [None]:
# Number of users in each (R_rating, F_rating) combination.
# Drop any table left over from an earlier run first; CREATE TABLE fails if it already exists.
spark.sql(" DROP TABLE IF EXISTS 31_RF_caculation ")
spark.sql(" CREATE TABLE 31_RF_caculation (SELECT R_rating, \
F_rating, \
COUNT(*) AS RF_count \
FROM 31_RF_rating \
GROUP BY R_rating, F_rating \
ORDER BY R_rating, F_rating ) ")

In [None]:
# Read the RF segment counts back as a DataFrame; the bare name displays it.
df_output = spark.table("31_RF_caculation")
df_output

In [None]:
# Convert the Spark result to a pandas DataFrame and write it to CSV.
df_output.toPandas().to_csv('./31_RF_caculation.csv')

In [None]:
# Distribution of purchase frequency: how many users share each frequency value in 31_RF.
# Drop any table left over from an earlier run first; CREATE TABLE fails if it already exists.
spark.sql(" DROP TABLE IF EXISTS 32_F_distribution ")
spark.sql(" CREATE TABLE 32_F_distribution (SELECT frequency, \
COUNT(frequency) AS user_count \
FROM 31_RF \
GROUP by frequency \
ORDER by frequency) ")

In [None]:
# Read the frequency distribution back as a DataFrame; the bare name displays it.
df_output = spark.table("32_F_distribution")
df_output

In [None]:
# Convert the Spark result to a pandas DataFrame and write it to CSV.
df_output.toPandas().to_csv('./32_F_distribution.csv')

In [None]:
# Distribution of recency: how many users share each recency value in 31_RF.
# Drop any table left over from an earlier run first; CREATE TABLE fails if it already exists.
spark.sql(" DROP TABLE IF EXISTS 32_R_distribution ")
spark.sql(" CREATE TABLE 32_R_distribution (SELECT recency, \
COUNT(recency) AS user_count \
FROM 31_RF \
GROUP by recency \
ORDER by recency) ")

In [None]:
# Read the recency distribution back as a DataFrame; the bare name displays it.
df_output = spark.table("32_R_distribution")
df_output

In [None]:
# Convert the Spark result to a pandas DataFrame and write it to CSV.
df_output.toPandas().to_csv('./32_R_distribution.csv')

### 4.1 Items (80/20 model)

In [None]:
# Number of 'buy' events per item.
# The stray trailing backslash after the closing parenthesis was removed: it was a dangling
# line continuation with nothing to continue.
# Drop any table left over from an earlier run first; CREATE TABLE fails if it already exists.
spark.sql(" DROP TABLE IF EXISTS 41_item_sale ")
spark.sql(" CREATE TABLE 41_item_sale AS ( SELECT item_id,  \
COUNT(item_id) AS buy_count  \
FROM user_behavior  \
WHERE behavior_type = \"buy\"  \
GROUP BY item_id  \
ORDER BY buy_count) ")

In [None]:
# Distribution of item sales: how many items share each buy_count value in 41_item_sale.
# Drop any table left over from an earlier run first; CREATE TABLE fails if it already exists.
spark.sql(" DROP TABLE IF EXISTS 41_item_sales_distribution ")
spark.sql(" CREATE TABLE 41_item_sales_distribution ( SELECT buy_count,  \
COUNT(buy_count) AS item_count \
FROM 41_item_sale \
GROUP by buy_count  \
ORDER by buy_count ) ")

In [None]:
# Read the item-sales distribution back as a DataFrame; the bare name displays it.
df_output = spark.table("41_item_sales_distribution")
df_output

In [None]:
# Convert the Spark result to a pandas DataFrame and write it to CSV.
df_output.toPandas().to_csv('./41_item_sales_distribution.csv')

### 4.2 category

In [None]:
### category sales distribution

In [None]:
# Number of 'buy' events per category.
# The stray trailing backslash after the closing parenthesis was removed: it was a dangling
# line continuation with nothing to continue.
# Drop any table left over from an earlier run first; CREATE TABLE fails if it already exists.
spark.sql(" DROP TABLE IF EXISTS 42_cat_sale ")
spark.sql(" CREATE TABLE 42_cat_sale  AS ( SELECT category_id,  \
COUNT(category_id) AS buy_count  \
FROM user_behavior  \
WHERE behavior_type = \"buy\"  \
GROUP BY category_id  \
ORDER BY buy_count) ")

In [None]:
# Distribution of category sales: how many categories share each buy_count value in 42_cat_sale.
# Drop any table left over from an earlier run first; CREATE TABLE fails if it already exists.
spark.sql(" DROP TABLE IF EXISTS 42_cat_sales_distribution ")
spark.sql(" CREATE TABLE 42_cat_sales_distribution  ( SELECT buy_count,  \
COUNT(buy_count) AS cat_count \
FROM 42_cat_sale \
GROUP by buy_count  \
ORDER by buy_count ) ")

In [None]:
# Read the category-sales distribution back as a DataFrame; the bare name displays it.
# The table name is passed without the stray trailing space it used to carry.
df_output = spark.table("42_cat_sales_distribution")
df_output

In [None]:
# Convert the Spark result to a pandas DataFrame and write it to CSV.
# The file name no longer has a stray space before ".csv", matching the other exports.
df_output.toPandas().to_csv('./42_cat_sales_distribution.csv')