# CS246 - Homework 1

## Question 2

### Association Rules

### Setup

Let's set up Spark in your Colab environment. Run the cell below!

In [1]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=2a451e612c055fe1eb0a963d3af02c3fdc6f5fa729beb9e426596b1a76f53152
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0
The following additional packages will be installed:
  openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-

In [2]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials

# Authenticate and create the PyDrive client
auth.authenticate_user()
gauth = GoogleAuth()
gauth.credentials = GoogleCredentials.get_application_default()
drive = GoogleDrive(gauth)

Download the data

In [3]:
id='1NOJZTHn9U1DvJB9eci_Oyd_yGZ_C-Cvu'
downloaded = drive.CreateFile({'id': id})
downloaded.GetContentFile('browsing.txt')

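### Evaluation of item sets
We usually use the following measures to judge how good a rule $A \rightarrow B$ derived from an item set is.<br>
1. $Confidence$ measures the probability that a basket contains B given that it already contains A.
$$conf(A \rightarrow B) = Pr(B|A)$$
2. $Lift$ measures how much more likely B is to appear when A is present, compared with how often B appears overall.
$$lift(A \rightarrow B) = \frac{conf(A \rightarrow B)}{S(B)}$$where $S(B) = \frac{Support(B)}{N}$ and $N$ is the total number of baskets. <br>
3. $Conviction$ compares the rate at which B would be absent if A and B were independent with the observed rate at which A occurs without B.
$$conv(A \rightarrow B) = \frac{1 - S(B)}{1 - conf(A \rightarrow B)}$$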
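
As a worked illustration of the three measures, the sketch below computes them from raw basket counts. The function name `rule_metrics` and the toy counts are assumptions made for illustration; they are not part of the assignment or the dataset.

```python
def rule_metrics(support_ab, support_a, support_b, n):
    """Return (confidence, lift, conviction) for the rule A -> B.

    support_ab: number of baskets containing both A and B
    support_a:  number of baskets containing A
    support_b:  number of baskets containing B
    n:          total number of baskets
    """
    conf = support_ab / support_a
    s_b = support_b / n
    lift = conf / s_b
    # Conviction is unbounded when the rule always holds (conf == 1).
    conv = float("inf") if conf == 1 else (1 - s_b) / (1 - conf)
    return conf, lift, conv


# Hypothetical counts: 100 baskets, A in 20, B in 40, both in 15.
conf, lift, conv = rule_metrics(support_ab=15, support_a=20, support_b=40, n=100)
print(conf, lift, conv)
```

With these toy counts the confidence is 0.75 while B appears in only 40% of baskets, so lift and conviction both come out above 1, which indicates a positive association between A and B.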

Import libraries

In [4]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
from pyspark.mllib.fpm import FPGrowth
import pandas as pd
import itertools

# create the Spark Session
spark = SparkSession.builder.getOrCreate()

# create the Spark Context
sc = spark.sparkContext

In [5]:
data = sc.textFile("browsing.txt")
transactions = data.map(lambda line: list(set(line.strip().split(' '))))
model = FPGrowth.train(transactions, minSupport= 100 / transactions.count(), numPartitions=10)
result = model.freqItemsets()

In [10]:
result.take(3)

[FreqItemset(items=['SNA21851'], freq=120),
 FreqItemset(items=['DAI59609'], freq=350),
 FreqItemset(items=['GRO56989'], freq=655)]
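
To make the meaning of `freq` concrete, here is a minimal brute-force sketch that counts itemsets over a handful of toy baskets. It is not FP-Growth and would not scale to `browsing.txt`; the helper name `frequent_itemsets`, the `max_size` cap, and the baskets are assumptions for illustration only.

```python
from collections import Counter
from itertools import combinations


def frequent_itemsets(baskets, min_count, max_size=3):
    """Count every itemset of size 1..max_size and keep those seen at least min_count times."""
    counts = Counter()
    for basket in baskets:
        # Drop duplicate items within a basket, as the Spark cell does with set(...).
        items = sorted(set(basket))
        for size in range(1, max_size + 1):
            for combo in combinations(items, size):
                counts[combo] += 1
    return {itemset: c for itemset, c in counts.items() if c >= min_count}


# Toy baskets (hypothetical data, not taken from browsing.txt).
baskets = [
    ["a", "b", "c"],
    ["a", "b"],
    ["a", "c"],
    ["b", "c", "c"],
]
freq = frequent_itemsets(baskets, min_count=2)
print(freq)
```

In the Spark cell the same threshold is expressed as a fraction: `minSupport = 100 / transactions.count()` keeps itemsets that occur in at least 100 baskets.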

Calling the FP-Growth algorithm directly gives us the frequent itemsets of the dataset. The code below then mines association rules from them.

In [13]:
result.filter(lambda line: len(line[0]) == 1).count()

647

To mine association rules, we define the following data structure.

In [11]:
from collections import Counter

class CounterDict:
    def __init__(self):
        self.data = {}

    def __setitem__(self, key, value):
        if isinstance(key, Counter):
            self.data[frozenset(key.items())] = value
        else:
            raise TypeError

    def __getitem__(self, key):
        if isinstance(key, Counter):
            return self.data[frozenset(key.items())]
        else:
            raise TypeError

def generate_hash_freq(freq_items):
    h = CounterDict()
    data = freq_items.map(lambda line: (line[0], line[1])).collect()
    # line: ([item1, item2], cnt)
    for line in data:
        h[Counter(line[0])] = line[1]

    return h


def associate_rules(freq_items, h):
    result = []
    for line in freq_items:
        for recommended in line[0]:
            # line[0] holds all items of one frequent itemset. We iterate over it, pick one
            # item as `recommended`, and use the remaining items as items_set.
            # conf(items_set -> recommended) = support(line[0]) / support(items_set)
            items_set = sorted(list(set(line[0]) - set([recommended])))
            conf = h[Counter(line[0])] / h[Counter(items_set)] 
            result.append(((items_set, recommended), conf))

    return result


h = generate_hash_freq(result)  # h: CounterDict mapping each frequent itemset to its count
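
The point of `CounterDict` is that lookups must not depend on the order in which items are listed. A small sketch of the key it builds, using made-up item names and a made-up count:

```python
from collections import Counter

# The same two items listed in different orders.
key_xy = frozenset(Counter(["x", "y"]).items())
key_yx = frozenset(Counter(["y", "x"]).items())

# Both orderings produce the same hashable key, so one dict entry serves both.
support = {key_xy: 7}  # hypothetical count
print(key_xy == key_yx, support[key_yx])
```

Because each basket holds unique items here, a plain `frozenset(items)` would give the same order-independent behavior; the `Counter` only matters if an item could appear more than once.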


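We first filter all two-item frequent itemsets, i.e. `len(line[0]) == 2`, and compute the confidence of their rules.<br>
Then we select all three-item frequent itemsets, pick one item as the recommended item and the remaining two as the item set, and compute the corresponding confidence.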
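
The same procedure can be sketched on a toy support table. The lookup of the antecedent's count always succeeds because every subset of a frequent itemset is itself frequent. The function name `rules_from_itemsets` and the counts below are assumptions for illustration, not the notebook's own code or data.

```python
def rules_from_itemsets(support):
    """support maps frozenset(items) -> count.

    Returns a list of ((antecedent, consequent), confidence) tuples,
    with one rule per choice of consequent in each itemset of size >= 2.
    """
    rules = []
    for itemset, count in support.items():
        if len(itemset) < 2:
            continue
        for consequent in itemset:
            antecedent = itemset - {consequent}
            # conf(antecedent -> consequent) = support(itemset) / support(antecedent)
            conf = count / support[antecedent]
            rules.append(((sorted(antecedent), consequent), conf))
    return rules


# Hypothetical support counts.
support = {
    frozenset(["a"]): 4,
    frozenset(["b"]): 5,
    frozenset(["a", "b"]): 3,
}
# Sort by descending confidence, breaking ties lexicographically.
rules = sorted(rules_from_itemsets(support), key=lambda r: (-r[1], r[0]))
print(rules)
```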

In [None]:
# {a} -> b
pairs = result.filter(lambda line: len(line[0]) == 2).map(lambda line: ((line[0][0], line[0][1]), line[1]))
# pairs : ((item1, item2), cnt)
double_rules = associate_rules(pairs.collect(), h)
sorted(double_rules, key=lambda tup: -tup[1])[:5]

[((['DAI93865'], 'FRO40251'), 1.0),
 ((['GRO85051'], 'FRO40251'), 0.999176276771005),
 ((['GRO38636'], 'FRO40251'), 0.9906542056074766),
 ((['ELE12951'], 'FRO40251'), 0.9905660377358491),
 ((['DAI88079'], 'FRO40251'), 0.9867256637168141)]

In [None]:
# {a, b} -> c
triples = result.filter(lambda line: len(line[0]) == 3).map(lambda line: ((line[0][0], line[0][1], line[0][2]), line[1]))
triple_rules = associate_rules(triples.collect(), h)
sorted(triple_rules, key=lambda tup: (-tup[1], tup[0]))[:5]

[((['DAI23334', 'ELE92920'], 'DAI62779'), 1.0),
 ((['DAI31081', 'GRO85051'], 'FRO40251'), 1.0),
 ((['DAI55911', 'GRO85051'], 'FRO40251'), 1.0),
 ((['DAI62779', 'DAI88079'], 'FRO40251'), 1.0),
 ((['DAI75645', 'GRO85051'], 'FRO40251'), 1.0)]