# Part 1

In [1]:
# Pin PYTHONHASHSEED in the kernel environment (presumably so string hashing
# is identical in every Spark Python worker -- TODO confirm).
%env PYTHONHASHSEED 3
# Quietly install a headless Java 8 JDK (presumably the JVM Spark runs on);
# stdout is discarded.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Quietly install the pyspark package (version is not pinned).
!pip install -q pyspark

env: PYTHONHASHSEED=3
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m13.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
from math import sqrt
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import *
# Build (or reuse) a local Spark session using every available core, then
# expose its SparkContext for the RDD-based cells below.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('SparkExample')
    .config("spark.executor.memory", "1g")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)
sc = spark.sparkContext

def dbg(x):
  """Print a debugging view of `x`.

  For an RDD, the first 100 elements are fetched and printed as a list.
  Tuple elements are shown as (first, second), with a grouped
  ResultIterable in second position expanded to a plain list. Anything
  that is not an RDD is printed unchanged.
  """
  if not isinstance(x, pyspark.RDD):
    print(x)
    return

  def _printable(t):
    # Non-tuple elements are shown as they are.
    if not isinstance(t, tuple):
      return t
    first = t[0]
    second = t[1]
    # Results of groupByKey-style operations are lazy iterables; materialise them.
    if isinstance(second, pyspark.resultiterable.ResultIterable):
      second = list(second)
    return (first, second)

  print([_printable(t) for t in x.take(100)])

## Loading data

In [3]:
import os
import urllib.request

url = 'https://drive.google.com/uc?export=download&confirm=t&id=1Ijyh14a0Lh9sjwQUR6PE1TB2phjAZP4P'
filename = "browsing.txt"

# Download only when no local copy exists, so re-running the notebook does not
# fetch the file again. The data is written to a temporary name first and then
# renamed, so an interrupted download never leaves a truncated file under the
# final name.
if not os.path.exists(filename):
  _partial_download = filename + ".part"
  urllib.request.urlretrieve(url, _partial_download)
  os.replace(_partial_download, filename)

# load up front for use in all questions
text_file = sc.textFile(filename)
# force spark to load the file
print(f"{filename} loaded with {text_file.count()} lines")

browsing.txt loaded with 31101 lines


## Question 1a

In [135]:
from operator import *
def items_pair_tup(line):
  """Return every unordered pair of positions in `line` as a sorted 2-tuple.

  Pairs are emitted grouped by the later element: for each element, one pair
  with every element that precedes it. Each pair is sorted so that (a, b) and
  (b, a) map to the same key.
  """
  pairs = []
  for position, later in enumerate(line):
    for earlier in line[:position]:
      pairs.append(tuple(sorted((earlier, later))))
  return pairs
  
def naive(text_file_rdd):
  """Count every unordered item pair that appears together on a line.

  Args:
    text_file_rdd: RDD of text lines whose items are separated by single
      spaces.

  Returns:
    RDD of ((item_a, item_b), count), each pair sorted by items_pair_tup.
  """
  # Drop empty tokens rather than blindly discarding the last one: a line
  # without a trailing space would otherwise lose a real item, and a doubled
  # space would inject '' as an item. This matches how a_priori_step1
  # tokenises lines.
  wordsByLine = text_file_rdd.map(
      lambda line: [item for item in line.split(" ") if item != ''])
  itemPairTup = wordsByLine.flatMap(items_pair_tup)
  itemPairCount = itemPairTup.map(lambda pair: (pair, 1)).reduceByKey(add)
  return itemPairCount
    
# Five most frequent pairs: the key negates the count so the largest sort first.
results_q1a = naive(text_file).takeOrdered(5, lambda kv: -kv[1])
print(results_q1a)

[(('DAI62779', 'ELE17451'), 1592), (('FRO40251', 'SNA80324'), 1412), (('DAI75645', 'FRO40251'), 1254), (('FRO40251', 'GRO85051'), 1213), (('DAI62779', 'GRO73461'), 1139)]


In [67]:
# Check the naive top-5 pairs against the expected values.
assert results_q1a == [(('DAI62779', 'ELE17451'), 1592), (('FRO40251', 'SNA80324'), 1412), 
 (('DAI75645', 'FRO40251'), 1254), (('FRO40251', 'GRO85051'), 1213), 
 (('DAI62779', 'GRO73461'), 1139)]

## Question 1b

In [136]:
#Step 1 of the A-priori algorithm (has its own function so we can test it)
def a_priori_step1(text_file_rdd):
  """Count how many times each item occurs across all lines.

  Lines are split on single spaces and empty tokens are discarded.
  Returns an RDD of (item, count).
  """
  def non_empty_tokens(line):
    return [token for token in line.split(" ") if token != '']

  occurrences = text_file_rdd.flatMap(non_empty_tokens)
  return occurrences.map(lambda token: (token, 1)).reduceByKey(add)
# Five most frequent single items: the key negates the count so the largest sort first.
results_q1b_step1 = a_priori_step1(text_file).takeOrdered(5, lambda kv: -kv[1])
print(results_q1b_step1)

[('DAI62779', 6667), ('FRO40251', 3881), ('ELE17451', 3875), ('GRO73461', 3602), ('SNA80324', 3044)]


In [70]:
# Check the top-5 single-item counts against the expected values.
assert results_q1b_step1 == [('DAI62779', 6667), ('FRO40251', 3881), 
                             ('ELE17451', 3875), ('GRO73461', 3602), 
                             ('SNA80324', 3044)]

In [217]:
def a_priori(text_file_rdd, support=100):
  """Count co-occurring pairs built only from frequent items (two passes).

  Pass 1 reuses a_priori_step1 to find the items whose count is strictly
  greater than `support`. Pass 2 removes every other token from each line and
  counts the pairs that remain.

  Args:
    text_file_rdd: RDD of text lines whose items are separated by single
      spaces.
    support: count an item must exceed to be considered frequent.

  Returns:
    RDD of ((item_a, item_b), count), each pair sorted by items_pair_tup.
    The pairs themselves are not filtered by `support`.
  """
  # NOTE(review): this is a strict `>`, while the Question 2 setup cell keeps
  # items with count >= 100. Confirm which threshold is intended.
  supportItems = a_priori_step1(text_file_rdd).filter(lambda x : x[1] > support)

  # Broadcast a frozenset rather than a list: the membership test below runs
  # once per token, and a hash lookup avoids scanning the whole list each time.
  freqItems = sc.broadcast(
      frozenset(supportItems.map(lambda x : x[0]).collect()))

  def filter_uncommon(text):
    frequent = freqItems.value
    # Empty tokens from trailing or doubled spaces are never frequent, so they
    # are dropped here as well.
    return [item for item in text.split(' ') if item in frequent]

  freqItemByLine = text_file_rdd.map(filter_uncommon)
  freqItemPair = freqItemByLine.flatMap(items_pair_tup)
  freqWordsPairCount = freqItemPair.map(lambda x : (x, 1)).reduceByKey(add)
  return freqWordsPairCount

# Five most frequent pairs from A-priori with the default support of 100.
results_q1b = a_priori(text_file).takeOrdered(5, lambda kv: -kv[1])
print(results_q1b)

[(('DAI62779', 'ELE17451'), 1592), (('FRO40251', 'SNA80324'), 1412), (('DAI75645', 'FRO40251'), 1254), (('FRO40251', 'GRO85051'), 1213), (('DAI62779', 'GRO73461'), 1139)]


In [218]:
# Check the A-priori top-5 pairs against the expected values.
assert results_q1b == [(('DAI62779', 'ELE17451'), 1592), (('FRO40251', 'SNA80324'), 1412), 
 (('DAI75645', 'FRO40251'), 1254), (('FRO40251', 'GRO85051'), 1213), 
 (('DAI62779', 'GRO73461'), 1139)]

In [219]:
import time

# Time the three variants back to back. perf_counter() is a monotonic,
# high-resolution clock meant for measuring elapsed time; time.time() follows
# the system clock and can jump if that clock is adjusted mid-measurement.
start_naive = time.perf_counter()
results_q1a = naive(text_file).takeOrdered(5, lambda kv: -kv[1])
end_naive = time.perf_counter()

start_apriori_100 = time.perf_counter()
results_q1b_100 = a_priori(text_file, 100).takeOrdered(5, lambda kv: -kv[1])
end_apriori_100 = time.perf_counter()

start_apriori_1000 = time.perf_counter()
results_q1b_1000 = a_priori(text_file, 1000).takeOrdered(5, lambda kv: -kv[1])
end_apriori_1000 = time.perf_counter()

# Elapsed seconds for each run.
time_naive = end_naive - start_naive
time_apriori_100 = end_apriori_100 - start_apriori_100
time_apriori_1000 = end_apriori_1000 - start_apriori_1000


print(f"naive took {time_naive} seconds, apriori(100) took {time_apriori_100} seconds, and apriori(1000) took {time_apriori_1000} seconds")

naive took 8.573988676071167 seconds, apriori(100) took 4.893145322799683 seconds, and apriori(1000) took 1.2100610733032227 seconds


In [208]:
# All three approaches must agree on the top-5 pairs.
assert results_q1a == results_q1b_100
assert results_q1a == results_q1b_1000
# A-priori is expected to be faster, and faster still with a higher support.
# NOTE(review): these compare single wall-clock measurements, so they may fail
# intermittently on a loaded machine -- confirm this is acceptable.
assert time_naive > time_apriori_100
assert time_apriori_100 > time_apriori_1000

## Question 2 setup

In [209]:
# Item counts for items seen at least 100 times.
# NOTE(review): this uses >= 100, whereas a_priori keeps items with
# count > support, so an item seen exactly 100 times appears here but
# contributes no pairs below -- confirm which threshold is intended.
top_item_counts = a_priori_step1(text_file).filter(lambda kv: kv[1]>=100)
# Pair counts restricted to the frequent items selected inside a_priori.
top_pair_counts = a_priori(text_file, support=100)


## Question 2a

In [210]:
def confidence(item_counts, pair_counts, n):
  """Compute the confidence of both rules X -> Y and Y -> X for every pair.

  Args:
    item_counts: RDD of (item, count).
    pair_counts: RDD of ((X, Y), count).
    n: divisor that turns counts into supports (callers pass the line count).

  Returns:
    RDD of ((antecedent, consequent), confidence), where confidence is
    support(pair) / support(antecedent). A pair whose antecedent is missing
    from item_counts is dropped by the inner join.
  """
  # Counts are normalised by n before the final division; keeping this order
  # of operations keeps the floating-point results unchanged.
  item_support = item_counts.map(lambda kv: (kv[0], kv[1] / n))
  pair_support = pair_counts.map(lambda kv: (kv[0], kv[1] / n))

  # Key each pair by its antecedent, once per rule direction.
  by_first = pair_support.map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))
  by_second = pair_support.map(lambda kv: (kv[0][1], (kv[0][0], kv[1])))

  # Each joined record is
  # (antecedent, (support(antecedent), (consequent, support(pair)))).
  joined = item_support.join(by_first).union(item_support.join(by_second))
  return joined.map(
      lambda rec: ((rec[0], rec[1][1][0]), rec[1][1][1] / rec[1][0]))

# Confidence of every rule between frequent items; the line count is the
# divisor used to turn counts into supports.
top_rule_confidences = confidence(top_item_counts, top_pair_counts, text_file.count())
# Five rules with the highest confidence (key negated so the largest sort first).
results_q2a = top_rule_confidences.takeOrdered(5, lambda kv: -kv[1])

print(results_q2a)

[(('DAI93865', 'FRO40251'), 1.0), (('GRO85051', 'FRO40251'), 0.9991762767710051), (('GRO38636', 'FRO40251'), 0.9906542056074765), (('ELE12951', 'FRO40251'), 0.9905660377358491), (('DAI88079', 'FRO40251'), 0.9867256637168142)]


In [211]:
# Check the top-5 rule confidences against the expected values (exact float match).
assert results_q2a == [(('DAI93865', 'FRO40251'), 1.0), 
                       (('GRO85051', 'FRO40251'), 0.9991762767710051),
                       (('GRO38636', 'FRO40251'), 0.9906542056074765), 
                       (('ELE12951', 'FRO40251'), 0.9905660377358491), 
                       (('DAI88079', 'FRO40251'), 0.9867256637168142)]

## Question 2b

In [212]:
def interest(item_counts, rule_confidences, n):
  """Compute Interest(X -> Y) = Conf(X -> Y) - Prob(Y) for every rule.

  Args:
    item_counts: RDD of (item, count).
    rule_confidences: RDD of ((X, Y), Conf(X -> Y)).
    n: divisor that turns an item count into Prob(item).

  Returns:
    RDD of ((X, Y), Interest(X -> Y)).
  """
  def key_by_consequent(rule):
    # ((X, Y), conf) -> (Y, (X, conf)) so the rule can be joined on Y.
    antecedent = rule[0][0]
    consequent = rule[0][1]
    return (consequent, (antecedent, rule[1]))

  def to_interest(joined):
    # joined is (Y, (Prob(Y), (X, Conf(X -> Y)))).
    consequent = joined[0]
    prob_of_consequent = joined[1][0]
    antecedent = joined[1][1][0]
    rule_confidence = joined[1][1][1]
    return ((antecedent, consequent), rule_confidence - prob_of_consequent)

  prob_by_item = item_counts.map(lambda kv: (kv[0], kv[1]/n))
  rules_by_consequent = rule_confidences.map(key_by_consequent)
  return prob_by_item.join(rules_by_consequent).map(to_interest)
  
# Interest of every rule computed above; the line count is the divisor used
# to turn item counts into probabilities.
top_interest = interest(top_item_counts, top_rule_confidences, text_file.count())
# Five rules with the highest interest (key negated so the largest sort first).
results_q2b = top_interest.takeOrdered(5, lambda kv: -kv[1])
print(results_q2b)

[(('DAI43868', 'SNA82528'), 0.9538739086342056), (('DAI93865', 'FRO40251'), 0.8752130156586605), (('GRO85051', 'FRO40251'), 0.8743892924296656), (('GRO38636', 'FRO40251'), 0.865867221266137), (('ELE12951', 'FRO40251'), 0.8657790533945096)]


In [213]:
# Check the top-5 rule interests against the expected values (exact float match).
assert results_q2b == [(('DAI43868', 'SNA82528'), 0.9538739086342056), 
                       (('DAI93865', 'FRO40251'), 0.8752130156586605), 
                       (('GRO85051', 'FRO40251'), 0.8743892924296656), 
                       (('GRO38636', 'FRO40251'), 0.865867221266137), 
                       (('ELE12951', 'FRO40251'), 0.8657790533945096)]