In [4]:
import csv
import json
import os

from pyspark import SparkContext, SparkConf

In [5]:
# Spark application settings: run locally using every available core.
appName = "frequent-itemset-mining"
master = "local[*]"

conf = SparkConf().setAppName(appName).setMaster(master)
# getOrCreate reuses the already-running context when this cell is executed
# again in the same kernel; constructing SparkContext directly would raise
# because only one context may be active per JVM.
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("WARN")

In [6]:
# Directory holding the raw Yelp dataset. Set the YELP_DATASET_DIR environment
# variable before starting the kernel to point somewhere else; the default is
# the original author's local path, so existing runs resolve the same files.
yelp_dataset_dir = os.environ.get(
    'YELP_DATASET_DIR', '/Users/ZhengYang/Documents/yelp_dataset'
)

review_file_path = os.path.join(yelp_dataset_dir, 'review.json')
business_file_path = os.path.join(yelp_dataset_dir, 'business.json')

# One JSON document per line in each file.
review_rdd = sc.textFile(review_file_path)
business_rdd = sc.textFile(business_file_path)

In [12]:
def generate_usr_biz_csv(output_path="usr_biz.csv"):
    """Write the (user_id, business_id) pairs of all reviews of Nevada businesses.

    Joins review.json with business.json on ``business_id`` and keeps only the
    reviews whose business has ``state == 'NV'``. This function is only
    defined here, never called: run ``generate_usr_biz_csv()`` once by hand to
    (re)create 'usr_biz.csv'.

    Parameters
    ----------
    output_path : str
        Destination CSV file; one ``user_id,business_id`` row per review.

    Reads the notebook-level ``business_rdd``, ``review_rdd`` and ``sc``.
    """
    # Parse each JSON line once, then keep the ids of the Nevada businesses.
    nv_business_ids = set(
        business_rdd
        .map(json.loads)
        .filter(lambda biz: biz['state'] == 'NV')
        .map(lambda biz: biz['business_id'])
        .collect()
    )
    # A broadcast set gives constant-time membership tests on the executors
    # instead of a linear scan of a list for every review.
    nv_ids = sc.broadcast(nv_business_ids)

    usr2biz_NV = (
        review_rdd
        .map(json.loads)
        .map(lambda review: [review['user_id'], review['business_id']])
        .filter(lambda pair: pair[1] in nv_ids.value)
        .collect()
    )

    # newline="" is what the csv module requires so that it alone controls
    # the row terminator.
    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerows(usr2biz_NV)

In [22]:
def split_to_usr_biz(x):
    """Turn one ``user,business`` CSV line into ``(user, [business])``.

    The business id is wrapped in a single-element list so that the values of
    equal keys can later be concatenated. Only the first two comma-separated
    fields are used; a line without a comma raises ``IndexError``.
    """
    fields = x.split(',')
    user_id = str(fields[0])
    business_id = str(fields[1])
    return user_id, [business_id]


def apriori(iterator):
    """Return every itemset that is frequent within one partition of baskets.

    Used as the per-partition step of the first SON pass. The global support
    is scaled down to the share of baskets held by this partition, so the
    threshold used here is ``len(partition) * support / total_basket``.

    Reads the notebook-level globals ``support`` and ``total_basket``.

    Parameters
    ----------
    iterator : iterable
        Baskets of this partition; element ``[1]`` of each basket is the set
        of items (element ``[0]`` is not used here).

    Returns
    -------
    list of frozenset
        Locally frequent itemsets of every size, in no particular order.
        Empty when the partition holds no baskets.
    """
    baskets = list(iterator)
    if not baskets:
        # Nothing can be frequent in an empty partition. Returning early also
        # avoids dividing by total_basket when the whole dataset is empty.
        return []

    partition_support = (len(baskets) * float(support)) / total_basket

    # Pass 1: count single items.
    count = {}
    for basket in baskets:
        for item in basket[1]:
            count[item] = count.get(item, 0) + 1

    true_frequent = set(
        frozenset([item]) for item in count if count[item] >= partition_support
    )

    result = []
    size = 2
    while true_frequent:
        result += true_frequent

        # Build size-k candidates by joining frequent (k-1)-itemsets. A
        # candidate is kept only if every (k-1)-subset is itself frequent:
        # a superset of an infrequent itemset cannot be frequent, so the
        # pruned candidates could never pass the threshold below.
        candidate_itemsets = set()
        for left in true_frequent:
            for right in true_frequent:
                merged = left.union(right)
                if len(merged) != size:
                    continue
                if all(merged.difference([item]) in true_frequent for item in merged):
                    candidate_itemsets.add(merged)

        # Count how many baskets contain each surviving candidate.
        count = {}
        for basket in baskets:
            for candidate in candidate_itemsets:
                if candidate.issubset(basket[1]):
                    count[candidate] = count.get(candidate, 0) + 1

        size += 1
        true_frequent = set(
            itemset for itemset in count if count[itemset] >= partition_support
        )

    return result


def full_pass(iterator):
    """Count, for one partition, how many baskets contain each candidate.

    Candidates come from the notebook-level broadcast variable ``candidates``.
    A candidate that matches no basket in this partition is left out of the
    result entirely.

    Returns the ``(candidate, occurrences)`` pairs as a dict items view.
    """
    local_baskets = list(iterator)
    occurrences = {}
    for itemset in candidates.value:
        hits = sum(1 for basket in local_baskets if itemset.issubset(basket[1]))
        if hits:
            occurrences[itemset] = occurrences.get(itemset, 0) + hits
    return occurrences.items()

In [25]:
# Global support threshold: apriori() scales it down per partition, and the
# final reducer keeps an itemset only when its total count is >= support.
support = 50
# A user is kept only when the list of businesses collected for them has more
# than `threshold` entries. NOTE(review): the length is checked before the
# list is turned into a set, so repeat reviews of one business count too.
threshold = 70

# Input for basket construction; each line is expected to be
# "user_id,business_id" as parsed by split_to_usr_biz.
rdd_data = sc.textFile('usr_biz.csv')

In [26]:
# Build one basket per user: (user_id, {business_id, ...}). Users are kept
# only when their collected business list is longer than `threshold`.
# The RDD is cached because three separate actions below (count, phase 1,
# phase 2) consume it; without caching the file read and the shuffle would be
# recomputed each time.
baskets = rdd_data \
    .map(split_to_usr_biz) \
    .reduceByKey(lambda a, b: a + b) \
    .filter(lambda x: len(x[1]) > threshold) \
    .map(lambda x: (x[0], set(x[1]))) \
    .cache()

# Read by apriori() to scale the support threshold per partition.
total_basket = baskets.count()

# SON algorithm (2-Phase Map-Reduce)
# 1st Map-Reduce Phase: Generate possible candidates in each local chunk
first_mapper_output = baskets \
    .mapPartitions(apriori) \
    .map(lambda x: (x, 1))

# Keep each candidate once, no matter how many partitions proposed it.
first_reducer_output = first_mapper_output \
    .reduceByKey(lambda a, b: a) \
    .map(lambda x: x[0])

# Read by full_pass() on the executors.
candidates = sc.broadcast(first_reducer_output.collect())

# 2nd Map-Reduce Phase: Find ALL Frequent Itemsets (Eliminate False Positives)
second_mapper_output = baskets \
    .mapPartitions(full_pass)

# Sum the per-partition counts and keep the itemsets meeting the global support.
second_reducer_output = second_mapper_output \
    .reduceByKey(lambda a, b: a + b) \
    .filter(lambda x: x[1] >= support) \
    .collect()


In [49]:
# Turn every frequent itemset into a sorted list of ids paired with its count,
# ordered first by itemset size and then lexicographically by the ids.
frequent_itemsets = sorted(
    ((sorted(itemset), occurrences) for itemset, occurrences in second_reducer_output),
    key=lambda entry: (len(entry[0]), entry[0]),
)

# Split the ordered list by itemset size (1, 2 and 3 items).
frequent_singletons, frequent_pairs, frequent_triplets = (
    [entry for entry in frequent_itemsets if len(entry[0]) == wanted_size]
    for wanted_size in (1, 2, 3)
)

In [51]:
# Print how many frequent itemsets were found per size, then list every
# frequent triplet on its own line.
for template, itemsets in (
    ("frequent singletons: %d \n", frequent_singletons),
    ("frequent pairs: %d \n", frequent_pairs),
    ("frequent triplets: %d", frequent_triplets),
):
    print(template % len(itemsets))
print(*frequent_triplets, sep='\n')

frequent singletons: 1312 

frequent pairs: 1515 

frequent triplets: 13
(['4JNXUYY8wbaaDmk3BPzlWw', 'K7lWdNUhCbcnEvI0NhGewg', 'RESDUcs7fIiihp38-d6_6g'], 53)
(['4JNXUYY8wbaaDmk3BPzlWw', 'RESDUcs7fIiihp38-d6_6g', 'iCQpiavjjPzJ5_3gPD5Ebg'], 52)
(['4k3RlMAMd46DZ_JyZU0lMg', '7sPNbCx7vGAaH7SbNPZ6oA', 'JyxHvtj-syke7m9rbza7mA'], 52)
(['4k3RlMAMd46DZ_JyZU0lMg', 'JyxHvtj-syke7m9rbza7mA', 'UPIYuRaZvknINOd1w8kqRQ'], 70)
(['4k3RlMAMd46DZ_JyZU0lMg', 'JyxHvtj-syke7m9rbza7mA', 'W8apgXmOxESpoL_EeogC5w'], 50)
(['4k3RlMAMd46DZ_JyZU0lMg', 'UPIYuRaZvknINOd1w8kqRQ', 'W8apgXmOxESpoL_EeogC5w'], 51)
(['7sPNbCx7vGAaH7SbNPZ6oA', 'JyxHvtj-syke7m9rbza7mA', 'UPIYuRaZvknINOd1w8kqRQ'], 53)
(['A5Rkh7UymKm0_Rxm9K2PJw', 'BxKe9Xt_fN6qBzrTofHuEQ', 'FaHADZARwnY4yvlvpnsfGA'], 50)
(['A5Rkh7UymKm0_Rxm9K2PJw', 'BxKe9Xt_fN6qBzrTofHuEQ', 'gy-HBIeJGlQHs4RRYDLuHw'], 51)
(['A5Rkh7UymKm0_Rxm9K2PJw', 'FaHADZARwnY4yvlvpnsfGA', 'gy-HBIeJGlQHs4RRYDLuHw'], 51)
(['IMLrj2klosTFvPRLv56cng', 'igHYkXZMLAc9UdV5VnR_AA', 'qqs7LP4TXAoOrSlaKRfz3A

In [52]:
"""
    TODO: Analyze Frequent Itemsets/
    compare
"""

'\n    TODO: Analyze Frequent Itemsets/\n    compare\n'