# Import libraries and initialize Spark

In [1]:
from pyspark.mllib.linalg import DenseVector
from pyspark.mllib.random import RandomRDDs

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col,udf,when,regexp_extract
from pyspark.sql import Window


from pyspark.sql.types import StringType,IntegerType

from urllib.parse import urlparse
import gc
import os
import pandas as pd
import numpy as np
import time

from IPython.core.display import display, HTML
# Stretch the notebook container to the full browser width so that wide
# Spark `.show()` tables are easier to read.
display(HTML("<style>.container { width:100% !important; }</style>"))


# Spark settings for the 'clickstream' application.
# They are applied through `SparkSession.builder.config(key, value)` instead of a
# `SparkConf` object: `SparkConf` is never imported in this notebook, so the
# previous form raised NameError unless the kernel happened to preload it.
SPARK_SETTINGS = {
    'spark.driver.memoryOverhead': '2g',
    'spark.executor.memory': '12g',
    'spark.executor.instances': '8',
    'spark.executor.cores': '1',
    'spark.sql.caseSensitive': 'true',
    # Settings that were tried earlier and left disabled:
    # 'spark.driver.memory': '10g',
    # 'spark.dynamicAllocation.enabled': 'true',
}

builder = SparkSession.builder.appName('clickstream')
for setting_name, setting_value in SPARK_SETTINGS.items():
    builder = builder.config(setting_name, setting_value)

# getOrCreate() returns the already-running session if there is one, so
# re-running this cell does not start a second Spark application.
spark = builder.getOrCreate()


# Load the data

In [2]:
t0 = time.time()

# NOTE(review): absolute, machine-specific path -- consider moving it to a
# configurable data directory so the notebook can run elsewhere.
RAW_JSON_GLOB = r'C:\Users\N716307\Downloads\TweetCount-master\2016\09\*\*\*\*\*\*'

dg = spark.read.json(RAW_JSON_GLOB)
# createOrReplaceTempView is the non-deprecated replacement for registerTempTable
# and can be re-run without failing on an existing view name.
dg.createOrReplaceTempView("dg")

# Flatten the nested request/response structure into the seven columns used by
# the rest of the notebook. The query goes through the `spark` session created
# above; `sqlContext` is never defined in this notebook.
df = spark.sql("""
    SELECT request.requestHeaders.`User-Agent` as requestHeaders_User_Agent,request.requestHeaders.Referer as requestHeaders_Referer,
    request.responseHeaders.`google-accounts-signin` as google_accounts_signin, request.documentReferer as documentReferer,
    request.url as url, server_request.request_unixtime as request_unixtime,
    request.ip as ip
    FROM dg
    """
               )


# df.count() triggers the actual read, so the timing below covers the full load.
print("Number of Columns: {:d} - Number of rows: {:d}".format(len(df.columns),df.count()))

t1 = time.time()
print('Command took {:.2f} seconds'.format(t1-t0))

# Drop the driver-side reference to the raw frame; `df` carries the lineage it needs.
del dg
gc.collect()

Number of Columns: 7 - Number of rows: 4917706
Command took 152.17 seconds


159

# Feature engineering

In [5]:
# ---- Text features ----------------------------------------------------------
# User-Agent -> first word inside the first parenthesised group
# (values such as Windows, Macintosh, X11), used as an OS/platform label.
ua_pattern = r"\((\w+)([^\)]+)\)"
df = df.withColumn('requestHeaders_User_Agent',
                   regexp_extract(col('requestHeaders_User_Agent'), ua_pattern, 1))

# Google sign-in header -> the value following `email="` (empty when absent).
signin_pattern = r"(email=\")([-a-zA-Z0-9@:%._\+~#=]{2,256}\.[a-z]{2,6}\b([-a-zA-Z0-9@:%_\+.~#?&//=]*))"
df = df.withColumn('google_accounts_signin',
                   regexp_extract(col('google_accounts_signin'), signin_pattern, 2))

# URL-like columns -> host name with an optional leading "www." stripped.
host_pattern = r"(www\.)?([-a-zA-Z0-9@:%._\+~#=]{2,256}\.[a-z]{2,6}\b([-a-zA-Z0-9@:%_\+.~#?&//=]*?))"
for url_column in ['documentReferer','url','requestHeaders_Referer']:
    df = df.withColumn(url_column, regexp_extract(col(url_column), host_pattern, 2))

# ---- Keep the working columns and remove exact duplicate rows ---------------
kept_columns = ['ip',
                'requestHeaders_User_Agent',
                'google_accounts_signin',
                'request_unixtime',
                'documentReferer',
                'url']
df = df.select(*kept_columns).dropDuplicates()

# ---- Per-IP activity --------------------------------------------------------
# Number of (de-duplicated) rows that share the same ip.
w = Window.partitionBy('ip')
df = df.withColumn('ip_count', F.count(col('ip')).over(w))

# ---- Calendar features ------------------------------------------------------
# Convert the raw request time to a Spark timestamp, then derive its parts.
df = df.withColumn("request_unixtime", F.to_timestamp(col("request_unixtime")))

# Order matters: withColumn appends, so this list fixes the column order.
date_parts = [
    ('request_dayofWeek', F.dayofweek),
    ('request_dayofYear', F.dayofyear),
    ('request_day', F.dayofmonth),
    ('request_year', F.year),
    ('request_month', F.month),
    ('request_hour', F.hour),
]
for part_name, extract_part in date_parts:
    df = df.withColumn(part_name, extract_part(col('request_unixtime')))

# Bucket the hour: <8 midnight, <12 morning, <18 afternoon, everything else night.
request_hour = col('request_hour')
time_of_day = (
    when(request_hour < 8, 'midnight')
    .when(request_hour < 12, 'morning')
    .when(request_hour < 18, 'afternoon')
    .otherwise('night')
)
df = df.withColumn('request_timeOfDay', time_of_day)

# ---- Site label -------------------------------------------------------------
# First matching substring wins, in this order; anything else is 'others'.
request_url = col('url')
site_label = (
    when(request_url.like('%amazon%'), 'amazon')
    .when(request_url.like('%google%'), 'google')
    .when(request_url.like('%facebook%'), 'facebook')
    .when(request_url.like('%netflix%'), 'netflix')
    .when(request_url.like('%priceline%'), 'priceline')
    .otherwise('others')
)
df = df.withColumn('url_name', site_label)

# Disabled experiments, kept for reference (these columns are not selected above):
# rel part of the Link: Used to express a typed relationship with another resource
# df = df.withColumn('Link', regexp_extract(col('Link'), '(.)(rel=)(\w+)',3))

# response header content type
# df = df.withColumn('responseHeader_content_type', regexp_extract(col('responseHeader_content_type'), r"(.*?);",1))



In [37]:
# Number of rows sharing the same (url, documentReferer) pair, attached to every
# row of that pair as `url_count`.
w = Window.partitionBy(['url', 'documentReferer'])
df = df.withColumn('url_count', F.count(col('ip')).over(w))

In [28]:
# For each site of interest, show how its rows split across the platform token
# extracted from the User-Agent header.
for site in ('amazon', 'netflix', 'priceline'):
    print('OS for users of: {:s}\n'.format(site))
    site_rows = df.where(col('url_name') == site)
    site_rows.groupBy('requestHeaders_User_Agent').count().show()

OS for user of: amazon

+-------------------------+-----+
|requestHeaders_User_Agent|count|
+-------------------------+-----+
|                      X11| 2913|
|                     null|  464|
|                Macintosh|15981|
|                    Linux|    1|
|                  Windows|36303|
+-------------------------+-----+

OS for user of: netflix

+-------------------------+-----+
|requestHeaders_User_Agent|count|
+-------------------------+-----+
|                      X11|  224|
|                     null|  346|
|                Macintosh|  568|
|                  Windows| 1204|
+-------------------------+-----+

OS for user of: priceline

+-------------------------+-----+
|requestHeaders_User_Agent|count|
+-------------------------+-----+
|                      X11|    1|
|                Macintosh|   54|
|                  Windows|  192|
+-------------------------+-----+



In [32]:
# For each site of interest, count rows per day of week, least busy day first.
for site in ('amazon', 'netflix', 'priceline'):
    print('Day of week for users of: {:s}\n'.format(site))
    per_day = df.where(col('url_name') == site).groupBy('request_dayofWeek').count()
    per_day.orderBy('count').show()

Day of week for users of: amazon

+-----------------+-----+
|request_dayofWeek|count|
+-----------------+-----+
|                3|  204|
|                5| 6635|
|                2| 7219|
|                1| 9698|
|                7|10339|
|                4|10517|
|                6|11050|
+-----------------+-----+

Day of week for users of: netflix

+-----------------+-----+
|request_dayofWeek|count|
+-----------------+-----+
|                3|   11|
|                2|  250|
|                5|  288|
|                4|  392|
|                1|  436|
|                6|  446|
|                7|  519|
+-----------------+-----+

Day of week for users of: priceline

+-----------------+-----+
|request_dayofWeek|count|
+-----------------+-----+
|                3|    1|
|                1|   13|
|                5|   14|
|                2|   27|
|                4|   34|
|                7|   57|
|                6|  101|
+-----------------+-----+



In [29]:
# Referrer counts for the three sites of interest, most frequent referrer first.
sites_of_interest = ['amazon', 'netflix', 'priceline']
dcount = (
    df.where(col('url_name').isin(sites_of_interest))
      .groupBy('url_name', 'documentReferer')
      .count()
      .orderBy(col('count').desc())
)

In [30]:
# Show the most frequent referrers per site; `dcount` is already sorted by
# descending count, and show() prints its default top rows.
for site in ('amazon', 'netflix', 'priceline'):
    print('Top refer for users of: {:s}\n'.format(site))
    dcount.where(col('url_name') == site).show()

Top refer for users of: amazon

+--------+--------------------+-----+
|url_name|     documentReferer|count|
+--------+--------------------+-----+
|  amazon|          amazon.com|31751|
|  amazon|                null| 7645|
|  amazon|          google.com| 2813|
|  amazon|    smile.amazon.com| 1720|
|  amazon|sellercentral.ama...|  668|
|  amazon|    drudgereport.com|  477|
|  amazon|        facebook.com|  446|
|  amazon|           yahoo.com|  275|
|  amazon|      chinatimes.com|  270|
|  amazon|          reddit.com|  247|
|  amazon|      login.live.com|  245|
|  amazon|            imdb.com|  230|
|  amazon|console.aws.amazo...|  156|
|  amazon|           amazon.ca|  142|
|  amazon|           amazon.es|  130|
|  amazon|         shopbop.com|  112|
|  amazon|             cnn.com|  107|
|  amazon|  bbs.wenxuecity.com|  103|
|  amazon|webmail.earthlink...|   87|
|  amazon|us-mg6.mail.yahoo...|   85|
+--------+--------------------+-----+
only showing top 20 rows

Top refer for users of: netfli

In [None]:
# Per-column data-quality profile.
#   null count     : rows where the column is NULL (or '' for string columns)
#   distinct count : number of distinct values, computed for string columns only
# Side outputs:
#   null_drop      : columns whose missing share exceeds NULL_DROP_THRESHOLD
#   distinct_high  : string columns with more than HIGH_CARDINALITY_THRESHOLD values
NULL_DROP_THRESHOLD = 0.2
HIGH_CARDINALITY_THRESHOLD = 100

# Header and rows share the same widths so the printed table lines up.
header_format = '{:>8s} {:40s} {:>10s} {:>13s} {:>14s}'
row_format = '{:8d} {:40s} {:10d} {:12.2f}% {:14d}'
print(header_format.format('column #', "column name", "null count", "null percent", "distinct count"))

c = df.count()

# Count missing values for every column in ONE pass over the data instead of
# running a separate filter+count job per column.
missing_exprs = []
for x, y in df.dtypes:
    is_missing = col(x).isNull()
    if y == 'string':
        # Only string columns are compared with ''; doing so on numeric or
        # timestamp columns forces an implicit cast of '' to that type.
        is_missing = is_missing | (col(x) == '')
    missing_exprs.append(F.sum(is_missing.cast('int')).alias(x))
missing_counts = df.agg(*missing_exprs).first()

null_drop = []
distinct_high = []
for col_number, (x, y) in enumerate(df.dtypes, start=1):
    # SUM over zero rows yields NULL, hence the `or 0`.
    null_count = missing_counts[x] or 0

    if y == 'string':
        distinct_count = df.select(x).distinct().count()
        if distinct_count > HIGH_CARDINALITY_THRESHOLD:
            distinct_high.append(x)
    else:
        distinct_count = 0

    # Guard against an empty DataFrame (c == 0) instead of dividing by zero.
    null_fraction = null_count * 1. / c if c else 0.0
    if null_fraction > NULL_DROP_THRESHOLD:
        null_drop.append(x)

    print(row_format.format(col_number, x, null_count, null_fraction * 100., distinct_count))

# Auxiliary functions

In [4]:
def ip_to_zip(ip, db_path=r'C:\Users\N716307\Downloads\GeoLite2-City_20190528\GeoLite2-City.mmdb'):
    """Look up the postal code of an IP address in a GeoLite2 City database.

    Note: a later cell defines another ``ip_to_zip``; when the notebook is run
    top to bottom that later definition replaces this one.

    Parameters
    ----------
    ip : str
        IP address to look up.
    db_path : str, optional
        Location of the ``.mmdb`` file. Defaults to the path originally
        hard-coded here (machine-specific -- override it on other machines).

    Returns
    -------
    int or None
        The postal code converted with ``int()`` (so any leading zeros are
        lost), or ``None`` when the lookup or the conversion fails.

    Raises
    ------
    Whatever ``geoip2.database.Reader(db_path)`` raises when the database
    cannot be opened; only the lookup/conversion step is best-effort.
    """
    # Kept as a function-local import, as in the original
    # (presumably so Spark workers import it where the UDF runs -- TODO confirm).
    import geoip2.database

    # NOTE(review): a reader is opened for every call, which is costly when this
    # runs once per row; consider sharing one reader per worker process.
    reader = geoip2.database.Reader(db_path)
    try:
        return int(reader.city(ip).postal.code)
    except Exception:
        # Best-effort lookup: unknown address, malformed ip, missing or
        # non-numeric postal code all map to None. Unlike a bare `except`,
        # this lets KeyboardInterrupt/SystemExit propagate.
        return None
    finally:
        # Always release the database handle; it was previously left open.
        reader.close()

In [18]:
def ip_to_zip(ip):
    '''
    Transform an ip address into a zip (postal) code.
    This function is meant to be wrapped in a Spark udf.

    Parameters:
        ip: the ip address as a string, or None.

    Returns:
        The postal code converted with int() (leading zeros are therefore
        lost), or None when ip is None, the address cannot be looked up,
        the record has no postal code, or the code is not numeric.
    '''
    # Kept as a function-local import, as in the original
    # (presumably so Spark workers import it where the udf runs -- TODO confirm).
    from geolite2 import geolite2

    if ip is None:
        return None

    try:
        ip_location = geolite2.reader().get(ip)
    except (ValueError, TypeError):
        # A malformed address must not abort the whole Spark job: treat it
        # like an unknown address.
        return None

    if not ip_location or 'postal' not in ip_location:
        return None

    try:
        return int(ip_location['postal']['code'])
    except (KeyError, TypeError, ValueError):
        # Missing 'code' entry, a None value, or a non-numeric postal code.
        return None

In [5]:
# Wrap ip_to_zip as a Spark udf that yields an integer column, then derive the
# `zipcode` column from `ip`.
ip_to_zip_udf = udf(ip_to_zip, returnType=IntegerType())
# Earlier variant, left disabled:
# df = df.withColumn('zipcode',when(df.ip.isNotNull(),ip_to_zip_udf(df.ip)).otherwise(df.ip))
df = df.withColumn('zipcode', ip_to_zip_udf(col('ip')))