**Lab 3: Association Rules (Recommendations)**

Association rules are frequently used by retailers for Market Basket Analysis (MBA) to understand the purchase behavior of their customers. This information can then be used for many purposes, such as recommendations, cross-selling and up-selling of products, sales promotions, loyalty programs, store design, and discount plans.
                    
Evaluation of itemsets: Once you have found the frequent itemsets of a dataset, you need to choose a subset of them that is significant and interesting. Two commonly used metrics for selecting recommendation rules by significance and interest are:

Confidence, conf(I -> j) = support(I ∪ {j}) / support(I)

Interest, interest(I -> j) = conf(I -> j) - Pr[j], where Pr[j] is the fraction of baskets that contain item j
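
To make the two metrics concrete, here is a minimal sketch in plain Python on a tiny set of made-up baskets. The data and the helper names `support`, `confidence`, and `interest` are illustrative assumptions, not part of the lab's required solution.

```python
# Toy baskets (hypothetical data, not taken from browsing.txt)
baskets = [
    {"A", "B"},
    {"A", "B", "C"},
    {"A", "C"},
    {"B", "C"},
]

def support(itemset, baskets):
    """Number of baskets that contain every item in `itemset`."""
    return sum(1 for basket in baskets if itemset <= basket)

def confidence(I, j, baskets):
    """conf(I -> j) = support(I u {j}) / support(I)."""
    return support(I | {j}, baskets) / support(I, baskets)

def interest(I, j, baskets):
    """interest(I -> j) = conf(I -> j) - Pr[j]."""
    pr_j = support({j}, baskets) / len(baskets)
    return confidence(I, j, baskets) - pr_j

conf_ab = confidence({"A"}, "B", baskets)  # 2 of the 3 baskets with A also hold B
int_ab = interest({"A"}, "B", baskets)     # 2/3 - 3/4, slightly negative
```

In this toy data the rule A -> B has a fairly high confidence but a negative interest, because B appears in three of the four baskets regardless of A. This is why the two scores are reported separately.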

Application in product recommendations: The action or practice of selling additional products or services to existing customers is called cross-selling. Giving product recommendations is one of the examples of cross-selling that are frequently used by online retailers. One simple method to give product recommendations is to recommend products that are frequently browsed together by the customers. Suppose we want to recommend new products to the customer based on the products they have already browsed online. 



Note: You will need to download the browsing.txt file. In Colab, you can use the following command:

In [1]:
!gdown 'https://drive.google.com/uc?export=download&confirm=t&id=1Ijyh14a0Lh9sjwQUR6PE1TB2phjAZP4P'

Downloading...
From: https://drive.google.com/uc?export=download&confirm=t&id=1Ijyh14a0Lh9sjwQUR6PE1TB2phjAZP4P
To: /content/browsing.txt


In [2]:
# start the Spark context
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install -q pyspark


import os
os.environ['PYTHONHASHSEED']="0"
os.environ["PYSPARK_PYTHON"]="python3"
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64/"
# A few additional libraries we will need
from math import sqrt

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import *

try:
  conf = SparkConf().setMaster("local[*]").set("spark.executor.memory", "1g").set("spark.executorEnv.PYTHONHASHSEED","0").set("spark.ui.port", "4050")
  sc = SparkContext(conf = conf)
  spark = SparkSession.builder.getOrCreate()
except ValueError:
  #it's ok if the server is already started
  pass

def dbg(x):
  """ A helper function to print debugging information on RDDs """
  if isinstance(x, pyspark.RDD):
    print([(t[0], list(t[1]) if 
            isinstance(t[1], pyspark.resultiterable.ResultIterable) else t[1])
           if isinstance(t, tuple) else t
           for t in x.take(100)])
  else:
    print(x)
    

import unittest
Test = unittest.TestCase()



# Question 1: Find products which are frequently browsed together
[45 points total]

Write a Spark map-reduce program to find products which are frequently browsed together in the given browsing.txt. Each line represents a browsing session of a customer (a “basket”). On each line, each string of 8 characters represents the ID of an item browsed during that session. The items are separated by spaces. For example, this first line of browsing.txt:

FRO11987 ELE17451 ELE89019 SNA90258 GRO99222 

represents a browsing session (a “basket”) with 5 item IDs.
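
As a minimal sketch, the sample line above can be parsed with plain Python string methods. The variable names are illustrative only.

```python
# The sample line quoted above, including its trailing space
line = "FRO11987 ELE17451 ELE89019 SNA90258 GRO99222 "

# strip() drops surrounding whitespace; split() breaks on runs of whitespace
items = line.strip().split()

# A basket of n items contains n * (n - 1) / 2 unordered pairs
n_pairs = len(items) * (len(items) - 1) // 2
```

For this basket of 5 items, that gives 10 candidate pairs, which is the quadratic growth that the A-priori step later tries to limit.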


## 1a) Naive All-Pairs
[15 points for correct top 5 most frequent pairs]

Implement a naive Spark approach to finding frequent product pairs with a support threshold of 100 (i.e. a product pair must occur together at least 100 times to be considered frequent):

1. Create an RDD from the lines in the file

2. Map each line into a list of all pairs of items in the basket. 

    Hint: this is an N^2 operation and can be done by writing a function with a nested loop that processes a single basket and then mapping this function over the RDD

    Hint: you can ensure that a pair is counted regardless of whether it appears in a basket as …,A,...,B,... or …,B,...,A,... by outputting it as tuple(sorted((item1, item2))) which would output (A,B) for both cases

3. Reduce the pairs into pair counts

4. Take the top 5 most frequent pairs and display them. You should get:

[(('DAI62779', 'ELE17451'), 1592), (('FRO40251', 'SNA80324'), 1412), (('DAI75645', 'FRO40251'), 1254), (('FRO40251', 'GRO85051'), 1213), (('DAI62779', 'GRO73461'), 1139)]
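
The pair-generation and counting steps can be sketched without Spark on a few made-up sessions. This is an illustration under assumptions: the helper name `basket_pairs` and the toy lines are hypothetical, and `itertools.combinations` is used here in place of an explicit nested loop.

```python
from collections import Counter
from itertools import combinations

def basket_pairs(line):
    """Return every unordered pair of items in one basket line."""
    items = line.strip().split()
    # combinations() yields each position pair (i < j) once; sorting each
    # pair makes (A, B) and (B, A) map to the same key.
    return [tuple(sorted(pair)) for pair in combinations(items, 2)]

# Hypothetical toy sessions, not taken from browsing.txt
lines = ["A B C", "B A", "C A D"]

# Local stand-in for the reduce step: sum a count of 1 per emitted pair
pair_counts = Counter()
for line in lines:
    pair_counts.update(basket_pairs(line))

top = pair_counts.most_common(2)
```

In Spark, a function like this would be applied with `flatMap`, and the `Counter` would be replaced by mapping each pair to `(pair, 1)` followed by `reduceByKey`.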


In [27]:
def naive_allpairs(basket):
  """Return all unordered item pairs from one basket (one line of the file)."""
  item_list = basket.strip().split()  # split the line into item IDs
  appears_together = []
  for i, item1 in enumerate(item_list):
    for j, item2 in enumerate(item_list):
      if i < j:  # visit each position pair only once per basket
        # sort so that (A, B) and (B, A) produce the same key
        appears_together.append(tuple(sorted((item1, item2))))
  return appears_together

file = sc.textFile("browsing.txt")

baskets = file.flatMap(lambda line: naive_allpairs(line))

mapped_baskets = baskets.map(lambda x: ((x[0], x[1]), 1))
reduced_baskets = mapped_baskets.reduceByKey(lambda a, b: a + b)

dbg(reduced_baskets.takeOrdered(5, lambda x: -x[1]))

## Takes 14 seconds

[(('DAI62779', 'ELE17451'), 1592), (('FRO40251', 'SNA80324'), 1412), (('DAI75645', 'FRO40251'), 1254), (('FRO40251', 'GRO85051'), 1213), (('DAI62779', 'GRO73461'), 1139)]


## 1b) A-Priori All-Pairs
[30 points total]
The naive approach can be slow and disk/memory intensive for large sets of browsing data. Improve on it by applying the two-step A-priori algorithm.
1. Create an RDD from the lines in the file
2. [15 points for correct top 5 most frequent items] Map each line into its items and then perform an item count on the entire RDD (this is step 1 of the A-priori algorithm). If you take the top 5 most frequent items you should get:

  [('DAI62779', 6667), ('FRO40251', 3881), ('ELE17451', 3875), ('GRO73461', 3602), ('SNA80324', 3044)]

3. Filter the frequent items and keep only those with >= 100 occurrences. Collect these as a map and broadcast this map to every Spark worker (see the Spark documentation for broadcast).
4. We can now begin Step 2 of the A-priori algorithm. It follows the naive algorithm, except that when we map each line into a list of all pairs of items in the basket, we first check that both items in the pair occur in the broadcast map from Step 1. This significantly decreases the number of pairs output by this step.
5. Reduce the pairs into pair counts
6. [15 points for correct top 5 most frequent pairs] Take the top 5 most frequent pairs and display them, you should still get:

  [(('DAI62779', 'ELE17451'), 1592), (('FRO40251', 'SNA80324'), 1412), (('DAI75645', 'FRO40251'), 1254), (('FRO40251', 'GRO85051'), 1213), (('DAI62779', 'GRO73461'), 1139)]
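
The two passes can be sketched in plain Python on made-up sessions before moving to Spark. Everything here (the toy lines, `MIN_SUPPORT = 2`, and the variable names) is an assumption for illustration. In the Spark version, the set of frequent items plays the role of the broadcast map.

```python
from collections import Counter
from itertools import combinations

# Hypothetical toy sessions and threshold, not taken from browsing.txt
lines = ["A B C", "A B", "A C D", "B A"]
MIN_SUPPORT = 2

# Pass 1: count single items and keep only the frequent ones.
item_counts = Counter(item for line in lines for item in line.split())
frequent_items = {item for item, n in item_counts.items() if n >= MIN_SUPPORT}

# Pass 2: count only pairs whose items are both frequent.
pair_counts = Counter()
for line in lines:
    kept = [item for item in line.split() if item in frequent_items]
    pair_counts.update(tuple(sorted(p)) for p in combinations(kept, 2))

frequent_pairs = {p: n for p, n in pair_counts.items() if n >= MIN_SUPPORT}
```

Here item D occurs only once, so no pair containing D is ever emitted in the second pass. Filtering the basket before forming pairs is equivalent to checking both members of each pair, and it shrinks the quadratic loop as well.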


In [28]:
def priori_allpairs(basket):
  """Return all unordered pairs of frequent items from one basket."""
  item_list = basket.strip().split()  # split the line into item IDs
  frequent = broadcastedRDD.value  # broadcast dict of frequent items (a dict, not an RDD, despite the name)
  appears_together = []
  for i, item1 in enumerate(item_list):
    for j, item2 in enumerate(item_list):
      # visit each position pair once and keep it only if both items are frequent
      if i < j and item1 in frequent and item2 in frequent:
        appears_together.append(tuple(sorted((item1, item2))))
  return appears_together

file = sc.textFile("browsing.txt")

### A-PRIORI STEP ONE ###
items = file.flatMap(lambda line: [(item, 1) for item in line.strip().split()])
item_freq = items.reduceByKey(lambda a, b: a + b)

dbg(item_freq.takeOrdered(5, lambda x: -x[1]))

most_freq = item_freq.filter(lambda x: x[1] >= 100).collectAsMap()
broadcastedRDD = sc.broadcast(most_freq)
### ###

### A-PRIORI STEP TWO ###
baskets = file.flatMap(lambda line: priori_allpairs(line))

mapped_baskets = baskets.map(lambda x: ((x[0], x[1]), 1))
reduced_baskets = mapped_baskets.reduceByKey(lambda a, b: a + b)

dbg(reduced_baskets.takeOrdered(5, lambda x: -x[1]))
### ###

## Takes 7 seconds

[('DAI62779', 6667), ('FRO40251', 3881), ('ELE17451', 3875), ('GRO73461', 3602), ('SNA80324', 3044)]
[(('DAI62779', 'ELE17451'), 1592), (('FRO40251', 'SNA80324'), 1412), (('DAI75645', 'FRO40251'), 1254), (('FRO40251', 'GRO85051'), 1213), (('DAI62779', 'GRO73461'), 1139)]


# Question 2: Find interesting pair rules
[45 points: 15 for correct expressions for confidence score, 15 for correct expressions for interest score, 15 for ensuring the X->Y and Y->X rules are all computed so that the top 5 confidence and interesting rules are correct]


## 2a) Compute Confidence Score

Using the item pairs (X, Y) found in Question 1, compute the confidence score for both corresponding association rules, X->Y and Y->X.

Here are the top 5 confidence score rules:

[(('DAI93865', 'FRO40251'), 1.0), (('GRO85051', 'FRO40251'), 0.9991762767710051), (('GRO38636', 'FRO40251'), 0.9906542056074765), (('ELE12951', 'FRO40251'), 0.9905660377358491), (('DAI88079', 'FRO40251'), 0.9867256637168142)]
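
Because each pair is stored once as a sorted tuple, both rule directions have to be generated explicitly. A minimal sketch with made-up counts follows. The function name `rules_with_confidence` and the numbers are hypothetical.

```python
# Hypothetical counts, not taken from browsing.txt
item_counts = {"A": 4, "B": 2}
pair_counts = {("A", "B"): 2}

def rules_with_confidence(pair_counts, item_counts):
    """Emit ((antecedent, consequent), confidence) for both directions of each pair."""
    rules = []
    for (x, y), n_xy in pair_counts.items():
        rules.append(((x, y), n_xy / item_counts[x]))  # X -> Y
        rules.append(((y, x), n_xy / item_counts[y]))  # Y -> X
    return rules

rules = dict(rules_with_confidence(pair_counts, item_counts))
```

The same pair count yields different confidences depending on the direction: here A -> B scores 0.5 while B -> A scores 1.0, because B is the rarer item.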


In [46]:
# reduced_baskets holds the ((X, Y), n) pair counts
# broadcastedRDD.value maps each frequent item X to its count n

def confidence(x, y, xy_freq):
  x_freq = broadcastedRDD.value[x]
  return ((x, y), xy_freq/x_freq)

xy_confidence = reduced_baskets.map(lambda x: confidence(x[0][0], x[0][1], x[1]))
yx_confidence = reduced_baskets.map(lambda x: confidence(x[0][1], x[0][0], x[1]))

confidence_scores = xy_confidence.union(yx_confidence)
dbg(confidence_scores.takeOrdered(5, lambda x: -x[1]))

[(('DAI93865', 'FRO40251'), 1.0), (('GRO85051', 'FRO40251'), 0.999176276771005), (('GRO38636', 'FRO40251'), 0.9906542056074766), (('ELE12951', 'FRO40251'), 0.9905660377358491), (('DAI88079', 'FRO40251'), 0.9867256637168141)]


## 2b) Compute Interest Score

Then compute the interest score for each of them. Report the top 5. You will need to use maps, joins, and lambda expressions. Here are the top 5 interesting rules:

[(('DAI43868', 'SNA82528'), 0.9538739086342056), (('DAI93865', 'FRO40251'), 0.8752130156586605), (('GRO85051', 'FRO40251'), 0.8743892924296656), (('GRO38636', 'FRO40251'), 0.865867221266137), (('ELE12951', 'FRO40251'), 0.8657790533945096)]
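
To see why the interest ranking can differ from the confidence ranking, consider this small sketch with made-up numbers. The counts, the confidences, and the function name `interest_scores` are all hypothetical.

```python
# Hypothetical values, not taken from browsing.txt
total_baskets = 10
item_counts = {"A": 4, "B": 2, "C": 9}
confidences = {("A", "C"): 1.0, ("B", "A"): 1.0, ("A", "B"): 0.5}

def interest_scores(confidences, item_counts, total_baskets):
    """interest(X -> Y) = conf(X -> Y) - Pr[Y], with Pr[Y] = count(Y) / total baskets."""
    return {
        (x, y): conf - item_counts[y] / total_baskets
        for (x, y), conf in confidences.items()
    }

scores = interest_scores(confidences, item_counts, total_baskets)

# Rules ordered from most to least interesting
ranked = sorted(scores, key=scores.get, reverse=True)
```

In this toy data, A -> C has perfect confidence but the lowest interest, since C appears in 9 of the 10 baskets anyway. The rule B -> A has the same confidence and ranks first because A is much less common.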


In [45]:
def interest(x, y, confidence, total):
  # interest(X -> Y) = conf(X -> Y) - Pr[Y], where Pr[Y] = count(Y) / total baskets
  y_freq = broadcastedRDD.value[y]
  return ((x, y), confidence - y_freq / total)

total_baskets = file.count()
interestRDD = confidence_scores.map(lambda x: interest(x[0][0], x[0][1], x[1], total_baskets))


dbg(interestRDD.takeOrdered(5, lambda x: -x[1]))

[(('DAI43868', 'SNA82528'), 0.9538739086342057), (('DAI93865', 'FRO40251'), 0.8752130156586605), (('GRO85051', 'FRO40251'), 0.8743892924296655), (('GRO38636', 'FRO40251'), 0.8658672212661371), (('ELE12951', 'FRO40251'), 0.8657790533945096)]
