### Setup (only required for the first run on the Spark cluster)

In [1]:
# NOTE(review): `!pip` uses whatever pip is first on the shell PATH, which may not belong to the
# kernel's interpreter; consider `%pip` (if the IPython version supports it) and pin a version.
!pip install pandas



In [1]:
# Local directory holding the CafeBiz dataset (articles CSV and transformed outputs).
# No trailing slash: both os.path.join(ROOT_PATH, ...) and '{ROOT_PATH}/...' then build clean paths.
ROOT_PATH = '/data/tungtv/Code/dataset/dataset_cafebiz_full'
#TODO: Upload this file (generated by the ACR module training) to GCS before calling spark script
# NOTE(review): the command below is incomplete (no destination) and still names the Adressa file.
# !gsutil cp {ROOT_PATH}/adressa_articles.csv.

### Loading dependencies

In [2]:
import datetime
import hashlib
import json
import math
import os
import pickle
import shutil

import matplotlib
import pandas as pd
%matplotlib inline

In [17]:
import findspark
findspark.init()
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark import SparkContext,SparkConf,SQLContext
from pyspark.sql import SparkSession
# from pyspark.sql.functions import pandas_udf
# from pyspark.sql.functions import PandasUDFType
from pyspark.sql.functions import *

In [4]:
# Local Spark session using every core of this machine (reuses an existing session if one is running).
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('myAppName1')
    .getOrCreate()
)

In [5]:
# SparkContext handle and a legacy SQLContext wrapper around it.
# NOTE(review): neither `sc` nor `sqlContext` is used in the visible cells (they go through `spark`);
# drop them if nothing else in the notebook needs them.
sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [6]:
# Spark version of the running session.
print(spark.version)

2.2.0


## Loading pre-processed articles

In [7]:
# Pre-processed articles table, read from the dataset root instead of a second hardcoded copy of the path.
articles_original_df = pd.read_csv(os.path.join(ROOT_PATH, 'cafebiz_articles.csv'))

In [8]:
# Columns of the articles table; 'url' and 'id_encoded' are used to build the URL -> id map below.
articles_original_df.columns

Index(['id', 'content', 'created_at_ts', 'teaser', 'domain', 'keywords',
       'title', 'url', 'category0', 'persons', 'locations', 'text_highlights',
       'id_encoded', 'category0_encoded', 'keywords_encoded',
       'locations_encoded', 'persons_encoded'],
      dtype='object')

In [9]:
# Number of distinct article URLs (matches the size of the URL -> id map built in the next cell).
articles_original_df['url'].nunique()

1960

In [10]:
# Canonical article URL -> encoded article id (.tolist() yields plain Python scalars).
_article_urls = articles_original_df['url'].tolist()
_article_ids = articles_original_df['id_encoded'].tolist()
valid_articles_urls_to_ids_dict = dict(zip(_article_urls, _article_ids))
del _article_urls, _article_ids
len(valid_articles_urls_to_ids_dict)

1960

### Loading user interactions

In [None]:
# NOTE(review): this cell was never executed (In [None]) and is superseded by the parquet load in
# the next cell, which reassigns interactions_df; it is left over from the Adressa version.
#INTERACTIONS_PATH = 'gs://news_public_datasets4/adressa/one_week/*'
#INTERACTIONS_PATH = 'three_month/20170101'

# NOTE(review): range(1, DAYS_TO_LOAD_INTERACTIONS) yields days 1..16, i.e. 16 daily files although
# the constant is 17 -- confirm whether that exclusive upper bound is intended.
DAYS_TO_LOAD_INTERACTIONS=17
interaction_json_files = [os.path.join(ROOT_PATH, 'three_month/201701{:02d}'.format(day)) for day in range(1, DAYS_TO_LOAD_INTERACTIONS)]
print('Loading interaction files: {}'.format(interaction_json_files))

# PERMISSIVE read mode: malformed JSON records are kept (with null fields) instead of failing the read.
interactions_df = spark.read \
  .option("mode", "PERMISSIVE") \
  .json(interaction_json_files)

In [18]:
# Load the February 2019 page-view logs (parquet) from HDFS and keep only cafebiz.vn interactions.
# NOTE(review): the (truncated) schema printed below shows columns such as path/guid/milis, but later
# cells read canonicalUrl, time, userId, activeTime, country, city, region, os, deviceType,
# referrerHostClass and publishtime. The later outputs (e.g. 5517760 valid interactions vs. 3708197
# loaded here) look like they come from an earlier Adressa run. Confirm a column-mapping step
# before running this notebook end to end.
path_file_log = 'hdfs://10.5.36.95:9000/user/thanhpt/log_tos_pc/' 
# Glob over the daily paths of 2019-02 (presumably one directory per day)
interactions_df  = spark.read.parquet(path_file_log+"2019-02-**")
interactions_df =  interactions_df.filter(col("domain").isin(['cafebiz.vn']))

In [19]:
# Schema of the raw logs (the captured output below is truncated).
interactions_df.printSchema()

root
 |-- dt: string (nullable = true)
 |-- cookietime: string (nullable = true)
 |-- browser_code: integer (nullable = true)
 |-- browser_ver: string (nullable = true)
 |-- os_code: integer (nullable = true)
 |-- os_version: string (nullable = true)
 |-- ip: long (nullable = true)
 |-- loc_id: integer (nullable = true)
 |-- domain: string (nullable = true)
 |-- path: string (nullable = true)
 |-- referer: string (nullable = true)
 |-- guid: string (nullable = true)
 |-- pageloadId: string (nullable = true)
 |-- screen: string (nullable = true)
 |-- d_guid: string (nullable = true)
 |-- category: string (nullable = true)
 |-- utm_source: string (nullable = true)
 |-- utm_campaign: string (nullable = true)
 |-- utm_medium: string (nullable = true)
 |-- milis: long (nullable = true)
 |-- tos: long (nullable = true)
 |-- tor: long (nullable = true)
 |-- top: long (nullable = true)
 |-- scrollEnd: integer (nullable = true)
 |-- pageLoadTime: long (nullable = true)
 |-- lookUpTime: long (nu

In [20]:
# Number of cafebiz.vn interactions loaded.
interactions_df.count()

3708197

In [15]:
def get_article_id_encoded_from_url(canonical_url):
    """Return the encoded article id for ``canonical_url``, or None when the URL is unknown.

    Interactions are matched to articles by canonical URL because article ids in the
    interaction logs sometimes do not match the articles table, while canonical URLs do.
    """
    return valid_articles_urls_to_ids_dict.get(canonical_url)

# Spark UDF wrapper; a None result becomes a null 'article_id' (filtered out below).
get_article_id_encoded_from_url_udf = F.udf(get_article_id_encoded_from_url, pyspark.sql.types.IntegerType())

In [16]:
# Attach the encoded article id (looked up by canonical URL) and keep only interactions whose
# article exists in the articles table. Cached because many later cells reuse it.
# NOTE(review): assumes interactions_df has a 'canonicalUrl' column -- not visible in the printed schema.
interactions_article_id_encoded_df = interactions_df.withColumn(
    'article_id', get_article_id_encoded_from_url_udf(F.col('canonicalUrl')))
interactions_filtered_df = (
    interactions_article_id_encoded_df
    .filter(F.col('article_id').isNotNull())
    .cache()
)

In [17]:
# interactions_filtered_df.show(3)

In [18]:
# Number of valid interactions (URL found in the articles table); as the first action on the
# cached interactions_filtered_df, this also materializes its cache.
interactions_filtered_df.count()

5517760

In [19]:
# Number of distinct articles with at least one valid interaction.
interactions_filtered_df.select('article_id').distinct().count()

26909

In [None]:
# modified
# interactions_filtered_df.type

In [20]:
# Earliest interaction 'time' in the filtered data, multiplied by 1000 to match the unit of the
# per-click 'timestamp' built in transform_interaction (time * 1000).
first_timestamp_ts = interactions_filtered_df.agg(F.min('time')).first()[0] * 1000
first_timestamp_ts

1483225202000

### Analyzing elapsed time since publishing

In [21]:
#interactions_filtered_df.filter(interactions_filtered_df['time'].isNull()).count()
#0

In [22]:
def get_timestamp_from_date_str(value):
    """Convert an ISO-8601 UTC date string to Unix epoch seconds.

    Accepts ``'%Y-%m-%dT%H:%M:%S.%fZ'`` (e.g. ``'2017-01-01T07:44:50.000Z'``) and the
    same format without fractional seconds. The trailing ``Z`` denotes UTC, so the parsed
    datetime is made UTC-aware before ``timestamp()``; a naive datetime would be
    interpreted in the machine's local time zone and shift every result.

    Parameters
    ----------
    value : str or None

    Returns
    -------
    int or None
        Whole seconds since the epoch (fraction truncated), or None when ``value`` is None.

    Raises
    ------
    ValueError
        If ``value`` matches neither accepted format.
    """
    if value is None:
        return None
    for date_format in ('%Y-%m-%dT%H:%M:%S.%fZ', '%Y-%m-%dT%H:%M:%SZ'):
        try:
            parsed = datetime.datetime.strptime(value, date_format)
        except ValueError:
            continue
        return int(parsed.replace(tzinfo=datetime.timezone.utc).timestamp())
    raise ValueError('Unrecognized date string: {!r}'.format(value))

# Spark UDF wrapper returning IntegerType (32-bit), which holds epoch seconds up to 2038-01-19.
get_timestamp_from_date_str_udf = F.udf(get_timestamp_from_date_str, pyspark.sql.types.IntegerType())

In [23]:
# Parse each article's publish time into epoch seconds, then derive the whole minutes between
# publication and the interaction ('time', presumably epoch seconds).
interactions_filtered_with_publish_ts_df = (
    interactions_filtered_df
    .withColumn('publish_ts', get_timestamp_from_date_str_udf(F.col('publishtime')))
    .withColumn('elapsed_min_since_published',
                ((F.col('time') - F.col('publish_ts')) / 60).cast(pyspark.sql.types.IntegerType()))
)

In [24]:
#interactions_filtered_with_publish_ts_df.select('publishtime','publish_ts', 'time', 'elapsed_min_since_published').show(100)

In [25]:
%%time
# Approximate 10/25/50/75/90% quantiles (relative error 0.01) of minutes since publication.
interactions_filtered_with_publish_ts_df.approxQuantile("elapsed_min_since_published", [0.10, 0.25, 0.50, 0.75, 0.90], 0.01)
# Values from an earlier run, for comparison:
#[49.0, 108.0, 334.0, 1020.0, 4611.0]

CPU times: user 16 ms, sys: 22.8 ms, total: 38.8 ms
Wall time: 6.89 s


[467.0, 533.0, 727.0, 1565.0, 3105.0]

In [26]:
# Collect the whole column to the driver (one value per interaction) to count nulls and describe it.
# Nulls arise e.g. when publishtime is missing; negative minimums mean the interaction precedes
# the parsed publish time.
elapsed_min_since_published_df = interactions_filtered_with_publish_ts_df.select('elapsed_min_since_published').toPandas()
print(len(elapsed_min_since_published_df[pd.isnull(elapsed_min_since_published_df['elapsed_min_since_published'])]))
elapsed_min_since_published_df.describe()

79409


Unnamed: 0,elapsed_min_since_published
count,5438351.0
mean,78572.92
std,564513.1
min,-314739.0
25%,518.0
50%,714.0
75%,1360.0
max,11447630.0


In [27]:

# describe() output from an earlier run, kept as a bare string for comparison with the table above.
'''
elapsed_min_since_published
count	2.600818e+06
mean	6.438622e+04
std	5.051825e+05
min	-3.151590e+05
25%	9.400000e+01
50%	2.580000e+02
75%	8.370000e+02
max	8.608278e+06
'''

'\nelapsed_min_since_published\ncount\t2.600818e+06\nmean\t6.438622e+04\nstd\t5.051825e+05\nmin\t-3.151590e+05\n25%\t9.400000e+01\n50%\t2.580000e+02\n75%\t8.370000e+02\nmax\t8.608278e+06\n'

### Analyzing the distribution of clicks per article

In [28]:
#clicks_by_article_count_df = interactions_filtered_df.groupBy('article_id').count()
#clicks_by_article_count_df.approxQuantile("count", [0.01, 0.10, 0.25, 0.50, 0.75, 0.90, 0.99], 0.01)
#[1.0, 1.0, 1.0, 1.0, 2.0, 6.0, 33581.0]

### Processing categorical features

In [29]:
def get_categ_features_counts_dataframe(interactions_spark_df,column_name):
    """Count interactions per value of ``column_name`` and return them as a pandas DataFrame.

    The result has the columns ``[column_name, 'count']`` and is sorted by ``count``,
    most frequent value first.
    """
    value_counts_pdf = (interactions_spark_df
                        .groupBy(column_name)
                        .count()
                        .toPandas())
    return value_counts_pdf.sort_values(by='count', ascending=False)

In [30]:
PAD_TOKEN = '<PAD>'     # always encoded as id 0 (padding)
UNFREQ_TOKEN = '<UNF>'  # always encoded as id 1 (infrequent or unseen values)

def get_encoder_for_values(values):
    """Build a ``value -> id`` dict with ids 0 and 1 reserved for PAD_TOKEN and UNFREQ_TOKEN.

    Parameters
    ----------
    values : iterable
        Category values to encode; they receive ids 2, 3, ... in iteration order
        (a repeated value keeps the id of its last occurrence).

    Returns
    -------
    dict
    """
    # list() so any iterable (tuple, ndarray, generator) can be concatenated
    encoder_values = [PAD_TOKEN, UNFREQ_TOKEN] + list(values)
    return {value: idx for idx, value in enumerate(encoder_values)}

def get_categ_features_encoder_dict(counts_df, min_freq=100):
    """Build an encoder for the values that occur at least ``min_freq`` times.

    Parameters
    ----------
    counts_df : pandas.DataFrame
        Value counts whose first column holds the category values and which has a
        ``'count'`` column (as returned by get_categ_features_counts_dataframe).
    min_freq : int, default 100
        Minimum count for a value to get its own id; rarer values fall back to UNFREQ_TOKEN.

    Returns
    -------
    dict
        ``value -> id`` mapping from get_encoder_for_values.
    """
    value_column = counts_df.columns[0]
    # Previously the threshold was hardcoded to 100, silently ignoring min_freq
    is_frequent = counts_df['count'] >= min_freq
    freq_values = counts_df.loc[is_frequent, value_column].tolist()
    return get_encoder_for_values(freq_values)

def encode_cat_feature(value, encoder_dict):
    """Return the id of ``value`` in ``encoder_dict``, or the UNFREQ_TOKEN id when it is absent."""
    if value in encoder_dict:
        return encoder_dict[value]
    return encoder_dict[UNFREQ_TOKEN]

In [31]:
# Interaction counts per 'country' value, most frequent first.
countries_df = get_categ_features_counts_dataframe(interactions_filtered_df, 'country')
len(countries_df)

178

In [32]:
# Country encoder: frequent countries get their own id, the rest map to <UNF>.
countries_encoder_dict = get_categ_features_encoder_dict(countries_df)
len(countries_encoder_dict)

70

In [33]:
# Interaction counts per 'city' value, most frequent first.
cities_df = get_categ_features_counts_dataframe(interactions_filtered_df, 'city')
len(cities_df)

7012

In [34]:
# City encoder: frequent cities get their own id, the rest map to <UNF>.
cities_encoder_dict = get_categ_features_encoder_dict(cities_df)
len(cities_encoder_dict)

1025

In [35]:
# Interaction counts per 'region' value, most frequent first.
regions_df = get_categ_features_counts_dataframe(interactions_filtered_df, 'region')
len(regions_df)

1354

In [36]:
# Region encoder: frequent regions get their own id, the rest map to <UNF>.
regions_encoder_dict = get_categ_features_encoder_dict(regions_df)
len(regions_encoder_dict)

238

In [37]:
# Interaction counts per 'deviceType' value, most frequent first.
devices_df = get_categ_features_counts_dataframe(interactions_filtered_df, 'deviceType')
print(len(devices_df))
devices_df

3


Unnamed: 0,deviceType,count
0,Mobile,2682837
2,Desktop,1726706
1,Tablet,1108217


In [38]:
# Device-type encoder: frequent device types get their own id, the rest map to <UNF>.
devices_encoder_dict = get_categ_features_encoder_dict(devices_df)
len(devices_encoder_dict)

5

In [39]:
# Interaction counts per 'os' value, most frequent first.
os_df = get_categ_features_counts_dataframe(interactions_filtered_df, 'os')
print(len(os_df))
os_df

11


Unnamed: 0,os,count
0,iPhone OS,2364480
10,Windows,1469623
8,Android,1405860
3,Macintosh,236950
2,Windows Phone OS,18761
5,Linux,15389
7,Unknown,6342
4,BlackBerry,288
1,Symbian OS,43
9,BSD,14


In [40]:
# OS encoder: frequent operating systems get their own id, the rest map to <UNF>.
os_encoder_dict = get_categ_features_encoder_dict(os_df)
len(os_encoder_dict)

10

In [41]:
# Interaction counts per 'referrerHostClass' value, most frequent first.
referrer_class_df = get_categ_features_counts_dataframe(interactions_filtered_df, 'referrerHostClass')
print(len(referrer_class_df))
referrer_class_df

5


Unnamed: 0,referrerHostClass,count
2,internal,3277921
4,search,782901
0,direct,779496
3,social,423573
1,other,253869


In [42]:
# Referrer-class encoder: frequent classes get their own id, the rest map to <UNF>.
referrer_class_encoder_dict = get_categ_features_encoder_dict(referrer_class_df)
len(referrer_class_encoder_dict)

7

In [43]:
# All categorical encoders, keyed by the feature names used in transform_interaction
# ('device' encodes the raw 'deviceType' column, 'referrer_class' encodes 'referrerHostClass').
# Pickled to NAR_ENCODERS_PATH at the end of the notebook.
encoders_dict = {
    'city': cities_encoder_dict,
    'region': regions_encoder_dict,
    'country': countries_encoder_dict,
    'os': os_encoder_dict,
    'device': devices_encoder_dict,
    'referrer_class': referrer_class_encoder_dict
}

### Processing numeric features

In [44]:
%%time
# Approximate 10/25/50/75/90% quantiles (relative error 0.01) of activeTime.
# NOTE(review): active_time_quantiles is not used in the visible cells.
active_time_quantiles = interactions_filtered_df.approxQuantile("activeTime", [0.10, 0.25, 0.50, 0.75, 0.90], 0.01)
print(active_time_quantiles)

[8.0, 19.0, 45.0, 90.0, 147.0]
CPU times: user 24.8 ms, sys: 19.2 ms, total: 44 ms
Wall time: 1.22 s


In [45]:
# Summary statistics (count/mean/stddev/min/max) of activeTime; Spark's describe() returns them
# as strings, hence the float() conversions in the next cell.
active_time_stats_df = interactions_filtered_df.describe('activeTime').toPandas()
active_time_stats_df

Unnamed: 0,summary,activeTime
0,count,2336917.0
1,mean,65.07003072851967
2,stddev,69.32856559320139
3,min,1.0
4,max,899.0


In [46]:
# Mean and standard deviation of activeTime, picked from the describe() table by 'summary' label.
# NOTE(review): not used in the visible cells; presumably meant for normalizing active_time_secs downstream.
active_time_mean = float(active_time_stats_df[active_time_stats_df['summary'] == 'mean']['activeTime'].values[0])
active_time_stddev = float(active_time_stats_df[active_time_stats_df['summary'] == 'stddev']['activeTime'].values[0])

### Splitting user interactions into sessions

In [47]:
# Disabled pandas_udf experiment, kept as a bare string literal so it does not execute
# (presumably because pandas_udf is unavailable on the Spark 2.2.0 printed above -- confirm).
# NOTE(review): as written it returns the undefined name `result` instead of `result_df`.
'''
schema = T.StructType([
    T.StructField("userId", T.StringType()),
    T.StructField("min_ts", T.IntegerType())
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def split_sessions(df):

    result_df = df[['userId']]
    result_df['min_ts'] = df['time'].min()
    
    return result

%%time
tmp = interactions_filtered_df.groupBy('userId').apply(split_sessions)
tmp.show(100)
'''

'\nschema = T.StructType([\n    T.StructField("userId", T.StringType()),\n    T.StructField("min_ts", T.IntegerType())\n])\n\n@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)\ndef split_sessions(df):\n\n    result_df = df[[\'userId\']]\n    result_df[\'min_ts\'] = df[\'time\'].min()\n    \n    return result\n\n%%time\ntmp = interactions_filtered_df.groupBy(\'userId\').apply(split_sessions)\ntmp.show(100)\n'

In [48]:
def hash_str_to_int(encoded_bytes_text, digits):
    """Deterministically hash bytes to a small non-negative integer.

    Takes the first 8 hex digits of the MD5 digest of ``encoded_bytes_text`` (a 32-bit
    value), writes that value in decimal and keeps its leading ``digits`` characters,
    so the result has at most ``digits`` decimal digits.
    """
    md5_prefix = hashlib.md5(encoded_bytes_text).hexdigest()[:8]
    decimal_text = str(int(md5_prefix, 16))
    return int(decimal_text[:digits])

In [49]:
# Maximum gap between two consecutive clicks of a user; a longer gap starts a new session.
MAX_SESSION_IDLE_TIME_MS = 30 * 60 * 1000    #30 min

def close_session(session):
    """Summarize a finished session as a Spark Row.

    Parameters
    ----------
    session : list of dict
        Non-empty list of click dicts (as built by transform_interaction), in time order.

    Returns
    -------
    Row
        Fields session_id, session_hour, session_size, session_start, user_id and
        clicks (one Row per click dict), so the RDD can be converted back to a DataFrame.
    """
    opening_click = session[0]
    started_at = opening_click['timestamp']
    owner_id = opening_click['user_id']

    # Synthetic id: start timestamp shifted two decimal places plus a <=3-digit hash of the user id.
    # NOTE(review): the hash only stays clear of the timestamp digits while timestamps are
    # whole seconds * 1000 (assumes 'time' is integer seconds -- confirm).
    user_hash = hash_str_to_int(owner_id.encode(), 3)
    session_id = int(started_at) * 100 + user_hash
    # Whole hours elapsed since the earliest interaction in the dataset
    session_hour = int((started_at - first_timestamp_ts) / (1000 * 60 * 60))

    click_rows = [T.Row(**click) for click in session]
    return T.Row(session_id=session_id,
                 session_hour=session_hour,
                 session_size=len(session),
                 session_start=started_at,
                 user_id=owner_id,
                 clicks=click_rows)
        
def transform_interaction(interaction):
    """Convert a raw interaction Row into the click dict stored inside sessions.

    Renames the id/url/user/active-time fields, converts 'time' to a millisecond
    'timestamp' (time * 1000) and replaces each categorical field by its id from
    encoders_dict (unknown or infrequent values map to the <UNF> id).
    """
    click = {
        'article_id': interaction['article_id'],
        'url': interaction['canonicalUrl'],
        'user_id': interaction['userId'],
        'timestamp': interaction['time'] * 1000, #converting to timestamp
        'active_time_secs': interaction['activeTime'],
    }
    # (output feature name == encoders_dict key, raw input column)
    categorical_sources = (
        ('country', 'country'),
        ('region', 'region'),
        ('city', 'city'),
        ('os', 'os'),
        ('device', 'deviceType'),
        ('referrer_class', 'referrerHostClass'),
    )
    for feature_name, raw_column in categorical_sources:
        click[feature_name] = encode_cat_feature(interaction[raw_column], encoders_dict[feature_name])
    return click

def split_sessions(group):
    """Split one user's interactions into sessions separated by idle gaps.

    A new session starts when the time since the user's previous click exceeds
    MAX_SESSION_IDLE_TIME_MS. Repeated clicks on an article already in the current
    session are dropped, though they still count as activity for the idle gap.
    Only sessions with at least 2 distinct clicks (the minimum for next-click
    prediction) are kept.

    Parameters
    ----------
    group : tuple
        ``(user_id, interactions)`` as produced by ``groupByKey`` on ``(userId, row)`` pairs.

    Returns
    -------
    list of (session_id, Row)
        One pair per kept session, with the Row built by close_session.
    """
    _user_id, interactions = group
    # Sort explicitly: the grouped iterable is not guaranteed to be in time order
    interactions_sorted_by_time = sorted(interactions, key=lambda x: x['time'])
    interactions_transformed = [transform_interaction(i) for i in interactions_sorted_by_time]
    if not interactions_transformed:
        return []

    sessions = []
    session = []
    session_article_ids = set()  # O(1) duplicate check instead of rescanning the session list
    last_timestamp = interactions_transformed[0]['timestamp']
    for interaction in interactions_transformed:
        delta_ms = interaction['timestamp'] - last_timestamp
        interaction['_elapsed_ms_since_last_click'] = delta_ms

        if delta_ms <= MAX_SESSION_IDLE_TIME_MS:
            # Ignoring repeated items in session
            if interaction['article_id'] not in session_article_ids:
                session.append(interaction)
                session_article_ids.add(interaction['article_id'])
        else:
            # Idle gap exceeded: keep the current session only if it has at least 2 clicks
            if len(session) >= 2:
                sessions.append(close_session(session))
            session = [interaction]
            session_article_ids = {interaction['article_id']}

        # Updated for every click, including dropped duplicates
        last_timestamp = interaction['timestamp']

    if len(session) >= 2:
        sessions.append(close_session(session))

    return [(session_row['session_id'], session_row) for session_row in sessions]

In [50]:
# Debug snippet kept as a bare string literal: runs split_sessions locally on 1000 interactions
# and prints the resulting sessions.
'''
#To debug
%%time
sessions_rdd = interactions_filtered_df.limit(1000).rdd.map(lambda x: (x['userId'], x)).groupByKey() \
                    .collect()

for row in sessions_rdd:
    print(split_sessions(row))
    print()
'''

"\n#To debug\n%%time\nsessions_rdd = interactions_filtered_df.limit(1000).rdd.map(lambda x: (x['userId'], x)).groupByKey()                     .collect()\n\nfor row in sessions_rdd:\n    print(split_sessions(row))\n    print()\n"

In [51]:
%%time
# Key each interaction by userId, group per user, split each user's history into sessions
# (split_sessions emits (session_id, Row) pairs), sort by session_id and keep only the Rows.
# NOTE(review): sortByKey eagerly runs a sampling job to compute its range partitions, which
# likely accounts for most of the wall time below even though the other steps are lazy.
sessions_rdd = interactions_filtered_df.rdd.map(lambda x: (x['userId'], x)).groupByKey() \
                            .flatMap(split_sessions) \
                            .sortByKey() \
                            .map(lambda x: x[1])

CPU times: user 558 ms, sys: 142 ms, total: 700 ms
Wall time: 16min 42s


#### Exporting sessions to JSON lines

In [52]:
# Convert the RDD of session Rows back to a DataFrame (schema inferred from the Rows).
sessions_sdf = sessions_rdd.toDF()

In [53]:
%%time
# Write sessions as JSON lines, one output directory per session_hour value.
# NOTE(review): no save mode is set, so re-running fails once the output directory exists
# (Spark's default mode is to error if the path exists).
sessions_sdf.write.partitionBy("session_hour").json(os.path.join(ROOT_PATH,"data_transformed/sessions_processed_by_spark/"))

CPU times: user 20.4 ms, sys: 15 ms, total: 35.4 ms
Wall time: 4.2 s


In [54]:
# Number of sessions kept (each with at least 2 distinct clicks).
sessions_sdf.count()

989276

In [55]:
def serialize(filename, obj):
    """Pickle ``obj`` into ``filename`` (overwriting it) with pickle's default protocol."""
    with open(filename, mode='wb') as out_file:
        pickle.dump(obj, out_file)

In [56]:
# Pickle the categorical encoders to the working directory (presumably consumed by the NAR model).
# NOTE(review): the file name still says 'adressa' although this run processes cafebiz data;
# kept unchanged in case downstream code expects it.
NAR_ENCODERS_PATH  = 'nar_encoders_adressa.pickle'
serialize(NAR_ENCODERS_PATH, encoders_dict)

In [57]:
# Copy the pickled encoders next to the transformed sessions. Done in Python rather than `!cp`
# so a missing target directory is created and a failure raises instead of only printing.
_pickles_dir = os.path.join(ROOT_PATH, 'data_transformed', 'pickles')
os.makedirs(_pickles_dir, exist_ok=True)
shutil.copy(NAR_ENCODERS_PATH, _pickles_dir)