## Spark setting

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar xf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

In [3]:
import findspark
findspark.init("spark-3.0.3-bin-hadoop2.7")# SPARK_HOME
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [4]:
import pyspark
type(spark)

pyspark.sql.session.SparkSession

## Kaggle APIs

________________________________________________________________________

In [5]:
from google.colab import files
files.upload() #upload kaggle.json

Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"federicofiorio","key":"3c9fda0dd26cc20d5f652a577743142c"}'}

In [6]:
! pip install kaggle



In [7]:
! mkdir ~/.kaggle

In [8]:
!cp kaggle.json ~/.kaggle/

In [9]:
!ls ~/.kaggle

kaggle.json


In [10]:
!chmod 600 /root/.kaggle/kaggle.json

In [11]:
!kaggle datasets download -d bwandowando/ukraine-russian-crisis-twitter-dataset-1-2-m-rows

Downloading ukraine-russian-crisis-twitter-dataset-1-2-m-rows.zip to /content
100% 7.48G/7.49G [00:53<00:00, 176MB/s]
100% 7.49G/7.49G [00:53<00:00, 150MB/s]


In [12]:
!ls

kaggle.json
sample_data
spark-3.0.3-bin-hadoop2.7
spark-3.0.3-bin-hadoop2.7.tgz
ukraine-russian-crisis-twitter-dataset-1-2-m-rows.zip


In [13]:
!unzip ukraine-russian-crisis-twitter-dataset-1-2-m-rows.zip

Archive:  ukraine-russian-crisis-twitter-dataset-1-2-m-rows.zip
  inflating: 0401_UkraineCombinedTweetsDeduped.csv.gzip  
  inflating: 0402_UkraineCombinedTweetsDeduped.csv.gzip  
  inflating: 0403_UkraineCombinedTweetsDeduped.csv.gzip  
  inflating: 0404_UkraineCombinedTweetsDeduped.csv.gzip  
  inflating: 0405_UkraineCombinedTweetsDeduped.csv.gzip  
  inflating: 0406_UkraineCombinedTweetsDeduped.csv.gzip  
  inflating: 0407_UkraineCombinedTweetsDeduped.csv.gzip  
  inflating: 0408_UkraineCombinedTweetsDeduped.csv.gzip  
  inflating: 0409_UkraineCombinedTweetsDeduped.csv.gzip  
  inflating: 0410_UkraineCombinedTweetsDeduped.csv.gzip  
  inflating: 0411_UkraineCombinedTweetsDeduped.csv.gzip  
  inflating: 0412_UkraineCombinedTweetsDeduped.csv.gzip  
  inflating: 0413_UkraineCombinedTweetsDeduped.csv.gzip  
  inflating: 0414_UkraineCombinedTweetsDeduped.csv.gzip  
  inflating: 0415_UkraineCombinedTweetsDeduped.csv.gzip  
  inflating: 0416_UkraineCombinedTweetsDeduped.csv.gzip  
  inflat

## Use the data

________________________________________________________________________

In [14]:
import numpy as np
import pandas as pd
import csv
import os
import warnings

In [15]:
filename = r"0401_UkraineCombinedTweetsDeduped.csv.gzip"
df = pd.read_csv(filename, compression='gzip', index_col=0,encoding='utf-8', quoting=csv.QUOTE_ALL)

  exec(code_obj, self.user_global_ns, self.user_ns)


In [16]:
df[0:2].T

Unnamed: 0,0,1
userid,16882774,3205296069
username,Yaniela,gregffff
acctdesc,"Animal lover, supports those who fight injusti...",
location,Hawaii,
following,1158,122
followers,392,881
totaltweets,88366,99853
usercreatedts,2008-10-21 07:34:04.000000,2015-04-25 11:24:34.000000
tweetid,1509681950042198030,1509681950151348229
tweetcreatedts,2022-04-01 00:00:00.000000,2022-04-01 00:00:00.000000


In [17]:
type(df)

pandas.core.frame.DataFrame

In [18]:
df = df[['tweetid', 'text', 'hashtags','language']] #keep only 3 columns
df.head()

Unnamed: 0,tweetid,text,hashtags,language
0,1509681950042198030,‚ö°The Ukrainian Air Force would like to address...,[],en
1,1509681950151348229,Chernihiv oblast. Ukrainians welcome their lib...,"[{'text': 'russianinvasion', 'indices': [77, 9...",en
2,1509681950683926556,America üá∫üá∏ is preparing for something worse th...,"[{'text': 'RussianUkrainianWar', 'indices': [7...",en
3,1509681951116046336,JUST IN: #Anonymous has hacked &amp; released ...,"[{'text': 'Anonymous', 'indices': [25, 35]}]",en
4,1509681951304990720,***PUBLIC MINT NOW LIVE***\n\nFor \n@billionai...,[],en


In [19]:
pd.options.display.max_colwidth = 1200
print(df[df['tweetid']==	1509681950042198030]["text"],df[df['tweetid']==	1509681950042198030]["hashtags"])

0    ‚ö°The Ukrainian Air Force would like to address misinformation published in multiple Western media outlets regarding the situation in the üá∫üá¶ sky and support from our @NATO allies. \nMore in üßµ(1/16)\n#ProtectU–êSky #StopRussia #UkraineUnderAtta—Åk
Name: text, dtype: object 0    []
Name: hashtags, dtype: object


As we can see in the previous cell, the tweet has the hashtag #StopRussia #UkraineUnderAtta—Åk BUT the dataframe (1st tuple) sais there are no hashtag in the tweet. We will try to recover form this.

In [20]:
from pyspark.sql.types import *

schema = StructType([StructField("tweetid", StringType(), True)\
                   ,StructField("text", StringType(), True)\
                   ,StructField("hashtags", StringType(), True)\
                   ,StructField("language", StringType(), True)])

#create spark dataframe using schema
df_spark = spark.createDataFrame(df,schema=schema)

In [21]:
type(df_spark)

pyspark.sql.dataframe.DataFrame

In [22]:
df_spark.show()

+-------------------+--------------------+--------------------+--------+
|            tweetid|                text|            hashtags|language|
+-------------------+--------------------+--------------------+--------+
|1509681950042198030|‚ö°The Ukrainian Ai...|                  []|      en|
|1509681950151348229|Chernihiv oblast....|[{'text': 'russia...|      en|
|1509681950683926556|America üá∫üá∏ is p...|[{'text': 'Russia...|      en|
|1509681951116046336|JUST IN: #Anonymo...|[{'text': 'Anonym...|      en|
|1509681951304990720|***PUBLIC MINT NO...|                  []|      en|
|1509681952000937999|The Amazing story...|[{'text': 'Russia...|      en|
|1509681952978210849|&amp;quot;How we ...|                  []|      en|
|1509681953053843466|India's purchase ...|[{'text': 'Russia...|      en|
|1509681953091457035|The most basic te...|[{'text': 'Ukrain...|      en|
|1509681953418711050|"The image that R...|[{'text': 'Putin'...|      en|
|1509681953418752008|#Russia‚Äôs Preside...|[

## RDD TEXT SERVE PER PRENDERE GLI HASHTAG DAL TWEET IN QUANTO LA COLONNA HASHTAG DEL DATASET NON √® ACCURATA

In [23]:
import nltk
from nltk import word_tokenize
import re
nltk.download('punkt') #to make it work

rdd = df_spark.rdd #to get the rdd from dataframe

rdd.take(10)
rdd=rdd.filter(lambda x: x[3] == "en") #filter out non english tweets


rdd_text = rdd.map(lambda x : (x[0], x[1])).map(lambda x:(x[0], word_tokenize(x[1]))) #id, word_tokenized

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


## ESTRAGGO SOLO GLI HASHTAGS

In [142]:
def processHashtags(tweet):
  hashtags = []
  for i,word in enumerate(tweet):
    if word == '#':
      try:
        hashtags.append(re.sub(r'[^\w\s]','',tweet[i+1]).lower()) #remove punctuation with regexp and put them lowercase
      except:
        pass
  return hashtags

hashtags_per_tweet = rdd_text.map(lambda x: (x[0], processHashtags(x[1])))
hashtags_per_tweet.first()

('1509681950042198030', ['protectu–∞sky', 'stoprussia', 'ukraineunderatta—Åk'])

In [143]:
#reconvert the rdd to spark dataframe
deptColumns = ["tweet","hashtag"]
new_df_spark = hashtags_per_tweet.toDF(deptColumns)
new_df_spark.printSchema()
new_df_spark.show(truncate=False)

root
 |-- tweet: string (nullable = true)
 |-- hashtag: array (nullable = true)
 |    |-- element: string (containsNull = true)

+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|tweet              |hashtag                                                                                                                                                                   |
+-------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1509681950042198030|[protectu–∞sky, stoprussia, ukraineunderatta—Åk]                                                                                                                            |
|1509681950151348229|[russianinvasion, standwithukraine, ukraineunderattack, ukrainewillwin, puti

APRIORI

In [144]:
hashtag=new_df_spark.select('hashtag')

In [145]:
basket_file = hashtag.rdd.flatMap(list)

In [146]:
basket_file.take(10)

[['protectu–∞sky', 'stoprussia', 'ukraineunderatta—Åk'],
 ['russianinvasion',
  'standwithukraine',
  'ukraineunderattack',
  'ukrainewillwin',
  'putinisawarcriminal',
  'stopputin',
  'russianukrainianwar',
  'russiagohome',
  '—Ä–æ—Å—Å–∏—è—Å–º–æ—Ç—Ä–∏',
  '–Ω–µ—Ç–≤–æ–π–Ω–µ'],
 ['russianukrainianwar', 'china', 'taiwan'],
 ['anonymous', 'oprussia', 'ddosecrets'],
 ['nft', 'mint'],
 ['russia',
  'ukraine',
  'motivation',
  'netde',
  'edude',
  'delaware',
  'government',
  'usa'],
 ['ukraine', 'ukrainewar', 'russia', 'ukraineinvasion'],
 ['russian', 'moscow'],
 ['ukraine'],
 ['putin', 'medvedev', 'russia', 'ukraine']]

## Riduco dimensioni baskets per essere veloci

In [147]:
num_baskets = 500
basket_file = basket_file.take(num_baskets) 

In [148]:
basket_file = spark.sparkContext.parallelize(basket_file)

In [149]:
type(basket_file)

pyspark.rdd.RDD

In [150]:
basket_file.take(10)

[['protectu–∞sky', 'stoprussia', 'ukraineunderatta—Åk'],
 ['russianinvasion',
  'standwithukraine',
  'ukraineunderattack',
  'ukrainewillwin',
  'putinisawarcriminal',
  'stopputin',
  'russianukrainianwar',
  'russiagohome',
  '—Ä–æ—Å—Å–∏—è—Å–º–æ—Ç—Ä–∏',
  '–Ω–µ—Ç–≤–æ–π–Ω–µ'],
 ['russianukrainianwar', 'china', 'taiwan'],
 ['anonymous', 'oprussia', 'ddosecrets'],
 ['nft', 'mint'],
 ['russia',
  'ukraine',
  'motivation',
  'netde',
  'edude',
  'delaware',
  'government',
  'usa'],
 ['ukraine', 'ukrainewar', 'russia', 'ukraineinvasion'],
 ['russian', 'moscow'],
 ['ukraine'],
 ['putin', 'medvedev', 'russia', 'ukraine']]

_______________________________________________________________________________

In [151]:
#DA RIVEDERE
count = 250000 #obtaine doing count=basket_file.count() (too much time to repeat this all the time)
threshold = 10
print(count, threshold)

250000 10


## Start apriori: PHASE 1

In [152]:
#setp 1 calculate the freq of each item in the basket file
singleton=basket_file.flatMap(list).map(lambda item: (item,1)).reduceByKey(lambda a,b: a+b)

singleton.take(20)

[('ukraineunderatta—Åk', 27),
 ('russianinvasion', 20),
 ('standwithukraine', 51),
 ('ukrainewillwin', 22),
 ('stopputin', 26),
 ('russianukrainianwar', 29),
 ('russiagohome', 20),
 ('–Ω–µ—Ç–≤–æ–π–Ω–µ', 20),
 ('taiwan', 1),
 ('anonymous', 11),
 ('ddosecrets', 2),
 ('netde', 1),
 ('delaware', 1),
 ('usa', 5),
 ('ukraineinvasion', 2),
 ('moscow', 4),
 ('putin', 32),
 ('medvedev', 1),
 ('russianarmy', 2),
 ('soviet', 2)]

## PHASE 2

In [153]:
#step 2: filter out all the non frequent singleton
freq_singleton=singleton.filter(lambda x: x[1]>=threshold)

freq_singleton.take(20)

[('ukraineunderatta—Åk', 27),
 ('russianinvasion', 20),
 ('standwithukraine', 51),
 ('ukrainewillwin', 22),
 ('stopputin', 26),
 ('russianukrainianwar', 29),
 ('russiagohome', 20),
 ('–Ω–µ—Ç–≤–æ–π–Ω–µ', 20),
 ('anonymous', 11),
 ('putin', 32),
 ('ukrainian', 31),
 ('russiaukrainewar', 13),
 ('mariupol', 20),
 ('ukrainerussianwar', 10),
 ('ukrainerussiawar', 33),
 ('settodraw', 10),
 ('tanzaniaproject', 10),
 ('africa', 10),
 ('mining', 10),
 ('kherson', 16)]

In [154]:
#step 3: now create all the possible pairs made up by frequen singletons

from itertools import combinations
pairs=list(combinations(freq_singleton.map(lambda x: x[0]).toLocalIterator(),2)) #without toLocalIterator error (reference https://stackoverflow.com/questions/32771737/convert-an-rdd-to-iterable-pyspark)

pairs[:10]

[('ukraineunderatta—Åk', 'russianinvasion'),
 ('ukraineunderatta—Åk', 'standwithukraine'),
 ('ukraineunderatta—Åk', 'ukrainewillwin'),
 ('ukraineunderatta—Åk', 'stopputin'),
 ('ukraineunderatta—Åk', 'russianukrainianwar'),
 ('ukraineunderatta—Åk', 'russiagohome'),
 ('ukraineunderatta—Åk', '–Ω–µ—Ç–≤–æ–π–Ω–µ'),
 ('ukraineunderatta—Åk', 'anonymous'),
 ('ukraineunderatta—Åk', 'putin'),
 ('ukraineunderatta—Åk', 'ukrainian')]

In [155]:
#step 4
#flatmap so I get all the prev computed pairs in only 1 list to be able to perform reduceByKey
flatted_couples = basket_file.map(lambda x: [(pair,1) for pair in pairs if set(pair).issubset(set(x))]).flatMap(lambda x: x).cache()

In [156]:
reduced_elements = flatted_couples.reduceByKey(lambda a, b: a + b)

In [157]:
freq_pairs = reduced_elements.filter(lambda x : x[1] >= threshold).cache()

In [158]:
freq_pairs.collect()

[(('protectu–∞sky', 'stoprussia'), 20),
 (('russianinvasion', 'standwithukraine'), 20),
 (('russianinvasion', 'ukrainewillwin'), 20),
 (('russianinvasion', 'stopputin'), 20),
 (('russianinvasion', 'russianukrainianwar'), 20),
 (('russianinvasion', 'russiagohome'), 20),
 (('russianinvasion', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('standwithukraine', 'ukrainewillwin'), 20),
 (('standwithukraine', 'stopputin'), 23),
 (('standwithukraine', 'russianukrainianwar'), 20),
 (('standwithukraine', 'russiagohome'), 20),
 (('standwithukraine', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('ukrainewillwin', 'stopputin'), 20),
 (('ukrainewillwin', 'russianukrainianwar'), 20),
 (('ukrainewillwin', 'russiagohome'), 20),
 (('ukrainewillwin', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('stopputin', 'russianukrainianwar'), 20),
 (('stopputin', 'russiagohome'), 20),
 (('stopputin', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('russianukrainianwar', 'russiagohome'), 20),
 (('russianukrainianwar', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('russiagohome', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20

## PCY 

In [159]:
singleton = basket_file.flatMap(list).map(lambda item: (item,1)).reduceByKey(lambda a,b: a+b)
singleton.take(20)

[('ukraineunderatta—Åk', 27),
 ('russianinvasion', 20),
 ('standwithukraine', 51),
 ('ukrainewillwin', 22),
 ('stopputin', 26),
 ('russianukrainianwar', 29),
 ('russiagohome', 20),
 ('–Ω–µ—Ç–≤–æ–π–Ω–µ', 20),
 ('taiwan', 1),
 ('anonymous', 11),
 ('ddosecrets', 2),
 ('netde', 1),
 ('delaware', 1),
 ('usa', 5),
 ('ukraineinvasion', 2),
 ('moscow', 4),
 ('putin', 32),
 ('medvedev', 1),
 ('russianarmy', 2),
 ('soviet', 2)]

In [160]:
from sys import hash_info
#different step w.r.t. apriori, creating all the pairs (non from freq singleton) and hashing them
#creating hashtable, fixed size:

HASH_TABLE_SIZE = 50000 #care for this one
hash_table = np.zeros(HASH_TABLE_SIZE, dtype=int)

def hashing(pair_to_hash):
  return int(abs(hash(frozenset(pair_to_hash))) % HASH_TABLE_SIZE)

In [161]:
#create all possible pairs and hash them
from itertools import combinations
pairs_first_pass = list(combinations(singleton.map(lambda x: x[0]).toLocalIterator(),2))#creating all pairs with singletons

#pairs_first_rdd = spark.sparkContext.parallelize(pairs_first_pass)


#per ogni basket, se una pair √® presente nel basket, creo (hashing(pair),1)
#bisogna contare sul basket file:
hashtable_rdd = basket_file.map(lambda x: [(hashing(pair),1) for pair in pairs_first_pass if set(pair).issubset(set(x))]).flatMap(lambda x: x).cache()\
                           .reduceByKey(lambda a,b: a+b)

In [162]:
#meaning of the output: the pair/s hashed to the bucket 3810 are 20
hashtable_rdd.take(50)

[(3810, 20),
 (8470, 20),
 (45826, 20),
 (48060, 20),
 (32986, 20),
 (4722, 20),
 (44910, 20),
 (36688, 20),
 (49450, 20),
 (26764, 23),
 (334, 20),
 (49142, 20),
 (41194, 20),
 (6280, 20),
 (44534, 20),
 (13390, 20),
 (16628, 20),
 (19204, 20),
 (10434, 20),
 (14790, 20),
 (3482, 20),
 (36290, 20),
 (15814, 20),
 (35128, 20),
 (46578, 20),
 (37488, 20),
 (7580, 2),
 (25056, 7),
 (11512, 1),
 (9014, 1),
 (666, 1),
 (11242, 1),
 (19058, 1),
 (47636, 1),
 (11402, 1),
 (32444, 1),
 (22040, 1),
 (34716, 1),
 (36188, 1),
 (5682, 1),
 (14412, 1),
 (44648, 1),
 (27226, 5),
 (44714, 7),
 (15546, 1),
 (32744, 1),
 (42858, 1),
 (13110, 2),
 (7882, 2),
 (25340, 9)]

In [163]:
hashtable_list = list(hashtable_rdd.map(lambda x: x).toLocalIterator()) #PROVARE A NON USARE LA CONVERSIONE?

for pair in hashtable_list:
  hash_table[pair[0]] = pair[1]

bitmap_freq = [hash_table[i]>=threshold for i in range(HASH_TABLE_SIZE)] #creo bitmap, scarto pairs non freq

print(len(hash_table),len(bitmap_freq),len(hashtable_list))

50000 50000 2319


## secondo pass PCY

In [164]:
#per ogni freq singleton, devo controllare che la sua coppia sia freq nella hashtable e poi contarlo
#dopo averli contati tutti riduco in base alla threshold
from itertools import combinations

freq_singleton = singleton.filter(lambda x: x[1]>=threshold)                           
pairs=list(combinations(freq_singleton.map(lambda x: x[0]).toLocalIterator(),2)) #formed from freq singletons

In [165]:
#CONVERT PAIRS INTO RDD
pairs_rdd = spark.sparkContext.parallelize(pairs)

In [166]:
#candidate pairs = pairs of freq singleton and the pair is freq in the hash_table
candidate_pairs = pairs_rdd.filter(lambda x : bitmap_freq[hashing(x)] == True)
candidate_pairs_list = list(candidate_pairs.map(lambda x: x).toLocalIterator()) #prima qua avevo messo pairs_rdd ma √® sbagliato, perch√® cos√¨ √® uguale ad Apriori
candidate_pairs_list[:10]

[('ukraineunderatta—Åk', 'protectu–∞sky'),
 ('ukraineunderatta—Åk', 'stoprussia'),
 ('russianinvasion', 'standwithukraine'),
 ('russianinvasion', 'ukrainewillwin'),
 ('russianinvasion', 'stopputin'),
 ('russianinvasion', 'russianukrainianwar'),
 ('russianinvasion', 'russiagohome'),
 ('russianinvasion', '–Ω–µ—Ç–≤–æ–π–Ω–µ'),
 ('russianinvasion', 'ukrainerussiawar'),
 ('russianinvasion', 'ukraineunderattack')]

In [167]:
#check against threshold
flatted_couples = basket_file.map(lambda x: [(pair,1) for pair in candidate_pairs_list if set(pair).\
                                             issubset(set(x))]).flatMap(lambda x: x).cache()

reduced_elements = flatted_couples.reduceByKey(lambda a, b: a + b)
freq_pairs = reduced_elements.filter(lambda x : x[1] >= threshold).cache()
result_PCY = list(freq_pairs.map(lambda x: x).toLocalIterator())
freq_pairs.collect()

[(('protectu–∞sky', 'stoprussia'), 20),
 (('russianinvasion', 'standwithukraine'), 20),
 (('russianinvasion', 'ukrainewillwin'), 20),
 (('russianinvasion', 'stopputin'), 20),
 (('russianinvasion', 'russianukrainianwar'), 20),
 (('russianinvasion', 'russiagohome'), 20),
 (('russianinvasion', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('standwithukraine', 'ukrainewillwin'), 20),
 (('standwithukraine', 'stopputin'), 23),
 (('standwithukraine', 'russianukrainianwar'), 20),
 (('standwithukraine', 'russiagohome'), 20),
 (('standwithukraine', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('ukrainewillwin', 'stopputin'), 20),
 (('ukrainewillwin', 'russianukrainianwar'), 20),
 (('ukrainewillwin', 'russiagohome'), 20),
 (('ukrainewillwin', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('stopputin', 'russianukrainianwar'), 20),
 (('stopputin', 'russiagohome'), 20),
 (('stopputin', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('russianukrainianwar', 'russiagohome'), 20),
 (('russianukrainianwar', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('russiagohome', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20

### MULTISTAGE

In [168]:
HASH_TABLE_SIZE = 50000
hash_table_1 = np.zeros(HASH_TABLE_SIZE, dtype=int)
hash_table_2 = np.zeros(HASH_TABLE_SIZE, dtype=int)

def hashing_1(pair_to_hash):
  return int(abs(hash(frozenset(pair_to_hash))) % HASH_TABLE_SIZE)


def hashing_2(pair_to_hash):
  return int((abs(hash(frozenset(pair_to_hash)))*3 /5 +9 *2 )% HASH_TABLE_SIZE)  #just create another hash function

hashing_1(('russianarmy', 'chernobyl')), hashing_2(('russianarmy', 'chernobyl'))

(25988, 5520)

In [169]:
#1st pass multistage = PCY

singleton = basket_file.flatMap(list).map(lambda item: (item,1)).reduceByKey(lambda a,b: a+b)
#create all possible pairs and hash them
from itertools import combinations
pairs_first_pass = list(combinations(singleton.map(lambda x: x[0]).toLocalIterator(),2))#creating all pairs

hashtable_rdd = basket_file.map(lambda x: [(hashing_1(pair),1) for pair in pairs_first_pass if set(pair).issubset(set(x))]).flatMap(lambda x: x).cache()\
                           .reduceByKey(lambda a,b: a+b)

hashtable_list = list(hashtable_rdd.map(lambda x: x).toLocalIterator()) #PROVARE A NON USARE LA CONVERSIONE?

for pair in hashtable_list:
  hash_table_1[pair[0]] = pair[1]

len(hashtable_list)

2319

### 2nd Stage

In [170]:
from itertools import combinations
bitmap_1 = [hash_table_1[i]>=threshold for i in range(HASH_TABLE_SIZE)] #creo bitmap, scarto pairs non freq

freq_singleton = singleton.filter(lambda x: x[1]>=threshold)                           
pairs=list(combinations(freq_singleton.map(lambda x: x[0]).toLocalIterator(),2)) #formed from freq singletons

#I have the bitmap_1, I have the freq singletons, and the pairs from freq singletons.
#I have to check against bitmap_1 the pairs formed from freq singlentons, than hash them again to create bitmap 2 and go to the third stage

In [171]:
#CONVERT PAIRS INTO RDD
pairs_rdd = spark.sparkContext.parallelize(pairs) #formed from freq singletons

In [172]:
#check against bitmap 1
pairs_first_check = pairs_rdd.filter(lambda x : bitmap_1[hashing_1(x)] == True) #pairs from freq singletons and resulted freq in bitmap_1
#hash them again to hashtable 2

pairs_first_check_list = list(pairs_first_check .map(lambda x: x).toLocalIterator())#creating all pairs

#2nd scan, usual count (counting candidates in the baskets)
hashtable_rdd_2 = basket_file.map(lambda x: [(hashing_2(pair),1) for pair in pairs_first_check_list if set(pair).issubset(set(x))]).flatMap(lambda x: x).cache()\
                           .reduceByKey(lambda a,b: a+b)

hashtable_list_2 = list(hashtable_rdd_2.map(lambda x: x).toLocalIterator()) #PROVARE A NON USARE LA CONVERSIONE?

for pair in hashtable_list_2:
  hash_table_2[pair[0]] = pair[1]

len(hashtable_list_2) #should be less than in stage 1

75

### 3rd Stage

In [173]:
bitmap_2 = [hash_table_2[i]>=threshold for i in range(HASH_TABLE_SIZE)] #creo bitmap, scarto pairs non freq

In [174]:
#I have now bitmap_1 , bitmap_2 and all the pairs from freq singletons (pairs_rdd)
#if pair (from freq singletons) is in bitmap_1 and bitmap_2 it means it's a CANDIDATE pair

#candidate pairs = singleton freq and their pair is freq in the hash_table
candidate_pairs_1 = pairs_rdd.filter(lambda x : bitmap_1[hashing_1(x)] == True)  #if the pair from freq_singleton is hashed to bitmap_1
#if the pair from first check is hashed to bitmap_2 goes to candidate pairs
#because it's formed from freq_singletons, they hash to a frequent bucket (candidate_pairs_1) and they also hash to bitmap_2
candidate_pairs = candidate_pairs_1.filter(lambda x : bitmap_2[hashing_2(x)] == True) 
candidate_pairs_list = list(candidate_pairs.map(lambda x: x).toLocalIterator())

candidate_pairs_list[:10]

[('ukraineunderatta—Åk', 'protectu–∞sky'),
 ('ukraineunderatta—Åk', 'stoprussia'),
 ('russianinvasion', 'standwithukraine'),
 ('russianinvasion', 'ukrainewillwin'),
 ('russianinvasion', 'stopputin'),
 ('russianinvasion', 'russianukrainianwar'),
 ('russianinvasion', 'russiagohome'),
 ('russianinvasion', '–Ω–µ—Ç–≤–æ–π–Ω–µ'),
 ('russianinvasion', 'ukraineunderattack'),
 ('russianinvasion', 'putinisawarcriminal')]

In [175]:
#usual count and check against threshold

flatted_couples = basket_file.map(lambda x: [(pair,1) for pair in candidate_pairs_list if set(pair).\
                                             issubset(set(x))]).flatMap(lambda x: x).cache()

reduced_elements = flatted_couples.reduceByKey(lambda a, b: a + b)
freq_pairs = reduced_elements.filter(lambda x : x[1] >= threshold).cache()
result_MULTI_STAGE=list(freq_pairs.map(lambda x: x).toLocalIterator())
freq_pairs.collect()

[(('protectu–∞sky', 'stoprussia'), 20),
 (('russianinvasion', 'standwithukraine'), 20),
 (('russianinvasion', 'ukrainewillwin'), 20),
 (('russianinvasion', 'stopputin'), 20),
 (('russianinvasion', 'russianukrainianwar'), 20),
 (('russianinvasion', 'russiagohome'), 20),
 (('russianinvasion', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('standwithukraine', 'ukrainewillwin'), 20),
 (('standwithukraine', 'stopputin'), 23),
 (('standwithukraine', 'russianukrainianwar'), 20),
 (('standwithukraine', 'russiagohome'), 20),
 (('standwithukraine', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('ukrainewillwin', 'stopputin'), 20),
 (('ukrainewillwin', 'russianukrainianwar'), 20),
 (('ukrainewillwin', 'russiagohome'), 20),
 (('ukrainewillwin', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('stopputin', 'russianukrainianwar'), 20),
 (('stopputin', 'russiagohome'), 20),
 (('stopputin', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('russianukrainianwar', 'russiagohome'), 20),
 (('russianukrainianwar', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('russiagohome', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20

## MULTI-HASH

1ST STAGE

In [176]:
singleton = basket_file.flatMap(list).map(lambda item: (item,1)).reduceByKey(lambda a,b: a+b)
singleton.take(20)

[('ukraineunderatta—Åk', 27),
 ('russianinvasion', 20),
 ('standwithukraine', 51),
 ('ukrainewillwin', 22),
 ('stopputin', 26),
 ('russianukrainianwar', 29),
 ('russiagohome', 20),
 ('–Ω–µ—Ç–≤–æ–π–Ω–µ', 20),
 ('taiwan', 1),
 ('anonymous', 11),
 ('ddosecrets', 2),
 ('netde', 1),
 ('delaware', 1),
 ('usa', 5),
 ('ukraineinvasion', 2),
 ('moscow', 4),
 ('putin', 32),
 ('medvedev', 1),
 ('russianarmy', 2),
 ('soviet', 2)]

In [177]:
HASH_TABLE_SIZE = 25000
hash_table_1 = np.zeros(HASH_TABLE_SIZE, dtype=int)
hash_table_2 = np.zeros(HASH_TABLE_SIZE, dtype=int)

def hashing_1(pair_to_hash):
  return int(abs(hash(frozenset(pair_to_hash))) % HASH_TABLE_SIZE)


def hashing_2(pair_to_hash):
  return int((abs(hash(frozenset(pair_to_hash)))*3 /5 +9 *2 )% HASH_TABLE_SIZE)  #just create another hash function

hashing_1(('russianarmy', 'chernobyl')), hashing_2(('russianarmy', 'chernobyl'))

(988, 5520)

In [178]:
#create all possible pairs and hash them
from itertools import combinations
pairs_first_pass = list(combinations(singleton.map(lambda x: x[0]).toLocalIterator(),2))#creating all pairs with singletons

#pairs_first_rdd = spark.sparkContext.parallelize(pairs_first_pass)


#per ogni basket, se una pair √® presente nel basket, creo (hashing(pair),1)
#bisogna contare sul basket file:
hashtable_rdd_1 = basket_file.map(lambda x: [(hashing_1(pair),1) for pair in pairs_first_pass if set(pair).issubset(set(x))]).flatMap(lambda x: x).cache()\
                           .reduceByKey(lambda a,b: a+b)

hashtable_rdd_2 = basket_file.map(lambda x: [(hashing_2(pair),1) for pair in pairs_first_pass if set(pair).issubset(set(x))]).flatMap(lambda x: x).cache()\
                           .reduceByKey(lambda a,b: a+b)

In [179]:
hashtable_rdd_2.take(50)

[(1872, 20),
 (8, 21),
 (8936, 21),
 (6736, 20),
 (2520, 20),
 (9040, 20),
 (4760, 32),
 (2864, 20),
 (20544, 21),
 (3936, 21),
 (21928, 20),
 (16968, 20),
 (24200, 21),
 (5952, 23),
 (288, 20),
 (14488, 20),
 (24032, 21),
 (23200, 21),
 (19752, 21),
 (18552, 21),
 (19336, 21),
 (12336, 21),
 (23792, 24),
 (1736, 20),
 (18048, 21),
 (18432, 20),
 (1328, 22),
 (11472, 20),
 (15112, 22),
 (20064, 20),
 (11280, 20),
 (7688, 20),
 (23720, 22),
 (19072, 21),
 (6192, 20),
 (3816, 20),
 (15544, 20),
 (17496, 20),
 (7208, 22),
 (21408, 22),
 (19712, 20),
 (17256, 23),
 (6072, 23),
 (19728, 20),
 (11312, 22),
 (7888, 22),
 (2512, 20),
 (21888, 20),
 (18192, 2),
 (20288, 1)]

In [180]:
hashtable_list_1 = list(hashtable_rdd_1.map(lambda x: x).toLocalIterator()) #PROVARE A NON USARE LA CONVERSIONE?

for pair in hashtable_list_1:
  hash_table_1[pair[0]] = pair[1]

bitmap_freq_1 = [hash_table_1[i]>=threshold for i in range(HASH_TABLE_SIZE)] #creo bitmap, scarto pairs non freq


hashtable_list_2 = list(hashtable_rdd_2.map(lambda x: x).toLocalIterator()) #PROVARE A NON USARE LA CONVERSIONE?

for pair in hashtable_list_2:
  hash_table_2[pair[0]] = pair[1]

bitmap_freq_2 = [hash_table_2[i]>=threshold for i in range(HASH_TABLE_SIZE)] #creo bitmap, scarto pairs non freq


2nd *PASS*

In [181]:
#per ogni freq singleton, devo controllare che la sua coppia sia freq nella hashtable e poi contarlo
#dopo averli contati tutti riduco in base alla threshold
from itertools import combinations

freq_singleton = singleton.filter(lambda x: x[1]>=threshold) 
                          
pairs=list(combinations(freq_singleton.map(lambda x: x[0]).toLocalIterator(),2))

In [182]:
#CONVERT PAIRS INTO RDD
pairs_rdd = spark.sparkContext.parallelize(pairs)

In [183]:
#candidate pairs = pairs of freq singleton and the pair is freq in both the hash_tables

candidate_pairs = pairs_rdd.filter(lambda x : bitmap_freq_1[hashing_1(x)] == True)
candidate_pairs_2 = candidate_pairs.filter(lambda x : bitmap_freq_2[hashing_2(x)] == True)

candidate_pairs_list = list(candidate_pairs_2.map(lambda x: x).toLocalIterator()) #prima qua avevo messo pairs_rdd ma √® sbagliato, perch√® cos√¨ √® uguale ad Apriori
candidate_pairs_list[:10]

[('ukraineunderatta—Åk', 'protectu–∞sky'),
 ('ukraineunderatta—Åk', 'stoprussia'),
 ('russianinvasion', 'standwithukraine'),
 ('russianinvasion', 'ukrainewillwin'),
 ('russianinvasion', 'stopputin'),
 ('russianinvasion', 'russianukrainianwar'),
 ('russianinvasion', 'russiagohome'),
 ('russianinvasion', '–Ω–µ—Ç–≤–æ–π–Ω–µ'),
 ('russianinvasion', 'ukraineunderattack'),
 ('russianinvasion', 'putinisawarcriminal')]

In [184]:
#check against threshold
flatted_couples = basket_file.map(lambda x: [(pair,1) for pair in candidate_pairs_list if set(pair).\
                                             issubset(set(x))]).flatMap(lambda x: x).cache()

reduced_elements = flatted_couples.reduceByKey(lambda a, b: a + b)
freq_pairs = reduced_elements.filter(lambda x : x[1] >= threshold).cache()
result_MULTI_HASH = list(freq_pairs.map(lambda x: x).toLocalIterator())
freq_pairs.collect()

[(('protectu–∞sky', 'stoprussia'), 20),
 (('russianinvasion', 'standwithukraine'), 20),
 (('russianinvasion', 'ukrainewillwin'), 20),
 (('russianinvasion', 'stopputin'), 20),
 (('russianinvasion', 'russianukrainianwar'), 20),
 (('russianinvasion', 'russiagohome'), 20),
 (('russianinvasion', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('standwithukraine', 'ukrainewillwin'), 20),
 (('standwithukraine', 'stopputin'), 23),
 (('standwithukraine', 'russianukrainianwar'), 20),
 (('standwithukraine', 'russiagohome'), 20),
 (('standwithukraine', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('ukrainewillwin', 'stopputin'), 20),
 (('ukrainewillwin', 'russianukrainianwar'), 20),
 (('ukrainewillwin', 'russiagohome'), 20),
 (('ukrainewillwin', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('stopputin', 'russianukrainianwar'), 20),
 (('stopputin', 'russiagohome'), 20),
 (('stopputin', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('russianukrainianwar', 'russiagohome'), 20),
 (('russianukrainianwar', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('russiagohome', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20

## SON

In [185]:
# define Apriori function to use in SON
from itertools import combinations
def Apriori(basket_file, threshold):
  singleton=basket_file.flatMap(list).map(lambda item: (item,1)).reduceByKey(lambda a,b: a+b)
  freq_singleton=singleton.filter(lambda x: x[1]>=threshold)
  pairs=list(combinations(freq_singleton.map(lambda x: x[0]).toLocalIterator(),2))
  flatted_couples = basket_file.map(lambda x: [(pair,1) for pair in pairs if set(pair).issubset(set(x))]).flatMap(lambda x: x).cache()
  reduced_elements = flatted_couples.reduceByKey(lambda a, b: a + b)
  freq_pairs = reduced_elements.filter(lambda x : x[1] >= threshold).cache()
  return freq_pairs

In [186]:
#We need to define the size of the chunk for dividing the basket file
CHUNK_SIZE = 2 

chunks = basket_file.repartition(CHUNK_SIZE).glom().collect() #provo a farlo partendo dalla lista, lavorando solo sugli RDD non riesco ma probabilmente √® possibile

i = 0
for chunk in chunks:
  for basket in chunk:
    print(basket)
    i +=1
i

['protectu–∞sky', 'stoprussia', 'ukraineunderatta—Åk']
['russianinvasion', 'standwithukraine', 'ukraineunderattack', 'ukrainewillwin', 'putinisawarcriminal', 'stopputin', 'russianukrainianwar', 'russiagohome', '—Ä–æ—Å—Å–∏—è—Å–º–æ—Ç—Ä–∏', '–Ω–µ—Ç–≤–æ–π–Ω–µ']
['russianukrainianwar', 'china', 'taiwan']
['anonymous', 'oprussia', 'ddosecrets']
['nft', 'mint']
['russia', 'ukraine', 'motivation', 'netde', 'edude', 'delaware', 'government', 'usa']
['ukraine', 'ukrainewar', 'russia', 'ukraineinvasion']
['russian', 'moscow']
['ukraine']
['putin', 'medvedev', 'russia', 'ukraine']
['ukraine', 'kyiv']
['exclusive', 'itvideo', 'russiaukrainewar', 're']
['ukraine', 'mariupol']
['putin', 'gas']
['ukraine', 'ukrainerussianwar']
['biden', 'covid', 'politics', 'vote']
['vladimirputin', 'russian']
['russia']
['ukrainewar', 'tigray', '500daysoftigraygenocide']
['ukraine', 'kyiv']
['ukraine', 'kyiv']
['ukraineian', 'nypost', 'nato', 'thinktank', 'russia']
['ukrainerussiawar']
['ukraine', 'russianvodka', 'uk

500

In [187]:
#candidate_pairs_list = list(candidate_pairs.map(lambda x: x).toLocalIterator())

all_pairs = spark.sparkContext.parallelize([]) #empty RDD
for chunk in chunks:
  print(chunk)
  rdd_chunk = spark.sparkContext.parallelize(chunk)
  freq_pairs = Apriori(rdd_chunk, 3)
  all_pairs = all_pairs.union(freq_pairs)

[['protectu–∞sky', 'stoprussia', 'ukraineunderatta—Åk'], ['russianinvasion', 'standwithukraine', 'ukraineunderattack', 'ukrainewillwin', 'putinisawarcriminal', 'stopputin', 'russianukrainianwar', 'russiagohome', '—Ä–æ—Å—Å–∏—è—Å–º–æ—Ç—Ä–∏', '–Ω–µ—Ç–≤–æ–π–Ω–µ'], ['russianukrainianwar', 'china', 'taiwan'], ['anonymous', 'oprussia', 'ddosecrets'], ['nft', 'mint'], ['russia', 'ukraine', 'motivation', 'netde', 'edude', 'delaware', 'government', 'usa'], ['ukraine', 'ukrainewar', 'russia', 'ukraineinvasion'], ['russian', 'moscow'], ['ukraine'], ['putin', 'medvedev', 'russia', 'ukraine'], ['ukraine', 'kyiv'], ['exclusive', 'itvideo', 'russiaukrainewar', 're'], ['ukraine', 'mariupol'], ['putin', 'gas'], ['ukraine', 'ukrainerussianwar'], ['biden', 'covid', 'politics', 'vote'], ['vladimirputin', 'russian'], ['russia'], ['ukrainewar', 'tigray', '500daysoftigraygenocide'], ['ukraine', 'kyiv'], ['ukraine', 'kyiv'], ['ukraineian', 'nypost', 'nato', 'thinktank', 'russia'], ['ukrainerussiawar'], ['ukrai

In [188]:
all_pairs.reduceByKey(lambda a, b: a+b).collect() #manca qualche pezzo, perch√® le chiavi opposte le vede come diverse

[(('protectu–∞sky', 'stoprussia'), 20),
 (('russianinvasion', 'standwithukraine'), 12),
 (('ukrainewillwin', 'stopputin'), 12),
 (('ukrainewillwin', 'russiagohome'), 20),
 (('ukrainewillwin', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('russiagohome', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('ukraineunderattack', 'putinisawarcriminal'), 20),
 (('ukraineunderattack', '—Ä–æ—Å—Å–∏—è—Å–º–æ—Ç—Ä–∏'), 20),
 (('nft', 'mint'), 33),
 (('russia', 'ukraine'), 29),
 (('ukraine', 'kyiv'), 22),
 (('johnsonout67', 'slavaukraini'), 5),
 (('ukraine', 'russian'), 4),
 (('settodraw', 'tanzaniaproject'), 10),
 (('settodraw', 'africa'), 10),
 (('tanzaniaproject', 'mining'), 10),
 (('ukraine', 'oil'), 4),
 (('gas', 'biden'), 4),
 (('russia', 'ukrainewar'), 3),
 (('ukraine', 'ukrainewar'), 5),
 (('russia', 'war'), 4),
 (('nftcommunity', 'nftdrop'), 4),
 (('nft', 'eth'), 3),
 (('nft', 'nfts'), 3),
 (('sumy', 'kherson'), 4),
 (('mariupol', 'savemariupol'), 3),
 (('stopputin', 'standwithukraine'), 11),
 (('russianukrainianwar', 'standwith

In [189]:
result_SON = list(all_pairs.map(lambda x: x).toLocalIterator())

In [192]:
result_PCY

[(('protectu–∞sky', 'stoprussia'), 20),
 (('russianinvasion', 'standwithukraine'), 20),
 (('russianinvasion', 'ukrainewillwin'), 20),
 (('russianinvasion', 'stopputin'), 20),
 (('russianinvasion', 'russianukrainianwar'), 20),
 (('russianinvasion', 'russiagohome'), 20),
 (('russianinvasion', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('standwithukraine', 'ukrainewillwin'), 20),
 (('standwithukraine', 'stopputin'), 23),
 (('standwithukraine', 'russianukrainianwar'), 20),
 (('standwithukraine', 'russiagohome'), 20),
 (('standwithukraine', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('ukrainewillwin', 'stopputin'), 20),
 (('ukrainewillwin', 'russianukrainianwar'), 20),
 (('ukrainewillwin', 'russiagohome'), 20),
 (('ukrainewillwin', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('stopputin', 'russianukrainianwar'), 20),
 (('stopputin', 'russiagohome'), 20),
 (('stopputin', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('russianukrainianwar', 'russiagohome'), 20),
 (('russianukrainianwar', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20),
 (('russiagohome', '–Ω–µ—Ç–≤–æ–π–Ω–µ'), 20

In [191]:
if ((result_MULTI_HASH == result_PCY) and (result_MULTI_HASH == result_MULTI_STAGE)):
    print ("The lists are identical")
else :
    print ("The lists are not identical")

The lists are identical
