In [1]:
from pyspark import SparkContext
from pyspark import SparkConf

conf = SparkConf().setAppName("apriori")
sc = SparkContext(conf=conf)

In [2]:
data = sc.textFile('browsing.txt')
data.take(5)

['FRO11987 ELE17451 ELE89019 SNA90258 GRO99222 ',
 'GRO99222 GRO12298 FRO12685 ELE91550 SNA11465 ELE26917 ELE52966 FRO90334 SNA30755 ELE17451 FRO84225 SNA80192 ',
 'ELE17451 GRO73461 DAI22896 SNA99873 FRO86643 ',
 'ELE17451 ELE37798 FRO86643 GRO56989 ELE23393 SNA11465 ',
 'ELE17451 SNA69641 FRO86643 FRO78087 SNA11465 GRO39357 ELE28573 ELE11375 DAI54444 ']

In [3]:
def generate_next_c(f_k, k):
    next_c = [var1 | var2 for index, var1 in enumerate(f_k) for var2 in f_k[index + 1:] if
              list(var1)[:k - 2] == list(var2)[:k - 2]]
    return next_c


In [4]:
def generate_f_k(sc, c_k, shared_itemset, sup):
    def get_sup(x):
        x_sup = len([1 for t in shared_itemset.value if x.issubset(t)])
        if x_sup >= sup:
            return x, x_sup
        else:
            return ()

    f_k = sc.parallelize(c_k).map(get_sup).filter(lambda x: x).collect()
    return f_k


# For k = 1

In [5]:
n_samples = data.count()
print(n_samples)

31101


In [6]:
sup = 100

In [7]:
itemset = data.map(lambda line: sorted([str(item) for item in line.strip().split(' ')]))
itemset.take(5)

[['ELE17451', 'ELE89019', 'FRO11987', 'GRO99222', 'SNA90258'],
 ['ELE17451',
  'ELE26917',
  'ELE52966',
  'ELE91550',
  'FRO12685',
  'FRO84225',
  'FRO90334',
  'GRO12298',
  'GRO99222',
  'SNA11465',
  'SNA30755',
  'SNA80192'],
 ['DAI22896', 'ELE17451', 'FRO86643', 'GRO73461', 'SNA99873'],
 ['ELE17451', 'ELE23393', 'ELE37798', 'FRO86643', 'GRO56989', 'SNA11465'],
 ['DAI54444',
  'ELE11375',
  'ELE17451',
  'ELE28573',
  'FRO78087',
  'FRO86643',
  'GRO39357',
  'SNA11465',
  'SNA69641']]

In [8]:
shared_itemset = sc.broadcast(itemset.map(lambda x: set(x)).collect())
# shared_itemset.take(5)

In [9]:
frequent_itemset = []

# prepare candidate_1
k = 1
c_k = itemset.flatMap(lambda x: set(x)).distinct().collect()
c_k = [{x} for x in c_k]
c_k[:4]

[{'FRO11987'}, {'SNA90258'}, {'FRO90334'}, {'SNA80192'}]

In [10]:
print("C{}: {}".format(k, c_k))
f_k = generate_f_k(sc, c_k, shared_itemset, sup)

C1: [{'FRO11987'}, {'SNA90258'}, {'FRO90334'}, {'SNA80192'}, {'ELE91550'}, {'FRO84225'}, {'ELE52966'}, {'GRO73461'}, {'DAI22896'}, {'FRO86643'}, {'ELE23393'}, {'SNA69641'}, {'GRO39357'}, {'ELE11375'}, {'DAI50921'}, {'GRO75578'}, {'SNA91554'}, {'DAI22177'}, {'SNA85662'}, {'ELE59935'}, {'DAI14125'}, {'ELE66810'}, {'DAI91535'}, {'DAI49199'}, {'DAI54690'}, {'FRO76833'}, {'GRO12935'}, {'SNA55952'}, {'ELE82555'}, {'GRO36567'}, {'GRO48282'}, {'DAI87514'}, {'FRO82427'}, {'DAI48891'}, {'FRO47475'}, {'DAI17810'}, {'ELE11111'}, {'FRO76487'}, {'FRO92261'}, {'SNA66979'}, {'GRO49037'}, {'DAI88808'}, {'ELE52446'}, {'FRO41069'}, {'FRO16142'}, {'FRO70974'}, {'DAI22534'}, {'ELE76964'}, {'FRO19520'}, {'FRO24098'}, {'FRO13639'}, {'FRO98878'}, {'FRO38366'}, {'SNA59903'}, {'DAI55911'}, {'FRO31317'}, {'SNA72163'}, {'DAI18334'}, {'ELE20196'}, {'FRO60023'}, {'GRO94047'}, {'SNA40380'}, {'FRO98184'}, {'ELE30182'}, {'SNA45033'}, {'FRO70489'}, {'DAI63921'}, {'FRO74481'}, {'SNA18336'}, {'FRO62970'}, {'SNA83730'}, {

In [11]:
print(len(c_k))
print(len(f_k))
print(f_k)

12592
647
[({'FRO11987'}, 104), ({'SNA90258'}, 550), ({'SNA80192'}, 258), ({'ELE52966'}, 380), ({'GRO73461'}, 3602), ({'DAI22896'}, 1219), ({'FRO86643'}, 235), ({'SNA69641'}, 599), ({'GRO39357'}, 296), ({'ELE11375'}, 214), ({'DAI50921'}, 350), ({'SNA91554'}, 208), ({'DAI22177'}, 1627), ({'SNA85662'}, 349), ({'ELE59935'}, 1311), ({'DAI14125'}, 163), ({'ELE66810'}, 1697), ({'DAI49199'}, 664), ({'GRO12935'}, 169), ({'SNA55952'}, 1094), ({'GRO36567'}, 832), ({'GRO48282'}, 184), ({'DAI87514'}, 254), ({'DAI48891'}, 825), ({'ELE11111'}, 811), ({'FRO92261'}, 915), ({'SNA66979'}, 703), ({'DAI88808'}, 123), ({'FRO41069'}, 409), ({'FRO16142'}, 931), ({'DAI22534'}, 287), ({'FRO19520'}, 133), ({'FRO24098'}, 521), ({'FRO98878'}, 157), ({'FRO38366'}, 353), ({'SNA59903'}, 891), ({'DAI55911'}, 974), ({'FRO31317'}, 2330), ({'SNA72163'}, 1090), ({'ELE20196'}, 545), ({'FRO98184'}, 480), ({'ELE30182'}, 274), ({'DAI63921'}, 1773), ({'SNA18336'}, 709), ({'FRO62970'}, 115), ({'SNA83730'}, 379), ({'DAI91290'},

In [12]:
frequent_itemset.append(f_k)

# For k = 2

In [13]:
c_k = generate_next_c([set(item) for item in map(lambda x: x[0], f_k)], 2)



In [14]:
f_k = generate_f_k(sc, c_k, shared_itemset, sup)
print(f_k)

[({'SNA90258', 'ELE17451'}, 113), ({'GRO99222', 'SNA90258'}, 156), ({'DAI62779', 'SNA90258'}, 114), ({'GRO73461', 'DAI22896'}, 304), ({'GRO73461', 'SNA69641'}, 150), ({'GRO73461', 'DAI22177'}, 248), ({'ELE59935', 'GRO73461'}, 116), ({'GRO73461', 'ELE66810'}, 228), ({'GRO73461', 'SNA55952'}, 117), ({'GRO73461', 'GRO36567'}, 117), ({'GRO73461', 'DAI48891'}, 117), ({'ELE11111', 'GRO73461'}, 158), ({'FRO16142', 'GRO73461'}, 197), ({'GRO73461', 'FRO24098'}, 112), ({'SNA59903', 'GRO73461'}, 123), ({'GRO73461', 'DAI55911'}, 116), ({'FRO31317', 'GRO73461'}, 395), ({'SNA72163', 'GRO73461'}, 285), ({'DAI63921', 'GRO73461'}, 219), ({'SNA18336', 'GRO73461'}, 121), ({'GRO73461', 'DAI91290'}, 161), ({'ELE12792', 'GRO73461'}, 116), ({'GRO73461', 'GRO85051'}, 147), ({'DAI73122', 'GRO73461'}, 146), ({'FRO73056', 'GRO73461'}, 195), ({'GRO73461', 'ELE32164'}, 486), ({'DAI88807', 'GRO73461'}, 313), ({'GRO73461', 'FRO66272'}, 110), ({'GRO73461', 'DAI88079'}, 145), ({'GRO73461', 'SNA80324'}, 562), ({'GRO734

In [15]:
frequent_itemset.append(f_k)

# For k = 3

In [16]:
c_k = generate_next_c([set(item) for item in map(lambda x: x[0], f_k)], 3)
print(c_k)

[{'GRO99222', 'SNA90258', 'SNA80324'}, {'GRO99222', 'SNA90258', 'ELE17451'}, {'ELE26917', 'GRO99222', 'SNA90258'}, {'GRO99222', 'SNA90258', 'FRO40251'}, {'DAI62779', 'GRO73461', 'SNA90258'}, {'DAI62779', 'DAI22896', 'SNA90258'}, {'DAI62779', 'SNA90258', 'SNA69641'}, {'DAI62779', 'SNA90258', 'DAI50921'}, {'DAI62779', 'SNA90258', 'DAI22177'}, {'DAI62779', 'ELE59935', 'SNA90258'}, {'DAI62779', 'SNA90258', 'ELE66810'}, {'DAI62779', 'SNA90258', 'DAI49199'}, {'DAI62779', 'SNA90258', 'SNA55952'}, {'DAI62779', 'SNA90258', 'GRO36567'}, {'DAI62779', 'DAI48891', 'SNA90258'}, {'DAI62779', 'ELE11111', 'SNA90258'}, {'DAI62779', 'FRO92261', 'SNA90258'}, {'DAI62779', 'FRO16142', 'SNA90258'}, {'DAI62779', 'FRO24098', 'SNA90258'}, {'DAI62779', 'SNA59903', 'SNA90258'}, {'DAI62779', 'SNA90258', 'DAI55911'}, {'DAI62779', 'FRO31317', 'SNA90258'}, {'DAI62779', 'SNA72163', 'SNA90258'}, {'DAI62779', 'ELE20196', 'SNA90258'}, {'DAI62779', 'SNA90258', 'FRO98184'}, {'DAI62779', 'DAI63921', 'SNA90258'}, {'DAI62779'

In [17]:
f_k = generate_f_k(sc, c_k, shared_itemset, sup)
print(f_k)

[({'GRO73461', 'FRO40251', 'GRO85051'}, 147), ({'FRO73056', 'GRO73461', 'GRO44993'}, 106), ({'GRO73461', 'DAI43223', 'ELE32164'}, 111), ({'DAI88807', 'SNA72163', 'GRO73461'}, 110), ({'GRO73461', 'FRO40251', 'DAI88079'}, 144), ({'GRO73461', 'FRO40251', 'SNA80324'}, 232), ({'DAI75645', 'GRO73461', 'SNA80324'}, 230), ({'DAI75645', 'GRO73461', 'FRO47962'}, 111), ({'DAI75645', 'GRO73461', 'ELE17451'}, 121), ({'DAI75645', 'GRO73461', 'FRO40251'}, 293), ({'GRO73461', 'DAI75645', 'GRO21487'}, 114), ({'GRO73461', 'DAI75645', 'GRO38814'}, 101), ({'GRO73461', 'DAI75645', 'GRO46854'}, 101), ({'GRO73461', 'GRO56726', 'FRO40251'}, 103), ({'GRO73461', 'FRO40251', 'ELE17451'}, 159), ({'GRO30386', 'GRO73461', 'ELE17451'}, 103), ({'GRO59710', 'GRO73461', 'ELE32164'}, 137), ({'DAI62779', 'DAI22896', 'GRO73461'}, 101), ({'DAI62779', 'FRO31317', 'GRO73461'}, 100), ({'DAI62779', 'GRO73461', 'ELE32164'}, 131), ({'DAI62779', 'GRO73461', 'SNA80324'}, 198), ({'DAI62779', 'DAI75645', 'GRO73461'}, 261), ({'DAI627

In [18]:
frequent_itemset.append(f_k)

In [19]:
def apriori(sc, f_input, f_output, min_sup):
    # read the raw data
    data = sc.textFile(f_input)
    # count the total number of samples
    n_samples = data.count()
    # min_sup to frequency
    sup = n_samples * min_sup
    # split sort
    itemset = data.map(lambda line: sorted([int(item) for item in line.strip().split(' ')]))
    # share the whole itemset with all workers
    shared_itemset = sc.broadcast(itemset.map(lambda x: set(x)).collect())
    # store for all freq_k
    frequent_itemset = []

    # prepare candidate_1
    k = 1
    c_k = itemset.flatMap(lambda x: set(x)).distinct().collect()
    c_k = [{x} for x in c_k]

    # when candidate_k is not empty
    while len(c_k) > 0:
        # generate freq_k
        Dprint("C{}: {}".format(k, c_k))
        f_k = generate_f_k(sc, c_k, shared_itemset, sup)
        Dprint("F{}: {}".format(k, f_k))

        frequent_itemset.append(f_k)
        # generate candidate_k+1
        k += 1
        c_k = generate_next_c([set(item) for item in map(lambda x: x[0], f_k)], k)

    # output the result to file system
    sc.parallelize(frequent_itemset, numSlices=1).saveAsTextFile(f_output)
    sc.stop()
