# Part 1

In [None]:
%env PYTHONHASHSEED 3
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q pyspark

env: PYTHONHASHSEED=3


In [None]:
from math import sqrt
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import *
# Create (or reuse) a Spark session running locally on all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('SparkExample')
    .config("spark.executor.memory", "1g")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)
# The SparkContext is the entry point for the RDD API used in this notebook.
sc = spark.sparkContext

def dbg(x):
  """ A helper function to print debugging information on RDDs

  For an RDD, prints (at most) its first 100 elements as a list. Tuple elements
  are printed field by field, with any grouped values (ResultIterable, e.g. the
  output of groupByKey) expanded into plain lists so their contents are visible.
  Tuples of any length are supported. Anything that is not an RDD is printed
  as-is.
  """
  def _printable(field):
    # ResultIterable has no useful repr; materialise it so the values show up.
    if isinstance(field, pyspark.resultiterable.ResultIterable):
      return list(field)
    return field

  if isinstance(x, pyspark.RDD):
    print([tuple(_printable(field) for field in t) if isinstance(t, tuple) else t
           for t in x.take(100)])
  else:
    print(x)

## Loading data

In [None]:
import urllib.request
# Remote location of the browsing data and the local file it is saved to.
url = 'https://drive.google.com/uc?export=download&confirm=t&id=1Ijyh14a0Lh9sjwQUR6PE1TB2phjAZP4P'
filename = "browsing.txt"
urllib.request.urlretrieve(url, filename=filename)

# One shared RDD of lines, reused by every question below.
text_file = sc.textFile(filename)
# count() is an action, so Spark actually reads the file here and a bad download
# shows up immediately. The RDD is not cached, so later actions read it again.
print(f"{filename} loaded with {text_file.count()} lines")

browsing.txt loaded with 31101 lines


## Question 1a

In [None]:
from operator import *
from itertools import combinations
from typing import Tuple

# Default minimum count for a pair to be reported.
SUPPORT = 100

def naive(text_file_rdd: pyspark.RDD[str], support: int = SUPPORT) -> pyspark.RDD[Tuple[Tuple[str, str], int]]:
  """Count every co-occurring item pair and keep the frequent ones.

  Each line is treated as one basket of whitespace-separated items. Every
  2-combination of a basket's items is counted, with the two items put in
  sorted order so (a, b) and (b, a) share one key.

  Args:
    text_file_rdd: RDD of basket lines.
    support: minimum count a pair needs in order to be kept.

  Returns:
    RDD of ((item_a, item_b), count) with count >= support.
  """
  def _pairs_with_one(line):
    # Emit ((a, b), 1) for each pair in the basket, in combination order.
    return ((tuple(sorted(pair)), 1) for pair in combinations(line.split(), 2))

  return (
      text_file_rdd
      .flatMap(_pairs_with_one)
      .reduceByKey(add)
      .filter(lambda pair_count: pair_count[1] >= support)
  )
    
# Five most frequent pairs, largest count first (negated count as the sort key).
results_q1a = naive(text_file).takeOrdered(5, key=lambda pair_count: -pair_count[1])
print(results_q1a)

[(('DAI62779', 'ELE17451'), 1592), (('FRO40251', 'SNA80324'), 1412), (('DAI75645', 'FRO40251'), 1254), (('FRO40251', 'GRO85051'), 1213), (('DAI62779', 'GRO73461'), 1139)]


In [None]:
# Reference answer for the five most frequent pairs.
expected_q1a = [
    (('DAI62779', 'ELE17451'), 1592),
    (('FRO40251', 'SNA80324'), 1412),
    (('DAI75645', 'FRO40251'), 1254),
    (('FRO40251', 'GRO85051'), 1213),
    (('DAI62779', 'GRO73461'), 1139),
]
assert results_q1a == expected_q1a

## Question 1b

In [None]:
# Step 1 of the A-priori algorithm (has its own function so we can test it)
def a_priori_step1(text_file_rdd: pyspark.RDD[str]) -> pyspark.RDD[Tuple[str, int]]:
  """Count how many times each item occurs across all lines.

  No support threshold is applied here: the full (item, count) RDD is returned
  and callers filter it against whatever support they need.

  Args:
    text_file_rdd: RDD of lines, each a whitespace-separated list of items.

  Returns:
    RDD of (item, count).
  """
  item_occurrences = text_file_rdd.flatMap(str.split).map(lambda item: (item, 1))
  return item_occurrences.reduceByKey(add)




# Five most common single items, largest count first.
results_q1b_step1 = a_priori_step1(text_file).takeOrdered(5, key=lambda item_count: -item_count[1])
print(results_q1b_step1)

[('DAI62779', 6667), ('FRO40251', 3881), ('ELE17451', 3875), ('GRO73461', 3602), ('SNA80324', 3044)]


In [None]:
# Reference answer for the five most common items.
expected_q1b_step1 = [
    ('DAI62779', 6667),
    ('FRO40251', 3881),
    ('ELE17451', 3875),
    ('GRO73461', 3602),
    ('SNA80324', 3044),
]
assert results_q1b_step1 == expected_q1b_step1

In [None]:
def a_priori(text_file_rdd: pyspark.RDD[str], support: int = 100) -> pyspark.RDD[Tuple[Tuple[str, str], int]]:
  """Find frequent item pairs with the two-pass A-priori algorithm.

  Pass 1 counts single items and keeps those seen at least `support` times.
  Pass 2 counts pairs exactly like the naive algorithm, but only among the
  frequent items of each line, which shrinks the number of candidate pairs.

  Args:
    text_file_rdd: RDD of lines, each a whitespace-separated list of items.
    support: minimum count for an item (pass 1) and for a pair (pass 2).

  Returns:
    RDD of ((item_a, item_b), count) with count >= support; the two items of
    each pair are in sorted order.
  """
  # Pass 1: collect the frequent items to the driver as a plain set.
  frequent_item_set = set(
      a_priori_step1(text_file_rdd)
      .filter(lambda kv: kv[1] >= support)     # Filter out uncommon items
      .map(lambda kv: kv[0])                   # Strip out the count
      .collect()
  )
  # Broadcast through the RDD's own SparkContext instead of the notebook-global
  # `sc`, so the function works with whichever context created the RDD.
  # The broadcast cannot be released here: the returned RDD is lazy and still
  # reads it when an action eventually runs.
  frequent_items_bc = text_file_rdd.context.broadcast(frequent_item_set)

  def _frequent_pairs(line):
    # Pass 2: drop uncommon items first, then emit ((a, b), 1) for each pair.
    kept = [item for item in line.split() if item in frequent_items_bc.value]
    return ((tuple(sorted(pair)), 1) for pair in combinations(kept, 2))

  return (
      text_file_rdd
      .flatMap(_frequent_pairs)
      .reduceByKey(add)
      .filter(lambda kv: kv[1] >= support)
  )

# Five most frequent pairs according to A-priori, largest count first.
results_q1b = a_priori(text_file).takeOrdered(5, key=lambda pair_count: -pair_count[1])
print(results_q1b)

[(('DAI62779', 'ELE17451'), 1592), (('FRO40251', 'SNA80324'), 1412), (('DAI75645', 'FRO40251'), 1254), (('FRO40251', 'GRO85051'), 1213), (('DAI62779', 'GRO73461'), 1139)]


In [None]:
# A-priori must reproduce the same top pairs as the naive algorithm.
expected_q1b = [
    (('DAI62779', 'ELE17451'), 1592),
    (('FRO40251', 'SNA80324'), 1412),
    (('DAI75645', 'FRO40251'), 1254),
    (('FRO40251', 'GRO85051'), 1213),
    (('DAI62779', 'GRO73461'), 1139),
]
assert results_q1b == expected_q1b

In [None]:
import time

# Time each algorithm end to end (building the RDD plus the takeOrdered action).
# time.perf_counter() is a monotonic, high-resolution clock intended for
# measuring intervals; time.time() follows the wall clock and can jump if the
# system time is adjusted mid-measurement.
start_naive = time.perf_counter()
results_q1a = naive(text_file).takeOrdered(5, lambda kv: -kv[1])
end_naive = time.perf_counter()

start_apriori_100 = time.perf_counter()
results_q1b_100 = a_priori(text_file, 100).takeOrdered(5, lambda kv: -kv[1])
end_apriori_100 = time.perf_counter()

start_apriori_1000 = time.perf_counter()
results_q1b_1000 = a_priori(text_file, 1000).takeOrdered(5, lambda kv: -kv[1])
end_apriori_1000 = time.perf_counter()

# Elapsed seconds for each run.
time_naive = end_naive - start_naive
time_apriori_100 = end_apriori_100 - start_apriori_100
time_apriori_1000 = end_apriori_1000 - start_apriori_1000


print(f"naive took {time_naive} seconds, apriori(100) took {time_apriori_100} seconds, and apriori(1000) took {time_apriori_1000} seconds")

naive took 12.619127988815308 seconds, apriori(100) took 4.440699338912964 seconds, and apriori(1000) took 1.429325819015503 seconds


In [None]:
# All three runs must agree on the top five pairs.
assert results_q1a == results_q1b_100 == results_q1b_1000
# Expected ordering of run times: naive slowest, A-priori with the higher support fastest.
assert time_naive > time_apriori_100 > time_apriori_1000

## Question 2 setup

In [None]:
# Frequent items and frequent pairs shared by Questions 2a and 2b. Both use the
# notebook-wide SUPPORT threshold rather than repeating the literal value.
top_item_counts = a_priori_step1(text_file).filter(lambda kv: kv[1] >= SUPPORT)
top_pair_counts = a_priori(text_file, support=SUPPORT)


## Question 2a

In [None]:
def _detangle_and_calculate_conf(item: Tuple[str, Tuple[tuple[str, float], float]]) -> Tuple[Tuple[str, str], float]:
  i = item[0]
  j = item[1][0][0]
  support_i_union_j = item[1][0][1]
  support_i = item[1][1]
  conf = support_i_union_j / support_i
  return ((i, j), conf)

def confidence(item_counts: pyspark.RDD[Tuple[str, int]], pair_counts: pyspark.RDD[Tuple[Tuple[str, str], int]], n: int) -> pyspark.RDD[Tuple[Tuple[str, str], float]]:
  """Compute Conf(I -> j) = Support(I ∪ j) / Support(I) for every rule.

  Each pair (x, y) yields two rules, x -> y and y -> x.

  Args:
    item_counts: RDD of (item, count), used as Support(I).
    pair_counts: RDD of ((item_a, item_b), count), used as Support(I ∪ j).
    n: divisor applied to every count before the ratio is taken.

  Returns:
    RDD of ((I, j), confidence).
  """
  # Both supports are divided by n before the ratio is formed. This cancels out
  # mathematically, but the original author needed it to avoid a last-digit
  # floating-point difference from the expected values -- do not simplify.
  def _keyed_by_antecedent(kv):
    # ((a, b), count) -> (a, (b, freq)) and (b, (a, freq)), so that the
    # antecedent I becomes the join key.
    first, second = kv[0][0], kv[0][1]
    pair_freq = kv[1] / n
    return ((first, (second, pair_freq)), (second, (first, pair_freq)))

  item_freqs = item_counts.map(lambda kv: (kv[0], kv[1] / n))
  # After the join each record is (I, ((j, Support(I ∪ j)), Support(I))).
  joined = pair_counts.flatMap(_keyed_by_antecedent).join(item_freqs)
  return joined.map(_detangle_and_calculate_conf)
  

# Confidence of every rule built from the frequent pairs; n is the number of lines.
num_baskets = text_file.count()
top_rule_confidences = confidence(top_item_counts, top_pair_counts, num_baskets)
# Five highest-confidence rules, largest first.
results_q2a = top_rule_confidences.takeOrdered(5, key=lambda rule_conf: -rule_conf[1])

print(results_q2a)


[(('DAI93865', 'FRO40251'), 1.0), (('GRO85051', 'FRO40251'), 0.9991762767710051), (('GRO38636', 'FRO40251'), 0.9906542056074765), (('ELE12951', 'FRO40251'), 0.9905660377358491), (('DAI88079', 'FRO40251'), 0.9867256637168142)]


In [None]:
expected_q2a = [(('DAI93865', 'FRO40251'), 1.0),
                (('GRO85051', 'FRO40251'), 0.9991762767710051),
                (('GRO38636', 'FRO40251'), 0.9906542056074765),
                (('ELE12951', 'FRO40251'), 0.9905660377358491),
                (('DAI88079', 'FRO40251'), 0.9867256637168142)]
# The rules must match exactly and in order. The confidences are floats, so they
# are compared with a small tolerance: bit-for-bit equality breaks on harmless
# last-digit rounding differences.
assert [rule for rule, conf in results_q2a] == [rule for rule, conf in expected_q2a]
assert all(abs(got[1] - want[1]) <= 1e-9 for got, want in zip(results_q2a, expected_q2a))

## Question 2b

In [None]:
def interest(item_counts: pyspark.RDD[Tuple[str, int]], rule_confidences: pyspark.RDD[Tuple[Tuple[str, str], float]], n: int) -> pyspark.RDD[Tuple[Tuple[str, str], float]]:
  """Compute Interest(I -> j) = Conf(I -> j) - (count of j) / n for every rule.

  Args:
    item_counts: RDD of (item, count).
    rule_confidences: RDD of ((I, j), Conf(I -> j)), as produced by `confidence`.
    n: divisor turning an item count into a frequency.

  Returns:
    RDD of ((I, j), interest).
  """
  # Divide by n so it's in (0, 1]
  item_freq = item_counts.map(lambda kv: (kv[0], kv[1]/n))
  # Make j the key so we can join
  j_to_i_and_conf = rule_confidences.map(lambda kv: (kv[0][1], (kv[0][0], kv[1]))) # Structure: (j, (I, Conf(I -> j)))
  joined_conf_and_freq = j_to_i_and_conf.join(item_freq)                           # Structure: (j, ((I, Conf(I -> j)), freq(j)))

  def _to_interest(record):
    # Restore the (I, j) key and subtract j's frequency from the confidence.
    j, ((i, conf), freq_j) = record
    return ((i, j), conf - freq_j)

  return joined_conf_and_freq.map(_to_interest)


# Interest of every rule; n is the number of lines in the input.
num_baskets = text_file.count()
top_interest = interest(top_item_counts, top_rule_confidences, num_baskets)
# Five most interesting rules, largest first.
results_q2b = top_interest.takeOrdered(5, key=lambda rule_interest: -rule_interest[1])
print(results_q2b)

[(('DAI43868', 'SNA82528'), 0.9538739086342056), (('DAI93865', 'FRO40251'), 0.8752130156586605), (('GRO85051', 'FRO40251'), 0.8743892924296656), (('GRO38636', 'FRO40251'), 0.865867221266137), (('ELE12951', 'FRO40251'), 0.8657790533945096)]


In [None]:
expected_q2b = [(('DAI43868', 'SNA82528'), 0.9538739086342056),
                (('DAI93865', 'FRO40251'), 0.8752130156586605),
                (('GRO85051', 'FRO40251'), 0.8743892924296656),
                (('GRO38636', 'FRO40251'), 0.865867221266137),
                (('ELE12951', 'FRO40251'), 0.8657790533945096)]
# The rules must match exactly and in order. The interest values are floats, so
# they are compared with a small tolerance rather than bit-for-bit.
assert [rule for rule, value in results_q2b] == [rule for rule, value in expected_q2b]
assert all(abs(got[1] - want[1]) <= 1e-9 for got, want in zip(results_q2b, expected_q2b))