In [1]:
# import libraries
from collections import defaultdict
import pandas as pd
import numpy as np

# defines the mapreduce function
def mapreduce(mapper, reducer, data):
    # map phase
    mdata = [pair for item in data for pair in mapper(item)]

    # shuffles and sorts
    sdata = defaultdict(list)
    for key, value in mdata:
        sdata[key].append(value)
    sitems = sdata.items()

    # reduces the data
    rdata = [reducer(item) for item in sitems]
    rdata = [x for x in rdata if x is not None]  # Drop None pairs

    return rdata, [len(item) for item in data], len(mdata), len(sdata)


In [2]:
# defines the mapper function for word counting
def wcmapper(data):
    key_value_pairs = []
    for line in data:
        words = line.split()
        for word in words:
            key_value_pairs.append((word.lower(), 1))  # Convert to lowercase for uniformity
    return key_value_pairs

# Define the reducer function for word counting
def wcreducer(key_value):
    key, values = key_value
    return (key, sum(values))  # Sum all counts for each word


In [3]:
# sample input data
input_texts = [
    "Hello world",
    "MapReduce is powerful",
    "Hello world and MapReduce"
]

# mapreduce with input as a single chunk
result_singlechunk, _, _, _ = mapreduce(
    lambda data: wcmapper(data),
    lambda item: wcreducer(item),
    [input_texts]
)

# mapreduce with input split into multiple chunks
result_multchunks, _, _, _ = mapreduce(
    lambda data: wcmapper(data),
    lambda item: wcreducer(item),
    [[line] for line in input_texts]  # splits single-line chunks
)

# displays results
print("Single Chunk Result:", result_singlechunk)
print("Multiple Chunks Result:", result_multchunks)


Single Chunk Result: [('hello', 2), ('world', 2), ('mapreduce', 2), ('is', 1), ('powerful', 1), ('and', 1)]
Multiple Chunks Result: [('hello', 2), ('world', 2), ('mapreduce', 2), ('is', 1), ('powerful', 1), ('and', 1)]


In [4]:
# loads in data
basket_data = pd.read_csv("basket.csv", header=None)

# flattens the dataset into array and count occurrences of each item
all_items = basket_data.values.flatten()  
all_items = pd.Series(all_items).dropna()  # Drop nan values
all_items = all_items[all_items.str.isalpha()]

# frequency of the items
item_counts = all_items.value_counts()

# 10 most and least common items
mostcitems = item_counts.head(10)
leastcitems = item_counts.tail(10)

# results
print("Most Common Items:")
print(mostcitems)

print("\nLeast Common Items:")
print(leastcitems)


Most Common Items:
soda           1514
yogurt         1334
sausage         924
pastry          785
newspapers      596
frankfurter     580
pork            566
butter          534
beef            516
curd            514
Name: count, dtype: int64

Least Common Items:
syrup          21
soap           20
prosecco       19
cookware       17
honey          13
cream          12
liqueur         9
decalcifier     9
whisky          8
bags            4
Name: count, dtype: int64


In [5]:
from mlxtend.preprocessing import TransactionEncoder

# converts each row to a list of items
basketlist = basket_data.apply(lambda row: row.dropna().tolist(), axis=1).tolist()

# one-hot encode the data
te = TransactionEncoder()
basketoneh = te.fit_transform(basketlist)
basketoneh_df = pd.DataFrame(basketoneh, columns=te.columns_)



In [6]:
from mlxtend.frequent_patterns import fpgrowth

# different thresholds
thresholds = [0.01, 0.005, 0.001]  # range of thresholds

for support in thresholds:
    frequent_itemsets = fpgrowth(basketoneh_df, min_support=support, use_colnames=True)
    print(f"\nSupport Threshold: {support}")
    print(f"Number of Frequent Itemsets: {len(frequent_itemsets)}")
    print(frequent_itemsets.head())



Support Threshold: 0.01
Number of Frequent Itemsets: 69
    support       itemsets
0  0.157912   (whole milk)
1  0.051724       (pastry)
2  0.018778  (salty snack)
3  0.085873       (yogurt)
4  0.060345      (sausage)

Support Threshold: 0.005
Number of Frequent Itemsets: 126
    support       itemsets
0  0.157912   (whole milk)
1  0.051724       (pastry)
2  0.018778  (salty snack)
3  0.085873       (yogurt)
4  0.060345      (sausage)

Support Threshold: 0.001
Number of Frequent Itemsets: 750
    support       itemsets
0  0.157912   (whole milk)
1  0.051724       (pastry)
2  0.018778  (salty snack)
3  0.085873       (yogurt)
4  0.060345      (sausage)


In [7]:
from mlxtend.frequent_patterns import association_rules

# uses fpgrowth to compute frequent itemsets with a min support of 0.0005
frequentitemsets = fpgrowth(basketoneh_df, min_support=0.0005, use_colnames=True)

numis = len(frequentitemsets)

# generates association rules with min conf of 0.5
rules = association_rules(frequentitemsets, metric="confidence", min_threshold=0.25, num_itemsets=numis)

# filters rules where the antecedent or consequent has at least 2 items
filtrules = rules[(rules['antecedents'].apply(lambda x: len(x) >= 2)) | (rules['consequents'].apply(lambda x: len(x) >= 2))]

# display the resulting association rules
print("Association Rules with at least 2 items in antecedents or consequents:")
print(filtrules)


Association Rules with at least 2 items in antecedents or consequents:
                 antecedents         consequents  antecedent support  \
0          (sausage, yogurt)        (whole milk)            0.005747   
1  (rolls/buns, white bread)        (whole milk)            0.002138   
2   (sausage, shopping bags)  (other vegetables)            0.001938   
4            (sausage, pork)        (whole milk)            0.001537   

   consequent support   support  confidence      lift  representativity  \
0            0.157912  0.001470    0.255814  1.619975               1.0   
1            0.157912  0.000601    0.281250  1.781052               1.0   
2            0.122093  0.000535    0.275862  2.259442               1.0   
4            0.157912  0.000601    0.391304  2.477985               1.0   

   leverage  conviction  zhangs_metric   jaccard  certainty  kulczynski  
0  0.000563    1.131555       0.384919  0.009065   0.116261    0.132562  
1  0.000264    1.171600       0.439474  0.00

In [8]:
from mlxtend.frequent_patterns import fpgrowth

# mapper function for frequent itemsets
def frequentismapper(data_chunk):
    # computes frequent itemsets on the chunk
    frequentis = fpgrowth(data_chunk, min_support=0.0005, use_colnames=True)
    
    # generates key,value pairs
    kvpairs = []
    for _, row in frequentis.iterrows():
        itemset = frozenset(row['itemsets'])  # key: itemset as a frozenset
        cnt = row['support'] * len(data_chunk)  # value: count derived from support
        kvpairs.append((itemset, cnt))
    return kvpairs


In [9]:
# reducer function
def frequentisreducer(key_value):
    key, values = key_value  # key: itemset, values: list of counts
    ts = sum(values)
    if ts >= 0.0005:  # checks threshold
        return (key, ts)  #returns itemset and total support
    return None  # or excludes ones below the threshold


In [None]:
import numpy as np

# defines splits 
splits = [2, 4, 8, 16]
moutputs = []
skeys = []

for nchunks in splits:
    # Split the one-hot encoded DataFrame into chunks
    chunkeddata = np.array_split(basketoneh_df, nchunks)
    
    # Run MapReduce on the chunks
    _, mapper_lengths, lenm, lens = mapreduce(
        lambda data: frequentismapper(data),
        lambda item: frequentisreducer(item),
        chunkeddata
    )
    
    # Store results
    moutputs.append(lenm)
    skeys.append(lens)

# verifies consistency with direct computation
frequent_itemsets_full = fpgrowth(basketoneh_df, min_support=0.0005, use_colnames=True)
print(f"Number of Frequent Itemsets from Full Data: {len(frequent_itemsets_full)}")


In [None]:
import matplotlib.pyplot as plt

# Plot results
plt.figure(figsize=(8, 5))
plt.plot(splits, mapper_outputs, label='Mapper Outputs', marker='o')
plt.plot(splits, shuffled_keys, label='Distinct Keys', marker='x')
plt.xlabel('Number of Splits')
plt.ylabel('Count')
plt.title('Mapper Outputs vs. Distinct Keys')
plt.legend()
plt.show()
