<a href="https://colab.research.google.com/github/TommasoLocatelli/to-the-master-s-degree-and-beyond-/blob/main/AMD/AMD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# INSTRUCTIONS

**Market-basket analysis**

The task is to implement a system finding frequent itemsets (aka market-basket analysis), analyzing the «Ukraine Conflict Twitter» dataset that is published on Kaggle and released under the CC-BY-SA 4.0 license, with attribution required. The detector must consider as baskets the strings contained in the text column of the CSV files in the dataset, using words as items.

**Solution**

This notebook implement the Toivonen's algorithm.

# LIBRARIES and FUNCTIONS

Installing and importing libraries needed later.

In [23]:
import os

In [24]:
%%capture
!pip install pyspark
!pip install findspark
!wget http://setup.johnsnowlabs.com/colab.sh -O - | bash

In [25]:
import numpy as np
import pandas as pd
import csv
import os
from pyspark.sql import SparkSession, Row
import random
import sparknlp
import math

from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline

In [26]:
from operator import add
import time

Functions needed in the pass over the sample section

In [27]:
def unpack(record):
    unpacked_list = []
    x=record
    while type(x[0][0])!=str:
      unpacked_list.append(x[1])
      x=x[0]
    unpacked_list.append(x[1])
    unpacked_list.append(x[0])
    return tuple(unpacked_list)

def distinct_subtuple(record):
  value=True
  for i in range(len(record)):
    for j in range(i+1,len(record)):
      if record[i]==record[j]:
        value=False
  return value

def words_tuple(record):
  Set=set()
  for t in record:
    for w in t:
      Set.add(w)
  return tuple(Set)

# DATA PREPARATION

## Download the data with the Kaggle API

In [None]:
# Access token to kaggle

os.environ["KAGGLE_USERNAME"] = "tommasolocatelli"
os.environ["KAGGLE_KEY"] = str(hex(251792213593000947272027102748106226772))[2:]

In [None]:
%%capture
!kaggle datasets download -d bwandowando/ukraine-russian-crisis-twitter-dataset-1-2-m-rows
!unzip ukraine-russian-crisis-twitter-dataset-1-2-m-rows.zip -d data

In [None]:
# Move all *.gzip files to *.gz in order to have spark read directly the compressed file
sh = """
for file in data/*.gzip; do
    mv "$file" "data/$(basename "$file" .gzip).gz"
done
"""
with open('script.sh', 'w') as file:
  file.write(sh)

!bash script.sh

## Ingest data in a spark context

In [28]:
# Build spark context
spark = sparknlp.start()
sc = spark.sparkContext

In [29]:
# Choose which file to read
FILENAME = r"data/0819_UkraineCombinedTweetsDeduped.csv.gz"
raw_df = spark.read.csv(FILENAME, header=True, escape="\"", quote="\"", multiLine=True)

In [30]:
# Let's focus only on Italian tweets
raw_df = raw_df.where(raw_df.language == "it").select("text")

In [31]:
# Tweet example
raw_df.head()['text']

'Stesso fiuto politico, stesso naso x la sostanza. Stupefacente! Si è finalmente capito quale legame intellettuale leghi #SannaMarin e #Zelensky . ❄️☃️❄️'

In [32]:
# Number of tweets
raw_df.count()

1898

## Data cleaning

In [33]:
# In this step we define a sparkNLP pipeline which will preprocess our data
# by tokenizing it and removing unwanted tokens

documentAssembler = DocumentAssembler() \
    .setInputCol("text") \
    .setOutputCol("document")

tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

linkRemover = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("tokensWoutLinks") \
    .setCleanupPatterns(["http\S+|www\S+|https\S+"]) \
    .setLowercase(True)

punctuationRemover = Normalizer() \
    .setInputCols(["tokensWoutLinks"]) \
    .setOutputCol("tokensWoutLinksAndPuct") \
    .setCleanupPatterns(["(?U)[^\w -]|_|-(?!\w)|(?<!\w)-"])

stopWordsCleaner = StopWordsCleaner.pretrained("stopwords_it", "it") \
      .setInputCols("tokensWoutLinksAndPuct")\
      .setOutputCol("cleanedTokens")\
      .setCaseSensitive(False)

stemmer = Stemmer() \
    .setInputCols(["cleanedTokens"]) \
    .setOutputCol("cleanedStemmedTokens")

pipeline = Pipeline().setStages([
    documentAssembler,
    tokenizer,
    linkRemover,
    punctuationRemover,
    stopWordsCleaner,
    stemmer
])

result = pipeline.fit(raw_df).transform(raw_df)

cleaned_df = result.selectExpr("cleanedStemmedTokens.result")

stopwords_it download started this may take some time.
Approximate size to download 2.4 KB
[OK!]


## RDD

In [34]:
#finally obtaining the rdd on which the analysis will be done
input_rdd = cleaned_df.rdd.map(lambda x: (1,list(set(x[0]))))

In [35]:
#check the actual number of partition
input_rdd.getNumPartitions()

1

# TOIVONEN STEP BY STEP

## Set the parametes and sample selection

Check the tot number of basket to choose a resonable support threshold

In [None]:
number_of_baskets=input_rdd.count()
print(number_of_baskets)

1898


Parameters:

1. **s** is the support threshold for the whole
dataset
2. **p** is the sample size fraction
3. **a** is the regularization term

In [None]:
s=170
p=0.1
a=0.95
seed=1

**t** is the threshold for the sample **t = a p s**

In [None]:
t=a*p*s
print(t)

16.15


"The safest way to pick the sample is to read the entire dataset, and for each
basket, select that basket for the sample with some fixed probability p. Suppose
there are m baskets in the entire file. At the end, we shall have a sample whose
size is very close to pm baskets. However, if we have reason to believe that the
baskets appear in random order in the file already, then we do not even have
to read the entire file. We can select the first pm baskets for our sample. Or, if
the file is part of a distributed file system, we can pick some chunks at random
to serve as the sample." Mining of massive dataset

In [None]:
sample_rdd=input_rdd.sample(False,p,seed)

In [None]:
actual_sample_size=sample_rdd.count()
print(actual_sample_size)

183


In [None]:
p*number_of_baskets

189.8

It is needed to correct **t** as the actual sample size is different from the expected

In [None]:
t=s*a*(actual_sample_size/number_of_baskets)
print(t)

15.571390937829294


## aPriori

To find the candidate in the sample I use an aPriori function developed by another project.

https://github.com/ranieri-unimi/ukraine-malchiodi-2022/blob/main/ukraine.ipynb

In [36]:
def is_sub(sub, lst) : return all(e in lst for e in sub)
def add(a, b) : return a+b
def splat(t): return tuple(sorted(list(j for i in t for j in (i if isinstance(i, tuple) else (i,)))))
def doubled(t): return len(set(t)) == len(t)

def aPriori(data, THRESHOLD):

  frequent_items_pipe = (data
                    .flatMap(lambda x: x[-1])
                    .map(lambda x: (x,1))
                    .reduceByKey(add)
                    .filter(lambda x: x[-1] > THRESHOLD)
                    )

  frequent_items = frequent_items_pipe.collect()
  frequent_items_pipe = frequent_items_pipe.map(lambda x: (1, x[0]))

  frequent_itemsets = frequent_items
  frequent_itemsets_pipe = frequent_items_pipe

  while len(frequent_itemsets):
    yield frequent_itemsets
    ### COUNTING PHASE
    candidate_itemsets_pipe = (frequent_itemsets_pipe
                          .join(frequent_items_pipe)
                          .map(lambda x : x[-1])
                          .map(splat)
                          .distinct()
                          .filter(doubled)
                          )
    
    candidate_itemsets = candidate_itemsets_pipe.collect()

    ### FILTER PHASE
    frequent_itemsets_pipe = (data
                         .map(lambda x : [(pair, is_sub(pair, x[-1])) for pair in candidate_itemsets] )
                         .flatMap(lambda x : x)
                         .reduceByKey(add)
                         .filter(lambda x: x[-1] > THRESHOLD)
                         )
    
    frequent_itemsets = frequent_itemsets_pipe.collect()
    frequent_itemsets_pipe = frequent_itemsets_pipe.map(lambda x: (1, x[0]))

## Pass over the sample

### step by step

In [None]:
#all the singletons in the dataset
singleton_itemsets_rdd=input_rdd.flatMap(lambda x: (x[1]))\
                                .distinct()

#frequent itemstes generator
fgen=aPriori(sample_rdd, t)

#frequent singletons
Candidates_singleton=sc.parallelize(next(fgen))\
                        .map(lambda x: (x[0]))

#all the singletons not frequent in the sample
NB_singleton_itemsets = singleton_itemsets_rdd.subtract(Candidates_singleton)

In [None]:
singleton_itemsets_rdd.take(3)

['sostanza', 'capito', 'leghi']

In [None]:
singleton_itemsets_rdd.count()

9476

In [None]:
Candidates_singleton.take(3)

['sannamarin', 'zelenski', 'putin']

In [None]:
Candidates_singleton.count()

47

In [None]:
NB_singleton_itemsets.take(3)

['sostanza', 'leghi', 'x']

In [None]:
NB_singleton_itemsets.count()

9429

In [None]:
#frequent couples
Candidates_couple=sc.parallelize(next(fgen))\
                    .map(lambda x: x[0])\
                    .map(lambda x: tuple(sorted(list(x))))

In [None]:
Candidates_couple.take(2)

[('russia', 'ucraina'), ('tv', 'ucraina')]

In [None]:
Candidates_couple.count()

116

In [None]:
#negative border couples
NB_couple=Candidates_singleton.cartesian(Candidates_singleton)\
                              .filter(lambda x: len(x)==len(set(x)))\
                              .map(lambda x: tuple(sorted(list(x))))\
                              .distinct()\
                              .subtract(Candidates_couple)

In [None]:
NB_couple.take(10)

[('sannamarin', 'zelenski'),
 ('russo', 'sannamarin'),
 ('russia', 'sannamarin'),
 ('president', 'sannamarin'),
 ('lucraina', 'sannamarin'),
 ('ucraina', 'zelenski'),
 ('crimea', 'zelenski'),
 ('missili', 'putin'),
 ('medvedev', 'putin'),
 ('19agosto', 'putin')]

In [None]:
NB_couple.count()

965

In [None]:
# frequent triples
Candidates_triplet=sc.parallelize(next(fgen))\
                              .map(lambda x: x[0])\
                              .map(lambda x: tuple(sorted(list(x))))

In [None]:
Candidates_triplet.take(2)

[('2022', 'tv', 'ucraina'), ('homepag', 'tv', 'ucraina')]

In [None]:
Candidates_triplet.count()

455

In [None]:
NB_triplet=Candidates_couple.cartesian(Candidates_couple)\
                            .cartesian(Candidates_couple)\
                            .map(unpack)\
                            .filter(distinct_subtuple)\
                            .map(words_tuple)\
                            .filter(lambda x: len(x)==3)\
                            .map(lambda x: tuple(sorted(list(x))))\
                            .distinct()\
                            .subtract(Candidates_triplet)

In [None]:
NB_triplet.take(1)

[('putin', 'russia', 'ucraina')]

In [None]:
NB_triplet.count()

2

### generalization

**Problem**

How to obtain the sets with cardinality n that have to belong to the negative border given the sets of cardinality n-1 frequent in the sample.

**Procedure**

1. take the rdd of tuples representing itemsets frequent in the sample and cycle the cartesian product with itself n-1 times, and unpack the result in order to obtain a tuple with n elements each being a tuple of n-1 elements

2. filter out the tuples that have repetitions inside

3. for each tuple of tuples obtain a tuple of the distinct words

4. filter out the tuples with len different from n

5. sort them and take the dinstinct tuples

6. subtract the tuples the are frequent

**"Proof"**

if the len of the tuple is equal to n in step 4, it means that there are n differe subtuples frequent in the sample made just of the words inside the tuple take into account, so all the subsets are frequent. By filtering out the tuples frequent in the step 6 we obtain exactly the negative border.

In [37]:
def pass_over_the_sample(input_rdd,sample_rdd,t):

  #all the singletons in the dataset
  singleton_itemsets_rdd=input_rdd.flatMap(lambda x: x[1])\
                                  .distinct()
  #frequent itemstes generator
  fgen=aPriori(sample_rdd, t)

  #frequent singletons
  Candidates_singleton=sc.parallelize(next(fgen))\
                          .map(lambda x: x[0])

  #all the singletons not frequent in the sample
  NB_singleton_itemsets = singleton_itemsets_rdd.subtract(Candidates_singleton)

  Candidates=Candidates_singleton.map(lambda x: (x,))
  Negative_border=NB_singleton_itemsets.map(lambda x: (x,))

  #initialize the cardinality counter
  n=1
  #initialize the precedent candidats
  prec_Candidates_tuple=Candidates_singleton

  while True:
    n+=1
    try:
      # frequent tuples
      Candidates_tuple=sc.parallelize(next(fgen))\
                              .map(lambda x: x[0])\
                              .map(lambda x: tuple(sorted(list(x))))
      #step 1
      NB_tuple=prec_Candidates_tuple
      for i in range(n-1):
        NB_tuple=NB_tuple.cartesian(prec_Candidates_tuple)
      NB_tuple=NB_tuple.map(unpack)

      #step 2-6
      NB_tuple=NB_tuple.filter(distinct_subtuple)\
                        .map(words_tuple)\
                        .filter(lambda x: len(x)==n)\
                        .map(lambda x: tuple(sorted(list(x))))\
                        .distinct()\
                        .subtract(Candidates_tuple)

      Candidates+=Candidates_tuple
      Negative_border+=NB_tuple

      prec_Candidates_tuple=Candidates_tuple

    except:
      #step1
      NB_tuple=prec_Candidates_tuple
      for i in range(n-1):
        NB_tuple=NB_tuple.cartesian(prec_Candidates_tuple)
      NB_tuple=NB_tuple.map(unpack)

      #step 2-5
      NB_tuple=NB_tuple.filter(distinct_subtuple)\
                        .map(words_tuple)\
                        .filter(lambda x: len(x)==n)\
                        .map(lambda x: tuple(sorted(list(x))))\
                        .distinct()

      Negative_border+=NB_tuple
      break
      
  return Candidates,Negative_border

In [None]:
# validation on the book example
sample_list=[(1,['B','C']),(1,['B','C']),(1,['C','D']),(1,['C','D']),(1,['A']),(1,['A'])]
#sample_list+=[(1,['B','D']),(1,['B','D'])] # if you want to test an higher cardinality negative border set
input_list=sample_list+[(1,['E'])]

input=sc.parallelize(input_list)
sample=sc.parallelize(sample_list)

In [None]:
C,NB=pass_over_the_sample(input,sample,1)

In [None]:
C.collect()

[('C',), ('B',), ('D',), ('A',), ('B', 'C'), ('C', 'D')]

In [None]:
NB.collect()

[('E',), ('A', 'B'), ('B', 'D'), ('A', 'D'), ('A', 'C')]

## Pass over the entire dataset

In [None]:
candidates,negative_border=pass_over_the_sample(input_rdd,sample_rdd,t)

In [None]:
candidates.collect()

[('putin',),
 ('zelenski',),
 ('russia',),
 ('nato',),
 ('ucraina',),
 ('guerra',),
 ('putin', 'russia')]

In [None]:
negative_border.count()

9470

In [38]:
def pass_over_the_entire_dataset(input_rdd,cands,nb,s):
  #check the negative border
  num_nb_frequent_itemsets=input_rdd.map(lambda x: x[1])\
        .cartesian(nb)\
        .filter(lambda x: set(x[1]).issubset(set(x[0])))\
        .map(lambda x: (x[1],1))\
        .reduceByKey(add)\
        .filter(lambda x: x[1]>s).count()
  if num_nb_frequent_itemsets==0:
    #check the candidates
    frequent_itemsets=input_rdd.map(lambda x: x[1])\
        .cartesian(cands)\
        .filter(lambda x: set(x[1]).issubset(set(x[0])))\
        .map(lambda x: (x[1],1))\
        .reduceByKey(add)\
        .filter(lambda x: x[1]>s)
    response=frequent_itemsets
  else:
    response=f'{num_nb_frequent_itemsets} sets in the negative border happen to be frequent in the dataset!'
  return response

In [None]:
response=pass_over_the_entire_dataset(input_rdd,candidates,negative_border,s)

In [None]:
response.collect()

[(('zelenski',), 211),
 (('russia',), 550),
 (('ucraina',), 421),
 (('nato',), 214),
 (('putin', 'russia'), 174),
 (('putin',), 636)]

# FINAL ALGORITHM and APPLICATION

In [39]:
def toivonen(input_rdd,s,p,a,seed):
  start_time = time.time()
  
  #parameters and sample selection
  number_of_baskets=input_rdd.count()
  sample_rdd=input_rdd.sample(False,p,seed)
  actual_sample_size=sample_rdd.count()
  t=s*a*(actual_sample_size/number_of_baskets)
  mid_time=time.time()
  print(f'Sample selection ended after {mid_time - start_time} seconds')

  #pass over the sample
  candidates,negative_border=pass_over_the_sample(input_rdd,sample_rdd,t)
  mid_time2=time.time()
  print(f'Pass over the sample ended after {mid_time2 - mid_time} seconds')

  #pass over the entire dataset
  response=pass_over_the_entire_dataset(input_rdd,candidates,negative_border,s)
  fin_time=time.time()
  print(f'Pass over the entire dataset ended after {fin_time - mid_time2} seconds')

  return response

Experiment 1

In [54]:
s=170
p=0.05
a=0.95
seed=2

In [55]:
response=toivonen(input_rdd,s,p,a,seed)

Sample selection ended after 5.090605974197388 seconds
Pass over the sample ended after 8.51623249053955 seconds
Pass over the entire dataset ended after 58.71092963218689 seconds


In [56]:
if type(response) is str:
  print(response)
else:
  print(f'There are {response.count()} frequent itemsets')

1 sets in the negative border happen to be frequent in the dataset!


Experiment 2

In [57]:
s=170
p=0.05
a=0.7
seed=2

In [58]:
response=toivonen(input_rdd,s,p,a,seed)

Sample selection ended after 4.862320423126221 seconds
Pass over the sample ended after 8.253049850463867 seconds
Pass over the entire dataset ended after 58.59363532066345 seconds


In [59]:
if type(response) is str:
  print(response)
else:
  print(f'There are {response.count()} frequent itemsets')

There are 6 frequent itemsets


In [60]:
response.collect()

[(('ucraina',), 421),
 (('russia',), 550),
 (('zelenski',), 211),
 (('nato',), 214),
 (('putin', 'russia'), 174),
 (('putin',), 636)]