# Project 2 AMD-DSE

The main purpose of this project is to find frequent itemsets using two algorithms studied in the course **Algorithms for Massive Datasets**:


*   A-priori
*   Park-Chen-Yu



## Set up

The project uses the MeDAL dataset:
https://www.kaggle.com/datasets/xhlulu/medal-emnlp

In [None]:
# download from kaggle
import os
# os.environ['KAGGLE_USERNAME'] = "xxxxxx"
# os.environ['KAGGLE_KEY'] = "xxxxxx"

!kaggle datasets download xhlulu/medal-emnlp

Downloading medal-emnlp.zip to /content
100% 6.81G/6.82G [00:57<00:00, 146MB/s]
100% 6.82G/6.82G [00:57<00:00, 126MB/s]


In [None]:
# unpack
!unzip medal-emnlp.zip -d data

Archive:  medal-emnlp.zip
  inflating: data/full_data.csv      
  inflating: data/pretrain_subset/test.csv  
  inflating: data/pretrain_subset/train.csv  
  inflating: data/pretrain_subset/valid.csv  


In [None]:
!mv data/pretrain_subset/* data/

Import the libraries, installing any that Google Colab does not already provide.

In [None]:
# import libraries
import pandas as pd
from collections import Counter
import nltk

import itertools
import numpy as np
from itertools import combinations

import time

import math

# to remove stop words
nltk.download('stopwords')
from nltk.corpus import stopwords
stop_words = set(stopwords.words('english'))

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [None]:
# PYSPARK #
# set up for google colab
!pip install pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

# set up for local use
#spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()

# to check that it works correctly it should give SparkSession in output
spark



In [None]:
# Mount Google drive for outputs
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


## Data import and dataset selection

We create a sample dataset to tune the algorithms

In [None]:
# take a sample of the first 9999 rows to develop and tune the algorithms
df = pd.read_csv('data/test.csv', nrows=9999)
filenew = 'test_10000.csv' # name of output file
df.to_csv(filenew,index=False) # write csv of sample file

# reading the CSV file
df = pd.read_csv('test_10000.csv')

# displaying the contents of the CSV file
print(df.shape)
df.head(10)

(9999, 4)


Unnamed: 0,ABSTRACT_ID,TEXT,LOCATION,LABEL
0,2069316,we developed an animal model of chronic allerg...,89,functional residual capacity
1,6967959,pyogenic granulomas represent the aquisition o...,34,pyogenic granuloma
2,3492721,the l immunotype lipopolysaccharide lps of nei...,97,phosphorylethanolamine
3,8388738,inotropic reserve identified by dobutamine or ...,47,stress echocardiography
4,6234238,pyridoxinedependent seizure is a rare autosoma...,57,severe mental retardation
5,3874752,a total of PA isolates were collected from GA ...,99,metallobetalactamase
6,13253027,while diminished ovarian reserve dor predicts ...,43,antral follicle count
7,8512090,we describe two cases of simple heterozygosity...,160,thalassemia intermedia
8,3967251,by means of EC and quantitative histomorphomet...,3,electrochemical
9,3262191,in the present study lithocholic acid lca meta...,57,eye movement


In [None]:
# Data import with Spark tool
sc = spark.sparkContext
# sample dataset import
sw_sample = sc.textFile('test_10000.csv',minPartitions=4)
# full dataset import
sw_full = sc.textFile('data/full_data.csv',minPartitions=4)

### Functions and pre-processing

The detector must treat each string in the TEXT column of the full_data.csv file as a basket and each word as an item.
The functions below prepare the dataset for market-basket analysis (MBA).

In [None]:
## FUNCTIONS FOR PRE_PROCESSING##

# function to remove stop words from a string
def isNotStopWord(x):
    x = x.split(" ") # split the input string on spaces
    s=[] # list of words to keep
    for i in x:
        if not i in stop_words:
            s.append(i)
    return " ".join(s)

# function to remove duplicate words in a string
def remove_duplicates(input):
    input = input.split(" ") # split the string into words on spaces
    UniqW = Counter(input) # count each word; the keys are the unique words in first-seen order
    s = " ".join(UniqW.keys()) # join the unique words back into one string
    return s

# function to remove duplicate words and stop words in a string
def remove_dupAndStop(input):
    input = input.split(" ") # split the string into words on spaces
    UniqW = Counter(input) # count each word; the keys are the unique words in first-seen order
    for key in list(UniqW): # iterate over a copy of the keys so entries can be deleted
      if key in stop_words:
        del UniqW[key]
    s = " ".join(UniqW.keys()) # join the remaining unique words back into one string
    return s
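
The three preprocessing steps (lowercase, drop stop words, keep each word once) can also be done in one pass over the text. The sketch below is only an illustration: the name text_to_basket and the tiny SAMPLE_STOP_WORDS set are assumptions made so the example is self-contained, whereas the notebook relies on NLTK's English stop-word list. It lowercases first, so words that differ only in case collapse to one item.

```python
# Illustrative stop-word list (assumption); the notebook uses NLTK's English list.
SAMPLE_STOP_WORDS = {"the", "of", "a", "in", "and", "is"}

def text_to_basket(text, stop_words=SAMPLE_STOP_WORDS):
    """Turn one text into a basket: lowercase, drop stop words, keep each word once."""
    seen = set()
    basket = []
    for word in text.lower().split():
        if word in stop_words or word in seen:
            continue
        seen.add(word)
        basket.append(word)  # preserve first-seen order
    return basket

basket = text_to_basket("The DIO mice and the dio diet in a study of DIO")
print(basket)  # ['dio', 'mice', 'diet', 'study']
```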

In [None]:
# Dataset selection -> simplified one
sw = sw_sample.sample(False, 0.01, 81)
n_baskets = sw.count()
print("You are using the sample dataset")
print("Type of input sample file is:",type(sw))
print("Dimension of sample file is:",n_baskets)

# Data preprocessing
sw = sw.map(lambda x: x.split(',')[1]) #select only TEXT column
firstRow = sw.first() # get first row of the headers
sw= sw.filter(lambda x: x!=firstRow) #remove first row
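sw = sw.map(remove_duplicates) #remove duplicate words
sw = sw.map(isNotStopWord) #remove stop words
#sw = sw.map(remove_dupAndStop) #alternative: remove duplicates and stop words in one step
# note: lowercasing runs last, so words that differ only in case are neither deduplicated nor matched against the stop-word list
sw = sw.map(lambda word: word.lower()) # convert to lower case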
sw = sw.map(lambda x: x.split(" "))
sw.take(2)

You are using the sample dataset
Type of input sample file is: <class 'pyspark.rdd.PipelinedRDD'>
Dimension of sample file is: 87


[['healthy',
  'skeletal',
  'muscle',
  'mass',
  'essential',
  'attenuating',
  'complications',
  'obesity',
  'importantly',
  'function',
  'maintained',
  'adequate',
  'repair',
  'following',
  'overuse',
  'injury',
  'purpose',
  't0',
  'investigate',
  'impact',
  'dio',
  'dio',
  'functionality',
  'satellite',
  'cell',
  'sc',
  'population',
  'male',
  'cblj',
  'mice',
  'fed',
  'standard',
  'chow',
  'highfat',
  'diet',
  'kcal',
  'fat',
  'weeks',
  'muscles',
  'subjected',
  'ctx',
  'displayed',
  'att',
  'regeneration',
  'indicated',
  'prolonged',
  'necrosis',
  'delayed',
  'expression',
  'myod',
  'myogenin',
  'elevated',
  'collagen',
  'content',
  'persistent',
  'embryonic',
  'mhc',
  'chain',
  'significant',
  'differences',
  'observed',
  'scs',
  'activate',
  'normally',
  'respond',
  'exogenous',
  'hepatocyte',
  'growth',
  'factor',
  'hgf',
  'despite',
  'similar',
  'receptor',
  'cmet',
  'density',
  'furthermore',
  'release',

## Step by Step into the implemented algorithms


 ### A-priori
 This section walks through each step of the A-priori algorithm.

To perform the steps of the A-priori algorithm, a few functions need to be implemented to count singletons, pairs, and triples.

In [None]:
## FUNCTIONS FOR A_PRIORI##

# count occurrences of singletons and keep those that reach the minimum support
def count_freq_1(rdd):
    return (rdd.flatMap(lambda word: word) # flat word
            .map(lambda word: (word, 1)) #add 1
            .reduceByKey(lambda a,b: a+b) #reduce and sum
            .filter(lambda x: x[1] >= min_support) #above the min support
            #.sortBy(lambda x: x[1],False) #sort descending
           )

# function to keep only the words that are frequent items
def isInFreqTable(x):
    s=[] # list of frequent words found in the basket
    # keep each word of the basket that is a frequent item
    for i in x:
        if i in pass1_out:
            s.append(i)
    return s

# function to create frequent pairs
def makePairs(x):
    # create pairs of frequent singletons using nested loops
    res = []
    n = len(x)
    for i in range(n):
        for j in range(i+1, n):
            res.append((x[i], x[j]))
    return res

# count occurrences of pairs
def count_freq_2(rdd):
    return (rdd.map(isInFreqTable)  #filter only frequent item
            .map(makePairs) #create pairs of frequent items
            .flatMap(lambda xs: [x[0:2] for x in xs])  #flatten pairs
            .map(lambda word: (word, 1)) #add 1 to the pairs
            .reduceByKey(lambda a,b: a+b) # reduce and sum
            .filter(lambda x: x[1] >= min_support) #filter above threshold
            #.sortBy(lambda x: x[1],False) #sort descending
           )

# function to keep only the words that appear in at least one frequent pair
def isInPairTable(x):
    s=[] # list of words kept from the basket
    # keep each word of the basket that occurs in some frequent pair
    for i in x:
        if i in pass2_outFlat:
            s.append(i)
    return s

# function to create frequent triples
def makeTriple(x):
  res = []
  for combo in combinations(x, 3):  # 2 for pairs, 3 for triplets, etc
    res.append((combo))
  return res

# function to count occurrences of triples
def count_freq_3(rdd):
    return (rdd.map(isInPairTable)  #filter only frequent pairs
            .map(makeTriple) #makeComb
            .flatMap(lambda xs: [x[0:3] for x in xs])  #flatten triples
            .map(lambda word: (word, 1)) #add 1 to triples
            .reduceByKey(lambda a,b: a+b) #reduce and sum
            .filter(lambda x: x[1] >= min_support) # above support threshold
           )
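
The same two passes can be followed on a handful of baskets without Spark. The sketch below is a plain-Python illustration, not the notebook's implementation: the function name apriori_pairs and the toy baskets are made up for the example. Pass 1 counts single items; pass 2 counts only the pairs whose two items both survived pass 1.

```python
from collections import Counter
from itertools import combinations

def apriori_pairs(baskets, min_support):
    """Return (frequent items, frequent pairs) as dicts mapping itemset -> count."""
    # Pass 1: count single items (once per basket) and keep the frequent ones.
    item_counts = Counter(item for basket in baskets for item in set(basket))
    frequent_items = {i: c for i, c in item_counts.items() if c >= min_support}

    # Pass 2: count only pairs whose two items are both frequent.
    pair_counts = Counter()
    for basket in baskets:
        kept = sorted(i for i in set(basket) if i in frequent_items)
        pair_counts.update(combinations(kept, 2))
    frequent_pairs = {p: c for p, c in pair_counts.items() if c >= min_support}
    return frequent_items, frequent_pairs

# Toy baskets (hypothetical data).
toy_baskets = [
    ["rats", "dose", "effect"],
    ["rats", "effect", "control"],
    ["dose", "effect"],
    ["rats", "effect"],
]
items, pairs = apriori_pairs(toy_baskets, min_support=2)
print(items)
print(pairs)
```

Sorting the kept items before building combinations gives every pair a single canonical order, so ('dose', 'effect') and ('effect', 'dose') are never counted separately.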

The A-priori algorithm proceeds as follows

In [None]:
#set up
perc_min_supp = 0.05
min_support = round(perc_min_supp*n_baskets) # absolute support threshold, in number of baskets
print("Number of baskets considered in this project",n_baskets)
print("Min support considered is",f"{perc_min_supp:.0%}", "of the total number of baskets")
print("Total value of the min support",min_support)

Number of baskets considered in this project 87
Min support considered is 5% of the total number of baskets
Total value of the min support 4


1st pass

In [None]:
# count singleton occurrences
sw_single = count_freq_1(sw)
print("number of frequent items: ",sw_single.count())
sw_single.take(10)

number of frequent items:  340


[('healthy', 5),
 ('sc', 5),
 ('chain', 4),
 ('significantly', 13),
 ('result', 4),
 ('used', 11),
 ('presence', 14),
 ('examined', 10),
 ('using', 17),
 ('showed', 15)]

In [None]:
# create a list of frequent singletons
pass1_out = sw_single.map(lambda x: x[0]).collect()
pass1_out[0:10]

['healthy',
 'sc',
 'chain',
 'significantly',
 'result',
 'used',
 'presence',
 'examined',
 'using',
 'showed']

2nd pass

In [None]:
# count pairs
sw_pair = count_freq_2(sw)
print("number of frequent pairs",sw_pair.count())
sw_pair.take(10)

number of frequent pairs 230


[(('effect', 'presence'), 4),
 (('studies', 'demonstrated'), 4),
 (('patients', 'high'), 4),
 (('patients', 'performed'), 4),
 (('present', 'well'), 4),
 (('t3', 'suggest'), 6),
 (('group', 'groups'), 5),
 (('t0', 'protein'), 4),
 (('determine', 'whether'), 4),
 (('rats', 'significant'), 5)]

In [None]:
# create a list of frequent pairs
pass2_out = sw_pair.map(lambda x: x[0]).collect()
pass2_out[0:10]

[('effect', 'presence'),
 ('studies', 'demonstrated'),
 ('patients', 'high'),
 ('patients', 'performed'),
 ('present', 'well'),
 ('t3', 'suggest'),
 ('group', 'groups'),
 ('t0', 'protein'),
 ('determine', 'whether'),
 ('rats', 'significant')]

In [None]:
pass2_outFlat = np.array(pass2_out).flatten() # flatten the frequent pairs into one array of words, used to filter baskets before building triples
pass2_outFlat[0:10]

array(['effect', 'presence', 'studies', 'demonstrated', 'patients',
       'high', 'patients', 'performed', 'present', 'well'], dtype='<U14')

3rd pass: find frequent itemsets of size 3

In [None]:
# count triples
sw_triples = count_freq_3(sw)
print("number of frequent triples",sw_triples.count())
sw_triples.take(10)

number of frequent triples 4


[(('determine', 'whether', 'could'), 4),
 (('effects', 'also', 'results'), 4),
 (('effects', 'mgkg', 'significant'), 4),
 (('also', 'cell', 'may'), 4)]

In [None]:
# create a list of frequent triples
pass3_out = sw_triples.map(lambda x: x[0]).collect()
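
The triple pass above keeps every word that occurs in some frequent pair. A stricter A-priori candidate test requires all three pairs inside a triple to be frequent. The sketch below shows that test in plain Python; candidate_triples and the sample data are hypothetical names and values chosen for illustration.

```python
from itertools import combinations

def candidate_triples(basket, frequent_pairs):
    """Yield triples from one basket whose three 2-subsets are all frequent pairs."""
    # Store pairs as frozensets so that ('a', 'b') and ('b', 'a') match.
    pair_set = {frozenset(p) for p in frequent_pairs}
    for triple in combinations(sorted(set(basket)), 3):
        if all(frozenset(sub) in pair_set for sub in combinations(triple, 2)):
            yield triple

# Hypothetical frequent pairs and one basket.
frequent_pairs = [("also", "cell"), ("also", "may"), ("cell", "may"), ("also", "results")]
basket = ["results", "may", "cell", "also"]
print(list(candidate_triples(basket, frequent_pairs)))  # [('also', 'cell', 'may')]
```

Here 'results' appears in a frequent pair, yet no triple containing it qualifies, because ('cell', 'results') and ('may', 'results') are not frequent.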

### Park, Chen, Yu

In [None]:
#define min support and number of buckets
n_buckets = math.floor(n_baskets/4)
perc_min_supp = 0.05
min_support = round(perc_min_supp*n_baskets) # absolute support threshold, in number of baskets
#min_support = 5
print("Number of baskets considered in this project",n_baskets)
print("n_buckets for hashing:", n_buckets)
print("Min support considered is",f"{perc_min_supp:.0%}", "of the total number of baskets")
print("Total value of the min support",min_support)

Number of baskets considered in this project 87
n_buckets for hashing: 21
Min support considered is 5% of the total number of baskets
Total value of the min support 4


1st pass

In [None]:
# 1.0 count items and find the frequent ones, as in A-priori
freq1 = count_freq_1(sw)
freq1_out = freq1.map(lambda x: x[0])
print('number of frequent items',freq1.count())
freq1.take(10)

number of frequent items 340


[('healthy', 5),
 ('sc', 5),
 ('chain', 4),
 ('significantly', 13),
 ('result', 4),
 ('used', 11),
 ('presence', 14),
 ('examined', 10),
 ('using', 17),
 ('showed', 15)]

In [None]:
# 1.a generate all pairs while examining each basket
nn_comb = 2
pairs = sw.map(lambda x: list(combinations(x,nn_comb))).flatMap(lambda x:x) # using itertool
# note: this is the number of all generated pairs, not only the frequent ones
print("frequent pairs",pairs.count())
pairs.take(5)

frequent pairs 313016


[('healthy', 'skeletal'),
 ('healthy', 'muscle'),
 ('healthy', 'mass'),
 ('healthy', 'essential'),
 ('healthy', 'attenuating')]

In [None]:
# 1.b: count each pair in the baskets
count_pairs =pairs.map(lambda word:(word,1)) # add 1 to each pair
count_pairs = count_pairs.reduceByKey(lambda a,b:a+b).sortBy(lambda x: x[1],False) # sum the pair grouping them by item
count_pairs.take(10)

[(('also', 'results'), 8),
 (('effect', 'results'), 7),
 (('also', 'suggest'), 7),
 (('t3', 'suggest'), 6),
 (('evaluated', 'results'), 6),
 (('presence', 'low'), 6),
 (('present', 'showed'), 6),
 (('data', 'suggest'), 6),
 (('also', 'may'), 6),
 (('increased', 'due'), 6)]

In [None]:
# 1.c: before hashing pairs to buckets, attach a unique id to each pair
comb_index=count_pairs.zipWithUniqueId().sortBy(lambda x: x[1],True)
#comb_index=count_pairs.zipWithIndex() #in alternative
comb_index.take(20)

[((('also', 'results'), 8), 0),
 ((('healthy', 'skeletal'), 1), 3),
 ((('effect', 'results'), 7), 4),
 ((('healthy', 'investigate'), 1), 7),
 ((('also', 'suggest'), 7), 8),
 ((('healthy', 'population'), 1), 11),
 ((('t3', 'suggest'), 6), 12),
 ((('healthy', 'fed'), 1), 15),
 ((('evaluated', 'results'), 6), 16),
 ((('healthy', 'fat'), 1), 19),
 ((('presence', 'low'), 6), 20),
 ((('healthy', 'displayed'), 1), 23),
 ((('present', 'showed'), 6), 24),
 ((('healthy', 'att'), 1), 27),
 ((('data', 'suggest'), 6), 28),
 ((('healthy', 'prolonged'), 1), 31),
 ((('also', 'may'), 6), 32),
 ((('healthy', 'delayed'), 1), 35),
 ((('increased', 'due'), 6), 36),
 ((('healthy', 'expression'), 1), 39)]

In [None]:
# 1.c: hashing pairs to buckets
# organize data in triples - bucket_id, pairs, value
pairs_buck = comb_index.map(lambda h:(h[1]%n_buckets,(h[0][0]), h[0][1]))
pairs_buck.take(30)

[(0, ('also', 'results'), 8),
 (3, ('healthy', 'skeletal'), 1),
 (4, ('effect', 'results'), 7),
 (7, ('healthy', 'investigate'), 1),
 (8, ('also', 'suggest'), 7),
 (11, ('healthy', 'population'), 1),
 (12, ('t3', 'suggest'), 6),
 (15, ('healthy', 'fed'), 1),
 (16, ('evaluated', 'results'), 6),
 (19, ('healthy', 'fat'), 1),
 (20, ('presence', 'low'), 6),
 (2, ('healthy', 'displayed'), 1),
 (3, ('present', 'showed'), 6),
 (6, ('healthy', 'att'), 1),
 (7, ('data', 'suggest'), 6),
 (10, ('healthy', 'prolonged'), 1),
 (11, ('also', 'may'), 6),
 (14, ('healthy', 'delayed'), 1),
 (15, ('increased', 'due'), 6),
 (18, ('healthy', 'expression'), 1),
 (19, ('also', 'cell'), 6),
 (1, ('healthy', 'collagen'), 1),
 (2, ('effects', 'results'), 6),
 (5, ('healthy', 'significant'), 1),
 (6, ('effects', 'suggest'), 6),
 (9, ('healthy', 'differences'), 1),
 (10, ('cell', 'may'), 6),
 (13, ('healthy', 'scs'), 1),
 (14, ('group', 'groups'), 5),
 (17, ('healthy', 'exogenous'), 1)]
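
In the cells above the bucket id comes from the unique id attached to each already-counted pair. A hash that depends on the pair's content is the more usual choice for PCY. The sketch below is one possible way to do that, under the assumption that a CRC32 checksum is an acceptable hash; bucket_of is a hypothetical helper, not part of the notebook.

```python
import zlib

def bucket_of(pair, n_buckets):
    """Map an unordered pair of words to a bucket id in [0, n_buckets)."""
    # Sort so that ('a', 'b') and ('b', 'a') land in the same bucket.
    key = "\x00".join(sorted(pair)).encode("utf-8")
    # zlib.crc32 is deterministic across processes, unlike the built-in
    # hash() on str, which is randomized per interpreter by default.
    return zlib.crc32(key) % n_buckets

same_bucket = bucket_of(("also", "results"), 21) == bucket_of(("results", "also"), 21)
print(same_bucket)  # True
```

A deterministic hash matters on Spark because the same pair may be hashed by different worker processes.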

In [None]:
# 1.d keep only bucket_id, value and reduce
buck_freq=pairs_buck.map(lambda t:(t[0],t[2])).reduceByKey(lambda a,b:a+b)
# 1.e filter frequent buckets
buckFreq = buck_freq.filter(lambda x:x[1]>=min_support).sortByKey()
buckFreq.take(20)

[(0, 14910),
 (1, 14904),
 (2, 14905),
 (3, 14906),
 (4, 14909),
 (5, 14904),
 (6, 14905),
 (7, 14906),
 (8, 14909),
 (9, 14903),
 (10, 14905),
 (11, 14906),
 (12, 14908),
 (13, 14902),
 (14, 14904),
 (15, 14906),
 (16, 14907),
 (17, 14902),
 (18, 14904),
 (19, 14906)]

In [None]:
# 1.f: get list of frequent buckets
freq_buck = buckFreq.map(lambda x: x[0]).collect()
# 1.g: prepare as bitmap: 1 if the pair's bucket is frequent, 0 otherwise
bitmap = pairs_buck.map(lambda a:((a[1],a[2]),1 if a[0] in freq_buck else 0 )) # ((pair, count of the pair), bit)
bitmap.take(20)

[((('also', 'results'), 8), 1),
 ((('healthy', 'skeletal'), 1), 1),
 ((('effect', 'results'), 7), 1),
 ((('healthy', 'investigate'), 1), 1),
 ((('also', 'suggest'), 7), 1),
 ((('healthy', 'population'), 1), 1),
 ((('t3', 'suggest'), 6), 1),
 ((('healthy', 'fed'), 1), 1),
 ((('evaluated', 'results'), 6), 1),
 ((('healthy', 'fat'), 1), 1),
 ((('presence', 'low'), 6), 1),
 ((('healthy', 'displayed'), 1), 1),
 ((('present', 'showed'), 6), 1),
 ((('healthy', 'att'), 1), 1),
 ((('data', 'suggest'), 6), 1),
 ((('healthy', 'prolonged'), 1), 1),
 ((('also', 'may'), 6), 1),
 ((('healthy', 'delayed'), 1), 1),
 ((('increased', 'due'), 6), 1),
 ((('healthy', 'expression'), 1), 1)]

In [None]:
# 1.h: reorganize bitmap as (pair, count, bit value)
bitmap = bitmap.map(lambda a:(a[0][0],(a[0][1]),a[1]))
# 1.i: keep pairs and bitmap value = 1
bit1= bitmap.map(lambda x:(x[0],(x[1],x[2])))
bits_1 = bit1.filter(lambda x: (x[1][1] ==1)) # filter bit=1
bits_1.take(10)

[(('also', 'results'), (8, 1)),
 (('healthy', 'skeletal'), (1, 1)),
 (('effect', 'results'), (7, 1)),
 (('healthy', 'investigate'), (1, 1)),
 (('also', 'suggest'), (7, 1)),
 (('healthy', 'population'), (1, 1)),
 (('t3', 'suggest'), (6, 1)),
 (('healthy', 'fed'), (1, 1)),
 (('evaluated', 'results'), (6, 1)),
 (('healthy', 'fat'), (1, 1))]

2nd pass

In [None]:
def makePairs2(x):
    # function to create pairs of frequent singletons
    # using nested loops with all possible order
    res = []
    n = len(x)
    for i in range(n):
        for j in range(i+1, n):
            res.append((x[i], x[j]))
            res.append((x[j], x[i]))
    return res

In [None]:
# 2.0: take frequent items and make pairs
tt = makePairs2(freq1_out.collect())
# it is a list that need to be transformed into rdd
rdd1 = sc.parallelize(tt)
rdd1 = rdd1.map(lambda x: ((x[0],x[1]),1))
print("Number of pairs generated with freq.items:", rdd1.count())
rdd1.take(2)

Number of pairs generated with freq.items: 115260


[(('healthy', 'sc'), 1), (('sc', 'healthy'), 1)]

In [None]:
# 2.a: link the possible frequent pairs to bitmap with 1 in bit
freq2_bitmap = rdd1.join(bits_1) #join candidate pairs (a,b) with their (count, bit)
freq2_bitmap.take(10)

[(('healthy', 'presence'), (1, (1, 1))),
 (('healthy', 'risk'), (1, (1, 1))),
 (('healthy', 'years'), (1, (1, 1))),
 (('healthy', 'identify'), (1, (1, 1))),
 (('healthy', 'mg'), (1, (1, 1))),
 (('assess', 'healthy'), (1, (1, 1))),
 (('healthy', 'two'), (1, (1, 1))),
 (('oral', 'healthy'), (1, (1, 1))),
 (('healthy', 'mice'), (1, (1, 1))),
 (('healthy', 'determine'), (1, (1, 1)))]

In [None]:
# 2.b: reorganize freq2 rdd keeping pairs and count
freq2_bitmap = freq2_bitmap.map(lambda x:(x[0],x[1][1][0]))
freq2_bitmap = freq2_bitmap.filter(lambda x: (x[1] >= min_support))
freq2_bitmap.take(10)

[(('rats', 'significantly'), 4),
 (('significantly', 'compared'), 4),
 (('c2', 'significantly'), 4),
 (('significantly', 'increased'), 4),
 (('level', 'significantly'), 5),
 (('presence', 'showed'), 4),
 (('presence', 'low'), 6),
 (('using', 'caused'), 4),
 (('t0', 'using'), 4),
 (('t0', 'showed'), 4)]

In [None]:
# 2.c: get frequent itemsets
freq_2 = freq2_bitmap.map(lambda x : x[0])
print("number of frequent pairs", freq_2.count())
freq_2.take(5)

number of frequent pairs 230


[('rats', 'significantly'),
 ('significantly', 'compared'),
 ('c2', 'significantly'),
 ('significantly', 'increased'),
 ('level', 'significantly')]
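
For comparison, the two PCY passes can be written compactly in plain Python. This is a sketch of the textbook scheme under stated assumptions, not the Spark pipeline above: pcy_frequent_pairs is a hypothetical name, CRC32 is an assumed hash, and the baskets are toy data. Pass 1 counts items and increments one bucket per pair occurrence; pass 2 counts a pair only if both items are frequent and its bucket is frequent.

```python
from collections import Counter
from itertools import combinations
import zlib

def pcy_frequent_pairs(baskets, min_support, n_buckets):
    """Two-pass PCY sketch returning {pair: count} for the frequent pairs."""
    def bucket_of(pair):
        return zlib.crc32(" ".join(pair).encode("utf-8")) % n_buckets

    # Pass 1: count items and, for every pair in every basket, bump its bucket.
    item_counts = Counter()
    bucket_counts = [0] * n_buckets
    for basket in baskets:
        items = sorted(set(basket))
        item_counts.update(items)
        for pair in combinations(items, 2):
            bucket_counts[bucket_of(pair)] += 1

    frequent_items = {i for i, c in item_counts.items() if c >= min_support}
    bitmap = [c >= min_support for c in bucket_counts]  # one bit per bucket

    # Pass 2: count a pair only if both items are frequent and its bucket is frequent.
    pair_counts = Counter()
    for basket in baskets:
        items = sorted(set(basket) & frequent_items)
        for pair in combinations(items, 2):
            if bitmap[bucket_of(pair)]:
                pair_counts[pair] += 1
    return {p: c for p, c in pair_counts.items() if c >= min_support}

toy_baskets = [
    ["rats", "dose", "effect"],
    ["rats", "effect", "control"],
    ["dose", "effect"],
    ["rats", "effect"],
]
result = pcy_frequent_pairs(toy_baskets, min_support=2, n_buckets=7)
print(result)
```

A bucket's count is at least the count of any pair hashed into it, so the bitmap can discard infrequent candidates but never a frequent pair; the result therefore does not depend on the number of buckets.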

## Using the whole sample dataset
This section runs both algorithms on the whole sample file of 10000 lines (9999 baskets plus the header row).

In [None]:
sw = sw_sample
n_baskets = sw.count()
print("You are using the sample dataset")
print("Type of input sample file is:",type(sw))
print("Dimension of full dataset is:",n_baskets)

# Data preprocessing
sw = sw.map(lambda x: x.split(',')[1]) #select only TEXT column
firstRow = sw.first() # get first row of the headers
sw= sw.filter(lambda x: x!=firstRow) #remove first row
sw = sw.map(remove_duplicates) #remove duplicates
sw = sw.map(isNotStopWord)#.map(lambda x:x) #remove stopword
sw = sw.map(lambda word: word.lower()) # get lower case
sw = sw.map(lambda x: x.split(" "))


You are using the sample dataset
Type of input sample file is: <class 'pyspark.rdd.RDD'>
Dimension of full dataset is: 10000


### A-priori

In [None]:
#set up
perc_min_supp = 0.02
min_support = round(perc_min_supp*n_baskets) # absolute support threshold, in number of baskets
print("Number of baskets considered in this project",n_baskets)
print("Min support considered is",f"{perc_min_supp:.0%}", "of the total number of baskets")
print("Total value of the min support",min_support)

# Check time
startTime = time.time()

# count singleton occurrences
sw_single = count_freq_1(sw)
# create a list of frequent singletons
#if sw_single.isEmpty()==True: print("No frequent singletons")
pass1_out = sw_single.map(lambda x: x[0]).collect()
print("--> Frequent singletons collected --> DONE")

# time check point
intTime = time.time()
t_inter1 =intTime-startTime
print("-> Intermediate time to calculate frequent singletons: ", t_inter1)

# Check time
startTime = time.time()

# count pairs
sw_pair = count_freq_2(sw)
#if sw_pair.isEmpty()==True: print("No frequent pairs")
# create a list of frequent pairs
pass2_out = sw_pair.map(lambda x: x[0]).collect()
print("--> Frequent pairs collected --> DONE")
pass2_outFlat = np.array(pass2_out).flatten() #flat the list

# time check point
intTime = time.time()
t_inter2 =intTime-startTime
print("-> Intermediate time to calculate frequent pairs: ", t_inter2)

# Check time
startTime = time.time()

# count triples
sw_triples = count_freq_3(sw)
#if sw_triples.isEmpty()==True: print("No frequent triples")
# create a list of frequent triples
pass3_out = sw_triples.map(lambda x: x[0]).collect()
print("--> Frequent triples collected --> DONE")

# time check point
intTime = time.time()
t_inter3 =intTime-startTime
t_Apriori = t_inter1+ t_inter2 + t_inter3

# Print results and time of calculation
print("\n")
print("-> Intermediate time to calculate frequent triples: ", t_inter3)
print("-> Total Duration: ", t_Apriori)
print("\n")
print("Number of frequent singletons", len(pass1_out))
print("Number of frequent pairs", len(pass2_out))
print("Number of frequent triples", len(pass3_out))
print("Total Duration: ", t_Apriori)

Number of baskets considered in this project 10000
Min support considered is 2% of the total number of baskets
Total value of the min support 200
--> Frequent singletons collected --> DONE
-> Intermediate time to calculate frequent singletons:  3.2787694931030273
--> Frequent pairs collected --> DONE
-> Intermediate time to calculate frequent pairs:  36.83454346656799
--> Frequent triples collected --> DONE


-> Intermediate time to calculate frequent triples:  40.39197635650635
-> Total Duration:  80.50528931617737


Number of frequent singletons 729
Number of frequent pairs 443
Number of frequent triples 0
Total Duration:  80.50528931617737


In [None]:
# save frequent singletons, pairs and triples into the output data folder
with open('/content/drive/MyDrive/DSE_Colab_Output/algo_Apriori10000-1_output.txt', 'w')  as file:
    for i in pass1_out :
        file.write(str(i)+"\n")

with open('/content/drive/MyDrive/DSE_Colab_Output/algo_Apriori10000-2_output.txt', 'w')  as file:
    for i in pass2_out :
        file.write(str(i)+"\n")

with open('/content/drive/MyDrive/DSE_Colab_Output/algo_Apriori10000-3_output.txt', 'w')  as file:
    for i in pass3_out :
        file.write(str(i)+"\n")

### PCY

To run PCY we are forced to limit the number of words in each text; otherwise the first pass generates a very large number of useless pair combinations. This was already visible in the step-by-step run above, and the calculations below clarify it further.

In [None]:
n_words = 85 #n.of words in each string of text
n_buck = math.floor(n_baskets/4) #number of buckets
perc_min_supp = 0.02
min_support = round(perc_min_supp*n_baskets)
#number of possible pairs
poss_pairs = n_baskets*(n_words*(n_words-1)/2)
#avg count of each bucket
avg_buck_count= (n_baskets*n_words**2)/(2*n_buck)
print('avg count of each bucket',avg_buck_count)
s_exp = (n_baskets*n_words**2)/(2*min_support )
print('Number of buckets expected:',s_exp)

avg count of each bucket 14450.0
Number of buckets expected: 180625.0


Make some changes to see a case in which frequent buckets would be beneficial

In [None]:
n_words = 5 #n.of words in each string of text
n_buck = math.floor(n_baskets/4) #number of buckets
perc_min_supp = 0.0025
min_support = round(perc_min_supp*n_baskets)
#number of possible pairs
poss_pairs = n_baskets*(n_words*(n_words-1)/2)
#avg count of each bucket
avg_buck_count= (n_baskets*n_words**2)/(2*n_buck)
print('avg count of each bucket',avg_buck_count)
s_exp = (n_baskets*n_words**2)/(2*min_support )
print('Number of buckets expected:',s_exp)

avg count of each bucket 50.0
Number of buckets expected: 5000.0
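
The two calculations above can be wrapped in one helper to try other settings. The sketch below reproduces the same arithmetic; bucket_load is a hypothetical name, and the approximation of n_words**2 / 2 pairs per basket is taken from the cells above. The second returned value is the number of buckets at which the average bucket count equals the support threshold; the bitmap can only filter candidates when many buckets fall below the threshold, which suggests using noticeably more buckets than that.

```python
def bucket_load(n_baskets, n_words, n_buckets, min_support):
    """Return (average bucket count, bucket count at which the average equals min_support)."""
    # Approximate the pairs per basket by n_words**2 / 2, as in the cells above.
    total_pairs = n_baskets * n_words ** 2 / 2
    avg_count = total_pairs / n_buckets
    buckets_needed = total_pairs / min_support
    return avg_count, buckets_needed

long_texts = bucket_load(10000, 85, 2500, 200)
short_texts = bucket_load(10000, 5, 2500, 25)
print(long_texts)   # (14450.0, 180625.0)
print(short_texts)  # (50.0, 5000.0)
```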


In [None]:
# important adjustment: keep only the first 5 words of each basket
sw = sw.map(lambda x: x[0:5])

#define min support and number of buckets
n_buckets = math.floor(n_baskets/4)
#perc_min_supp = 0.02
perc_min_supp = 0.0025
min_support = round(perc_min_supp*n_baskets) # absolute support threshold, in number of baskets
#min_support = 5
print("Number of baskets considered in this project",n_baskets)
print("n_buckets for hashing:", n_buckets)
# note: the .0% format rounds 0.25% and prints it as 0%
print("Min support considered is",f"{perc_min_supp:.0%}", "of the total number of baskets")
print("Total value of the min support",min_support)

Number of baskets considered in this project 10000
n_buckets for hashing: 2500
Min support considered is 0% of the total number of baskets
Total value of the min support 25


In [None]:
#### 1st PASS ####

# Check time
startTime = time.time()

# count items and find frequent one as A-priori
freq1 = count_freq_1(sw)
freq1_out = freq1.map(lambda x: x[0])

#freq1.take(10)

# generate all pairs of the baskets
nn_comb = 2
pairs = sw.map(lambda x: list(combinations(x,nn_comb))).flatMap(lambda x:x) # using itertool

# 1.b: count each pair in the baskets
count_pairs =pairs.map(lambda word:(word,1)) # add 1 to each pair
count_pairs = count_pairs.reduceByKey(lambda a,b:a+b).sortBy(lambda x: x[1],False) # sum the pair grouping them by item

# 1.c: before hashing pairs to buckets, attached an index
comb_hash=count_pairs.zipWithUniqueId()#.sortBy(lambda x: x[1],True)
#comb_hash=count_pairs.zipWithIndex()

# 1.d: hashing pairs to buckets
# organize data in triples - bucket_id, pair, value
pairs_buck = comb_hash.map(lambda h:(h[1]%n_buckets,(h[0][0]), h[0][1]))
buck_freq=pairs_buck.map(lambda t:(t[0],t[2])).reduceByKey(lambda a,b:a+b) # sum the counts per bucket
# 1.e: check frequent buckets
buckFreq = buck_freq.filter(lambda x:x[1]>=min_support).sortByKey()


# 1.f: get list of frequent buckets
freq_buck = buckFreq.map(lambda x: x[0]).collect()
# 1.g: bitmap: 1 if the pair's bucket is frequent, 0 otherwise
bitmap = pairs_buck.map(lambda a:((a[1],a[2]),1 if a[0] in freq_buck else 0 )) # ((pair, count of the pair), bit)
# 1.h: reorganize bitmap as (pair, count, bit value)
bitmap = bitmap.map(lambda a:(a[0][0],(a[0][1]),a[1]))


# 1.i: keep pairs with bitmap value = 1
bit1= bitmap.map(lambda x:(x[0],(x[1],x[2])))
bits_1 = bit1.filter(lambda x: (x[1][1] ==1)) # filter bit=1


intTime = time.time()
t_PCY1 =intTime-startTime
print("-> Intermediate time of 1st pass: ", t_PCY1)
print("\n")
print('Number of frequent items:',freq1.count())
print("Generated pairs:",pairs.count())
print("Number of frequent buckets:", buckFreq.count())



-> Intermediate time of 1st pass:  7.88161039352417


Number of frequent items: 275
Generated pairs: 99986
Number of frequent buckets: 782


In [None]:
#### 2nd PASS ####

# Check time
startTime = time.time()

# 2.0: take frequent items and make pairs
tt = makePairs2(freq1_out.collect())
rdd1 = sc.parallelize(tt)
rdd1 = rdd1.map(lambda x: ((x[0],x[1]),1))

# 2.a: link the possible frequent pairs to bitmap of pair with 1
freq2_bitmap = rdd1.join(bits_1) #join candidate pairs (a,b) with their (count, bit)
freq2_bitmap = freq2_bitmap.map(lambda x:(x[0],x[1][1][0]))
#freq2 = freq2_bitmap
freq2 = freq2_bitmap.filter(lambda x: (x[1] >= min_support))

# 2.b get frequent itemsets
freq2_out = freq2.map(lambda x : x[0])

intTime = time.time()
t_PCY2 =intTime-startTime
t_pcy = t_PCY1+t_PCY2

# Print results and time of calculation
print("\n")
print("-> Intermediate time of 2nd pass: ", t_PCY2)
print("-> Total Duration: ", t_pcy)
print("\n")
print("Number of frequent pairs", freq2.count())
# print("Number of frequent pairs", len(freq_2pcy))



-> Intermediate time of 2nd pass:  0.7181837558746338
-> Total Duration:  8.599794149398804


Number of frequent pairs 17
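
Spark transformations such as map, join and filter are lazy, so a timer only captures the work that an action (collect, count) triggers inside the timed region. In the cell above, freq2.count() runs after t_PCY2 is read, so the 2nd-pass time probably excludes most of the join and filter work. A minimal timing helper is sketched below; the name timed is an assumption, and a plain Python sum stands in for the RDD action.

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(label, results):
    """Record the wall-clock duration of the enclosed block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start

timings = {}
with timed("singletons", timings):
    # Stand-in for an RDD action such as collect() or count();
    # the action must run inside the block for its cost to be measured.
    total = sum(range(100_000))
print(timings)
```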


In [None]:
# save frequent singletons into the output data folder
with open('/content/drive/MyDrive/DSE_Colab_Output/algo_PCY10000-1_output.txt', 'w')  as file:
    for i in freq1.sortBy(lambda x: x[1],False).collect():
        file.write(str(i)+"\n")

# save freq_pairs into the output data folder
with open('/content/drive/MyDrive/DSE_Colab_Output/algo_PCY10000-2_output.txt', 'w')  as file:
    for i in freq2.sortBy(lambda x: x[1],False).collect():
        file.write(str(i)+"\n")

### A-Priori on same sample dataset modified for PCY

In [None]:
# Check time
startTime = time.time()

# count singleton occurrences
sw_single = count_freq_1(sw)
# create a list of frequent singletons
#if sw_single.isEmpty()==True: print("No frequent singletons")
pass1_out = sw_single.map(lambda x: x[0]).collect()
print("--> Frequent singletons collected --> DONE")

# time check point
intTime = time.time()
t_inter1 =intTime-startTime
print("-> Intermediate time to calculate frequent singletons: ", t_inter1)

# Check time
startTime = time.time()

# count pairs
sw_pair = count_freq_2(sw)
#if sw_pair.isEmpty()==True: print("No frequent pairs")
# create a list of frequent pairs
pass2_out = sw_pair.map(lambda x: x[0]).collect()
print("--> Frequent pairs collected --> DONE")
pass2_outFlat = np.array(pass2_out).flatten() #flat the list

# time check point
intTime = time.time()
t_inter2 =intTime-startTime
print("-> Intermediate time to calculate frequent pairs: ", t_inter2)

# Check time
startTime = time.time()

# count triples
sw_triples = count_freq_3(sw)
#if sw_triples.isEmpty()==True: print("No frequent triples")
# create a list of frequent triples
pass3_out = sw_triples.map(lambda x: x[0]).collect()
print("--> Frequent triples collected --> DONE")

# time check point
intTime = time.time()
t_inter3 =intTime-startTime
t_Apriori = t_inter1+ t_inter2 + t_inter3

# Print results and time of calculation
print("\n")
print("-> Intermediate time to calculate frequent triples: ", t_inter3)
print("-> Total Duration: ", t_Apriori)
print("\n")
print("Number of frequent singletons", len(pass1_out))
print("Number of frequent pairs", len(pass2_out))
print("Number of frequent triples", len(pass3_out))
print("Total Duration: ", t_Apriori)

--> Frequent singletons collected --> DONE
-> Intermediate time to calculate frequent singletons:  2.1801981925964355
--> Frequent pairs collected --> DONE
-> Intermediate time to calculate frequent pairs:  2.223590612411499
--> Frequent triples collected --> DONE


-> Intermediate time to calculate frequent triples:  2.2293832302093506
-> Total Duration:  6.633172035217285


Number of frequent singletons 275
Number of frequent pairs 17
Number of frequent triples 0
Total Duration:  6.633172035217285
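
Both algorithms report the same counts on the shortened baskets, but equal counts do not by themselves show that the itemsets are identical. One way to check is to compare the two pair lists while ignoring word order inside each pair. The sketch below uses hypothetical sample pairs and a hypothetical helper normalize_pairs; in the notebook the inputs would be the collected pair lists of the two runs.

```python
def normalize_pairs(pairs):
    """Represent each pair as a frozenset so that word order does not matter."""
    return {frozenset(p) for p in pairs}

apriori_result = [("rats", "effect"), ("dose", "effect")]  # hypothetical output
pcy_result = [("effect", "rats"), ("effect", "dose")]      # hypothetical output

same = normalize_pairs(apriori_result) == normalize_pairs(pcy_result)
print(same)  # True
```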


In [None]:
# save frequent singletons and frequent pairs (sorted by descending count) into the output data folder
with open('/content/drive/MyDrive/DSE_Colab_Output/algo_Apriori10000-1mod_output.txt', 'w')  as file:
    for i in sw_single.sortBy(lambda x: x[1],False).collect() :
        file.write(str(i)+"\n")

with open('/content/drive/MyDrive/DSE_Colab_Output/algo_Apriori10000-2mod_output.txt', 'w')  as file:
    for i in sw_pair.sortBy(lambda x: x[1],False).collect() :
        file.write(str(i)+"\n")

## Main execution

Use the full_data.csv file to evaluate the results on the full dataset.

In [None]:
sw = sw_full
n_baskets = sw.count()
print("You are using the full dataset")
print("Type of input sample file is:",type(sw))
print("Dimension of full dataset is:",n_baskets)

You are using the full dataset
Type of input sample file is: <class 'pyspark.rdd.RDD'>
Dimension of full dataset is: 14393620


In [None]:
# Data preprocessing
sw = sw.map(lambda x: x.split(',')[0])  # select only the TEXT column
firstRow = sw.first()  # get the header row
sw = sw.filter(lambda x: x != firstRow)  # remove the header row
sw = sw.map(remove_duplicates)  # remove duplicate words
sw = sw.map(isNotStopWord)  # remove stop words
#sw = sw.map(remove_dupAndStop) #remove duplicates and stop words
sw = sw.map(lambda word: word.lower())  # convert to lower case
sw = sw.map(lambda x: x.split(" "))  # split each text into a list of words
#sw.take(2)

In [None]:
sw.first()

['alphabisabolol',
 'primary',
 'antipeptic',
 'action',
 'depending',
 'dosage',
 'caused',
 'alteration',
 'phvalue',
 'proteolytic',
 'activity',
 'pepsin',
 'reduced',
 'percent',
 'addition',
 'bisabolol',
 'ratio',
 'occurs',
 'case',
 'direct',
 'contact',
 'previous',
 'atp',
 'inhibiting',
 'effect',
 'lost']
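
For intuition, the effect of the preprocessing can be reproduced on a plain string without Spark. The sketch below is only an illustration: `to_basket` and the tiny `STOP_WORDS` set are hypothetical stand-ins, not the notebook's `remove_duplicates` and `isNotStopWord` helpers or the NLTK stop word list.

```python
# Hypothetical stand-in for NLTK's English stop word list
STOP_WORDS = {"the", "of", "a", "in", "is"}

def to_basket(text, stop_words=STOP_WORDS):
    """Turn a raw text into a basket of distinct, lower-cased, non-stop words."""
    seen = set()
    basket = []
    # Lower-case first so that capitalised stop words are filtered as well
    for word in text.lower().split():
        if word not in stop_words and word not in seen:
            seen.add(word)
            basket.append(word)  # keep first-occurrence order
    return basket

print(to_basket("The activity of pepsin is reduced in the case of a direct contact"))
# ['activity', 'pepsin', 'reduced', 'case', 'direct', 'contact']
```

Note that this sketch lower-cases before filtering, whereas the cell above filters first; which order is appropriate depends on how the helper functions are written.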

### A-priori

Run the developed A-priori algorithm on the full dataset.
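
As a reminder of what the passes compute, here is a minimal pure-Python sketch of the first two A-priori passes on a toy list of baskets. It is not the Spark implementation used in this notebook (`count_freq_1`, `count_freq_2`): the function `apriori_pairs`, the toy baskets and the `>=` comparison against the threshold are assumptions made for illustration.

```python
from collections import Counter
from itertools import combinations

def apriori_pairs(baskets, perc_min_supp):
    """Toy two-pass A-priori: frequent singletons, then frequent pairs."""
    # Absolute threshold, computed from the percentage as in the set-up cell
    min_support = round(perc_min_supp * len(baskets))

    # Pass 1: count every item once per basket and keep the frequent ones
    item_counts = Counter(item for basket in baskets for item in set(basket))
    freq_items = {i for i, c in item_counts.items() if c >= min_support}

    # Pass 2: count only pairs whose two members are both frequent
    pair_counts = Counter()
    for basket in baskets:
        kept = sorted(set(basket) & freq_items)
        pair_counts.update(combinations(kept, 2))
    freq_pairs = {p: c for p, c in pair_counts.items() if c >= min_support}
    return freq_items, freq_pairs

# Made-up baskets, only for illustration
baskets = [
    ["pepsin", "activity", "ratio"],
    ["pepsin", "activity"],
    ["pepsin", "dosage"],
    ["activity", "ratio"],
]
items, pairs = apriori_pairs(baskets, 0.5)
print(sorted(items))  # ['activity', 'pepsin', 'ratio']
print(pairs)          # {('activity', 'pepsin'): 2, ('activity', 'ratio'): 2}
```

The pair (pepsin, ratio) is counted in pass 2 because both words are frequent, but it appears in only one basket and is therefore discarded.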

In [None]:
#set up
perc_min_supp = 0.02
min_support = round(perc_min_supp*n_baskets) # absolute support threshold (number of baskets)
print("Number of baskets considered in this project",n_baskets)
print("Min support considered is",f"{perc_min_supp:.0%}", "of the total number of baskets")
print("Total value of the min support",min_support)

# Check time
startTime = time.time()

# count singleton occurrences
sw_single = count_freq_1(sw)
# create a list of frequent singletons
#if sw_single.isEmpty()==True: print("No frequent singletons")
pass1_out = sw_single.map(lambda x: x[0]).collect()
print("--> Frequent singletons collected --> DONE")

# time check point
intTime = time.time()
t_inter1 =intTime-startTime
print("-> Intermediate time to calculate frequent singletons: ", t_inter1)

Number of baskets considered in this project 14393620
Min support considered is 2% of the total number of baskets
Total value of the min support 287872
--> Frequent singletons collected --> DONE
-> Intermediate time to calculate frequent singletons:  3003.829658508301


Export the frequent itemsets computed by the algorithm to Google Drive.

In [None]:
# save freq_single into the output data folder
with open('/content/drive/MyDrive/DSE_Colab_Output/algo_Apriori1_output.txt', 'w')  as file:
    for i in pass1_out :
        file.write(str(i)+"\n")

In [None]:
# Check time
startTime = time.time()

# count pairs
sw_pair = count_freq_2(sw)
#if sw_pair.isEmpty()==True: print("No frequent pairs")
# create a list of frequent pairs
pass2_out = sw_pair.map(lambda x: x[0]).collect()
print("--> Frequent pairs collected --> DONE")
pass2_outFlat = np.array(pass2_out).flatten() # flatten the list of pairs

# time check point
intTime = time.time()
t_inter2 =intTime-startTime
print("-> Intermediate time to calculate frequent pairs: ", t_inter2)

--> Frequent pairs collected --> DONE
-> Intermediate time to calculate frequent pairs:  25835.283787488937


In [None]:
# save freq_pairs into the output data folder
with open('/content/drive/MyDrive/DSE_Colab_Output/algo_Apriori2_output.txt', 'w')  as file:
    for i in pass2_out:
        file.write(str(i)+"\n")

In [None]:
# used to re-import the saved results after a reconnection
from ast import literal_eval as make_tuple

# reassign pass1_out (one frequent singleton per line)
with open('/content/drive/MyDrive/DSE_Colab_Output/algo_Apriori1_output.txt', "r") as f:
    pass1_out = [line.rstrip("\n") for line in f if line.strip()]

# reassign pass2_out (one tuple per line, parsed back from its string form)
tuple_list = []
with open('/content/drive/MyDrive/DSE_Colab_Output/algo_Apriori2_output.txt', "r") as f:
    for line in f:
        if line.strip():  # skip blank lines, which literal_eval cannot parse
            temp = make_tuple(line.strip())
            if len(temp) <= 4:
                tuple_list.append(temp)
pass2_out = tuple_list
pass2_outFlat = np.array(pass2_out).flatten() # flatten the list of pairs

# reassign set up
perc_min_supp = 0.02
min_support = round(perc_min_supp*n_baskets) # absolute support threshold (number of baskets)
print("Number of baskets considered in this project",n_baskets)
print("Min support considered is",f"{perc_min_supp:.0%}", "of the total number of baskets")
print("Total value of the min support",min_support)

# reassign the timings (in seconds) measured in the previous runs
t_inter1 = 3003.8
t_inter2 = 25835.3

Number of baskets considered in this project 14393620
Min support considered is 2% of the total number of baskets
Total value of the min support 287872
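
The reload step works because each pair was written with `str()` and can be parsed back with `ast.literal_eval`. A small round-trip sketch, using made-up pairs rather than the actual results and an in-memory list instead of the file on Drive:

```python
from ast import literal_eval

# Made-up frequent pairs, only for illustration
pairs = [("cell", "protein"), ("patient", "treatment")]

# What the save cell writes: one str(tuple) per line
lines = [str(p) + "\n" for p in pairs]

# What the reload cell does: parse each non-blank line back into a tuple
restored = [literal_eval(line.strip()) for line in lines if line.strip()]

print(restored == pairs)  # True
```

`literal_eval` only evaluates Python literals, so it is a safer choice than `eval` for reading back a text file.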


In [None]:
# Check time
startTime = time.time()

# count triples
sw_triples = count_freq_3(sw)
#if sw_triples.isEmpty()==True: print("No frequent triples")
# create a list of frequent triples
pass3_out = sw_triples.map(lambda x: x[0]).collect()
#print("--> Frequent triples collected --> DONE")

# time check point
intTime = time.time()
t_inter3 =intTime-startTime


In [None]:
t_Apriori = t_inter1+ t_inter2 + t_inter3

# Print results and time of calculation
print("\n")
print("-> Intermediate time to calculate frequent triples: ", t_inter3)
print("-> Total Duration: ", t_Apriori)
print("\n")
print("Number of frequent singletons", len(pass1_out))
print("Number of frequent pairs", len(pass2_out))
print("Number of frequent triples", len(pass3_out))
print("Total Duration: ", t_Apriori)



-> Intermediate time to calculate frequent triples:  13281.59319972992
-> Total Duration:  42120.69319972992


Number of frequent singletons 649
Number of frequent pairs 180
Number of frequent triples 0
Total Duration:  42120.69319972992
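
To put these timings in perspective, the short sketch below converts the total to hours and computes the share of each pass. The values are copied from the outputs above; the variable names `total` and `shares` are introduced here for illustration.

```python
from datetime import timedelta

# Timings in seconds, copied from the cell outputs above
t_inter1, t_inter2, t_inter3 = 3003.8, 25835.3, 13281.59319972992
total = t_inter1 + t_inter2 + t_inter3

# Total duration as hours:minutes:seconds
print(timedelta(seconds=round(total)))  # 11:42:01

# Share of the total time spent in each pass
shares = {
    "singletons": t_inter1 / total,
    "pairs": t_inter2 / total,
    "triples": t_inter3 / total,
}
for name, share in shares.items():
    print(f"{name}: {share:.1%}")
```

The pair pass accounts for roughly 61% of the total running time, which is close to 12 hours overall.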


In [None]:
# save freq_triples into the output data folder
with open('/content/drive/MyDrive/DSE_Colab_Output/algo_Apriori3_output.txt', 'w')  as file:
    for i in pass3_out:
        file.write(str(i)+"\n")

### PCY

Running the PCY algorithm on the full dataset was not feasible.