In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

import findspark
findspark.init("spark-2.4.4-bin-hadoop2.7")# SPARK_HOME

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [2]:
# Mount Google Drive: the input archive is copied from "My Drive" and the exported
# index files are written back there later in the notebook.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [3]:
# Put input_docs_sample.zip in your Google Drive

!rm -rf input_docs
!cp /content/drive/My\ Drive/input_docs.zip .
!unzip input_docs.zip > /dev/null
!ls input_docs/ | wc -l

# for the real collection change above input_docs_sample.zip to input_docs.zip
# for the sample collection of 5 docs, the process is fast
# for the real collection, the process takes about 6 min (start to finish, the whole notebook) 

19026


In [4]:
import nltk

# Fetch NLTK's stop-word corpus; dw() below filters English stop words with it.
STOPWORDS_PACKAGE = 'stopwords'
nltk.download(STOPWORDS_PACKAGE)

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

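**Create an RDD from a directory of text files**

`wholeTextFiles` reads every file in the directory. Each file becomes one `(path, content)` element of the RDD, unlike `textFile`, where each line becomes an element.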

In [5]:
# wholeTextFiles generates an RDD of pair values,
# where the key is the full path of each file and the value is the content of that file.
# Named raw_docs (not `input`) so the Python builtin input() is not shadowed.
raw_docs = sc.wholeTextFiles("input_docs")

# Now we strip the directory and extension from each file name and keep the numeric id,
# e.g. 'file:/.../input_docs/3.html' becomes 3.
import os

def _doc_id_from_path(path):
  """Return the integer doc id encoded in a file name such as '.../3.html'."""
  return int(os.path.basename(path).split(".")[0])

# (did, text)
input2 = raw_docs.map(lambda kv: (_doc_id_from_path(kv[0]), kv[1]))
# Preview one document, truncated: full HTML bodies flood the cell output.
print([(did, text[:300]) for did, text in input2.take(1)])


[(16474, '<H2>13-APR-1987 12:42:20.17</H2>\r\n<H2>BANK OF ENGLAND TO MONITOR WHEN-ISSUED TRADING</H2>\r\nThe Bank of England will step up\nreporting requirements for primary dealers in U.K. Government\ngilts during the when-issued trading period in between the date\nan auction is announced and the date the sale is actually held.\n    In its document on the auctions, the Bank said "The Bank is\ncontent for such (when-issued) trading to cevelop subject to\ncertain conditions, in particular for predential supervision of\nthe credit risk to which the gilt-edged market makers and\nInter-dealer brokers may become exposed."\n    The Bank is widely expected to require dealers to report\nwehn-issued trading positions on a daily basis.\n    When-issued trading presents certain unusual risks because\ndealers are buying and selling a security that technically does\nnot exist although there is no uncertainty about when it will\nbe issued.\n    Still, the fact there is no physical delivery possible\

In [6]:
# Doc-to-wordlist function.
# The output is a list of tuples such as
# ("apple", (3, 10, 10/20)),
# where 3 is the doc id,
# 10 is the frequency of "apple" in this doc, and
# 20 is maxf, the frequency of the most frequent word in this doc.

from bs4 import BeautifulSoup
from collections import Counter
import re

from nltk.corpus import stopwords
stop_words = set(stopwords.words('english'))

def dw(docid, htmltext):
  """Return [(w, (docid, freq, freq/maxfreq)), ...] for one HTML document.

  Markup is stripped, words are runs of ASCII letters, lower-cased, and English
  stop words are dropped. Returns [] when no words survive (e.g. an empty file);
  without this guard most_common(1)[0] would raise IndexError and fail the job.
  """
  # Name the parser explicitly: avoids bs4's "no parser specified" warning and makes
  # the extracted text independent of which optional parsers are installed.
  cleantext = BeautifulSoup(htmltext, "html.parser").get_text()
  words = [w.lower() for w in re.findall('[a-zA-Z]+', cleantext)]
  counter = Counter(w for w in words if w not in stop_words)
  if not counter:
    return []
  _, maxf = counter.most_common(1)[0]
  return [(w, (docid, freq, freq / maxf)) for w, freq in counter.items()]

word_docid_freq_tf = input2.flatMap(lambda x: dw(x[0], x[1]))
print(word_docid_freq_tf.take(5))
# Keep a handle on this ungrouped (word, (did, freq, tf)) RDD; the next cell
# rebinds word_docid_freq_tf to the grouped version.
temp1 = word_docid_freq_tf

[('apr', (16474, 1, 0.14285714285714285)), ('bank', (16474, 6, 0.8571428571428571)), ('england', (16474, 2, 0.2857142857142857)), ('monitor', (16474, 1, 0.14285714285714285)), ('issued', (16474, 7, 1.0))]


Expected output (all expected results in this notebook come from the 5-document sample collection, so they differ from the full-collection outputs shown above):

<pre>
[('feb', (1, 1, 0.07142857142857142)), ('bahia', (1, 5, 0.35714285714285715))]
</pre>

In [7]:
# Group the per-document entries by word:
# (word, [(did1,freq1,tf1), (did2,freq2,tf2), ...])
# Built from temp1 (the ungrouped RDD kept by the previous cell) rather than from
# word_docid_freq_tf itself. Otherwise re-running this cell would group an
# already-grouped RDD a second time.
word_docid_freq_tf = temp1.groupByKey().mapValues(list)
print(word_docid_freq_tf.take(2))

[('apr', [(16474, 1, 0.14285714285714285), (14751, 1, 0.5), (12470, 1, 0.3333333333333333), (16897, 1, 0.3333333333333333), (13243, 1, 0.2), (12325, 1, 0.2), (16123, 1, 0.125), (16652, 1, 0.3333333333333333), (15269, 1, 0.25), (16804, 1, 0.1), (14841, 1, 0.5), (12592, 1, 0.3333333333333333), (16379, 1, 0.3333333333333333), (16655, 1, 0.16666666666666666), (15741, 1, 0.25), (12383, 1, 0.1111111111111111), (11861, 1, 0.2), (17340, 1, 0.25), (16585, 1, 0.16666666666666666), (17007, 1, 0.05555555555555555), (14920, 1, 0.3333333333333333), (15675, 1, 0.25), (11896, 1, 0.1), (15529, 1, 0.2), (14906, 1, 0.14285714285714285), (15969, 1, 0.16666666666666666), (15552, 1, 0.1), (13211, 1, 0.3333333333333333), (16195, 1, 0.08333333333333333), (13553, 3, 1.0), (15617, 1, 0.07142857142857142), (14103, 1, 0.3333333333333333), (16554, 1, 0.3333333333333333), (17078, 1, 0.25), (14948, 1, 0.25), (16749, 1, 0.14285714285714285), (11960, 1, 0.3333333333333333), (17388, 1, 0.3333333333333333), (16346, 1, 0

Expected output

<pre>
[('feb', [(1, 1, 0.07142857142857142), (2, 1, 0.2), (5, 1, 0.16666666666666666), (3, 1, 0.3333333333333333), (4, 1, 0.07142857142857142)]), ('bahia', [(1, 5, 0.35714285714285715)])]
</pre>

In [8]:
# Convert each posting's tf into tf*idf, giving (word, [(did, freq, tfidf), ...]).
# This notebook defines idf = 1 / df, where df = len(posting list) is the number of
# docs containing the word (not the usual log(N/df)); the expected output below
# follows that definition.
def _tfidf_postings(postings):
  df = len(postings)
  return [(did, freq, tf / df) for did, freq, tf in postings]

word_docid_freq_tfidf = word_docid_freq_tf.mapValues(_tfidf_postings)
print(word_docid_freq_tfidf.take(1))

[('apr', [(16474, 1, 3.011322572874006e-05), (14751, 1, 0.00010539629005059021), (12470, 1, 7.026419336706015e-05), (16897, 1, 7.026419336706015e-05), (13243, 1, 4.215851602023609e-05), (12325, 1, 4.215851602023609e-05), (16123, 1, 2.6349072512647553e-05), (16652, 1, 7.026419336706015e-05), (15269, 1, 5.269814502529511e-05), (16804, 1, 2.1079258010118044e-05), (14841, 1, 0.00010539629005059021), (12592, 1, 7.026419336706015e-05), (16379, 1, 7.026419336706015e-05), (16655, 1, 3.5132096683530073e-05), (15741, 1, 5.269814502529511e-05), (12383, 1, 2.3421397789020048e-05), (11861, 1, 4.215851602023609e-05), (17340, 1, 5.269814502529511e-05), (16585, 1, 3.5132096683530073e-05), (17007, 1, 1.1710698894510024e-05), (14920, 1, 7.026419336706015e-05), (15675, 1, 5.269814502529511e-05), (11896, 1, 2.1079258010118044e-05), (15529, 1, 4.215851602023609e-05), (14906, 1, 3.011322572874006e-05), (15969, 1, 3.5132096683530073e-05), (15552, 1, 2.1079258010118044e-05), (13211, 1, 7.026419336706015e-05),

Expected output

<pre>
[('feb', [(1, 1, 0.014285714285714285), (2, 1, 0.04), (5, 1, 0.03333333333333333), (3, 1, 0.06666666666666667), (4, 1, 0.014285714285714285)])]
</pre>

In [9]:
# Per-document statistics.
# Step 1: explode every posting list into (did, (freq, tfidf^2)); the word itself is
#         not needed. Note: despite the name did_freq_tfidf, the second field holds
#         the SQUARED tfidf.
# Step 2: reduceByKey per doc into (did, (maxf, sum of squared tfidf)), i.e. the
#         maximum word frequency and the squared magnitude of the doc vector.
import math

def _squared_postings(word_and_postings):
  _, postings = word_and_postings
  return [(did, (freq, tfidf * tfidf)) for did, freq, tfidf in postings]

def _merge_doc_stats(a, b):
  return (max(a[0], b[0]), a[1] + b[1])

# RDD of (did, (freq, tfidf^2)) tuples
did_freq_tfidf = word_docid_freq_tfidf.flatMap(_squared_postings)
print(did_freq_tfidf.take(2))
# (did, (maxf, magnitude squared))
doc_maxf_mag = did_freq_tfidf.reduceByKey(_merge_doc_stats)
print(doc_maxf_mag.take(2))

[(16474, (1, 9.068063637900522e-10)), (14751, (1, 1.1108377956428141e-08))]
[(16474, (7, 0.08380229270896267)), (12470, (3, 0.0011261690449096931))]


Expected result

<pre>
[(1, (1, 0.0002040816326530612)), (2, (1, 0.0016))]
[(2, (5, 3.894100000000001)), (4, (14, 2.94553429705215))]
</pre>

In [0]:
!rm -rf inv_idx
word_docid_freq_tfidf.repartition(1).saveAsTextFile("inv_idx");

In [0]:
!rm -rf doc_mag
doc_maxf_mag.repartition(1).saveAsTextFile("doc_mag");

In [12]:
!ls -lrt inv_idx
!head inv_idx/part-00001
!wc -l inv_idx/part-00000
!wc -l inv_idx/part-00001
!cat inv_idx/part-00000 inv_idx/part-00001 > /content/drive/My\ Drive/inv_idx.txt
!wc -l /content/drive/My\ Drive/inv_idx.txt

total 37768
-rw-r--r-- 1 root root 38673200 Mar 28 01:30 part-00000
-rw-r--r-- 1 root root        0 Mar 28 01:30 _SUCCESS
head: cannot open 'inv_idx/part-00001' for reading: No such file or directory
42562 inv_idx/part-00000
wc: inv_idx/part-00001: No such file or directory
cat: inv_idx/part-00001: No such file or directory
42562 /content/drive/My Drive/inv_idx.txt


In [13]:
!ls -lrt doc_mag
!head doc_mag/part-00000
!wc -l doc_mag/part-00000
!wc -l doc_mag/part-00001
!cat doc_mag/part-00000 doc_mag/part-00001 > /content/drive/My\ Drive/doc_mag.txt
!wc -l /content/drive/My\ Drive/doc_mag.txt

total 636
-rw-r--r-- 1 root root 647381 Mar 28 01:30 part-00000
-rw-r--r-- 1 root root      0 Mar 28 01:30 _SUCCESS
(16474, (7, 0.08380229270896267))
(12470, (3, 0.0011261690449096931))
(16652, (3, 0.022313360779481158))
(16804, (10, 0.0031280579283883515))
(12592, (3, 0.00038765693721077367))
(17340, (4, 0.21173461444382377))
(14920, (3, 0.11128972368854853))
(11896, (10, 0.002500916678258711))
(14906, (7, 0.04403972545457592))
(15552, (10, 0.2614679163769299))
19026 doc_mag/part-00000
wc: doc_mag/part-00001: No such file or directory
cat: doc_mag/part-00001: No such file or directory
19026 /content/drive/My Drive/doc_mag.txt
