In [None]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
from time import time
from pathlib import Path
import pickle
from google.cloud import storage
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
import math
import builtins 
from contextlib import closing

# configuration and setup
YOUR_PROJECT_BUCKET_NAME_STRING = 'final_project_ir_stav_hen_bucket'
google_storage_client = storage.Client()

# The helper module has to be importable on the driver (sys.path) and shipped
# to the executors (sc.addFile). The path that was actually resolved is tracked
# so that addFile does not depend on the notebook's current working directory.
path_to_local_helper_file = "/hadoop/cms/jupyter/user_home/inverted_index_gcp.py"
if os.path.exists(path_to_local_helper_file):
    sys.path.insert(0, "/hadoop/cms/jupyter/user_home/")
    path_of_helper_file_for_spark = path_to_local_helper_file
else:
    # Fall back to a copy in the working directory, fetching it from the
    # project bucket when a matching blob exists.
    path_of_helper_file_for_spark = os.path.join(os.getcwd(), 'inverted_index_gcp.py')
    list_of_blobs_in_bucket = list(google_storage_client.list_blobs(YOUR_PROJECT_BUCKET_NAME_STRING, prefix='inverted_index_gcp.py'))
    if len(list_of_blobs_in_bucket) > 0:
        blob_object_reference = list_of_blobs_in_bucket[0]
        blob_object_reference.download_to_filename(path_of_helper_file_for_spark)
        sys.path.insert(0, os.getcwd())
# NOTE(review): `sc` is not defined in this cell; it is assumed to be the
# SparkContext injected by the notebook kernel.
sc.addFile(path_of_helper_file_for_spark)
from inverted_index_gcp import InvertedIndex, MultiFileWriter

# tokenization
import nltk
from nltk.corpus import stopwords

nltk.download('stopwords')

# English stopword list provided by NLTK.
english_language_stopwords = frozenset(stopwords.words('english'))

# Additional words excluded on top of the NLTK list.
corpus_specific_stopwords_list = [
    "category", "references", "also", "external", "links",
    "may", "first", "see", "history", "people", "one", "two",
    "part", "thumb", "including", "second", "following",
    "many", "however", "would", "became",
]

# Single lookup set used when filtering tokens.
combined_stopwords_set = english_language_stopwords | frozenset(corpus_specific_stopwords_list)

# A token starts with '#', '@' or a word character, followed by 2 to 24
# further word characters, each optionally preceded by an apostrophe or hyphen.
REGEX_PATTERN_FOR_TOKENS = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

def extract_tokens_from_text_string(input_text_string):
    """Lower-case the text, extract regex tokens and drop stopwords.

    Returns the surviving tokens as a list, in order of appearance
    (duplicates are kept).
    """
    lowered_text = input_text_string.lower()
    kept_tokens = []
    for match in REGEX_PATTERN_FOR_TOKENS.finditer(lowered_text):
        candidate = match.group()
        if candidate in combined_stopwords_set:
            continue
        kept_tokens.append(candidate)
    return kept_tokens

def calculate_term_frequencies_for_document(text_content, document_unique_id):
    """Count token occurrences in one document.

    Returns a list of (token, (document_unique_id, count)) pairs, one per
    distinct token left after tokenization and stopword filtering.
    """
    counts_by_token = Counter(extract_tokens_from_text_string(text_content))
    pairs = []
    for token, occurrences in counts_by_token.items():
        pairs.append((token, (document_unique_id, occurrences)))
    return pairs

def sort_posting_list_tuples(unsorted_posting_list_iterable):
    """Return the postings as a new list ordered by their first element (doc ID).

    The sort is stable, so entries with equal doc IDs keep their input order.
    """
    ordered_postings = list(unsorted_posting_list_iterable)
    ordered_postings.sort(key=lambda entry: entry[0])
    return ordered_postings

def calculate_document_frequency(posting_entry_tuple):
    """Turn a (term, posting_list) pair into (term, document frequency).

    The document frequency is the number of entries in the posting list.
    """
    term, postings = posting_entry_tuple
    document_frequency = len(postings)
    return (term, document_frequency)

# storage logic
import hashlib

# Number of shards that terms are distributed over.
NUMBER_OF_BUCKETS_FOR_SHARDING = 124


def map_token_string_to_bucket_id(token_string):
    """Deterministically map a token to a shard ID.

    The token's UTF-8 bytes are hashed with a 5-byte BLAKE2b digest, the
    digest is read as a big-endian integer and reduced modulo
    NUMBER_OF_BUCKETS_FOR_SHARDING.
    """
    digest_bytes = hashlib.blake2b(token_string.encode('utf8'), digest_size=5).digest()
    return int.from_bytes(digest_bytes, 'big') % NUMBER_OF_BUCKETS_FOR_SHARDING

# Each posting is packed into 6 bytes: the doc ID shifted left by 16 bits,
# OR-ed with the term frequency masked to its low 16 bits.
SIZE_OF_TUPLE_IN_BYTES = 6
BIT_MASK_FOR_TF = 2**16 - 1


def write_data_to_gcp_robustly_handling_blob_errors(bucket_data_tuple):
    """
    Writes data to local worker disk first, then uploads to GCS.
    This bypasses the 'Blob.open' error on older GCP libraries.

    Args:
        bucket_data_tuple: (bucket_id, items) where items yields
            (term, posting_list) pairs and every posting is a (doc_id, tf) pair.

    Returns:
        The bucket ID that was written.

    Side effects:
        Uploads every local ``*.bin`` file and a
        ``<bucket_id>_posting_locs.pickle`` file under ``postings_gcp/`` in the
        project bucket, deleting each local file after its upload.
    """
    bucket_id_integer, items_iterator = bucket_data_tuple
    local_temporary_directory_path = Path(f"/tmp/postings_gcp_{bucket_id_integer}")
    # exist_ok=True already covers the "directory is present" case.
    local_temporary_directory_path.mkdir(parents=True, exist_ok=True)

    dictionary_of_posting_locations = defaultdict(list)
    with closing(MultiFileWriter(local_temporary_directory_path, bucket_id_integer, bucket_name=None)) as local_file_writer:
        for term_string, posting_list_data in items_iterator:
            # Term frequencies above 65535 keep only their low 16 bits.
            binary_data = b''.join([(doc_id << 16 | (tf_count & BIT_MASK_FOR_TF)).to_bytes(SIZE_OF_TUPLE_IN_BYTES, 'big')
                                    for doc_id, tf_count in posting_list_data])
            locations_written = local_file_writer.write(binary_data)
            dictionary_of_posting_locations[term_string].extend(locations_written)

    # A fresh client is created here because this function runs on worker nodes.
    worker_node_storage_client = storage.Client()
    target_bucket_object = worker_node_storage_client.bucket(YOUR_PROJECT_BUCKET_NAME_STRING)
    for local_binary_file in local_temporary_directory_path.glob("*.bin"):
        remote_blob_path = f"postings_gcp/{local_binary_file.name}"
        blob_object = target_bucket_object.blob(remote_blob_path)
        blob_object.upload_from_filename(str(local_binary_file))
        local_binary_file.unlink()  # Delete local file after upload to save space

    pickle_filename_string = f'{bucket_id_integer}_posting_locs.pickle'
    local_pickle_file_path = local_temporary_directory_path / pickle_filename_string
    with open(local_pickle_file_path, 'wb') as open_pickle_file:
        pickle.dump(dictionary_of_posting_locations, open_pickle_file)
    remote_pickle_blob = target_bucket_object.blob(f"postings_gcp/{pickle_filename_string}")
    remote_pickle_blob.upload_from_filename(str(local_pickle_file_path))
    local_pickle_file_path.unlink()

    try:
        local_temporary_directory_path.rmdir()
    except OSError:
        # Best-effort cleanup: a non-empty or already-removed directory must
        # not fail the task once the uploads have succeeded.
        pass

    return bucket_id_integer

def partition_and_write_postings_to_cloud_storage(rdd_of_filtered_postings):
    """Shard (term, posting_list) records by bucket ID and write each shard.

    Every record is keyed by the bucket ID of its term, records are grouped
    per bucket, and each group is handed to the GCS writer. Returns the
    collected list of bucket IDs reported by the writer.
    """
    def key_by_bucket(term_and_postings):
        term, postings = term_and_postings[0], term_and_postings[1]
        return (map_token_string_to_bucket_id(term), (term, postings))

    per_bucket_groups = rdd_of_filtered_postings.map(key_by_bucket).groupByKey()
    written_bucket_ids = per_bucket_groups.map(write_data_to_gcp_robustly_handling_blob_errors)
    return written_bucket_ids.collect()

# main execution
# Default to parquet files at the bucket root; switch to the
# 'wikidata20210801_preprocessed/' folder when blobs with that prefix exist.
full_gcs_path_to_parquet = f"gs://{YOUR_PROJECT_BUCKET_NAME_STRING}/*.parquet"
blobs_check_list = list(google_storage_client.list_blobs(YOUR_PROJECT_BUCKET_NAME_STRING, prefix='wikidata20210801_preprocessed'))
if len(blobs_check_list) > 0:
     full_gcs_path_to_parquet = f"gs://{YOUR_PROJECT_BUCKET_NAME_STRING}/wikidata20210801_preprocessed/*.parquet"

print(f"Reading Wiki data from: {full_gcs_path_to_parquet}")
# NOTE(review): `spark` is not defined in this cell; it is assumed to be the
# SparkSession injected by the notebook kernel.
spark_dataframe_wiki_data = spark.read.parquet(full_gcs_path_to_parquet)

# Title Index
print("Step 1/4: Building Title Index (Mapping Doc IDs to Titles)")
rdd_ids_and_titles = spark_dataframe_wiki_data.select("id", "title").rdd
# collectAsMap brings the full id -> title mapping onto the driver.
dictionary_id_to_title_map = rdd_ids_and_titles.collectAsMap()

# Save locally, then upload the pickle to the bucket root.
with open('index_title.pkl', 'wb') as title_index_file:
    pickle.dump(dictionary_id_to_title_map, title_index_file)
google_storage_client.bucket(YOUR_PROJECT_BUCKET_NAME_STRING).blob('index_title.pkl').upload_from_filename('index_title.pkl')
print("Title Index saved successfully")

# Body Index
print("Step 2/4: Processing Body Text (Calculating TF, DF, and Norms)")
rdd_text_and_doc_id = spark_dataframe_wiki_data.select("text", "id").rdd

# Calculate TF: one (token, (doc_id, tf)) record per distinct token per document.
rdd_flat_word_counts = rdd_text_and_doc_id.flatMap(lambda row: calculate_term_frequencies_for_document(row[0], row[1]))

# Group by Word and Sort each posting list by doc ID.
rdd_grouped_postings = rdd_flat_word_counts.groupByKey().mapValues(sort_posting_list_tuples)

# Filter Rare Words: keep only terms whose posting list has more than 50 entries.
# NOTE(review): no cache()/persist() is applied to this RDD, so each later
# action on it re-runs the lineage above — consider persisting it.
rdd_filtered_postings = rdd_grouped_postings.filter(lambda x: len(x[1]) > 50)

# Calculate Document Frequency and collect the term -> DF mapping on the driver.
rdd_document_frequency_pairs = rdd_filtered_postings.map(calculate_document_frequency)
dictionary_term_document_frequency = rdd_document_frequency_pairs.collectAsMap()

def calculate_norm_for_single_doc(doc_pair_data):
    """Compute the Euclidean norm of a document's raw term-frequency vector.

    Takes a (text, doc_id) pair and returns (doc_id, norm), where norm is the
    square root of the sum of squared token counts after tokenization and
    stopword filtering.
    """
    text_content, doc_id_val = doc_pair_data
    frequencies = Counter(extract_tokens_from_text_string(text_content)).values()
    # builtins.sum is used explicitly because the star import from
    # pyspark.sql.functions shadows the built-in sum in this file.
    sum_of_squares = builtins.sum([freq * freq for freq in frequencies])
    return (doc_id_val, math.sqrt(sum_of_squares))

print("Step 3/4: Calculating Document Norms")
# Tokenizes every document again and collects the doc_id -> norm mapping on the driver.
dictionary_document_norms = rdd_text_and_doc_id.map(calculate_norm_for_single_doc).collectAsMap()

# Save Norms locally, then upload the pickle to the bucket root.
with open('index_norms.pkl', 'wb') as norms_file_obj:
    pickle.dump(dictionary_document_norms, norms_file_obj)
google_storage_client.bucket(YOUR_PROJECT_BUCKET_NAME_STRING).blob('index_norms.pkl').upload_from_filename('index_norms.pkl')
print("Document Norms saved successfully.")

# Write Index to Storage
print("Step 4/4: Writing Posting Lists to GCP Storage")
# The returned list of bucket IDs is not needed; the call is made for its uploads.
_ = partition_and_write_postings_to_cloud_storage(rdd_filtered_postings)

# Finalize and Save Index Object
print("Finalizing: Consolidating and Saving the InvertedIndex object")

# Merge the per-bucket posting-location pickles into a single term -> locations map.
defaultdict_super_posting_locs = defaultdict(list)
iterator_all_blobs = google_storage_client.list_blobs(YOUR_PROJECT_BUCKET_NAME_STRING, prefix='postings_gcp')

# Temporary file for robust downloading
temporary_download_filename = "temp_robust_download.pkl"

for blob_item in iterator_all_blobs:
    # Only the '*_posting_locs.pickle' files are merged; other blobs are skipped.
    if not blob_item.name.endswith("pickle"):
        continue
    try:
        blob_item.download_to_filename(temporary_download_filename)
        with open(temporary_download_filename, "rb") as open_pickle_file:
            partial_posting_locs_data = pickle.load(open_pickle_file)
            for term_key, locations_list in partial_posting_locs_data.items():
                defaultdict_super_posting_locs[term_key].extend(locations_list)
    except Exception as e:
        # NOTE(review): a failed download or unpickle is only printed, so the merged
        # posting_locs can silently lack that shard's terms.
        print(f"Skipping blob {blob_item.name} due to error: {e}")
    finally:
        # Remove the temp file whether or not this blob was processed successfully.
        if os.path.exists(temporary_download_filename):
            os.remove(temporary_download_filename)

# Create the final InvertedIndex object
final_inverted_index_object = InvertedIndex()
final_inverted_index_object.posting_locs = defaultdict_super_posting_locs
final_inverted_index_object.df = dictionary_term_document_frequency
# Presumably writes 'index_body.pkl' into the current directory (the upload
# below expects that filename) — confirm against InvertedIndex.write_index.
final_inverted_index_object.write_index('.', 'index_body')

# Upload the final object
local_index_filename = "index_body.pkl"
remote_destination_path = f'postings_gcp/{local_index_filename}'
google_storage_client.bucket(YOUR_PROJECT_BUCKET_NAME_STRING).blob(remote_destination_path).upload_from_filename(local_index_filename)

print("All indexes created and saved successfully")

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


Reading Wiki data from: gs://final_project_ir_stav_hen_bucket/*.parquet


                                                                                

Step 1/4: Building Title Index (Mapping Doc IDs to Titles)


                                                                                

Title Index saved successfully
Step 2/4: Processing Body Text (Calculating TF, DF, and Norms)


                                                                                

Step 3/4: Calculating Document Norms


                                                                                

Document Norms saved successfully.
Step 4/4: Writing Posting Lists to GCP Storage


                                                                                

Finalizing: Consolidating and Saving the InvertedIndex object
All indexes created and saved successfully
