# **Preamble**

In [3]:
# Install the headless Java 8 JDK quietly (-qq); remaining stdout is discarded.
# JAVA_HOME is pointed at this JDK a few cells further down.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [4]:
!wget -q https://downloads.apache.org/spark/spark-3.2.3/spark-3.2.3-bin-hadoop3.2.tgz

In [5]:
# Unpack the downloaded Spark tarball into the current working directory.
!tar xf spark-3.2.3-bin-hadoop3.2.tgz

In [6]:
import os
# Environment variables telling the tooling where Java 8 and Spark 3.2.3 live.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.2.3-bin-hadoop3.2",
)
import sys

In [7]:
!pip install pyspark==3.2.3

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark==3.2.3
  Downloading pyspark-3.2.3.tar.gz (281.5 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.5/281.5 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m22.6 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.3-py2.py3-none-any.whl size=281990673 sha256=3620b14e68feb65629d88541dee8d32be9dc5f0ebdcc6cfd08c4be81166cf60a
  Stored in directory: /root/.cache/pip/wheels/9a/99/8c/e2d5ede0e1aefb33c64af344f2cd569354237f0bdd673bd243
Successfully built pyspark
Installing collected packages: py4j

In [8]:
#import findspark
#findspark.init()
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import *

In [9]:
# Sanity check: print the version of the PySpark package that was actually imported
# (the install cell pins 3.2.3).
import pyspark
print(pyspark.__version__)

3.2.3


In [10]:
# Start (or reuse) a session on the local master using all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("MyFirstProgram")
    .getOrCreate()
)
sc = spark.sparkContext

# Smoke test: a 1000-row, single-column DataFrame; show 3 rows without truncation.
smoke_rows = [{"hello": "world"} for _ in range(1000)]
df = spark.createDataFrame(smoke_rows)
df.show(n=3, truncate=False)

+-----+
|hello|
+-----+
|world|
|world|
|world|
+-----+
only showing top 3 rows





---


# **Exercise n°0 : Prepare your documents**

In [11]:
##### Download all the text files
import time
from pyspark import SparkFiles

def _load_text_rdd(url):
    """Download ``url`` through Spark and return an RDD with one element per line.

    The local copy is looked up under the last path component of the URL
    (e.g. ".../1112/1112.txt" -> "1112.txt"), so the file name no longer has
    to be repeated by hand next to each URL.
    """
    spark.sparkContext.addFile(url)
    local_name = url.rsplit("/", 1)[-1]
    return sc.textFile(SparkFiles.get(local_name))


# Romeo and Juliet
RJ_url = "https://www.gutenberg.org/files/1112/1112.txt"
RomeoJuliet_rdd = _load_text_rdd(RJ_url)

# Hamlet
Hamlet_url = "https://www.gutenberg.org/files/1524/1524-0.txt"
Hamlet_rdd = _load_text_rdd(Hamlet_url)

# Richard (Gutenberg e-text 1776)
Richard_url = "https://www.gutenberg.org/cache/epub/1776/pg1776.txt"
Richard_rdd = _load_text_rdd(Richard_url)

In [12]:
##### Create the cleaning function

import string

def cleaningFunc(text):
    """Return ``text`` lower-cased with every ASCII punctuation character removed.

    Only the characters in ``string.punctuation`` are stripped; whitespace and
    any non-ASCII symbols are left untouched.
    """
    drop_punctuation = str.maketrans("", "", string.punctuation)
    return text.lower().translate(drop_punctuation)

In [13]:
##### Clean all the texts: split every line on single spaces, then normalise each token

def _tokenize(lines_rdd):
    """Return an RDD of cleaned tokens, one per space-separated chunk of each line."""
    # split(" ") yields empty strings for leading/consecutive spaces; they remain in
    # the token RDD (note the '' at the start of the preview below).
    return lines_rdd.flatMap(lambda raw_line: raw_line.split(" ")).map(cleaningFunc)

# NOTE: RDD transformations are lazy, so this interval covers building the
# pipelines only; the cleaning itself runs when an action such as take() is called.
start = time.time()

RJ_clean, Hamlet_clean, Richard_clean = (
    _tokenize(lines) for lines in (RomeoJuliet_rdd, Hamlet_rdd, Richard_rdd)
)

end = time.time()

print(end-start)

Richard_clean.take(20)

0.011086225509643555


['',
 'this',
 'etext',
 'file',
 'is',
 'presented',
 'by',
 'project',
 'gutenberg',
 'in',
 'cooperation',
 'with',
 'world',
 'library',
 'inc',
 'from',
 'their',
 'library',
 'of',
 'the']



---



# **Exercise n°1 : Count Words**


In [14]:
##### Count the (filtered) words of each documents

import re
import string
from pyspark import SparkContext

def _word_counts(tokens_rdd):
    """Return (word, occurrences) for every token that starts with a lowercase ASCII letter."""
    # re.match anchors at the start of the token, so empty strings and tokens that
    # begin with a digit or symbol are discarded here.
    lettered = tokens_rdd.filter(lambda token: re.match('[a-z]+', token))
    ones = lettered.map(lambda token: (token, 1))
    return ones.reduceByKey(lambda left, right: left + right)

start = time.time()

RJ_CountWords = _word_counts(RJ_clean)
Hamlet_CountWords = _word_counts(Hamlet_clean)
Richard_CountWords = _word_counts(Richard_clean)

end = time.time()

print(end-start)

Richard_CountWords.take(10)

0.20787382125854492


[('this', 204),
 ('is', 296),
 ('presented', 1),
 ('project', 26),
 ('gutenberg', 21),
 ('in', 306),
 ('cooperation', 1),
 ('world', 31),
 ('library', 16),
 ('inc', 11)]



---


# **Exercise n°2 : Finding Frequent Terms and Stop Words**

In [15]:
###### Import the list of stopwords

import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords as nltk_stopwords

# The text tokens have already been through cleaningFunc, which removes punctuation
# ("don't" -> "dont"). The stopword list is normalised the same way so that entries
# containing an apostrophe can still match a cleaned token.
# A frozenset makes the `x in stopwords` tests used by the filters below a hash
# lookup instead of a linear scan of the list for every token.
stopwords = frozenset(cleaningFunc(word) for word in nltk_stopwords.words('english'))

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [16]:
##### Keep only the stopwords from the text files and count them per play

def _stopword_counts(tokens_rdd):
    """Return (stopword, occurrences) for the tokens that belong to ``stopwords``."""
    kept = tokens_rdd.filter(lambda token: token in stopwords)
    kept = kept.filter(lambda token: re.match('[a-z]+', token))
    ones = kept.map(lambda token: (token, 1))
    return ones.reduceByKey(lambda left, right: left + right)

start = time.time()

RJ_StopWords = _stopword_counts(RJ_clean)
Hamlet_StopWords = _stopword_counts(Hamlet_clean)
Richard_StopWords = _stopword_counts(Richard_clean)

end = time.time()

print(end-start)

RJ_StopWords.take(10)

0.0829305648803711


[('of', 535),
 ('this', 279),
 ('was', 51),
 ('at', 87),
 ('when', 55),
 ('there', 67),
 ('is', 375),
 ('an', 91),
 ('as', 167),
 ('no', 114)]

In [17]:
###### Merge the per-play stopword counts into one RDD, most frequent stopword first

start = time.time()

per_play_counts = RJ_StopWords.union(Hamlet_StopWords).union(Richard_StopWords)
summed_counts = per_play_counts.reduceByKey(lambda left, right: left + right)
all_StopWords_V1 = summed_counts.sortBy(lambda pair: pair[1], ascending=False)

end = time.time()

print(end-start)

all_StopWords_V1.take(30)

2.2852823734283447


[('the', 3004),
 ('and', 2636),
 ('to', 2049),
 ('of', 1961),
 ('i', 1588),
 ('a', 1440),
 ('my', 1342),
 ('in', 1191),
 ('you', 1180),
 ('that', 1067),
 ('is', 1032),
 ('with', 877),
 ('it', 848),
 ('not', 841),
 ('this', 824),
 ('for', 764),
 ('his', 675),
 ('me', 672),
 ('be', 662),
 ('but', 616),
 ('as', 552),
 ('your', 505),
 ('he', 482),
 ('what', 481),
 ('so', 479),
 ('have', 456),
 ('or', 456),
 ('will', 434),
 ('by', 412),
 ('him', 410)]

In [18]:
def _count_first(pair):
    """Turn a (stopword, count) pair into (count, stopword)."""
    word, total = pair
    return (total, word)

start = time.time()

all_StopWords = all_StopWords_V1.map(_count_first)

end = time.time()

print(end-start)

all_StopWords.take(30)

0.00017118453979492188


[(3004, 'the'),
 (2636, 'and'),
 (2049, 'to'),
 (1961, 'of'),
 (1588, 'i'),
 (1440, 'a'),
 (1342, 'my'),
 (1191, 'in'),
 (1180, 'you'),
 (1067, 'that'),
 (1032, 'is'),
 (877, 'with'),
 (848, 'it'),
 (841, 'not'),
 (824, 'this'),
 (764, 'for'),
 (675, 'his'),
 (672, 'me'),
 (662, 'be'),
 (616, 'but'),
 (552, 'as'),
 (505, 'your'),
 (482, 'he'),
 (481, 'what'),
 (479, 'so'),
 (456, 'have'),
 (456, 'or'),
 (434, 'will'),
 (412, 'by'),
 (410, 'him')]

In [19]:
##### Build a DataFrame with named columns from the (count, stopword) pairs

start = time.time()

columns = ["count", "stopword"]
allStopWordsDF = spark.createDataFrame(data=all_StopWords, schema=columns)

end = time.time()

print(end-start)

allStopWordsDF.show()

0.3372330665588379
+-----+--------+
|count|stopword|
+-----+--------+
| 3004|     the|
| 2636|     and|
| 2049|      to|
| 1961|      of|
| 1588|       i|
| 1440|       a|
| 1342|      my|
| 1191|      in|
| 1180|     you|
| 1067|    that|
| 1032|      is|
|  877|    with|
|  848|      it|
|  841|     not|
|  824|    this|
|  764|     for|
|  675|     his|
|  672|      me|
|  662|      be|
|  616|     but|
+-----+--------+
only showing top 20 rows



In [20]:
##### Write the stopword table as CSV under the "StopWord" output path
start = time.time()

# mode("overwrite") makes the cell re-runnable: with the default save mode the
# write fails as soon as the "StopWord" path exists from a previous run.
# DataFrameWriter.csv returns None, so allStopWordsFormatCSV holds no data; the
# name is kept only so that existing references to it keep resolving.
allStopWordsFormatCSV = allStopWordsDF.write.mode("overwrite").csv("StopWord")

end = time.time()

print(end-start)


2.290879964828491


---

# **Exercise n°3 : Simple Inverted Index**

In [21]:
##### Drop the stopwords and tag every remaining distinct word with its source file

def _tag_words(tokens_rdd, file_label):
    """Return (word, file_label) once for each distinct non-stopword token."""
    content = tokens_rdd.filter(lambda token: token not in stopwords)
    content = content.filter(lambda token: re.match('[a-z]+', token))
    tagged = content.map(lambda token: (token, file_label))
    # Every value under a key is the same label, so keeping the first one only
    # collapses duplicates of the word.
    return tagged.reduceByKey(lambda kept, _duplicate: kept)

start = time.time()

RJ_OtherWords = _tag_words(RJ_clean, "RJ.txt")
Hamlet_OtherWords = _tag_words(Hamlet_clean, "Hamlet.txt")
Richard_OtherWords = _tag_words(Richard_clean, "Richard.txt")

end = time.time()

print(end-start)

Richard_OtherWords.take(10)

0.11527538299560547


[('presented', 'Richard.txt'),
 ('project', 'Richard.txt'),
 ('gutenberg', 'Richard.txt'),
 ('cooperation', 'Richard.txt'),
 ('world', 'Richard.txt'),
 ('library', 'Richard.txt'),
 ('inc', 'Richard.txt'),
 ('shakespeare', 'Richard.txt'),
 ('cdroms', 'Richard.txt'),
 ('placed', 'Richard.txt')]

In [22]:
###### Simple inverted index: (id, (word, "files containing the word")), sorted by word

def _id_first(entry_with_position):
    """Turn (entry, position) as produced by zipWithIndex into (position, entry)."""
    entry, position = entry_with_position
    return (position, entry)

start = time.time()

tagged_words = RJ_OtherWords.union(Hamlet_OtherWords).union(Richard_OtherWords)
# Join the file labels of a word into one comma-separated string.
files_per_word = tagged_words.reduceByKey(lambda left, right: left + ", " + right)
all_OtherWords = files_per_word.sortByKey().zipWithIndex().map(_id_first)

end = time.time()

print(end-start)

all_OtherWords.take(10)

3.383584976196289


[(0, ('abate', 'RJ.txt, Hamlet.txt')),
 (1, ('abatements', 'Hamlet.txt')),
 (2, ('abbey', 'RJ.txt')),
 (3, ('abbot', 'Richard.txt')),
 (4, ('abed', 'RJ.txt')),
 (5, ('abels', 'Richard.txt')),
 (6, ('abet', 'Richard.txt')),
 (7, ('abhorred', 'RJ.txt, Hamlet.txt')),
 (8, ('abhors', 'RJ.txt')),
 (9, ('abide', 'RJ.txt, Hamlet.txt, Richard.txt'))]



---

# **Exercise n°4 : Extended Inverted Index**

In [23]:
##### Drop the stopwords and label every remaining word with "<file> #<occurrences>"

def _tag_word_counts(tokens_rdd, file_label):
    """Return (word, "<file_label> #<n>") where n is how often the word occurs.

    Occurrences are summed as integers and the label is formatted once per
    word, instead of carrying the count inside the label string and parsing
    it back out with find()/int() on every merge. The resulting values are
    the same strings as before (e.g. "RJ.txt #90").
    """
    content = tokens_rdd.filter(lambda token: token not in stopwords)
    content = content.filter(lambda token: re.match('[a-z]+', token))
    totals = content.map(lambda token: (token, 1)).reduceByKey(lambda left, right: left + right)
    return totals.mapValues(lambda total: file_label + " #" + str(total))

start = time.time()

RJ_OtherWords_Extended = _tag_word_counts(RJ_clean, "RJ.txt")

Hamlet_OtherWords_Extended = _tag_word_counts(Hamlet_clean, "Hamlet.txt")

Richard_OtherWords_Extended = _tag_word_counts(Richard_clean, "Richard.txt")

end = time.time()

print(end-start)

RJ_OtherWords_Extended.take(10)

0.1475062370300293


[('project', 'RJ.txt #90'),
 ('gutenberg', 'RJ.txt #32'),
 ('ebook', 'RJ.txt #13'),
 ('juliet', 'RJ.txt #65'),
 ('shakespeare', 'RJ.txt #8'),
 ('files', 'RJ.txt #3'),
 ('produced', 'RJ.txt #2'),
 ('proofing', 'RJ.txt #1'),
 ('tools', 'RJ.txt #1'),
 ('developed', 'RJ.txt #1')]

In [24]:
###### Extended inverted index: (id, (word, "file #count, ...")), sorted by word

def _position_first(entry_with_position):
    """Turn (entry, position) as produced by zipWithIndex into (position, entry)."""
    entry, position = entry_with_position
    return (position, entry)

start = time.time()

labelled_words = (
    RJ_OtherWords_Extended
    .union(Hamlet_OtherWords_Extended)
    .union(Richard_OtherWords_Extended)
)
# Join the "<file> #<count>" labels of a word into one comma-separated string.
labels_per_word = labelled_words.reduceByKey(lambda left, right: left + ", " + right)
all_OtherWords_Extended = labels_per_word.sortByKey().zipWithIndex().map(_position_first)

end = time.time()

print(end-start)

all_OtherWords_Extended.take(10)

3.214059591293335


[(0, ('abate', 'RJ.txt #1, Hamlet.txt #1')),
 (1, ('abatements', 'Hamlet.txt #1')),
 (2, ('abbey', 'RJ.txt #1')),
 (3, ('abbot', 'Richard.txt #7')),
 (4, ('abed', 'RJ.txt #1')),
 (5, ('abels', 'Richard.txt #1')),
 (6, ('abet', 'Richard.txt #1')),
 (7, ('abhorred', 'RJ.txt #1, Hamlet.txt #1')),
 (8, ('abhors', 'RJ.txt #1')),
 (9, ('abide', 'RJ.txt #1, Hamlet.txt #1, Richard.txt #1'))]