In [1]:
import sys
import json
from operator import add
import string
import csv
from itertools import combinations
import copy
import math
import time
from functools import reduce
import numpy as np 
import pandas as pd

In [2]:
spark_df = spark.read.option("header",True).csv("./Groceries_dataset.csv")

In [3]:
spark_df.count()

38765

In [4]:
spark_df.head(4)

[Row(Member_number='1808', Date='21-07-2015', itemDescription='tropical fruit'),
 Row(Member_number='2552', Date='05-01-2015', itemDescription='whole milk'),
 Row(Member_number='2300', Date='19-09-2015', itemDescription='pip fruit'),
 Row(Member_number='1187', Date='12-12-2015', itemDescription='other vegetables')]

In [5]:
spark_df.describe()

DataFrame[summary: string, Member_number: string, Date: string, itemDescription: string]

In [6]:
spark_df.printSchema()

root
 |-- Member_number: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- itemDescription: string (nullable = true)



In [7]:
spark_df.show(3)

+-------------+----------+---------------+
|Member_number|      Date|itemDescription|
+-------------+----------+---------------+
|         1808|21-07-2015| tropical fruit|
|         2552|05-01-2015|     whole milk|
|         2300|19-09-2015|      pip fruit|
+-------------+----------+---------------+
only showing top 3 rows



In [8]:
# Distinct number of records
spark_df.distinct().count()

38006

In [9]:
spark_df.count()

38765

In [10]:
# Distinct numbers of items
spark_df.select('itemDescription').distinct().count()

167

In [3]:
# DF of distinct items
items = spark_df.select('itemDescription').distinct()
items.show(4)

+------------------+
|   itemDescription|
+------------------+
|         beverages|
|pickled vegetables|
|    snack products|
|           vinegar|
+------------------+
only showing top 4 rows



## Preprocessing Using Spark Data Frame

In [4]:
from pyspark.sql import functions as F
df_grouped = spark_df.groupBy(['Member_number', 'date']).agg(F.collect_list("itemDescription"))

In [5]:
df_grouped.show()

+-------------+----------+-----------------------------+
|Member_number|      date|collect_list(itemDescription)|
+-------------+----------+-----------------------------+
|         1020|12-08-2014|         [canned beer, sho...|
|         1074|29-01-2014|           [detergent, candy]|
|         1081|06-03-2015|         [other vegetables...|
|         1087|19-06-2014|         [specialty chocol...|
|         1160|04-09-2015|         [butter, chewing ...|
|         1162|06-08-2015|         [canned beer, cit...|
|         1299|20-11-2014|         [canned beer, whi...|
|         1475|28-03-2015|         [pasta, hamburger...|
|         1476|05-11-2014|           [liquor, UHT-milk]|
|         1564|19-06-2015|         [tropical fruit, ...|
|         1565|30-10-2014|             [oil, chocolate]|
|         1616|08-12-2015|         [whole milk, pip ...|
|         1620|20-08-2015|            [chicken, yogurt]|
|         1654|28-09-2015|         [cream cheese , r...|
|         1751|04-09-2014|     

In [6]:
df_grouped.printSchema()

root
 |-- Member_number: string (nullable = true)
 |-- date: string (nullable = true)
 |-- collect_list(itemDescription): array (nullable = true)
 |    |-- element: string (containsNull = true)



In [7]:
df_grouped = df_grouped.withColumnRenamed("collect_list(itemDescription)", "items")

In [8]:
df_grouped.printSchema()

root
 |-- Member_number: string (nullable = true)
 |-- date: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [8]:
df_grouped.show(10)

+-------------+----------+--------------------+
|Member_number|      date|               items|
+-------------+----------+--------------------+
|         1020|12-08-2014|[canned beer, sho...|
|         1074|29-01-2014|  [detergent, candy]|
|         1081|06-03-2015|[other vegetables...|
|         1087|19-06-2014|[specialty chocol...|
|         1160|04-09-2015|[butter, chewing ...|
|         1162|06-08-2015|[canned beer, cit...|
|         1299|20-11-2014|[canned beer, whi...|
|         1475|28-03-2015|[pasta, hamburger...|
|         1476|05-11-2014|  [liquor, UHT-milk]|
|         1564|19-06-2015|[tropical fruit, ...|
+-------------+----------+--------------------+
only showing top 10 rows



In [17]:
print("NUmber of unique customers :")
df_grouped.select('Member_number').distinct().count()

NUmber of unique customers :


3898

In [18]:
df_grouped.count()

14963

In [73]:
# Saving the transactions as csv file
df_grouped.toPandas().loc[:, ["items"]].to_csv("transactions.csv")

In [9]:
# Dropping member_number and date columns
df = df_grouped.drop("Member_number")
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [10]:
transactions_with_header = df.drop("date")
transactions_with_header.printSchema()

root
 |-- items: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [11]:
transactions_with_header.show(5)

+--------------------+
|               items|
+--------------------+
|[canned beer, sho...|
|  [detergent, candy]|
|[other vegetables...|
|[specialty chocol...|
|[butter, chewing ...|
+--------------------+
only showing top 5 rows



In [78]:
transactions_with_header.count()

14963

In [149]:
transactions_with_header.toPandas().loc[:, ["items"]].to_csv('transactions.csv')

## PCY Implementation Using RDD

In [72]:
def openfile():
    file_name = "transactions.csv"
    minsupport = 20
    nbuckets=20
    
    #doc = open(file_name).read()
    
    #newfile=doc.split("\n")
    
    data = sc.textFile(file_name)
#     data = data.map(lambda line : ''.join(line.split(',')))
    newfile = data.collect()
    
    buckets=[]
    for x in newfile:
       buckets.append([y.strip() for y in x.split(',')])

    size1freqset(minsupport,nbuckets,buckets)

In [74]:
def size1freqset(minsupport,nbuckets,buckets):

    candidatelist1=[]
    finalist1=[]

    for i in range(0, len(buckets)):
        for b in buckets[i]:
            candidatelist1.append(b)

    candidatelist1=sorted(set(candidatelist1))
    # print(candidatelist1)
    
    lala=0
    for k in candidatelist1:
        for i in range(0, len(buckets)):
            for j in buckets[i]:
                if(k==j):
                    lala+=1
        if lala>=minsupport:
            finalist1.append(k)
        lala=0

    itemiddic={}
    counter=1
    for c in candidatelist1:
        itemiddic[c]=counter
        counter+=1

    if finalist1:
        print("Frequent Itemsets of size 1")
        with open("output1.txt", "w") as f:
            f.write("Frequent Itemsets of size 1 \n" + "Total number of itemsets : " + str(len(finalist1)) + '\n'.join(finalist1))
        for x in finalist1:
            print(x)
        size2freqset(minsupport,nbuckets,itemiddic,buckets,finalist1)
    else:
        print("That's all folks!")

In [76]:
def size2freqset(minsupport,nbuckets,itemiddic,buckets,finalist1):

    k=3

    countofbuckets=[0]*nbuckets
    bitmap=[0]*nbuckets
    pairs=[]

    """PCY Pass 1"""
    for i in range(0,len(buckets)):
        for x in range(0,len(buckets[i])-1):
            for y in range(x+1,len(buckets[i])):
                if(buckets[i][x]<buckets[i][y]):
                    countofbuckets[int(str(itemiddic[buckets[i][x]])+str(itemiddic[buckets[i][y]]))%nbuckets]+=1
                    if ([buckets[i][x],buckets[i][y]] not in pairs):
                        pairs.append(sorted([buckets[i][x],buckets[i][y]]))
                else:
                    countofbuckets[int(str(itemiddic[buckets[i][y]])+str(itemiddic[buckets[i][x]]))%nbuckets]+=1
                    if ([buckets[i][y],buckets[i][x]] not in pairs):
                        pairs.append(sorted([buckets[i][y],buckets[i][x]]))

    pairs=sorted(pairs)

    for x in range(0,len(countofbuckets)):
        if countofbuckets[x]>=minsupport:
            bitmap[x]=1
        else:
            bitmap[x]=0

    prunedpairs=[]

    """Checking condition 1 of PCY Pass 2"""

    for i in range(0,len(pairs)):
        for j in range(0,len(pairs[i])-1):
            if(pairs[i][j] in finalist1 and pairs[i][j+1] in finalist1):
                prunedpairs.append(pairs[i])

    candidatelist2=[]

    """Checking condition 2 of PCY Pass 2"""
    for i in range(0, len(prunedpairs)):
        for j in range(0,len(prunedpairs[i])-1):
            if bitmap[int(str(itemiddic[prunedpairs[i][j]])+str(itemiddic[prunedpairs[i][j+1]]))%nbuckets]==1:
                candidatelist2.append(prunedpairs[i])

    """Appending frequent items of size 2 to finallist2"""
    finalist2=[]
    p=0
    for c in range(0,len(candidatelist2)):
        for b in range(0,len(buckets)):
            if set(candidatelist2[c]).issubset(set(buckets[b])):
                p+=1
        if p>=minsupport and p!=0:
            finalist2.append(sorted(candidatelist2[c]))
        p=0

    finalist2=sorted(finalist2)

    if finalist2:
        print("\nFrequent Itemsets of size 2")
        with open("output2.txt", "w") as f:
            f.write("Frequent Itemsets of size 2 \n" + "Total number of itemsets : " + str(len(finalist1)) + '\n'.join(finalist2)
        for b in finalist2:
            print(','.join(b))
        sizekfreqset(minsupport,nbuckets,itemiddic,buckets,finalist2,k)
    else:
        print("That's all folks!")

SyntaxError: invalid syntax (<ipython-input-76-2e39c44db5d0>, line 64)

In [56]:
def sizekfreqset(minsupport,nbuckets,itemiddic,buckets,prevout,k):

	"""Creating Candidate List of Size k"""
	
	kcountofbuckets=[0]*nbuckets
	kbitmap=[0]*nbuckets
	kcombination=[]
	prevout=prevout
	
	"""Make k combination e.g. triplets"""
	for a in prevout:
		for b in prevout:
			if(a!=b):
				if set(a) & set(b) and len(list(set(a) & set(b))) >= k-2:
					kcombination.append(sorted(set(a)|set(b)))

	# print("checking kcombination")
	kcombination=sorted(kcombination)

	"""PCY Pass 1 Hashing"""

	hashingstring=""

	for i in range(0,len(kcombination)):
		for x in kcombination[i]:
			hashingstring+=str(itemiddic[x])
		kcountofbuckets[int(hashingstring)%nbuckets]+=1
		hashingstring=""

	for x in range(0,len(kcountofbuckets)):
		if kcountofbuckets[x]>=minsupport:
			kbitmap[x]=1
		else:
			kbitmap[x]=0

	"""Condition 1 is automatically satisfied"""

	"""Checking condition 2 of PCY Pass 2"""
	hashingstring=""
	klist=[]
	
	for i in range(0, len(kcombination)):
		for j in kcombination[i]:
			hashingstring+=str(itemiddic[j])
		if kbitmap[int(hashingstring)%nbuckets]==1:
			klist.append(kcombination[i])
		teststring=""

	"""Creating Candidate List of Size k"""
	candidatelistk=[]
	count=1
	for i in range(1, len(klist)):
		if klist[i]==klist[i-1]:
			count+=1
		else:
			# print("ELement is: ",klist[i-1]," and count is: ",count)
			if count>=k:
				candidatelistk.append(klist[i-1])
			count=1
	if count>=k:
		candidatelistk.append(klist[i-1])
		# print("ELement is: ",klist[i-1]," and count is: ",count)
	
	"""Appending frequent items of size k to output"""
	output=[]
	counter=0
	for c in range(0,len(candidatelistk)):
		for b in range(0,len(buckets)):
			if set(candidatelistk[c]).issubset(set(buckets[b])):
				counter+=1
		if counter>=minsupport and counter!=0:
			output.append(candidatelistk[c])
		counter=0

	if output:
		print("\nFrequent Itemsets of size ",k)
		for b in output:
			print(','.join(b))
		k+=1
		sizekfreqset(minsupport,nbuckets,itemiddic,buckets,output,k)
		# print("\n")
	else:
		print("That's all folks!")

In [73]:
openfile()

Frequent Itemsets of size 1
'Instant food products'
'UHT-milk'
'abrasive cleaner'
'artif. sweetener'
'baking powder'
'beef'
'berries'
'beverages'
'bottled beer'
'bottled water'
'brandy'
'brown bread'
'butter milk'
'butter'
'cake bar'
'candles'
'candy'
'canned beer'
'canned fish'
'canned fruit'
'canned vegetables'
'cat food'
'cereals'
'chewing gum'
'chicken'
'chocolate marshmallow'
'chocolate'
'citrus fruit'
'cleaner'
'cling film/bags'
'coffee'
'condensed milk'
'cream cheese '
'curd cheese'
'curd'
'dental care'
'dessert'
'detergent'
'dish cleaner'
'dishes'
'dog food'
'domestic eggs'
'female sanitary products'
'finished products'
'fish'
'flour'
'flower (seeds)'
'frankfurter'
'frozen dessert'
'frozen fish'
'frozen meals'
'frozen potato products'
'frozen vegetables'
'fruit/vegetable juice'
'grapes'
'ham'
'hamburger meat'
'hard cheese'
'herbs'
'house keeping products'
'hygiene articles'
'ice cream'
'instant coffee'
'jam'
'ketchup'
'kitchen towels'
'light bulbs'
'liquor (appetizer)'
'liquor'


Frequent Itemsets of size  3
'UHT-milk','other vegetables'
'UHT-milk','rolls/buns'
'UHT-milk','tropical fruit'
'UHT-milk','whole milk'
'beef','brown bread'
'beef','citrus fruit'
'beef','margarine'
'beef','newspapers'
'beef','other vegetables'
'beef','rolls/buns'
'beef','root vegetables'
'beef','soda'
'beef','whipped/sour cream'
'beef','whole milk'
'beef','yogurt'
'berries','other vegetables'
'berries','rolls/buns'
'berries','soda'
'berries','whole milk'
'berries','yogurt'
'beverages','other vegetables'
'beverages','sausage'
'beverages','soda'
'beverages','whole milk'
'bottled beer','brown bread'
'bottled beer','butter'
'bottled beer','canned beer'
'bottled beer','citrus fruit'
'bottled beer','domestic eggs'
'bottled beer','frankfurter'
'bottled beer','frozen vegetables'
'bottled beer','fruit/vegetable juice'
'bottled beer','newspapers'
'bottled beer','other vegetables'
'bottled beer','pastry'
'bottled beer','pip fruit'
'bottled beer','rolls/buns'
'bottled beer','root vegetables'
'bott

That's all folks!
