In [1]:
!pip install pyspark
!pip install -U -q PyDrive2

!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

The following additional packages will be installed:
  libxtst6 openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra fonts-nanum fonts-ipafont-gothic
  fonts-ipafont-mincho fonts-wqy-microhei fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  libxtst6 openjdk-8-jdk-headless openjdk-8-jre-headless
0 upgraded, 3 newly installed, 0 to remove and 18 not upgraded.
Need to get 39.7 MB of archives.
After this operation, 144 MB of additional disk space will be used.
Selecting previously unselected package libxtst6:amd64.
(Reading database ... 124950 files and directories currently installed.)
Preparing to unpack .../libxtst6_2%3a1.2.3-1build4_amd64.deb ...
Unpacking libxtst6:amd64 (2:1.2.3-1build4) ...
Selecting previously unselected package openjdk-8-jre-headless:amd64.
Preparing to unpack .../openjdk-8-jre-headless_8u432-ga~us1-0ubuntu2~22.04_amd64.deb ...
Unpacking openjdk-8-jre-headless:amd64 (8u432-ga~us1-0ub

In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *

# create the Spark Session
spark = SparkSession.builder.getOrCreate()

# create the Spark Context for working with RDDs
sc = spark.sparkContext

In [3]:
def my_combinations(sorted_list, k):
  """
  Generate all combinations of size k from sorted_list,
  *without* using itertools.
  """
  n = len(sorted_list)

  #doing a backtracking approach
  def backtrack(start, chosen):
    if len(chosen) == k:
      yield tuple(chosen)
      return
    #we can only pick elements from index start..(n - (k-len(chosen)))
    limit=n-(k-len(chosen))+1
    for i in range(start,limit):
      chosen.append(sorted_list[i])
      yield from backtrack(i+1,chosen)
      chosen.pop()

  yield from backtrack(0,[])

In [None]:
# Specify the filenames
transactions_filename = 'walmart1.csv'
item_names_filename = 'ID2Name.csv'

# Read transactions into an RDD
#here is worth to mention:
#if we just simply reading the dataset to transaction
#it will record all the item happened inside in data set e.g ({1,4,4} it will count 4 two time)
#which we only want count number 4 once
#therefore we convert it to line then put into transcation, it will only count 4 one time
transactions=sc.textFile(transactions_filename)\
  .map(lambda line: [int(x) for x in line.strip().split(',')])\
  .map(lambda items: list(set(items)))\
  .filter(lambda items: len(items) > 0)\
  .cache()

# Read the item ID2Name file into a Python dictionary
lines=sc.textFile(item_names_filename)

#build a dictionary('1324', 'item name')
item_id_to_name=(
    lines
    .map(lambda x:x.split(',',1))
    .map(lambda pair:(int(pair[0]),pair[1]))
    .collectAsMap()
)

# Print the number of transactions  (You can keep it in a variable so that you can use it later)
num_transactions=transactions.count()#keep it variable :)
print("Number of transactions: ", num_transactions)

Number of transactions:  10000



Function to calculate frequent 1-itemsets using RDD operations

In [None]:
def find_frequent_1_itemsets(transactions, min_support_count:int):
  '''
  :@transactions: RDD of transactions
  :@min_support_count: the minimum number of transactions an itemset must appear to be frequent
  '''
  #1.count each individual item's frequency
  #2.Filter out items not meeting min_support_count
  #3.return as a list in this format [(item,), count)...]
    #here just print a list for all 1-k items which all satisfy the min_sup

  item_counts=(transactions #the transaction here is RDD!
               .flatMap(lambda t:[(item,1) for item in t]) #map each transaction 't' to am (item,1)
               .reduceByKey(lambda x,y:x+y)  #accumlating same item
               .filter(lambda x:x[1]>=min_support_count) #filter any thing less than min_sup
               .collect()) #fet a python list

  #sort item's IDS to keep consistrency, and put them in tuple form
  frequent_1_itemsets=[((item_count[0],),item_count[1]) for item_count in item_counts]

  return frequent_1_itemsets  #return the frequent-itemsets of length 1

Function to generate candidates of size k from frequent itemsets of size k-1.  

In [None]:
def generate_candidates(frequent_itemsets:list, k:int):
  '''
  :@frequent_itemsets: frequent itemsets of length k-1, which is the output of find_frequent_1_itemsets/find_frequent_k_itemsets
  :@k: the size of candidate itemsets to generate
  '''
  #approach
  #1.extract just the itemsets(ignore the count here)
  #2.join pairs that share the first k-2 items in sorted order
  #3.purne by ensuring all (k-1)-subsets of a condidate are frequent

  #extract just the itemsets
  freq_itemsets_k_1=[sorted(itemset_count[0]) for itemset_count in frequent_itemsets]
  freq_itemsets_k_1=list(set(tuple(x) for x in freq_itemsets_k_1)) #unique set

  #sort the list so we can have a consistent ordering
  freq_itemsets_k_1.sort()

  candidates_k = set()
  length = k - 1


  #join step
  for i in range(len(freq_itemsets_k_1)):
    for j in range(i+1,len(freq_itemsets_k_1)):
      #try to merge only if the first k-1 item is match
      L1=freq_itemsets_k_1[i]
      L2=freq_itemsets_k_1[j]

      # only merge if they share first (k-2) items
      if L1[:length-1]==L2[:length-1]:
        #merge them
        merged = sorted(list(set(L1).union(set(L2))))
        if len(merged)==k:
          #prune step! all (k-1) subsets of merged must be frequent
          all_subsets_frequent=True
          for idx in range(k):
            #build the (k-1)subset by removeing item at position idx
            subset=merged[:idx]+merged[idx+1:]
            #check if that subset is in freq_itemsets_k_1
            if tuple(subset) not in freq_itemsets_k_1:
              all_subsets_frequent=False
              break
          if all_subsets_frequent:
            candidates_k.add(tuple(merged))

  return candidates_k  # return the set of candidates of length k

Function to compute the support count of each candidate and filter out infrequent candidates

In [None]:
def find_frequent_k_itemsets(transactions, candidates, min_support_count):
  '''
  :@transactions: RDD of transactions
  :@candidates: the set of candidates to be considered for frequent itemsets
  :@min_support_count: the minimum number of transactions an itemset must appear to be frequent
  '''
  #quick stop if it not candidates, easy way to save
  if not candidates:
    return []

  #store the data in local variable
  candidate_set=set(tuple(sorted(c))for c in candidates)

  #find k by taking one candidate from candidate_set
  sample_candidate=next(iter(candidate_set))
  k=len(sample_candidate)

  def mapper(trans):
    #sort the transaction once here if needed
    trans_sorted = sorted(trans)
    out = []

    for combo in my_combinations(trans_sorted, k):
      if combo in candidate_set:
        out.append((combo, 1))
    return out

  candidate_counts=(transactions
          .flatMap(mapper)
          .reduceByKey(lambda x,y:x+y)
          .filter(lambda x:x[1]>=min_support_count)
          .collect())


  frequent_k_itemsets=[
      (tuple(sorted(itemset_count[0])),itemset_count[1])
      for itemset_count in candidate_counts
  ]

  return frequent_k_itemsets # return the frequent itemsets of length k

Apriori algorithm: use the above functions to find frequent itemsets of all lengths given a RDD of transactions and a minumum support

In [None]:
def apriori(transactions, min_support):
  '''
  :@transactions: RDD of transactions
  :@min_support: the minimum support threshold for an itemset to be frequent
  '''
  #compute min_support_count
  num_trans=transactions.count() #already define earlier, but that one doesn't work(attentation here)
  min_support_count=int(min_support*num_trans)

  #find frequent 1-items
  freq_1_itemsets=find_frequent_1_itemsets(transactions, min_support_count)

  frequent_itemsets_all=[] #check the return value
  current_freq_itemsets=freq_1_itemsets

  k = 1


  #add transaction reduction: remove infrequent items
  if current_freq_itemsets:
    frequent_itemsets_all.append(current_freq_itemsets)
    freq_1_items=set(x[0][0] for x in current_freq_itemsets)
    transactions=transactions.map(lambda t: [i for i in t if i in freq_1_items])\
                               .filter(lambda t: len(t) > 0)\
                               .cache()
  else:
    return frequent_itemsets_all #no freq singletons

  #iteratively generate freq. itemsets
  while len(current_freq_itemsets)>0:
    k+=1
    candidates_k=generate_candidates(current_freq_itemsets,k)
    if len(candidates_k)==0:
      break

    freq_k_itemsets=find_frequent_k_itemsets(transactions,candidates_k,min_support_count)
    if len(freq_k_itemsets)==0:
      break

    frequent_itemsets_all.append(freq_k_itemsets)
    current_freq_itemsets=freq_k_itemsets

  return frequent_itemsets_all  # return the frequent itemsets of all lengths

Print all frequent itemsets using item IDs and their support counts

In [9]:
# For each level (length), print IDs of frequent itemsets
def print_frequent_itemsets(frequent_itemsets_all):
  '''
  :@frequent_itemsets_all: the output of the apriori function (list of lists. Each inner list is a list of tuples)
  '''
  # Your code
  for i, freq_list in enumerate(frequent_itemsets_all):
    print(f"Frequent itemsets of length {i+1}:")
    for (itemset, count) in freq_list:
      print(f" {tuple(str(x) for x in itemset)}:{count}") #print out for manually check result
    print()

Function to generate strong association rules and remove misleading rules

In [None]:
def generate_association_rules(frequent_itemsets_all, min_confidence):
  '''
  :@frequent_itemsets: the output of the apriori function
  :@min_confidence: the minimum confidence for a rule to be considered interesting
  '''
  # generate the association rules from the frequent itemsets
  # rules are of the form (X, Y, support, confidence) which means X => Y [support, confidence]
  # build a dictionary for support_count of every frequent itemset, so we can quickly
  # look up support count. We will store support_dict[frozenset(itemset)]=count
  global num_transactions
  support_dict={}
  for freq_list in frequent_itemsets_all:
    for (items,cnt) in freq_list:
      support_dict[frozenset(items)]=cnt

  # actually we must use the real transaction count, not sum the freq list (remember this failed attempt)
  # i will fix that, i think i will neeed the support as fraction=cpunt/num_transaction,
  # but we only know the actual #transcation from outside, so let's store absolute count
  # abd do fraction later
  rules=[]
  # a helper function to get all non-empty subsets of a given set
  def all_nonempty_subsets(items):
    s=list(items)
    n = len(s)
    for r in range(1, n): # skip the full set
      for combo in my_combinations(s, r):
        yield combo

  # prune the misleading rules by using the lift measure
  # for each frequent itemset F of size >=2, we generate all non-empty subsets X
  # then Y=F\X. We cna compute confidence=support(F)/support(X)
  # if confindence >=min_confidence, we can keep the rule
  # then we also do lift=confidence/p(Y)=[support(F)/support(X)]/[support(Y)/N]
  # we can keep the rule if lift > 1, which is considered as corraltion


  #here I also compare the min_confidence, since is fast way to pass the meaningless rule
  #go through each level of frequent itemsets
  for freq_list in frequent_itemsets_all:
    for (itemset_tuple, cntF) in freq_list:
      F=frozenset(itemset_tuple)
      supportF=cntF

      for subset in all_nonempty_subsets(F):
        Y=frozenset(subset)
        X=F-Y

        if not X:
          continue

        supportX=support_dict.get(X,0)
        if supportX==0:
          continue  #avoid division as 0


        confidence=float(supportF)/float(supportX)

        if confidence>=min_confidence:
          supportY=support_dict.get(Y,0)
          if supportY==0:
            continue

          lift=confidence/(float(supportY)/float(num_transactions))

          if lift>1.0:
            rule_support=float(supportF)/float(num_transactions)
            #sort each side for consistency in printing
            X_sorted = tuple(sorted(X))
            Y_sorted = tuple(sorted(Y))
            rules.append((X_sorted, Y_sorted, rule_support, confidence))


  # return a list of rules
  return rules

Print the rules using both item IDs and item names

In [None]:
def print_association_rules(association_rules):
  '''
  :@association_rules: the output of generate_association_rules
  '''
  # Print the rules using both item IDs, e.g., ('2064',) --> ('2828',) [support: 0.0314, confidence: 0.6767241379310345]
  print('******** ASSOCIATION RULES with ITEM IDs ********')
  for(X, Y, sup, conf) in association_rules:
    print(f"{tuple(str(x) for x in X)} --> {tuple(str(y) for y in Y)} [support: {sup:.4f}, confidence: {conf}]")

  # Print the rules using both item names, e.g., JUMBO BAG RED RETROSPOT,  --> DOTCOM POSTAGE, [support: 0.0314, confidence: 0.6767241379310345]
  print('\n******** ASSOCIATION RULES with ITEM NAMES ********')
  for (X, Y, sup, conf) in association_rules:
    X_names = [item_id_to_name.get(x, x) for x in X]
    Y_names = [item_id_to_name.get(y, y) for y in Y]
    print(f"{', '.join(X_names)}, --> {', '.join(Y_names)}, [support: {sup:.4f}, confidence: {conf}]")


Recommendation with association rules: given a transaction, recommend zero or items according to the association_rules based on the given recommendation algorithm. The function should print out the fired rules (using item IDs) and return the list of recommended items (using item IDs).

In [None]:
def recommend(transaction, association_rules):
  '''
  :@transaction: a list of item IDs
  :@association_rules: the output of generate_assocication_rules
  '''
  trans_set=set(transaction)

  print('*** Rules fired ***')
  fired_rules=[]
  recommendations=set()

  #LHS & RHS search, which subset search
  for (X, Y, sup, conf) in association_rules:
    X_set = set(X)
    if X_set.issubset(trans_set):
      fired_rules.append((X, Y))
      for y_item in Y:
        if y_item not in trans_set:
          recommendations.add(y_item)
  for (X, Y) in fired_rules:
    print(f"{tuple(str(x) for x in X)} --> {tuple(str(y) for y in Y)}")

  return list(recommendations)

Print the names of the recommended items

In [None]:
def print_recommended_items(recommended_items):
  '''
  :@recommended_items: the output of recommend
  '''
  print('\nRecommended item IDs:')#print ID
  for rid in recommended_items:
    print(rid)

  print('\nRecommended item names:')#print ID's name
  for rid in recommended_items:
    print(item_id_to_name.get(rid, rid))

Main program

In [15]:
# Set the support and confidence thresholds (the values can be changed)
min_support = 0.025
min_confidence = 0.6

# Run Apriori
frequent_itemsets_all = apriori(transactions, min_support)

In [16]:
# Print the generated rules
print_frequent_itemsets(frequent_itemsets_all)

Frequent itemsets of length 1:
 ('2048',):353
 ('2050',):427
 ('2060',):254
 ('2064',):464
 ('4146',):266
 ('4152',):704
 ('4158',):429
 ('4200',):539
 ('4226',):567
 ('4240',):336
 ('4258',):611
 ('2238',):286
 ('4304',):287
 ('4404',):301
 ('4410',):275
 ('4468',):275
 ('2442',):311
 ('4514',):385
 ('4518',):349
 ('4520',):250
 ('4528',):268
 ('2568',):594
 ('4616',):270
 ('4632',):255
 ('2628',):451
 ('4700',):614
 ('2670',):489
 ('2706',):479
 ('2720',):426
 ('2794',):639
 ('2816',):521
 ('2818',):304
 ('2826',):654
 ('2828',):1069
 ('2832',):455
 ('2838',):470
 ('4946',):443
 ('3008',):289
 ('3010',):562
 ('3012',):348
 ('3014',):553
 ('3018',):539
 ('3020',):473
 ('3024',):505
 ('1098',):469
 ('1104',):488
 ('1108',):352
 ('1138',):384
 ('1142',):497
 ('1228',):320
 ('1296',):401
 ('3338',):389
 ('3358',):335
 ('1322',):254
 ('3404',):321
 ('3432',):1031
 ('1540',):316
 ('3672',):323
 ('3678',):503
 ('1696',):381
 ('1718',):256
 ('3784',):298
 ('1750',):281
 ('3828',):503
 ('3836

In [17]:
# Generate association rules
association_rules = generate_association_rules(frequent_itemsets_all, min_confidence)

# Print the generated association rules
print_association_rules(association_rules)

******** ASSOCIATION RULES with ITEM IDs ********
('1696',) --> ('3828',) [support: 0.0258, confidence: 0.6771653543307087]
('2019',) --> ('4473',) [support: 0.0269, confidence: 0.6058558558558559]
('2064',) --> ('2828',) [support: 0.0314, confidence: 0.6767241379310345]
('3970',) --> ('2568',) [support: 0.0450, confidence: 0.734094616639478]
('2568',) --> ('3970',) [support: 0.0450, confidence: 0.7575757575757576]
('2816',) --> ('2828',) [support: 0.0332, confidence: 0.6372360844529751]
('2826',) --> ('2828',) [support: 0.0445, confidence: 0.6804281345565749]
('2832',) --> ('2828',) [support: 0.0283, confidence: 0.621978021978022]
('2838',) --> ('2828',) [support: 0.0292, confidence: 0.6212765957446809]
('3623',) --> ('2568',) [support: 0.0326, confidence: 0.8295165394402035]
('2670',) --> ('2671',) [support: 0.0303, confidence: 0.6196319018404908]
('3623',) --> ('3970',) [support: 0.0310, confidence: 0.7888040712468194]
('3623', '3970') --> ('2568',) [support: 0.0286, confidence: 0.9

In [18]:
# Given a new transaction
new_transaction = [2826, 2839, 1002, 3623]

# Make recommendations
recommended_items = recommend(new_transaction, association_rules)

# Print the item IDs and names of the recommended items
print_recommended_items(recommended_items)

*** Rules fired ***
('2826',) --> ('2828',)
('3623',) --> ('2568',)
('3623',) --> ('3970',)
('3623',) --> ('2568', '3970')

Recommended item IDs:
2568
3970
2828

Recommended item names:
GREEN REGENCY TEACUP AND SAUCER
ROSES REGENCY TEACUP AND SAUCER
JUMBO BAG RED RETROSPOT
