# Project Pre-Processing: Creating Inverted Indexes for the Anchor Text, Body Text, and Titles of Wikipedia Pages

## Overview

The goal of this notebook is to build the inverted indexes once and store them in our GCP bucket, so that they can be loaded later without being rebuilt, which saves both space and runtime.
The processing runs on the Wikipedia files that we have already saved in the bucket. We transform them into a common structure of (token, (doc_id, tf)) pairs. From this structure we write the posting lists to the bucket as binary files, which can later be read back directly from the bucket.
In the end, each index (anchor, text, title) has its own dictionaries.
Each index is then written to its own folder, which contains the index's bin files and a pkl file holding its dictionaries.

We can later read the pkl file from the bucket and use it to perform retrieval, answer queries, and calculate metrics.
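
To make the (token, (doc_id, tf)) structure concrete, here is a minimal, Spark-free sketch on two made-up documents. The names `toy_docs` and `toy_word_count` are hypothetical and exist only for this illustration; the notebook itself builds the same shape with PySpark and a regex tokenizer.

```python
from collections import Counter, defaultdict

# Two made-up documents standing in for Wikipedia pages: {doc_id: text}
toy_docs = {
    1: "anarchism is a political philosophy",
    2: "political movement and political theory",
}

def toy_word_count(doc_id, text):
    """Return (token, (doc_id, tf)) pairs for one document (whitespace tokenizer)."""
    counts = Counter(text.lower().split())
    return [(token, (doc_id, tf)) for token, tf in counts.items()]

# Group the pairs by token to obtain one posting list per token
toy_postings = defaultdict(list)
for doc_id, text in toy_docs.items():
    for token, entry in toy_word_count(doc_id, text):
        toy_postings[token].append(entry)

# Posting lists are kept sorted by doc_id
for token in toy_postings:
    toy_postings[token].sort()

print(toy_postings["political"])  # [(1, 1), (2, 2)]
```

Each posting list is the list of (doc_id, tf) entries for one token, which is the unit that gets written to the binary files.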



# Setup

## Cells relevant to cluster creation and working with GCP directly



In [None]:
# if the following command generates an error, you probably didn't enable
# the cluster security option "Allow API access to all Google Cloud services"
# under Manage Security → Project Access when setting up the cluster
!gcloud dataproc clusters list --region us-central1

## General imports

The `inverted_index_gcp` import requires the `inverted_index_gcp.py` file. You should upload the file and then run this cell.

In [None]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

In [None]:
import sys
from collections import Counter, OrderedDict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from timeit import timeit
from pathlib import Path
import pickle
import numpy as np
from google.cloud import storage
import builtins
import math

stemmer = PorterStemmer()

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

nltk.download('stopwords')



## Installing, importing, and initializing PySpark


In [None]:
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from graphframes import *


In [None]:
spark


## Check that inverted_index_gcp.py is on the Dataproc cluster, and upload it if it is missing



In [None]:
# if nothing prints here you forgot to upload the file inverted_index_gcp.py to the home dir
%cd -q /home/dataproc
!ls inverted_index_gcp.py
# adding our python module to the cluster
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())
from inverted_index_gcp import InvertedIndex

## Load the wiki dump files directly from the bucket into one big corpus

In [None]:
bucket_name = '315537936'
full_path = f"gs://{bucket_name}/"
paths = []

client = storage.Client()
blobs = client.list_blobs(bucket_name)
for b in blobs:
    if b.name.endswith('.parquet') and b.name != 'graphframes.sh':
        paths.append(full_path + b.name)

corpus = spark.read.parquet(*paths)


## With the GCP setup complete, we can start building the inverted indexes

First, we check the number of pages in the corpus. As we saw in HW3, the count should be more than 6M.

In [None]:
# Count number of wiki pages
corpus_size = corpus.count()


In [None]:
#CHECKING
print(corpus_size)

6348910


The first functions count words and sort the posting lists.
Unlike HW3, we modified these functions to support optional stemming and n-gram phrases (not every index needs them). The main goal is to reduce the number of terms we have to look up later at query time.
We provide three options (Porter stemming, 2-grams, and 3-grams) and apply them mainly to the body text index.
For titles and anchor text, stemming is less common because the original wording is significant. For example, an article titled "Studies" and an article titled "Stud" are not the same article.

The methods are:
* Porter stemming

* N-grams (N=2, N=3)
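
As an illustration of the n-gram option, the sketch below builds 2-grams and 3-grams from a list of unigram tokens using only the standard library. The helper `make_ngrams` is a hypothetical stand-in for `nltk.util.ngrams`, which the notebook imports; the token list is made up.

```python
def make_ngrams(tokens, n):
    """Return every run of n consecutive tokens, joined by a single space."""
    return [" ".join(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]

unigrams = ["new", "york", "city", "subway"]

# Both n-gram sizes are derived from the unigrams only
bigrams = make_ngrams(unigrams, 2)
trigrams = make_ngrams(unigrams, 3)

print(bigrams)   # ['new york', 'york city', 'city subway']
print(trigrams)  # ['new york city', 'york city subway']
```

Deriving both sizes from the unigram list matters: if the trigrams were computed after the bigrams had been appended to the same list, they would mix single words with bigram strings.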




Furthermore, we will later use a cosine similarity (CosSim) function. For each document, we precompute its part of the denominator: the norm of the document's tf-idf vector over all relevant words.
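
The per-document quantity can be sketched as follows, assuming that tf is normalized by the document's total token count and that idf is log2(N/df), as `calc_tfidf_normalized` in this notebook intends. The function name `tfidf_norm` and the toy numbers are hypothetical.

```python
import math

def tfidf_norm(tf_by_token, df_by_token, corpus_size):
    """Euclidean norm of a document's length-normalized tf-idf vector."""
    doc_len = sum(tf_by_token.values())
    if doc_len == 0:
        return 0.0
    total = 0.0
    for token, tf in tf_by_token.items():
        if token not in df_by_token:
            continue  # tokens filtered out of the index contribute nothing
        weight = (tf / doc_len) * math.log2(corpus_size / df_by_token[token])
        total += weight ** 2
    return math.sqrt(total)

# Toy document with 4 tokens in a toy corpus of 8 documents
toy_tf = {"anarchism": 3, "political": 1}
toy_df = {"anarchism": 2, "political": 4}
toy_norm = tfidf_norm(toy_tf, toy_df, corpus_size=8)
# anarchism: (3/4) * log2(8/2) = 1.5, political: (1/4) * log2(8/4) = 0.25
print(toy_norm)  # sqrt(1.5**2 + 0.25**2) = sqrt(2.3125)
```

Precomputing this norm per document means that at query time only the numerator of the cosine similarity has to be accumulated from the posting lists.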

In [None]:
CORPUS_SIZE = 6348910
import math
from nltk.util import ngrams

english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ['category', 'references', 'also', 'links', 'external', 'see', 'thumb']
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

all_stopwords = english_stopwords.union(corpus_stopwords)

def word_count(id, text, stem=False, n2gram=False, n3gram=False):
    """
    Count the frequency of each word in `text` (tf) that is not included in
    `all_stopwords` and return entries that will go into our posting lists.
    Optionally applies Porter stemming and adds 2-gram and/or 3-gram tokens.

    Parameters:
    -----------
    text: str
        Text of one document
    id: int
        Document id
    stem: boolean
        Option for stemming
    n2gram: boolean
        Option for 2-gram
    n3gram: boolean
        Option for 3-gram

    Returns:
    --------
    List of tuples
        A list of (token, (doc_id, tf)) pairs,
        for example: [("anarchism", (12, 5)), ...]. Tokens are lowercased, and stemmed or not depending on the boolean parameters
    """
    # Tokenization and stemming
    tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
    if stem:
        stemmer = PorterStemmer()
        tokens = [stemmer.stem(term) for term in tokens if term not in all_stopwords]

    # N-grams are built from the unigram tokens only, so that trigrams are
    # never computed over the bigram strings appended just before
    if n2gram or n3gram:
        if len(tokens) < 2:
            return []

        unigrams = list(tokens)
        if n2gram:
            tokens += [' '.join(b) for b in ngrams(unigrams, 2)]

        if n3gram:
            tokens += [' '.join(t) for t in ngrams(unigrams, 3)]

    # Counting frequencies
    counter = Counter(tokens)
    result = []
    # Filtering out stopwords and preparing result
    result = [(token, (id, tf)) for token, tf in counter.items() if token not in all_stopwords]

    return result

def cal_len_tokens(id, text, stem=False, n2gram=False, n3gram=False):
    """
    Compute the length of one document as the number of distinct tokens that
    are not included in `all_stopwords`, using the same tokenization, stemming
    and n-gram options as `word_count`.

    Parameters:
    -----------
    text: str
        Text of one document
    id: int
        Document id
    stem: boolean
        Option for stemming
    n2gram: boolean
        Option for 2-gram
    n3gram: boolean
        Option for 3-gram

    Returns:
    --------
    List of tuples
        A single-element list [(doc_id, length)]
    """
    # Reuse word_count so that both functions always tokenize identically.
    # word_count returns one entry per distinct token, so the number of
    # entries is the number of distinct non-stopword tokens.
    entries = word_count(id, text, stem, n2gram, n3gram)
    return [(id, len(entries))]


def reduce_word_counts(unsorted_pl):
    ''' Returns a sorted posting list by wiki_id.
    Parameters:
    -----------
        unsorted_pl: list of tuples
            A list of (wiki_id, tf) tuples
    Returns:
    --------
        list of tuples
            A sorted posting list.
    '''
    sorted_pl = sorted(unsorted_pl, key=lambda x: x[0])
    return sorted_pl

def word_count_anchor(data_tuple):
    ''' Count the frequency of each word in `text` (tf) that is not included in
    `all_stopwords` and return entries that will go into our posting lists.
    For anchor text we do not apply stemming.
    Parameters:
    -----------
      data_tuple: tuple
        A tuple containing (id, (doc_id, text)), where `id` is the page the
        anchor appears in and `doc_id` is the page the anchor links to
    Returns:
    --------
      List of tuples
        A list of (id, (token, tf)) pairs
        for example: [(12, ('anarchism', 5)), ...]
    '''
    id, (doc_id, text) = data_tuple

    tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
    counter = Counter(tokens)
    result = [(id, (token, tf)) for token, tf in counter.items() if token not in all_stopwords]
    return result


def calc_tfidf_normalized(doc_id, text, df_corpus, corpus_size, stem=False, n2gram=False, n3gram=False):
    """Return (doc_id, norm), where norm is the Euclidean norm of the document's
    length-normalized tf-idf vector (the document part of the CosSim denominator)."""
    # Calculate the word counts using the word_count function
    word_counts = word_count(doc_id, text, stem, n2gram, n3gram)
    # Keep the tf of every token. word_count returns each token only once,
    # so counting the tokens again would give tf = 1 for all of them
    words_counter = {token: tf for token, (_, tf) in word_counts}
    tokens_length = builtins.sum(words_counter.values())

    # Sum of squared tf-idf weights, with tf normalized by tokens_length
    size = builtins.sum([(count / tokens_length * math.log2(corpus_size / df_corpus[word])) ** 2
                         for word, count in words_counter.items() if word in df_corpus])
    normalized_tfidf_size = math.sqrt(size)

    return (doc_id, normalized_tfidf_size)

The following functions use MapReduce to produce the RDD structures we need.
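
Before the Spark versions, here is a minimal plain-Python sketch of the same map and reduce steps on made-up (token, (doc_id, tf)) pairs. It mirrors what `groupByKey`, `calculate_df` and `calculate_term_total` compute, but the variable names are hypothetical and no Spark is involved.

```python
from collections import defaultdict

# Made-up output of the "map" step: one (token, (doc_id, tf)) pair per token per document
toy_pairs = [
    ("political", (2, 2)),
    ("anarchism", (1, 3)),
    ("political", (1, 1)),
]

# "groupByKey": collect all (doc_id, tf) entries of the same token
grouped = defaultdict(list)
for token, entry in toy_pairs:
    grouped[token].append(entry)

# Sort each posting list by doc_id, as reduce_word_counts does
toy_posting_lists = {token: sorted(pl) for token, pl in grouped.items()}

# df: number of documents containing the token
toy_df = {token: len(pl) for token, pl in toy_posting_lists.items()}

# term_total: total number of occurrences of the token in the corpus
toy_term_total = {token: sum(tf for _, tf in pl) for token, pl in toy_posting_lists.items()}

print(toy_posting_lists["political"])  # [(1, 1), (2, 2)]
print(toy_df["political"], toy_term_total["political"])  # 2 3
```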

In [None]:
def calculate_term_total(posting):
    """Calculate the term total for each posting from the corpus """
    return posting.map(lambda x: (x[0],builtins.sum([i[1] for i in x[1]])))

def calculate_df(posting):

    """
    calculate doc frequency of each posting term from the corpus
    """

    return posting.map(lambda x: (x[0], len(x[1])))

def calculate_doc_length(corpus, column, stem, n2gram, n3gram):
    """Calculate the document length for each one in the corpus"""
    doc_pairs = corpus.select("id", column).rdd
    all_tokens_rdd = doc_pairs.flatMap(lambda x: cal_len_tokens(x[0], x[1], stem, n2gram, n3gram))
    token_length = all_tokens_rdd.collect()
    token_length_dict = dict(token_length)

    return token_length_dict


def tf_idf_nz(corpus, column, w2df_dict,corpus_size, stem, n2gram, n3gram):
    """Calculate the tf-idf normalized for each one in the corpus"""
    doc_pairs = corpus.select("id", column).rdd
    tf_score_df = doc_pairs.map(lambda x: calc_tfidf_normalized(x[0], x[1], w2df_dict, corpus_size, stem, n2gram, n3gram))
    token_score = tf_score_df.collect()
    token_length_dict = dict(token_score)


    return token_length_dict



def create_posting_list_for_text_or_title(corpus, column_name, filter_number=50, stem=False, n2gram=False, n3gram=False):
    # [(12, 'Anarchism'), (25, 'Autism'), (39, 'Albedo')....]
    doc_text_pairs = corpus.limit(600000).select("id", column_name).rdd.map(lambda r: (r['id'], r[column_name]))
    #[('anarchism', (12, 1)), ('autism', (25, 1)), ('albedo', (39, 1))....]
    all_text_tuples_rdd = doc_text_pairs.flatMap(lambda x: word_count(x[0], x[1], stem, n2gram, n3gram))
    #[('autism', [(25, 1)]), ('abraham', [(307, 1), (1436, 1), (2851, 1)])...]
    posting_text = all_text_tuples_rdd.groupByKey().mapValues(reduce_word_counts)
    if column_name == "text":
        posting_text = posting_text.filter(lambda x: len(x[1]) > filter_number)
    return posting_text


def create_posting_list_for_anchor(corpus):
    #[(12, [Row(id=23040, text='political philosophy'), Row(id=99232, text='movement')...]
    doc_anchor_pairs = corpus.select("anchor_text", "id").rdd.map(lambda r: (r['id'], r['anchor_text']))
    #[(12, (23040, 'political philosophy')), (12, (99232, 'movement'))...]
    transformed_data = doc_anchor_pairs.flatMap(lambda x: [(x[0], (row.id, row.text)) for row in x[1]])
    #[(12, ('political', 1)), (12, ('philosophy', 1)), (12, ('movement', 1)...]
    another_transformation = transformed_data.flatMap(word_count_anchor)
    #[(12, 'political'), (12, 'philosophy'), (12, 'movement'), (12, 'authority')..]
    tokens_rdd = another_transformation.map(lambda x: (x[0], x[1][0]))
    #[((12, 'social'), 3), ((12, 'economics'), 1), ((12, 'collectivism'), 2)...]
    result_rdd = tokens_rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
    #[(12, ('social', 3)), (12, ('economics', 1)), (12, ('collectivism', 2))...]
    posting_anchor_before = result_rdd.map(lambda x: (x[0][0], (x[0][1], x[1])))
    #[('social', (12, 3)), ('economics', (12, 1)), ('collectivism', (12, 2))...]
    transformed_anchor = posting_anchor_before.map(lambda x: (x[1][0], (x[0], x[1][1])))
    #[('teaching', [(1148, 1), (1930, 1), (1938, 1), (3464, 1), (3747, 1), (4157, 1), (4501, 1), (4868, 3)....]
    posting_anchor = transformed_anchor.groupByKey().mapValues(reduce_word_counts)
    return posting_anchor


The following functions write and read an InvertedIndex, and read (doc_id, tf) tuples from a bin file stored in the bucket.
These functions may also be useful if we want to write the posting lists locally.
Afterwards, we read the posting lists directly from an InvertedIndex instance.
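
Posting lists are spread over a fixed number of buckets by hashing the token. The sketch below repeats the notebook's `_hash` and `token2bucket_id` logic in a self-contained form and groups a few made-up tokens by bucket; the token list and the `toy_buckets` name are hypothetical.

```python
import hashlib
from collections import defaultdict

NUM_BUCKETS = 124

def token2bucket_id(token):
    """Map a token to a stable bucket id in the range [0, NUM_BUCKETS)."""
    digest = hashlib.blake2b(token.encode("utf8"), digest_size=5).hexdigest()
    return int(digest, 16) % NUM_BUCKETS

toy_tokens = ["anarchism", "political", "philosophy", "movement"]

# Group tokens by bucket id; each bucket is later written to its own bin files
toy_buckets = defaultdict(list)
for token in toy_tokens:
    toy_buckets[token2bucket_id(token)].append(token)

for bucket_id, bucket_tokens in sorted(toy_buckets.items()):
    print(bucket_id, bucket_tokens)
```

Because the hash depends only on the token, the same token always lands in the same bucket, regardless of which Spark partition processed it.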


In [None]:
from google.cloud import storage
from google.cloud.exceptions import NotFound, GoogleCloudError
import pickle
from io import BytesIO
from pathlib import Path
from itertools import groupby

NUM_BUCKETS=124

def token2bucket_id(token):
    return int(_hash(token), 16) % NUM_BUCKETS

def extract_and_write_postings(posting_rdd,base_dir, bucket_name):
    # Create directories if they don't exist
    Path(base_dir).mkdir(parents=True, exist_ok=True)

    # Partition posting lists into buckets by hashing the token
    posting_into_buckets = posting_rdd.map(lambda x: (token2bucket_id(x[0]), x))
    grouped_buckets = posting_into_buckets.groupByKey()
    # Each bucket keeps all of its tokens together: (bucket_id, [(token, posting_list), ...])
    flattened_buckets = grouped_buckets.map(lambda x: (x[0], [(word, locations) for word, locations in x[1]]))
    # Write the posting lists of each bucket to disk
    posting_locations_for_bucket = flattened_buckets.map(lambda x: InvertedIndex.write_a_posting_list((x[0], list(x[1])), base_dir, bucket_name))

    # Collect bucket IDs
    bucket_ids = posting_locations_for_bucket.collect()

    # Merge them locally
    merged_posting_locs = {}
    for bucket_id in bucket_ids:
        data = InvertedIndex.read_index(base_dir, f'{bucket_id}_posting_locs', bucket_name)
        if data is not None:
            for key, value in data.items():
                if key in merged_posting_locs:
                    merged_posting_locs[key].extend(value)
                else:
                    merged_posting_locs[key] = value


    return merged_posting_locs



In the end, when we create an InvertedIndex on GCP, we want to store in it several dictionaries that are necessary for efficient retrieval.
Among them:

1) Posting locations

2) (token, df)

3) (token, term_total)

4) (doc_id, normalized score for CosSim)

**NOTE: the posting locations can be read directly through an InvertedIndex instance.**
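
To illustrate how posting locations make a posting list readable without scanning a whole file, here is a self-contained sketch that writes (doc_id, tf) entries to an in-memory binary buffer and reads one list back by offset. The byte layout (4-byte doc_id, 2-byte tf) and the shape of `toy_posting_locs` are assumptions made for this example only; the real layout is defined in `inverted_index_gcp.py` and may differ.

```python
import struct
from io import BytesIO

# Assumed layout for this sketch: big-endian 4-byte doc_id followed by 2-byte tf
ENTRY = struct.Struct(">IH")

def write_postings(buffer, postings):
    """Write all posting lists to `buffer`; return {token: (offset, n_entries)}."""
    posting_locs = {}
    for token, posting_list in postings.items():
        posting_locs[token] = (buffer.tell(), len(posting_list))
        for doc_id, tf in posting_list:
            buffer.write(ENTRY.pack(doc_id, tf))
    return posting_locs

def read_posting_list(buffer, posting_locs, token):
    """Seek to the token's offset and decode only its entries."""
    offset, n_entries = posting_locs[token]
    buffer.seek(offset)
    raw = buffer.read(n_entries * ENTRY.size)
    return [ENTRY.unpack_from(raw, i * ENTRY.size) for i in range(n_entries)]

toy_bin = BytesIO()  # stands in for a bin file in the bucket
toy_posting_locs = write_postings(
    toy_bin, {"political": [(1, 1), (2, 2)], "anarchism": [(1, 3)]}
)

print(toy_posting_locs["anarchism"])                              # (12, 1)
print(read_posting_list(toy_bin, toy_posting_locs, "anarchism"))  # [(1, 3)]
```

Only the small posting-locations dictionary has to live in memory; the posting lists themselves stay in the binary files until a query needs them.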



In [None]:
def create_index_for_text_or_title(corpus, column_name,kind_of_index, stem=False, n2gram=False, n3gram=False):
    t_start = time()

    # Build the posting lists RDD. stem/n2gram/n3gram are passed by keyword,
    # because the third positional parameter of the callee is filter_number
    final_posting_locs_rdd = create_posting_list_for_text_or_title(corpus, column_name, stem=stem, n2gram=n2gram, n3gram=n3gram)
    final_posting_locs = extract_and_write_postings(final_posting_locs_rdd,f"{kind_of_index}",bucket_name)
    term_total = calculate_term_total(final_posting_locs_rdd).collectAsMap()
    term_frequency = calculate_df(final_posting_locs_rdd).collectAsMap()
    term_frequency_counter = Counter(term_frequency)
    documents_length = calculate_doc_length(corpus,column_name,stem,n2gram,n3gram)
    documents_normalized_length = tf_idf_nz(corpus, column_name, term_frequency_counter,CORPUS_SIZE, stem, n2gram, n3gram)
    index = InvertedIndex()
    index.posting_locs = final_posting_locs
    index.term_total = term_total
    index.df = term_frequency
    index.document_length = documents_length
    index.normalized_length = documents_normalized_length
    index.write_index(f"{kind_of_index}", f"{kind_of_index}_index",bucket_name)

    # report how long it took to build and write the index
    index_const_time = time() - t_start
    print(f"index_time = {index_const_time}")
    # return the index so that callers can keep a reference to it
    return index


def create_index_for_anchor(corpus,kind_of_index):
    t_start = time()

    # Build the anchor posting lists and write them to the bucket
    final_posting_locs_rdd = create_posting_list_for_anchor(corpus)
    final_posting_locs = extract_and_write_postings(final_posting_locs_rdd,f"{kind_of_index}",bucket_name)
    # term_total and df are computed from the RDD; final_posting_locs is a plain dict
    term_total = calculate_term_total(final_posting_locs_rdd).collectAsMap()
    term_frequency = calculate_df(final_posting_locs_rdd).collectAsMap()

    index = InvertedIndex()
    index.posting_locs = final_posting_locs
    index.term_total = term_total
    index.df = term_frequency
    index.write_index(f"{kind_of_index}", f"{kind_of_index}_index",bucket_name)

    # report how long it took to build and write the index
    index_const_time = time() - t_start
    print(f"index_time = {index_const_time}")
    # return the index so that callers can keep a reference to it
    return index






**Creating the indexes**

1) Body text: without stemming, with stemming, with n-grams, and with n-grams
   and stemming

2) Title: without stemming, with stemming, with n-grams, and with n-grams
   and stemming

3) Anchor text


**Title**

In [None]:
title_index = create_index_for_text_or_title(corpus, "title","title", stem=False, n2gram=False, n3gram=False)

In [None]:
title2gram_index = create_index_for_text_or_title(corpus, "title","title2gram", stem=False, n2gram=True, n3gram=False)

In [None]:
title_stemmed = create_index_for_text_or_title(corpus, "title","title_stem", stem=True, n2gram=True, n3gram=False)

In [None]:
title_stemmed_only = create_index_for_text_or_title(corpus, "title","title_stem_only", stem=True, n2gram=False, n3gram=False)

In [None]:
anchor_index = create_index_for_anchor(corpus, "anchor")

In [None]:
body_index = create_index_for_text_or_title(corpus, "text","text", stem=False, n2gram=False, n3gram=False)

In [None]:
body_stem = create_index_for_text_or_title(corpus, "text","text_stem", stem=True, n2gram=False, n3gram=False)

In [None]:
body_2gram = create_index_for_text_or_title(corpus, "text","text_2gram", stem=False, n2gram=True, n3gram=False)

In [None]:
body_stem2gram = create_index_for_text_or_title(corpus, "text","text_stem2gram", stem=True, n2gram=True, n3gram=False)

Now let's read the pkl file and look at the dictionaries, which support O(1) lookups.
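
As a minimal sketch of that step, the example below pickles a toy set of dictionaries to a temporary local file, loads it back, and performs constant-time lookups. The file name, the `toy_index_data` layout, and its values are hypothetical; in the notebook itself the index would presumably be loaded from the bucket with `InvertedIndex.read_index(base_dir, name, bucket_name)`, the call already used in `extract_and_write_postings`.

```python
import pickle
import tempfile
from pathlib import Path

# Toy stand-in for the dictionaries stored on an index (made-up values)
toy_index_data = {
    "df": {"political": 2, "anarchism": 1},
    "term_total": {"political": 3, "anarchism": 3},
    "document_length": {1: 4, 2: 2},
}

with tempfile.TemporaryDirectory() as tmp_dir:
    pkl_path = Path(tmp_dir) / "toy_index.pkl"
    # Write the dictionaries once...
    with open(pkl_path, "wb") as f:
        pickle.dump(toy_index_data, f)
    # ...and load them back without rebuilding anything
    with open(pkl_path, "rb") as f:
        loaded = pickle.load(f)

# Dictionary lookups are O(1) on average
print(loaded["df"]["political"])          # 2
print(loaded["term_total"]["anarchism"])  # 3
print(loaded["df"].get("unknown", 0))     # 0 for terms that are not in the index
```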




**Other cells that store additional pkl files we may use later: a {doc_id: title} dict, a PageRank dict, and a page views dict**

## Creating {doc_id:title} dict



In [None]:
doc_title_rdd = corpus.select("id", "title").rdd
doc_title_dict = doc_title_rdd.map(lambda row: (row.id, row.title)).collectAsMap()

# Writing to GCS using google-cloud-storage
client = storage.Client()
bucket = client.get_bucket("315537936")
blob = bucket.blob("doc_title_dict.pkl")
with blob.open("wb") as pkl_file:
    pickle.dump(doc_title_dict, pkl_file)


## PageRank
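
PageRank runs on a directed graph with one edge per distinct (source page, destination page) link. As a Spark-free illustration of how that graph is derived from the anchor data, the sketch below uses made-up pages in the same (id, [(dest_id, anchor_text), ...]) shape; the names `toy_pages`, `toy_edges` and `toy_vertices` are hypothetical.

```python
# Made-up pages: (page id, list of (destination id, anchor text))
toy_pages = [
    (1, [(2, "political philosophy"), (3, "movement"), (2, "philosophy")]),
    (2, [(1, "anarchism")]),
]

# One directed edge per distinct (source, destination) pair.
# Page 1 links to page 2 twice, but the edge (1, 2) is kept only once.
toy_edges = sorted({(src, dst) for src, anchors in toy_pages for dst, _ in anchors})

# Every page id that appears as a source or a destination is a vertex
toy_vertices = sorted({page_id for edge in toy_edges for page_id in edge})

print(toy_edges)     # [(1, 2), (1, 3), (2, 1)]
print(toy_vertices)  # [1, 2, 3]
```

Note that (1, 2) and (2, 1) are both kept: the graph is directed, so a link and its reverse are different edges.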

In [None]:
def generate_graph(pages):
  ''' Compute the directed graph generated by wiki links.
  Parameters:
  -----------
    pages: RDD
      An RDD where each row consists of one Wikipedia article with 'id' and
      'anchor_text'.
  Returns:
  --------
    edges: RDD
      An RDD where each row represents an edge in the directed graph created by
      the Wikipedia links. The first entry is the source page id and the
      second entry is the destination page id. No duplicates are present.
    vertices: RDD
      An RDD where each row represents a vertex (node) in the directed graph
      created by the Wikipedia links. No duplicates are present.
  '''
  # (id=1, anchor_text=[(1, 'tokens...'), (2, 'tokens...'), ...]) -> (1, 1), (1, 2), ...
  pair_list = pages.flatMap(lambda x: [(x[0], entry[0]) for entry in x[1]])

  # edges must be unique as ordered pairs: the graph is directed, so
  # (1, 3) and (3, 1) are different edges and both are kept
  edges = pair_list.distinct()

  # vertices must be unique page ids; the final map wraps each id in a
  # 1-tuple so that it fits a DataFrame row
  vertices = pair_list.flatMap(lambda x: [x[0], x[1]]).distinct().map(lambda x: (x,))

  return edges, vertices


In [None]:
pages_links = corpus.select("id", "anchor_text").rdd
edges, vertices = generate_graph(pages_links)

v_cnt, e_cnt = vertices.count(), edges.count()
edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
verticesDF = vertices.toDF(['id']).repartition(124, 'id')
g = GraphFrame(verticesDF, edgesDF)
pr_results = g.pageRank(resetProbability=0.15, maxIter=5)
pr = pr_results.vertices.select("id", "pagerank")
pr = pr.sort(col('pagerank').desc())



In [None]:
page_rank_rdd = pr.rdd.map(lambda row: (row['id'], row['pagerank']))
pr_dict = page_rank_rdd.collectAsMap()


# Writing to GCS using google-cloud-storage
client = storage.Client()
bucket = client.get_bucket("315537936")
blob = bucket.blob("pagerank.pkl")
with blob.open("wb") as pkl_file:
    pickle.dump(pr_dict, pkl_file)

## Page Views
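
The shell pipeline in the next cell keeps only English Wikipedia rows, takes fields 3 and 5 (page id and view count), and drops rows where either is not a number. The sketch below reproduces that filtering in plain Python on made-up lines. The sample lines, their other fields, and the name `toy_wid2pv` are assumptions for illustration, not real dump data.

```python
import re
from collections import Counter

# Made-up, space-separated lines: field 1 = wiki code, field 3 = page id,
# field 5 = view count (the other fields are placeholders)
sample_lines = [
    "en.wikipedia Anarchism 12 desktop 100 A100",
    "en.wikipedia Anarchism 12 mobile-web 50 A50",
    "en.wikipedia Some_page null desktop 7 A7",
    "de.wikipedia Anarchismus 99 desktop 30 A30",
]

# Same pattern as the `grep -P` step: "<digits> <digits>"
ID_AND_VIEWS = re.compile(r"^\d+\s\d+$")

toy_wid2pv = Counter()
for line in sample_lines:
    if not line.startswith("en.wikipedia"):
        continue  # grep "^en\.wikipedia"
    fields = line.split(" ")
    if len(fields) < 5:
        continue
    id_and_views = f"{fields[2]} {fields[4]}"  # cut -d' ' -f3,5
    if ID_AND_VIEWS.match(id_and_views):
        # Views of the same page id are summed across rows
        toy_wid2pv[int(fields[2])] += int(fields[4])

print(dict(toy_wid2pv))  # {12: 150}
```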

In [None]:
pv_path = 'https://dumps.wikimedia.org/other/pageview_complete/monthly/2021/2021-08/pageviews-202108-user.bz2'
p = Path(pv_path)
pv_name = p.name
pv_temp = f'{p.stem}-4dedup.txt'
pv_clean = f'{p.stem}.pkl'

!wget -N $pv_path
!bzcat $pv_name | grep "^en\.wikipedia" | cut -d' ' -f3,5 | grep -P "^\d+\s\d+$" > $pv_temp

wid2pv = Counter()
with open(pv_temp, 'rt') as f:
    for line in f:
        parts = line.split(' ')
        wid2pv.update({int(parts[0]): int(parts[1])})

with open(pv_clean, 'wb') as f:
    pickle.dump(wid2pv, f)

In [None]:

# Writing to GCS using google-cloud-storage
client = storage.Client()
bucket = client.get_bucket("315537936")
blob = bucket.blob("pageviews.pkl")
with blob.open("wb") as pkl_file:
    pickle.dump(wid2pv, pkl_file)