In [23]:
import pandas as pd
import numpy as np
import json
import os
import time
import pickle

In [27]:
def to_pkl(data, filename):
    '''
    Serialise `data` with pickle and write it to `filename`.

    The file is opened in binary write mode (overwriting any existing file)
    and is closed as soon as the dump finishes, even if pickling raises.
    '''
    with open(filename, 'wb') as file:
        pickle.dump(data, file)

def from_pkl(filename):
    '''
    Load and return the object pickled in `filename`.

    The file handle is closed before returning.
    '''
    # NOTE(review): pickle.load can execute arbitrary code; only use this on
    # files produced by this notebook (e.g. via to_pkl), never on untrusted input.
    with open(filename, 'rb') as file:
        return pickle.load(file)

In [2]:
from nltk.tokenize import word_tokenize
from nltk import pos_tag
from sklearn.feature_extraction import _stop_words
from nltk.stem import WordNetLemmatizer
import string

# Upper bound on the number of tokens kept per review.
MAX_LENGTH = 10
def preprocess_str(review):
    '''
    Turn one review string into a sorted list of at most MAX_LENGTH
    distinct lemmatised adjectives.

    Steps: lowercase, delete punctuation and digits, tokenize, drop stop
    words and tokens shorter than 3 characters, keep tokens tagged 'JJ',
    lemmatise them as adjectives, de-duplicate, sort, truncate.
    '''
    # One translation table deletes punctuation and digits in a single pass.
    strip_table = str.maketrans("", "", string.punctuation + string.digits)
    cleaned = review.lower().translate(strip_table).strip()

    custom_stop_words = ['just', 'wasnt', 'didnt', 'went', 'came']
    stop_words = frozenset().union(_stop_words.ENGLISH_STOP_WORDS, custom_stop_words)
    kept = [
        token
        for token in word_tokenize(cleaned)
        if token not in stop_words and len(token) >= 3
    ]

    # Only plain adjectives (tag 'JJ') survive the part-of-speech filter.
    lemmatizer = WordNetLemmatizer()
    lemmas = {
        lemmatizer.lemmatize(word, pos="a")
        for word, tag in pos_tag(kept)
        if tag == 'JJ'
    }

    return sorted(lemmas)[:MAX_LENGTH]

In [3]:
from joblib import Parallel, delayed
from concurrent.futures import ThreadPoolExecutor
import concurrent
from itertools import islice
from collections import defaultdict
import multiprocessing
# from mlxtend.preprocessing import TransactionEncoder
import csv
from collections import defaultdict
from ast import literal_eval

# How many leading chunks the chunk readers below keep (via islice);
# a value of 0 (or less) means every chunk is read.
NUM_CHUNKS = 0
# Number of rows per chunk, passed as `chunksize` to pd.read_csv.
CHUNK_SIZE = 70_000

def get_chunk_iter_raw(filepath=None):
    '''
    Open the raw review CSV for chunked reading.

    `filepath` defaults to "yelp_academic_dataset_review.csv". Returns the
    pandas chunk reader, or, when NUM_CHUNKS is positive, a list holding
    only the first NUM_CHUNKS chunks.
    '''
    source = "yelp_academic_dataset_review.csv" if filepath is None else filepath
    reader = pd.read_csv(source, chunksize=CHUNK_SIZE)
    if NUM_CHUNKS <= 0:
        return reader
    return list(islice(reader, NUM_CHUNKS))

def for_csv(column_value):
    '''
    Render a list of tokens as a Python-list-literal string for a CSV cell.

    Examples: ['a', 'b'] -> "['a','b']", ['a'] -> "['a','']", None -> "[',']".
    A single-token list is padded with an empty string in the rendered
    output only; the caller's list is never modified.
    '''
    if column_value is None:
        return "[',']"

    # Work on a copy so padding does not leak back into the caller's data.
    items = list(column_value)
    if len(items) == 1:
        items.append("")

    return "['" + "','".join(items) + "']"
    
def chunk__save_preprocessed(idx, chunk, skip=True):
    '''
    Preprocess the 'text' column of one chunk and write the token lists to
    'yelp_temp_{idx}.csv'.

    When `skip` is true and that file already exists, nothing is recomputed
    and the number of rows in the existing file is returned. Otherwise
    returns the number of non-empty token lists written.
    '''
    output_file = f'yelp_temp_{idx}.csv'

    already_done = skip and os.path.isfile(output_file)
    if already_done:
        # Reuse the result of a previous run.
        print(f"skipping pre-processing for chunk {idx}")
        return len(get_list_of_lists(idx))

    jobs = (delayed(preprocess_str)(line) for line in chunk['text'])
    # Reviews that yield no tokens are dropped.
    token_lists = [tokens for tokens in Parallel(n_jobs=4)(jobs) if tokens]

    frame = pd.DataFrame({'text': token_lists}).dropna()
    frame['text'] = frame['text'].apply(for_csv)
    frame.to_csv(output_file, index=False)
    print(f"completed pre-processing for chunk {idx}")
    return len(token_lists)

def process_all():
    '''
    Preprocessing function to be called once before the apriori algorithm.

    Reads the raw review CSV chunk by chunk and runs
    chunk__save_preprocessed on every (index, chunk) pair in a thread pool.

    Returns (dataset_length, chunk_iter_len): the total number of
    transactions reported by the workers and the number of chunks processed.
    '''
    print("------------------PREPROCESSING ------------------")

    num_threads = multiprocessing.cpu_count()
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        # enumerate() supplies the chunk index, so the CSV is parsed once.
        # The previous version first loaded every chunk into a list just to
        # count them, and then read the whole file a second time.
        chunk_lengths = list(
            executor.map(
                lambda indexed_chunk: chunk__save_preprocessed(*indexed_chunk),
                enumerate(get_chunk_iter_raw()),
            )
        )

    # One result per chunk, so the number of results is the chunk count.
    chunk_iter_len = len(chunk_lengths)
    dataset_length = sum(chunk_lengths)
    return dataset_length, chunk_iter_len
    
def combine_files(chunk_iter_len):
    '''
    Concatenate 'yelp_temp_0.csv' .. 'yelp_temp_{chunk_iter_len - 1}.csv'
    into 'joined_file.csv'.

    The header row is kept from the first file only. Prints each file index
    as it is processed.
    '''
    output_file_path = 'joined_file.csv'

    # Mode 'w' already empties the file, so no explicit truncate is needed.
    with open(output_file_path, 'w', newline='') as output_file:
        csv_writer = csv.writer(output_file)

        for csv_idx in range(chunk_iter_len):
            print(csv_idx)

            # newline='' lets the csv module handle line endings itself, so
            # newlines embedded in quoted fields are copied unchanged.
            with open(f'yelp_temp_{csv_idx}.csv', 'r', newline='') as input_file:
                csv_reader = csv.reader(input_file)
                if csv_idx > 0:
                    # Not the first file: drop its header row.
                    next(csv_reader, None)
                csv_writer.writerows(csv_reader)

 
def get_list_of_lists(idx, chunk=None):
    '''
    Return the 'text' column as a list of Python objects parsed with
    literal_eval.

    With a `chunk` DataFrame, its 'text' column is parsed and `idx` is
    ignored; otherwise 'yelp_temp_{idx}.csv' is read from disk.
    '''
    if chunk is not None:
        parsed = chunk['text'].apply(literal_eval)
        return parsed.tolist()

    frame = pd.read_csv(f"yelp_temp_{idx}.csv", converters={'text': literal_eval})
    return frame['text'].to_list()

def get_chunk_iter_processed():
    '''
    Open "frequent_itemsets.csv" for chunked reading.

    Returns the pandas chunk reader, or, when NUM_CHUNKS is positive, a
    list holding only the first NUM_CHUNKS chunks.
    '''
    reader = pd.read_csv("frequent_itemsets.csv", chunksize=CHUNK_SIZE)
    if NUM_CHUNKS <= 0:
        return reader
    return list(islice(reader, NUM_CHUNKS))
               
def chunk__get_all_unique_items(idx, chunk=None):
    '''
    Count, for one chunk, how many transactions contain each item.

    Transactions come from get_list_of_lists(idx, chunk). An item is counted
    at most once per transaction. Returns a defaultdict(int) mapping
    item -> count; every 50th chunk index also prints a short summary.
    '''
    transactions = get_list_of_lists(idx, chunk)

    counting_started = time.time()
    chunk_len_1_itemsets = defaultdict(int)
    for transaction in transactions:
        distinct_items = list(set(transaction))
        distinct_items.sort()
        for item in distinct_items:
            # Unseen items start from the defaultdict's 0.
            chunk_len_1_itemsets[item] += 1

    if idx%50==0:
        print(f'{len(chunk_len_1_itemsets)} unique items found in chunk {idx}. Took {round(time.time() - counting_started,2)}')
    return chunk_len_1_itemsets

def combine_dictionaries(dicts):
    '''
    Merge several {key: count} dictionaries by summing the values of keys
    that occur in more than one of them.

    Prints the number of distinct keys and returns a plain dict.
    '''
    totals = {}
    for partial in dicts:
        for key in partial:
            totals[key] = totals.get(key, 0) + partial[key]

    print(f"{len(totals)} unique items")
    return totals
        

def get_all_unique_items(chunk_iter_len, chunk_iter = None):
    '''
    First database scan: collect every item and its support count.

    Runs chunk__get_all_unique_items for chunk indices
    0 .. chunk_iter_len - 1 in a thread pool. When `chunk_iter` is given,
    each index is paired with the matching chunk from it; otherwise only
    the index is passed to the worker.

    Returns a dict mapping item -> total count over all chunks.
    '''
    print("------------------FIRST RUN------------------")

    # Positional iterables for executor.map; the chunks are optional.
    worker_inputs = [range(chunk_iter_len)]
    if chunk_iter is not None:
        worker_inputs.append(chunk_iter)

    pool_size = multiprocessing.cpu_count()
    with ThreadPoolExecutor(max_workers=pool_size) as executor:
        per_chunk_counts = list(executor.map(chunk__get_all_unique_items, *worker_inputs))

    return combine_dictionaries(per_chunk_counts)
    

    # for idx, chunk in enumerate(data_chunks): 
    #     start = time.time()
    #     list_of_lists = Parallel(n_jobs=4)(delayed(preprocess_str)(line) for line in chunk['text'])
    #     print (f'chunk {idx+1}:\t', round(time.time() - start,2))
        
    #     # list_of_lists = [row.dropna().tolist() for index, row in chunk.iterrows()]
    #     for row in list_of_lists:
    #         existing = set(len_1_itemsets.keys())
    #         new_items = set(row).difference(existing)
    #         initial_counts = np.zeros(len(new_items))
    #         len_1_itemsets.update(dict(zip(list(new_items), initial_counts)))

    #         reappeared = set(row).intersection(existing)
    #         len_1_itemsets.update({item: len_1_itemsets[item] + 1 for item in reappeared})
    #         dataset_length += 1 

    # return len_1_itemsets, dataset_length

In [4]:
from itertools import combinations

def generate_candidates(freq_k_itemsets, k):
    '''
    Generate candidate (k+1)-itemsets from frequent k-itemsets.

    Every itemset is first put into sorted order; two itemsets are merged
    when their first k - 1 items are identical.
    -------------
    EXAMPLE:
    ABC, ABD --> ABCD (k = 3)
    -------------
    INPUTS
        freq_k_itemsets : iterable of equal-length item sequences
        k               : length of those itemsets (used for the log line)
    RETURNS
        list of np.array, one per candidate. Items inside a candidate are
        sorted, and the list order is the same on every run.
    '''
    # Sort each itemset, drop duplicates, then sort the itemsets themselves,
    # so the join does not depend on the item order the caller used.
    canonical = sorted({tuple(sorted(itemset)) for itemset in freq_k_itemsets})

    # Group the last item of every itemset under its (k-1)-item prefix. Only
    # itemsets sharing a prefix can be merged, so comparing all pairs is
    # unnecessary.
    last_items_by_prefix = {}
    for itemset in canonical:
        last_items_by_prefix.setdefault(itemset[:-1], []).append(itemset[-1])

    candidate_itemsets = []
    for prefix, last_items in last_items_by_prefix.items():
        # last_items is ascending, so each merged tuple is sorted and unique.
        for smaller, larger in combinations(last_items, 2):
            candidate_itemsets.append(np.array(prefix + (smaller, larger)))

    print(f"generated {len(candidate_itemsets)} candidates for {k+1}-itemsets")
    return candidate_itemsets
    # return np.unique(np.array(candidate_itemsets), axis=0)

def prune_candidates(candidate_itemsets, freq_k_itemsets, k):
    '''
    Drop candidate (k+1)-itemsets that contain an infrequent k-subset.
    -------------------
    INPUTS
        candidate_itemsets : sequence of item sequences of length k+1
        freq_k_itemsets    : the frequent k-itemsets
        k                  : size of the subsets that are checked
    RETURNS
        candidate_itemsets itself when it is empty or when k == 1;
        otherwise a 2-D np.array of the surviving candidates with duplicate
        rows removed (np.unique along axis 0).
    '''
    if len(candidate_itemsets)==0:
        return candidate_itemsets
    if k == 1:
        # nothing to prune, candidate-2-itemsets are all made up of frequent-1-itemsets
        print(f"k==1, returning candidates as it is.")
        return candidate_itemsets

    # A set gives constant-time membership tests; the subset check runs once
    # per k-subset of every candidate.
    frequent = {tuple(sorted(itemset)) for itemset in freq_k_itemsets}

    # combinations() over a sorted sequence yields sorted tuples, matching
    # the form stored in `frequent`. all() stops at the first infrequent subset.
    survivors = [
        itemset
        for itemset in candidate_itemsets
        if all(subset in frequent for subset in combinations(sorted(itemset), k))
    ]

    results = np.unique(np.array(survivors), axis=0)
    print(f"pruned candidates from candidate {k+1} itemsets, left with {len(results)}")
    return results

In [5]:
from collections import Counter

def chunk__count_support(chunk_idx, candidate_itemsets, chunk=None):
    '''
    Count, for one chunk, how many transactions contain each candidate.

    Transactions come from get_list_of_lists(chunk_idx, chunk). A candidate
    is counted for a transaction when all of its items appear in it.

    Returns a dict mapping the candidate's position in `candidate_itemsets`
    to its count; every position is present, with 0 when nothing matched.
    Every 10th chunk index also prints how long loading the chunk took.
    '''
    start = time.time()
    list_of_lists = get_list_of_lists(chunk_idx, chunk)

    if chunk_idx%10==0:
        print (f'chunk {chunk_idx}:\t {round(time.time() - start,2)}')

    # Build each candidate's frozenset once, not once per transaction.
    candidate_sets = [frozenset(itemset) for itemset in candidate_itemsets]
    itemset_support = dict.fromkeys(range(len(candidate_sets)), 0)

    for transaction in list_of_lists:
        # Build the transaction's set once, not once per candidate.
        transaction_items = set(transaction)
        for idx, candidate in enumerate(candidate_sets):
            if candidate <= transaction_items:
                # Counting by position keeps candidates with identical items
                # independent of each other.
                itemset_support[idx] += 1

    return itemset_support
                    
def count_support(candidate_itemsets, chunk_iter_len, k, chunk_iter = None):
    '''
    Scan the whole database chunk by chunk and count the support of every
    itemset in candidate_itemsets.

    An itemset is counted for a transaction only when all of its items
    appear in that transaction. Returns {} when there are no candidates,
    otherwise a dict mapping candidate index -> total count over all chunks.
    '''
    if len(candidate_itemsets)==0:
        return {}

    print(f"counting support for {len(candidate_itemsets)} candidate {k+1} itemset")

    # Without explicit chunks every worker receives None as its chunk.
    chunks = [None] * chunk_iter_len if chunk_iter is None else chunk_iter
    # Every worker gets the same candidate list.
    candidates_per_chunk = [candidate_itemsets] * chunk_iter_len

    pool_size = multiprocessing.cpu_count()
    with ThreadPoolExecutor(max_workers=pool_size) as executor:
        per_chunk_support = list(
            executor.map(
                chunk__count_support,
                range(chunk_iter_len),
                candidates_per_chunk,
                chunks,
            )
        )

    return combine_dictionaries(per_chunk_support)
    
    # indexes = np.arange(len(candidate_itemsets))
    # inital_counts = np.zeros(len(candidate_itemsets))
    # itemset_support = dict(zip(indexes, inital_counts))
    
    # print('-------------------preprocessing reviews------------------')
    # for chunk_idx, chunk in enumerate(data_chunks):
    #     start = time.time()
    #     list_of_lists = Parallel(n_jobs=4)(delayed(preprocess_str)(line) for line in chunk['text'])
    #     # print (f'chunk {chunk_idx+1}:\t', round(time.time() - start,2))
        
    #     # list_of_lists = [row.dropna().tolist() for index, row in chunk.iterrows()]
    
    #     for transaction in list_of_lists:
            
    #         for idx, itemset in enumerate(candidate_itemsets):
    #             all_appear = set(itemset).issubset(set(transaction))
    #             if all_appear:
    #                 # update the support 
    #                 itemset_support[idx] +=1
                    
    # return itemset_support

def get_frequent(candidate_itemsets, itemset_support, minsup, k):
    '''
    Keep the candidates whose support is strictly greater than `minsup`.

    INPUTS
        candidate_itemsets : indexable collection of candidate (k+1)-itemsets
        itemset_support    : dict mapping candidate index -> support count
        minsup             : support threshold (exclusive)
        k                  : size of the previous level (used for the log line)
    RETURNS
        (itemsets, support): a list of the frequent itemsets as tuples, and
        a dict mapping each of those tuples to its support count.
    '''
    freq_k_plus_itemsets = {
        tuple(candidate_itemsets[i]): count
        for i, count in itemset_support.items()
        if count > minsup
    }
    # The candidates have k+1 items each, so the log reports k+1 for both.
    print(f"generated {len(freq_k_plus_itemsets)} freq-{k+1}-itemsets from {len(candidate_itemsets)} candidate {k+1} itemsets")
    return list(freq_k_plus_itemsets.keys()), freq_k_plus_itemsets

In [12]:
def apriori(minsup_frac, num_transactions=None, chunk_iter_len=None):
    '''
    Run the level-wise Apriori search.

    INPUTS
        minsup_frac      : minimum support as a fraction of all transactions
        num_transactions : total number of transactions. When omitted, it and
                           chunk_iter_len are counted from
                           get_chunk_iter_processed(), and that reader also
                           supplies the chunks for every scan. When given,
                           workers read the per-chunk files themselves.
        chunk_iter_len   : number of chunks (used when num_transactions is given)
    RETURNS
        (time_taken, num_freq, results_dict)
        time_taken   : elapsed seconds
        num_freq     : dict mapping itemset size -> number of frequent itemsets
        results_dict : list with one {itemset: support} dict per level
    '''
    started_at = time.time()
    num_freq = {}
    results_dict = []

    read_separately = num_transactions is not None
    if read_separately:
        chunk_iter = None
    else:
        # One pass over the data to count chunks and rows.
        chunk_iter_len = 0
        num_transactions = 0
        for chunk in get_chunk_iter_processed():
            chunk_iter_len += 1
            num_transactions += len(chunk)
        # The reader above is exhausted; open a fresh one for the first scan.
        chunk_iter = get_chunk_iter_processed()

    minsup = minsup_frac*num_transactions
    print(f'minsup selected: {minsup} ({minsup_frac}*{num_transactions} rows)')

    # Level 1: keep the items whose support exceeds minsup.
    len_1_itemsets = get_all_unique_items(chunk_iter_len, chunk_iter)
    freq_k_dict = {}
    singletons = []
    for item, count in len_1_itemsets.items():
        if count>minsup:
            freq_k_dict[item] = count
            singletons.append([item])
    freq_k_itemsets = np.array(singletons)
    print(f"generated {len(freq_k_itemsets)} freq-1-itemsets")

    num_freq[1] = len(freq_k_itemsets)
    results_dict.append(freq_k_dict)

    # Levels 2, 3, ...: stop as soon as a level has no frequent itemsets.
    k = 1
    while len(freq_k_itemsets) > 0:
        candidates = generate_candidates(freq_k_itemsets, k)
        pruned = prune_candidates(candidates, freq_k_itemsets, k)

        if not read_separately:
            # Each scan needs its own reader.
            chunk_iter = get_chunk_iter_processed()

        itemset_support = count_support(pruned,chunk_iter_len, k, chunk_iter)

        freq_k_plus_itemsets, freq_k_plus_dict = get_frequent(pruned, itemset_support, minsup, k)
        results_dict.append(freq_k_plus_dict)
        num_freq[k+1] = len(freq_k_plus_itemsets)

        freq_k_itemsets = freq_k_plus_itemsets
        k += 1

    time_taken = time.time() - started_at
    return time_taken, num_freq, results_dict

In [28]:
# Run Apriori with a minimum support of 1% of all transactions. No
# num_transactions is passed, so apriori() counts the rows itself by reading
# "frequent_itemsets.csv" in chunks.
time_taken, num_freq, results_dict = apriori(0.01)
print(f"\n\n Time Taken {time_taken}")
# Number of frequent itemsets found per itemset size.
print(num_freq)
# One {itemset: support} dict per level.
print(results_dict)

minsup selected: 1694.51 (0.01*169451 rows)
------------------FIRST RUN------------------
1405 unique items found in chunk 0. Took 1.65
1405 unique items
generated 281 freq-1-itemsets
generated 39340 candidates for 2-itemsets
k==1, returning candidates as it is.
counting support for 39340 candidate 2 itemset
chunk 0:	 12.58
39340 unique items
generated 618 freq-2-itemsets from 39340 candidate 1 itemsets
generated 14680 candidates for 3-itemsets
pruned candidates from candidate 3 itemsets, left with 2070
counting support for 2070 candidate 3 itemset
chunk 0:	 7.09
2070 unique items
generated 155 freq-3-itemsets from 2070 candidate 2 itemsets
generated 543 candidates for 4-itemsets
pruned candidates from candidate 4 itemsets, left with 55
counting support for 55 candidate 4 itemset
chunk 0:	 1.38
55 unique items
generated 6 freq-4-itemsets from 55 candidate 3 itemsets
generated 0 candidates for 5-itemsets
generated 0 freq-5-itemsets from 0 candidate 4 itemsets


 Time Taken 2293.54531359

In [17]:
results_dict

[{'acidity': 41064,
  'apple': 16144,
  'citrus': 13643,
  'fruit': 71798,
  'herb': 12515,
  'tropical': 4845,
  'firm': 11029,
  'fruity': 11313,
  'juicy': 10534,
  'ripe': 31444,
  'smooth': 9123,
  'tannin': 40214,
  'crisp': 14879,
  'green': 11334,
  'lime': 7370,
  'pineapple': 5990,
  'tart': 7946,
  'bit': 8517,
  'honey': 5032,
  'earthy': 6201,
  'good': 14217,
  'herbal': 6853,
  'tannic': 8431,
  'blackberry': 17879,
  'bodied': 13160,
  'dark': 12817,
  'fresh': 20047,
  'full': 18243,
  'mouth': 10061,
  'plum': 15790,
  'balanced': 7716,
  'bright': 12528,
  'pepper': 11523,
  'red': 21403,
  'savory': 5536,
  'soft': 18041,
  'white': 13338,
  'dry': 22745,
  'spice': 25378,
  'texture': 14526,
  'elegant': 6773,
  'peach': 7606,
  'great': 6760,
  'touch': 12037,
  'attractive': 4569,
  'chocolate': 11800,
  'coffee': 5687,
  'structure': 9753,
  'character': 10400,
  'mineral': 8683,
  'spicy': 9101,
  'tight': 6266,
  'black': 29702,
  'cherry': 41179,
  'oak': 194