In [1]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/27/67/5158f846202d7f012d1c9ca21c3549a58fd3c6707ae8ee823adcaca6473c/pyspark-3.0.2.tar.gz (204.8MB)
[K     |████████████████████████████████| 204.8MB 77kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 19.4MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.2-py2.py3-none-any.whl size=205186687 sha256=30fef9f2c826a02610d4e72e2e8e6cad9f110be6d5ca37e9b8805634b55a41a0
  Stored in directory: /root/.cache/pip/wheels/8b/09/da/c1f2859bcc86375dc972c5b6af4881b3603269bcc4c9be5d16
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.2


In [2]:
from __future__ import print_function
from pyspark import SparkConf, SparkContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql import SparkSession
from pyspark.ml.feature import NGram
from pyspark.ml.feature import Word2Vec

In [3]:
# Create the Spark session
spark = SparkSession.builder.appName("TF_IDF").getOrCreate()

In [4]:
# Create the dataframe with five text abstracts
abst = spark.read.text('input*.txt')

In [5]:
# Tokenize the abstract texts
tokenizer = Tokenizer(inputCol="value", outputCol="words")
wordsData = tokenizer.transform(abst)

In [6]:
wordsData.show()

+--------------------+--------------------+
|               value|               words|
+--------------------+--------------------+
|The term soul is ...|[the, term, soul,...|
|subjective essenc...|[subjective, esse...|
|from its investig...|[from, its, inves...|
|literature and ex...|[literature, and,...|
|their study is to...|[their, study, is...|
|literature. In th...|[literature., in,...|
|quantum physics, ...|[quantum, physics...|
|studies on medita...|[studies, on, med...|
|context, this pap...|[context,, this, ...|
|as uninhibited me...|[as, uninhibited,...|
|Among several pos...|[among, several, ...|
|provided by theor...|[provided, by, th...|
|theory. We show m...|[theory., we, sho...|
|inhabitants of a ...|[inhabitants, of,...|
|accurately measur...|[accurately, meas...|
|they cannot relia...|[they, cannot, re...|
|factual questions...|[factual, questio...|
|with those in oth...|[with, those, in,...|
|apply to observer...|[apply, to, obser...|
|            theory).|          

In [7]:
# applying tf on the words data
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=10)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

In [8]:
# calculating the IDF
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
print(rescaledData)

DataFrame[value: string, words: array<string>, rawFeatures: vector, features: vector]


In [9]:
# Display the results
rescaledData.select("features").show(10, truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                                                                    |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(10,[0,1,3,4,5,6,7,8,9],[0.4289956055183586,0.5293851084541643,0.35536235447490483,0.2954642128938359,0.17768117723745241,0.7940776626812465,0.19051219595701865,0.41170410840829763,1.174197955387005])                    |
|(10,[1,2,3,4,5,6,7,8,9],[0.5293851084541643,1.0062071553441607,0.5330435317123572,0.2954642128938359,0.3553

In [25]:
# Create the Spark session
spark2 = SparkSession.builder.appName("Ngrams").getOrCreate()

In [26]:
# Create the dataframe with five text abstracts
abst = spark2.read.text('input*.txt')

In [27]:
# Tokenize the abstract texts
tokenizer = Tokenizer(inputCol="value", outputCol="words")
wordsData = tokenizer.transform(abst)

In [28]:

# Creating n-grams with n=5
ngram = NGram(n=5, inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordsData)

In [32]:
# Apply topic frequency on the abstracts
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=10)
featurizedData = hashingTF.transform(ngramDataFrame)

In [33]:
# Calculate the inverse document frequency
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

In [34]:
# Display the results
rescaledData.select("features").show(10, truncate=False)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features                                                                                                                                                                                                                    |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|(10,[0,1,3,4,5,6,7,8,9],[0.4289956055183586,0.5293851084541643,0.35536235447490483,0.2954642128938359,0.17768117723745241,0.7940776626812465,0.19051219595701865,0.41170410840829763,1.174197955387005])                    |
|(10,[1,2,3,4,5,6,7,8,9],[0.5293851084541643,1.0062071553441607,0.5330435317123572,0.2954642128938359,0.3553

# Cosine Similarity

In [10]:
spark3 = SparkSession.builder.appName("Ngram Similarity").getOrCreate()

In [11]:
# Create the dataframe with five text abstracts
abst = spark3.read.text('input*.txt')


In [12]:
# Tokenize the abstract texts
tokenizer = Tokenizer(inputCol="value", outputCol="words")
wordsData = tokenizer.transform(abst)

In [13]:
# Creating n-grams with n=2
ngram = NGram(n=2, inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordsData)

In [14]:
# displaying the results
ngramDataFrame.select("ngrams").show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|ngrams                                                                                                                                                                                                                                                  |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[the term, term soul, soul is, is used, used in, in the, the traditional, traditional literature, literature as, as a, a synonym, synonym for, for one’s, one’s true, true self, self and, and is, is associated, associated with, with the]          

In [15]:
# Create a mapping from words to vectors
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="words", outputCol="result")
model = word2Vec.fit(ngramDataFrame)
print(model.getVectors().collect())
result = model.getVectors().collect()

[Row(word='beneath', vector=DenseVector([-0.063, -0.1433, -0.0275])), Row(word='used', vector=DenseVector([0.1425, 0.0271, 0.1235])), Row(word='providing', vector=DenseVector([0.0157, 0.098, -0.1096])), Row(word='cells', vector=DenseVector([-0.0462, -0.0827, -0.1413])), Row(word='measure', vector=DenseVector([0.0346, 0.0103, 0.1353])), Row(word='is,', vector=DenseVector([-0.0772, -0.1257, 0.1589])), Row(word='number', vector=DenseVector([-0.1333, -0.0183, 0.1237])), Row(word='for', vector=DenseVector([-0.0761, 0.1601, -0.0055])), Row(word='hamiltonian', vector=DenseVector([0.1538, 0.0424, -0.0704])), Row(word='find', vector=DenseVector([0.1352, -0.1649, 0.1625])), Row(word='factual', vector=DenseVector([0.1439, 0.1665, 0.1454])), Row(word='superconscious', vector=DenseVector([-0.0677, -0.011, 0.0132])), Row(word='shifts', vector=DenseVector([-0.0902, -0.1046, -0.1303])), Row(word='due', vector=DenseVector([0.1268, 0.0562, 0.0994])), Row(word='proposed', vector=DenseVector([-0.0757, -0.

In [16]:
# Show the synonyms and cosine similarity of the word in input data
synonyms = model.findSynonyms("science", 10)
synonyms.show(10)

+-----------+------------------+
|       word|        similarity|
+-----------+------------------+
| describing|0.9997835755348206|
|      prior|0.9970971345901489|
|       call| 0.979093074798584|
|      don’t|0.9779413342475891|
|       term|0.9775862097740173|
|investigate| 0.971872091293335|
| omniscient|0.9716155529022217|
|literature.|0.9690399765968323|
|      case.|0.9689924120903015|
|     field,|0.9689012765884399|
+-----------+------------------+



In [17]:
# creating spark session
spark4 = SparkSession.builder.appName("Word2Vec Similarity").getOrCreate()

In [19]:
# Create the dataframe with five text abstracts
abst = spark4.read.text('input*.txt')

In [20]:
# Tokenize the abstract texts
tokenizer = Tokenizer(inputCol="value", outputCol="words")
wordsData = tokenizer.transform(abst)

In [22]:
# Create a mapping from words to vectors
word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="words", outputCol="result")
model = word2Vec.fit(wordsData)
print(model.getVectors().collect())
result = model.getVectors().collect()

[Row(word='beneath', vector=DenseVector([-0.063, -0.1433, -0.0275])), Row(word='used', vector=DenseVector([0.1425, 0.0271, 0.1235])), Row(word='providing', vector=DenseVector([0.0157, 0.098, -0.1096])), Row(word='cells', vector=DenseVector([-0.0462, -0.0827, -0.1413])), Row(word='measure', vector=DenseVector([0.0346, 0.0103, 0.1353])), Row(word='is,', vector=DenseVector([-0.0772, -0.1257, 0.1589])), Row(word='number', vector=DenseVector([-0.1333, -0.0183, 0.1237])), Row(word='for', vector=DenseVector([-0.0761, 0.1601, -0.0055])), Row(word='hamiltonian', vector=DenseVector([0.1538, 0.0424, -0.0704])), Row(word='find', vector=DenseVector([0.1352, -0.1649, 0.1625])), Row(word='factual', vector=DenseVector([0.1439, 0.1665, 0.1454])), Row(word='superconscious', vector=DenseVector([-0.0677, -0.011, 0.0132])), Row(word='shifts', vector=DenseVector([-0.0902, -0.1046, -0.1303])), Row(word='due', vector=DenseVector([0.1268, 0.0562, 0.0994])), Row(word='proposed', vector=DenseVector([-0.0757, -0.

In [23]:
# Show the synonyms and cosine similarity of the word in input data
synonyms = model.findSynonyms("science", 10)
synonyms.show(10)

+-----------+------------------+
|       word|        similarity|
+-----------+------------------+
| describing|0.9997835755348206|
|      prior|0.9970971345901489|
|       call| 0.979093074798584|
|      don’t|0.9779413342475891|
|       term|0.9775862097740173|
|investigate| 0.971872091293335|
| omniscient|0.9716155529022217|
|literature.|0.9690399765968323|
|      case.|0.9689924120903015|
|     field,|0.9689012765884399|
+-----------+------------------+



In [None]:
#closing the spark sessions
spark.stop()
spark2.stop()
spark3.stop()
spark4.stop()