In [1]:
#requirement 

# Processing & Analytical goals:

# Sessionize the web log by IP. Sessionize = aggregrate all page hits by visitor/IP during a session. https://en.wikipedia.org/wiki/Session_(web_analytics)

# Determine the average session time

# Determine unique URL visits per session. To clarify, count a hit to a unique URL only once per session.

# Find the most engaged users, ie the Is with the longest session times


# Tips:
# How to hook jupyter to pyspark: https://blog.sicara.com/get-started-pyspark-jupyter-guide-tutorial-ae2fe84f594f
# For this dataset, complete the sessionization by time window, determine the best session window time on your 
# own, or start with 15 minutes.
# The log file was taken from an AWS Elastic Load Balancer: 
# http://docs.aws.amazon.com/ElasticLoadBalancing/latest/DeveloperGuide/access-log-collection.html#access-log-entry-format


In [2]:
# Read data from local file system
data = spark.sparkContext.textFile("/Users/edwinguo/edwin/PythonDSClass/project/shoppingwebsampe.log")

In [4]:
# To have a quick peek of the data
data.first()

u'2015-07-22T09:00:28.019143Z marketpalce-shop 123.242.248.130:54635 10.0.6.158:80 0.000022 0.026109 0.00002 200 200 0 699 "GET https://democlass.com:443/shop/authresponse?code=f2405b05-e2ee-4b0d-8f6a-9fed0fcfe2e0&state=null HTTP/1.1" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.130 Safari/537.36" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2'

In [25]:
# pick the most important and related fileds for the analytics job
def pickFields(arr):
    parts = arr[0].split(" ")
    return [parts[0], parts[2], parts[12]]

data.map(lambda x: pickFields(x.split("\" "))).first()

[u'2015-07-22T09:00:28.019143Z',
 u'123.242.248.130:54635',
 u'https://democlass.com:443/shop/authresponse?code=f2405b05-e2ee-4b0d-8f6a-9fed0fcfe2e0&state=null']

In [64]:
# since we need to sort and do match calculation on the timestamp, the best way is to convert the ts to epoch timestamp
import time
from calendar import timegm

ts_fmt = "%Y-%m-%dT%H:%M:%S.%fZ"

def convertToEpoch(time_str):
    utc_time = time.strptime(time_str, ts_fmt)
    return timegm(utc_time)

In [69]:
# now let's revist pickFields
def pickFields(arr):
    parts = arr[0].split(" ")
    # we only care about the cient ip to identify each session, not the port
    return (parts[2].split(":")[0], (convertToEpoch(parts[0]), parts[12]))

stage1 = data.map(lambda x: pickFields(x.split("\" ")))
stage1.first()

(u'123.242.248.130',
 (1437555628,
  u'https://democlass.com:443/shop/authresponse?code=f2405b05-e2ee-4b0d-8f6a-9fed0fcfe2e0&state=null'))

In [70]:
stage2 = stage1.groupByKey()

In [71]:
rec = stage2.first()

In [72]:
# Now let's take a look at the data after group
list(rec[1])

[(1437541812, u'https://democlass.com:443/democlasscash'),
 (1437541842, u'https://democlass.com:443/democlasscash'),
 (1437541843, u'https://democlass.com:443/checkout'),
 (1437541847, u'https://democlass.com:443/checkout'),
 (1437541853, u'https://democlass.com:443/summary/1115382319'),
 (1437541865, u'https://democlass.com:443/summary/1115382319?count=5'),
 (1437541866, u'https://democlass.com:443/styles/app.css'),
 (1437541889, u'https://democlass.com:443/summary/1115382319?count=4'),
 (1437541891, u'https://democlass.com:443/styles/app.css'),
 (1437541904, u'https://democlass.com:443/summary/1115382319?count=3'),
 (1437541905, u'https://democlass.com:443/styles/app.css'),
 (1437541918, u'https://democlass.com:443/summary/1115382319?count=2'),
 (1437541919, u'https://democlass.com:443/styles/app.css'),
 (1437541931, u'https://democlass.com:443/summary/1115382319?count=1'),
 (1437541933, u'https://democlass.com:443/styles/app.css'),
 (1437541939, u'https://democlass.com:443/settings

In [78]:
# Cool, the above output looks pretty decent at this point. Now the next thing we
# need to do it to sort the collection base on the timestamp in a accending order.
def sortTup(tup):
    tempTup = list(tup)
    tempTup.sort(key=lambda tup: tup[0])
    return tempTup

stage3 = stage2.mapValues(lambda x : sortTup(x))

In [79]:
# Now the sequence within each collection is sorted
stage3.first()

(u'106.76.166.196',
 [(1437541812, u'https://democlass.com:443/democlasscash'),
  (1437541842, u'https://democlass.com:443/democlasscash'),
  (1437541843, u'https://democlass.com:443/checkout'),
  (1437541847, u'https://democlass.com:443/checkout'),
  (1437541853, u'https://democlass.com:443/summary/1115382319'),
  (1437541865, u'https://democlass.com:443/summary/1115382319?count=5'),
  (1437541866, u'https://democlass.com:443/styles/app.css'),
  (1437541889, u'https://democlass.com:443/summary/1115382319?count=4'),
  (1437541891, u'https://democlass.com:443/styles/app.css'),
  (1437541904, u'https://democlass.com:443/summary/1115382319?count=3'),
  (1437541905, u'https://democlass.com:443/styles/app.css'),
  (1437541918, u'https://democlass.com:443/summary/1115382319?count=2'),
  (1437541919, u'https://democlass.com:443/styles/app.css'),
  (1437541931, u'https://democlass.com:443/summary/1115382319?count=1'),
  (1437541933, u'https://democlass.com:443/styles/app.css'),
  (1437541939, 

In [96]:
# let's use a session time as 15 minutes 
sessionTime = 900

In [81]:
# At this stage, we need to start to put the above info into the bucket it belongs to

#    |________|________|________|________|........
# start  15 min   30 min     45 min  60 min .....

In [111]:
def bucketData(datas, sessionWindow):
    result = []
    # temp for each iteration
    temp = []
    startTime = 0
    
    for data in datas:
        if (startTime == 0):
            startTime = data[0]
            temp.append(data)
        # if the current time minue the current starttime
        # is less then sessionWindow, then add to the temp
        elif (data[0] - startTime <= sessionWindow):
            temp.append(data)
        else:
            # first put the temp into the result
            result.append(temp)
            # clear the state of temp and startTime
            temp = [data]
            startTime = 0
    
    if len(temp) != 0:
        result.append(temp)
            
    return result

In [112]:
bucketData(stage3.first()[1], sessionTime)

[[(1437541812, u'https://democlass.com:443/democlasscash'),
  (1437541842, u'https://democlass.com:443/democlasscash'),
  (1437541843, u'https://democlass.com:443/checkout'),
  (1437541847, u'https://democlass.com:443/checkout'),
  (1437541853, u'https://democlass.com:443/summary/1115382319'),
  (1437541865, u'https://democlass.com:443/summary/1115382319?count=5'),
  (1437541866, u'https://democlass.com:443/styles/app.css'),
  (1437541889, u'https://democlass.com:443/summary/1115382319?count=4'),
  (1437541891, u'https://democlass.com:443/styles/app.css'),
  (1437541904, u'https://democlass.com:443/summary/1115382319?count=3'),
  (1437541905, u'https://democlass.com:443/styles/app.css'),
  (1437541918, u'https://democlass.com:443/summary/1115382319?count=2'),
  (1437541919, u'https://democlass.com:443/styles/app.css'),
  (1437541931, u'https://democlass.com:443/summary/1115382319?count=1'),
  (1437541933, u'https://democlass.com:443/styles/app.css'),
  (1437541939, u'https://democlass.

In [118]:
# Test Case
testdata2 = [(1437541812, u'https://democlass.com:443/democlasscash'),
 (1437541842, u'https://democlass.com:443/democlasscash'),
 (1437541843, u'https://democlass.com:443/checkout'),
 (1437541847, u'https://democlass.com:443/checkout'),
 (1437541853, u'https://democlass.com:443/summary/1115382319'),
 (1437541865, u'https://democlass.com:443/summary/1115382319?count=5'),
 (1437541866, u'https://democlass.com:443/styles/app.css'),
 (1437541889, u'https://democlass.com:443/summary/1115382319?count=4'),
 (1437545891, u'https://democlass.com:443/styles/app.css'),
 (1437545904, u'https://democlass.com:443/summary/1115382319?count=3'),
 (1437545905, u'https://democlass.com:443/styles/app.css'),
 (1437545918, u'https://democlass.com:443/summary/1115382319?count=2'),
 (1437545919, u'https://democlass.com:443/styles/app.css'),
 (1437545931, u'https://democlass.com:443/summary/1115382319?count=1'),
 (1437545933, u'https://democlass.com:443/styles/app.css'),
 (1437545939, u'https://democlass.com:443/settings'),
 (1437545974, u'https://democlass.com:443/logout'),
 (1437545974, u'https://democlass.com:443/'),
 (1437545980, u'https://democlass.com:443/settings'),
 (1437545984, u'https://democlass.com:443/login'),
 (1437545987, u'https://democlass.com:443/login')]

bucketData(testdata2, sessionTime)

[[(1437541812, u'https://democlass.com:443/democlasscash'),
  (1437541842, u'https://democlass.com:443/democlasscash'),
  (1437541843, u'https://democlass.com:443/checkout'),
  (1437541847, u'https://democlass.com:443/checkout'),
  (1437541853, u'https://democlass.com:443/summary/1115382319'),
  (1437541865, u'https://democlass.com:443/summary/1115382319?count=5'),
  (1437541866, u'https://democlass.com:443/styles/app.css'),
  (1437541889, u'https://democlass.com:443/summary/1115382319?count=4')],
 [(1437545891, u'https://democlass.com:443/styles/app.css'),
  (1437545904, u'https://democlass.com:443/summary/1115382319?count=3'),
  (1437545905, u'https://democlass.com:443/styles/app.css'),
  (1437545918, u'https://democlass.com:443/summary/1115382319?count=2'),
  (1437545919, u'https://democlass.com:443/styles/app.css'),
  (1437545931, u'https://democlass.com:443/summary/1115382319?count=1'),
  (1437545933, u'https://democlass.com:443/styles/app.css'),
  (1437545939, u'https://democlass

In [119]:
# Test Case
testdata3 = [(1437541812, u'https://democlass.com:443/democlasscash'),
 (1437545987, u'https://democlass.com:443/login')]

bucketData(testdata3, sessionTime)

[[(1437541812, u'https://democlass.com:443/democlasscash')],
 [(1437545987, u'https://democlass.com:443/login')]]

In [120]:
# Test Case
testdata4 = [(1437541812, u'https://democlass.com:443/democlasscash')]

bucketData(testdata4, sessionTime)

[[(1437541812, u'https://democlass.com:443/democlasscash')]]

In [121]:
# Test Case
bucketData([], sessionTime)

[]

In [122]:
stage4 = stage3.mapValues(lambda x: bucketData(x, sessionTime))

In [124]:
# in case you want to see more results, you can call take on the 
# rdd
stage4.take(10)

[(u'106.76.166.196',
  [[(1437541812, u'https://democlass.com:443/democlasscash'),
    (1437541842, u'https://democlass.com:443/democlasscash'),
    (1437541843, u'https://democlass.com:443/checkout'),
    (1437541847, u'https://democlass.com:443/checkout'),
    (1437541853, u'https://democlass.com:443/summary/1115382319'),
    (1437541865, u'https://democlass.com:443/summary/1115382319?count=5'),
    (1437541866, u'https://democlass.com:443/styles/app.css'),
    (1437541889, u'https://democlass.com:443/summary/1115382319?count=4'),
    (1437541891, u'https://democlass.com:443/styles/app.css'),
    (1437541904, u'https://democlass.com:443/summary/1115382319?count=3'),
    (1437541905, u'https://democlass.com:443/styles/app.css'),
    (1437541918, u'https://democlass.com:443/summary/1115382319?count=2'),
    (1437541919, u'https://democlass.com:443/styles/app.css'),
    (1437541931, u'https://democlass.com:443/summary/1115382319?count=1'),
    (1437541933, u'https://democlass.com:443/st

In [142]:
# Now let's calculate average session time
def calculateSession(data):
    return map(lambda x: x[-1][0] - x[0][0] ,data)

#stage4.first()
stage4.mapValues(lambda x: calculateSession(x)).take(10)

[(u'106.76.166.196', [175]),
 (u'61.0.222.100', [196]),
 (u'14.98.92.61', [369]),
 (u'203.38.135.36', [11]),
 (u'182.66.27.124', [0]),
 (u'106.66.127.187', [8]),
 (u'49.204.42.220', [8]),
 (u'125.63.73.247', [125]),
 (u'27.97.102.67', [148, 134]),
 (u'42.99.164.95', [5, 495, 767])]

In [144]:
# now let's get the average for 
stage4.mapValues(lambda x: calculateSession(x)).mapValues(lambda y: sum(y) / len(y)).take(10)

[(u'106.76.166.196', 175),
 (u'61.0.222.100', 196),
 (u'14.98.92.61', 369),
 (u'203.38.135.36', 11),
 (u'182.66.27.124', 0),
 (u'106.66.127.187', 8),
 (u'49.204.42.220', 8),
 (u'125.63.73.247', 125),
 (u'27.97.102.67', 141),
 (u'42.99.164.95', 422)]

In [146]:
# Awesome, now let's move on to the uniq url per session
stage4.first()

(u'106.76.166.196',
 [[(1437541812, u'https://democlass.com:443/democlasscash'),
   (1437541842, u'https://democlass.com:443/democlasscash'),
   (1437541843, u'https://democlass.com:443/checkout'),
   (1437541847, u'https://democlass.com:443/checkout'),
   (1437541853, u'https://democlass.com:443/summary/1115382319'),
   (1437541865, u'https://democlass.com:443/summary/1115382319?count=5'),
   (1437541866, u'https://democlass.com:443/styles/app.css'),
   (1437541889, u'https://democlass.com:443/summary/1115382319?count=4'),
   (1437541891, u'https://democlass.com:443/styles/app.css'),
   (1437541904, u'https://democlass.com:443/summary/1115382319?count=3'),
   (1437541905, u'https://democlass.com:443/styles/app.css'),
   (1437541918, u'https://democlass.com:443/summary/1115382319?count=2'),
   (1437541919, u'https://democlass.com:443/styles/app.css'),
   (1437541931, u'https://democlass.com:443/summary/1115382319?count=1'),
   (1437541933, u'https://democlass.com:443/styles/app.css'),


In [181]:
from itertools import groupby

def gb(data):
  #  return data
    result = []
    for k, g in groupby(data):
        result.append((k, len(list(g)))) 
    return result

def calculateUniqUrl(data):
    input = map(lambda x: map(lambda y: y[1], x), data)
    s1 = map(lambda x: gb(x), input)
    return s1

    
stage4.mapValues(lambda x: calculateUniqUrl(x)).first()

(u'106.76.166.196',
 [[(u'https://democlass.com:443/democlasscash', 2),
   (u'https://democlass.com:443/checkout', 2),
   (u'https://democlass.com:443/summary/1115382319', 1),
   (u'https://democlass.com:443/summary/1115382319?count=5', 1),
   (u'https://democlass.com:443/styles/app.css', 1),
   (u'https://democlass.com:443/summary/1115382319?count=4', 1),
   (u'https://democlass.com:443/styles/app.css', 1),
   (u'https://democlass.com:443/summary/1115382319?count=3', 1),
   (u'https://democlass.com:443/styles/app.css', 1),
   (u'https://democlass.com:443/summary/1115382319?count=2', 1),
   (u'https://democlass.com:443/styles/app.css', 1),
   (u'https://democlass.com:443/summary/1115382319?count=1', 1),
   (u'https://democlass.com:443/styles/app.css', 1),
   (u'https://democlass.com:443/settings', 1),
   (u'https://democlass.com:443/logout', 1),
   (u'https://democlass.com:443/', 1),
   (u'https://democlass.com:443/settings', 1),
   (u'https://democlass.com:443/login', 2)]])

In [182]:
stage4.mapValues(lambda x: calculateUniqUrl(x)).take(10)

[(u'106.76.166.196',
  [[(u'https://democlass.com:443/democlasscash', 2),
    (u'https://democlass.com:443/checkout', 2),
    (u'https://democlass.com:443/summary/1115382319', 1),
    (u'https://democlass.com:443/summary/1115382319?count=5', 1),
    (u'https://democlass.com:443/styles/app.css', 1),
    (u'https://democlass.com:443/summary/1115382319?count=4', 1),
    (u'https://democlass.com:443/styles/app.css', 1),
    (u'https://democlass.com:443/summary/1115382319?count=3', 1),
    (u'https://democlass.com:443/styles/app.css', 1),
    (u'https://democlass.com:443/summary/1115382319?count=2', 1),
    (u'https://democlass.com:443/styles/app.css', 1),
    (u'https://democlass.com:443/summary/1115382319?count=1', 1),
    (u'https://democlass.com:443/styles/app.css', 1),
    (u'https://democlass.com:443/settings', 1),
    (u'https://democlass.com:443/logout', 1),
    (u'https://democlass.com:443/', 1),
    (u'https://democlass.com:443/settings', 1),
    (u'https://democlass.com:443/login