In [1]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/27/67/5158f846202d7f012d1c9ca21c3549a58fd3c6707ae8ee823adcaca6473c/pyspark-3.0.2.tar.gz (204.8MB)
[K     |████████████████████████████████| 204.8MB 80kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 55.8MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.2-py2.py3-none-any.whl size=205186687 sha256=49c0210da8da7e2db025cd1482f103cc62f902b9b9777f98afb4a7e046562bbb
  Stored in directory: /root/.cache/pip/wheels/8b/09/da/c1f2859bcc86375dc972c5b6af4881b3603269bcc4c9be5d16
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.2


In [2]:
from __future__ import print_function
from pyspark import SparkConf, SparkContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import SparkSession
from pyspark.ml.feature import NGram
from pyspark.ml.feature import Word2Vec

In [3]:
# creating spark session
spark = SparkSession.builder.appName("TfIdf Example").getOrCreate()

In [4]:
# creating spark dataframe wiht the input data. You can also read the data from file. label represents the 3 documnets (0.0,0.1,0.2)
sentenceData = spark.createDataFrame([
        (0.0, "Welcome to KDM TF_IDF Tutorial."),
        (0.1, "Learn Spark ml tf_idf in today's lab."),
        (0.2, "Spark Mllib has TF-IDF.")
    ], ["label", "sentence"])

In [5]:
# creating tokens/words from the sentence data
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

In [6]:
wordsData.show()

+-----+--------------------+--------------------+
|label|            sentence|               words|
+-----+--------------------+--------------------+
|  0.0|Welcome to KDM TF...|[welcome, to, kdm...|
|  0.1|Learn Spark ml tf...|[learn, spark, ml...|
|  0.2|Spark Mllib has T...|[spark, mllib, ha...|
+-----+--------------------+--------------------+



In [7]:
# applying tf on the words data
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

In [8]:
# calculating the IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

In [9]:
#displaying the results
rescaledData.select("label", "features").show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(20,[2,8,13,15,17...|
|  0.1|(20,[2,3,6,7],[0....|
|  0.2|(20,[6,14,15],[0....|
+-----+--------------------+



In [10]:
spark2 = SparkSession.builder.appName("Ngram Example").getOrCreate()

In [11]:
#creating dataframe of input
wordDataFrame = spark2.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])


In [12]:
#creating NGrams with n=2 (two words)
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordDataFrame)

In [13]:
# displaying the results
ngramDataFrame.select("ngrams").show(truncate=False)

+------------------------------------------------------------------+
|ngrams                                                            |
+------------------------------------------------------------------+
|[Hi I, I heard, heard about, about Spark]                         |
|[I wish, wish Java, Java could, could use, use case, case classes]|
|[Logistic regression, regression models, models are, are neat]    |
+------------------------------------------------------------------+



In [14]:
# creating spark session
spark3 = SparkSession.builder.appName("Word2Vec Example").getOrCreate()

In [15]:
# Input data: Each row is a bag of words from a sentence or document.
documentDF = spark3.createDataFrame([
    ("McCarthy was asked to analyse the data from the first phase of trials of the vaccine.".split(" "), ),
    ("We have amassed the raw data and are about to begin analysing it.".split(" "), ),
    ("Without more data we cannot make a meaningful comparison of the two systems.".split(" "), ),
    ("Collecting data is a painfully slow process.".split(" "), ),
    ("You need a long series of data to be able to discern such a trend.".split(" "), )
], ["text"])

In [16]:
# Learn a mapping from words to Vectors.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)

In [17]:
for row in result.collect():
    text, vector = row
    #printing the results
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

Text: [McCarthy, was, asked, to, analyse, the, data, from, the, first, phase, of, trials, of, the, vaccine.] => 
Vector: [-0.0035007820115424693,-0.0017171840299852192,0.02368130796821788]

Text: [We, have, amassed, the, raw, data, and, are, about, to, begin, analysing, it.] => 
Vector: [-0.017147045116871595,-0.007818681689409109,0.03364002683128302]

Text: [Without, more, data, we, cannot, make, a, meaningful, comparison, of, the, two, systems.] => 
Vector: [-0.01177951755324522,-0.032720054571445174,0.015373159880534962]

Text: [Collecting, data, is, a, painfully, slow, process.] => 
Vector: [0.014085809966283185,0.001814738182084901,-0.0029639844516558306]

Text: [You, need, a, long, series, of, data, to, be, able, to, discern, such, a, trend.] => 
Vector: [-0.00949504499634107,-0.012580770254135131,0.03007583270470301]



In [18]:
# showing the synonyms and cosine similarity of the word in input data
synonyms = model.findSynonyms("data", 5)   # its okay for certain words , real bad for others
synonyms.show(5)

+---------+------------------+
|     word|        similarity|
+---------+------------------+
|      was|0.9340987205505371|
|      and|0.9194383025169373|
|       be|0.9047830700874329|
|analysing|0.8658599853515625|
|      the|0.8481355905532837|
+---------+------------------+



In [19]:
#closing the spark sessions
spark.stop()
spark2.stop()
spark3.stop()

# ICP5

In [20]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [21]:
with open("/content/drive/MyDrive/text1.txt","r+") as t1:
    doc1 = t1.read()
with open("/content/drive/MyDrive/text2.txt","r+") as t2:
    doc2 = t2.read()
with open("/content/drive/MyDrive/text3.txt","r+") as t3:
    doc3 = t3.read()
with open("/content/drive/MyDrive/text4.txt","r+") as t4:
    doc4 = t4.read()
with open("/content/drive/MyDrive/text5.txt","r+") as t5:
    doc5 = t5.read()
# Read all 5 txt files in document list 
documents = [doc1,doc2,doc3,doc4,doc5]

#a. Top 10 TF-IDF words for the above input

In [22]:

from sklearn.feature_extraction.text import TfidfVectorizer
import pandas as pd

# using sklearn library which has inbuilt Tfidf vectorizer class which can generate tfidf for given corpus
vect = TfidfVectorizer()
#created TfidfVectorizer object
tfidf_matrix = vect.fit_transform(documents)
#passed list of documents or corpus to obt method fit_transform
df = pd.DataFrame(tfidf_matrix.toarray(), columns = vect.get_feature_names())
# converted method output to panda data frame 
pd.set_option('display.max_columns', 20)

df.loc['Total'] = df.sum() # adding row to value total

#  used transpose function here to filter out words (which was rows) and then converted matrix back to original version
print (df.T.sort_values('Total', ascending=True).tail(10).T)

             in       him      fish       for       was        he       and  \
0      0.058153  0.000000  0.000000  0.068756  0.081733  0.000000  0.068756   
1      0.062718  0.000000  0.000000  0.148305  0.000000  0.000000  0.148305   
2      0.089482  0.062882  0.000000  0.000000  0.125764  0.000000  0.105797   
3      0.048144  0.202994  0.000000  0.113843  0.202994  0.326059  0.170765   
4      0.059707  0.083916  0.375903  0.070592  0.000000  0.101092  0.000000   
Total  0.318204  0.349792  0.375903  0.401496  0.410491  0.427151  0.493623   

             to        of       the  
0      0.058153  0.408663  0.407074  
1      0.062718  0.176295  0.439023  
2      0.223706  0.000000  0.134224  
3      0.096288  0.000000  0.144432  
4      0.119413  0.167831  0.179120  
Total  0.560278  0.752789  1.303872  


# b. Top 10 TF-IDF words for the lemmatized input

In [23]:
import nltk;nltk.download('punkt');nltk.download('wordnet')
from nltk.stem import WordNetLemmatizer
lemmatizer = WordNetLemmatizer()

#tokenize each text
words1 = nltk.word_tokenize(doc1)
words2 = nltk.word_tokenize(doc2)
words3 = nltk.word_tokenize(doc3)
words4 = nltk.word_tokenize(doc4)
words5 = nltk.word_tokenize(doc5)

lemmatized_document1 = ' '.join([lemmatizer.lemmatize(w) for w in words1])
lemmatized_document2 = ' '.join([lemmatizer.lemmatize(w) for w in words2])
lemmatized_document3 = ' '.join([lemmatizer.lemmatize(w) for w in words3])
lemmatized_document4 = ' '.join([lemmatizer.lemmatize(w) for w in words4])
lemmatized_document5 = ' '.join([lemmatizer.lemmatize(w) for w in words5])

#merging each textfiles
documents = [lemmatized_document1,lemmatized_document2,lemmatized_document3,lemmatized_document4,lemmatized_document5]

# using sklearn library which has inbuilt Tfidf vectorizer class which can generate tfidf for given corpus
vect = TfidfVectorizer()
#created TfidfVectorizer object
tfidf_matrix = vect.fit_transform(documents)
#passed list of documents or corpus to obt method fit_transform
df = pd.DataFrame(tfidf_matrix.toarray(), columns = vect.get_feature_names())
# converted method output to panda data frame 

df.loc['Total'] = df.sum() # adding row to value total

# also used transpose function here to filter out words (which was rows) and then converted matrix back to original version
print (df.T.sort_values('Total', ascending=True).tail(10).T)

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Unzipping corpora/wordnet.zip.
            him      fish       for        wa        he       dog       and  \
0      0.000000  0.000000  0.069778  0.082948  0.000000  0.000000  0.069778   
1      0.000000  0.000000  0.148755  0.000000  0.000000  0.000000  0.148755   
2      0.061075  0.000000  0.000000  0.122150  0.000000  0.294306  0.102757   
3      0.201305  0.000000  0.112896  0.201305  0.323346  0.161673  0.169344   
4      0.083916  0.375903  0.070592  0.000000  0.101092  0.000000  0.000000   
Total  0.346296  0.375903  0.402022  0.406403  0.424438  0.455979  0.490634   

             to        of       the  
0      0.059018  0.414739  0.413126  
1      0.062908  0.176830  0.440356  
2      0.217278  0.000000  0.130367  
3      0.095487  0.000000  0.143230  
4      0.119413  0.167831  0.179120  
Total

# c. Top 10 TF-IDF words for the n-gram based input.

In [24]:
# this function takes document and n int value to generate list of n grams
def ngrams(input, n):
    input = input.split(' ')
    output = []
    for i in range(len(input)-n+1):
        output.append(input[i:i+n])
    return output

ngram_doc1 = ' '.join([' '.join(x) for x in ngrams(doc1, 3)])
ngram_doc2 = ' '.join([' '.join(x) for x in ngrams(doc2, 3)])
ngram_doc3 = ' '.join([' '.join(x) for x in ngrams(doc3, 3)])
ngram_doc4 = ' '.join([' '.join(x) for x in ngrams(doc4, 3)])
ngram_doc5 = ' '.join([' '.join(x) for x in ngrams(doc5, 3)])

documents = [doc1,doc2,doc3,doc4,doc5]

# using sklearn library 
vect = TfidfVectorizer( ngram_range=(3,3)) # TfidfVectorizer has inbuilt ngram kwarg which show tfidf for ngrams
#created TfidfVectorizer object
tfidf_matrix = vect.fit_transform(documents)
#passed list of documents or corpus to obt method fit_transform
df = pd.DataFrame(tfidf_matrix.toarray(), columns = vect.get_feature_names())

df.loc['Total'] = df.sum() # adding row to value total

# used transpose function here to filter out words and then converted matrix back to original version
print (df.T.sort_values('Total', ascending=True).tail(10).T)

       boycott over the  the slogan one  never hosted the  the run up  \
0              0.000000        0.000000          0.000000    0.000000   
1              0.132453        0.132453          0.132453    0.132453   
2              0.000000        0.000000          0.000000    0.000000   
3              0.000000        0.000000          0.000000    0.000000   
4              0.000000        0.000000          0.000000    0.000000   
Total          0.132453        0.132453          0.132453    0.132453   

       the olympics before  over the country  the health of  \
0                 0.000000          0.000000       0.000000   
1                 0.132453          0.132453       0.132453   
2                 0.000000          0.000000       0.000000   
3                 0.000000          0.000000       0.000000   
4                 0.000000          0.000000       0.000000   
Total             0.132453          0.132453       0.132453   

       the country human  in the run  smog mig

#Task2

In [25]:
from __future__ import print_function
from pyspark import SparkConf, SparkContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import SparkSession
from pyspark.ml.feature import NGram
from pyspark.ml.feature import Word2Vec
# creating spark session
spark = SparkSession.builder.appName("TfIdf Example").getOrCreate()

documentData = spark.createDataFrame([
        (0.0, doc1),
        (0.1, doc2),
        (0.2, doc3),
        (0.3, doc4),
        (0.5, doc5)
    ], ["label", "document"])

# creating tokens/words from the sentence data
tokenizer = Tokenizer(inputCol="document", outputCol="words")
wordsData = tokenizer.transform(documentData)
print (documentData)
wordsData.show()

DataFrame[label: double, document: string]
+-----+--------------------+--------------------+
|label|            document|               words|
+-----+--------------------+--------------------+
|  0.0|As the sound of f...|[as, the, sound, ...|
|  0.1|China had never h...|[china, had, neve...|
|  0.2|A homeless man ri...|[a, homeless, man...|
|  0.3|Hamlin previously...|[hamlin, previous...|
|  0.5|Andy Trust has be...|[andy, trust, has...|
+-----+--------------------+--------------------+



# a. Try without NLP


In [26]:

# applying tf on the words data
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=50)
tf = hashingTF.transform(wordsData)

# calculating the IDF
tf.cache()
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf = idf.fit(tf)
tfidf = idf.transform(tf)
#displaying the results
tfidf.select("label", "features").show()


print("TFIDF without NLP:")
for row in tfidf.collect():
    print(row)
    print(row['rawFeatures'])
spark.stop()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(50,[0,1,2,4,6,8,...|
|  0.1|(50,[0,1,2,3,5,9,...|
|  0.2|(50,[0,1,3,5,6,8,...|
|  0.3|(50,[1,3,4,5,6,7,...|
|  0.5|(50,[0,5,8,9,10,1...|
+-----+--------------------+

TFIDF without NLP:
Row(label=0.0, document="As the sound of fireworks rang out over Beijing to mark the close of the 2008 Summer Olympics, China's leaders could have been forgiven for breathing a sigh of relief. Remembered today as an event in which record-breaking sporting achievements were matched only by the spectacular pageantry and organization of the Games, the success of the Beijing Olympics was no sure thing.", words=['as', 'the', 'sound', 'of', 'fireworks', 'rang', 'out', 'over', 'beijing', 'to', 'mark', 'the', 'close', 'of', 'the', '2008', 'summer', 'olympics,', "china's", 'leaders', 'could', 'have', 'been', 'forgiven', 'for', 'breathing', 'a', 'sigh', 'of', 'relief.', 'remembered', 'today', 'as', 'an', 'event', 'in', 

# b. Try with Lemmatization


In [27]:

import nltk;nltk.download('punkt');nltk.download('wordnet')
from nltk.stem import WordNetLemmatizer
lemmatizer = WordNetLemmatizer()

words1 = nltk.word_tokenize(doc1)
words2 = nltk.word_tokenize(doc2)
words3 = nltk.word_tokenize(doc3)
words4 = nltk.word_tokenize(doc4)
words5 = nltk.word_tokenize(doc5)

lemmatized_document1 = ' '.join([lemmatizer.lemmatize(w) for w in words1])
lemmatized_document2 = ' '.join([lemmatizer.lemmatize(w) for w in words2])
lemmatized_document3 = ' '.join([lemmatizer.lemmatize(w) for w in words3])
lemmatized_document4 = ' '.join([lemmatizer.lemmatize(w) for w in words4])
lemmatized_document5 = ' '.join([lemmatizer.lemmatize(w) for w in words5])

### lemmatizing words from 5 input docs same as previos task

# creating spark session
spark = SparkSession.builder.appName("TfIdf Example").getOrCreate()

documentData = spark.createDataFrame([
        (0.0, lemmatized_document1),
        (0.1, lemmatized_document2),
        (0.2, lemmatized_document3),
        (0.3, lemmatized_document4),
        (0.5, lemmatized_document5)
    ], ["label", "document"])

# creating tokens/words from the sentence data
tokenizer = Tokenizer(inputCol="document", outputCol="words")
wordsData = tokenizer.transform(documentData)
print (documentData)
wordsData.show()


# applying tf on the words data
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=50)
tf = hashingTF.transform(wordsData)
# calculating the IDF
tf.cache()
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf = idf.fit(tf)
tfidf = idf.transform(tf)
#displaying the results
tfidf.select("label", "features").show()


print("TF-IDF with Lemmatization:")
for row in tfidf.collect():
    print(row)
    print(row['rawFeatures'])
spark.stop()

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
DataFrame[label: double, document: string]
+-----+--------------------+--------------------+
|label|            document|               words|
+-----+--------------------+--------------------+
|  0.0|As the sound of f...|[as, the, sound, ...|
|  0.1|China had never h...|[china, had, neve...|
|  0.2|A homeless man ri...|[a, homeless, man...|
|  0.3|Hamlin previously...|[hamlin, previous...|
|  0.5|Andy Trust ha bee...|[andy, trust, ha,...|
+-----+--------------------+--------------------+

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(50,[0,2,4,6,7,8,...|
|  0.1|(50,[0,1,2,3,5,7,...|
|  0.2|(50,[0,1,3,4,5,6,...|
|  0.3|(50,[1,3,4,5,6,7,...|
|  0.5|(50,[0,2,4,7,9,10...|
+-----+--------------------+

TF-IDF with Lemmat

# c. Try with N grams

In [28]:
spark = SparkSession.builder.appName("TfIdf Example").getOrCreate()

documentData = spark.createDataFrame([
        (0.0, doc1.split(' ')),
        (0.1, doc2.split(' ')),
        (0.2, doc3.split(' ')),
        (0.3, doc4.split(' ')),
        (0.5, doc5.split(' '))
    ], ["label", "document"])


# Using ngram 
ngram = NGram(n=2, inputCol="document", outputCol="ngrams")

ngramDataFrame = ngram.transform(documentData)

# applying tf on the words data
hashingTF = HashingTF(inputCol="ngrams", outputCol="rawFeatures", numFeatures=50)
tf = hashingTF.transform(ngramDataFrame)
# calculating the IDF
tf.cache()
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf = idf.fit(tf)
tfidf = idf.transform(tf)
#displaying the results
tfidf.select("label", "features").show()


print("TF-IDF with ngram:")
for row in tfidf.collect():
    print(row)
    print(row['rawFeatures'])
spark.stop()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(50,[0,1,4,5,6,7,...|
|  0.1|(50,[0,1,2,3,4,5,...|
|  0.2|(50,[0,1,2,4,5,6,...|
|  0.3|(50,[0,2,3,4,5,7,...|
|  0.5|(50,[0,1,2,4,6,8,...|
+-----+--------------------+

TF-IDF with ngram:
Row(label=0.0, document=['As', 'the', 'sound', 'of', 'fireworks', 'rang', 'out', 'over', 'Beijing', 'to', 'mark', 'the', 'close', 'of', 'the', '2008', 'Summer', 'Olympics,', "China's", 'leaders', 'could', 'have', 'been', 'forgiven', 'for', 'breathing', 'a', 'sigh', 'of', 'relief.', 'Remembered', 'today', 'as', 'an', 'event', 'in', 'which', 'record-breaking', 'sporting', 'achievements', 'were', 'matched', 'only', 'by', 'the', 'spectacular', 'pageantry', 'and', 'organization', 'of', 'the', 'Games,', 'the', 'success', 'of', 'the', 'Beijing', 'Olympics', 'was', 'no', 'sure', 'thing.'], ngrams=['As the', 'the sound', 'sound of', 'of fireworks', 'fireworks rang', 'rang out', 'out over', 'over Beijing', 'Beijing to',