In [200]:
!pip install pyspark



In [229]:
from __future__ import print_function
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import col, split 
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import SparkSession
from pyspark.ml.feature import NGram
from pyspark.ml.feature import Word2Vec
from pyspark.ml.feature import CountVectorizer
from pyspark.ml import Pipeline
import numpy as np

# Task 1: Finding the top 10 words of our 5 documents
A. In this task we used spark dataframe to enter 5 inputs and creat dataframe  

In [202]:
# Start first spark session
spark = SparkSession.builder.appName("assignment").getOrCreate()

In [203]:
# Enter 5 documents including text data
sentenceData = spark.createDataFrame([
    (0, "In clinical trials, reactogenicity symptoms (side effects that happen within 7 days of getting vaccinated) were common but were mostly mild to moderate.Side effects (such as fever, chills, tiredness, and headache) throughout the body were more common after the second dose of the vaccine."),
    (1, "As people around the world receive COVID-19 vaccines, reports of temporary side effects such as headaches and fevers are rolling in. Much of this was expected — clinical-trial data for the vaccines authorized so far suggested as much. But now that millions of people are vaccinated, compared with the thousands enrolled in early studies, reports of some rare, allergic reactions are surfacing, and questions are arising about whether any deaths are linked to the shots."),
    (2, "A tracheostomy is a hole in the windpipe (trachea) created by a surgeon. This hole, called a stoma, replaces a person’s nose and mouth as the pathway for breathing. A tracheostomy tube is inserted into the stoma to keep the hole open and provide an entryway into the lungs."),
    (3, "Researchers conducted a retrospective analysis of an electronic health record database that included 72,585 patients with COPD who were hospitalized and mechanically ventilated between January 1, 2009, and June 29, 2018. The cohort was then restricted to patients receiving tracheostomy. Receiver operating characteristic (ROC) curve was used to define the most sensitive cut-off for tracheostomy timing to predict 90-day mortality, which was then implemented to create the early and late tracheostomy groups"),
    (4, "In every state except two – Maine and Nebraska – the candidate that gets the most votes wins all of the state’s electoral college votes.Due to these rules, a candidate can win the election without getting the most votes at the national level.")],
 ["document", "sentence"])

In [204]:
# Show the content of the data
sentenceData.show(truncate=False)

+--------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|document|sentence                                                                                                                                                                                                                                                                                                                                                                                                                                                                              

B. Tokenization,vectorization, and finding TF-IDF of our input documents

In [205]:
# Tokenization and vectorization
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
vectorizer  = CountVectorizer(inputCol="words", outputCol="rawFeatures")

idf = IDF(inputCol="rawFeatures", outputCol="features")

pipeline = Pipeline(stages=[tokenizer, vectorizer, idf])

model = pipeline.fit(sentenceData)

C. Finding the top TF-IDF 10 words 


In [206]:
# Count top TF-IDF 10 words

total_counts = model.transform(sentenceData)\
                    .select('rawFeatures').rdd\
                    .map(lambda row: row['rawFeatures'].toArray())\
                    .reduce(lambda x,y: [x[i]+y[i] for i in range(len(y))])

vocabList = model.stages[1].vocabulary
d = {'vocabList':vocabList,'counts':total_counts}

spark.createDataFrame(np.array(list(d.values())).T.tolist(),list(d.keys())).show(10)

+------------+------+
|   vocabList|counts|
+------------+------+
|         the|  21.0|
|         and|   9.0|
|           a|   8.0|
|          to|   8.0|
|          of|   8.0|
|         are|   5.0|
|          as|   5.0|
|        were|   4.0|
|tracheostomy|   4.0|
|         was|   4.0|
+------------+------+
only showing top 10 rows



In [207]:
# Check of our data
model.transform(sentenceData).show(truncate=False)

+--------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
from pyspark.sql.types import ArrayType, StringType

def termsIdx2Term(vocabulary):
    def termsIdx2Term(termIndices):
        return [vocabulary[int(index)] for index in termIndices]
    return udf(termsIdx2Term, ArrayType(StringType()))

vectorizerModel = model.stages[1]
vocabList = vectorizerModel.vocabulary
vocabList

In [209]:
rawFeatures = model.transform(sentenceData).select('rawFeatures')
rawFeatures.show()



+--------------------+
|         rawFeatures|
+--------------------+
|(188,[0,1,3,4,6,7...|
|(188,[0,1,3,4,5,6...|
|(188,[0,1,2,3,6,8...|
|(188,[0,1,2,3,4,7...|
|(188,[0,1,2,3,4,1...|
+--------------------+



#Task2. Finding the top 10 TF-IDF lemmatized words
First install spark nlp package which need special setting in Google Colab

In [1]:
import os

# Install java
! apt-get update -qq
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
! java -version

# Install pyspark
! pip install --ignore-installed -q pyspark==2.4.4
! pip install --ignore-installed -q spark-nlp==2.7.1
# Install Spark NLP
! pip install --ignore-installed spark-nlp



openjdk version "1.8.0_282"
OpenJDK Runtime Environment (build 1.8.0_282-8u282-b08-0ubuntu1~18.04-b08)
OpenJDK 64-Bit Server VM (build 25.282-b08, mixed mode)
[K     |████████████████████████████████| 215.7MB 72kB/s 
[K     |████████████████████████████████| 204kB 23.3MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
[K     |████████████████████████████████| 143kB 6.9MB/s 
[?25hCollecting spark-nlp
[?25l  Downloading https://files.pythonhosted.org/packages/8d/a5/a5130215b43f3bd0e98bd16c471d36dafeab8855ca17789d4927337fa7dc/spark_nlp-2.7.4-py2.py3-none-any.whl (139kB)
[K     |████████████████████████████████| 143kB 8.9MB/s 
[?25hInstalling collected packages: spark-nlp
Successfully installed spark-nlp-2.7.4


In [211]:
# Import the dependency packages 
import sparknlp

spark = sparknlp.start()
from pyspark.ml import Pipeline
import string
string.punctuation

'!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'

In [212]:
from sparknlp.base import* 
from sparknlp.annotator import*

In [213]:
print("Spark NLP version")
sparknlp.version()

print("Apache Spark version")
spark.version

Spark NLP version
Apache Spark version


'2.4.4'

In [214]:
# Configuration of old Spark version with Google Colab to allow NLP package work
SparkSession.builder.config("spark.jars", "hdfs://somepath/sparknlp.jar")

<pyspark.sql.session.SparkSession.Builder at 0x7f69f241f450>

In [215]:
#Stacking Spark NLP Annotators in Spark ML Pipeline

documentAssembler = DocumentAssembler()\
.setInputCol("sentence")\
.setOutputCol("document")

tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")
    
normalizer = Normalizer() \
    .setInputCols(["token"]) \
    .setOutputCol("normalized")\
    .setLowercase(True)\
    .setCleanupPatterns(["[^\w\d\s]"])

In [216]:
#Pipleine document, tokenizer, and normalizer
nlpPipeline = Pipeline(stages=[
 documentAssembler, 
 tokenizer,
 normalizer
 ])

empty_df = spark.createDataFrame([['']]).toDF("sentence")

pipelineModel = nlpPipeline.fit(empty_df)

In [217]:
result1 = pipelineModel.transform(sentenceData)

In [365]:
# Check for results
result1.show(truncate=50)

+--------+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|document|                                          sentence|                                             words|                                           feature|
+--------+--------------------------------------------------+--------------------------------------------------+--------------------------------------------------+
|       0|In clinical trials, reactogenicity symptoms (si...|[in, clinical, trials,, reactogenicity, symptom...|[0.011457055860029703,-0.0368499701355838,-0.01...|
|       1|As people around the world receive COVID-19 vac...|[as, people, around, the, world, receive, covid...|[0.013541094850127897,-0.015294229711095493,0.0...|
|       2|A tracheostomy is a hole in the windpipe (trach...|[a, tracheostomy, is, a, hole, in, the, windpip...|[-0.015882298530896705,-0.02551314291753331,-0....|
|       3|Resear

In [219]:
#Retrieves lemmas out of words with the objective of returning a base dictionary word
!wget -q https://raw.githubusercontent.com/mahavivo/vocabulary/master/lemmas/AntBNC_lemmas_ver_001.txt

In [220]:
lemmatizer = Lemmatizer() \
    .setInputCols(["token"]) \
    .setOutputCol("lemma") \
    .setDictionary("./AntBNC_lemmas_ver_001.txt", value_delimiter ="\t", key_delimiter = "->")

In [221]:
documentAssembler = DocumentAssembler()\
.setInputCol("sentence")\
.setOutputCol("document")

tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

nlpPipeline = Pipeline(stages=[
 documentAssembler, 
 tokenizer,
 lemmatizer
 ])

empty_df = spark.createDataFrame([['']]).toDF("sentence")

pipelineModel = nlpPipeline.fit(empty_df)

In [291]:
# Check for results
result = pipelineModel.transform(sentenceData)

result.show()

+--------------------+--------------------+--------------------+--------------------+
|            document|            sentence|               token|               lemma|
+--------------------+--------------------+--------------------+--------------------+
|[[document, 0, 28...|In clinical trial...|[[token, 0, 1, In...|[[token, 0, 1, In...|
|[[document, 0, 46...|As people around ...|[[token, 0, 1, As...|[[token, 0, 1, As...|
|[[document, 0, 27...|A tracheostomy is...|[[token, 0, 0, A,...|[[token, 0, 0, A,...|
|[[document, 0, 50...|Researchers condu...|[[token, 0, 10, R...|[[token, 0, 10, R...|
|[[document, 0, 24...|In every state ex...|[[token, 0, 1, In...|[[token, 0, 1, In...|
+--------------------+--------------------+--------------------+--------------------+



In [292]:
#Check for top 10 lemmatized words
import pyspark.sql.functions as F
result_df = result.select(F.explode(F.arrays_zip('token.result',  'lemma.result')).alias("cols")) \
.select(F.expr("cols['0']").alias("token"),
        F.expr("cols['1']").alias("lemma")).toPandas()

result_df.head(10)

Unnamed: 0,token,lemma
0,In,In
1,clinical,clinical
2,trials,trial
3,",",","
4,reactogenicity,reactogenicity
5,symptoms,symptom
6,(,(
7,side,side
8,effects,effect
9,that,that


In [348]:
# Change the data from string to array type
from pyspark.sql.types import StringType
from pyspark.sql.functions import array
df_new4 = sentenceData.withColumn("sentence", array(sentenceData["sentence"]))
df_new4.printSchema

<bound method DataFrame.printSchema of DataFrame[document: bigint, sentence: array<string>]>

In [349]:
# applying tf on the df_new4
hashingTF2 = HashingTF(inputCol="sentence", outputCol="intFeatures", numFeatures=20)
featurizedDatalema = hashingTF2.transform(df_new4)

In [352]:
#calculating the IDF
idf2 = IDF(inputCol="intFeatures", outputCol="lemfeatures")
idfModellem = idf2.fit(featurizedDatalema)
rescaledDatalem = idfModellem.transform(featurizedDatalema)

In [353]:
# Check the results
rescaledDatalem.show()

+--------+--------------------+---------------+--------------------+
|document|            sentence|    intFeatures|         lemfeatures|
+--------+--------------------+---------------+--------------------+
|       0|[In clinical tria...| (20,[2],[1.0])|(20,[2],[0.693147...|
|       1|[As people around...|(20,[12],[1.0])|(20,[12],[1.09861...|
|       2|[A tracheostomy i...| (20,[2],[1.0])|(20,[2],[0.693147...|
|       3|[Researchers cond...|(20,[14],[1.0])|(20,[14],[1.09861...|
|       4|[In every state e...|(20,[15],[1.0])|(20,[15],[1.09861...|
+--------+--------------------+---------------+--------------------+



#Task3. Finding the top 10 TF_IDF ngrams words

In [316]:
#Creating the spark2 object 
spark3 = SparkSession.builder.appName("Ngrams").getOrCreate()

In [317]:
ngramData = spark3.createDataFrame([
    (0, "In clinical trials, reactogenicity symptoms (side effects that happen within 7 days of getting vaccinated) were common but were mostly mild to moderate.Side effects (such as fever, chills, tiredness, and headache) throughout the body were more common after the second dose of the vaccine."),
    (1, "As people around the world receive COVID-19 vaccines, reports of temporary side effects such as headaches and fevers are rolling in. Much of this was expected — clinical-trial data for the vaccines authorized so far suggested as much. But now that millions of people are vaccinated, compared with the thousands enrolled in early studies, reports of some rare, allergic reactions are surfacing, and questions are arising about whether any deaths are linked to the shots."),
    (2, "A tracheostomy is a hole in the windpipe (trachea) created by a surgeon. This hole, called a stoma, replaces a person’s nose and mouth as the pathway for breathing. A tracheostomy tube is inserted into the stoma to keep the hole open and provide an entryway into the lungs."),
    (3, "Researchers conducted a retrospective analysis of an electronic health record database that included 72,585 patients with COPD who were hospitalized and mechanically ventilated between January 1, 2009, and June 29, 2018. The cohort was then restricted to patients receiving tracheostomy. Receiver operating characteristic (ROC) curve was used to define the most sensitive cut-off for tracheostomy timing to predict 90-day mortality, which was then implemented to create the early and late tracheostomy groups"),
    (4, "In every state except two – Maine and Nebraska – the candidate that gets the most votes wins all of the state’s electoral college votes.Due to these rules, a candidate can win the election without getting the most votes at the national level.")],
 ["label", "rawdata"])

In [318]:
# Check data
ngramData.show()

+-----+--------------------+
|label|             rawdata|
+-----+--------------------+
|    0|In clinical trial...|
|    1|As people around ...|
|    2|A tracheostomy is...|
|    3|Researchers condu...|
|    4|In every state ex...|
+-----+--------------------+



In [334]:
# Tokenization of ngramData
tokenizer = Tokenizer(inputCol="rawdata", outputCol="words")
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

idf = IDF(inputCol="ngrams", outputCol="ngramfeatures")

pipeline = Pipeline(stages=[tokenizer, ngram])

model = pipeline.fit(ngramData)

result= model.transform(ngramData)
result3 = result.show(truncate=False)

+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [340]:
# Change the text from string character to array
from pyspark.sql.types import StringType
from pyspark.sql.functions import array
df_new5 = ngramData.withColumn("rawdata", array(ngramData["rawdata"]))
df_new5.printSchema

<bound method DataFrame.printSchema of DataFrame[label: bigint, rawdata: array<string>]>

In [342]:
# applying tf on the words data
hashingTFn = HashingTF(inputCol="rawdata", outputCol="ngramrawFeatures", numFeatures=20)
featurizedDatan = hashingTFn.transform(df_new5)

In [343]:
#calculating the IDF
idfn = IDF(inputCol="ngramrawFeatures", outputCol="ngramfeatures")
idfModeln = idfn.fit(featurizedDatan)
rescaledDatan = idfModeln.transform(featurizedDatan)

In [359]:
# Check the result
rescaledDatan.show()

+-----+--------------------+----------------+--------------------+
|label|             rawdata|ngramrawFeatures|       ngramfeatures|
+-----+--------------------+----------------+--------------------+
|    0|[In clinical tria...|  (20,[2],[1.0])|(20,[2],[0.693147...|
|    1|[As people around...| (20,[12],[1.0])|(20,[12],[1.09861...|
|    2|[A tracheostomy i...|  (20,[2],[1.0])|(20,[2],[0.693147...|
|    3|[Researchers cond...| (20,[14],[1.0])|(20,[14],[1.09861...|
|    4|[In every state e...| (20,[15],[1.0])|(20,[15],[1.09861...|
+-----+--------------------+----------------+--------------------+



# Task2.A Using the Word2Vec to creat word embedding model for documents without NLP

In [299]:
# Creating word embedding model 
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="words", outputCol="feature")

pipeline = Pipeline(stages=[tokenizer, word2Vec])


model = pipeline.fit(sentenceData)
result = model.transform(sentenceData)

In [300]:
# Check for model results
result.show()

+--------+--------------------+--------------------+--------------------+
|document|            sentence|               words|             feature|
+--------+--------------------+--------------------+--------------------+
|       0|In clinical trial...|[in, clinical, tr...|[0.01145705586002...|
|       1|As people around ...|[as, people, arou...|[0.01354109485012...|
|       2|A tracheostomy is...|[a, tracheostomy,...|[-0.0158822985308...|
|       3|Researchers condu...|[researchers, con...|[0.02061817072131...|
|       4|In every state ex...|[in, every, state...|[0.01296122745762...|
+--------+--------------------+--------------------+--------------------+



In [301]:
# Show the vectors of the words
w2v = model.stages[1]
w2v.getVectors().show(10)

+------------+--------------------+
|        word|              vector|
+------------+--------------------+
|mechanically|[-0.1389172822237...|
|        used|[0.07984109967947...|
|        side|[-0.0675490051507...|
|      health|[-0.0470000840723...|
|     rolling|[0.13018737733364...|
| vaccinated)|[-0.0144791118800...|
|         for|[-0.1083322539925...|
|    database|[0.07516460865736...|
|        june|[0.09108892828226...|
|  mortality,|[0.09504242241382...|
+------------+--------------------+
only showing top 10 rows



In [302]:
# showing the synonyms and cosine similarity of the word in input data
synonyms = w2v.findSynonyms("health", 10)  
synonyms.show(10)

+---------+------------------+
|     word|        similarity|
+---------+------------------+
|   stoma,|0.9905773997306824|
| vaccines|0.9840749502182007|
|candidate|0.9829673767089844|
| person’s|0.9622843861579895|
|     keep|0.9610600471496582|
|reactions|0.9148192405700684|
|   people|0.9031999111175537|
|     side|0.8847343325614929|
|   except|0.8423463702201843|
|      two|0.8401006460189819|
+---------+------------------+



#Task2.B Using the Word2Vec to creat word embedding model for documents with lemmatized words


In [311]:
# Creat word embedding model for documents with lemmatized words
tokenizer2 = Tokenizer(inputCol="lemma", outputCol="words")
word2Vec2 = Word2Vec(vectorSize=3, minCount=0, inputCol="words", outputCol="feature")

pipeline2 = Pipeline(stages=[tokenizer, word2Vec])


model2 = pipeline.fit(sentenceData)
result1 = model2.transform(sentenceData)

In [312]:
# Check the results 
result.show()

+--------+--------------------+--------------------+--------------------+
|document|            sentence|               words|             feature|
+--------+--------------------+--------------------+--------------------+
|       0|In clinical trial...|[in, clinical, tr...|[0.01145705586002...|
|       1|As people around ...|[as, people, arou...|[0.01354109485012...|
|       2|A tracheostomy is...|[a, tracheostomy,...|[-0.0158822985308...|
|       3|Researchers condu...|[researchers, con...|[0.02061817072131...|
|       4|In every state ex...|[in, every, state...|[0.01296122745762...|
+--------+--------------------+--------------------+--------------------+



In [313]:
# Creat vectors 
w2v2 = model2.stages[1]
w2v2.getVectors().show(10)

+------------+--------------------+
|        word|              vector|
+------------+--------------------+
|mechanically|[-0.1389172822237...|
|        used|[0.07984109967947...|
|        side|[-0.0675490051507...|
|      health|[-0.0470000840723...|
|     rolling|[0.13018737733364...|
| vaccinated)|[-0.0144791118800...|
|         for|[-0.1083322539925...|
|    database|[0.07516460865736...|
|        june|[0.09108892828226...|
|  mortality,|[0.09504242241382...|
+------------+--------------------+
only showing top 10 rows



In [314]:
#showing the synonyms and cosine similarity of the word in input data
synonyms = w2v2.findSynonyms("health", 10)  
synonyms.show(10)

+---------+------------------+
|     word|        similarity|
+---------+------------------+
|   stoma,|0.9905773997306824|
| vaccines|0.9840749502182007|
|candidate|0.9829673767089844|
| person’s|0.9622843861579895|
|     keep|0.9610600471496582|
|reactions|0.9148192405700684|
|   people|0.9031999111175537|
|     side|0.8847343325614929|
|   except|0.8423463702201843|
|      two|0.8401006460189819|
+---------+------------------+



#Task2.C Using the Word2Vec to creat word embedding model for documents with Ngrams

In [363]:
#Creating embedding word model for ngrams
tokenizer3 = Tokenizer(inputCol="rawdata", outputCol="words")
word2Vec3 = Word2Vec(vectorSize=3, minCount=0, inputCol="words", outputCol="feature")

pipeline3 = Pipeline(stages=[tokenizer, word2Vec])


model3 = pipeline.fit(ngramData)
result4 = model3.transform(ngramData)

In [364]:
#Check the result
result4.show()

+-----+--------------------+--------------------+--------------------+
|label|             rawdata|               words|              ngrams|
+-----+--------------------+--------------------+--------------------+
|    0|In clinical trial...|[in, clinical, tr...|[in clinical, cli...|
|    1|As people around ...|[as, people, arou...|[as people, peopl...|
|    2|A tracheostomy is...|[a, tracheostomy,...|[a tracheostomy, ...|
|    3|Researchers condu...|[researchers, con...|[researchers cond...|
|    4|In every state ex...|[in, every, state...|[in every, every ...|
+-----+--------------------+--------------------+--------------------+



In [329]:
#Creating the vector 
w2v3 = model3.stages[1]
w2v3.getVectors().show()


AttributeError: ignored

In [162]:
# showing the synonyms and cosine similarity of the word in input data
synonyms = w2v3.findSynonyms("health", 10)  
synonyms.show(10)

AttributeError: ignored