In [1]:
# Minimum support a single query must strictly exceed to be kept, where support is
# (number of users who issued the query) / (total number of users).
# NOTE(review): the name is a misspelling of "THRESHOLD"; it is kept because later
# cells reference it by this exact name.
SUPPORT_TRASHOLD=0.0005
import pyspark.sql.functions as F
from pyspark.sql import Row
import time
import re

def removeTimestamp(row):
    """Return the part of ``row`` that precedes the first ``YYYY-MM-DD`` date.

    Args:
        row: One raw text line of the search log.

    Returns:
        ``row`` truncated right before the first date-like substring. When the
        line contains no such substring it is returned unchanged instead of
        raising ``AttributeError`` on a failed match.
    """
    match = re.search(r'\d{4}-\d{2}-\d{2}', row)
    if match is None:
        # No timestamp on this line: there is nothing to cut off.
        return row
    return row[:match.start()]

def spliteToUserIdAndUserSearch(row):
    """Split a tab-separated line into ``(user id, query)``.

    The line is split on its first tab only. Everything before that tab is
    converted to ``int``; everything after it has its trailing tabs stripped
    and is returned as the query text.
    """
    fields = row.split("\t", 1)
    query = fields[1].rstrip('\t')
    user_id = int(fields[0])
    return (user_id, query)

def uniqueList(line):
    """Return ``[line[0], values]`` where ``values`` is ``line[1]`` without duplicates.

    The de-duplicated values come from a ``set``, so their order is not
    guaranteed to match the input order.
    """
    deduplicated = list(set(line[1]))
    return [line[0], deduplicated]

# Wall-clock start of the run; a later cell reads it to report the elapsed time.
start = time.time()

# Load the raw log and drop every line that equals the first (header) line.
log_txt = sc.textFile("user-searches.txt")
header = log_txt.first()
log_txt = log_txt.filter(lambda line: line != header)

# Distinct (user id, query) records; records whose query is '-' are discarded.
logSearch = (
    log_txt
    .map(removeTimestamp)
    .map(spliteToUserIdAndUserSearch)
    .filter(lambda record: record[1] != '-')
    .distinct()
)

print(logSearch.take(10))

[(142, 'dfdf'), (142, 'vaniqa.comh'), (142, '207 ad2d 530'), (142, 'attornyleslie.com'), (217, 'mizuno.com'), (217, "p; .; p;' p; ' ;' ;';"), (217, 'yahoo.com'), (1268, 'sstack.com'), (1268, 'www.raindanceexpress.com'), (1326, 'files')]


In [2]:
# Number of distinct user ids in logSearch; later cells divide by it to turn counts
# into support values. Only the keys matter here, so count the distinct keys instead
# of shuffling every query of every user through groupByKey().
totalOfTransactions = logSearch.keys().distinct().count()

In [3]:
# Show the number of distinct users (the denominator used for support values).
print(totalOfTransactions)

64942


In [4]:
# Drop the user id and keep only the query text of every (user id, query) record.
all_queries = logSearch.map(lambda record: record[1])

# Support of a query = (number of records containing it) / (number of users).
# Only queries whose support is strictly above SUPPORT_TRASHOLD are kept.
rdd_query_count = (
    all_queries
    .map(lambda query: (query, 1))
    .reduceByKey(lambda left, right: left + right)
    .map(lambda query_total: (query_total[0], query_total[1] / totalOfTransactions))
    .filter(lambda query_support: query_support[1] > SUPPORT_TRASHOLD)
)

# Peek at ten (query, support) records that passed the support threshold.
rdd_query_count.take(10)


[('craiglist', 0.0007853161282375043),
 ('.com', 0.017076776200301808),
 ('sprint.com', 0.0019247944319546672),
 ('www.google', 0.010255304733454467),
 ('msnbc.com', 0.0005851375073142189),
 ('y', 0.005112254011271597),
 ('usps.com', 0.0017862092328539312),
 ('travelocity', 0.007452804040528472),
 ('ww.yahoo.com', 0.0005235440854916695),
 ('best buy', 0.006174740537710573)]

In [None]:
# Re-key the log by query so it can be joined with the frequent-query supports.
query_to_user = logSearch.map(lambda record: (record[1], record[0]))

# (query, (user id, support)) for every record whose query passed the threshold.
tupleWordAfterTrashold = query_to_user.join(rdd_query_count)

# (user id, [frequent queries]) restricted to users with more than one such query.
user_query = (
    tupleWordAfterTrashold
    .map(lambda joined: (joined[1][0], joined[0]))
    .groupByKey()
    .mapValues(list)
    .filter(lambda user_and_queries: len(user_and_queries[1]) > 1)
)
print(user_query.take(2))

[(382200, ['.com', 'american greetings', 'www.google.com', 'dictionary', 'www.comcast.net']), (1648038, ['.com', 'google.com', 'target', 'http', 'old navy', 'good morning america', 'ebay.com', 'song lyrics', 'dictionary.com', 'currency converter', 'oldnavy.com', 'myspace', 'wachovia', 'www.myspace.com', 'yahoo', 'google', 'universal studios', 'delta airlines', 'myspac', 'myspace.com', 'prom dresses', 'oriental trading company'])]


In [None]:
def get_all_pairs(arr):
    """Return every pair ``(arr[i], arr[j])`` with ``i < j`` as a list of tuples.

    Pairs are produced in index order: all pairs starting at position 0 first,
    then those starting at position 1, and so on. Sequences with fewer than
    two elements yield an empty list.
    """
    size = len(arr)
    return [
        (arr[first], arr[second])
        for first in range(size)
        for second in range(first + 1, size)
    ]
    
# For every user, emit each pair of that user's frequent queries (user id dropped).
all_queries_pairs_tuples = user_query.values().flatMap(get_all_pairs)
all_queries_pairs_tuples.take(50)
# Illustration, one row per user and one column per query (1 = user searched it):
#   userId | a, b, c, d
#   1      | 1, 1, 0, 0  ->  (a, b)
#   2      | 0, 1, 1, 0  ->  (b, c)


[('.com', 'american greetings'),
 ('.com', 'www.google.com'),
 ('.com', 'dictionary'),
 ('.com', 'www.comcast.net'),
 ('american greetings', 'www.google.com'),
 ('american greetings', 'dictionary'),
 ('american greetings', 'www.comcast.net'),
 ('www.google.com', 'dictionary'),
 ('www.google.com', 'www.comcast.net'),
 ('dictionary', 'www.comcast.net'),
 ('.com', 'google.com'),
 ('.com', 'target'),
 ('.com', 'http'),
 ('.com', 'old navy'),
 ('.com', 'good morning america'),
 ('.com', 'ebay.com'),
 ('.com', 'song lyrics'),
 ('.com', 'dictionary.com'),
 ('.com', 'currency converter'),
 ('.com', 'oldnavy.com'),
 ('.com', 'myspace'),
 ('.com', 'wachovia'),
 ('.com', 'www.myspace.com'),
 ('.com', 'yahoo'),
 ('.com', 'google'),
 ('.com', 'universal studios'),
 ('.com', 'delta airlines'),
 ('.com', 'myspac'),
 ('.com', 'myspace.com'),
 ('.com', 'prom dresses'),
 ('.com', 'oriental trading company'),
 ('google.com', 'target'),
 ('google.com', 'http'),
 ('google.com', 'old navy'),
 ('google.com',

In [None]:
def sort_small_list(arr):
    """Return the two items of ``arr`` in ascending order.

    When the first item is already less than or equal to the second, ``arr``
    itself is returned; otherwise a new two-element list with the items
    swapped is returned.
    """
    return arr if arr[0] <= arr[1] else [arr[1], arr[0]]
# Put the two queries of every pair in ascending order so that (a, b) and (b, a)
# become the same key (a, b) and are counted together, e.g. ((a, b), 2).
all_queries_tuples_sorted = all_queries_pairs_tuples.map(
    lambda pair: tuple(sort_small_list(list(pair)))
)

# Support of a pair = (number of times the pair was emitted) / (number of users).
# NOTE(review): pairs are kept when their count is greater than 1, not when their
# support exceeds SUPPORT_TRASHOLD -- confirm this is the intended cut-off.
all_queries_pairs_tuples_count = (
    all_queries_tuples_sorted
    .map(lambda pair: (pair, 1))
    .reduceByKey(lambda left, right: left + right)
    .filter(lambda pair_total: pair_total[1] > 1)
    .map(lambda pair_total: (pair_total[0], pair_total[1] / totalOfTransactions))
)


all_queries_pairs_tuples_count.take(10)

[(('american greetings', 'dictionary'), 9.239013273382403e-05),
 (('.com', 'myspac'), 6.159342182254935e-05),
 (('old navy', 'universal studios'), 3.0796710911274674e-05),
 (('good morning america', 'prom dresses'), 3.0796710911274674e-05),
 (('myspace', 'yahoo'), 0.0032182562902282035),
 (('google', 'myspac'), 0.00016938191001201073),
 (('delta airlines', 'prom dresses'), 3.0796710911274674e-05),
 (('bank of america', 'delta.com'), 4.6195066366912014e-05),
 (('amtrak', 'delta.com'), 4.6195066366912014e-05),
 (('amtrak', 'bank of america'), 0.00010778848818946137)]

In [None]:
# Combine every ((x, y), pair support) record with every (query, support) record.
# The result holds one element per combination, i.e. (number of pairs) x (number of
# frequent queries) elements in total.
rdd_queries_tuples_cartesian = all_queries_pairs_tuples_count.cartesian(rdd_query_count)
# Preview three of the combined records.
rdd_queries_tuples_cartesian.take(3)

[((('american greetings', 'dictionary'), 9.239013273382403e-05),
  ('craiglist', 0.0007853161282375043)),
 ((('american greetings', 'dictionary'), 9.239013273382403e-05),
  ('.com', 0.017076776200301808)),
 ((('american greetings', 'dictionary'), 9.239013273382403e-05),
  ('sprint.com', 0.0019247944319546672))]

In [None]:
# Confidence of the rule X -> Y is support(X u Y) / support(X).
# Each pair record ((x, y), support(x u y)) is re-keyed by the query acting as X and
# joined with the (query, support) records. The join only combines records whose
# query matches, instead of filtering a full cartesian product for equal queries.

# Rules x -> y: (x, y, support(x u y) / support(x)), kept when confidence > 0.6.
rdd_join_left = (
    all_queries_pairs_tuples_count
    .map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))
    .join(rdd_query_count)
    .map(lambda kv: (kv[0], kv[1][0][0], float(kv[1][0][1]) / kv[1][1]))
    .filter(lambda confidance: confidance[2] > 0.6)
)
print(rdd_join_left.take(5))

# Rules y -> x: (y, x, support(x u y) / support(y)), kept when confidence > 0.6.
rdd_join_right = (
    all_queries_pairs_tuples_count
    .map(lambda kv: (kv[0][1], (kv[0][0], kv[1])))
    .join(rdd_query_count)
    .map(lambda kv: (kv[0], kv[1][0][0], float(kv[1][0][1]) / kv[1][1]))
    .filter(lambda confidance: confidance[2] > 0.6)
)
print(rdd_join_right.take(5))

# All rules in both directions.
rdd_query_conf = sc.union([rdd_join_left, rdd_join_right])

In [None]:
#######################
####### TASK 1 ########
#######################

# All (X, Y, confidence) rules with confidence above 0.6, in both directions.
x_y_conf06 = rdd_join_left.union(rdd_join_right)
print(x_y_conf06.take(7))

In [None]:
#######################
####### TASK 2  #######
#######################
# Turn the (X, Y, confidence) rules into a DataFrame with named columns.
rddQueryConfDF = sqlContext.createDataFrame(x_y_conf06, ["X", "Y","CONFIDENCE"])
# Write the rules as a single-part CSV with a header row. The default save mode
# raises when ./my.csv already exists, which made a second run of the notebook fail;
# 'overwrite' replaces the previous export instead.
# NOTE: run the cells in order -- x_y_conf06 must not be reading from ./my.csv when
# this cell overwrites it.
rddQueryConfDF.coalesce(1).write.format('com.databricks.spark.csv').mode('overwrite').save('./my.csv',header = 'true')

In [None]:
# Report the wall-clock time since `start` was taken, formatted as HH:MM:SS.
end = time.time()
elapsed_struct = time.gmtime(end - start)
elapsed_time = time.strftime("%H:%M:%S", elapsed_struct)
print("elapsed time: %s" % elapsed_time)

In [None]:
#########################
####### TASK 4.a ########
#########################
# Raw text lines of the exported rules and the header line (kept under their
# original names for inspection).
rdd_query_conf_Temp = sc.textFile("my.csv")
headerConf = rdd_query_conf_Temp.first()

# Parse the export with the same CSV data source that wrote it. Splitting each line
# on ',' breaks as soon as a query itself contains a comma (such fields are written
# quoted), which shifts the columns and makes float() fail or read the wrong field.
rules_df = sqlContext.read.format('com.databricks.spark.csv') \
                          .options(header='true') \
                          .load('my.csv')

# Back to (X, Y, confidence) tuples; columns are read as strings, so the confidence
# is converted to float explicitly.
x_y_conf06 = rules_df.rdd.map(lambda row: (row[0], row[1], float(row[2])))

x_y_conf06.take(2)

In [None]:
def filterConfidence(line,Conf):
    """Tell whether the third field of ``line`` is at least ``Conf``.

    Args:
        line: Indexable record whose item at position 2 is compared.
        Conf: Minimum value (inclusive) that ``line[2]`` must reach.

    Returns:
        ``True`` when ``line[2] >= Conf``, otherwise ``False``.
    """
    return True if line[2] >= Conf else False

# Narrow the 0.6-confidence rules down to confidence >= 0.8, then to >= 0.9.
x_y_conf08 = x_y_conf06.filter(lambda line: filterConfidence(line,0.8))
x_y_conf09 = x_y_conf08.filter(lambda line: filterConfidence(line,0.9))

# Print a caption followed by the number of rules for each confidence level.
for caption, rules in (
    ('the amount of related queries for 0.6 confidence', x_y_conf06),
    ('the amount of related queries for 0.8 confidence', x_y_conf08),
    ('the amount of related queries for 0.9 confidence', x_y_conf09),
):
    print(caption)
    print(rules.count())

In [None]:
# Re-shape every rule (X, Y, confidence) into (confidence, (X, Y)) so that the
# confidence becomes the key.
tempConfUp06 = x_y_conf06.map(lambda rule: (rule[2], (rule[0], rule[1])))

# Order the rules by confidence, lowest first.
sortTempConfUp06 = tempConfUp06.sortByKey(ascending=True)

In [None]:
# Print the 20 largest (confidence, (X, Y)) records, i.e. the rules with the highest
# confidence.
# NOTE(review): top() selects the largest elements itself, so it presumably does not
# rely on the preceding sortByKey() -- confirm whether that sort is still needed.
print(sortTempConfUp06.top(20))