<a href="https://colab.research.google.com/github/SAICHANDUALURI/KDM-ICP-5/blob/main/Untitled5.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/27/67/5158f846202d7f012d1c9ca21c3549a58fd3c6707ae8ee823adcaca6473c/pyspark-3.0.2.tar.gz (204.8MB)
[K     |████████████████████████████████| 204.8MB 68kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 53.5MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.2-py2.py3-none-any.whl size=205186687 sha256=5c786514018e81585cbd9111a8272411263515b0117a9fbfa810c17e57b84997
  Stored in directory: /root/.cache/pip/wheels/8b/09/da/c1f2859bcc86375dc972c5b6af4881b3603269bcc4c9be5d16
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.2


In [3]:
from __future__ import print_function
from pyspark import SparkConf, SparkContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import SparkSession
from pyspark.ml.feature import NGram
from pyspark.ml.feature import Word2Vec

In [5]:
# Create (or reuse) the Spark session used by the TF-IDF example below.
spark = SparkSession.builder.appName("TfIdf Example").getOrCreate()

In [6]:
# Create a Spark DataFrame from in-memory input data (the data could also be read from a file).
# Each row is one document: "label" identifies the 3 documents (0.0, 0.1, 0.2) and
# "sentence" holds the raw text.
sentenceData = spark.createDataFrame([
        (0.0, "Welcome to KDM TF_IDF Tutorial."),
        (0.1, "Learn Spark ml tf_idf in today's lab."),
        (0.2, "Spark Mllib has TF-IDF.")
    ], ["label", "sentence"])

In [7]:
# Split each sentence into tokens; the result is stored in a new "words" column.
# The table printed by the next cell shows the tokens come out lower-cased.
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

In [8]:
# Display the tokenized DataFrame (long cell values are truncated in the printed table).
wordsData.show()


+-----+--------------------+--------------------+
|label|            sentence|               words|
+-----+--------------------+--------------------+
|  0.0|Welcome to KDM TF...|[welcome, to, kdm...|
|  0.1|Learn Spark ml tf...|[learn, spark, ml...|
|  0.2|Spark Mllib has T...|[spark, mllib, ha...|
+-----+--------------------+--------------------+



In [10]:
# Apply term-frequency (TF) hashing to the tokenized words: each row's "words" are
# turned into a vector of size numFeatures=20, written to the "rawFeatures" column.
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

In [11]:
# Calculate the IDF: fit on the term-frequency vectors, then use the fitted model to
# rescale "rawFeatures" into the "features" column.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

In [12]:
# Display the label and the resulting TF-IDF feature vector of each document.
rescaledData.select("label", "features").show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(20,[2,8,13,15,17...|
|  0.1|(20,[2,3,6,7],[0....|
|  0.2|(20,[6,14,15],[0....|
+-----+--------------------+



In [13]:
# Session handle for the n-gram example.
# NOTE(review): getOrCreate() presumably returns the session already started above
# rather than a second one — confirm before relying on spark2 being independent.
spark2 = SparkSession.builder.appName("Ngram Example").getOrCreate()

In [15]:
# Create the input DataFrame: each row is an id plus an already-tokenized list of words.
wordDataFrame = spark2.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

In [16]:
# Create n-grams with n=2 (pairs of consecutive words) from "words" into the "ngrams" column.
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordDataFrame)

In [17]:
# Display the bigrams; truncate=False prints each list in full instead of shortening it.
ngramDataFrame.select("ngrams").show(truncate=False)

+------------------------------------------------------------------+
|ngrams                                                            |
+------------------------------------------------------------------+
|[Hi I, I heard, heard about, about Spark]                         |
|[I wish, wish Java, Java could, could use, use case, case classes]|
|[Logistic regression, regression models, models are, are neat]    |
+------------------------------------------------------------------+



In [18]:
# Session handle for the Word2Vec example.
# NOTE(review): getOrCreate() presumably returns the session already started above
# rather than a new one — confirm before relying on spark3 being independent.
spark3 = SparkSession.builder.appName("Word2Vec Example").getOrCreate()

In [19]:
# Input data: every row holds one sentence, pre-split on single spaces into its words.
sentences = [
    "McCarthy was asked to analyse the data from the first phase of trials of the vaccine.",
    "We have amassed the raw data and are about to begin analysing it.",
    "Without more data we cannot make a meaningful comparison of the two systems.",
    "Collecting data is a painfully slow process.",
    "You need a long series of data to be able to discern such a trend.",
]
# Each row is a 1-tuple so that the word list lands in the single "text" column.
documentDF = spark3.createDataFrame(
    [(sentence.split(" "),) for sentence in sentences],
    ["text"],
)

In [20]:
# Learn a mapping from words to vectors (vectorSize=3 gives 3-dimensional vectors,
# as seen in the printed output below), then add one "result" vector per document.
# NOTE(review): minCount=0 presumably keeps even words that occur only once — confirm.
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="text", outputCol="result")
model = word2Vec.fit(documentDF)
result = model.transform(documentDF)

In [21]:
# Print every document's words next to the vector Word2Vec produced for it.
# Each collected row has exactly two fields: the word list and its vector.
for text, vector in result.collect():
    print("Text: [%s] => \nVector: %s\n" % (", ".join(text), str(vector)))

Text: [McCarthy, was, asked, to, analyse, the, data, from, the, first, phase, of, trials, of, the, vaccine.] => 
Vector: [0.021903225395362824,0.011923420010134578,0.02301490874378942]

Text: [We, have, amassed, the, raw, data, and, are, about, to, begin, analysing, it.] => 
Vector: [0.06271842372818635,-0.03274114481665982,0.002357496091952691]

Text: [Without, more, data, we, cannot, make, a, meaningful, comparison, of, the, two, systems.] => 
Vector: [-0.005007334268436982,0.022124015761969183,-0.029689349377384554]

Text: [Collecting, data, is, a, painfully, slow, process.] => 
Vector: [-0.021147702554506913,-0.047306668013334274,0.04175957611628941]

Text: [You, need, a, long, series, of, data, to, be, able, to, discern, such, a, trend.] => 
Vector: [-0.04790672076245149,0.030444954708218574,-0.021157699078321456]



In [22]:
# Show the 5 words closest to "data" in the learned vector space, together with
# their cosine similarity to it.
synonyms = model.findSynonyms("data", 5)   # it's okay for certain words, really bad for others
synonyms.show(5)

+----------+------------------+
|      word|        similarity|
+----------+------------------+
|       was|0.8477357625961304|
|    series| 0.842510461807251|
|   analyse|0.7764721512794495|
|       are|0.7724130749702454|
|meaningful|0.7415226697921753|
+----------+------------------+



In [23]:
# Close the Spark sessions.
# NOTE(review): spark, spark2 and spark3 all come from getOrCreate(), so they may be the
# same underlying session — in that case the second and third stop() are redundant; confirm.
spark.stop()
spark2.stop()
spark3.stop()

# Creating 5 separate text files containing text data (blogs, news articles, etc.)

In [35]:
# Read all 5 txt files, which contain news articles.
# Mode "r" (read-only) is sufficient because the files are only read; "r+" would
# additionally require write permission and fail on read-only files.
documents = []
for path in ("/txt1.txt", "/txt2.txt", "/txt3.txt", "/txt4.txt", "/txt5.txt"):
    with open(path, "r") as handle:
        documents.append(handle.read())
# The individual texts are also referred to by name in later cells.
text1, text2, text3, text4, text5 = documents

# 1. Find out the top 10 TF-IDF words for the above input

In [36]:
from sklearn.feature_extraction.text import TfidfVectorizer
import pandas as pd
# Use scikit-learn's built-in TfidfVectorizer to compute TF-IDF for the corpus.
vect = TfidfVectorizer()
# fit_transform learns the vocabulary and returns the document-term TF-IDF matrix.
tfidf_matrix = vect.fit_transform(documents)
# get_feature_names() was removed in scikit-learn 1.2; prefer its replacement
# get_feature_names_out() and fall back only on releases that predate it.
if hasattr(vect, "get_feature_names_out"):
    feature_names = vect.get_feature_names_out()
else:
    feature_names = vect.get_feature_names()
# One row per document, one column per vocabulary word.
df = pd.DataFrame(tfidf_matrix.toarray(), columns=feature_names)
pd.set_option('display.max_columns', 20)
# Extra 'Total' row: each word's TF-IDF summed over all documents.
df.loc['Total'] = df.sum()
# Top 10 words by total TF-IDF: transpose so words become rows, sort ascending by
# 'Total', keep the last 10 (the largest), then transpose back for display.
print (df.T.sort_values('Total', ascending=True).tail(10).T)

         school    bricks       our        my     salad   rabbits       for  \
0      0.000000  0.000000  0.000000  0.534522  0.000000  0.000000  0.000000   
1      0.000000  0.000000  0.000000  0.000000  0.000000  0.000000  0.000000   
2      0.404907  0.404907  0.404907  0.000000  0.000000  0.000000  0.000000   
3      0.000000  0.000000  0.000000  0.000000  0.000000  0.000000  0.000000   
4      0.000000  0.000000  0.000000  0.000000  0.538498  0.538498  0.538498   
Total  0.404907  0.404907  0.404907  0.534522  0.538498  0.538498  0.538498   

             of       the        is  
0      0.000000  0.000000  0.000000  
1      0.000000  0.222913  0.185038  
2      0.326676  0.000000  0.271171  
3      0.213691  0.427381  0.000000  
4      0.000000  0.000000  0.360638  
Total  0.540367  0.650295  0.816848  


# 2. Find out the top 10 TF-IDF words for the lemmatized input


In [44]:
import nltk;nltk.download('punkt');nltk.download('wordnet')
from nltk.stem import WordNetLemmatizer
lemmatizer = WordNetLemmatizer()
# Tokenize each of the five input texts.
words1, words2, words3, words4, words5 = [
    nltk.word_tokenize(raw_text) for raw_text in (text1, text2, text3, text4, text5)
]
# Lemmatize every token and glue each document back into one space-separated string.
documents = [
    ' '.join(lemmatizer.lemmatize(token) for token in tokens)
    for tokens in (words1, words2, words3, words4, words5)
]
(lemmatized_document1, lemmatized_document2, lemmatized_document3,
 lemmatized_document4, lemmatized_document5) = documents
# Use scikit-learn's built-in TfidfVectorizer to compute TF-IDF for the lemmatized corpus.
vect = TfidfVectorizer()
# fit_transform learns the vocabulary and returns the document-term TF-IDF matrix.
tfidf_matrix = vect.fit_transform(documents)
# get_feature_names() was removed in scikit-learn 1.2; prefer its replacement
# get_feature_names_out() and fall back only on releases that predate it.
if hasattr(vect, "get_feature_names_out"):
    feature_names = vect.get_feature_names_out()
else:
    feature_names = vect.get_feature_names()
# One row per document, one column per vocabulary word.
df = pd.DataFrame(tfidf_matrix.toarray(), columns=feature_names)
# Extra 'Total' row: each word's TF-IDF summed over all documents.
df.loc['Total'] = df.sum()
# Top 10 words by total TF-IDF: transpose so words become rows, sort ascending by
# 'Total', keep the last 10 (the largest), then transpose back for display.
print (df.T.sort_values('Total', ascending=True).tail(10).T)

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
         school     brick       our        my     salad    rabbit       for  \
0      0.000000  0.000000  0.000000  0.534522  0.000000  0.000000  0.000000   
1      0.000000  0.000000  0.000000  0.000000  0.000000  0.000000  0.000000   
2      0.404907  0.404907  0.404907  0.000000  0.000000  0.000000  0.000000   
3      0.000000  0.000000  0.000000  0.000000  0.000000  0.000000  0.000000   
4      0.000000  0.000000  0.000000  0.000000  0.538498  0.538498  0.538498   
Total  0.404907  0.404907  0.404907  0.534522  0.538498  0.538498  0.538498   

             of       the        is  
0      0.000000  0.000000  0.000000  
1      0.000000  0.222913  0.185038  
2      0.326676  0.000000  0.271171  
3      0.213691  0.427381  0.000000  
4      0.000000  0.000000  0.3

# 3. Find out the top 10 TF-IDF words for the n-gram based input

In [43]:
def ngrams(input, n):
  """Return every run of n consecutive tokens in a document.

  Args:
    input: document text; it is split on single spaces (' ') into tokens.
    n: number of tokens per n-gram.

  Returns:
    A list of n-grams in document order, each one a list of n tokens.
    The list is empty when the document has fewer than n tokens.
  """
  tokens = input.split(' ')
  # Window starts run from 0 to len(tokens) - n inclusive. All windows are
  # collected before returning; returning from inside the loop would yield
  # only the first n-gram (and None for a document shorter than n tokens).
  return [tokens[i:i+n] for i in range(len(tokens) - n + 1)]
# Space-joined trigram text for each document, built with the ngrams() helper.
# NOTE: these ngram_text* strings are not used further down in this cell; the
# vectorizer below builds its own trigrams from the raw texts.
ngram_text1, ngram_text2, ngram_text3, ngram_text4, ngram_text5 = [
    ' '.join(' '.join(gram) for gram in ngrams(raw_text, 3))
    for raw_text in (text1, text2, text3, text4, text5)
]
documents = [text1,text2,text3,text4,text5]
# TfidfVectorizer has a built-in ngram_range argument; (3,3) scores word trigrams only.
vect = TfidfVectorizer(ngram_range=(3,3))
# fit_transform learns the trigram vocabulary and returns the document-term TF-IDF matrix.
tfidf_matrix = vect.fit_transform(documents)
# get_feature_names() was removed in scikit-learn 1.2; prefer its replacement
# get_feature_names_out() and fall back only on releases that predate it.
if hasattr(vect, "get_feature_names_out"):
    feature_names = vect.get_feature_names_out()
else:
    feature_names = vect.get_feature_names()
# One row per document, one column per trigram.
df = pd.DataFrame(tfidf_matrix.toarray(), columns=feature_names)
# Extra 'Total' row: each trigram's TF-IDF summed over all documents.
df.loc['Total'] = df.sum()
# Top 10 trigrams by total TF-IDF: transpose so trigrams become rows, sort ascending by
# 'Total', keep the last 10 (the largest), then transpose back for display.
print (df.T.sort_values('Total', ascending=True).tail(10).T)

       me to finish  my plate at  mom taught me  our school building  \
0          0.316228     0.316228       0.316228             0.000000   
1          0.000000     0.000000       0.000000             0.000000   
2          0.000000     0.000000       0.000000             0.447214   
3          0.000000     0.000000       0.000000             0.000000   
4          0.000000     0.000000       0.000000             0.000000   
Total      0.316228     0.316228       0.316228             0.447214   

       is made of  made of bricks  building is made  school building is  \
0        0.000000        0.000000          0.000000            0.000000   
1        0.000000        0.000000          0.000000            0.000000   
2        0.447214        0.447214          0.447214            0.447214   
3        0.000000        0.000000          0.000000            0.000000   
4        0.000000        0.000000          0.000000            0.000000   
Total    0.447214        0.447214          0.

2. Write a simple Spark program to read a dataset and find the W2V similar words (words with higher cosine similarity) for the top 10 TF-IDF words


In [61]:
from __future__ import print_function
from pyspark import SparkConf, SparkContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import SparkSession
from pyspark.ml.feature import NGram
from pyspark.ml.feature import Word2Vec
# Create (or reuse) the Spark session for the TF-IDF run over the five text files.
spark = SparkSession.builder.appName("TfIdf Example").getOrCreate()
# One row per document: a numeric label paired with the raw text read earlier.
rows = [
    (0.0, text1),
    (0.1, text2),
    (0.2, text3),
    (0.3, text4),
    (0.5, text5),
]
documentData = spark.createDataFrame(rows, ["label", "document"])
# Split every document into tokens, stored in a new "words" column.
tokenizer = Tokenizer(inputCol="document", outputCol="words")
wordsData = tokenizer.transform(documentData)
# Printing the DataFrame object shows only its schema; show() prints the rows.
print(documentData)
wordsData.show()

DataFrame[label: double, document: string]
+-----+--------------------+--------------------+
|label|            document|               words|
+-----+--------------------+--------------------+
|  0.0|My mom taught me ...|[my, mom, taught,...|
|  0.1|The only problem ...|[the, only, probl...|
|  0.2|Our school buildi...|[our, school, bui...|
|  0.3|Every night I get...|[every, night, i,...|
|  0.5|Salad is for rabb...|[salad, is, for, ...|
+-----+--------------------+--------------------+



Try without NLP

In [65]:
# Apply term-frequency hashing to the tokenized words (numFeatures=200 here).
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=200)
tf = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors
# Cache the TF DataFrame: it is used twice below, by fit() and by transform().
tf.cache()
# Calculate the IDF. Note that `idf` is rebound from the estimator to the fitted model.
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf = idf.fit(tf)
tfidf = idf.transform(tf)
# Display the results.
tfidf.select("label", "features").show()
print("TF-IDF without NLP:")
# Print each full row, followed by its raw term-frequency vector on its own line.
for each in tfidf.collect():
    print(each)
    print(each['rawFeatures'])
# Shut the session down once all rows have been printed.
spark.stop()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(200,[28,40,55,88...|
|  0.1|(200,[5,9,17,26,2...|
|  0.2|(200,[9,28,52,79,...|
|  0.3|(200,[17,26,28,32...|
|  0.5|(200,[9,28,115,14...|
+-----+--------------------+

TF-IDF without NLP:
Row(label=0.0, document='My mom taught me to finish everything on my plate at dinner .', words=['my', 'mom', 'taught', 'me', 'to', 'finish', 'everything', 'on', 'my', 'plate', 'at', 'dinner', '.'], rawFeatures=SparseVector(200, {28: 1.0, 40: 1.0, 55: 1.0, 88: 1.0, 125: 1.0, 133: 1.0, 136: 1.0, 156: 1.0, 162: 1.0, 168: 2.0, 169: 1.0, 178: 1.0}), features=SparseVector(200, {28: 0.0, 40: 1.0986, 55: 1.0986, 88: 1.0986, 125: 1.0986, 133: 1.0986, 136: 1.0986, 156: 0.6931, 162: 1.0986, 168: 2.1972, 169: 1.0986, 178: 1.0986}))
(200,[28,40,55,88,125,133,136,156,162,168,169,178],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0])
Row(label=0.1, document='The only problem with a pencil , is that they do not stay sharp l

Try with Lemmatization

In [69]:
import nltk;nltk.download('punkt');nltk.download('wordnet')
from nltk.stem import WordNetLemmatizer
lemmatizer = WordNetLemmatizer()
# Tokenize each of the five input texts, same as in the previous task.
words1, words2, words3, words4, words5 = [
    nltk.word_tokenize(raw_text) for raw_text in (text1, text2, text3, text4, text5)
]
# Lemmatize every token and glue each document back into one space-separated string.
(lemmatized_document1, lemmatized_document2, lemmatized_document3,
 lemmatized_document4, lemmatized_document5) = [
    ' '.join(lemmatizer.lemmatize(token) for token in tokens)
    for tokens in (words1, words2, words3, words4, words5)
]
# Create (or reuse) the Spark session for the lemmatized TF-IDF run.
spark = SparkSession.builder.appName("TfIdf Example").getOrCreate()
# One row per document: a numeric label paired with the lemmatized text.
rows = [
    (0.0, lemmatized_document1),
    (0.1, lemmatized_document2),
    (0.2, lemmatized_document3),
    (0.3, lemmatized_document4),
    (0.5, lemmatized_document5),
]
documentData = spark.createDataFrame(rows, ["label", "document"])
# Split every document into tokens, stored in a new "words" column.
tokenizer = Tokenizer(inputCol="document", outputCol="words")
wordsData = tokenizer.transform(documentData)
# Printing the DataFrame object shows only its schema; show() prints the rows.
print(documentData)
wordsData.show()

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
DataFrame[label: double, document: string]
+-----+--------------------+--------------------+
|label|            document|               words|
+-----+--------------------+--------------------+
|  0.0|My mom taught me ...|[my, mom, taught,...|
|  0.1|The only problem ...|[the, only, probl...|
|  0.2|Our school buildi...|[our, school, bui...|
|  0.3|Every night I get...|[every, night, i,...|
|  0.5|Salad is for rabb...|[salad, is, for, ...|
+-----+--------------------+--------------------+



In [70]:
# applying tf on the words data\r\n",
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=200)
tf = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors\r\n",
# calculating the IDF\r\n",
tf.cache()
idf = IDF(inputCol="rawFeatures", outputCol="features")
idf = idf.fit(tf)
tfidf = idf.transform(tf)
#displaying the results\r\n",
tfidf.select("label", "features").show()
print("TF-IDF with Lemmatization:")
for each in tfidf.collect():
    print(each)
    print(each['rawFeatures'])
    spark.stop()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|  0.0|(200,[28,40,55,88...|
|  0.1|(200,[5,9,17,26,2...|
|  0.2|(200,[9,28,52,79,...|
|  0.3|(200,[17,26,28,32...|
|  0.5|(200,[9,28,115,14...|
+-----+--------------------+

TF-IDF with Lemmatization:
Row(label=0.0, document='My mom taught me to finish everything on my plate at dinner .', words=['my', 'mom', 'taught', 'me', 'to', 'finish', 'everything', 'on', 'my', 'plate', 'at', 'dinner', '.'], rawFeatures=SparseVector(200, {28: 1.0, 40: 1.0, 55: 1.0, 88: 1.0, 125: 1.0, 133: 1.0, 136: 1.0, 156: 1.0, 162: 1.0, 168: 2.0, 169: 1.0, 178: 1.0}), features=SparseVector(200, {28: 0.0, 40: 1.0986, 55: 1.0986, 88: 1.0986, 125: 1.0986, 133: 1.0986, 136: 1.0986, 156: 0.6931, 162: 1.0986, 168: 2.1972, 169: 1.0986, 178: 1.0986}))
(200,[28,40,55,88,125,133,136,156,162,168,169,178],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0])
Row(label=0.1, document='The only problem with a pencil , is that they do not stay 