# name: Ziling Huang
# username: zhu51

# Lab 3: Association Rules (Recommendations)
Note: You will need to download the browsing.txt file. In Colab, you can use:
!gdown 'https://drive.google.com/uc?export=download&confirm=t&id=1Ijyh14a0Lh9sjwQUR6PE1TB2phjAZP4P'

Association Rules are frequently used for Market Basket Analysis (MBA) by retailers to understand the purchase behavior of their customers. This information can then be used for many different purposes such as recommendations, cross-selling and up-selling of products, sales promotions, loyalty programs, store design, discount plans and many others.

Evaluation of itemsets: Once you have found the frequent itemsets of a dataset, you need to choose a subset of them that are significant and interesting. Commonly used metrics for measuring significance and interest when selecting rules for recommendations are:

Confidence: conf(I -> j) = support(I ∪ {j}) / support(I)

Interest: interest(I -> j) = conf(I -> j) - Pr[j], where Pr[j] is the fraction of baskets that contain item j
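
As a rough illustration of the two metrics, here is a minimal plain-Python sketch. The baskets are made-up toy data (not taken from browsing.txt), and the helper names `support`, `confidence` and `interest` are chosen for this example only.

```python
# Toy baskets, invented for illustration only
baskets = [
    {"A", "B", "C"},
    {"A", "B"},
    {"A", "C"},
    {"B", "C"},
]

def support(items):
    """Number of baskets that contain every item in `items`."""
    return sum(1 for basket in baskets if set(items) <= basket)

def confidence(I, j):
    """conf(I -> j) = support(I u {j}) / support(I)."""
    return support(set(I) | {j}) / support(I)

def interest(I, j):
    """interest(I -> j) = conf(I -> j) - Pr[j]."""
    return confidence(I, j) - support({j}) / len(baskets)

# 2 of the 3 baskets that contain A also contain B
print(confidence({"A"}, "B"))
# B appears in 3 of the 4 baskets, so the rule A -> B has slightly negative interest
print(interest({"A"}, "B"))
```

A rule with high confidence can still have low interest when the right-hand item is popular everywhere, which is why both scores are computed later in this lab.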

Application in product recommendations: Cross-selling is the practice of selling additional products or services to existing customers. Product recommendations are a form of cross-selling that online retailers use frequently. One simple method is to recommend products that customers frequently browse together, so that new products can be suggested to a customer based on the products they have already browsed online.

In [2]:
# start the Spark context
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q pyspark


import os
os.environ['PYTHONHASHSEED']="0"
os.environ["PYSPARK_PYTHON"]="python3"
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64/"
# A few additional libraries we will need
from math import sqrt

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import *

try:
  conf = SparkConf().setMaster("local[*]").set("spark.executor.memory", "1g").set("spark.executorEnv.PYTHONHASHSEED","0").set("spark.ui.port", "4050")
  sc = SparkContext(conf = conf)
  spark = SparkSession.builder.getOrCreate()
except ValueError:
  #it's ok if the server is already started
  pass

def dbg(x):
  """ A helper function to print debugging information on RDDs """
  if isinstance(x, pyspark.RDD):
    print([(t[0], list(t[1]) if 
            isinstance(t[1], pyspark.resultiterable.ResultIterable) else t[1])
           if isinstance(t, tuple) else t
           for t in x.take(100)])
  else:
    print(x)
    

import unittest
Test = unittest.TestCase()



In [3]:
!gdown 'https://drive.google.com/uc?export=download&confirm=t&id=1Ijyh14a0Lh9sjwQUR6PE1TB2phjAZP4P'

Downloading...
From: https://drive.google.com/uc?export=download&confirm=t&id=1Ijyh14a0Lh9sjwQUR6PE1TB2phjAZP4P
To: /content/browsing.txt
100% 3.46M/3.46M [00:00<00:00, 18.2MB/s]


# Question 1: Find products which are frequently browsed together [45 points total]
Write a Spark map-reduce program to find products which are frequently browsed together in the given browsing.txt. Each line represents a browsing session of a customer (a “basket”). On each line, each string of 8 characters represents the ID of an item browsed during that session. The items are separated by spaces. For example, this first line of browsing.txt:

FRO11987 ELE17451 ELE89019 SNA90258 GRO99222 

represents a browsing session (a “basket”) with 5 item IDs.


1a) [15 points for correct top 5 most frequent pairs] Implement a naive Spark approach to finding frequent product pairs with support = 100 (i.e. product pairs need to occur together at least 100 times to be considered frequent):

1. Create an RDD from the lines in the file


In [4]:
# load the file into a distributed dataset of lines
browsing = sc.textFile("browsing.txt")

In [57]:
print(browsing.collect()[100])

SNA94736 ELE12792 FRO28730 GRO35122 FRO95943 FRO16725 GRO22635 SNA64512 


2. Map each line into a list of all pairs of items in the basket.
Hint: this is an N^2 operation and can be done by writing a function with a nested loop that processes a single basket and then mapping this function over the RDD
Hint: you can ensure that a pair is counted regardless of whether it appears in a basket as …,A,...,B,... or …,B,...,A,... by outputting it as tuple(sorted((item1, item2))) which would output (A,B) for both cases
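
The nested-loop approach described in the hints could look roughly like the sketch below. The function name `basket_pairs` is hypothetical and the input line is a toy example; it assumes items are separated by whitespace, as in browsing.txt.

```python
def basket_pairs(line):
    """Return every unordered pair of items in one basket as a sorted tuple."""
    items = line.split()
    pairs = []
    for i in range(len(items)):
        # start at i + 1 so each pair is produced once and no item is paired with itself
        for j in range(i + 1, len(items)):
            pairs.append(tuple(sorted((items[i], items[j]))))
    return pairs

# Sorting makes (B, A) and (A, B) the same key
print(basket_pairs("B A C"))
```

In Spark, a function of this shape would be passed to `flatMap` over the RDD of lines, so that each basket contributes all of its pairs to one flat RDD.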

In [5]:
from itertools import combinations
combination = (browsing.flatMap(lambda line: [sorted(value) for value in combinations(line.split(), 2)]))

In [61]:
j = [(1, "a"), (2, "q")]
for i in j:
  print(i)
  for a in i:
    print(a)

(1, 'a')
1
a
(2, 'q')
2
q


3. Reduce the pairs into pair counts

In [6]:
pairsCount = combination.map(lambda a: (tuple(a), 1)).reduceByKey(lambda a, b: a + b).filter(lambda a: a[1]>=100).sortBy(lambda a: a[1], False)

4. Take the top 5 most frequent pairs and display them. You should get:

[(('DAI62779', 'ELE17451'), 1592), (('FRO40251', 'SNA80324'), 1412), (('DAI75645', 'FRO40251'), 1254), (('FRO40251', 'GRO85051'), 1213), (('DAI62779', 'GRO73461'), 1139)]


In [7]:
topFive = pairsCount.take(5)
dbg(topFive)

[(('DAI62779', 'ELE17451'), 1592), (('FRO40251', 'SNA80324'), 1412), (('DAI75645', 'FRO40251'), 1254), (('FRO40251', 'GRO85051'), 1213), (('DAI62779', 'GRO73461'), 1139)]


1b) [30 points total] The naive approach can be slow and disk/memory intensive for large sets of browsing data. Improve this by applying the two-step A-priori algorithm.

1. Create an RDD from the lines in the file

2. [15 points for correct top 5 most frequent items] Map each line into its items and then perform an item count on the entire RDD (this is step 1 of the A-priori algorithm). If you take the top 5 most frequent items you should get:
[('DAI62779', 6667), ('FRO40251', 3881), ('ELE17451', 3875), ('GRO73461', 3602), ('SNA80324', 3044)]


In [11]:
counts = browsing.flatMap(lambda line: line.split()).map(lambda a: (a, 1)).reduceByKey(lambda a, b: a+b).sortBy(lambda a: a[1], False)
dbg(counts)

[('DAI62779', 6667), ('FRO40251', 3881), ('ELE17451', 3875), ('GRO73461', 3602), ('SNA80324', 3044), ('ELE32164', 2851), ('DAI75645', 2736), ('SNA45677', 2455), ('FRO31317', 2330), ('DAI85309', 2293), ('ELE26917', 2292), ('FRO80039', 2233), ('GRO21487', 2115), ('SNA99873', 2083), ('GRO59710', 2004), ('GRO71621', 1920), ('FRO85978', 1918), ('GRO30386', 1840), ('ELE74009', 1816), ('GRO56726', 1784), ('DAI63921', 1773), ('GRO46854', 1756), ('ELE66600', 1713), ('DAI83733', 1712), ('FRO32293', 1702), ('ELE66810', 1697), ('SNA55762', 1646), ('DAI22177', 1627), ('FRO78087', 1531), ('ELE99737', 1516), ('GRO94758', 1489), ('ELE34057', 1489), ('FRO35904', 1436), ('FRO53271', 1420), ('SNA93860', 1407), ('SNA90094', 1390), ('GRO38814', 1352), ('ELE56788', 1345), ('GRO61133', 1321), ('DAI88807', 1316), ('ELE74482', 1316), ('ELE59935', 1311), ('SNA96271', 1295), ('DAI43223', 1290), ('ELE91337', 1289), ('GRO15017', 1275), ('DAI31081', 1261), ('GRO81087', 1220), ('DAI22896', 1219), ('GRO85051', 1214),

3. Filter the frequent items and keep only those with >= 100 occurrences. Collect these as a map and broadcast this map to every Spark worker (see the Spark documentation for broadcast).

In [23]:
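# keep items that occur at least 100 times (the support threshold, inclusive)
filterCount = dict(counts.filter(lambda a: a[1] >= 100).collect())
# broadcast the frequent-item map to every Spark worker
filterCount = sc.broadcast(filterCount)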

4. We can now begin step 2 of the A-priori algorithm. It follows the naive algorithm, except that when we map each line into a list of all pairs of items in the basket, we first check that both items in the pair occur in the broadcast map from step 1. This significantly decreases the number of pairs output by this step.
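
To make the two passes concrete without Spark, here is a small plain-Python sketch. The function name `apriori_pairs`, the toy baskets and the threshold of 2 are all assumptions for illustration; it also assumes no item is repeated within a basket.

```python
from collections import Counter
from itertools import combinations

def apriori_pairs(baskets, min_support):
    """Two-pass A-priori sketch for frequent pairs."""
    # Pass 1: count single items and keep only the frequent ones
    item_counts = Counter(item for basket in baskets for item in basket)
    frequent = {item for item, n in item_counts.items() if n >= min_support}

    # Pass 2: count only pairs whose two items are both frequent
    pair_counts = Counter()
    for basket in baskets:
        kept = sorted(item for item in basket if item in frequent)
        pair_counts.update(combinations(kept, 2))

    return {pair: n for pair, n in pair_counts.items() if n >= min_support}

toy = [["A", "B", "C"], ["A", "B"], ["A", "C"], ["B", "D"]]
print(apriori_pairs(toy, 2))
```

Item D occurs only once, so the pair (B, D) is never generated in the second pass. This pruning is what reduces the number of candidate pairs compared with the naive approach.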

In [24]:
def getAprior(lists):
  """Return the sorted pairs of a basket in which both items are frequent."""
  frequent = filterCount.value  # broadcast map of frequent items
  Aprior = []
  for i in range(len(lists)):
    first = lists[i]
    if first in frequent:
      for j in range(i + 1, len(lists)):
        second = lists[j]
        if second in frequent:
          Aprior.append(tuple(sorted((first, second))))
  return Aprior

5. Reduce the pairs into pair counts

In [27]:
pairs = browsing.flatMap(lambda line: [(value, 1) for value in getAprior(line.split())]).reduceByKey(lambda a,b: a+b).sortBy(lambda a:a[1], False)

6. [15 points for correct top 5 most frequent pairs] Take the top 5 most frequent pairs and display them. You should still get:

[(('DAI62779', 'ELE17451'), 1592), (('FRO40251', 'SNA80324'), 1412), (('DAI75645', 'FRO40251'), 1254), (('FRO40251', 'GRO85051'), 1213), (('DAI62779', 'GRO73461'), 1139)]


In [28]:
dbg(pairs)

[(('DAI62779', 'ELE17451'), 1592), (('FRO40251', 'SNA80324'), 1412), (('DAI75645', 'FRO40251'), 1254), (('FRO40251', 'GRO85051'), 1213), (('DAI62779', 'GRO73461'), 1139), (('DAI75645', 'SNA80324'), 1130), (('DAI62779', 'FRO40251'), 1070), (('DAI62779', 'SNA80324'), 923), (('DAI62779', 'DAI85309'), 918), (('ELE32164', 'GRO59710'), 911), (('FRO40251', 'GRO73461'), 882), (('DAI62779', 'DAI75645'), 882), (('DAI62779', 'ELE92920'), 877), (('FRO40251', 'FRO92469'), 835), (('DAI62779', 'ELE32164'), 832), (('DAI75645', 'GRO73461'), 712), (('DAI43223', 'ELE32164'), 711), (('DAI62779', 'GRO30386'), 709), (('ELE17451', 'FRO40251'), 697), (('DAI85309', 'ELE99737'), 659), (('DAI62779', 'ELE26917'), 650), (('GRO21487', 'GRO73461'), 631), (('DAI62779', 'SNA45677'), 604), (('ELE17451', 'SNA80324'), 597), (('DAI62779', 'GRO71621'), 595), (('DAI62779', 'SNA55762'), 593), (('DAI62779', 'DAI83733'), 586), (('ELE17451', 'GRO73461'), 580), (('GRO73461', 'SNA80324'), 562), (('DAI62779', 'GRO59710'), 561), ((

# Question 2: Find interesting pair rules 
[45 points: 15 for correct expressions for confidence score, 15 for correct expressions for interest score, 15 for ensuring the X->Y and Y->X rules are all computed so that the top 5 confidence and interesting rules are correct]

2a) Using the item pairs (X, Y) found in Part 1, compute the confidence score for the corresponding association rules X->Y and Y->X for each of them. Here are the top 5 confidence score rules:

[(('DAI93865', 'FRO40251'), 1.0), (('GRO85051', 'FRO40251'), 0.9991762767710051), (('GRO38636', 'FRO40251'), 0.9906542056074765), (('ELE12951', 'FRO40251'), 0.9905660377358491), (('DAI88079', 'FRO40251'), 0.9867256637168142)]
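
Because a pair (X, Y) is stored only once, both rule directions have to be emitted from each pair count. A minimal plain-Python sketch of that step follows; the function name `rules_with_confidence` and the toy counts are hypothetical.

```python
def rules_with_confidence(pair_counts, item_counts):
    """Return ((lhs, rhs), confidence) for both directions of every pair, highest first."""
    rules = []
    for (x, y), n in pair_counts.items():
        rules.append(((x, y), n / item_counts[x]))  # X -> Y divides by support(X)
        rules.append(((y, x), n / item_counts[y]))  # Y -> X divides by support(Y)
    return sorted(rules, key=lambda rule: rule[1], reverse=True)

# Toy counts: A appears in 2 baskets, B in 4, and they appear together in 2
pair_counts = {("A", "B"): 2}
item_counts = {"A": 2, "B": 4}
print(rules_with_confidence(pair_counts, item_counts))
```

The two directions generally have different confidence because the denominators differ: here every basket with A also has B, but only half of the baskets with B have A.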



In [43]:
def score(pair):
  """Return the confidence of both rules X->Y and Y->X for one pair count."""
  (X, Y), count = pair
  itemCount = filterCount.value  # item -> support, broadcast in Part 1b
  # conf(X->Y) = support(X u Y) / support(X), and likewise for Y->X
  return [((X, Y), count / itemCount[X]),
          ((Y, X), count / itemCount[Y])]

confidenceScore = pairs.flatMap(score).sortBy(lambda a: -a[1])
dbg(confidenceScore)

[(('DAI93865', 'FRO40251'), 1.0), (('GRO85051', 'FRO40251'), 0.999176276771005), (('GRO38636', 'FRO40251'), 0.9906542056074766), (('ELE12951', 'FRO40251'), 0.9905660377358491), (('DAI88079', 'FRO40251'), 0.9867256637168141), (('FRO92469', 'FRO40251'), 0.983510011778563), (('DAI43868', 'SNA82528'), 0.972972972972973), (('DAI23334', 'DAI62779'), 0.9545454545454546), (('DAI74977', 'GRO83463'), 0.8240740740740741), (('DAI20027', 'DAI70456'), 0.8070175438596491), (('DAI33885', 'GRO46854'), 0.7407407407407407), (('SNA46500', 'GRO44993'), 0.7403846153846154), (('SNA81556', 'DAI85309'), 0.7363636363636363), (('ELE92920', 'DAI62779'), 0.7326649958228906), (('DAI53152', 'FRO40251'), 0.717948717948718), (('SNA18336', 'DAI62779'), 0.7136812411847673), (('ELE55848', 'GRO32086'), 0.7094594594594594), (('DAI74112', 'DAI42493'), 0.6991150442477876), (('GRO89004', 'ELE25077'), 0.698051948051948), (('GRO81647', 'GRO73461'), 0.6775510204081633), (('DAI37288', 'ELE32164'), 0.6464088397790055), (('SNA18336

2b) Then compute the interest score for each of them. Report the top 5. You will need to use maps, joins, and lambda expressions. Here are the top 5 interesting rules:

[(('DAI43868', 'SNA82528'), 0.9538739086342056), (('DAI93865', 'FRO40251'), 0.8752130156586605), (('GRO85051', 'FRO40251'), 0.8743892924296656), (('GRO38636', 'FRO40251'), 0.865867221266137), (('ELE12951', 'FRO40251'), 0.8657790533945096)]
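
The interest computation needs the support of each rule's right-hand item, which is what the join provides in Spark. The sketch below mimics that lookup with a plain dictionary; the function name `interest_scores`, the toy rules and the basket total of 8 are assumptions for illustration.

```python
def interest_scores(confidences, item_counts, num_baskets):
    """Return ((lhs, rhs), conf - Pr[rhs]) for every rule, highest first."""
    scored = [
        ((lhs, rhs), conf - item_counts[rhs] / num_baskets)
        for (lhs, rhs), conf in confidences
    ]
    return sorted(scored, key=lambda rule: rule[1], reverse=True)

# Toy input: two rules with their confidence, item supports, and 8 baskets in total
confidences = [(("A", "B"), 1.0), (("B", "A"), 0.5)]
item_counts = {"A": 2, "B": 4}
print(interest_scores(confidences, item_counts, 8))
```

In the Spark version, the dictionary lookup on `rhs` corresponds to re-keying each rule by its right-hand item and joining with the per-item counts.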

In [44]:
def count(line):
  """Return (item, 1) once for each distinct item in a basket."""
  result = []
  seen = set()
  for i in line.split():
    if i not in seen:
      seen.add(i)
      result.append((i, 1))
  return result

def interestScore(file, score):
  """
  Compute interest(X->Y) = conf(X->Y) - Pr[Y] for every rule in `score`,
  where Pr[Y] is the fraction of baskets in `file` that contain Y.
  """
  counts = file.flatMap(count).reduceByKey(lambda a, b: a+b)
  total = file.count()
  # key each rule by its right-hand side Y so it can be joined with Y's count
  result = score.map(lambda a: (a[0][1], (a[0][0], a[1]))).join(counts).map(lambda a: ((a[1][0][0], a[0]), a[1][0][1]-a[1][1]/total)).sortBy(lambda a: a[1], False)
  return result

# use a separate name so the function is not overwritten by its result
interestScores = interestScore(browsing, confidenceScore)
dbg(interestScores)

[(('DAI43868', 'SNA82528'), 0.9538739086342057), (('DAI93865', 'FRO40251'), 0.8752130156586605), (('GRO85051', 'FRO40251'), 0.8743892924296655), (('GRO38636', 'FRO40251'), 0.8658672212661371), (('ELE12951', 'FRO40251'), 0.8657790533945096), (('DAI88079', 'FRO40251'), 0.8619386793754746), (('FRO92469', 'FRO40251'), 0.8587230274372235), (('DAI74977', 'GRO83463'), 0.8065826750836879), (('DAI20027', 'DAI70456'), 0.7932880817844746), (('DAI23334', 'DAI62779'), 0.7401793569923212), (('SNA46500', 'GRO44993'), 0.7020257201722429), (('DAI33885', 'GRO46854'), 0.6842795337055971), (('GRO89004', 'ELE25077'), 0.6826826673214249), (('ELE55848', 'GRO32086'), 0.6819362286951753), (('DAI74112', 'DAI42493'), 0.6669295839731983), (('SNA81556', 'DAI85309'), 0.66263610348688), (('SNA18336', 'ELE92920'), 0.6032614337318303), (('DAI53152', 'FRO40251'), 0.5931617336073784), (('ELE32244', 'ELE66600'), 0.5852722623574467), (('SNA44451', 'DAI18527'), 0.5742400565898202), (('FRO17734', 'ELE28189'), 0.569952939872