# Imports

In [None]:
# List the Dataproc clusters in the us-central1 region.
!gcloud dataproc clusters list --region us-central1

In [None]:
# Quietly install the pinned Cloud Storage client and the graphframes package.
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

In [None]:
# Switch to /home/dataproc, list inverted_index_gcp.py there, and import the
# InvertedIndex class from that module.
%cd -q /home/dataproc
!ls inverted_index_gcp.py
from inverted_index_gcp import InvertedIndex

In [None]:
# These will already be installed in the testing environment so disregard the 
# amount of time (~1 minute) it takes to install. 
# Installs, in order: PySpark, PyDrive (upgraded with -U), the
# openjdk-8-jdk-headless apt package, and graphframes (which an earlier cell
# in this notebook also installs).
!pip install -q pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
!pip install -q graphframes



import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import numpy as np
import pandas as pd
import math
from functools import reduce
from google.cloud import storage
from inverted_index_gcp import *


import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

# ''' code addition'''
# from flask import Flask, request, jsonify
# ''' code addition'''

# Download NLTK's 'stopwords' corpus; stopwords.words('english') is read from
# it later in this notebook.
nltk.download('stopwords')



# *PySpark*

In [None]:
# List the graph* jar files in Spark's jars directory.
!ls -l /usr/lib/spark/jars/graph*

In [None]:
# NOTE(review): `from pyspark.sql.functions import *` rebinds names such as
# sum, max, min and count to Spark column functions for every cell that
# follows (it also replaces the itertools `count` imported earlier), so the
# Python builtins of those names are no longer reachable unqualified.
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [None]:
# Display the `spark` object. Nothing above this cell assigns it, so it is
# presumably pre-created by the Dataproc PySpark kernel — TODO confirm.
spark

In [None]:
# Register inverted_index_gcp.py with the SparkContext so it is distributed
# with the job. `sc` is not defined in this notebook either — presumably
# also provided by the kernel; TODO confirm.
sc.addFile("/home/dataproc/inverted_index_gcp.py")
# Put the SparkFiles root directory first on the module search path.
sys.path.insert(0,SparkFiles.getRootDirectory())
# Reuse the existing SparkSession if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()

# Support cluster

In [None]:
# Stopwords
# NLTK's English stopword list, frozen into an immutable set.
english_stopwords = frozenset(stopwords.words('english'))
# Additional words excluded on top of the NLTK list (presumably terms that are
# too common in the Wikipedia corpus to be useful — TODO confirm).
corpus_stopwords = ["category", "references", "also", "external", "links", 
                    "may", "first", "see", "history", "people", "one", "two", 
                    "part", "thumb", "including", "second", "following", 
                    "many", "however", "would", "became"]
# Token pattern: one leading '#', '@' or word character, followed by 2 to 24
# repetitions of (an optional apostrophe or hyphen, then a word character).
# A token therefore contains at least three word-like characters.
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)
# Union of both collections; the tokenizing helpers filter terms against it.
all_stopwords = english_stopwords.union(corpus_stopwords)

# Index Functions

In [None]:
# Calculating tf
def stemming(tokens):
    """Return the Porter stem of every token, in the same order.

    A new PorterStemmer is created on each call.
    """
    porter = PorterStemmer()
    stems = []
    for tok in tokens:
        stems.append(porter.stem(tok))
    return stems

def word_count_b(text, id):
    """Count term frequencies of one document for an unstemmed index.

    Parameters
    ----------
    text : str
        Document text; it is lower-cased and tokenized with RE_WORD.
    id :
        Document identifier attached to every emitted posting.

    Returns
    -------
    list
        One ``(token, (id, tf))`` pair per distinct token that is not in
        all_stopwords, where ``tf`` is the token's count in ``text``.
    """
    tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
    # Count the raw tokens: this function never stems (doc_count_b and
    # calculate_DL, which describe the same documents, use raw tokens too).
    # Counting the undefined name `stem` here raised NameError on every call.
    term_freqs = Counter(tokens)
    return [(term, (id, tf)) for term, tf in term_freqs.items()
            if term not in all_stopwords]
def word_count_t_stem(text, id):
    """Count stemmed term frequencies of one document.

    Parameters
    ----------
    text : str
        Document text; it is lower-cased and tokenized with RE_WORD.
    id :
        Document identifier attached to every emitted posting.

    Returns
    -------
    list
        One ``(stem, (id, tf))`` pair per distinct stem, where ``tf`` is the
        number of non-stopword tokens in ``text`` that reduce to that stem.
    """
    tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
    # Stopwords are removed BEFORE stemming: all_stopwords holds unstemmed
    # words, so a stem such as "histori" would never match "history".
    kept = [tok for tok in tokens if tok not in all_stopwords]
    # Count the stems themselves; counting the raw tokens (and throwing the
    # stems away) would make this identical to an unstemmed index.
    stem_freqs = Counter(stemming(kept))
    return [(term, (id, tf)) for term, tf in stem_freqs.items()]
# get list of (id, terms)
def doc_count_b(text, id):
    """Tokenize one document and pair the token list with its id.

    Returns a single-element list ``[(id, tokens)]`` where ``tokens`` are all
    RE_WORD matches of the lower-cased text, in order of appearance and
    without stopword filtering.
    """
    lowered = text.lower()
    doc_tokens = []
    for match in RE_WORD.finditer(lowered):
        doc_tokens.append(match.group())
    return [(id, doc_tokens)]
    
# Calculating tf for title
def word_count_t(text, id):
    """Emit one posting per distinct non-stopword token of a document.

    Every posting has the form ``(token, (id, 1))``: the frequency is always
    1 regardless of how often the token occurs in ``text``. Tokens are
    reported in order of first appearance.
    """
    matches = RE_WORD.finditer(text.lower())
    # dict.fromkeys de-duplicates while keeping first-occurrence order.
    distinct_tokens = dict.fromkeys(m.group() for m in matches)
    result = []
    for term in distinct_tokens:
        if term in all_stopwords:
            continue
        result.append((term, (id, 1)))
    return result

# Sort posting list by wiki_id
def reduce_word_counts(unsorted_pl):
    """Return the postings of `unsorted_pl` as a new list sorted by first element.

    The sort is stable, so entries sharing the same first element keep their
    relative order. The input iterable is not modified.
    """
    ordered = list(unsorted_pl)
    ordered.sort(key=lambda entry: entry[0])
    return ordered

# Calculate df for each token in a posting list
def calculate_df(postings):
    """Map each ``(token, posting_list)`` element to ``(token, len(posting_list))``.

    `postings` must expose a ``map`` method (an RDD in this notebook); the
    result is whatever that ``map`` call returns.
    """
    def _token_with_df(entry):
        return (entry[0], len(entry[1]))

    return postings.map(_token_with_df)

#Write to the disk all posting lists locations
NUM_BUCKETS = 124
def token2bucket_id(token):
    """Return the bucket number for `token`, in the range [0, NUM_BUCKETS).

    The token's `_hash` hex digest is read as a base-16 integer and reduced
    modulo NUM_BUCKETS.
    """
    hex_digest = _hash(token)
    as_int = int(hex_digest, 16)
    return as_int % NUM_BUCKETS
def partition_postings_and_write(postings, bucket_name):
    """Group posting lists by bucket id and write each group via InvertedIndex.

    Each ``(token, posting_list)`` element is keyed by ``token2bucket_id(token)``,
    the elements are grouped per bucket id, and every group is passed to
    ``InvertedIndex.write_a_posting_list`` together with `bucket_name`.

    Returns the RDD of that call's return values; the caller is expected to
    run an action on it (createIndex calls ``collect()``).
    """
    def _key_by_bucket(entry):
        return (token2bucket_id(entry[0]), (entry[0], entry[1]))

    def _write_bucket(bucket_group):
        return InvertedIndex.write_a_posting_list(bucket_group, bucket_name)

    grouped = postings.map(_key_by_bucket).groupByKey()
    return grouped.map(_write_bucket)




In [None]:
def tokenize_and_remove_sw(text):
    """Tokenize `text` and drop every token found in all_stopwords.

    The text is lower-cased and split with RE_WORD; surviving tokens are
    returned as a list in order of appearance (duplicates kept).
    """
    kept = []
    for match in RE_WORD.finditer(text.lower()):
        word = match.group()
        if word not in all_stopwords:
            kept.append(word)
    return kept


def calculate_DL(text, id):
    """Return ``(id, n)`` where ``n`` is the document's token count.

    The count is taken after tokenization and stopword removal by
    tokenize_and_remove_sw.
    """
    doc_length = len(tokenize_and_remove_sw(text))
    return (id, doc_length)

# Creating Index

In [None]:
def createIndex(bucket_name, index_name):
    paths=[]
    client = storage.Client()
    bucket_name_title = bucket_name
    full_path = f"gs://{bucket_name}/"

    blobs = client.list_blobs(bucket_name)
    for b in blobs:
        if b.name.endswith("parquet"):
            paths.append(full_path+b.name)

    # Wikipidia
    parquetFile = spark.read.parquet(*paths)
    dict_title_id= parquetFile.select("id", "title").rdd
    if index_name == "anchor":
        doc_pairs = parquetFile.select("id",f"{index_name}_text").rdd 
    elif index_name == "body":
        doc_pairs = parquetFile.select("text", "id").rdd
    else:
        doc_pairs = parquetFile.select(f"{index_name}", "id").rdd

    if index_name == "anchor":
        united_anchor_text_corpus = doc_pairs.flatMap(lambda x :x[1]).groupByKey().mapValues(list).map(lambda x : (x[0]," ".join([y for y in x[1]])))
        word_counts = united_anchor_text_corpus.flatMap(lambda x: word_count_b(str(x[1]), x[0]))
   
    elif index_name == "body":
        word_counts = doc_pairs.flatMap(lambda x: word_count_b(x[0], x[1]))
    else:
         word_counts = doc_pairs.flatMap(lambda x: word_count_t(x[0], x[1]))
        
    postings = word_counts.groupByKey().mapValues(reduce_word_counts)
    
    dl_body_rdd = None
    doc_tf = None
    if index_name == "body":
        doc_term_body = doc_pairs.flatMap(lambda x: doc_count_b(x[0], x[1]))
        doc_term_body = doc_term_body.flatMap(lambda x: [(x[0], (y, x[1].count(y))) for y in set(x[1])])
        doc_term_body = doc_term_body.groupByKey()
        doc_term_body = doc_term_body.map(lambda x: (x[0], 1/(reduce(lambda a, b: a + b[1]**2, x[1], 0))))
        doc_term_body = doc_term_body.map(lambda x: (x[0], math.sqrt(x[1])))
        doc_tf = dict(doc_term_body.collect())
        
        postings = postings.filter(lambda x: len(x[1])>50)
        
        dl_body_rdd = doc_pairs.map(lambda x: calculate_DL(x[0], x[1]))
        

    w2df = calculate_df(postings)

    w2df_dict = w2df.collectAsMap()

    posting_locs_list = partition_postings_and_write(postings, bucket_name).collect()

    super_posting_locs = defaultdict(list)
    for blob in client.list_blobs(bucket_name, prefix='postings_gcp'):
        if not blob.name.endswith("pickle"):
            continue
        with blob.open("rb") as f:
            posting_locs = pickle.load(f)
            for k, v in posting_locs.items():
                super_posting_locs[k].extend(v)

    inverted = InvertedIndex()
    inverted.posting_locs=super_posting_locs
    inverted.df=w2df_dict
    inverted.title_dict = dict_title_id.collectAsMap()
    inverted.nf = doc_tf  
    inverted.dl = dl_body_rdd.collectAsMap()
    inverted.write_index('.',  f'index_{index_name}')
    index_src =  f"index_{index_name}.pkl"
    index_dst = f'gs://{bucket_name}/postings_gcp/{index_src}'
    !gsutil cp $index_src $index_dst


In [None]:
def createStemmedIndex(bucket_name, index_name):
    paths=[]
    client = storage.Client()
    bucket_name_title = bucket_name
    full_path = f"gs://{bucket_name}/"

    blobs = client.list_blobs(bucket_name)
    for b in blobs:
        if b.name.endswith("parquet"):
            paths.append(full_path+b.name)

    # Wikipidia
    parquetFile = spark.read.parquet(*paths)
    dict_title_id= parquetFile.select("id", "title").rdd
    doc_pairs = parquetFile.select("title", "id").rdd
    word_counts = doc_pairs.flatMap(lambda x: word_count_t_stem(x[0], x[1]))
    postings = word_counts.groupByKey().mapValues(reduce_word_counts)
    w2df = calculate_df(postings)
    w2df_dict = w2df.collectAsMap()
    posting_locs_list = partition_postings_and_write(postings, bucket_name).collect()
    
    super_posting_locs = defaultdict(list)
    for blob in client.list_blobs(bucket_name, prefix='postings_gcp'):
        if not blob.name.endswith("pickle"):
            continue
        with blob.open("rb") as f:
            posting_locs = pickle.load(f)
            for k, v in posting_locs.items():
                super_posting_locs[k].extend(v)
    
    inverted = InvertedIndex()
    inverted.posting_locs=super_posting_locs
    inverted.df=w2df_dict
    inverted.title_dict = dict_title_id.collectAsMap()
    inverted.write_index('.',  f'index_{index_name}')
    index_src =  f"index_{index_name}.pkl"
    index_dst = f'gs://{bucket_name}/postings_gcp/{index_src}'
    !gsutil cp $index_src $index_dst
    