# The first part of the project:
### 1. Create an Inverted Index from .Html Documents :

Reads the documents (.html files) as text files.

Cleans all the documents using the NLTK library - Convert to lowercase, remove stopwords, punctuations, special characters, numbers etc. 
 
Calculates the TF-IDF (Term frequency - Inverse Document Frequency) and the document magnitude

Save the inverted index and magnitude in text files.


In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

import findspark
findspark.init("spark-2.4.4-bin-hadoop2.7")# SPARK_HOME

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Upload a zip file "input_docs_sample.zip" containing the .html documents to Google Drive

!rm -rf input_docs
!cp /content/drive/MyDrive/input_docs_sample.zip .
!unzip input_docs_sample.zip > /dev/null
!ls input_docs/ | wc -l


5


In [None]:
# Importing the NLTK package for text processing

import nltk
nltk.download('stopwords')
nltk.download('punkt')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

**Creating an RDD from a text file**

Each line of the text file becomes an element of the RDD.

In [None]:
# wholeTextFiles generates an RDD of pair values, 
# where the key is the full path of each file, the value is the content of each file

input = sc.wholeTextFiles("input_docs");

# Stripping the path of the file and obtain only the basename. 
# e.g. 'file:/content/drive/My Drive/Colab Notebooks/data_spark/input_docs/1.html'
# becomes '1.html' 
import os

# Maps the document id with the corresponding text --> (did,text)
input2 = input.map(lambda x: (int(os.path.basename(x[0]).split(".")[0]), x[1])) # in input 2 the text in document is stored as key value pair

print(input2.take(1))

[(4, '<H2>26-FEB-1987 15:07:13.72</H2>\r\n<H2>TALKING POINT/BANKAMERICA BAC EQUITY OFFER</H2>\r\nBankAmerica Corp is not under\npressure to act quickly on its proposed equity offering and\nwould do well to delay it because of the stock\'s recent poor\nperformance, banking analysts said.\n    Some analysts said they have recommended BankAmerica delay\nits up to one-billion-dlr equity offering, which has yet to be\napproved by the Securities and Exchange Commission.\n    BankAmerica stock fell this week, along with other banking\nissues, on the news that Brazil has suspended interest payments\non a large portion of its foreign debt.\n    The stock traded around 12, down 1/8, this afternoon,\nafter falling to 11-1/2 earlier this week on the news.\n    Banking analysts said that with the immediate threat of the\nFirst Interstate Bancorp I takeover bid gone, BankAmerica is\nunder no pressure to sell the securities into a market that\nwill be nervous on bank stocks in the near term.\n    Ban

In [None]:
# Doc to wordlist function
# The output will be a list of tuples such as 
# ("search", (4,15,15/20)), 
# where 4 is docid, 
# 15 is frequency of "search" in this doc, 
# 20 is maxf in in the document.

from bs4 import BeautifulSoup
from collections import Counter
import re
import string
from string import digits
punctuations = '''!()-[]{};:'"\,<>./?@#$%^&*_~'''

from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
stop_words = set(stopwords.words('english'))

# Function for creating a list of tuples of the form (word, (docid, freq, freq/maxfreq)) when docid and text is given. 
def dw(docid, htmltext):
  htmltext_lower = htmltext.lower() #converting to lower case
  cleantext_notags = BeautifulSoup(htmltext_lower).get_text() #removing html tags
  remove_digits = str.maketrans('','',digits)  #To find didgits in the text and then removing in next line
  cleantext_nodigits = cleantext_notags.translate(remove_digits) #removing numbers 
  cleantext_nosplchar = re.sub('[^A-Za-z0-9]+',' ',cleantext_nodigits) #remove special characters
  cleantext_nopunct = ''
  for char in cleantext_nosplchar:
    if char not in punctuations:
      cleantext_nopunct = cleantext_nopunct + char #remove punctuations

  cleantext_tokenize = word_tokenize(cleantext_nopunct) #tokenized text for getting this us an individual word
  
  cleantext_nostopwords = [word for word in cleantext_tokenize if word not in stopwords.words('english')] #removing stopwords
  
  counter = Counter(cleantext_nostopwords) #counting the number of words and it is stored as a key value pair where key is the word and value is frequency

  maxfrequency_word = counter.most_common(1)[0][1] #finding maximum number of times a word is getting repeated in the document
  output = []
  for obj in counter: # creating required format for rdd
    key = (obj)
    value = (int(docid), counter[obj], counter[obj]/maxfrequency_word) #here docid ,freq and term freq(tf) is stored
    map = (key,value) 
    output.append(map) #adding to output
  return output

#dw(2,"<H2>26-FEB-1987 15:01:01.79</H2><H2>BAHIA COCOA REVIEW</H2>Showers continued throughout the week in the Bahia cocoa zone, alleviating the drought since early January and improving prospects for the coming temporao, although normal humidity levels have not been restored, Comissaria Smith said in its weekly review. The dry period means the temporao will be late this year. Arrivals for the week ended February 22 were 155,221 bags of 60 kilos making a cumulative total for the season of 5.93 mln against 5.81 at the same stage last year. Again it seems that cocoa delivered earlier on consignment was included in the arrivals figures. Comissaria Smith said there is still some doubt as to how much old crop cocoa is still available as harvesting has practically come to an end. With total Bahia crop estimates around 6.4 mln bags and sales standing at almost 6.2 mln there are a few hundred thousand bags still in the hands of farmers, middlemen, exporters and processors. There are doubts as to how much of this cocoa would be fit for export as shippers are now experiencing dificulties in obtaining +Bahia superior+ certificates. In view of the lower quality over recent weeks farmers have sold a good part of their cocoa held on consignment. Comissaria Smith said spot bean prices rose to 340 to 350 cruzados per arroba of 15 kilos. Bean shippers were reluctant to offer nearby shipment and only limited sales were booked for March shipment at 1,750 to 1,780 dlrs per tonne to ports to be named. New crop sales were also light and all to open ports with June/July going at 1,850 and 1,880 dlrs and at 35 and 45 dlrs under New York july, Aug/Sept at 1,870, 1,875 and 1,880 dlrs per tonne FOB. Routine sales of butter were made. March/April sold at 4,340, 4,345 and 4,350 dlrs. April/May butter went at 2.27 times New York May, June/July at 4,400 and 4,415 dlrs, Aug/Sept at 4,351 to 4,450 dlrs and at 2.27 and 2.28 times New York Sept and Oct/Dec at 4,480 dlrs and 2.27 times New York Dec, Comissaria Smith said. Destinations were the U.S., Covertible currency areas, Uruguay and open ports. Cake sales were registered at 785 to 995 dlrs for March/April, 785 dlrs for May, 753 dlrs for Aug and 0.39 times New York Dec for Oct/Dec. Buyers were the U.S., Argentina, Uruguay and convertible currency areas. Liquor sales were limited with March/April selling at 2,325 and 2,380 dlrs, June/July at 2,375 dlrs and at 1.25 times New York July, Aug/Sept at 2,400 dlrs and at 1.25 times New York Sept and Oct/Dec at 1.25 times New York Dec, Comissaria Smith said. Total Bahia sales are currently estimated at 6.13 mln bags against the 1986/87 crop and 1.06 mln bags against the 1987/88 crop. Final figures for the period to February 28 are expected to be published by the Brazilian Cocoa Trade Commission after carnival which ends midday on February 27.")
word_docid_freq_tf = input2.flatMap(lambda x: dw(x[0],x[1])) # we  are creating an rdd 
print(word_docid_freq_tf.take(2))

[('feb', (4, 1, 0.07142857142857142)), ('talking', (4, 1, 0.07142857142857142))]


In [None]:
# This cell creates an RDD of the form (word, [(did1,freq1,tf1), (did2,freq2,tf2), ...]) 
# ie. creates a posting list of the words

word_docid_freq_tf_rdd_2 = word_docid_freq_tf.map(lambda x : (x[0],[x[1]])).reduceByKey(lambda a,b: a+b)
word_posting_list_tf = sc.parallelize(word_docid_freq_tf_rdd_2.collect())
print(word_posting_list_tf.count())

396


In [None]:
print(word_posting_list_tf.map(lambda x : (x[0], list(x[1]))).take(1))

[('feb', [(4, 1, 0.07142857142857142), (5, 1, 0.16666666666666666), (1, 1, 0.07142857142857142), (2, 1, 0.2), (3, 1, 0.3333333333333333)])]


In [None]:
# This cell creates the tf-idf 
# (word, [(did,freq,tfidf), ...])
# idf = 1/len(postinglist_tf)

tf_idf = []
for row in word_posting_list_tf.collect():
  new_tuple_row = '' 
  word_variable = row[0]
  idf = 1/len(row[1])
  new_row_tuple_array = []
  for rowindex in row[1]:
    rowindex_tf = rowindex[2]
    rowindex_tfidf = idf * rowindex_tf
    new_tuple_rowindex = ''
    new_tuple_rowindex = (rowindex[0],rowindex[1],rowindex_tfidf)
    new_row_tuple_array.append(new_tuple_rowindex)
  new_tuple_row = (word_variable,new_row_tuple_array)
  tf_idf.append(new_tuple_row)
  word_posting_list_tf_idf = sc.parallelize(tf_idf)
  
print(word_posting_list_tf_idf.take(1))

[('feb', [(4, 1, 0.014285714285714285), (5, 1, 0.03333333333333333), (1, 1, 0.014285714285714285), (2, 1, 0.04000000000000001), (3, 1, 0.06666666666666667)])]


In [None]:
# For obtaining the magnitude of each document.

docid_freq_tfidf_rdd3_array = []
for row in word_posting_list_tf_idf.collect():
  for rowindex in row[1]:
    # Magnitude
    new_tuple = (rowindex[1],(rowindex[2]*rowindex[2])) 
    rowindex = (rowindex[0],new_tuple)
  docid_freq_tfidf_rdd3_array.append(rowindex)
docid_freq_tfidf_rdd3 = sc.parallelize(docid_freq_tfidf_rdd3_array)
new_list1 = docid_freq_tfidf_rdd3.reduceByKey(lambda x,y: (max(x[0],y[0]),(float(x[1])+float(y[1]))))

doc_maxf_mag = sc.parallelize(new_list1.collect())

print(doc_maxf_mag.take(2))

[(4, (14, 2.7653061224489752)), (2, (5, 3.8700000000000014))]


In [None]:
# Saving the inverted index created as a text file
!rm -rf inv_idx
word_posting_list_tf_idf.saveAsTextFile("inv_idx");

In [None]:
# Saving the document magnitude created as a text file
!rm -rf doc_mag
doc_maxf_mag.saveAsTextFile("doc_mag");

In [None]:
!ls -lrt inv_idx
!head inv_idx/part-00001
!wc -l inv_idx/part-00000
!wc -l inv_idx/part-00001
!cat inv_idx/part-00000 inv_idx/part-00001 > /content/drive/MyDrive/inv_idx.txt
!wc -l /content/drive/MyDrive/inv_idx.txt

total 24
-rw-r--r-- 1 root root 9056 Apr  8 17:00 part-00000
-rw-r--r-- 1 root root 8543 Apr  8 17:00 part-00001
-rw-r--r-- 1 root root    0 Apr  8 17:00 _SUCCESS
('aug', [(1, 4, 0.2857142857142857)])
('sept', [(1, 5, 0.35714285714285715)])
('butter', [(1, 2, 0.14285714285714285)])
('april', [(1, 4, 0.2857142857142857)])
('may', [(1, 3, 0.21428571428571427)])
('dec', [(1, 6, 0.42857142857142855)])
('destinations', [(1, 1, 0.07142857142857142)])
('covertible', [(1, 1, 0.07142857142857142)])
('areas', [(1, 2, 0.14285714285714285)])
('uruguay', [(1, 2, 0.14285714285714285)])
198 inv_idx/part-00000
198 inv_idx/part-00001
396 /content/drive/MyDrive/inv_idx.txt


In [None]:
!ls -lrt doc_mag
!head doc_mag/part-00000
!wc -l doc_mag/part-00000
!wc -l doc_mag/part-00001
!cat doc_mag/part-00000 doc_mag/part-00001 > /content/drive/MyDrive/doc_mag.txt
!wc -l /content/drive/MyDrive/doc_mag.txt

total 8
-rw-r--r-- 1 root root 59 Apr  8 17:01 part-00000
-rw-r--r-- 1 root root 86 Apr  8 17:01 part-00001
-rw-r--r-- 1 root root  0 Apr  8 17:01 _SUCCESS
(2, (5, 3.7600000000000016))
(4, (14, 2.8010204081632613))
2 doc_mag/part-00000
3 doc_mag/part-00001
5 /content/drive/MyDrive/doc_mag.txt
