<a href="https://colab.research.google.com/github/dummydeveloper13/old_newspaper/blob/main/spark_old_newspapers_market_basket_analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# IMPORTING AND LIBRARIES

In [None]:
import gc
gc.enable()

## Downloading dataset with API

In [None]:
# Install Kaggle API
!pip install --user kaggle

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


### Upload credentials for Kaggle API

In [None]:
from google.colab import files

uploaded = files.upload()

for fn in uploaded.keys():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=fn, length=len(uploaded[fn])))
  
# Then move kaggle.json into the folder where the API expects to find it.
!mkdir -p ~/.kaggle/ && mv kaggle.json ~/.kaggle/ && chmod 600 ~/.kaggle/kaggle.json

Saving kaggle.json to kaggle.json
User uploaded file "kaggle.json" with length 69 bytes


In [None]:
# Download dataset
!kaggle datasets download alvations/old-newspapers --force

Downloading old-newspapers.zip to /content
 99% 2.03G/2.05G [00:17<00:00, 138MB/s]
100% 2.05G/2.05G [00:17<00:00, 127MB/s]


In [None]:
# Unzip dataset
!unzip 'old-newspapers.zip' -d ''

Archive:  old-newspapers.zip
  inflating: old-newspaper.tsv       


## Packages and Libraries

In [None]:
# Install package contractions
!pip install contractions

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
# Install package NLTK
!pip install --user -U nltk

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [None]:
# Import libraries
import pandas as pd
import numpy as np
import string
import csv
import random
import ast
import gc
gc.enable()

In [None]:
import contractions

In [None]:
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize

In [None]:
# Download some of NLTK required packages
nltk.download('stopwords')
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('omw-1.4')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to /root/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!


True

In [None]:
np.random.seed(1234)

## Read and filter dataset

In [None]:
# Read dataset
df = pd.read_csv('old-newspaper.tsv', sep='\t')

In [None]:
df.head(10)

Unnamed: 0,Language,Source,Date,Text
0,Afrikaans,republikein.com.na,2011/09/14,Die veranderinge aan die Britsgeboude Avensis ...
1,Afrikaans,republikein.com.na,2011/01/20,Duitsland se mans- en vrouespanne is die afgel...
2,Afrikaans,sake24.com,2009/11/28,"Mnr. Estienne de Klerk, uitvoerende direkteur ..."
3,Afrikaans,sake24.com,2009/11/12,Mustek is se finansiële-resultate-advertensie ...
4,Afrikaans,sake24.com,2011/02/04,nadat LMS se raad van trustees in Junie verled...
5,Afrikaans,praag.co.za,2011/06/09,Hierdie hersirkulering van kaders werk net so ...
6,Afrikaans,rapport.co.za,2011/07/15,"Volgens ao. November Filander, polisiewoordvoe..."
7,Afrikaans,republikein.com.na,2011/05/20,Die plant is besonder gehard en kan selfs uite...
8,Afrikaans,republikein.com.na,2011/10/19,Dit volg op twee vorige ekspos deur SMEs Compe...
9,Afrikaans,praag.co.za,2009/06/01,Daarom moet 'n Afrikanerafvaardiging so gou mo...


In [None]:
# Count number of languages
df['Language'].nunique()


66

In [None]:
# Visualize the languages
languages = df['Language'].unique()
languages

array(['Afrikaans', 'Albanian', 'Amharic', 'Arabic', 'Armenian',
       'Azerbaijan', 'Bengali', 'Bosnian', 'Catalan',
       'Chinese (Simplified)', 'Chinese (Traditional)', 'Croatian',
       'Welsh', 'Czech', 'German', 'Danish', 'English', 'Spanish',
       'Spanish (South America)', 'Finnish', 'French', 'Georgian',
       'Galician', 'Greek', 'Hebrew', 'Hindi', 'Hungarian', 'Icelandic',
       'Indonesian', 'Italian', 'Japanese', 'Khmer', 'Kannada', 'Korean',
       'Kazakh', 'Lithuanian', 'Latvian', 'Macedonian', 'Malayalam',
       'Mongolian', 'Malay', 'Nepali', 'Dutch', 'Norwegian (Bokmal)',
       'Punjabi', 'Farsi', 'Polish', 'Portuguese (Brazil)',
       'Portuguese (EU)', 'Romanian', 'Russian', 'Serbian', 'Sinhalese',
       'Slovak', 'Slovenian', 'Swahili', 'Swedish', 'Tamil', 'Telugu',
       'Tagalog', 'Thai', 'Turkish', 'Ukranian', 'Urdu', 'Uzbek',
       'Vietnamese'], dtype=object)

In [None]:
# Keep only the English rows, drop the metadata columns and renumber the index
dataset = (
    df[df["Language"] == "English"]
    .drop(columns=["Language", "Source", "Date"])
    .reset_index(drop=True)
)
dataset

Unnamed: 0,Text
0,"He wasn't home alone, apparently."
1,The St. Louis plant had to close. It would die...
2,WSU's plans quickly became a hot topic on loca...
3,The Alaimo Group of Mount Holly was up for a c...
4,And when it's often difficult to predict a law...
...,...
1010237,Serve a taste of spring: Chop fresh vegetables...
1010238,The complaint alleges that Kuvan Adil Piromari...
1010239,But I'm in the mood. After six or more months ...
1010240,That starts this Sunday at Chivas. The Goats a...


In [None]:
# Save dataset to csv
dataset.to_csv('dataset.csv')

## Clear memory

In [None]:
del df, languages, dataset

In [None]:
!rm -rf "old-newspaper.tsv"
!rm -rf "old-newspapers.zip"

In [None]:
gc.collect()

66

# PRE-PROCESSING

`If you are not using the Pro version of Google Colab, the session might crash during this part. The session will reconnect automatically; afterwards, resume from the next section by reading the file "dataset.csv" directly.`

`NOTICE: You may need to re-import the libraries.`

## Replace special characters and expand contractions

In [None]:
# Read dataset
dataset = pd.read_csv('dataset.csv')

In [None]:
# Collect the Text column as a plain Python list of strings (one entry per row)
baskets = [str(dataset.Text[row]) for row in range(dataset.shape[0])]

In [None]:
# Replace special characters and expand contractions.
# The substitutions are applied in this exact order to every sentence.
char_replacements = [
  ("\'s", ""),
  ("\'", ""),
  ("/'", ""),
  ("/", " "),
  ("-", " "),
  ("ø", ""),
]

baskets_ext = []
for raw_text in baskets:
  cleaned_text = raw_text
  for old, new in char_replacements:
    cleaned_text = cleaned_text.replace(old, new)
  baskets_ext.append(contractions.fix(cleaned_text))

In [None]:
print(baskets_ext[1:10])

['The St. Louis plant had to close. It would die of old age. Workers had been making cars there since the onset of mass automotive production in the 1920s.', 'WSU plans quickly became a hot topic on local online sites. Though most people applauded plans for the new biomedical center, many deplored the potential loss of the building.', 'The Alaimo Group of Mount Holly was up for a contract last fall to evaluate and suggest improvements to Trenton Water Works. But campaign finance records released this week show the two employees donated a total of $4,500 to the political action committee (PAC) Partners for Progress in early June. Partners for Progress reported it gave more than $10,000 in both direct and in kind contributions to Mayor Tony Mack in the two weeks leading up to his victory in the mayoral runoff election June 15.', 'And when it often difficult to predict a law impact, legislators should think twice before carrying any bill. Is it absolutely necessary? Is it an issue serious

In [None]:
del dataset, baskets
gc.collect()

88

## Split sentences

In [None]:
# Tokenise each sentence on single spaces: one list of words per sentence
itemset_1 = [text.split(' ') for text in baskets_ext]

In [None]:
print(itemset_1[:10])

[['He', 'was', 'not', 'home', 'alone,', 'apparently.'], ['The', 'St.', 'Louis', 'plant', 'had', 'to', 'close.', 'It', 'would', 'die', 'of', 'old', 'age.', 'Workers', 'had', 'been', 'making', 'cars', 'there', 'since', 'the', 'onset', 'of', 'mass', 'automotive', 'production', 'in', 'the', '1920s.'], ['WSU', 'plans', 'quickly', 'became', 'a', 'hot', 'topic', 'on', 'local', 'online', 'sites.', 'Though', 'most', 'people', 'applauded', 'plans', 'for', 'the', 'new', 'biomedical', 'center,', 'many', 'deplored', 'the', 'potential', 'loss', 'of', 'the', 'building.'], ['The', 'Alaimo', 'Group', 'of', 'Mount', 'Holly', 'was', 'up', 'for', 'a', 'contract', 'last', 'fall', 'to', 'evaluate', 'and', 'suggest', 'improvements', 'to', 'Trenton', 'Water', 'Works.', 'But', 'campaign', 'finance', 'records', 'released', 'this', 'week', 'show', 'the', 'two', 'employees', 'donated', 'a', 'total', 'of', '$4,500', 'to', 'the', 'political', 'action', 'committee', '(PAC)', 'Partners', 'for', 'Progress', 'in', 'ear

In [None]:
del baskets_ext
gc.collect()

44

## Remove punctuation, blank spaces and capitalization

In [None]:
punctuation = string.punctuation
punctuation = punctuation.replace("'", "")
punctuation

'!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~'

In [None]:
# Remove punctuation, English possessive, dashes, spaces and capitalization.
# The translation table only depends on `punctuation`, so it is built once
# instead of once per word.
strip_punctuation = str.maketrans('', '', punctuation)

# string.punctuation is ASCII-only, so typographic dashes have to be removed
# explicitly. The em dash is included because it otherwise survives as a
# stand-alone "word" in the baskets.
dashes = ("–", "—")

itemset_2 = []
for sentence in itemset_1:
  cleaned_sentence = []
  for word in sentence:
    new_word = word.translate(strip_punctuation)
    new_word = new_word.replace("’s", "")
    for dash in dashes:
      new_word = new_word.replace(dash, "")
    new_word = new_word.strip().lower()
    # Skip tokens that became empty after cleaning
    if new_word != "":
      cleaned_sentence.append(new_word)
  itemset_2.append(cleaned_sentence)

In [None]:
print(itemset_2[:10])

[['he', 'was', 'not', 'home', 'alone', 'apparently'], ['the', 'st', 'louis', 'plant', 'had', 'to', 'close', 'it', 'would', 'die', 'of', 'old', 'age', 'workers', 'had', 'been', 'making', 'cars', 'there', 'since', 'the', 'onset', 'of', 'mass', 'automotive', 'production', 'in', 'the', '1920s'], ['wsu', 'plans', 'quickly', 'became', 'a', 'hot', 'topic', 'on', 'local', 'online', 'sites', 'though', 'most', 'people', 'applauded', 'plans', 'for', 'the', 'new', 'biomedical', 'center', 'many', 'deplored', 'the', 'potential', 'loss', 'of', 'the', 'building'], ['the', 'alaimo', 'group', 'of', 'mount', 'holly', 'was', 'up', 'for', 'a', 'contract', 'last', 'fall', 'to', 'evaluate', 'and', 'suggest', 'improvements', 'to', 'trenton', 'water', 'works', 'but', 'campaign', 'finance', 'records', 'released', 'this', 'week', 'show', 'the', 'two', 'employees', 'donated', 'a', 'total', 'of', '4500', 'to', 'the', 'political', 'action', 'committee', 'pac', 'partners', 'for', 'progress', 'in', 'early', 'june', '

## Remove digits

In [None]:
# Drop every token that contains at least one digit
itemset_3 = [
  [token for token in sentence if not any(ch.isdigit() for ch in token)]
  for sentence in itemset_2
]

In [None]:
print(itemset_3[:10])

[['he', 'was', 'not', 'home', 'alone', 'apparently'], ['the', 'st', 'louis', 'plant', 'had', 'to', 'close', 'it', 'would', 'die', 'of', 'old', 'age', 'workers', 'had', 'been', 'making', 'cars', 'there', 'since', 'the', 'onset', 'of', 'mass', 'automotive', 'production', 'in', 'the'], ['wsu', 'plans', 'quickly', 'became', 'a', 'hot', 'topic', 'on', 'local', 'online', 'sites', 'though', 'most', 'people', 'applauded', 'plans', 'for', 'the', 'new', 'biomedical', 'center', 'many', 'deplored', 'the', 'potential', 'loss', 'of', 'the', 'building'], ['the', 'alaimo', 'group', 'of', 'mount', 'holly', 'was', 'up', 'for', 'a', 'contract', 'last', 'fall', 'to', 'evaluate', 'and', 'suggest', 'improvements', 'to', 'trenton', 'water', 'works', 'but', 'campaign', 'finance', 'records', 'released', 'this', 'week', 'show', 'the', 'two', 'employees', 'donated', 'a', 'total', 'of', 'to', 'the', 'political', 'action', 'committee', 'pac', 'partners', 'for', 'progress', 'in', 'early', 'june', 'partners', 'for',

## Remove stopwords

In [None]:
# Remove the majority of stopwords
stop_words = set(stopwords.words('english'))
new_stopwords = ['i', 'us']
new_stopwords_list = stop_words.union(new_stopwords)

# Filter against the extended set (NLTK stopwords plus the extra words above);
# filtering against `stop_words` alone would silently ignore `new_stopwords`.
itemset_4 = [
  [word for word in sentence if word not in new_stopwords_list]
  for sentence in itemset_3
]

In [None]:
print(itemset_4[:10])

[['home', 'alone', 'apparently'], ['st', 'louis', 'plant', 'close', 'would', 'die', 'old', 'age', 'workers', 'making', 'cars', 'since', 'onset', 'mass', 'automotive', 'production'], ['wsu', 'plans', 'quickly', 'became', 'hot', 'topic', 'local', 'online', 'sites', 'though', 'people', 'applauded', 'plans', 'new', 'biomedical', 'center', 'many', 'deplored', 'potential', 'loss', 'building'], ['alaimo', 'group', 'mount', 'holly', 'contract', 'last', 'fall', 'evaluate', 'suggest', 'improvements', 'trenton', 'water', 'works', 'campaign', 'finance', 'records', 'released', 'week', 'show', 'two', 'employees', 'donated', 'total', 'political', 'action', 'committee', 'pac', 'partners', 'progress', 'early', 'june', 'partners', 'progress', 'reported', 'gave', 'direct', 'kind', 'contributions', 'mayor', 'tony', 'mack', 'two', 'weeks', 'leading', 'victory', 'mayoral', 'runoff', 'election', 'june'], ['often', 'difficult', 'predict', 'law', 'impact', 'legislators', 'think', 'twice', 'carrying', 'bill', '

## Lemmatize words

In [None]:
# Lemmatize words
# Lemmatization maps the inflected forms of a word to a common base form so
# they are counted as the same item. It is used here instead of stemming
# because it relies on morphological analysis of the word.
lemmatizer = WordNetLemmatizer()
itemset_5 = [
  [lemmatizer.lemmatize(token) for token in sentence]
  for sentence in itemset_4
]

In [None]:
print(itemset_5[:10])

## Remove duplicates

In [None]:
from collections import OrderedDict

# De-duplicate the words of each sentence while keeping first-occurrence order
itemset_6 = [list(OrderedDict.fromkeys(words)) for words in itemset_5]

In [None]:
print(itemset_6[:10])

## Save

In [None]:
# Save the pre-processed baskets: they are written as ONE csv row, each cell
# holding the string form of one basket (a Python list of words)
with open('pre_processed.csv', 'w', newline='') as out_file:
    csv.writer(out_file).writerow(itemset_6)

In [None]:
del itemset_1, itemset_2, itemset_3, itemset_4, itemset_5, itemset_6
gc.collect()

44

## Subsampling

In [None]:
# Load every row of the csv file into a list (each row is a list of cell strings)
with open('pre_processed.csv', 'r') as csv_file:
    dataset = list(csv.reader(csv_file))

In [None]:
dataset = pd.DataFrame(dataset)

In [None]:
dataset = dataset.T

In [None]:
dataset.head()

In [None]:
# Fix the seed on the call itself: the global np.random.seed set earlier is lost
# if the session restarts, which would make the subsample irreproducible.
dataset_sample = dataset.sample(500, random_state=1234)

### Save for Apriori

In [None]:
# Column 0 holds each basket as the string form of a list; parse it back
apriori_list = [ast.literal_eval(basket) for basket in dataset_sample[0]]
print(apriori_list)

In [None]:
# One csv line per sampled basket, words separated by commas
with open("apriori.csv", "w", newline="") as apriori_file:
    csv.writer(apriori_file).writerows(apriori_list)

### Save for FP-Growth

In [None]:
dataset.to_csv('fp-growth.csv', index=True, header=False)

In [None]:
del dataset

# FREQUENT ITEMSETS WITH SPARK

## Setting up pyspark

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 43 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 49.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845513 sha256=7582f471c7f934e13d772465e5c666ee911aed99110ff1dc011b59ee0f530d6c
  Stored in directory: /root/.cache/pip/wheels/42/59/f5/79a5bf931714dcd201b26025347785f087370a10a3329a899c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
from pyspark.rdd import RDD
from pyspark.ml.fpm import FPGrowth

In [None]:
spark = SparkSession.builder.appName("Market Basket Analysis").master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [None]:
from pyspark import SparkContext
sc = SparkContext.getOrCreate()

## Apriori Algorithm

### Load dataset in spark

In [None]:
lines = sc.textFile("apriori.csv")

In [None]:
type(lines) 

pyspark.rdd.RDD

In [None]:
print(lines.take(10))

['catcher,mike,napoli,dropped,eighth,lineup,provided,offense,belting,three,run,home,give,ranger,breathing,room', 'man,reservation,girlfriend,neil,labute,acidic,reason,pretty,nonprophet,theater,company,question,love,abba,mamma,mia,fox,theatre', 'whose,life,conform,deeply,held,first,amendment,protected,belief,religious,institution,work,cannot,surprised,let,u,go,publicly,flouting', 'matt,rainey,star,ledgerattorneys,yous,sen,robert,menendez,nj,state,attorney,general,sussex,county,tea,party,argue,justice,whether,mount,effort,recall,senator,arguing,mark,eliastrenton,—,letter,george,washington,appear,trump,text,event,yesterday,supreme,court,hit,history,book,hearing,argument,connected,group', 'proposal,would,second,time,county,tried,pas,bond,issue,purpose', 'everything,dave,store,open,negotiation,put,split,placing,item,consignment,also,sell,merchandise,swap,meet,spend,time,get,clear,picture,business', 'walking,drinking,ok,said,year,old,resident,lower,haight,peep,wearing,yellow,underwear,assure

### Phase-1


In [None]:
def sumOperator(x,y):
    """Return ``x + y``; passed to ``reduceByKey`` to add up per-key counts."""
    total = x + y
    return total

In [None]:
# Parsing items into RDD
lblitems = lines.map(lambda line: line.split(','))

In [None]:
print(lblitems.take(10))

[['catcher', 'mike', 'napoli', 'dropped', 'eighth', 'lineup', 'provided', 'offense', 'belting', 'three', 'run', 'home', 'give', 'ranger', 'breathing', 'room'], ['man', 'reservation', 'girlfriend', 'neil', 'labute', 'acidic', 'reason', 'pretty', 'nonprophet', 'theater', 'company', 'question', 'love', 'abba', 'mamma', 'mia', 'fox', 'theatre'], ['whose', 'life', 'conform', 'deeply', 'held', 'first', 'amendment', 'protected', 'belief', 'religious', 'institution', 'work', 'cannot', 'surprised', 'let', 'u', 'go', 'publicly', 'flouting'], ['matt', 'rainey', 'star', 'ledgerattorneys', 'yous', 'sen', 'robert', 'menendez', 'nj', 'state', 'attorney', 'general', 'sussex', 'county', 'tea', 'party', 'argue', 'justice', 'whether', 'mount', 'effort', 'recall', 'senator', 'arguing', 'mark', 'eliastrenton', '—', 'letter', 'george', 'washington', 'appear', 'trump', 'text', 'event', 'yesterday', 'supreme', 'court', 'hit', 'history', 'book', 'hearing', 'argument', 'connected', 'group'], ['proposal', 'would

In [None]:
# Split into RDD of single words
wlitems = lines.flatMap(lambda line:line.split(','))

In [None]:
print(wlitems.take(10))

['catcher', 'mike', 'napoli', 'dropped', 'eighth', 'lineup', 'provided', 'offense', 'belting', 'three']


In [None]:
## Calculate support for each item
# Every distinct word that occurs in the baskets
uniqueItems = wlitems.distinct()

# Pair each word with a count of 1, then add the counts up per word
supportRdd = wlitems.map(lambda word: (word, 1)).reduceByKey(sumOperator)

In [None]:
print(supportRdd.take(10))

[('catcher', 2), ('mike', 4), ('napoli', 1), ('dropped', 2), ('lineup', 1), ('offense', 2), ('three', 19), ('run', 11), ('home', 18), ('give', 4)]


In [None]:
# First support values
supports = supportRdd.map(lambda item: item[1]) # Return only support values

In [None]:
# Define minimum support value as an absolute count: a fraction of the number
# of transactions actually loaded, so it follows the sample size automatically
# instead of relying on a hard-coded 500.
minSupportFraction = 0.01
minSupport = minSupportFraction * lblitems.count()

# Filter first supportRdd with minimum support
supportRdd = supportRdd.filter(lambda item: item[1] >= minSupport)

In [None]:
# Create base RDD which will be updated every iteration
baseRdd = supportRdd.map(lambda item: ([item[0]] , item[1]))
print('1 . Table has been created...') 

1 . Table has been created...


In [None]:
supportRdd = supportRdd.map(lambda item: item[0])
supportRddCart = supportRdd

In [None]:
supportRdd.count()

367

### Phase-2

In [None]:
# Define function to remove replicas
def removeReplica(record):
    """Merge an itemset and a candidate item into one sorted tuple.

    ``record`` is a pair ``(itemset, item)`` where ``itemset`` is either a
    tuple of items or a single item. If ``item`` is not already part of the
    itemset, the sorted tuple of all items is returned. Otherwise the itemset
    is returned unchanged (as a one-element list when it was a single item),
    so its length does not grow.
    """
    head, candidate = record[0], record[1]
    members = head if isinstance(head, tuple) else [head]

    # Candidate already present: nothing to add
    if any(member == candidate for member in members):
        return members

    merged = list(members)
    merged.append(candidate)
    merged.sort()
    return tuple(merged)

In [None]:
# Size of the itemsets built in the current pass
c = 2

# Keep growing itemsets until a pass yields no frequent itemset
while not supportRdd.isEmpty():

    # Candidate generation: extend every frequent (c-1)-itemset with a frequent
    # single item. Pairing with the frequent items only (supportRddCart) rather
    # than with every distinct item gives the same frequent itemsets, because
    # an itemset containing an infrequent item cannot reach minSupport, while
    # producing far fewer candidates.
    combined = supportRdd.cartesian(supportRddCart)
    combined = combined.map(lambda item: removeReplica(item))

    # Keep only candidates that really grew to size c, without duplicates
    combined = combined.filter(lambda item: len(item) == c)
    combined = combined.distinct()

    # Pair every candidate with every basket and keep the pairs where the
    # basket contains all items of the candidate
    combined_2 = combined.cartesian(lblitems)
    combined_2 = combined_2.filter(lambda item: all(x in item[1] for x in item[0]))

    # Count the matching baskets per candidate and apply the support threshold
    combined_2 = combined_2.map(lambda item: item[0])
    combined_2 = combined_2.map(lambda item: (item , 1))
    combined_2 = combined_2.reduceByKey(sumOperator)
    combined_2 = combined_2.filter(lambda item: item[1] >= minSupport)

    # Accumulate (itemset, support) pairs of all sizes
    baseRdd = baseRdd.union(combined_2)

    # The frequent c-itemsets seed the next pass
    combined_2 = combined_2.map(lambda item: item[0])
    supportRdd = combined_2
    print(c ,'. Table has been created... ')
    c = c+1

2 . Table has been created... 
3 . Table has been created... 


In [None]:
# Helpers applied to the cartesian RDD of (itemset, support) pairs:
# - filterForConf keeps a pair when the first itemset is strictly longer than
#   the second and contains every item of the second
# - calculateConfidence turns a kept pair into [antecedent, consequent, confidence %]

class Filter():
    """Filtering and confidence helpers for pairs of ``(itemset, support)`` records."""

    def __init__(self):
        # Incremented by filterForConf each time a pair is rejected
        self.stages = 1

    def filterForConf(self, item):
        """Return ``item`` if it qualifies for a confidence computation, else ``None``.

        ``item`` is ``((itemset_a, support_a), (itemset_b, support_b))``. The
        pair qualifies when ``itemset_a`` has more elements than ``itemset_b``
        and ``checkItemSets`` reports that all of ``itemset_b`` is contained in it.
        """
        larger, smaller = item[0][0], item[1][0]
        if len(larger) > len(smaller) and self.checkItemSets(larger, smaller):
            return item
        self.stages += 1

    def checkItemSets(self, item_1 , item_2):
        """Return True when every element of the shorter collection occurs in the longer.

        When both have the same length, ``item_1`` is checked against ``item_2``.
        """
        if len(item_1) > len(item_2):
            container, contained = item_1, item_2
        else:
            container, contained = item_2, item_1
        return all(any(k == l for k in container) for l in contained)

    def calculateConfidence(self, item):
        """Return ``[child items, parent-only items, confidence in percent]``.

        ``item`` is ``((parent_itemset, parent_support), (child, child_support))``
        where ``child`` is either a single string or a collection of items. The
        confidence is ``parent_support / child_support * 100``.
        """
        parent_record, child_record = item[0], item[1]

        # Full itemset
        parent = set(parent_record[0])

        # Subset used as antecedent; a bare string is one item, not characters
        raw_child = child_record[0]
        child = {raw_child} if isinstance(raw_child, str) else set(raw_child)

        confidence = (parent_record[1] / child_record[1])*100

        return [list(child), list(parent - child), confidence]

In [None]:
# Example ((('x10', 'x3', 'x6', 'x7', 'x9'), 1), (('x10', 'x3', 'x7'), 1))
sets = baseRdd.cartesian(baseRdd)

In [None]:
sets.take(10)

[((['three'], 19), (['three'], 19)),
 ((['three'], 19), (['run'], 11)),
 ((['three'], 19), (['home'], 18)),
 ((['three'], 19), (['question'], 6)),
 ((['three'], 19), (['love'], 7)),
 ((['three'], 19), (['whose'], 5)),
 ((['three'], 19), (['work'], 12)),
 ((['three'], 19), (['cannot'], 6)),
 ((['three'], 19), (['let'], 6)),
 ((['three'], 19), (['go'], 15))]

In [None]:
ff = Filter()

In [None]:
filtered = sets.filter(lambda item: ff.filterForConf(item))
print('# : Aggregated support values preparing for the confidence calculatations')

# : Aggregated support values preparing for the confidence calculatations


In [None]:
filtered.take(10)

[((('said', 'work'), 5), (['work'], 12)),
 ((('said', 'work'), 5), (['said'], 113)),
 ((('said', 'would'), 16), (['would'], 41)),
 ((('said', 'would'), 16), (['said'], 113)),
 ((('last', 'year'), 13), (['year'], 56)),
 ((('last', 'year'), 13), (['last'], 26)),
 ((('last', 'said'), 6), (['said'], 113)),
 ((('said', 'school'), 8), (['said'], 113)),
 ((('got', 'said'), 5), (['said'], 113)),
 ((('say', 'year'), 5), (['year'], 56))]

In [None]:
confidences = filtered.map(lambda item: ff.calculateConfidence(item))
print('# : Confidence is calculated!')

# : Confidence is calculated!


In [None]:
confidences.take(10)

[[['work'], ['said'], 41.66666666666667],
 [['said'], ['work'], 4.424778761061947],
 [['would'], ['said'], 39.02439024390244],
 [['said'], ['would'], 14.15929203539823],
 [['year'], ['last'], 23.214285714285715],
 [['last'], ['year'], 50.0],
 [['said'], ['last'], 5.3097345132743365],
 [['said'], ['school'], 7.079646017699115],
 [['said'], ['got'], 4.424778761061947],
 [['year'], ['say'], 8.928571428571429]]

In [None]:
## Import pandas modules
import pandas as pd

## Create an array with collected baseRddConfidence results
result = confidences.collect()

## Create Data Frame
confidenceTable = pd.DataFrame(data = result , columns=["Antecedent", "Consequent" , "Confidence"])

## Show data frame
print(confidenceTable.sort_values(by=['Confidence'], ascending = False))

      Antecedent Consequent  Confidence
101      [louis]       [st]  100.000000
61       [going]     [said]   91.666667
45   [authority]     [said]   83.333333
78        [feel]     [said]   83.333333
33        [need]     [said]   71.428571
..           ...        ...         ...
74        [said]     [feel]    4.424779
72        [said]  [getting]    4.424779
116       [said]   [county]    4.424779
42        [said]   [change]    4.424779
87        [said]   [little]    4.424779

[130 rows x 3 columns]


## FP-Growth Algorithm

### Load dataset in spark

In [None]:
# Load fp-growth.csv with an explicit schema: an integer id and the basket as a raw string
path = "fp-growth.csv"
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("items", StringType(), True),
])
dataset = spark.read.schema(schema).csv(path)

In [None]:
dataset.printSchema()

root
 |-- id: integer (nullable = true)
 |-- items: string (nullable = true)



In [None]:
# The items column holds the string form of a Python list, e.g. "['home', 'alone']".
# Strip brackets, quotes and whitespace before splitting on commas; otherwise the
# quotes and the space after each comma stay inside the items, and the same word
# is counted as two different items depending on its position in the basket.
df = dataset.withColumn("items", split(regexp_replace(col("items"), r"[\[\]'\s]", ""), ","))

In [None]:
df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- items: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [None]:
df.show()

+---+--------------------+
| id|               items|
+---+--------------------+
|  0|['home',  'alone'...|
|  1|['st',  'louis', ...|
|  2|['wsu',  'plan', ...|
|  3|['alaimo',  'grou...|
|  4|['often',  'diffi...|
|  5|['certain',  'amo...|
|  6|['charlevoix',  '...|
|  7|['another',  'lon...|
|  8|['time',  'report...|
|  9|['trying',  'hit'...|
| 10|['mhta',  'presid...|
| 11|['absurdity',  'a...|
| 12|['gm',  'labor', ...|
| 13|['wandry',  'matt...|
| 14|['cheap',  'said'...|
| 15|['andrade',  'chi...|
| 16|['let',  'hair', ...|
| 17|['born',  'april'...|
| 18|['house',  'minor...|
| 19|['first',  'love'...|
+---+--------------------+
only showing top 20 rows



### Model

In [None]:
from pyspark.ml.fpm import FPGrowth

fpGrowth = FPGrowth(itemsCol="items", minSupport=0.02, minConfidence=0.02)
model = fpGrowth.fit(df)

### Results

In [None]:
# Display frequent itemsets.
items = model.freqItemsets

In [None]:
items.show()

+--------------+-----+
|         items| freq|
+--------------+-----+
|     [ 'give']|13403|
|    [ 'among']|10668|
|    [ 'night']|17597|
|    [ 'event']|11121|
|     [ 'york']|11943|
|   [ 'health']|12779|
|   [ 'office']|15790|
|   [ 'better']|14466|
|    [ 'world']|16258|
|    [ 'never']|14725|
|     [ 'room']|11430|
|    [ 'early']|12198|
|  [ 'service']|18020|
|   [ 'change']|12885|
|    [ 'point']|22520|
|[ 'including']|19745|
|   [ 'season']|27323|
| [ 'national']|13620|
|      [ 'run']|20653|
|[ 'community']|13948|
+--------------+-----+
only showing top 20 rows



In [None]:
# Display generated association rules.
rules = model.associationRules

In [None]:
rules.show()

+----------+----------+-------------------+------------------+--------------------+
|antecedent|consequent|         confidence|              lift|             support|
+----------+----------+-------------------+------------------+--------------------+
|[ 'would']| [ 'said']| 0.3343840137976152|1.5113134165485431|0.022261992671062974|
| [ 'said']| [ 'year']|0.10348962061560486|0.9916960996912486| 0.02289748396918758|
| [ 'said']|[ 'would']|0.10061739441660701| 1.511313416548543|0.022261992671062974|
| [ 'year']| [ 'said']|0.21941664690538298|0.9916960996912487| 0.02289748396918758|
+----------+----------+-------------------+------------------+--------------------+

