In [1]:
%%configure -f
{
    "name": "multinomial",
    "driverMemory":"1000M",
    "executorMemory": "8000M", 
    "executorCores": 4,
    "numExecutors" :25,
    "conf": 
    {
        "spark.yarn.appMasterEnv.PYSPARK_PYTHON":"pyspark_env_1.3/bin/python",
        "spark.pyspark.driver.python":"pyspark_env_1.3/bin/python",
        "spark.pyspark.python":"pyspark_env_1.3/bin/python",
        "spark.yarn.dist.archives":"hdfs://mycluster/user/webuser/ai/pyspark/env/pyspark_env_1.3.zip#pyspark_env_1.3",
        "spark.jars": "hdfs://mycluster/user/webuser/531/pyspark/env/hudi-spark-bundle_2.11-0.7.0.jar",
        "spark.driver.extraClassPath": "hdfs://mycluster/user/webuser/531/pyspark/env/hudi-spark-bundle_2.11-0.7.0.jar",
        "spark.executor.extraClassPath": "hudi-spark-bundle_2.11-0.7.0.jar",
        "spark.sql.broadcastTimeout": "3600"
        
    }
}

In [2]:
%%spark
from functools import reduce
from ua_parser import user_agent_parser
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pyspark.sql.functions as sf
from pyspark.sql.window import Window
from pyspark.sql.functions import lit, when, to_timestamp
from pyspark.sql.types import IntegerType, StructType, StringType, ArrayType, DoubleType, StructField, LongType, TimestampType 

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
827,application_1600149154303_886955,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
%%local
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

In [4]:
tslog = spark.read.parquet("/user/webuser/ai/recommendation/tslog_tag/2021/05/{21,22,23,24,25,26,27}/*")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# define UA parsers

# Parse device family info
parse_device_family_udf = F.udf(lambda user_agent: parse_device_family(user_agent))

def parse_device_family(user_agent):
    return user_agent_parser.ParseDevice(user_agent).get("family")

# Parse device model info
parse_device_model_udf = F.udf(lambda user_agent: parse_device_model(user_agent))

def parse_device_model(user_agent):
    return user_agent_parser.ParseDevice(user_agent).get("model")

# Parse OS info
parse_os_udf = F.udf(lambda user_agent: parse_os(user_agent))

def parse_os(user_agent):
    return user_agent_parser.ParseOS(user_agent).get("family")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
reqstid_log = tslog.filter(F.col('Request').rlike("reqstid="))  \
.withColumn("position", F.regexp_extract("Request", '(position=)(\d+)', 2).cast(IntegerType()))  \
.withColumn("OS", parse_os_udf(F.col("UserAgent"))) \
.withColumn("is_mobile_OS", when(F.col("OS").isin(["iOS","Android"]),1).otherwise(0)) \
.withColumn("is_mobile_source", when(F.col("source").rlike("https://m.ruten.com.tw"),1).otherwise(0)) \
.withColumn('time_impressed', F.regexp_extract("Request", '(log\.gif\?)(\d+)', 2).cast(LongType()))  \
.withColumn("recommender", F.regexp_extract("Request",'(ts_set=)(\S+)_(\d{14})_(\d+)_(\d+)', 2)) \
.withColumn('rc', F.regexp_extract("Request",'(ts_id=)(\S+)\.(\d+)',2))  \
.withColumn('CID', F.regexp_extract("Request",'(ts_id=)(\S+)\.(\d+)',3))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
windowSpec = Window.partitionBy("requestId","position").orderBy("time_impressed")

impress_log = reqstid_log.filter(F.col("Request").rlike("type=impress")) 
impress_log = impress_log.withColumn("was_impressed", lit(1))  \
.withColumn("row_order", F.row_number().over(windowSpec))  

click_log = reqstid_log.filter(F.col("Request").rlike("type=click")) 
click_log = click_log.withColumn("was_clicked", lit(1))  \
.withColumnRenamed('time_impressed', 'time_clicked')  \
.select('requestId','gno','position','was_clicked','time_clicked')

df_tslog = impress_log.join(click_log, on=['gno','requestId','position'], how='left')  \
.withColumn('was_clicked', when(F.col('was_clicked')==1, 1).otherwise(0))  \
.filter(F.col('is_mobile_OS')==1)  \
.drop('Request', 'UserAgent', 'Source', 'year', 'month', 'day', 'hour','is_mobile_OS','is_mobile_source','process_time','OS')  

df_tslog.show(5, False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------+------------------------------------+--------+-----+-----------------------------------------------------------------------------------------------+-------+-----+--------------+-------------+--------------+-----+-------+----------------+--------+--------+--------+--------------+--------------+-----------+--------------------+----------+-------------+---------+-----------+------------+
|gno           |requestId                           |position|price|title                                                                                          |soldNum|watch|sellerDiscount|rutenDiscount|noShippingCost|pCoin|oversea|shippingDiscount|campaign|maxPrice|minPrice|limitesDeliver|time_impressed|recommender|rc                  |CID       |was_impressed|row_order|was_clicked|time_clicked|
+--------------+------------------------------------+--------+-----+-----------------------------------------------------------------------------------------------+-------+-----+--------------

In [8]:
request_meta = spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "snapshot") \
.load("/user/webuser/ai/recommendation/streaming/hudi/request_meta/2021/05/{21,22,23,24,25,26,27}")  \
.select('requestId','rc','timestamp')  \
.withColumnRenamed('timestamp', 'request_time')

request_meta.show(5, False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------------------------+--------------------+-----------------------+
|requestId                           |rc                  |request_time           |
+------------------------------------+--------------------+-----------------------+
|b9ba9f07-7042-4609-9510-388ea31322ca|370535043400380A3F0C|2021-05-24 00:00:00.201|
|39265481-6fa4-45c8-b7a7-79412af2347a|20210523300821393161|2021-05-24 00:00:02.085|
|4901da8c-768f-4b4a-b5b3-89eba9da4020|20210206100679973926|2021-05-24 00:00:04.952|
|23bffd84-e04e-46b0-8168-2ccf04ebf457|340D3E0A320B3A033503|2021-05-24 00:00:04.412|
|c4a76b7d-cc06-4dc7-8bd0-6321ac899e51|37053501370232003703|2021-05-24 00:00:02.081|
+------------------------------------+--------------------+-----------------------+
only showing top 5 rows

# Calculate browsing time

In [9]:
#calculate the browsing time 
df_duration = df_tslog.join(request_meta, on='requestId',how='left').filter(F.col('row_order')==1)  \
.withColumn('time',sf.from_unixtime(F.col('time_impressed')/1000,'yyyy-MM-dd HH:mm:ss')) \
.withColumn('time', to_timestamp('time'))  \
.withColumn('request2impress', F.unix_timestamp('time') - F.unix_timestamp('request_time'))

browsing = df_duration.groupBy('requestId').agg(sf.max('request2impress').alias('browsing'))
df_duration.describe('request2impress').show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+-----------------+
|summary|  request2impress|
+-------+-----------------+
|  count|         12251131|
|   mean|1886.271370782012|
| stddev|15021.81066000784|
|    min|          -950403|
|    max|         10973098|
+-------+-----------------+

In [10]:
quantiles = browsing.approxQuantile("browsing", list(i*0.05 for i in range(1,20)), 0)
print(quantiles)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[7.0, 12.0, 18.0, 26.0, 37.0, 50.0, 66.0, 86.0, 113.0, 147.0, 192.0, 254.0, 340.0, 464.0, 651.0, 943.0, 1417.0, 2166.0, 5634.0]

In [11]:
#users who browse the guess you like after refresh
duration2 = df_tslog.filter(F.col('row_order')==2).select('requestId')
browsing2 = browsing.join(duration2, on='requestId', how='inner')
quantiles2 = browsing2.approxQuantile("browsing", list(i*0.05 for i in range(1,20)), 0)
print(quantiles2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[41.0, 73.0, 107.0, 143.0, 185.0, 233.0, 285.0, 343.0, 417.0, 494.0, 591.0, 709.0, 852.0, 1012.0, 1207.0, 1440.0, 1726.0, 2118.0, 2648.0]

# Conditional logit

In [12]:
df_choice = df_tslog.filter((F.col('position')<=4)&(F.col('row_order')==1)).withColumn('label', F.col('position')*F.col('was_clicked'))

boolean_list = ['sellerDiscount','rutenDiscount','noShippingCost','pCoin','oversea','shippingDiscount','limitesDeliver']
for i in boolean_list:
    df_choice = df_choice.withColumn(i, F.col(i).cast(IntegerType()))

df_choice = df_choice.withColumn('DIEN', when(F.col('recommender')=='recmd',1).otherwise(0))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Imported Calibrated score

In [13]:
candidate_log = spark.read.format("hudi") \
.option("hoodie.datasource.query.type", "snapshot") \
.load("/user/webuser/ai/recommendation/streaming/hudi/candidates/2021/05/{21,22,23,24,25,26,27}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
candidate_schema = StructType() \
    .add("Date", StringType()) \
    .add("ClientIP", StringType()) \
    .add("Request", StringType()) \
    .add("Status", StringType()) \
    .add("UserAgent", StringType()) \
    .add("Source", StringType()) \
    .add("third_ts_id", StringType()) \
    .add("CID", StringType())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
df_can = candidate_log.select("requestId", sf.explode("candidates")) \
.select("requestId", "col.*").select("requestId","gno","category","pCTR","score","recall","price","soldNum", \
"watch", \
F.array_contains("recall", "alsoviewV3").alias("by_alsoview"), \
F.array_contains("recall", "eges").alias("by_egs"), \
F.array_contains("recall", "ydnn").alias("by_ydnn"), \
F.array_contains("recall", "metapath2vec").alias("by_meta"))
df_can.show(5,False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------------------------+--------------+------------+-----------+--------------------+--------------------------+-----+-------+-----+-----------+------+-------+-------+
|requestId                           |gno           |category    |pCTR       |score               |recall                    |price|soldNum|watch|by_alsoview|by_egs|by_ydnn|by_meta|
+------------------------------------+--------------+------------+-----------+--------------------+--------------------------+-----+-------+-----+-----------+------+-------+-------+
|ddc93ac9-27f9-4e49-85e0-cded3f46af36|22116854640582|00090014    |0.996163487|0.005158966154255028|[ydnn, metapath2vec, eges]|null |null   |null |false      |true  |true   |true   |
|ddc93ac9-27f9-4e49-85e0-cded3f46af36|22118192987842|00090014    |0.994080245|0.004250599379210514|[ydnn, metapath2vec, eges]|null |null   |null |false      |true  |true   |true   |
|ddc93ac9-27f9-4e49-85e0-cded3f46af36|22119317164063|000900130009|0.988546968|0.0033937911

In [16]:
df_can = df_can.select('requestId','gno','score')
df_choice = df_choice.join(df_can, on= ['requestId','gno'], how='left')  \
.filter(F.col('score').isNotNull())

df_choice_id = df_choice.groupBy('requestId')  \
.agg(F.sum('label').alias('label'), F.sum('was_impressed').alias('impress'), F.sum('was_clicked').alias('click'), F.sum('DIEN').alias('DIEN'))  \
.filter((F.col('impress')==4)&(F.col('click')<=1)&(F.col('DIEN')==4))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
def Remove_NA(df, column_list):
    for column in column_list:
        df = df.filter(F.col(column).isNotNull())
    return df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
choice_list = ['choice1','choice2','choice3','choice4']
null_list = ['soldNum','watch','sellerDiscount','rutenDiscount','noShippingCost','pCoin','oversea','shippingDiscount','limitesDeliver','score']
for choice in choice_list:
    globals()[choice] = df_choice_id.filter((F.col('label')==0)|(F.col('label')==choice[6])).join(df_choice, on=['requestId','label'], how='left').filter(F.col('position')==choice[6])
    globals()[choice] = Remove_NA(globals()[choice], null_list)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Logit

In [19]:
#Regress was_clicked on Discount
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols =['sellerDiscount','rutenDiscount','noShippingCost','pCoin','shippingDiscount','limitesDeliver', 'score'], outputCol = 'features')
assembler_df1 = assembler.transform(choice1)
assembler_df2 = assembler.transform(choice2)
assembler_df3 = assembler.transform(choice3)
assembler_df4 = assembler.transform(choice4)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
from pyspark.ml.regression import GeneralizedLinearRegression
glr = GeneralizedLinearRegression(family="binomial",featuresCol='features', labelCol = 'was_clicked', maxIter=10)
# Fit the model
model1 = glr.fit(assembler_df1)
model2 = glr.fit(assembler_df2)
model3 = glr.fit(assembler_df3)
model4 = glr.fit(assembler_df4)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
print('model1')
print(model1.summary)
print('model2')
print(model2.summary)
print('model3')
print(model3.summary)
print('model4')
print(model4.summary)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

model1
Coefficients:
         Feature Estimate Std Error T Value P Value
     (Intercept)  -3.0673    0.5160 -5.9447  0.0000
  sellerDiscount  -0.2259    0.1196 -1.8889  0.0589
   rutenDiscount  -0.1713    0.0920 -1.8623  0.0626
  noShippingCost   0.1914    0.0755  2.5362  0.0112
           pCoin  -0.0898    0.0518 -1.7346  0.0828
shippingDiscount  -0.0216    0.5160 -0.0419  0.9666
  limitesDeliver  -0.0485    0.0686 -0.7072  0.4795
           score  20.6958    2.4770  8.3552  0.0000

(Dispersion parameter for binomial family taken to be 1.0000)
    Null deviance: 14630.5482 on 34596 degrees of freedom
Residual deviance: 14547.6666 on 34596 degrees of freedom
AIC: 14563.6666
model2
Coefficients:
         Feature Estimate Std Error T Value P Value
     (Intercept)  -3.2625    0.5134 -6.3551  0.0000
  sellerDiscount   0.1671    0.1139  1.4670  0.1424
   rutenDiscount   0.0450    0.0940  0.4788  0.6321
  noShippingCost   0.0131    0.0858  0.1528  0.8786
           pCoin   0.0282    0.0582

# Discount plus watch&sold

In [22]:
for choice in choice_list
    globals()[choice] = globals()[choice].withColumn('log_sold', sf.log(F.col('soldNum')+1))  \
    .withColumn('log_watch', sf.log(F.col('watch')+1))  \
    .withColumn('log_maxPrice', sf.log(F.col('maxPrice')+1))  \
    .withColumn('log_minPrice', sf.log(F.col('minPrice')+1))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
invalid syntax (<stdin>, line 1)
  File "<stdin>", line 1
    for choice in choice_list
                            ^
SyntaxError: invalid syntax



In [23]:
feature2 = ['log_watch','log_sold','log_minPrice','log_maxPrice','sellerDiscount','rutenDiscount','noShippingCost','pCoin','oversea','shippingDiscount','limitesDeliver','score']
assembler = VectorAssembler(inputCols = feature2, outputCol = 'features')
assembler_df1 = assembler.transform(choice1)
assembler_df2 = assembler.transform(choice2)
assembler_df3 = assembler.transform(choice3)
assembler_df4 = assembler.transform(choice4)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
'Field "log_watch" does not exist.\nAvailable fields: requestId, label, impress, click, DIEN, gno, position, price, title, soldNum, watch, sellerDiscount, rutenDiscount, noShippingCost, pCoin, oversea, shippingDiscount, campaign, maxPrice, minPrice, limitesDeliver, time_impressed, recommender, rc, CID, was_impressed, row_order, was_clicked, time_clicked, DIEN, score'
Traceback (most recent call last):
  File "/home/webuser/spark-2.4.3-bin-hadoop2.7/python/pyspark/ml/base.py", line 173, in transform
    return self._transform(dataset)
  File "/home/webuser/spark-2.4.3-bin-hadoop2.7/python/pyspark/ml/wrapper.py", line 312, in _transform
    return DataFrame(self._java_obj.transform(dataset._jdf), dataset.sql_ctx)
  File "/mnt/hdfs1/nm/usercache/webuser/appcache/application_1600149154303_886955/container_1600149154303_886955_01_000001/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  Fi

In [None]:
glr = GeneralizedLinearRegression(family="binomial",featuresCol='features', labelCol = 'was_clicked', maxIter=100)
# Fit the model
model1 = glr.fit(assembler_df1)
model2 = glr.fit(assembler_df2)
model3 = glr.fit(assembler_df3)
model4 = glr.fit(assembler_df4)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
print('model1')
print(model1.summary)
print('model2')
print(model2.summary)
print('model3')
print(model3.summary)
print('model4')
print(model4.summary)