<a href="https://colab.research.google.com/github/ElenaSerbuValentina/Apriori_spark/blob/main/Apriori_Spark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PYSPARK SETUP


In [1]:
# Download Java Virtual Machine (JVM)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Download Spark
!wget -q https://dlcdn.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
# Unzip the file
!tar xf spark-3.4.0-bin-hadoop3.tgz

In [2]:
import os
# Point PySpark at the Java runtime and at the unpacked Spark distribution.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME='/content/spark-3.4.0-bin-hadoop3',
)

In [3]:
# Install library for finding Spark
!pip install -q findspark
# Import the libary
import findspark
# Initiate findspark
findspark.init()
# Check the location for Spark
findspark.find()

'/content/spark-3.4.0-bin-hadoop3'

In [4]:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Application-level configuration: only the application name is set.
conf = SparkConf().setAppName('test_mba')

# Get the active session, or create a Hive-enabled one with the configuration above.
spark = (
    SparkSession.builder
    .enableHiveSupport()
    .config(conf=conf)
    .getOrCreate()
)
# Alternative kept for reference:
#spark = SparkSession.builder.master("local[*]").getOrCreate()

# SparkContext handle; used later for sc.parallelize.
sc = spark.sparkContext

# DATA IMPORT

In [5]:
from google.colab import files
import pandas as pd
import numpy as np
import os

In [6]:
# Kaggle API credentials must never be stored in the notebook. They are taken
# from the environment when already set, and prompted for (without echo)
# otherwise, so nothing secret ends up in the saved file or its outputs.
# NOTE(review): any key that was committed in an earlier version of this cell
# should be treated as leaked and regenerated on Kaggle.
from getpass import getpass

for _kaggle_var in ('KAGGLE_USERNAME', 'KAGGLE_KEY'):
    # Only prompt for values that are missing or empty.
    if not os.environ.get(_kaggle_var):
        os.environ[_kaggle_var] = getpass(f'{_kaggle_var}: ')

In [7]:
# Download the `xhlulu/medal-emnlp` Kaggle dataset and unzip it in the working directory.
# NOTE(review): the recorded output shows an archive of about 6.8G, so re-running this cell is expensive.
!kaggle datasets download -d xhlulu/medal-emnlp --unzip

Downloading medal-emnlp.zip to /content
100% 6.81G/6.82G [01:22<00:00, 39.5MB/s]
100% 6.82G/6.82G [01:22<00:00, 89.2MB/s]


# IMPORTS

In [8]:
# Fetch the project repository; `Apriori_spark.utils.functions` is imported from this clone in a later cell.
!git clone https://github.com/ElenaSerbuValentina/Apriori_spark.git

Cloning into 'Apriori_spark'...
remote: Enumerating objects: 91, done.[K
remote: Counting objects: 100% (91/91), done.[K
remote: Compressing objects: 100% (85/85), done.[K
remote: Total 91 (delta 41), reused 0 (delta 0), pack-reused 0[K
Receiving objects: 100% (91/91), 261.46 KiB | 4.75 MiB/s, done.
Resolving deltas: 100% (41/41), done.


In [9]:
import itertools
from itertools import combinations, product
from pyspark.sql import DataFrame
import nltk
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
import string

In [10]:
from Apriori_spark.utils.functions import Preprocess, apriori

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...


# DATA PREPARATION

In [11]:
# Load the raw CSV: the first line is treated as the header and column types are inferred.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/content/full_data.csv')
)

In [12]:
# Work on a small seeded sample (fraction 0.0001) of the full data.
# NOTE: `df` is rebound to its own sample, so running this cell twice samples the sample.
df = df.sample(
    fraction=0.0001,
    seed=143,
)

In [13]:
# Project every row onto its first column, giving an RDD with one text entry per row.
rdd = df.rdd.map(lambda row: row[0])

In [14]:
# Peek at the first element to check what the RDD holds.
rdd.take(1)

['anaerobically digested dewatered sludge to cm thick was applied to a forest clearcut as a fertilizer source in northwest washington on gravelly glacial outwash soil this sludge is not microbiologically sterile and may contain pathogenic organisms fecal coliform bacterial counts in sludge applied in summer july fell from x to g in days and to g in days dieoff appeared more rapid in winter januaryapplied sludge when colnts fell from x to g in days initial death rates were related to sludge temperature moisture ph PCS composition and microbial competition aftergrowth of fecal coliforms occurred in warm summer and fall months but counts were of similar magnitude to background levels in forest soils where a maximum count of g was recorded total coliform counts in fresh sludge ranged from x to x g numbers stabilized at to g in spring fall and summer with lower numbers in winter both total and fecal bacteria moved from the sludge to the soil beneath but few penetrated past the first cm the 

In [15]:
# Total number of transactions (one per RDD element); later used for the
# support threshold and passed to the apriori class as total_transactions.
total_count = rdd.count()
total_count

1438

In [16]:
# Preprocessing (initial stage): apply Preprocess.preprocess to every text entry.
# NOTE(review): presumably tokenises/cleans the text with the NLTK resources
# downloaded when the module is imported — confirm in Apriori_spark/utils/functions.py.
rdd_preprocessed = rdd.map(Preprocess.preprocess)

In [17]:
# Preprocess instance that provides the encoder.
processor = Preprocess()

# Map the items to integers (to improve the algorithm's running time) and keep
# the dictionary needed to decode the encoded results back to words.
rdd_encoded, num2word = processor.encoder(rdd_preprocessed)

# Apriori runner configured with the minimum support, the maximum basket size,
# the minimum confidence for association rules, and the number-to-word
# vocabulary used to decode the final frequent itemsets.
APRIORI = apriori(
    minSupportpercent=0.1,
    total_transactions=total_count,
    maximumBasketSize=3,
    min_confidence=0.5,
    vocab=num2word,
)

In [18]:
# From here on `rdd` refers to the encoded baskets; they are cached because
# every following stage scans them again.
rdd = rdd_encoded.cache()
rdd

PythonRDD[23] at RDD at PythonRDD.scala:53

# FIND SINGLETONS

In [19]:
# Absolute support threshold: 10% of the number of transactions.
minSupport = 0.1 * total_count

# Count every item across all baskets and keep those reaching the threshold.
singletons = (
    rdd.flatMap(list)
       .map(lambda item: (item, 1))
       .reduceByKey(lambda left, right: right + left)
       .filter(lambda pair: pair[1] >= minSupport)
)

In [21]:
# Take two frequent singletons and translate their integer ids back to words.
words_singleton = [
    (num2word[pair[0]], pair[1])
    for pair in singletons.take(2)
]

In [22]:
# Display the two decoded (word, count) singletons.
words_singleton

[('significantly', 165), ('analysis', 206)]

# FIND FREQUENT PAIRS

In [23]:
# Keep only the item ids of the frequent singletons. A separate name is used
# instead of rebinding `singletons`: the rebinding version applied x[0] to
# plain ids on a second run, so the cell could not be re-executed.
singleton_items = singletons.map(lambda x: x[0])

# Candidate pairs: every 2-combination of frequent singletons, built on the driver.
candidates1 = list(combinations(singleton_items.toLocalIterator(), 2))

# Support threshold used for pairs (a quarter of the singleton threshold).
minSupport_2 = minSupport/4

# Pair every candidate with a prebuilt frozenset so the membership test does
# not rebuild the same sets for every (sentence, candidate) combination.
_candidate_sets1 = [(tuple(candidate), frozenset(candidate)) for candidate in candidates1]


def _pairs_in_basket(sentence):
    """Return ``(pair, 1)`` for every candidate pair fully contained in ``sentence``."""
    basket = set(sentence)  # built once per transaction, not once per candidate
    return [(pair, 1) for pair, members in _candidate_sets1 if members <= basket]


# Filtering phase: count the transactions containing each candidate and keep
# the pairs that are really frequent.
combined_2 = (
    rdd.flatMap(_pairs_in_basket)
       .reduceByKey(lambda y, x: x + y)
       .filter(lambda item: item[1] >= minSupport_2)
)

In [24]:
# Take two frequent pairs and decode them: an int key is looked up directly,
# any other key is decoded element by element into a tuple of words.
words_2 = [
    (num2word[entry[0]], entry[1]) if type(entry[0]) == int
    else (tuple(num2word[element] for element in entry[0]), entry[1])
    for entry in combined_2.take(2)
]

In [25]:
# Display the two decoded frequent pairs with their counts.
words_2

[(('increased', 'two'), 38), (('high', 'found'), 39)]

# FIND FREQUENT TRIPLETS

In [26]:
# Candidate itemsets of size k = 3, derived from the frequent pairs.
k = 3
candidates = APRIORI.getCombinations(combined_2, k)

In [27]:
# Support threshold used for triplets (a fifth of the singleton threshold).
minSupport_3 = minSupport/5

# Pair every candidate with a prebuilt frozenset so the membership test does
# not rebuild the same sets for every (sentence, candidate) combination.
_candidate_sets3 = [(tuple(candidate), frozenset(candidate)) for candidate in candidates]


def _triplets_in_basket(sentence):
    """Return ``(triplet, 1)`` for every candidate fully contained in ``sentence``."""
    basket = set(sentence)  # built once per transaction, not once per candidate
    return [(triplet, 1) for triplet, members in _candidate_sets3 if members <= basket]


# Count the transactions containing each candidate and keep the frequent ones.
# The comparison is `>=`, consistent with the singleton and pair stages
# (an itemset whose count equals the threshold is frequent).
combined_3 = (
    rdd.flatMap(_triplets_in_basket)
       .reduceByKey(lambda y, x: x + y)
       .filter(lambda item: item[1] >= minSupport_3)
)

In [28]:
# Take two frequent triplets and decode them: an int key is looked up directly,
# any other key is decoded element by element into a tuple of words.
word_3 = [
    (num2word[entry[0]], entry[1]) if type(entry[0]) == int
    else (tuple(num2word[element] for element in entry[0]), entry[1])
    for entry in combined_3.take(2)
]

In [29]:
# Display the two decoded frequent triplets with their counts.
word_3

[(('cell', 'also', 'study'), 33), (('cell', 'effect', 'result'), 37)]

# GENERALIZED APRIORI ALGORITHM

In [30]:
# Run the generalized Apriori algorithm on the cached, encoded baskets,
# passing an empty RDD as the initial `support`.
freq_rdd = APRIORI.Apriori(
    support=sc.parallelize([]),
    rdd=rdd,
)

---set up complete---
---singletones found!---
---first candidates found!---
starting 2 items in basket loop
added frequent baskets with 2 items
computing candidates for next iteration
found candidates for 3 items in basket
starting 3 items in basket loop
added frequent baskets with 3 items


In [31]:
# Bring the frequent itemsets to the driver as a Python list of (itemset, count) entries.
results = freq_rdd.collect()

In [32]:
# The collected itemsets are encoded as integers; translate them back to words.
# An int key is looked up directly, any other key is decoded element by element.
decoded = [
    (num2word[entry[0]], entry[1]) if type(entry[0]) == int
    else (tuple(num2word[element] for element in entry[0]), entry[1])
    for entry in results
]

In [33]:
# Display every decoded frequent itemset with its count.
# NOTE(review): this prints the whole list; consider slicing (e.g. decoded[:20]) to keep the output short.
decoded

[('significantly', 165),
 ('analysis', 206),
 ('level', 208),
 ('disease', 174),
 ('patient', 335),
 ('compared', 177),
 ('factor', 144),
 ('cell', 315),
 ('time', 178),
 ('protein', 224),
 ('showed', 170),
 ('increased', 185),
 ('case', 157),
 ('high', 191),
 ('result', 375),
 ('associated', 169),
 ('t0', 177),
 ('however', 223),
 ('used', 229),
 ('found', 193),
 ('one', 212),
 ('may', 267),
 ('p', 167),
 ('well', 155),
 ('system', 164),
 ('using', 260),
 ('present', 183),
 ('different', 207),
 ('also', 295),
 ('human', 151),
 ('activity', 215),
 ('significant', 170),
 ('treatment', 251),
 ('method', 149),
 ('role', 176),
 ('mechanism', 150),
 ('change', 155),
 ('effect', 313),
 ('gene', 149),
 ('c', 157),
 ('clinical', 152),
 ('show', 144),
 ('expression', 149),
 ('study', 419),
 ('observed', 179),
 ('two', 245),
 ('response', 154),
 ('control', 167),
 ('data', 168),
 (('increased', 'two'), 38),
 (('high', 'found'), 39),
 (('patient', 'clinical'), 78),
 (('treatment', 'two'), 43),
 (

# ASSOCIATION RULES

In [34]:
# Build the association rules from the collected frequent itemsets;
# to_decode=True is passed (the displayed rules contain words rather than ids).
rules_df = APRIORI.generate_association_rules(
    results,
    to_decode=True,
)

In [35]:
# Preview the first 20 rules (antecedent, consequent, support, confidence, interest).
rules_df.head(20)

Unnamed: 0,antecedent,consequent,support,confidence,interest
0,"(also, expression)","(cell,)",0.023644,0.641509,0.422455
1,"(expression,)","(cell,)",0.062587,0.604027,0.384973
2,"(effect, expression)","(cell,)",0.021558,0.574074,0.35502
3,"(high, used)","(study,)",0.020167,0.537037,0.24566
5,"(cell, gene)","(expression,)",0.02573,0.521127,0.417511
7,"(clinical,)","(patient,)",0.054242,0.513158,0.280195
8,"(may, treatment)","(effect,)",0.020862,0.508475,0.290811


In [36]:
# Save the rules to a CSV file in the Colab working directory.
# NOTE(review): the row index is written as well; pass index=False if it is not needed — confirm with consumers of rules.csv.
rules_df.to_csv('/content/rules.csv')