In [3]:
import os.path
import pandas as pd

# Product catalogue and an id -> name lookup, used to print rules in readable form
products = pd.read_csv(os.path.join('..', 'resource', 'asnlib', 'publicdata', 'products.csv.bz2'))
# Pair each product_id with its product_name, row by row
product_table = dict(zip(products['product_id'], products['product_name']))

def print_rules(rules):
    """Print every rule as ``<LHS> -> <RHS>``, one rule per line.

    Each rule is a ``(LHS, RHS)`` pair. A side that is a plain ``int`` is
    printed as that number; any other side is iterated as a collection of
    product ids, each looked up in the module-level ``product_table`` and
    joined with ``", "``.
    """
    def render(side):
        # Bare ints are shown verbatim; collections are translated to names.
        if isinstance(side, int):
            return f'{side}'
        return ", ".join(product_table[p_id] for p_id in side)

    for antecedent, consequent in rules:
        print(f'{render(antecedent)} -> {render(consequent)}')


In [9]:
import itertools


def map1(key, value, itemsets):
    """First-pass mapper: emit ``(product, 1)`` for each product in ``value``.

    ``key`` and ``itemsets`` are not used by this mapper.
    """
    yield from ((item, 1) for item in value)

def reduce1(key, value, min_support=5):
    """First-pass reducer: keep products whose total count exceeds ``min_support``.

    Args:
        key: the product emitted by the mapper.
        value: iterable of counts collected for that product.
        min_support: support threshold; the product is kept only when its
            summed count is strictly greater than this (default 5, the
            previously hard-coded value).

    Yields:
        ``((key,), count)`` -- the product wrapped in a 1-tuple so it can be
        used as an itemset key -- when the threshold is exceeded; nothing
        otherwise.
    """
    count = sum(value)
    if count > min_support:
        yield (key,), count

def map2(key, value, itemsets):
    """Second-pass mapper: emit ``(pair, 1)`` for each pair of frequent products.

    Products in ``value`` are sorted, filtered down to those whose 1-tuple
    ``(product,)`` is a key of ``itemsets``, and every 2-combination of the
    survivors is emitted. ``key`` is not used.
    """
    frequent_products = [item for item in sorted(value) if (item,) in itemsets]
    for candidate in itertools.combinations(frequent_products, 2):
        yield candidate, 1

def reduce2(key, value, min_support=5):
    """Second-pass reducer: keep pairs whose total count exceeds ``min_support``.

    Args:
        key: the product pair emitted by the mapper.
        value: iterable of counts collected for that pair.
        min_support: support threshold; the pair is kept only when its
            summed count is strictly greater than this (default 5, the
            previously hard-coded value).

    Yields:
        ``(key, count)`` when the threshold is exceeded; nothing otherwise.
    """
    count = sum(value)
    if count > min_support:
        yield key, count

# Pipeline run by the MapReduce "platform": functions execute in list order,
# alternating between a mapper and the reducer that consumes its output
stages = [
    map1, reduce1,
    map2, reduce2,
]

# Transform frequent itemsets into association rules

def itemsets2rules(itemsets):
    """Turn frequent itemsets into ``(LHS, RHS)`` association rules.

    Itemsets with a single element are skipped. For every other itemset the
    last element becomes the RHS and all preceding elements become the LHS;
    both sides are returned as sets. The values stored in ``itemsets`` are
    ignored.
    """
    def split(itemset):
        # Last member on the right-hand side, everything before it on the left.
        members = list(itemset)
        return set(members[:-1]), set(members[-1:])

    return [split(itemset) for itemset, _count in itemsets.items() if len(itemset) > 1]

def prune_rules(rules, max_length=2):
    """Return the rules whose LHS and RHS together hold at most ``max_length`` items.

    The input list is left untouched; a new list is returned with the
    surviving rules in their original order.
    """
    kept = []
    for candidate in rules:
        total_items = len(candidate[0]) + len(candidate[1])
        if total_items <= max_length:
            kept.append(candidate)
    return kept

def precision_rules(itemsets):
    """Build the precision-oriented rule set from the frequent itemsets.

    Converts the itemsets to rules and prunes them with the default
    maximum length.
    """
    # NOTE(review): the body is currently the same as recall_rules, so both
    # rule sets come out identical -- confirm whether that is intended.
    return prune_rules(itemsets2rules(itemsets))

def recall_rules(itemsets):
    """Build the recall-oriented rule set from the frequent itemsets.

    Converts the itemsets to rules and prunes them with the default
    maximum length.
    """
    # NOTE(review): the body is currently the same as precision_rules, so both
    # rule sets come out identical -- confirm whether that is intended.
    return prune_rules(itemsets2rules(itemsets))




In [10]:
# Reads in the data file to be used to extract the association rules
import time

# Only the first 1,000,001 rows of the prior-orders file are read.
# NOTE(review): a row cap can cut the last order short -- confirm this is acceptable.
product_orders = pd.read_csv(os.path.join('..', 'resource', 'asnlib', 'publicdata', 'order_products__prior.csv.bz2'),
                             nrows=1000001)

# One group per order; iterating yields a (group key, product_id Series) pair.
baskets = product_orders.groupby(['order_id'])['product_id']
# Frequent itemsets found so far (itemset tuple -> count); filled by the reduce stages
# and read by the later map stages.
itemsets = {}
print(f'Using {len(baskets):,} orders')

assert len(stages) % 2 == 0, 'There should be an even number of stages (i.e., matching pairs of map and reduce)'

# Serialized MapReduce: stages run one after another, alternating map and reduce.
map_phase = True
# Running total of (key, value) pairs emitted by all mappers and reducers.
steps = 0
start_whole = time.time()
for stage_num, worker_fun in enumerate(stages):
    start = time.time()
    if map_phase:
        # Fresh mapper output for this pass: key -> list of emitted values.
        map_out = {}
        # NOTE(review): the loop variable `products` rebinds the module-level `products`
        # DataFrame loaded earlier; after this cell it refers to the last basket's Series.
        for order_id, products in baskets:
            for key, value in worker_fun(order_id, list(products), itemsets):
                # Accumulate list of values for each key
                try:
                    map_out[key].append(value)
                except KeyError:
                    map_out[key] = [value]
                steps += 1
        print(f'Keys output by mapper: {len(map_out):,} [steps: {steps:,}]')
    else:
        # Reduce over the output of the map stage that ran immediately before.
        for key, value in map_out.items():
            for new_key, new_value in worker_fun(key, value):
                # Itemsets from every pass share this one dict.
                itemsets[new_key] = new_value
                steps += 1
        print(f'Frequent itemsets: {len(itemsets):,} [steps: {steps:,}]')
    # Elapsed wall-clock time for this stage, rounded to whole seconds.
    stage_time = round(time.time()-start)
    # stage_num//2+1 converts the 0-based stage index into a 1-based pass number.
    print(f'{"Map" if map_phase else "Reduce"} {stage_num//2+1} took {stage_time:d} seconds')
    map_phase = not map_phase
# Generate rules
rules_p = precision_rules(itemsets)
rules_r = recall_rules(itemsets)
# Total time covers all stages plus rule generation.
duration = int(round(time.time()-start_whole))
print(f'Rules created (precision): {len(rules_p):,}')
print(f'Rules created (recall): {len(rules_r):,}')
print(f'MapReduce total time={duration:,} s, steps={steps:,}')


Using 99,260 orders
Keys output by mapper: 35,098 [steps: 1,000,001]
Map 1 took 3 seconds
Frequent itemsets: 15,357 [steps: 1,015,358]
Reduce 1 took 0 seconds
Keys output by mapper: 3,477,858 [steps: 7,771,545]
Map 2 took 13 seconds
Frequent itemsets: 153,959 [steps: 7,910,147]
Reduce 2 took 2 seconds
Rules created (precision): 138,602
Rules created (recall): 138,602
MapReduce total time=19 s, steps=7,910,147


In [11]:
def check_rules(rules):
    """Make sure rules are well-formed, normalising them in place.

    Every entry of ``rules`` must be a 2-element sequence ``(LHS, RHS)``.
    Each side may be a single ``int`` product id or an iterable of ids; the
    entry is overwritten with a ``(set, set)`` pair.

    Args:
        rules: mutable sequence of rules; modified in place.

    Raises:
        AssertionError: if a rule does not have exactly two parts, or if its
            LHS and RHS share any item.
    """
    for i, rule in enumerate(rules):
        assert len(rule) == 2, f'Rule {rule} is not a list/tuple of length 2 (i.e., LHS and RHS)'
        LHS, RHS = rule
        # set(<int>) raises TypeError, so a bare id is wrapped in a one-element set.
        lhs_set = {LHS} if isinstance(LHS, int) else set(LHS)
        rhs_set = {RHS} if isinstance(RHS, int) else set(RHS)
        rules[i] = (lhs_set, rhs_set)
        overlap = lhs_set & rhs_set
        # Convert ids to str before joining; str.join rejects ints, which would
        # turn the intended AssertionError into a TypeError.
        assert len(overlap) == 0, f'Overlapping LHS and RHS: {", ".join(map(str, sorted(overlap)))}'
    
def evaluate_rules(rules, baskets):
    """Apply the rules to every basket and report per-order hit and miss rates.

    A rule fires on an order when every item of its LHS is in the order. The
    RHS items of all rules that fire are pooled into one prediction set for
    that order. Predicted items present in the order count as true positives,
    predicted items absent from it as false positives.

    Args:
        rules: iterable of ``(LHS, RHS)`` pairs where both sides are sets.
        baskets: sized iterable of ``(order, products)`` pairs, where
            ``products`` is an iterable of product ids.

    Returns:
        ``(tp_per_order, fp_per_order)``: true and false positives divided by
        the number of baskets, or ``(0.0, 0.0)`` when there are no baskets.

    Side effects:
        Prints the total number of rule firings.
    """
    rule_firings = 0
    tp = fp = 0
    for order, product_series in baskets:
        product_set = set(product_series)
        predictions = set()
        for LHS, RHS in rules:
            # Subset test: the rule fires when the order contains the whole LHS.
            if LHS <= product_set:
                rule_firings += 1
                predictions |= RHS
        # Predicted items that appear in order
        tp += len(predictions & product_set)
        # Predicted items that do not appear in order
        fp += len(predictions - product_set)
    print(f'Rules fired {rule_firings:,} times')
    n_orders = len(baskets)
    if n_orders == 0:
        # Nothing to average over; avoid ZeroDivisionError.
        return 0.0, 0.0
    return tp/n_orders, fp/n_orders

# Validate both rule lists; check_rules also rewrites each rule in place as a (set, set) pair.
check_rules(rules_p)
check_rules(rules_r)
# Test rules on a separate data set
# Only the first 9,998 rows of the train-orders file are read.
test_orders = pd.read_csv(os.path.join('..', 'resource', 'asnlib', 'publicdata', 'order_products__train.csv.bz2'),
                          nrows=9998)
# NOTE(review): `baskets` and `products` are rebound here. Earlier in the notebook
# `baskets` held the training groupby and `products` the product-catalogue DataFrame,
# so re-running earlier cells after this one would see the test data instead.
baskets = test_orders.groupby(['order_id'])['product_id']
products = set(test_orders['product_id'].unique())
print(f'Testing {len(baskets):,} orders, spanning {len(products):,} products')

# Each call returns (true positives per order, false positives per order).
tp_rate_p, fp_rate_p = evaluate_rules(rules_p, baskets) 
print(f'Precision-oriented rules have a TP rate of {tp_rate_p:.02f}/order, and a FP rate of {fp_rate_p:.02f}/order')
tp_rate_r, fp_rate_r = evaluate_rules(rules_r, baskets) 
print(f'Recall-oriented rules have a TP rate of {tp_rate_r:.02f}/order, and a FP rate of {fp_rate_r:.02f}/order')


Testing 938 orders, spanning 4,660 products
Rules fired 1,996,118 times
Precision-oriented rules have a TP rate of 5.24/order, and a FP rate of 1198.12/order
Rules fired 1,996,118 times
Recall-oriented rules have a TP rate of 5.24/order, and a FP rate of 1198.12/order
