In [None]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/27/67/5158f846202d7f012d1c9ca21c3549a58fd3c6707ae8ee823adcaca6473c/pyspark-3.0.2.tar.gz (204.8MB)
[K     |████████████████████████████████| 204.8MB 69kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 43.7MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.2-py2.py3-none-any.whl size=205186687 sha256=473ac55e23f829fbcb1e5a23a9d8fdc8f9fe85af99ed9de12cc10f9b9f50bd26
  Stored in directory: /root/.cache/pip/wheels/8b/09/da/c1f2859bcc86375dc972c5b6af4881b3603269bcc4c9be5d16
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.2


In [27]:
from __future__ import print_function
from pyspark import SparkConf, SparkContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import SparkSession
from pyspark.ml.feature import NGram
from pyspark.ml.feature import Word2Vec

**Importing 5 text files containing data**

In [17]:
with open("/content/para1.txt","r+") as f1:
  file1 = f1.read()
with open("/content/para2.txt","r+") as f2:
  file2 = f2.read()
with open("/content/para3.txt","r+") as f3:
    file3 = f3.read()
with open("/content/para4.txt","r+") as f4:
    file4 = f4.read()
with open("/content/para5.txt","r+") as f5:
    file5 = f5.read()

documents = [file1,file2,file3,file4,file5]

**printing all the data**

In [19]:
documents

['The tech giant has blocked news to Australians on its platform since last Thursday amid a dispute over a proposed law which would force it and Google to pay news publishers for content.',
 'The lessons that the piano teacher Cornelia Vertenstein taught her students also resounded with many others, including me.',
 'Cornelia Vertenstein, a Holocaust survivor, gave her last piano lesson on Feb. She was not feeling well, so she arranged a ride to the hospital.',
 'Pneumonia settled in, and family gathered, sensing the end of a quietly extraordinary life.',
 'She began giving lessons at age in war-torn Romania. She did not stop for nearly years. Toward the end, adapting to the pandemic, Ms. Vertenstein gave lessons on FaceTime from her home in Denver.']

**1.Finding Out top 10 Tf-idf words**

In [20]:
from sklearn.feature_extraction.text import TfidfVectorizer
import pandas as pd

# using sklearn library which has inbuilt Tfidf vectorizer class which can generate tfidf for given corpus
vect = TfidfVectorizer()
#created TfidfVectorizer object
tfidf_matrix = vect.fit_transform(documents)
#passed list of documents or corpus to obt method fit_transform
df = pd.DataFrame(tfidf_matrix.toarray(), columns = vect.get_feature_names())
# converted method output to panda data frame 
pd.set_option('display.max_columns', 20)

df.loc['Total'] = df.sum() # adding row to value total

#filtering values of words whos tfidf is greater than 0.3
# also used transpose function here to filter out words (which was rows) and then converted matrix back to original version
print (df.T.sort_values('Total', ascending=True).tail(10).T)

             on     piano  cornelia       her  vertenstein   lessons  \
0      0.121420  0.000000  0.000000  0.000000     0.000000  0.000000   
1      0.000000  0.210014  0.210014  0.174331     0.174331  0.210014   
2      0.154133  0.185682  0.185682  0.154133     0.154133  0.000000   
3      0.000000  0.000000  0.000000  0.000000     0.000000  0.000000   
4      0.118174  0.000000  0.000000  0.118174     0.118174  0.284725   
Total  0.393726  0.395696  0.395696  0.446637     0.446637  0.494739   

             to        in       she       the  
0      0.242841  0.000000  0.000000  0.086392  
1      0.000000  0.000000  0.000000  0.248076  
2      0.154133  0.000000  0.371363  0.109667  
3      0.000000  0.241293  0.000000  0.142512  
4      0.118174  0.284725  0.284725  0.168163  
Total  0.515147  0.526018  0.656088  0.754808  


**2.Finding out top 10 Tf-idf words for lemmatized input data**

In [21]:
import nltk;
nltk.download('punkt');
nltk.download('wordnet')
from nltk.stem import WordNetLemmatizer
lemmatizer = WordNetLemmatizer()

w1 = nltk.word_tokenize(file1)
w2 = nltk.word_tokenize(file2)
w3 = nltk.word_tokenize(file3)
w4 = nltk.word_tokenize(file4)
w5 = nltk.word_tokenize(file5)

lemmatized_document1 = ' '.join([lemmatizer.lemmatize(w) for w in w1])
lemmatized_document2 = ' '.join([lemmatizer.lemmatize(w) for w in w2])
lemmatized_document3 = ' '.join([lemmatizer.lemmatize(w) for w in w3])
lemmatized_document4 = ' '.join([lemmatizer.lemmatize(w) for w in w4])
lemmatized_document5 = ' '.join([lemmatizer.lemmatize(w) for w in w5])

documents = [lemmatized_document1,lemmatized_document2,lemmatized_document3,lemmatized_document4,lemmatized_document5]

# using sklearn library which has inbuilt Tfidf vectorizer class which can generate tfidf for given corpus
vect = TfidfVectorizer()
#created TfidfVectorizer object
tfidf_matrix = vect.fit_transform(documents)
#passed list of documents or corpus to obt method fit_transform
df = pd.DataFrame(tfidf_matrix.toarray(), columns = vect.get_feature_names())
# converted method output to panda data frame 

df.loc['Total'] = df.sum() # adding row to value total

#filtering values of words whos tfidf is greater than 0.3
# also used transpose function here to filter out words (which was rows) and then converted matrix back to original version
print (df.T.sort_values('Total', ascending=True).tail(10).T)

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Unzipping corpora/wordnet.zip.
             on     piano  cornelia       her  vertenstein        to  \
0      0.117616  0.000000  0.000000  0.000000     0.000000  0.235231   
1      0.000000  0.211469  0.211469  0.175539     0.175539  0.000000   
2      0.156434  0.188454  0.188454  0.156434     0.156434  0.156434   
3      0.000000  0.000000  0.000000  0.000000     0.000000  0.000000   
4      0.119692  0.000000  0.000000  0.119692     0.119692  0.119692   
Total  0.393742  0.399924  0.399924  0.451665     0.451665  0.511358   

             in    lesson       she       the  
0      0.000000  0.000000  0.000000  0.083685  
1      0.000000  0.175539  0.000000  0.249794  
2      0.000000  0.156434  0.376909  0.111304  
3      0.241293  0.000000  0.000000  0.142512  
4      0.288383  0.239384  0.288383  0.170

**3.Finding out top 10 TF-IDF words for N-gram based input data**

In [23]:
# this function takes document and n int value to generate list of n grams
def ngrams(input, n):
    input = input.split(' ')
    output = []
    for i in range(len(input)-n+1):
        output.append(input[i:i+n])
    return output

ngram_doc1 = ' '.join([' '.join(x) for x in ngrams(file1, 2)])
ngram_doc2 = ' '.join([' '.join(x) for x in ngrams(file2, 2)])
ngram_doc3 = ' '.join([' '.join(x) for x in ngrams(file3, 2)])
ngram_doc4 = ' '.join([' '.join(x) for x in ngrams(file4, 2)])
ngram_doc5 = ' '.join([' '.join(x) for x in ngrams(file5, 2)])

# documents = [ngram_doc1,ngram_doc2,ngram_doc3,ngram_doc4,ngram_doc5]

documents = [file1,file2,file3,file4,file5]

# using sklearn library which has inbuilt Tfidf vectorizer class which can generate tfidf for given corpus
vect = TfidfVectorizer( ngram_range=(2,2)) # TfidfVectorizer has inbuilt ngram kwarg which show tfidf for ngrams
#created TfidfVectorizer object
tfidf_matrix = vect.fit_transform(documents)
#passed list of documents or corpus to obt method fit_transform
df = pd.DataFrame(tfidf_matrix.toarray(), columns = vect.get_feature_names())
# converted method output to panda data frame 

df.loc['Total'] = df.sum() # adding row to value total

#filtering values of words whos tfidf is greater than 0.3
# also used transpose function here to filter out words (which was rows) and then converted matrix back to original version
print (df.T.sort_values('Total', ascending=True).tail(10).T)

       quietly extraordinary  of quietly    end of  extraordinary life  \
0                   0.000000    0.000000  0.000000            0.000000   
1                   0.000000    0.000000  0.000000            0.000000   
2                   0.000000    0.000000  0.000000            0.000000   
3                   0.292968    0.292968  0.292968            0.292968   
4                   0.000000    0.000000  0.000000            0.000000   
Total               0.292968    0.292968  0.292968            0.292968   

       family gathered    in and  pneumonia settled    to the  \
0             0.000000  0.000000           0.000000  0.000000   
1             0.000000  0.000000           0.000000  0.000000   
2             0.000000  0.000000           0.000000  0.174805   
3             0.292968  0.292968           0.292968  0.000000   
4             0.000000  0.000000           0.000000  0.139807   
Total         0.292968  0.292968           0.292968  0.314612   

       cornelia vertenste

**2.Permorfing a spark program to read a dataset and find the W2V similar words (words with higher cosine similarity) for the Top10 TF-IDF Words**

In [24]:
from __future__ import print_function
from pyspark import SparkConf, SparkContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import SparkSession
from pyspark.ml.feature import NGram
from pyspark.ml.feature import Word2Vec

In [25]:
# creating spark session
spark = SparkSession.builder.appName("TfIdf Example").getOrCreate()

documentData = spark.createDataFrame([
        (0.0, file1),
        (0.1, file2),
        (0.2, file3),
        (0.3, file4),
        (0.5, file5)
    ], ["label", "document"])

In [26]:
# creating tokens/words from the sentence data
tokenizer = Tokenizer(inputCol="document", outputCol="words")
wordsData = tokenizer.transform(documentData)
print (documentData)
wordsData.show()

DataFrame[label: double, document: string]
+-----+--------------------+--------------------+
|label|            document|               words|
+-----+--------------------+--------------------+
|  0.0|The tech giant ha...|[the, tech, giant...|
|  0.1|The lessons that ...|[the, lessons, th...|
|  0.2|Cornelia Vertenst...|[cornelia, verten...|
|  0.3|Pneumonia settled...|[pneumonia, settl...|
|  0.5|She began giving ...|[she, began, givi...|
+-----+--------------------+--------------------+



**a.Performing a task without NLP**

In [28]:
# applying tf on the words data
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=200)
tf = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors
# calculating the IDF
tf.cache()
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf = idf.fit(tf)
tfidf = idf.transform(tf)
#displaying the results
tfidf.select("label", "features").show()


print("TF-IDF without NLP:")
for each in tfidf.collect():
    print(each)
    print(each['rawFeatures'])
spark.stop()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(200,[1,4,17,33,6...|
|  0.1|(200,[15,17,40,50...|
|  0.2|(200,[3,5,9,13,15...|
|  0.3|(200,[17,23,53,67...|
|  0.5|(200,[5,17,40,46,...|
+-----+--------------------+

TF-IDF without NLP:
Row(label=0.0, document='The tech giant has blocked news to Australians on its platform since last Thursday amid a dispute over a proposed law which would force it and Google to pay news publishers for content.', words=['the', 'tech', 'giant', 'has', 'blocked', 'news', 'to', 'australians', 'on', 'its', 'platform', 'since', 'last', 'thursday', 'amid', 'a', 'dispute', 'over', 'a', 'proposed', 'law', 'which', 'would', 'force', 'it', 'and', 'google', 'to', 'pay', 'news', 'publishers', 'for', 'content.'], rawFeatures=SparseVector(200, {1: 1.0, 4: 1.0, 17: 1.0, 33: 2.0, 64: 1.0, 66: 2.0, 67: 3.0, 74: 1.0, 86: 1.0, 87: 1.0, 88: 2.0, 91: 1.0, 95: 1.0, 97: 1.0, 107: 1.0, 111: 1.0, 129: 1.0, 136: 1.0, 137: 1.0, 144: 1.

**b.Performing the task with lemmitization**

In [29]:
import nltk;
nltk.download('punkt');
nltk.download('wordnet')
from nltk.stem import WordNetLemmatizer
lemmatizer = WordNetLemmatizer()

w1 = nltk.word_tokenize(file1)
w2 = nltk.word_tokenize(file2)
w3 = nltk.word_tokenize(file3)
w4 = nltk.word_tokenize(file4)
w5 = nltk.word_tokenize(file5)

lemmatized_document1 = ' '.join([lemmatizer.lemmatize(w) for w in w1])
lemmatized_document2 = ' '.join([lemmatizer.lemmatize(w) for w in w2])
lemmatized_document3 = ' '.join([lemmatizer.lemmatize(w) for w in w3])
lemmatized_document4 = ' '.join([lemmatizer.lemmatize(w) for w in w4])
lemmatized_document5 = ' '.join([lemmatizer.lemmatize(w) for w in w5])

### lemmatizing words from 5 input docs same as previos task

# creating spark session
spark = SparkSession.builder.appName("TfIdf Example").getOrCreate()

documentData = spark.createDataFrame([
        (0.0, lemmatized_document1),
        (0.1, lemmatized_document2),
        (0.2, lemmatized_document3),
        (0.3, lemmatized_document4),
        (0.5, lemmatized_document5)
    ], ["label", "document"])

# creating tokens/words from the sentence data
tokenizer = Tokenizer(inputCol="document", outputCol="words")
wordsData = tokenizer.transform(documentData)
print (documentData)
wordsData.show()

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
DataFrame[label: double, document: string]
+-----+--------------------+--------------------+
|label|            document|               words|
+-----+--------------------+--------------------+
|  0.0|The tech giant ha...|[the, tech, giant...|
|  0.1|The lesson that t...|[the, lesson, tha...|
|  0.2|Cornelia Vertenst...|[cornelia, verten...|
|  0.3|Pneumonia settled...|[pneumonia, settl...|
|  0.5|She began giving ...|[she, began, givi...|
+-----+--------------------+--------------------+



In [30]:
# applying tf on the words data
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=200)
tf = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors
# calculating the IDF
tf.cache()
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf = idf.fit(tf)
tfidf = idf.transform(tf)
#displaying the results
tfidf.select("label", "features").show()


print("TF-IDF with Lemmatization:")
for each in tfidf.collect():
    print(each)
    print(each['rawFeatures'])
spark.stop()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(200,[4,5,17,28,3...|
|  0.1|(200,[9,15,17,28,...|
|  0.2|(200,[3,5,9,13,15...|
|  0.3|(200,[17,23,28,53...|
|  0.5|(200,[5,9,16,17,2...|
+-----+--------------------+

TF-IDF with Lemmatization:
Row(label=0.0, document='The tech giant ha blocked news to Australians on it platform since last Thursday amid a dispute over a proposed law which would force it and Google to pay news publisher for content .', words=['the', 'tech', 'giant', 'ha', 'blocked', 'news', 'to', 'australians', 'on', 'it', 'platform', 'since', 'last', 'thursday', 'amid', 'a', 'dispute', 'over', 'a', 'proposed', 'law', 'which', 'would', 'force', 'it', 'and', 'google', 'to', 'pay', 'news', 'publisher', 'for', 'content', '.'], rawFeatures=SparseVector(200, {4: 2.0, 5: 1.0, 17: 1.0, 28: 1.0, 33: 2.0, 38: 1.0, 64: 1.0, 66: 1.0, 67: 2.0, 74: 1.0, 86: 2.0, 87: 1.0, 88: 2.0, 91: 1.0, 95: 1.0, 97: 1.0, 111: 1.0, 129: 1.0, 136: 1.0, 137

**c.Performing the task with N-grams data**

In [31]:
spark = SparkSession.builder.appName("TfIdf Example").getOrCreate()

documentData = spark.createDataFrame([
        (0.0, file1.split(' ')),
        (0.1, file2.split(' ')),
        (0.2, file3.split(' ')),
        (0.3, file4.split(' ')),
        (0.5, file5.split(' '))
    ], ["label", "document"])


ngram = NGram(n=2, inputCol="document", outputCol="ngrams")

ngramDataFrame = ngram.transform(documentData)

# applying tf on the words data
hashingTF = HashingTF(inputCol="ngrams", outputCol="rawFeatures", numFeatures=200)
tf = hashingTF.transform(ngramDataFrame)
# alternatively, CountVectorizer can also be used to get term frequency vectors
# calculating the IDF
tf.cache()
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf = idf.fit(tf)
tfidf = idf.transform(tf)
#displaying the results
tfidf.select("label", "features").show()


print("TF-IDF with ngram:")
for each in tfidf.collect():
    print(each)
    print(each['rawFeatures'])
spark.stop()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(200,[5,7,14,27,2...|
|  0.1|(200,[28,38,57,60...|
|  0.2|(200,[3,4,5,9,15,...|
|  0.3|(200,[25,31,61,95...|
|  0.5|(200,[13,14,18,23...|
+-----+--------------------+

TF-IDF with ngram:
Row(label=0.0, document=['The', 'tech', 'giant', 'has', 'blocked', 'news', 'to', 'Australians', 'on', 'its', 'platform', 'since', 'last', 'Thursday', 'amid', 'a', 'dispute', 'over', 'a', 'proposed', 'law', 'which', 'would', 'force', 'it', 'and', 'Google', 'to', 'pay', 'news', 'publishers', 'for', 'content.'], ngrams=['The tech', 'tech giant', 'giant has', 'has blocked', 'blocked news', 'news to', 'to Australians', 'Australians on', 'on its', 'its platform', 'platform since', 'since last', 'last Thursday', 'Thursday amid', 'amid a', 'a dispute', 'dispute over', 'over a', 'a proposed', 'proposed law', 'law which', 'which would', 'would force', 'force it', 'it and', 'and Google', 'Google to', 'to pay', 'pay news'