**Big Data and NLP: Inverted Index Database for 19,000 Reuters News Articles**

This project implements an inverted index with Apache Spark (PySpark) and stores it in a relational database (SQLite) for about 19,000 Reuters news articles.

Jenny Yu

In [1]:
# Set up the environment

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

import findspark 
findspark.init("spark-2.4.4-bin-hadoop2.7")# SPARK_HOME

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# Put input_docs.zip in Google Drive

!rm -rf input_docs

!cp /content/drive/My\ Drive/input_docs.zip .
!unzip input_docs.zip > /dev/null

# !cp /content/drive/My\ Drive/input_docs_sample.zip .
# !unzip input_docs_sample.zip > /dev/null

!ls input_docs/ | wc -l

# To try the 5-document sample instead, comment out the two input_docs.zip lines
# above and uncomment the input_docs_sample.zip lines; the sample runs quickly.
# The full collection takes about 6 min for the whole notebook, start to finish.

19026


In [5]:
import nltk
nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

**1. Build the index from a document collection**

**Create an RDD from a text file**

`wholeTextFiles` reads a directory of files, and each file becomes one element of the RDD: a (path, content) pair.

In [6]:
# wholeTextFiles generates an RDD of pair values,
# where the key is the full path of each file and the value is the content of that file.
# raw_docs = sc.wholeTextFiles("/content/drive/My Drive/input_docs")
# (named raw_docs rather than input to avoid shadowing the Python builtin)
raw_docs = sc.wholeTextFiles("input_docs")

# Now we strip the prefix of filenames and keep only the numeric part of the basename.
# e.g. 'file:/content/drive/My Drive/Colab Notebooks/data_spark/input_docs/3.html'
# becomes the integer 3
import os
from bs4 import BeautifulSoup

input2 = raw_docs.map(lambda x: (int(os.path.basename(x[0]).split(".")[0]), x[1]))

print(input2.take(1))


[(14372, "<H2> 7-APR-1987 12:42:50.60</H2>\r\n<H2>COCOA/COFFEE/SUGAR TRADE NEWS/MARKET COMMENT</H2>\r\nWest African\norigins believed near market, especially Ivory and Ghana. Ivory\nreported traded yesterday around 1,325 French Francs per 100\nkilos cif for new crop while Ghana was offering at 1,460 stg a\ntonne cif, also for new crop... Offtake small, restricted to\ndealer book-squaring ahead of buffer stock buying which\npossibly next week, some say... Ghana resale June/Aug 60 stg\nover Sep.\n    Cocoa comment/market talk - Terminal on defensive on firm\nsterling and overhanging West African new crop offers, but\nprospective buffer stock buying and lower Bahia Temperao crop\n(possibly down to 1.5 mln bags from 2.0/2.5 mln) constructive.\n    Sugar physical trade - Malta tendering for 3,000 tonnes\nwhites today for arrival in two equal parts in May and June. No\nnews by late afternoon... India in tomorrow for one/two cargoes\nwhites with Syria tendering same day for 36,000 tonnes whit
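
The filename-to-ID step can be tried without Spark. The sketch below is only an illustration: the helper name `doc_id_from_path` and the two sample pairs are made up here, shaped like the (path, content) pairs that `wholeTextFiles` returns.

```python
import os

def doc_id_from_path(path):
    """Return the integer document ID encoded in a file name such as '3.html'."""
    return int(os.path.basename(path).split(".")[0])

# Hypothetical (path, content) pairs, shaped like the output of wholeTextFiles
pairs = [
    ("file:/content/input_docs/3.html", "<H2>TITLE</H2>\nbody text"),
    ("file:/content/input_docs/14372.html", "<H2>COCOA</H2>\nmore text"),
]

# Same transformation as the map() above, applied to a plain list
by_id = [(doc_id_from_path(path), text) for path, text in pairs]
print([doc_id for doc_id, _ in by_id])
```

Using an integer ID rather than the full path keeps the posting lists compact and makes the ID usable as a primary key later on.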

**Clean the text**

**Create RDD of (word, (docid, freq, tf))**


In [7]:
# Doc to wordlist function
# The output will be a list of tuples such as
# ("apple", (3,10,10/20)),
# where 3 is docid,
# 10 is frequency of "apple" in this doc,
# 20 is maxf in this doc.

# from bs4 import BeautifulSoup
from collections import Counter
import re

from nltk.corpus import stopwords
stop_words = set(stopwords.words('english'))

# for a given doc return a list of tuples of the form (w, (docid, freq, freq/maxfreq))
def dw(docid, htmltext):
  cleantext = BeautifulSoup(htmltext).get_text().lower()

  # keep purely alphabetic tokens and drop stop words
  tokenizedText = re.findall(r"\b[a-z]+\b",cleantext)
  filteredToken = [w for w in tokenizedText if w not in stop_words]
  tokenCnt = Counter(filteredToken)
  # a document with no remaining tokens contributes nothing to the index
  if not tokenCnt:
    return []
  maxf = tokenCnt.most_common(1)[0][1]

  # one tuple per distinct word
  return [(t,(docid,frqc,frqc/maxf)) for t,frqc in tokenCnt.items()]

# dw is called once per document; flatMap flattens the per-document lists
word_docid_freq_tf = input2.flatMap(lambda x: dw(x[0],x[1]))
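
To see what the normalized term frequency looks like on a single sentence, here is a small Spark-free sketch. It assumes a tiny hand-written stop-word set (`STOP_WORDS`) in place of the NLTK list, and the function name `term_stats` is illustrative only.

```python
import re
from collections import Counter

# Assumption: a tiny stand-in for the NLTK English stop-word list
STOP_WORDS = {"the", "a", "of", "and", "in", "for"}

def term_stats(docid, text):
    """Return [(word, (docid, freq, freq / maxfreq)), ...] for one document."""
    tokens = [w for w in re.findall(r"\b[a-z]+\b", text.lower())
              if w not in STOP_WORDS]
    counts = Counter(tokens)
    if not counts:
        return []
    maxf = max(counts.values())
    return [(w, (docid, f, f / maxf)) for w, f in counts.items()]

stats = dict(term_stats(3, "Cocoa prices and cocoa futures: the price of cocoa rose."))
print(stats["cocoa"])   # most frequent word, so its tf is 1.0
print(stats["prices"])  # appears once out of a maximum of three
```

Dividing by the maximum frequency in the document keeps tf in the range (0, 1], so long articles do not dominate short ones purely through raw counts.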

**Create RDD of (word, [(did, freq, tfidf), ...])**

In [9]:
# Now create an RDD as follows 
# (word, [(did1,freq1,tf1), (did2,freq2,tf2), ...])
		    
word_postinglist_freq_tf = word_docid_freq_tf.groupByKey()
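
Conceptually, `groupByKey` collects all postings that share a word into one list. A minimal plain-Python sketch of the same grouping, using made-up records and a hypothetical helper `group_postings`, could look like this:

```python
from collections import defaultdict

# Hypothetical (word, (docid, freq, tf)) records
records = [
    ("cocoa", (1, 7, 1.0)),
    ("sugar", (1, 2, 2 / 7)),
    ("cocoa", (2, 1, 0.5)),
]

def group_postings(pairs):
    """Group (word, posting) pairs into {word: [posting, ...]}."""
    index = defaultdict(list)
    for word, posting in pairs:
        index[word].append(posting)
    return dict(index)

index = group_postings(records)
print(index["cocoa"])
```

The length of each list is the number of documents that contain the word, which is the document frequency used in the next step.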

In [10]:
from pyspark.accumulators import AccumulatorParam
class ListParam(AccumulatorParam):
    def zero(self, v):
        return []
    def addInPlace(self, acc1, acc2):
        acc1.extend(acc2)
        return acc1

In [11]:
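# Helper that appends each line to a global list.
# Note: list1 is not defined in the cells shown here and nothing below calls
# file_read1, so it appears to be left over from an earlier experiment.
def file_read1(line):
    global list1 # Required otherwise the next line will fail
    list1 += [{"line":line}]
    return line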

In [13]:
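# (word, [(did,freq,tfidf), ...])
# The document frequency df of a word is the length of its posting list.
# This notebook uses the simplified weight idf = 1/df, so tfidf = tf/df.

result = sc.accumulator([], ListParam())

def tfidf(term, x):
  postings = list(x)
  df = len(postings)
  l = []
  global result
  for t in postings:
    l.append((t[0],t[1],t[2]/df))
    result += [{"doc":t[0], "term":term, "score":t[2]/df}]
  return l

# creating RDD
# cache() makes it less likely that later actions re-run tfidf();
# every re-run would append duplicate entries to the accumulator.
word_postinglist_freq_tfidf = word_postinglist_freq_tf.map(lambda x: (x[0],tfidf(x[0],x[1]))).cache()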
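
The weighting rule itself is easy to check by hand. The sketch below applies the notebook's simplified rule, tfidf = tf / df, to one made-up posting list; `weight_postings` is an illustrative name. Many textbooks use log(N / df) for the idf factor instead, which is a different weighting from the one used here.

```python
def weight_postings(postings):
    """Turn [(docid, freq, tf), ...] into [(docid, freq, tf / df), ...]."""
    df = len(postings)  # number of documents containing the word
    return [(docid, freq, tf / df) for docid, freq, tf in postings]

# Hypothetical posting list for a word that occurs in two documents
postings = [(1, 7, 1.0), (2, 1, 0.5)]
weighted = weight_postings(postings)
print(weighted)
```

A word that appears in many documents gets a large df and therefore a small weight, so rare words contribute more to the ranking.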


**Create RDD of (did, (freq, tfidf))**

In [14]:
# Now, we would like to obtain the magnitude of each doc.
# First, produce (did, (freq,tfidf)) for each word of doc did.
# We don't need the word itself, just its (freq,tfidf).
# Then, do reduceByKey on these tuples to obtain maxfreq and
# the squared magnitude (sum of squared tfidf values) for each document.

# RDD of (did,(freq,tfidf)) tuples

# creating RDD
did_freq_tfidfsq_rdd = word_postinglist_freq_tfidf.flatMap(lambda x: x[1]).map(lambda x: (x[0],(x[1],x[2]**2)))

# Produce (did,(maxf,magnitudesq))
# creating RDD
doc_maxf_mag = did_freq_tfidfsq_rdd.reduceByKey(lambda x,y:(max(x[0],y[0]),x[1]+y[1]))
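
The reduce step can be reproduced on a handful of values. This is a sketch with made-up triples; `combine` mirrors the lambda passed to `reduceByKey`, and the grouping by document is done with a plain dictionary instead of Spark.

```python
from collections import defaultdict
from functools import reduce

# Hypothetical (docid, freq, tfidf) triples taken from several posting lists
triples = [(1, 7, 0.5), (1, 2, 0.1), (2, 1, 0.25)]

def combine(a, b):
    """Merge two (maxfreq, squared magnitude) pairs for the same document."""
    return (max(a[0], b[0]), a[1] + b[1])

per_doc = defaultdict(list)
for docid, freq, tfidf in triples:
    per_doc[docid].append((freq, tfidf ** 2))

doc_stats = {docid: reduce(combine, values) for docid, values in per_doc.items()}
print(doc_stats)
```

Note that the second component is the squared magnitude; taking its square root gives the vector length needed for cosine normalization.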


In [15]:
!rm -rf inv_idx
word_postinglist_freq_tfidf.saveAsTextFile("inv_idx");

In [16]:
!rm -rf doc_mag
doc_maxf_mag.saveAsTextFile("doc_mag");

In [17]:
!ls -lrt inv_idx
!head inv_idx/part-00001
!wc -l inv_idx/part-00000
!wc -l inv_idx/part-00001
!cat inv_idx/part-00000 inv_idx/part-00001 > /content/drive/My\ Drive/inv_idx.txt
!wc -l /content/drive/My\ Drive/inv_idx.txt

total 37680
-rw-r--r-- 1 root root 17922381 Sep 16 07:17 part-00001
-rw-r--r-- 1 root root 20659357 Sep 16 07:17 part-00000
-rw-r--r-- 1 root root        0 Sep 16 07:17 _SUCCESS
('cocoa', [(14372, 2, 0.0032102728731942215), (1, 7, 0.0056179775280898875), (21525, 1, 0.0012484394506866417), (18383, 2, 0.007490636704119849), (9953, 10, 0.011235955056179775), (10742, 2, 0.011235955056179775), (14651, 2, 0.0056179775280898875), (17499, 7, 0.011235955056179775), (5168, 7, 0.011235955056179775), (16098, 2, 0.007490636704119849), (17568, 4, 0.011235955056179775), (12763, 2, 0.0056179775280898875), (6407, 3, 0.004815409309791332), (16148, 2, 0.0044943820224719105), (15653, 6, 0.011235955056179775), (15095, 10, 0.011235955056179775), (18014, 4, 0.011235955056179775), (10619, 2, 0.007490636704119849), (5192, 6, 0.011235955056179775), (8978, 3, 0.011235955056179775), (10613, 2, 0.007490636704119849), (6128, 5, 0.006242197253433209), (8961, 12, 0.007931262392597489), (20005, 20, 0.01123595505617977

In [None]:
!ls -lrt doc_mag
!head doc_mag/part-00000
!wc -l doc_mag/part-00000
!wc -l doc_mag/part-00001
!cat doc_mag/part-00000 doc_mag/part-00001 > /content/drive/My\ Drive/doc_mag.txt
!wc -l /content/drive/My\ Drive/doc_mag.txt

total 636
-rw-r--r-- 1 root root 324258 Apr  3 05:59 part-00001
-rw-r--r-- 1 root root 323297 Apr  3 05:59 part-00000
-rw-r--r-- 1 root root      0 Apr  3 05:59 _SUCCESS
(16050, (3, 0.019505547637100547))
(13228, (3, 0.05137016690348846))
(15060, (4, 0.10386443840740586))
(14950, (7, 0.22499457204331003))
(15210, (5, 0.10864092200016229))
(16480, (6, 0.028216833442731896))
(14264, (3, 0.009554588500264439))
(15320, (3, 0.000993183308447241))
(13986, (2, 0.0002859740790185993))
(15398, (4, 0.002872860074093733))
9500 doc_mag/part-00000
9526 doc_mag/part-00001
19026 /content/drive/My Drive/doc_mag.txt


**2. Create database tables for storing the inverted index**

**Create database**


In [18]:
import sqlite3
# connect database
conn = sqlite3.connect('Database.db')
print('Opened database successfully.')
c = conn.cursor()
# one row per word; the posting list is stored as text
c.execute('''DROP TABLE IF EXISTS postings''')
c.execute('''
        CREATE TABLE IF NOT EXISTS postings
       (word VARCHAR(100) PRIMARY KEY,
       postinglist_freq_tfidf TEXT);''')
# one row per document: max term frequency and squared magnitude
c.execute('''DROP TABLE IF EXISTS docmag''')
c.execute('''
        CREATE TABLE IF NOT EXISTS docmag
       (docid INT PRIMARY KEY     NOT NULL,
       maxf INT     NOT NULL,
       mag REAL);''')
print('Table created successfully.')
conn.commit()
conn.close()

Opened database successfully.
Table created successfully.
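
A quick way to check a table layout like this is to try it against an in-memory database. The sketch below is an assumption-laden miniature: it uses `:memory:` instead of `Database.db`, creates only a `docmag` table, and inserts two rows copied from the `doc_mag` output shown earlier.

```python
import sqlite3

# In-memory database, so nothing is written to disk
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute(
    "CREATE TABLE docmag (docid INTEGER PRIMARY KEY, maxf INTEGER NOT NULL, mag REAL)"
)

# Two (docid, maxf, squared magnitude) rows from the doc_mag output above
rows = [(16050, 3, 0.019505547637100547), (13228, 3, 0.05137016690348846)]
cur.executemany("INSERT INTO docmag (docid, maxf, mag) VALUES (?, ?, ?)", rows)
conn.commit()

# Parameterized lookup by primary key
cur.execute("SELECT maxf, mag FROM docmag WHERE docid = ?", (16050,))
found = cur.fetchone()
print(found)
conn.close()
```

Passing values through `?` placeholders lets SQLite handle quoting and type conversion, which matters once the words themselves are inserted.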


In [19]:
# store data to database
import ast
import json

conn = sqlite3.connect('Database.db')
print('Opened database successfully.')
c = conn.cursor()

# delete content in database
c.execute("DELETE FROM docmag")
c.execute("DELETE FROM postings")

# look up content in database (prints [] right after the deletes)
c.execute("SELECT * FROM postings WHERE word = 'jun'")
print(c.fetchall())

# insert data to database
# Each line of doc_mag.txt is a tuple literal such as "(16050, (3, 0.0195...))".
# ast.literal_eval parses it directly and also handles floats written in
# scientific notation, which a digit-only regex would split apart.
with open("/content/drive/My Drive/doc_mag.txt") as f1:
    for line in f1:
        docid, (maxf, mag) = ast.literal_eval(line.strip())
        sql = """insert into docmag(docid,maxf,mag) values (?,?,?)"""
        c.execute(sql, (docid, maxf, mag))

# Each line of inv_idx.txt looks like "('cocoa', [(14372, 2, 0.0032...), ...])".
# The posting list is stored as JSON text so it can be read back intact.
with open("/content/drive/My Drive/inv_idx.txt") as f2:
    for line in f2:
        word, postings = ast.literal_eval(line.strip())
        print(word)  # progress output: one line per indexed word
        sql_1 = """insert into postings(word,postinglist_freq_tfidf) values (?,?)"""
        c.execute(sql_1, (word, json.dumps(postings)))

conn.commit()
print('Records created successfully.')
conn.close()

Streaming output truncated to the last 5000 lines.
redbook
ponderosa
wpm
superconductor
coradian
biosonics
rune
squander
contradiction
unreasonably
unjustifiably
invigorated
accomodation
suscription
leblanc
vermillion
perwira
phb
tlarf
reprogram
reprograms
doorley
roylaties
weirick
stufflebeam
vandalise
kml
upgradable
fco
versados
filtertek
machold
pfeizer
irv
goss
cpwr
gti
consul
flotations
pretend
manoeuvres
irrelevant
pitted
accompany
inititally
furnas
centrais
camilo
votinmg
eclipse
bangor
defaulted
crporation
races
extinct
uncontroversial
pal
sql
wasatch
terrier
frampton
isis
esd
seaborn
arrangment
sevices
cerprobe
crpb
tarzell
euphoric
retrenchment
zapf
reassessed
prise
dikb
airbags
occupant
spco
aggressitvely
fgi
productvitiy
cfs
comapny
financiera
nf
sparebanken
waccamaw
surfside
bgph
offerred
cryodynamics
roma
aver
twinjet
bewtween
seals
multimarket
brophy
fraudalent
suiker
nza
buttrose
decipher
wciin
confronting
junko
renard
particle
reagent
ovonic
spacecraft
en
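
The text files written by `saveAsTextFile` contain Python tuple literals, one per line, so one way to read them back is `ast.literal_eval`. The sketch below parses a `doc_mag` line copied from the output above and a shortened, partly made-up posting-list line (the second posting uses scientific notation on purpose), then round-trips the posting list through JSON.

```python
import ast
import json

# A line as written to doc_mag (taken from the output shown earlier)
doc_mag_line = "(16050, (3, 0.019505547637100547))"
docid, (maxf, mag) = ast.literal_eval(doc_mag_line)

# Hypothetical shortened inv_idx line; 5.6e-03 checks scientific notation
inv_idx_line = "('cocoa', [(14372, 2, 0.0032102728731942215), (1, 7, 5.6e-03)])"
word, postings = ast.literal_eval(inv_idx_line)

# JSON has no tuple type, so postings come back as lists and are converted
stored = json.dumps(postings)
restored = [tuple(p) for p in json.loads(stored)]
print(docid, maxf, word, restored == postings)
```

Unlike a regular expression over digits, this keeps each (docid, freq, tfidf) triple intact and preserves the float values exactly.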



**Implement the keyword search functionality.**
**Implement result ranking using the TF-IDF measure.**
**Implement a simple interface for giving keyword queries and showing results.**  

Search(Query, TopN): TopN_Documents

In [20]:
def tokenize(htmltext):
  cleantext = BeautifulSoup(htmltext).get_text().lower()

  # keep alphabetic tokens and drop stop words, as dw() does for the indexed documents
  tokenizedText = re.findall(r"\b[a-z]+\b",cleantext)
  filteredToken = [w for w in tokenizedText if w not in stop_words]
  return filteredToken


**Cosine Similarity Ranking based on TF-IDF**


In [22]:
import math
import re
def get_cosine(vec1, vec2):
    intersection = set(vec1.keys()) & set(vec2.keys())
    numerator = sum([vec1[x] * vec2[x] for x in intersection])

    sum1 = sum([vec1[x] ** 2 for x in list(vec1.keys())])
    sum2 = sum([vec2[x] ** 2 for x in list(vec2.keys())])
    denominator = math.sqrt(sum1) * math.sqrt(sum2)

    if not denominator:
        return 0.0
    else:
        return float(numerator) / denominator
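
As a sanity check, cosine similarity can be computed by hand for small vectors. The sketch below restates the same formula in a standalone function named `cosine` (so it runs on its own) and applies it to made-up term-weight dictionaries.

```python
import math

def cosine(vec1, vec2):
    """Cosine similarity between two sparse vectors stored as {term: weight}."""
    shared = set(vec1) & set(vec2)
    numerator = sum(vec1[t] * vec2[t] for t in shared)
    norm1 = math.sqrt(sum(v * v for v in vec1.values()))
    norm2 = math.sqrt(sum(v * v for v in vec2.values()))
    denominator = norm1 * norm2
    return numerator / denominator if denominator else 0.0

# Hypothetical query and document vectors
query = {"cocoa": 1, "sugar": 1}
doc_a = {"cocoa": 3, "sugar": 4}   # shares both query terms
doc_b = {"coffee": 2}              # shares none

print(round(cosine(query, doc_a), 4))
print(cosine(query, doc_b))
```

For `doc_a` the dot product is 3 + 4 = 7 and the norms are sqrt(2) and 5, giving 7 / (5 * sqrt(2)); a document with no shared terms scores 0.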

In [25]:
# the corpus with tfidf scores, as (term, (doc, score)) pairs
tfidf_RDD = sc.parallelize(result.value).map(lambda x: (x['term'],(x['doc'],x['score']) ))

def search(query):
  # query is the Text widget passed by on_submit; the typed text is query.value
  tokens = {t: 1 for t in tokenize(query.value)}
  bcTokens = sc.broadcast(tokens)

  # keep only the index entries whose term appears in the query
  joined_tfidf = tfidf_RDD.map(lambda k: (k[0],bcTokens.value.get(k[0],'-'),k[1])).filter(lambda x: x[1] != '-' )
  # cosine between the query and the per-term match counts;
  # computed here but not used in the ranking below
  new_rdd = joined_tfidf.countByKey()
  cosine_score = get_cosine(tokens,new_rdd)
  # per document: (sum of tfidf scores, number of matched query terms)
  scount = joined_tfidf.map(lambda a: a[2]).aggregateByKey((0,0),
  (lambda acc, value: (acc[0] +value,acc[1]+1)),
  (lambda acc1,acc2: (acc1[0]+acc2[0],acc1[1]+acc2[1])) )

  # score = summed tfidf * matched terms / number of query terms; keep the top 3
  scores = scount.map(lambda k: ( k[1][0]*k[1][1]/len(tokens), k[0]) ).top(3)

  if not scores:
    print('String not found')
  for label, (score, docid) in zip(['most', '2nd most', '3rd most'], scores):
    print('The ' + label + ' relevant HTML ID is:', docid)

  return scores
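
The scoring rule used in `search` (summed tfidf, multiplied by the number of matched query terms, divided by the number of query terms) can be illustrated without Spark. Everything in this sketch is hypothetical: the toy index, the function name `rank`, and the `top_n` parameter, which is added to match the `Search(Query, TopN)` signature stated above.

```python
from collections import defaultdict

# Hypothetical index: term -> [(docid, tfidf), ...]
toy_index = {
    "cocoa": [(1, 0.5), (2, 0.25)],
    "sugar": [(1, 0.2), (3, 0.4)],
}

def rank(query_terms, index, top_n=3):
    """Return up to top_n (score, docid) pairs, best first."""
    terms = set(query_terms)
    totals = defaultdict(lambda: [0.0, 0])  # docid -> [summed tfidf, matched terms]
    for term in terms:
        for docid, score in index.get(term, []):
            totals[docid][0] += score
            totals[docid][1] += 1
    scored = [(total * hits / len(terms), docid)
              for docid, (total, hits) in totals.items()]
    return sorted(scored, reverse=True)[:top_n]

ranking = rank(["cocoa", "sugar"], toy_index)
print([docid for _, docid in ranking])
```

Document 1 matches both terms, so its summed score is kept in full, while documents matching only one of the two terms have their score halved.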

In [28]:
# Text User Interface

import ipywidgets as widgets
from IPython.display import display
print("After typing a query, wait at least 3 seconds before pressing Enter. The system may take time to load.")
print("Allow at least 3 seconds between searches. Your computer may be fast, but just in case.")
print("Have fun and thank you!")
print("Input is not case sensitive: queries are lowercased before matching.")
text = widgets.Text()
display(text)

text.on_submit(search)

After typing a query, wait at least 3 seconds before pressing Enter. The system may take time to load.
Allow at least 3 seconds between searches. Your computer may be fast, but just in case.
Have fun and thank you!
Input is not case sensitive: queries are lowercased before matching.


Text(value='')

The most relevant HTML ID is: 11487
The 2nd most relevant HTML ID is: 18359
The 3rd most relevant HTML ID is: 1899
