# **Body Inverted Index Creation**
*Based on Assignment 3*

# Setup


## General imports

The `inverted_index_body_colab` import requires the `inverted_index_body_colab.py` file.

You should upload the file and then run this cell.

In [None]:
import sys
from collections import Counter, OrderedDict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import math
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from timeit import timeit
from pathlib import Path
import pickle
import pandas as pd
import numpy as np
from google.cloud import storage

import hashlib
def _hash(s):
    """Return a short hexadecimal digest of the string ``s``.

    The string is encoded as UTF-8 and hashed with BLAKE2b using a 5-byte
    digest, so the returned value is always 10 hexadecimal characters.
    """
    payload = bytes(s, encoding='utf8')
    hasher = hashlib.blake2b(payload, digest_size=5)
    return hasher.hexdigest()

# Fetch NLTK's stopword corpus; stopwords.words('english') is used later on.
nltk.download('stopwords')

from inverted_index_body_colab import *

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


## Installing, importing, and initializing PySpark


In [None]:
# These will already be installed in the testing environment so disregard the
# amount of time (~1 minute) it takes to install.
!pip install -q pyspark
!pip install -U -q PyDrive
!apt-get update -qq
!apt install openjdk-8-jdk-headless -qq
!pip install -q graphframes

import os
# Point the JVM lookup at the Java 8 JDK installed by apt above.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# GraphFrames jar URL (0.8.2, built for Spark 3.2 / Scala 2.12 according to its
# file name) and the PySpark jars directory it is downloaded into.
# NOTE(review): spark_jars is hard-coded to the Python 3.7 dist-packages
# directory - confirm it matches the runtime's Python version.
graphframes_jar = 'https://repos.spark-packages.org/graphframes/graphframes/0.8.2-spark3.2-s_2.12/graphframes-0.8.2-spark3.2-s_2.12.jar'
spark_jars = '/usr/local/lib/python3.7/dist-packages/pyspark/jars'
# -N skips the download unless the remote file is newer; -P sets the target directory.
!wget -N -P $spark_jars $graphframes_jar

[K     |████████████████████████████████| 281.3 MB 38 kB/s 
[K     |████████████████████████████████| 198 kB 69.8 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
The following additional packages will be installed:
  openjdk-8-jre-headless
Suggested packages:
  openjdk-8-demo openjdk-8-source libnss-mdns fonts-dejavu-extra
  fonts-ipafont-gothic fonts-ipafont-mincho fonts-wqy-microhei
  fonts-wqy-zenhei fonts-indic
The following NEW packages will be installed:
  openjdk-8-jdk-headless openjdk-8-jre-headless
0 upgraded, 2 newly installed, 0 to remove and 61 not upgraded.
Need to get 36.5 MB of archives.
After this operation, 143 MB of additional disk space will be used.
Selecting previously unselected package openjdk-8-jre-headless:amd64.
(Reading database ... 155225 files and directories currently installed.)
Preparing to unpack .../openjdk-8-jre-headless_8u312-b07-0ubuntu1~18.04_amd64.deb ...
Unpacking openjdk-8-jre-headless:amd64 (8u312-b07-0ubuntu1~18.04) 

In [None]:
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from graphframes import *

In [None]:
# Initialize the Spark context and session.
# The Spark UI port is set to 4050 through the configuration.
conf = SparkConf().set("spark.ui.port", "4050")
sc = pyspark.SparkContext(conf=conf)
# Register the downloaded GraphFrames jar (looked up by its file name inside
# the spark_jars directory) with the context.
sc.addPyFile(str(Path(spark_jars) / Path(graphframes_jar).name))
# Get the active SparkSession, or create one if none exists yet.
spark = SparkSession.builder.getOrCreate()

## Copy some wiki data

In [None]:
# Authenticate the Colab user.
# Authentication should be done with the email connected to your GCP account.
from google.colab import auth
auth.authenticate_user()

In [None]:
# Copy one wikidump file
import os
from pathlib import Path
from google.colab import auth

project_id = 'core-period-321814'
!gcloud config set project {project_id}

data_bucket_name = 'wikidata_preprocessed'
try:
    if os.environ["wikidata_preprocessed"] is not None:
        pass  
except:
      !mkdir wikidumps
      !gsutil cp gs://{data_bucket_name}/multistream1_preprocessed.parquet "wikidumps/" 

Updated property [core/project].


To take a quick anonymous survey, run:
  $ gcloud survey

Copying gs://wikidata_preprocessed/multistream1_preprocessed.parquet...
| [1 files][316.7 MiB/316.7 MiB]                                                
Operation completed over 1 objects/316.7 MiB.                                    


# Processing wikipedia

Now that we completed the setup and have some data in our local environment, we are ready to process it using PySpark. 

## A 2-minute intro to PySpark

Let's look at our data before transforming it to RDD.

In [None]:
from pathlib import Path 
import os

# Read from the directory named by the "wikidata_preprocessed" environment
# variable when it is set; otherwise fall back to the local "wikidumps" copy.
# An explicit membership test replaces the previous bare `except:`, which
# would also have hidden unrelated errors.
if "wikidata_preprocessed" in os.environ:
    path = os.environ["wikidata_preprocessed"] + "/wikidumps/*"
else:
    path = "wikidumps/*"

# Load every parquet file matching the path and preview the first rows.
parquetFile = spark.read.parquet(path)
parquetFile.show()

+---+--------------------+--------------------+--------------------+
| id|               title|                text|         anchor_text|
+---+--------------------+--------------------+--------------------+
| 12|           Anarchism|'''Anarchism''' i...|[{23040, politica...|
| 25|              Autism|'''Autism''' is a...|[{492271, Clinica...|
| 39|              Albedo|thumb|upright=1.3...|[{679294, diffuse...|
|290|                   A|'''A''', or '''a'...|[{290, See below}...|
|303|             Alabama|'''Alabama''' () ...|[{351590, Yellowh...|
|305|            Achilles|thumb|260px|Ancie...|[{1076007, potter...|
|307|     Abraham Lincoln|'''Abraham Lincol...|[{1827174, Alexan...|
|308|           Aristotle|'''Aristotle''' (...|[{1389981, bust},...|
|309|An American in Paris|'''''An American ...|[{13066, George G...|
|316|Academy Award for...|The '''Academy Aw...|[{39842, Academy ...|
|324|      Academy Awards|The '''Academy Aw...|[{649481, film in...|
|330|             Actrius|'''''Act

In [None]:
# Take the first 1000 rows and create two RDDs from them:
#   doc_text_pairs  - rows holding the 'text' and 'id' columns (in that order)
#   doc_title_pairs - rows holding the 'id' and 'title' columns (in that order)
doc_text_pairs = parquetFile.limit(1000).select("text", "id").rdd
doc_title_pairs = parquetFile.limit(1000).select("id", "title").rdd

## Word counts

### Tokenization

In [None]:
import pickle
# Stopwords: NLTK's English list plus a few corpus-specific words.
english_stopwords = frozenset(stopwords.words('english'))
# 'external' was previously misspelled 'extenal', so the word was never
# filtered out.
# NOTE(review): any query-side tokenizer should use the same list - confirm.
corpus_stopwords = ['category', 'references', 'also', 'links', 'external', 'see', 'thumb']
# A token starts with '#', '@' or a word character and continues with 2-24
# further word characters, each optionally preceded by an apostrophe or hyphen.
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)
all_stopwords = english_stopwords.union(corpus_stopwords)

### Term frequency

In [None]:
def word_count(text, id):
    """Count the non-stopword tokens of a single document.

    Parameters:
      text: the document text; it is lower-cased and tokenized with RE_WORD.
      id: the document id that is attached to every count.

    Returns:
      A list of (token, (id, tf)) pairs, one per distinct token that is not
      in all_stopwords, where tf is the token's number of occurrences.
    """
    term_freqs = Counter(
        match.group()
        for match in RE_WORD.finditer(text.lower())
        if match.group() not in all_stopwords
    )
    return [(term, (id, tf)) for term, tf in term_freqs.items()]

In [None]:
# Each row is (text, id); emit one (token, (doc_id, tf)) pair per distinct token.
word_counts_body = doc_text_pairs.flatMap(
    lambda row: word_count(row[0], row[1])
)

### Reduce word counts

In [None]:
def reduce_word_counts(unsorted_pl):
    """Return the entries of ``unsorted_pl`` as a new list in ascending order.

    Parameters:
      unsorted_pl: an iterable of comparable items (used here with
        (doc_id, tf) tuples). The input itself is not modified.

    Returns:
      A new sorted list containing the same items.
    """
    posting_list = list(unsorted_pl)
    posting_list.sort()
    return posting_list

In [None]:
# Collect the (doc_id, tf) pairs of every token and sort each posting list.
postings_body = (
    word_counts_body
    .groupByKey()
    .mapValues(reduce_word_counts)
)

### Document frequency

In [None]:
def calculate_df(postings):
    """Map every posting list to its length (the token's document frequency).

    Parameters:
      postings: an object exposing ``mapValues`` (used here with an RDD of
        (token, posting_list) pairs).

    Returns:
      The result of ``mapValues``: (token, len(posting_list)) pairs.
    """
    return postings.mapValues(len)

In [None]:
# Keep only the tokens whose posting list holds more than 50 entries, then
# build the document-frequency dictionary {token: df, ...} from them.
postings_body_filtered = postings_body.filter(lambda pair: len(pair[1]) > 50)
df_dictionary = calculate_df(postings_body_filtered).collectAsMap()

### Document title

In [None]:
def doc_title_mapping_creator(id, title):
    """Wrap a document id and its title in a one-element list.

    Returns:
      ``[(id, title)]`` - a list so that it can be fed to ``flatMap``.
    """
    pair = (id, title)
    return [pair]

In [None]:
# Create the {doc_id: title} dictionary.
# The variable was previously misspelled `doc_title_dicionary`, while the cell
# that assembles the index reads `doc_title_dictionary`.
doc_title_dictionary = doc_title_pairs.flatMap(lambda x: doc_title_mapping_creator(x.id, x.title)).collectAsMap()

### Document length

In [None]:
def doc_to_len_mapping_creator(text, id):
    """Compute the number of non-stopword tokens in a document.

    Parameters:
      text: the document text; it is lower-cased and tokenized with RE_WORD.
      id: the document id.

    Returns:
      ``[(id, n)]`` where n counts the tokens that are not in all_stopwords
      (repeated tokens are counted every time they occur).
    """
    n_tokens = sum(
        1
        for match in RE_WORD.finditer(text.lower())
        if match.group() not in all_stopwords
    )
    return [(id, n_tokens)]

In [None]:
# Build the {doc_id: number of non-stopword tokens} dictionary.
doc_to_len_dictionary = doc_text_pairs.flatMap(
    lambda row: doc_to_len_mapping_creator(row[0], row[1])
).collectAsMap()

## IDF

In [None]:
def calculate_idf(postings):
    """Compute the base-10 inverse document frequency of every token.

    Parameters:
      postings: an object exposing ``map`` (used here with an RDD of
        (token, posting_list) pairs).

    Returns:
      (token, log10(N / df)) pairs, where df is the length of the token's
      posting list and N is the number of rows in the module-level
      ``doc_text_pairs`` RDD.
    """
    # Count the corpus once, up front; the lambda below only uses the number.
    N = doc_text_pairs.count()
    # math.log10 is used instead of math.log(x, 10): it is the more accurate
    # form (e.g. math.log(1000, 10) is not exactly 3.0).
    return postings.map(lambda pair: (pair[0], math.log10(N / len(pair[1]))))

In [None]:
# Build the {token: idf} dictionary from the filtered postings.
# `postings_filtered` is not defined in this notebook; the filtered RDD is
# named `postings_body_filtered`.
idf_dictionary = calculate_idf(postings_body_filtered).collectAsMap()

## Weighted term frequency

In [None]:
def dominator_creator(text, id):
    """Sum the squared length-normalized term frequencies of a document.

    Parameters:
      text: the document text; it is lower-cased and tokenized with RE_WORD.
      id: the document id.

    Returns:
      ``[(id, total)]`` where total is the sum, over the distinct
      non-stopword tokens, of (tf / doc_len) ** 2, and doc_len is the number
      of non-stopword tokens. total is 0 when the document has no such token.
      NOTE(review): presumably the normalization "denominator" used at query
      time - confirm against the code that reads dominator_mapping.
    """
    kept_tokens = [
        match.group()
        for match in RE_WORD.finditer(text.lower())
        if match.group() not in all_stopwords
    ]
    n_tokens = len(kept_tokens)
    term_freqs = Counter(kept_tokens)

    total = 0
    for tf in term_freqs.values():
        total += math.pow(tf / n_tokens, 2)
    return [(id, total)]

In [None]:
# Dictionary mapping each doc id to the sum of the squared, length-normalized
# term frequencies of the words in that document.
dominator_dictionary = doc_text_pairs.flatMap(
    lambda row: dominator_creator(row[0], row[1])
).collectAsMap()

## Partitioning and writing the index

In [None]:
# Number of buckets the tokens are spread over.
NUM_BUCKETS = 124


def token2bucket_id(token):
    """Map a token to a bucket id in the range [0, NUM_BUCKETS).

    The token's hexadecimal ``_hash`` digest is read as an integer and
    reduced modulo NUM_BUCKETS.
    """
    digest_value = int(_hash(token), 16)
    return digest_value % NUM_BUCKETS

def partition_postings_and_write(postings):
    """Group the postings by bucket and write each bucket out.

    Parameters:
      postings: an RDD of (token, posting_list) pairs.

    Returns:
      An RDD holding whatever ``InvertedIndex.write_a_posting_list`` returns
      for each (bucket_id, [(token, posting_list), ...]) pair.
      NOTE(review): presumably one dict of posting locations per bucket -
      confirm against InvertedIndex.
    """
    # Tag every (token, posting_list) pair with the bucket id of its token.
    bucketed = postings.map(
        lambda pair: (token2bucket_id(pair[0]), (pair[0], pair[1]))
    )
    # Gather every bucket's pairs into a plain list.
    grouped = bucketed.groupByKey().map(
        lambda bucket: (bucket[0], list(bucket[1]))
    )
    return grouped.map(InvertedIndex.write_a_posting_list)

In [None]:
# Partition the postings into the different buckets and write them out.
# `postings_filtered` is not defined in this notebook; the filtered RDD is
# named `postings_body_filtered`.
posting_locs_list = partition_postings_and_write(postings_body_filtered).collect()

In [None]:
# Merge the per-bucket posting locations into a single dict.
# defaultdict is imported here because it is not among this notebook's
# explicit imports (it was only available if a star import provided it).
from collections import defaultdict

super_posting_locs = defaultdict(list)
for posting_loc in posting_locs_list:
    for token, locs in posting_loc.items():
        super_posting_locs[token].extend(locs)

Put everything together (posting locations, df, idf, and the per-document mappings) and test that the resulting index is correct.

In [None]:
# Create inverted index instance
inverted = InvertedIndex()

# Adding the posting locations dictionary to the inverted index
inverted.posting_locs = super_posting_locs

# Add the token - df dictionary to the inverted index
inverted.df = df_dictionary

# Add the idf dictionary to the inverted index
inverted.idf = idf_dictionary

# Add the doc_id - title dictionary to the inverted index
inverted.doc_title_mapping = doc_title_dictionary

# Add the doc_id - length dictionary to the inverted index.
# The dictionary is named `doc_to_len_dictionary` where it is built;
# `doc_len_dictionary` was never defined.
inverted.doc_len_mapping = doc_to_len_dictionary

# Add the dominator dictionary to the inverted index
inverted.dominator_mapping = dominator_dictionary

# Write the global stats out
inverted.write_index('.', 'body_index')

In [None]:
# Bytes per posting entry: 4 bytes of doc id followed by 2 bytes of tf.
TUPLE_SIZE = 6
TF_MASK = 2 ** 16 - 1 # Masking the 16 low bits of an integer
from contextlib import closing


def read_posting_list(inverted, w):
    """Read and decode the posting list of the token ``w``.

    Parameters:
      inverted: an index object exposing ``posting_locs`` and ``df`` mappings.
      w: the token whose posting list is read.

    Returns:
      A list of ``inverted.df[w]`` (doc_id, tf) tuples. Each entry is decoded
      from TUPLE_SIZE bytes: a 4-byte big-endian doc id followed by a
      big-endian tf in the remaining bytes.
    """
    with closing(MultiFileReader()) as reader:
        locations = inverted.posting_locs[w]
        n_bytes = inverted.df[w] * TUPLE_SIZE
        raw = reader.read(locations, n_bytes)
        return [
            (
                int.from_bytes(raw[start:start + 4], 'big'),
                int.from_bytes(raw[start + 4:start + TUPLE_SIZE], 'big'),
            )
            for start in range(0, n_bytes, TUPLE_SIZE)
        ]

In [None]:
# pl = read_posting_list(inverted, 'python')

In [None]:
# ! zip postings_anchor.zip *.bin