In [18]:
from pyspark.sql import functions as func
from pyspark.sql.functions import collect_list

# Location of the tab-separated log file, relative to the notebook's working directory
DATA_PATH = "user-ct-test-collection-01.txt"

# Read the raw log as an RDD with one text line per element
dataset = sc.textFile(DATA_PATH)

In [19]:
# Drop the header: grab the first line of the file and keep only lines that differ from it
header = dataset.first()
filtered_dataset = dataset.filter(lambda text_line: header != text_line)

In [20]:
# Split every line on tabs, then keep (userID as int, query) from the first two fields
filtered_dataset = (
    filtered_dataset
    .map(lambda text_line: text_line.split("\t"))
    .map(lambda fields: (int(fields[0]), fields[1]))
)

In [21]:
# Collapse identical (userID, query) pairs so a user counts at most once per query
filtered_dataset = (
    filtered_dataset
    .map(lambda pair: (pair[0], pair[1]))
    .distinct()
)

In [5]:
# Preview ten (userID, query) pairs to sanity-check the parsing
filtered_dataset.take(10)

[(142, 'dfdf'),
 (142, 'vaniqa.comh'),
 (142, '207 ad2d 530'),
 (142, 'attornyleslie.com'),
 (217, 'mizuno.com'),
 (217, "p; .; p;' p; ' ;' ;';"),
 (217, 'yahoo.com'),
 (217, '-'),
 (1268, 'sstack.com'),
 (1268, 'www.raindanceexpress.com')]

In [22]:
# Turn the RDD of pairs into a DataFrame with the columns UserID and Query
df = filtered_dataset.toDF(schema=["UserID", "Query"])

In [23]:
# Aggregate the DataFrame per query: one row (Query, [userID1, userID2, ...])
query_aggr = (
    df.groupBy("Query")
    .agg(collect_list("UserID").alias("UserID"))
)

In [10]:
# Discard queries issued by a single user; despite the variable name, two users are enough to stay
q_more_than_2_user = query_aggr.rdd.filter(lambda entry: len(entry[1]) >= 2)

In [9]:
# Preview ten (Query, [UserID, ...]) rows that survived the multi-user filter
q_more_than_2_user.take(10)

[Row(Query='...', UserID=[3554879, 4005384, 6928849, 19135358, 1812007, 9275494, 5226229, 13384381, 16941836, 21806840, 3107412, 5819277, 7402970, 7476529, 20693622]),
 Row(Query='.hotmail.comhttp', UserID=[3732132, 11990167, 18253475, 21398243]),
 Row(Query='.wamu.com', UserID=[16455679, 3313123, 24535160, 18447964]),
 Row(Query='2 flash games', UserID=[4031594, 3781322]),
 Row(Query='aau.com', UserID=[15336031, 6482128, 2286838]),
 Row(Query='acris', UserID=[1730018, 2272416, 4821757, 9456273, 6820867]),
 Row(Query='action village', UserID=[3144279, 21620700]),
 Row(Query='affordable health insurance', UserID=[307464, 1260412]),
 Row(Query='ako', UserID=[1039396, 3172266, 9640439, 1408653, 1753504, 2706422, 13855112, 18760097, 2496878, 15884214, 18167739, 386728, 808930, 1334291, 2191663, 2276626, 2653464, 5933562, 11034304, 20471493, 1413474, 2595980, 2926375, 6155544, 6418168, 9324718, 9596629, 22984027, 955503, 2327099, 3095767, 3294416, 3949075, 5610864, 11581380, 15422574, 17344

In [24]:
# Build the query x query cartesian product, keeping only pairs whose second query
# string sorts strictly after the first, so each unordered pair appears once and
# no query is paired with itself
q_more_than_2_user_ct = (
    q_more_than_2_user
    .cartesian(q_more_than_2_user)
    .filter(lambda pair: pair[0][0] < pair[1][0])
)

In [25]:
# confidence calculation => |X intersect Y| / |X|
def getConf(Xusers, Yusers):
    """Return the confidence of the rule X => Y.

    The confidence is the share of X's users that also appear in Y:
    ``len(set(Xusers) & set(Yusers)) / len(Xusers)``.

    Args:
        Xusers: user IDs that issued query X (the antecedent).
        Yusers: user IDs that issued query Y (the consequent).

    Returns:
        A float. 0.0 is returned when ``Xusers`` is empty, because a rule
        with no supporting users carries no confidence (previously this
        raised ZeroDivisionError).
    """
    x_len = len(Xusers)
    if x_len == 0:
        return 0.0
    # Membership is compared via sets; the denominator stays len(Xusers),
    # so the ratio is exact as long as Xusers holds no repeated IDs.
    xy_inter = len(set(Xusers) & set(Yusers))
    # float() keeps true division even where `/` on two ints would truncate.
    return xy_inter / float(x_len)


# Flatten each query pair into
# (X query, X users, Y query, Y users, confidence X=>Y, confidence Y=>X)
q_more_than_2_user_union = q_more_than_2_user_ct.map(
    lambda pair: (
        pair[0][0],
        pair[0][1],
        pair[1][0],
        pair[1][1],
        getConf(pair[0][1], pair[1][1]),
        getConf(pair[1][1], pair[0][1]),
    )
)


In [29]:
# keep pairs whose confidence is at least 0.6 in either direction (X=>Y or Y=>X)
q_more_than_2_user_union_filter = q_more_than_2_user_union.filter(lambda row: (row[4]) >= 0.6 or (row[5]) >= 0.6)
q_more_than_2_user_union_filter_100 = q_more_than_2_user_union_filter.take(100)
# Dump the sample: every field followed by a tab, records separated by a blank line
with open('first_100_rows_of_0_6.txt', 'w') as f:
    # Iterate the sample taken just above; the loop used to reference the
    # undefined name `q_more_than_2_user_union_filter_1` and raised NameError.
    for item in q_more_than_2_user_union_filter_100:
        for val in item:
            f.write("%s\t" % val)
        f.write("\n\n")

In [30]:
# keep pairs whose confidence reaches 0.8 in at least one direction
q_more_than_2_user_union_filter = q_more_than_2_user_union.filter(
    lambda rec: rec[4] >= 0.8 or rec[5] >= 0.8
)
q_more_than_2_user_union_filter_100 = q_more_than_2_user_union_filter.take(100)
# Each field is followed by a tab; a blank line separates consecutive records
with open('first_100_rows_of_0_8.txt', 'w') as out_file:
    for rec in q_more_than_2_user_union_filter_100:
        out_file.write("".join("%s\t" % field for field in rec))
        out_file.write("\n\n")

In [10]:
# keep pairs whose confidence reaches 0.9 in at least one direction
q_more_than_2_user_union_filter = q_more_than_2_user_union.filter(
    lambda rec: rec[4] >= 0.9 or rec[5] >= 0.9
)
q_more_than_2_user_union_filter_100 = q_more_than_2_user_union_filter.take(100)
# Same layout as the other dumps: tab after every field, blank line after every record
with open('first_100_rows_of_0_9.txt', 'w') as out_file:
    for rec in q_more_than_2_user_union_filter_100:
        out_file.writelines("%s\t" % field for field in rec)
        out_file.write("\n\n")

In [17]:
# just for fun 0.5
# Compute the 0.5-threshold sample in this cell so the file really holds the
# 0.5 results. The cell used to write whatever sample the most recently run
# threshold cell had left in `q_more_than_2_user_union_filter_100`.
q_more_than_2_user_union_filter = q_more_than_2_user_union.filter(lambda row: (row[4]) >= 0.5 or (row[5]) >= 0.5)
q_more_than_2_user_union_filter_100 = q_more_than_2_user_union_filter.take(100)
# Dump the sample: every field followed by a tab, records separated by a blank line
with open('first_100_rows_of_0_5.txt', 'w') as f:
    for item in q_more_than_2_user_union_filter_100:
        for val in item:
            f.write("%s\t" % val)
        f.write("\n\n")

In [26]:
# keep pairs whose confidence reaches 0.5 in at least one direction, then sample 100 of them
q_more_than_2_user_union_filter = q_more_than_2_user_union.filter(
    lambda rec: rec[4] >= 0.5 or rec[5] >= 0.5
)
q_more_than_2_user_union_filter_100 = q_more_than_2_user_union_filter.take(100)

In [27]:
# Inspect the first sampled record:
# (X query, X users, Y query, Y users, confidence X=>Y, confidence Y=>X)
q_more_than_2_user_union_filter_100[0]

('disney chanel .com',
 [1987068, 9507074, 1391897, 5579747, 4390949, 10389986, 11381858],
 'google.com',
 [63364,
  69107,
  74891,
  130564,
  183168,
  191233,
  219251,
  253855,
  272846,
  319471,
  363559,
  402504,
  412723,
  446336,
  557580,
  607345,
  661205,
  678977,
  740174,
  742312,
  791215,
  796519,
  810532,
  813011,
  854116,
  902525,
  940229,
  940541,
  949915,
  964256,
  996567,
  1059548,
  1149073,
  1149915,
  1195616,
  1212069,
  1235607,
  1254663,
  1331126,
  1344604,
  1382152,
  1419697,
  1426215,
  1563337,
  1598057,
  1616501,
  1620679,
  1648038,
  1657106,
  1672789,
  1685611,
  1754049,
  1778181,
  1791551,
  1858738,
  1864771,
  1932893,
  2107837,
  2108293,
  2132639,
  2171333,
  2171607,
  2172008,
  2176363,
  2207050,
  2227369,
  2245786,
  2332782,
  2337174,
  2503056,
  2512757,
  2592232,
  2604343,
  2618473,
  2786817,
  2949535,
  2956925,
  2989232,
  2994447,
  3040095,
  3071657,
  3074635,
  3086691,
  3088512,
  31

In [None]:
# -------------- everything below this line is our old code --------------

In [37]:
# query_keypair_rdd = q_more_than_2_user.map(lambda x : (x[0],x[1]))
# query_dict_more = query_keypair_rdd.collectAsMap()

# converting back to dataframe
# final_df = q_more_than_2_user_union_filter.toDF(["X_query","X_users","Y_query","Y_users","X=>Y", "Y=>X"])

In [5]:
# Pull the `user` and `query` DataFrames down to driver-side dicts keyed by their
# first column. The functions below read user_dict[user] as an iterable of queries
# and query_dict[query] as a list of users.
# NOTE(review): `user` and `query` are not created in the visible cells - confirm where they come from.
userrdd, queryrdd = user.rdd, query.rdd

user_keypair_rdd = userrdd.map(lambda rec: (rec[0], rec[1]))
query_keypair_rdd = queryrdd.map(lambda rec: (rec[0], rec[1]))

user_dict = user_keypair_rdd.collectAsMap()
query_dict = query_keypair_rdd.collectAsMap()

In [7]:
def count_queries_by_users(users_list_by_query):
    """Tally how often each query occurs across the given users.

    For every user in ``users_list_by_query`` the queries stored under that
    user in the module-level ``user_dict`` are counted, one increment per
    occurrence. A user missing from ``user_dict`` raises KeyError.

    Returns:
        dict mapping query -> number of occurrences over the listed users.
    """
    tally = {}
    for uid in users_list_by_query:
        for issued in user_dict[uid]:
            tally[issued] = tally.get(issued, 0) + 1
    return tally

In [8]:
def find_assiciative_rule(query, ratio):
    """List the queries associated with ``query`` at or above ``ratio``.

    Looks up the users of ``query`` in the module-level ``query_dict``, counts
    every query those users issued (via ``count_queries_by_users``), divides
    each count by the count of ``query`` itself, and keeps the entries whose
    resulting share is >= ``ratio``.

    Returns:
        list of single-entry dicts ``{other_query: share}``, in the iteration
        order of the count dict.
    """
    counts = count_queries_by_users(query_dict[query])
    total = counts[query]
    shares = {other: hits / total for other, hits in counts.items()}
    return [{other: share} for other, share in shares.items() if share >= ratio]

In [9]:
# Example lookup; a ratio of 0 keeps every query that co-occurs with this one.
# NOTE: this rebinds `query`, the name an earlier cell reads as `query.rdd`.
query = "disneychanne.com"
vector = find_assiciative_rule(query, ratio=0)

In [10]:
# Number of suggestion entries returned for the example query
len(vector)

241

In [11]:
spark = SparkSession.builder.getOrCreate()

# NOTE: this cell rebinds the notebook-level names `df`, `query` and `vector`.
columns = ['query', '0.6']
vals = [("1", 2)]

# Placeholder frame, extended with (query, number of rules at ratio 0.6) for the current query
df = spark.createDataFrame(data=vals, schema=columns)
vector = find_assiciative_rule(query, 0.6)
newRow = spark.createDataFrame(data=[(query, len(vector))], schema=columns)
df = df.union(newRow)

# For every known query, keep its rule list when that list holds more than one entry
new_obj = []
for query in query_dict.keys():
    vector = find_assiciative_rule(query, 0.6)
    if len(vector) <= 1:
        continue
    new_obj.append({query: vector})

In [15]:
# Show the third collected entry.
# NOTE(review): the saved output below maps a query to an int, while the cell that
# builds new_obj appends {query: <list of rules>} - the output looks stale; re-run to confirm.
print(new_obj[2])

{'....................................': 28}
