In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

import findspark
findspark.init("spark-2.4.4-bin-hadoop2.7")# SPARK_HOME

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Put input_docs_sample.zip in your Google Drive

!rm -rf input_docs
!cp /content/drive/My\ Drive/input_docs_sample.zip .
!unzip input_docs_sample.zip > /dev/null
!ls input_docs/ | wc -l

# for the real collection change above input_docs_sample.zip to input_docs.zip
# for the sample collection of 5 docs, the process is fast
# for the real collection, the process takes about 6 min (start to finish, the whole notebook) 

5


In [None]:
import nltk
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

**Create an RDD from a text file**

Each line of the text file becomes an element of the RDD.

In [None]:
# wholeTextFiles generates an RDD of pair values, 
# where the key is the full path of each file, the value is the content of each file
#input = sc.wholeTextFiles("/content/drive/My\ Drive/input_docs");
input = sc.wholeTextFiles("input_docs");

# Now we strip the prefix of filenames and leave only the basename. 
# e.g. 'file:/content/drive/My Drive/Colab Notebooks/data_spark/input_docs/3.html'
# becomes '3.html' 
import os

#(did,text)
input2 = input.map(lambda x: (int(os.path.basename(x[0]).split(".")[0]), x[1]))

print(input2.take(1))

[(1, '<H2>26-FEB-1987 15:01:01.79</H2>\r\n<H2>BAHIA COCOA REVIEW</H2>\r\nShowers continued throughout the week in\nthe Bahia cocoa zone, alleviating the drought since early\nJanuary and improving prospects for the coming temporao,\nalthough normal humidity levels have not been restored,\nComissaria Smith said in its weekly review.\n    The dry period means the temporao will be late this year.\n    Arrivals for the week ended February 22 were 155,221 bags\nof 60 kilos making a cumulative total for the season of 5.93\nmln against 5.81 at the same stage last year. Again it seems\nthat cocoa delivered earlier on consignment was included in the\narrivals figures.\n    Comissaria Smith said there is still some doubt as to how\nmuch old crop cocoa is still available as harvesting has\npractically come to an end. With total Bahia crop estimates\naround 6.4 mln bags and sales standing at almost 6.2 mln there\nare a few hundred thousand bags still in the hands of farmers,\nmiddlemen, exporters a

In [None]:
# Doc to wordlist function
# The output will be a list of tuples such as 
# ("apple", (3,10,10/20)), 
# where 3 is docid, 
# 10 is frequency of "apple" in this doc, 
# 20 is maxf in in this doc.

from bs4 import BeautifulSoup
from collections import Counter
import re

from nltk.corpus import stopwords
stop_words = set(stopwords.words('english'))

# for a given doc return a list of tuples of the form (w, (docid, freq, freq/maxfreq))
def dw(docid, htmltext):
  cleantext = BeautifulSoup(htmltext).get_text()
  #TODO
  #returning just a dummy result
  return [(_,(_,_,_))]

word_docid_freq_tf = input2.flatMap(lambda x: dw(x[0],x[1]))

print(word_docid_freq_tf.take(2))

[(True, (True, True, True)), (True, (True, True, True))]


Expected output (all expected results are on the small sample):

<pre>
[('feb', (1, 1, 0.07142857142857142)), ('bahia', (1, 5, 0.35714285714285715))]
</pre>

In [None]:
# Now create an RDD as follows 
# (word, [(did1,freq1,tf1), (did2,freq2,tf2), ...])
		    
#TODO

# creating a dummy RDD
word_postinglist_freq_tf = sc.parallelize([ ('test', [(1, 1, 0.5), (2, 1, 0.2)]) ])

In [None]:
print(word_postinglist_freq_tf.map(lambda x : (x[0], list(x[1]))).take(1))

[('test', [(1, 1, 0.5), (2, 1, 0.2)])]


Expected output

<pre>
[('feb', [(1, 1, 0.07142857142857142), (2, 1, 0.2), (5, 1, 0.16666666666666666), (3, 1, 0.3333333333333333), (4, 1, 0.07142857142857142)]), ('bahia', [(1, 5, 0.35714285714285715)])]
</pre>

In [None]:
# (word, [(did,freq,tfidf), ...])
# We easily obtain idf as 1/len(postinglist_tf)
# idf = 1/len(postinglist_tf)

#TODO

# creating a dummy RDD
word_postinglist_freq_tfidf = sc.parallelize([('test', [(1,3,0.12), (2,5,0.876)])])

print(word_postinglist_freq_tfidf.take(1))

[('test', [(1, 3, 0.12), (2, 5, 0.876)])]


Expected output

<pre>
[('feb', [(1, 1, 0.014285714285714285), (2, 1, 0.04), (5, 1, 0.03333333333333333), (3, 1, 0.06666666666666667), (4, 1, 0.014285714285714285)])]
</pre>

In [None]:
# Now, we would like to obtain the magnitude of each doc.
# First, produce (did, (freq,tfidf)) for each word of doc did; 
# We do don't need the word itself, just its (freq,tfidf). 
# Then, do reduceByKey on these tuples and obtain maxfreq and 
# magnitude (squared) for each document. 

#TODO

# RDD of (did,(freq,tfidf)) tuples
# creating a dummy RDD
did_freq_tfidfsq_rdd = sc.parallelize([(1,(3,0.12)), (1,(5,0.13)), (2,(2,0.11)), (2,(6,0.14))])

print(did_freq_tfidfsq_rdd.take(2))

# Produce (did,(maxf,magnitudesq))
# creating a dummy RDD
doc_maxf_mag = sc.parallelize([[(1, (5, 0.23)), (2, (6, 0.34))]])

print(doc_maxf_mag.take(2))

[(1, (3, 0.12)), (1, (5, 0.13))]
[[(1, (5, 0.23)), (2, (6, 0.34))]]


Excpected result

<pre>
[(1, (1, 0.0002040816326530612)), (2, (1, 0.0016))]
[(2, (5, 3.894100000000001)), (4, (14, 2.94553429705215))]
</pre>

In [None]:
!rm -rf inv_idx
word_postinglist_freq_tfidf.saveAsTextFile("inv_idx");

In [None]:
!rm -rf doc_mag
doc_maxf_mag.saveAsTextFile("doc_mag");

In [None]:
!ls -lrt inv_idx
!head inv_idx/part-00001
!wc -l inv_idx/part-00000
!wc -l inv_idx/part-00001
!cat inv_idx/part-00000 inv_idx/part-00001 > /content/drive/My\ Drive/inv_idx.txt
!wc -l /content/drive/My\ Drive/inv_idx.txt

total 4
-rw-r--r-- 1 root root  0 Feb 19 19:43 part-00000
-rw-r--r-- 1 root root 40 Feb 19 19:43 part-00001
-rw-r--r-- 1 root root  0 Feb 19 19:43 _SUCCESS
('test', [(1, 3, 0.12), (2, 5, 0.876)])
0 inv_idx/part-00000
1 inv_idx/part-00001
1 /content/drive/My Drive/inv_idx.txt


In [None]:
!ls -lrt doc_mag
!head doc_mag/part-00000
!wc -l doc_mag/part-00000
!wc -l doc_mag/part-00001
!cat doc_mag/part-00000 doc_mag/part-00001 > /content/drive/My\ Drive/doc_mag.txt
!wc -l /content/drive/My\ Drive/doc_mag.txt

total 4
-rw-r--r-- 1 root root  0 Feb 19 19:43 part-00000
-rw-r--r-- 1 root root 33 Feb 19 19:43 part-00001
-rw-r--r-- 1 root root  0 Feb 19 19:43 _SUCCESS
0 doc_mag/part-00000
1 doc_mag/part-00001
1 /content/drive/My Drive/doc_mag.txt
