In [1]:
import findspark
findspark.init()

from pyspark import SparkContext
import json
import time
import collections
from operator import add

# Wall-clock origin; the final cell prints the elapsed seconds since this point.
start = time.time()

# Support / filtering thresholds.
filter_threshold = 70  # keep only users with MORE than this many distinct businesses
global_threshold = 50  # an itemset is frequent when at least this many baskets contain it

# I/O locations (resolved relative to the current working directory).
input_file_path = "user_business.csv"
output_file_path = "task2Output.txt"

# Spark setup.
partition_num = 4  # number of partitions used when parallelizing the input lines
sc = SparkContext.getOrCreate()

def turnStr2Pair(pairStr):
    """Turn a "key,value" CSV line into a ``(key, value)`` tuple.

    Only the first two comma-separated fields are used; any further fields are
    ignored. A line that contains no comma raises IndexError.
    """
    fields = pairStr.split(',')
    key, value = fields[0], fields[1]
    return key, value
    
# Read the CSV (skipping its header row) into raw "user,business" lines.
# The `with` statement closes the file, so no explicit close() is needed.
with open(input_file_path) as f:
    rawStrList = f.readlines()[1:]
# Strip the trailing newline and skip blank lines, which turnStr2Pair cannot split.
pairList = [line.rstrip('\n') for line in rawStrList if line.strip()]

# One basket per user: the set of distinct businesses for that user, kept only
# when the user has more than `filter_threshold` of them.
# Cached because later cells run several actions on it (count() and collect()
# plus a mapPartitions job); without caching each one would redo the groupByKey shuffle.
qualifiedUsersRDD = (
    sc.parallelize(pairList, partition_num)
    .map(turnStr2Pair)
    .groupByKey()
    .mapValues(set)
    .filter(lambda user_businesses: len(user_businesses[1]) > filter_threshold)
    .values()
    .cache()
)

In [2]:
def generateFrequentSingletons(chunk_list, local_threshold):
    """Count single items across the baskets of one chunk.

    Args:
        chunk_list: list of baskets; every element of every basket is counted
            once per occurrence.
        local_threshold: minimum count for an item to be considered frequent.

    Returns:
        A pair ``(frequent_singletons, candidates)``:
        ``frequent_singletons`` lists items whose count is >= local_threshold,
        in the order they were first seen; ``candidates`` is the sorted list of
        every distinct item that occurred at all.
    """
    # Counter preserves first-insertion order, matching a manual dict count.
    item_counts = collections.Counter()
    for basket in chunk_list:
        item_counts.update(basket)

    frequent_singletons = [item for item, count in item_counts.items()
                           if count >= local_threshold]
    return frequent_singletons, sorted(item_counts)

def countFrequentItemsets(candidate_list, chunk_list, threshold):
    """Return the candidates contained in at least `threshold` baskets.

    Args:
        candidate_list: iterable of candidate itemsets (tuples); it is read once.
        chunk_list: list of baskets (sets of items).
        threshold: minimum number of containing baskets.

    Returns:
        List of the qualifying candidate tuples, in the order each candidate
        was first found inside a basket.
    """
    # Build each candidate's set once, instead of once per basket.
    candidate_sets = [(candidate, set(candidate)) for candidate in candidate_list]

    counter_dict = collections.defaultdict(int)
    for basket in chunk_list:
        for candidate, candidate_set in candidate_sets:
            if candidate_set.issubset(basket):
                counter_dict[candidate] += 1

    return [candidate for candidate, count in counter_dict.items() if count >= threshold]

def generateKTupleItemsets(frequent_list, k):
    """Generate candidate k-itemsets from frequent (k-1)-itemsets (Apriori join + prune).

    Join: every pair of frequent itemsets whose union has exactly k items
    yields a candidate.
    Prune: a candidate is kept only if each of its (k-1)-subsets is itself in
    `frequent_list`. By monotonicity of support, a pruned candidate can never
    be frequent, so pruning only saves counting work.

    Args:
        frequent_list: sequence of frequent (k-1)-itemsets (tuples of items).
        k: size of the candidates to produce.

    Returns:
        Set of candidate k-itemsets, each a sorted tuple.
    """
    frequent_sets = [frozenset(itemset) for itemset in frequent_list]
    frequent_lookup = set(frequent_sets)

    candidates = set()
    for i in range(len(frequent_sets)):
        first = frequent_sets[i]
        for j in range(i + 1, len(frequent_sets)):
            union = first | frequent_sets[j]
            if len(union) != k:
                continue
            # Removing one item at a time enumerates all (k-1)-subsets.
            if all(union - {item} in frequent_lookup for item in union):
                candidates.add(tuple(sorted(union)))
    return candidates
        
def apriori(chunk, threshold, full_size):
    """Run in-memory Apriori on one partition (phase 1 of SON).

    Args:
        chunk: one-shot iterator over the partition's baskets (presumably sets
            of items, as built for qualifiedUsersRDD). It is consumed once.
        threshold: global support threshold (minimum basket count over the
            whole dataset).
        full_size: total number of baskets in the whole dataset.

    Yields:
        Exactly one list, whose i-th element is the list of locally frequent
        (i+1)-itemsets (tuples). The last element may be an empty list, and the
        whole list is empty when the partition contains no items.
    """
    # mapPartitions hands over a one-shot iterator; materialize it so the
    # baskets can be scanned once per Apriori level.
    chunk_list = list(chunk)
    chunk_size = len(chunk_list)

    # Scale the global threshold by this partition's share of the baskets
    # (no extra scaling factor is applied). Never drop below 1, and guard the
    # division against an empty dataset.
    if full_size > 0:
        local_threshold = max(1, (chunk_size / full_size) * threshold)
    else:
        local_threshold = 1

    frequent_levels = []  # frequent_levels[i] = locally frequent (i+1)-itemsets

    frequent_singletons, all_items = generateFrequentSingletons(chunk_list, local_threshold)
    if all_items:
        frequent_k = [(item,) for item in frequent_singletons]
        frequent_levels.append(frequent_k)

        k = 2
        candidates = generateKTupleItemsets(frequent_k, k)  # frequent (k-1) -> candidate k
        while candidates:
            frequent_k = countFrequentItemsets(candidates, chunk_list, local_threshold)
            frequent_levels.append(frequent_k)
            k += 1
            candidates = generateKTupleItemsets(frequent_k, k)

    yield frequent_levels

In [3]:
# Total number of baskets; apriori uses it to scale the support threshold per partition.
full_size = qualifiedUsersRDD.count()

# SON phase 1: run Apriori independently on each partition; every locally
# frequent itemset becomes a global candidate.
rawCandidates = qualifiedUsersRDD.mapPartitions(
    lambda partition: apriori(partition, global_threshold, full_size))

# Flatten partition -> levels -> itemsets, dedupe across partitions, and order
# by (itemset size, itemset). Cached so that any later reuse of this RDD after
# collect() does not re-run the whole phase-1 computation.
candidatesResultRDD = (
    rawCandidates
    .flatMap(lambda levels: levels)
    .flatMap(lambda itemsets: itemsets)
    .distinct()
    .sortBy(lambda itemset: (len(itemset), itemset))
    .cache()
)
candidatesResult = candidatesResultRDD.collect()

In [4]:

def countItemsets(cand, busResult):
    """Count how many baskets contain every item of `cand`.

    Args:
        cand: candidate itemset (tuple of items).
        busResult: iterable of baskets (sets of items).

    Returns:
        The pair ``(cand, count)``.
    """
    cand_set = set(cand)  # built once instead of once per basket
    count = sum(1 for bus in busResult if cand_set.issubset(bus))
    return (cand, count)

# SON phase 2: count every candidate against the full set of baskets.
busResult = qualifiedUsersRDD.collect()
# Ship the baskets once per executor via a broadcast variable instead of
# pickling the whole list into every task closure.
busBroadcast = sc.broadcast(busResult)

frequentItemsets = (
    candidatesResultRDD
    .map(lambda cand: countItemsets(cand, busBroadcast.value))
    .filter(lambda itemset_count: itemset_count[1] >= global_threshold)
    .sortBy(lambda pair: (len(pair[0]), pair[0]))
    .map(lambda pair: pair[0])
)

frequentItemsetsResult = frequentItemsets.collect()
busBroadcast.unpersist()  # the broadcast copy is no longer needed on executors

In [5]:
def formatItemset(itemset):
    """Render one itemset: singletons as ('a'), larger itemsets via str(tuple)."""
    if len(itemset) == 1:
        return "('" + itemset[0] + "')"
    return str(itemset)


def formatItemsetGroups(itemsets):
    """Render itemsets as one comma-separated line per itemset size.

    Lines are ordered by itemset size and separated by a blank line; within a
    line, itemsets keep their input order. Returns '' for an empty input.
    """
    groups = {}
    for itemset in itemsets:
        groups.setdefault(len(itemset), []).append(formatItemset(itemset))
    return '\n\n'.join(','.join(groups[size]) for size in sorted(groups))


# `with` guarantees the file is closed even if formatting raises.
with open(output_file_path, 'w') as outFile:
    outFile.write('Candidates: \n')
    outFile.write(formatItemsetGroups(candidatesResult))
    outFile.write('\n\nFrequent Itemsets: \n')
    outFile.write(formatItemsetGroups(frequentItemsetsResult))

print("Duration: %d" % (time.time() - start))

Duration: 244
