In [None]:
# install Java8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# download spark3.2.3
!wget -q https://downloads.apache.org/spark/spark-3.2.3/spark-3.2.3-bin-hadoop3.2.tgz 
# unzip it
!tar xf spark-3.2.3-bin-hadoop3.2.tgz
!pip install -q findspark

 **Setting Environment Variables**

In [None]:
import os
import sys

# Tell Spark where the Java 8 JDK and the unpacked Spark 3.2.3 distribution live.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.2.3-bin-hadoop3.2",
    }
)

**Starting a Spark Session**

In [None]:
# Install the PySpark release that matches the Spark 3.2.3 download above.
!pip install pyspark==3.2.3
# findspark initialisation is left disabled; PySpark is installed via pip above.
#import findspark
#findspark.init()
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
# NOTE(review): the wildcard import brings the Spark SQL functions into the
# notebook namespace, where later assignments (e.g. `count`) can rebind them.
# The `func` alias below is the unambiguous way to reach them.
from pyspark.sql.functions import *
import pyspark.sql.functions as func



Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark==3.2.3
  Downloading pyspark-3.2.3.tar.gz (281.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.5/281.5 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m22.7 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.3-py2.py3-none-any.whl size=281990673 sha256=50d42e4f3c836fa9543d14fd1dbd1f6e0878454e4ed005edb1520c68e2db8c77
  Stored in directory: /root/.cache/pip/wheels/9a/99/8c/e2d5ede0e1aefb33c64af344f2cd569354237f0bdd673bd243
Successfully built pyspark
Installing collected packages: py4j

**Opening a Session**

In [None]:
# Create (or reuse) a local Spark session running on all available cores,
# and keep a handle on its SparkContext for the RDD-based exercises.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("MyFirstProgram")
    .getOrCreate()
)
sc = spark.sparkContext

**Quick Installation Test**

In [None]:
# Smoke test: build a 1000-row DataFrame with a single "hello" column
# and display its first three rows without truncating the cell values.
hello_rows = [{"hello": "world"} for _ in range(1000)]
df = spark.createDataFrame(hello_rows)
df.show(n=3, truncate=False)

+-----+
|hello|
+-----+
|world|
|world|
|world|
+-----+
only showing top 3 rows



**Checking pyspark version**

In [None]:
import sys
import pyspark

# Report which PySpark release the kernel actually imported.
installed_version = pyspark.__version__
print(installed_version)

3.2.3


# Exercise 0 : Download and Prepare your Document Corpus

**Uploading and reading the text files**

In [None]:
import string 

def lower_clean_str(strr):
    """Return ``strr`` lower-cased with all ASCII punctuation removed.

    Every character listed in ``string.punctuation`` is deleted; all other
    characters (letters, digits, whitespace, ...) are kept.
    """
    punctuation_remover = str.maketrans('', '', string.punctuation)
    without_punctuation = strr.translate(punctuation_remover)
    return without_punctuation.lower()

In [None]:
from pyspark import SparkFiles

# Register the Project Gutenberg text of "Romeo and Juliet" with Spark.
spark.sparkContext.addFile("https://www.gutenberg.org/files/1112/1112.txt")

# Split every line on single spaces and normalise each token (lower-case,
# punctuation stripped). Runs of spaces produce empty tokens, which are kept
# here so that `wordsRJ` stays exactly what the later exercises expect.
wordsRJ = (
    sc.textFile(SparkFiles.get("1112.txt"))
    .flatMap(lambda line: line.split(' '))
    .map(lower_clean_str)
)

# Preview a handful of non-empty tokens instead of collecting the whole corpus
# to the driver: printing every token floods the cell output (the notebook
# truncates it) and most of the printed lines are blank.
PREVIEW_SIZE = 20
for i in wordsRJ.filter(lambda token: token != '').take(PREVIEW_SIZE):
    print(i)


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
the
capulets




raise
up
the
montagues
some
others
search



































exeunt
others
of
the
watch




we
see
the
ground
whereon
these
woes
do
lie




but
the
true
ground
of
all
these
piteous
woes




we
cannot
without
circumstance
descry






enter
some
of
the
watch
with
romeos
man
balthasar



2
watch
heres
romeos
man
we
found
him
in
the
churchyard



chief
watch
hold
him
in
safety
till
the
prince
come
hither











enter
friar
laurence
and
another
watchman



3
watch
here
is
a
friar
that
trembles
sighs
and
weeps




we
took
this
mattock
and
this
spade
from
him




as
he
was
coming
from
this
churchyard
side



chief
watch
a
great
suspicion
stay
the
friar
too















enter
the
prince
and
attendants




prince
what
misadventure
is
so
early
up




that
calls
our
person
from
our
morning
rest













enter
capulet
and
his
wife
with
others




cap
what
should
it
be
that
they
so
shriek
ab

# Exercise 1 : Count Words

**Count how many times each word occurs**

In [None]:
import re

# Pair each token with a 1 and keep only tokens made purely of the letters
# a-z; summing the ones per word then gives the number of occurrences.
precount = (
    wordsRJ
    .map(lambda token: (token, 1))
    .filter(lambda pair: re.match(r"^[a-z]+$", pair[0]))
)
count = precount.reduceByKey(lambda left, right: left + right)

for word_total in count.collect():
    print(word_total)

# Exercise 2: Finding Frequent Terms and Stop Words 

In [None]:
# Install (or upgrade to the latest) NLTK, which supplies the English
# stop-word list used in the following cells.
!pip install -U nltk

In [None]:
import re

import nltk
nltk.download('stopwords')

from nltk.corpus import stopwords


# Register the two additional plays with Spark:
# 1524-0.txt is read as Hamlet and pg1776.txt as Richard II below.
spark.sparkContext.addFile("https://www.gutenberg.org/files/1524/1524-0.txt")
spark.sparkContext.addFile("https://www.gutenberg.org/cache/epub/1776/pg1776.txt")

# Load the stop-word list ONCE on the driver. Calling stopwords.words(...)
# inside a filter lambda re-reads the corpus for every single token and then
# scans a list; a frozenset is built once and gives constant-time lookups.
english_stopwords = frozenset(stopwords.words('english'))


def _is_plain_word(token):
    """Return True when ``token`` consists solely of the letters a-z."""
    return re.match(r"^[a-z]+$", token) is not None


def _load_plain_words(file_name):
    """Return an RDD of the cleaned a-z words of a file registered via addFile."""
    return (
        sc.textFile(SparkFiles.get(file_name))
        .flatMap(lambda line: line.split(' '))
        .map(lower_clean_str)
        .filter(_is_plain_word)
    )


# Drop empty and non-alphabetic tokens from the Romeo and Juliet words too, so
# that all three corpora are cleaned the same way.
wordsRJ = wordsRJ.filter(_is_plain_word)
wordsR = _load_plain_words("pg1776.txt")
wordsH = _load_plain_words("1524-0.txt")

# Keep only the stop words of each play.
stpW_RJ = wordsRJ.filter(lambda x: x in english_stopwords)
stpW_R = wordsR.filter(lambda x: x in english_stopwords)
stpW_H = wordsH.filter(lambda x: x in english_stopwords)

# Pool the stop words of the three plays and count each one's occurrences.
union = stpW_RJ.union(stpW_R).union(stpW_H)

precountStpW = union.map(lambda word: (word, 1))

count = precountStpW.reduceByKey(lambda x, y: x + y)



[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [None]:
# Swap each (word, total) pair to (total, word) so the total becomes the sort
# key, then list the stop words from the most to the least frequent.
result = (
    count
    .map(lambda word_total: (word_total[1], word_total[0]))
    .sortByKey(ascending=False)
)
for ranked in result.collect():
    print(ranked)

(3004, 'the')
(2636, 'and')
(2049, 'to')
(1961, 'of')
(1588, 'i')
(1440, 'a')
(1342, 'my')
(1191, 'in')
(1180, 'you')
(1067, 'that')
(1032, 'is')
(877, 'with')
(848, 'it')
(841, 'not')
(824, 'this')
(764, 'for')
(675, 'his')
(672, 'me')
(662, 'be')
(616, 'but')
(552, 'as')
(505, 'your')
(482, 'he')
(481, 'what')
(479, 'so')
(456, 'have')
(456, 'or')
(434, 'will')
(412, 'by')
(410, 'him')
(366, 'no')
(361, 'do')
(359, 'all')
(352, 'are')
(335, 'we')
(322, 'our')
(319, 'on')
(314, 'from')
(307, 'if')
(305, 'o')
(272, 'her')
(256, 'at')
(237, 'now')
(226, 'they')
(219, 'more')
(214, 'which')
(205, 'how')
(201, 'then')
(200, 'here')
(188, 'there')
(186, 'an')
(175, 'was')
(169, 'their')
(165, 'she')
(158, 'some')
(158, 'am')
(152, 'when')
(152, 'them')
(152, 'where')
(147, 'such')
(147, 'should')
(146, 'than')
(145, 'too')
(143, 'did')
(137, 'out')
(137, 'any')
(133, 'these')
(130, 'up')
(127, 'can')
(126, 'why')
(124, 'most')
(115, 'who')
(103, 'very')
(101, 'had')
(96, 'were')
(88, 'othe

In [None]:
# Turn the ranked (count, word) pairs into a two-column DataFrame.
finalResult = spark.createDataFrame(result, schema=["Count", "Stop Word"])

# Write the table as CSV with a header row from a single partition,
# replacing any output left by a previous run.
single_partition = finalResult.coalesce(1)
csv_writer = single_partition.write.mode('overwrite').option("header", True)
csv_writer.csv("Count Stop-Word Result")

# Exercise 3 : Simple Inverted Index

In [None]:
# Load the stop-word list once on the driver. Evaluating
# stopwords.words('english') inside the filter lambda would re-read the corpus
# and scan a list for every token; a frozenset gives constant-time lookups.
stop_words = frozenset(stopwords.words('english'))


def _content_words(words):
    """Drop stop words from ``words`` and keep only tokens made of the letters a-z."""
    return (
        words
        .filter(lambda x: x not in stop_words)
        .filter(lambda x: re.match(r"^[a-z]+$", x))
    )


W_RJ = _content_words(wordsRJ)
W_R = _content_words(wordsR)
W_H = _content_words(wordsH)

# Tag every word with the document it comes from.
W_RJ_m = W_RJ.map(lambda x: (x, 'RomeoJuliet.txt'))
W_R_m = W_R.map(lambda x: (x, 'RichardII.txt'))
W_H_m = W_H.map(lambda x: (x, 'Hamlet.txt'))

union_ = W_RJ_m.union(W_R_m).union(W_H_m)

# A word can occur many times in a play; keep one (word, document) pair each.
union_ = union_.distinct()

# Gather, per word, the list of documents containing it.
preResult = union_.map(lambda x: (x[0], [x[1]])).reduceByKey(lambda x, y: x + y)
#preResult = union_.groupByKey().map(lambda x: (x[0],list(x[1])))
# Attach a running index and flatten to (index, word, documents).
Result = preResult.zipWithIndex().map(lambda x: (x[1], x[0][0], x[0][1]))

for element in Result.collect():
    print(element)





[1;30;43mStreaming output truncated to the last 5000 lines.[0m
(3197, 'proclaims', ['Hamlet.txt'])
(3198, 'bounteous', ['Hamlet.txt'])
(3199, 'dearly', ['Hamlet.txt', 'RomeoJuliet.txt'])
(3200, 'investments', ['Hamlet.txt'])
(3201, 'pledge', ['Hamlet.txt', 'RichardII.txt'])
(3202, 'breach', ['Hamlet.txt', 'RomeoJuliet.txt', 'RichardII.txt'])
(3203, 'observance', ['Hamlet.txt'])
(3204, 'origin', ['Hamlet.txt'])
(3205, 'making', ['Hamlet.txt', 'RomeoJuliet.txt', 'RichardII.txt'])
(3206, 'horridly', ['Hamlet.txt'])
(3207, 'nerve', ['Hamlet.txt'])
(3208, 'unhand', ['Hamlet.txt'])
(3209, 'render', ['Hamlet.txt', 'RichardII.txt'])
(3210, 'days', ['Hamlet.txt', 'RomeoJuliet.txt', 'RichardII.txt'])
(3211, 'fretful', ['Hamlet.txt'])
(3212, 'rots', ['Hamlet.txt'])
(3213, 'wharf', ['Hamlet.txt'])
(3214, 'vial', ['Hamlet.txt', 'RomeoJuliet.txt', 'RichardII.txt'])
(3215, 'reveal', ['Hamlet.txt'])
(3216, 'dwelling', ['Hamlet.txt'])
(3217, 'already', ['Hamlet.txt', 'RomeoJuliet.txt', 'RichardII.txt

# Exercise 4 : Extended Inverted Index (per-document word counts)

In [None]:
def _add_counts(left, right):
    """Add two partial occurrence counts."""
    return left + right


# Key every word by (word, document) with an initial count of 1.
W_RJ_mm = W_RJ.map(lambda word: ((word, 'RomeoJuliet.txt'), 1))
W_R_mm = W_R.map(lambda word: ((word, 'RichardII.txt'), 1))
W_H_mm = W_H.map(lambda word: ((word, 'Hamlet.txt'), 1))

# Sum the ones to get the number of occurrences of each word per document.
W_RJ_mr = W_RJ_mm.reduceByKey(_add_counts)
W_R_mr = W_R_mm.reduceByKey(_add_counts)
W_H_mr = W_H_mm.reduceByKey(_add_counts)

union4 = W_RJ_mr.union(W_R_mr).union(W_H_mr)

# Re-key by the word alone, carrying a one-element [(document, count)] list,
# and concatenate those lists so each word ends up with all of its documents.
preResult4 = (
    union4
    .map(lambda entry: (entry[0][0], [(entry[0][1], entry[1])]))
    .reduceByKey(lambda first, second: first + second)
)

# Attach a running index and flatten to (index, word, [(document, count), ...]).
Result4 = preResult4.zipWithIndex().map(
    lambda indexed: (indexed[1], indexed[0][0], indexed[0][1])
)

for row in Result4.collect():
    print(row)
#finalResult4 = spark.createDataFrame(Result4,["id","Word","Document #Frequency"])
#finalResult4.show(100,False)


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
(3197, 'posthaste', [('Hamlet.txt', 1)])
(3198, 'tenantless', [('Hamlet.txt', 1)])
(3199, 'sheeted', [('Hamlet.txt', 1)])
(3200, 'squeak', [('Hamlet.txt', 1)])
(3201, 'roman', [('Hamlet.txt', 2)])
(3202, 'trains', [('Hamlet.txt', 2)])
(3203, 'moist', [('Hamlet.txt', 1)])
(3204, 'influence', [('Hamlet.txt', 1)])
(3205, 'precurse', [('Hamlet.txt', 1)])
(3206, 'harbingers', [('Hamlet.txt', 1)])
(3207, 'preceding', [('Hamlet.txt', 1)])
(3208, 'omen', [('Hamlet.txt', 1)])
(3209, 'demonstrated', [('Hamlet.txt', 1)])
(3210, 'blast', [('Hamlet.txt', 2)])
(3211, 'illusion', [('Hamlet.txt', 1)])
(3212, 'partisan', [('Hamlet.txt', 1)])
(3213, 'majestical', [('Hamlet.txt', 2)])
(3214, 'invulnerable', [('Hamlet.txt', 1)])
(3215, 'started', [('Hamlet.txt', 1)])
(3216, 'shrillsounding', [('Hamlet.txt', 1)])
(3217, 'erring', [('Hamlet.txt', 1)])
(3218, 'hies', [('Hamlet.txt', 1)])
(3219, 'probation', [('Hamlet.txt', 1)])
(3220, 'crowing'