In [1]:
import hmac
import hashlib
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from IPython.display import clear_output
import datetime
import random
import json
import os
import pandas as pd
from db_utils import *
from pprint import pprint

###  Hash IPs and Save Clean Traces

In [186]:
"""
SCHEMA:
client_ip,
user_agent,
x_forwarded_for,
CONCAT_WS('||', COLLECT_LIST(request)) as requests,
day_ts,
hashed_ip
"""

"\nSCHEMA:\nclient_ip,\nuser_agent,\nx_forwarded_for,\nCONCAT_WS('||', COLLECT_LIST(request)) as requests,\nday_ts,\nhashed_ip\n"

In [187]:
def parse_requests(x):
    requests = []
    for r in x[3].split('||'):
        t = r.split('|')
        if len(t) != 3:
            continue
        requests.append({'t': t[0], 'r': t[1], 'p': t[2]})
    requests.sort(key = lambda x: x['t']) # sort by time
    x[3] = requests
    return x    

In [60]:
def get_hash_function():
    key = name = input("Key ")
    clear_output()
    def hash_function(s):
        code = hmac.new(key.encode('utf-8'), s.encode('utf-8'), hashlib.sha1)
        s = code.hexdigest()
        return s
    return hash_ip

In [61]:
hash_function = get_hash_function()

In [189]:
def to_tsv_row(x):
    return '\t'.join([x[5], x[1].replace('\t', ' '), json.dumps(x[3]).replace('\t', ' ')]) 

In [28]:
hive_table = '/user/hive/warehouse/ellery.db/survey_qa_day'
output_table = '/user/ellery/readers/survey_qa_day'
#hive_table = '/user/hive/warehouse/ellery.db/survey_qa_hour'
#output_table = '/user/ellery/readers/survey_qa_hour'

In [29]:
sqlContext = SQLContext(sc)
trace_rdd = sc.textFile(hive_table) \
        .map(lambda x: x.strip().split('\t')) \
        .filter(lambda x: len(x) == 5) \
        .map(parse_requests) \
        .filter(lambda x: len(x[3]) > 0) \
        .map(hash_ip) \
        .map(to_tsv_row)
        
os.system('hadoop fs -rm -r ' + output_table)
trace_rdd.saveAsTextFile(output_table)

NameError: name 'parse_requests' is not defined

### Load Traces

In [46]:
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

trace_rdd = sc.textFile(output_table) \
            .map(lambda x: x.strip().split('\t')) \
            .filter(lambda x: len(x) == 3) \
            .map(lambda x: Row(key=x[0] + x[1], requests=x[2])) \
            

traceDF = sqlContext.createDataFrame(trace_rdd)
traceDF.registerTempTable("traceDF")
traceDF.persist()

DataFrame[key: string, requests: string]

In [49]:
sqlContext.sql("SELECT COUNT(*) FROM traceDF").collect()

[Row(_c0=70214156)]

In [31]:
#trace_rdd.count()

70214156

In [None]:
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)

### Join Traces to EL

In [32]:
def query_db(host,query, params):
    conn = pymysql.connect(host =host, read_default_file="/etc/mysql/conf.d/analytics-research-client.cnf")
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    cursor.execute(query, params)
    rows = cursor.fetchall()
    conn.close()
    return mysql_to_pandas(rows)


In [33]:
query = """
SELECT *
FROM
    log.QuickSurveysResponses_15266417 r,
    log.QuickSurveyInitiation_15278946 i
WHERE
    r.event_surveyInstanceToken = i.event_surveyInstanceToken
    AND i.event_eventName ='impression'
""" 

d_click = query_db('analytics-store.eqiad.wmnet', query, {})

In [34]:
#start_mw =  '20160217010000'
#stop_mw =   '20160217015900'

start_mw = '20160217000000'
stop_mw =  '20160217235959'

In [35]:
d_click_reduced = d_click[d_click['timestamp'] < stop_mw]
d_click_reduced = d_click_reduced[d_click_reduced['timestamp'] > start_mw]

In [36]:
d_click_reduced.shape

(88, 33)

In [37]:
d_click_list = []
for i, r in d_click_reduced.iterrows():
    key = r['clientIp'] + r['i.userAgent'][1:-1]
    data = dict(r)
    d_click_list.append((key, data))
d_click_rdd = sc.parallelize(d_click_list)


In [50]:
d_click_row_rdd = d_click_rdd.map(lambda x: Row(key=x[0], click_data=x[1])) 
clickDF = sqlContext.createDataFrame(d_click_row_rdd)
clickDF.registerTempTable("clickDF")
clickDF.persist()

DataFrame[click_data: map<string,string>, key: string]

In [52]:
key

'6cb38beb663010a9758b494d682ff9e69d12ec9eMozilla/5.0 (iPad; CPU OS 9_2_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13D15 Safari/601.1'

In [54]:
query = """
SELECT *
FROM 
    traceDF JOIN clickDF
WHERE clickDF.key = traceDF.key
"""

res = sqlContext.sql(query).collect()

In [55]:
len(res)

87

In [38]:
click_traces_rdd = trace_rdd.join(d_click_rdd)

In [39]:
click_traces = click_traces_rdd.collect()

KeyboardInterrupt: 

In [11]:
for i, r in d_click_reduced.iterrows():
    key = r['clientIp'] + r['i.userAgent'][1:-1]
    print('\n\n\nLinking EL to Webrequest\n')
    print('Key:', key)
    print('Page:', r['event_pageTitle'], r['timestamp'])
    pprint(trace_rdd.lookup(key))




Linking EL to Webrequest

Key: 1e8693f06808a7b77ec64f450e35a8e4751d4084Mozilla/5.0 (Linux; U; Android 5.0; en-US; Micromax AQ5001 Build/LRX21M) AppleWebKit/534.30 (KHTML, like Gecko) Version/4.0 UCBrowser/10.8.0.718 U3/0.8.0 Mobile Safari/534.30
Page: List_of_amendments_of_the_Constitution_of_India 20160217013758
[]



Linking EL to Webrequest

Key: 5e27e69075e38060befdda71b686aebd06bd11b2Mozilla/5.0 (BB10; Touch) AppleWebKit/537.35+ (KHTML, like Gecko) Version/10.3.2.2639 Mobile Safari/537.35+
Page: Gotham_(TV_series) 20160217010114
[[{'p': '/wiki/Gotham_(TV_series)',
   'r': 'https://www.google.ca/search?q=gotham+tv&oq=gotham+&gs_l=mobile-heirloom-serp.1.0.41l2j0i131l3.3469.8009.0.11405.8.5.0.3.3.0.176.781.0j5.5.0....0...1c.1.34.mobile-heirloom-serp..0.8.922.C7tD0AgAwAE',
   't': '2016-02-17 01:00:30'},
  {'p': '/wiki/Income_trust',
   'r': 'https://www.google.ca/search?q=income+trust&oq=income+tr&gs_l=mobile-heirloom-serp.1.0.0l5.2629.10487.0.13227.10.7.0.3.3.0.199.1113.0j7.7.0..

In [40]:
trace_rdd.unpersist()

PythonRDD[44] at RDD at PythonRDD.scala:43

# Scratch

### test sessionization

In [141]:
def sessionize(requests):
    # break request stream into sessions based on 30 min gap
    # assume requests are sorted in time
    def to_datetime(s):
        return datetime.datetime.strptime(s, "%Y-%m-%d %H:%M:%S")
    
    sessions = []
    
    delta = datetime.timedelta(minutes = 30)
    session = [requests[0]]
    prev_time = to_datetime(requests[0]['t'])
    
    for i in range(1, len(requests)):
        curr_time = to_datetime(requests[i]['t'])
        if curr_time > (prev_time + delta):
            sessions.append(session)
            session = []
        session.append(requests[i])
        prev_time = curr_time
    sessions.append(session)
        
    return sessions

td = datetime.datetime.strptime('2016-02-17 01:36:32', "%Y-%m-%d %H:%M:%S")
ts = '2016-02-17 01:36:32'
timess = [{'t': ts}]
timesd = [{'t': td}]

for i in range(2):
    n = int(60*random.random())
    t = timesd[-1]['t'] + datetime.timedelta(minutes=n)
    timesd.append({'t':t})
    timess.append({'t':datetime.datetime.strftime(t, "%Y-%m-%d %H:%M:%S")})
    

timess
sessionize(timess)

[[{'t': '2016-02-17 01:36:32'}, {'t': '2016-02-17 01:54:32'}],
 [{'t': '2016-02-17 02:42:32'}]]