**Create 5 separate text files containing text data (blogs,news articles etc)and perform the following Tasks:**

In [1]:
#installing pyspark
!pip install pyspark



In [2]:
#importing libraries needed for icp
from __future__ import print_function
from pyspark import SparkConf, SparkContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import SparkSession
from pyspark.ml.feature import NGram
from pyspark.ml.feature import Word2Vec

In [3]:
# This line of code is to create the spark session, we name our sparksession as TF_IDF
spark = SparkSession.builder.appName("TFIDF icp").getOrCreate()

In [4]:
# Creating the spark dataframe with the five text files mounted to my google drive
spark2 = spark.read.text('/content/text*.txt')

In [5]:
# Here we are applying tokenization on the text files, let us call it textdata
tokenizer = Tokenizer(inputCol="value", outputCol="words")
textdata = tokenizer.transform(spark2)

In [6]:
#let us view our textdata and their tokens
textdata.show()

+--------------------+--------------------+
|               value|               words|
+--------------------+--------------------+
|Kobach’s law bloc...|[kobach’s, law, b...|
|                    |                  []|
|The law was neces...|[the, law, was, n...|
|                    |                  []|
|Kansas Attorney G...|[kansas, attorney...|
|Even during the C...|[even, during, th...|
|Even during the C...|[even, during, th...|
|                    |                  []|
|Today, however, M...|[today,, however,...|
|Draghi’s ascensio...|[draghi’s, ascens...|
|AllianceChicago c...|[alliancechicago,...|
|Mark Johnson, an ...|[mark, johnson,, ...|
+--------------------+--------------------+



**1a) Find out the top10 TF-IDF words for the above input.**

In [7]:
# applying tf on the textwords
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=10)
featurizedData = hashingTF.transform(textdata)

In [8]:
# calculating the IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
print(rescaledData)

DataFrame[value: string, words: array<string>, rawFeatures: vector, features: vector]


In [9]:
# Display the results of the top 10 TF-IDF words for our input data
rescaledData.select("value","features").show(10, truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                       

**1b)Find out the top10 TF-IDF words for the lemmatized input**

In [None]:
#import the nltk package
import nltk
#call the nltk downloader
nltk.download()
from nltk.stem import WordNetLemmatizer
wordnet_lemmatizer = WordNetLemmatizer()
nltk.download('wordnet')
nltk.download('punkt')


NLTK Downloader
---------------------------------------------------------------------------
    d) Download   l) List    u) Update   c) Config   h) Help   q) Quit
---------------------------------------------------------------------------

Download which package (l=list; x=cancel)?
Packages:
  [ ] abc................. Australian Broadcasting Commission 2006
  [ ] alpino.............. Alpino Dutch Treebank
  [ ] averaged_perceptron_tagger Averaged Perceptron Tagger
  [ ] averaged_perceptron_tagger_ru Averaged Perceptron Tagger (Russian)
  [ ] basque_grammars..... Grammars for Basque
  [ ] biocreative_ppi..... BioCreAtIvE (Critical Assessment of Information
                           Extraction Systems in Biology)
  [ ] bllip_wsj_no_aux.... BLLIP Parser: WSJ Model
  [ ] book_grammars....... Grammars from NLTK Book
  [ ] brown............... Brown Corpus
  [ ] brown_tei........... Brown Corpus (TEI XML Version)
  [ ] cess_cat............ CESS-CAT Treebank
  [ ] cess_esp............ CESS-E

In [10]:
#using wordnet lemmatizer to get lemmas from text files 
from nltk.stem import WordNetLemmatizer
def lemmaTF_IDF(textwords):
    lemmatizer = WordNetLemmatizer() 
    lemma_text = textwords.rdd.map(lambda x: (x.doc,[lemmatizer.lemmatize(t, pos='v') for t in tokenize(x.text)]))

In [11]:
#creating sparksession
spark4 = SparkSession.builder.appName("TFIDF lemma").getOrCreate()
files = ["text1.txt", "text/text2.txt", "text/text3.txt", "text/text4.txt", "text/text5.txt"]

In [12]:
#creating df 
from pyspark import SparkContext
sc = SparkContext.getOrCreate();
textwords = sc.wholeTextFiles("text*").toDF(["doc","text"])

In [13]:
import re
import numpy as np
from __future__ import division

def tokenize(s):
  return re.split("\\W+", s.lower())

# Function to sort the list of tuples by its second item 
def Sort_Tuple(tup):  
    # getting length of list of tuples 
    lst = len(tup)  
    for i in range(0, lst):  
        for j in range(0, lst-i-1):  
            if (tup[j][1] > tup[j + 1][1]):  
                temp = tup[j]  
                tup[j]= tup[j + 1]  
                tup[j + 1]= temp  
    return tup 

def tf_idf(N, tf, df):
    result = []
    for key, value in tf.items():
        doc = key[0]
        term = key[1]
        x = df[term]
        if (x>0):
          tf_idf = float(value)*np.log(N/x)
        
        result.append((term, tf_idf))
    return result

In [14]:
def tokenTF_IDF(textwords):
    tokenized_text = textwords.rdd.map(lambda x: (x.doc, tokenize(x.text)) )

    term_frequency = tokenized_text.flatMapValues(lambda x: x).countByValue()

    document_frequency = tokenized_text.flatMapValues(lambda x: x) \
                        .map(lambda x : (x[1],1))\
                        .reduceByKey(lambda a, b: a + b)
    tmp = {}
    for x in document_frequency.collect():
        tmp[x[0]] = x[1]

    number_of_docs = 5
    tf_idf_output = tf_idf(number_of_docs, term_frequency, tmp)
    # tf_idf_output
    return Sort_Tuple(tf_idf_output)[-10:]
tokenTF_IDF(textwords)

[('u', 1.8325814637483102),
 ('state', 1.8325814637483102),
 ('alliancechicago', 1.8325814637483102),
 ('community', 1.8325814637483102),
 ('health', 1.8325814637483102),
 ('research', 1.8325814637483102),
 ('institutional', 1.8325814637483102),
 ('board', 1.8325814637483102),
 ('irb', 1.8325814637483102),
 ('european', 1.8325814637483102)]

In [15]:
#Finding out the top10 TF-IDF words for the lemmatized input
#getting the lemmas of the words
import nltk
nltk.download('wordnet')
from nltk.stem import WordNetLemmatizer
def lemmaTF_IDF(tech_text):
    lemmatizer = WordNetLemmatizer() 
    lemma_text = textwords.rdd.map(lambda x: (x.doc,[lemmatizer.lemmatize(t, pos='v') for t in tokenize(x.text)]))

    # calculating TF of lemmatized words
    term_frequency = lemma_text.flatMapValues(lambda x: x).countByValue()

    # calculating DF of lemmatized tf
    document_frequency = lemma_text.flatMapValues(lambda x: x) \
                        .map(lambda x : (x[1],1))\
                        .reduceByKey(lambda a, b: a + b)
    tmp = {}
    for x in document_frequency.collect():
        tmp[x[0]] = x[1]

    # showing the top 10 TFIDF words for the lemmatized result  
    number_of_docs = 5
    tf_idf_output = tf_idf(number_of_docs, term_frequency, tmp)

    return Sort_Tuple(tf_idf_output)[-10:]
lemmaTF_IDF(textwords)

[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


[('u', 1.8325814637483102),
 ('state', 1.8325814637483102),
 ('alliancechicago', 1.8325814637483102),
 ('community', 1.8325814637483102),
 ('health', 1.8325814637483102),
 ('research', 1.8325814637483102),
 ('process', 1.8325814637483102),
 ('institutional', 1.8325814637483102),
 ('irb', 1.8325814637483102),
 ('european', 1.8325814637483102)]

**1c) Find out the top10TF-IDF words for the n-gram based input.**

In [16]:
# Create the Spark session
spark2 = SparkSession.builder.appName("Ngrams").getOrCreate()
# Create the dataframe with five text files
txt = spark2.read.text('text*.txt')

In [17]:
# perfoming tokenization on the textfiles
tokenizer = Tokenizer(inputCol="value", outputCol="words")
wordsData = tokenizer.transform(txt)

In [18]:

# Here i am Creating n-grams dataframe and i choose n=5
ngram = NGram(n=5, inputCol="words", outputCol="ngrams")
ngramDF = ngram.transform(wordsData)

In [19]:
# calculating TF on ngram texts
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=10)
featurizedData = hashingTF.transform(ngramDF)

In [20]:
# Calculate the inverse document frequency
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

In [21]:
# Displaying  the results of the top 10 TFIDF after applying ngram for n=5
rescaledData.select("features").show(10, truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                                                   |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(10,[0,1,2,4,5,6,7,8,9],[0.7870927934024732,0.7870927934024732,0.0,0.7870927934024732,0.5247285289349821,0.36772478012531734,0.26236426446749106,0.5247285289349821,0.7870927934024732])                   |
|(10,[2],[0.0])                                                                                                                                                                 

**Write a simple spark program to read a dataset and find the W2V similar words (words with higher cosine similarity) for the Top10 TF-IDF Words**

In [22]:
#importing libraries
from __future__ import print_function
from pyspark import SparkContext
from pyspark.mllib.feature import HashingTF, IDF

In [23]:
# Importing text file 
sc = SparkContext.getOrCreate()
files = sc.textFile("/content/text*.txt").map(lambda line: line.split(" "))

In [24]:
# This code will calculate the TFIDF without using NLP
hashingTF = HashingTF(numFeatures=10)
tf = hashingTF.transform(files)

tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)
print("TFIDF without NLP:")
for each in tfidf.collect():
    print(each)
sc.stop()

TFIDF without NLP:
(10,[0,1,2,4,5,6,7,8,9],[0.0,0.5247285289349821,0.5247285289349821,0.26236426446749106,0.26236426446749106,0.5247285289349821,1.0494570578699642,0.5247285289349821,0.7870927934024732])
(10,[0],[0.0])
(10,[0,1,2,3,4,5,6,7,8,9],[0.0,1.0494570578699642,1.8365498512724374,0.36772478012531734,0.26236426446749106,1.3118213223374553,1.3118213223374553,1.5741855868049464,1.5741855868049464,0.7870927934024732])
(10,[0],[0.0])
(10,[0,1,2,3,4,5,6,7,8,9],[0.0,0.7870927934024732,1.3118213223374553,1.103174340375952,0.5247285289349821,2.0989141157399285,2.0989141157399285,1.3118213223374553,0.7870927934024732,0.5247285289349821])
(10,[0,1,2,3,4,5,6,7,8,9],[0.0,1.0494570578699642,0.7870927934024732,1.103174340375952,2.0989141157399285,1.3118213223374553,0.7870927934024732,2.0989141157399285,1.3118213223374553,2.6236426446749106])
(10,[0,1,2,3,4,5,6,7,8,9],[0.0,7.34619940508975,3.1483711736098927,6.619046042255712,2.0989141157399285,2.8860069091424014,2.8860069091424014,4.4601924959

**TRY WITH LEMMATIZATION**

In [25]:
#IMPORTING LIBRARIES AND SINCE WE HAVE TO PERFORM LEMMATIZATION, WE ARE DOWNLOADING AND IMPORTING NLTK AS WELL
from __future__ import print_function

import nltk
nltk.download('punkt')
from pyspark import SparkContext
from pyspark.mllib.feature import HashingTF, IDF
from nltk.stem import WordNetLemmatizer
#creating df and importing text file
sc = SparkContext.getOrCreate()
files = sc.textFile("/content/text*.txt").map(lambda line: line.split(" "))

# PERFORMING LEMMATIZATION ON TEXT FILE

lemmatizer = WordNetLemmatizer()

word_list = list(map(' '.join, files.collect()))
word_list1 = ''
for i in word_list:
    word_list1 = word_list1 + ' ' + i
word_list2 = nltk.word_tokenize(word_list1)
lemmatized_document = ' '.join([lemmatizer.lemmatize(w) for w in word_list2])
print(lemmatized_document)

# i will store my lemmatized text document in file named "text6.txt"

f = open("text6.txt", "w+")
f.write('' + lemmatized_document)
f.close()

document1 = sc.textFile("/content/text*.txt").map(lambda line: line.split(" "))

# Calculting TFIDF on lemmatized words

hashingTF = HashingTF(numFeatures=20)
tf = hashingTF.transform(document1)
tf.cache()
idf = IDF().fit(tf)
tfidf = idf.transform(tf)

print("TFIDF with Lemmatization:")
for each in tfidf.collect():
    print(each)
sc.stop()

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
Kobach ’ s law blocked the registration of more than 35,000 eligible Kansas voter and dissuaded countless others from attempting to register . The law wa necessary , Kobach argued , because the handful of known voter fraud case were the “ tip of the iceberg. ” When he failed in court to provide evidence supporting the claim , U.S. District Court Judge Julie Robinson said the iceberg wa “ only an icicle ” and ruled the law unconstitutional . Kansas Attorney General Derek Schmidt and Secretary of State Scott Schwab picked up the baton when Kobach left office following his failed attempt to become governor . An appeal court again rejected the state ’ s argument in support of the law , and the U.S. Supreme Court AllianceChicago community health center have their own unique research review process and may include formal Institutional Review Board ( IRB ) review and/or in-person pre

**c) TRY WITH NGRAMS**

In [26]:
#importing libraries
import nltk
import re
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import pandas as pd

txt1 = []
# Importing text document
with open('/content/text3.txt') as file:
    txt1 = file.readlines()
#removing special characters
def remove_string_special_characters(s):
    stripped = re.sub('[^a-zA-z\s]', '', s)
    stripped = re.sub('_', '', stripped)
    stripped = re.sub('\s+', ' ', stripped)
    stripped = stripped.strip()
    if stripped != '':
        return stripped.lower()
#calculating the word2vector
vectorizer = CountVectorizer(ngram_range=(2, 2))
X1 = vectorizer.fit_transform(txt1)
features = (vectorizer.get_feature_names())
print("\n\nFeatures : \n", features)
print("\n\nX1 : \n", X1.toarray())

vectorizer = TfidfVectorizer(ngram_range=(3, 3))
X2 = vectorizer.fit_transform(txt1)
scores = (X2.toarray())
print("\n\nScores : \n", scores)

# Finding the top ranking features=
sums = X2.sum(axis=0)
data1 = []
for col, term in enumerate(features):
    data1.append((term, sums[0, col]))
ranking = pd.DataFrame(data1, columns=['term', 'rank'])
words = (ranking.sort_values('rank', ascending=False))
print("\n\nWords head : \n", words.head(10))



Features : 
 ['1970s now', 'acting on', 'admonition don', 'against the', 'alone it', 'american presidents', 'and dwindling', 'and higher', 'and nervous', 'and not', 'and reagan', 'are turning', 'as the', 'bartering steel', 'because of', 'bismarck admonition', 'britain and', 'but what', 'by bartering', 'carter and', 'cold war', 'defied three', 'demand as', 'don rile', 'during the', 'dwindling demand', 'east europeans', 'economic sense', 'efficiency suddenly', 'energy but', 'europeans who', 'even during', 'even the', 'for soviet', 'french are', 'germany defied', 'global oil', 'have made', 'higher efficiency', 'home alone', 'however merkel', 'industrial world', 'is acting', 'is home', 'is not', 'it is', 'just the', 'made economic', 'merkel is', 'might have', 'nervous east', 'new stage', 'nixon carter', 'nord stream', 'not just', 'not only', 'now reflects', 'of oversupply', 'of the', 'oil shocks', 'on new', 'only because', 'only bismarck', 'oversupply and', 'pipes for', 'presidents nixon

**COSINE SIMILARITY**

In [36]:
#creating sparksession for cosine similarity
spark3 = SparkSession.builder.appName("Ngram Similarity").getOrCreate()

In [37]:
# Creating dataframe with five text abstracts
abst = spark3.read.text('text*.txt')

In [38]:
# Tokenize the abstract texts
tokenizer = Tokenizer(inputCol="value", outputCol="words")
wordsData = tokenizer.transform(abst)

In [39]:

# Creating n-grams with n=2
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordsData)

In [40]:
# displaying the results
ngramDataFrame.select("ngrams").show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [52]:
# Create a mapping from words to vectors
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="words", outputCol="result")
model = word2Vec.fit(ngramDataFrame)
# print(model.getVectors().collect())
# result = model.getVectors().collect()

In [53]:
# Showing the synonyms and cosine similarity of the word cold in input data
synonyms = model.findSynonyms("cold", 10)
synonyms.show(10)

+------------+------------------+
|        word|        similarity|
+------------+------------------+
|     because|0.9971444606781006|
|  necessary,| 0.993667483329773|
|       solar|0.9915562868118286|
|         law|0.9875060319900513|
|        rile|0.9873764514923096|
|   “strongly|0.9747214913368225|
|     against|0.9743301868438721|
|        war,|0.9715194702148438|
|pro-european|0.9650136828422546|
|       case,| 0.955816388130188|
+------------+------------------+



In [43]:
# creating spark session
spark4 = SparkSession.builder.appName("Word2Vec Similarity").getOrCreate()

In [44]:
# Create the dataframe with five text abstracts
abst = spark4.read.text('text**.txt')

In [45]:
# Tokenize the abstract texts
tokenizer = Tokenizer(inputCol="value", outputCol="words")
wordsData = tokenizer.transform(abst)

In [50]:
# Create a mapping from words to vectors
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="words", outputCol="result")
model = word2Vec.fit(wordsData)
# print(model.getVectors().collect())
# result = model.getVectors().collect()

In [51]:
# Show the synonyms and cosine similarity of the word in input data
synonyms = model.findSynonyms("health", 10)
synonyms.show(10)

+---------+------------------+
|     word|        similarity|
+---------+------------------+
|   failed|0.9568229913711548|
|  iceberg|0.9516424536705017|
|  expense|0.9511805176734924|
|    “home|0.9346444606781006|
|  include|0.9265121817588806|
|    might|0.9211022853851318|
|       us|0.9195706844329834|
|   europe|0.9180535674095154|
|       an|0.9174166917800903|
|russians.|0.8989675641059875|
+---------+------------------+



In [49]:

# Tokenize the abstract texts
tokenizer = Tokenizer(inputCol="value", outputCol="words")
wordsData = tokenizer.transform(abst)

In [None]:

#closing the spark sessions
spark.stop()
spark2.stop()
spark3.stop()
spark4.stop()