# IR Project GCP Helper for Creating Indexes and more #

In [13]:
!gcloud dataproc clusters list --region us-central1

NAME          PLATFORM  PRIMARY_WORKER_COUNT  SECONDARY_WORKER_COUNT  STATUS   ZONE           SCHEDULED_DELETE
cluster-6ed3  GCE       3                                             RUNNING  us-central1-a


# Imports & Setup

In [14]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

[0m

In [15]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage


from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()
def PRINT(text) -> None: print(f'{"-"*80}\n{text}\n{"-"*80}')

nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [16]:
!ls -l /usr/lib/spark/jars/graph*

-rw-r--r-- 1 root root 247882 Mar  9 17:13 /usr/lib/spark/jars/graphframes-0.8.2-spark3.1-s_2.12.jar


In [42]:
spark = SparkSession.builder.appName("YourAppName").config("spark.driver.maxResultSize", "4g").getOrCreate()

In [18]:
bucket_name = 'bucket_for_index_generation' 
full_path = f"gs://{bucket_name}/"
paths=[]

client = storage.Client()
blobs = client.list_blobs(bucket_name)
for b in blobs:
    if b.name != 'graphframes.sh':
        paths.append(full_path+b.name)

## Calculate Documents L2 Normalization ##

In the next step we calculate L2 Norm for each document text.

The ending result will be dictionary which maps -> (key, value) to (doc_id, doc_l2_norm_value)

We will need that in order to preform CosinSimilarity in the next steps of the project

In [11]:
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links",
                            "may", "first", "see", "history", "people", "one", "two",
                            "part", "thumb", "including", "second", "following",
                            "many", "however", "would", "became"]
special_words = ['3d', '4k', 'ip', 'js', 'ai', 'vr', 'ar', 'dl', 'ml', '09', '11', '9']
all_stopwords = english_stopwords.union(corpus_stopwords)
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){1,24}""", re.UNICODE)
stemmer = PorterStemmer()

In [63]:
text = "is a very ad fun way Q-system (genetics) 3d the 1997 22"

print(tokenize(text))

['veri', 'ad', 'fun', 'way', 'q-system', 'genet', '3d', '1997', '22']


In [62]:
english_stopwords = frozenset(stopwords.words('english'))
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){1,24}""", re.UNICODE)
stemmer = PorterStemmer()

def tokenize(text):
    clean_text = []
    text = text.lower()
    for token in RE_WORD.finditer(text):
        stemmed_token = stemmer.stem(token.group())
        if stemmed_token not in all_stopwords:
            clean_text.append(stemmed_token)
    return clean_text

In [11]:
def tokenize(text, to_stem):
    clean_text = []
    text = text.lower()
    tokens = [token.group() for token in RE_WORD.finditer(text)]
    for token in tokens:
        if token not in all_stopwords:
            if to_stem:
                token = stemmer.stem(token)
                clean_text.append(token)
    return clean_text

In [39]:
from collections import Counter

def l2_norm(text):
    # Count the occurrences of each unique word
    word_counts = Counter(text)
    # Get the counts of each unique word as a list
    counts_list = [word_counts[word]**2 for word in set(text)]
    l2_sum = 0
    for word_num in counts_list:
        l2_sum += word_num
    l2 = math.sqrt(l2_sum)
    #l2_norm = math.sqrt(sum(x**2 for x in counts_list))
    return l2

text = "This is some hell of a way just to add add add this function"
text_tok = tokenizer.tokenize(text)
print(text_tok)
print(l2_norm(text_tok))


['hell', 'add', 'add', 'add', 'function']
3.3166247903554


In [38]:
parquetFile = spark.read.parquet(*paths)
rdd_ = parquetFile.select("text", "id").rdd

                                                                                

In [40]:
doc_norm = {}

In [41]:
def calculate_doc_l2_norm(row):
    doc_id = row['id']
    text = row['text']
    tok_text = tokenizer.tokenize(text)
    return (doc_id, l2_norm(tok_text))

doc_norm_rdd = rdd_.map(calculate_doc_l2_norm)

In [None]:
doc_norm = dict(doc_norm_rdd.collect())

                                                                                

### Save to our Bucket ###

In [26]:
base_dir='project_final_indexes'  
doc_l2_norm='doc_l2_norm_'
bucket_name='inverted_indexes_bucket'

In [27]:
path = str(Path(base_dir) / f'{doc_l2_norm}.pkl')
bucket = None if bucket_name is None else get_bucket(bucket_name)

Path(base_dir).mkdir(parents=True, exist_ok=True)

blob = bucket.blob(path)
pickle.dump(doc_norm, open(path, 'wb'))
blob.upload_from_filename(path)

In [29]:
for key, value in list(doc_norm.items())[:10]:
    PRINT(f"Key: {key}, Value: {value}")

--------------------------------------------------------------------------------
Key: 4045403, Value: 125.50298801223818
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Key: 4045413, Value: 28.319604517012593
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Key: 4045419, Value: 38.57460304397182
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Key: 4045426, Value: 22.891046284519195
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Key: 4045432, Value: 46.119410230400824
------------------------------------------------------------------------

### Explore the RDD Dataset ###

In [9]:
parquetFile = spark.read.parquet(*paths)
rdd = parquetFile.rdd

# Convert RDD to DataFrame to view column names
rdd = rdd.toDF()

# Print column names
print(rdd.columns)



['id', 'title', 'text', 'anchor_text']


                                                                                

In [10]:
doc_text_pairs_df.show()



+-------+--------------------+--------------------+--------------------+
|     id|               title|                text|         anchor_text|
+-------+--------------------+--------------------+--------------------+
|4045403|Foster Air Force ...|'''Foster Air For...|[{1176764, Tactic...|
|4045413|     Torino Palavela|'''Palavela''', f...|[{77743, 2006 Win...|
|4045419|   Mad About the Boy|"'''Mad About the...|[{34028256, Joyce...|
|4045426|       Shayne Breuer|'''Shayne Breuer'...|[{1838386, Woodvi...|
|4045432|         Parantaka I|'''Parantaka Chol...|[{1511716, Aditya...|
|4045456|Arundel (UK Parli...|'''Arundel''' was...|[{4665376, Arunde...|
|4045466|     Andrew Martinez|'''Luis Andrew Ma...|[{4860, Berkeley,...|
|4045471|    Vancouver VooDoo|The '''Vancouver ...|[{32706, Vancouve...|
|4045479|     Invisible plane|The '''Invisible ...|[{2260539, Ross A...|
|4045516|    Shopping channel|'''Shopping chann...|[{592899, special...|
|4045519|      Turgay (river)|The '''Turgay''' ...|

                                                                                

## Import Inverted Index .py File ##

In [19]:
# if nothing prints here you forgot to upload the file inverted_index_gcp.py to the home dir
%cd -q /home/dataproc
!ls inverted_index_gcp.py

inverted_index_gcp.py


In [20]:
# adding our python module to the cluster
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

24/03/09 17:17:56 WARN SparkContext: The path /home/dataproc/inverted_index_gcp.py has been added already. Overwriting of added paths is not supported in the current version.


In [21]:
from inverted_index_gcp import InvertedIndex

## Building an inverted indexes ##

In [22]:
from nltk.corpus import stopwords
import nltk
nltk.download('stopwords')
from nltk.stem.porter import *
import re
from nltk.corpus import stopwords
from nltk.stem.porter import PorterStemmer


class Tokenizer:

    def __init__(self):
        english_stopwords = frozenset(stopwords.words('english'))
        corpus_stopwords = ["category", "references", "also", "external", "links",
                            "may", "first", "see", "people", "one", "two",
                            "part", "thumb", "including", "second", "following",
                            "many", "however", "would", "became", "yet", "oh", "even",
                            "within", "beyond", "hey", "since", "without", "ugh", "wow",
                            "ah", "already", "oops", "really", "still", "hmm", "among"]


        self.all_stopwords = english_stopwords.union(corpus_stopwords)
        self.stemmer = PorterStemmer()
        self.special_words = ['3d', '4k', 'ip', 'js', 'ai', 'vr', 'ar', 'dl', 'ml', '09', '11', '9']
        
    def get_word_pattern(self):
        word_pattern = r"(?:(?<=^)|(?<=\s))(\w+[-']?\w+([-']\w+)*)[,.']?(?<![,.!])"
        return word_pattern


    def tokenize(self, text):
      RE_TOKENIZE = re.compile(rf"""
      (
          # Words
          (?P<WORD>{self.get_word_pattern()})
          # space
          |(?P<SPACE>[\s\t\n]+)
          # everything else
          |(?P<OTHER>\w+))""",  re.MULTILINE | re.IGNORECASE | re.VERBOSE | re.UNICODE)

      return [self.stemmer.stem(v) for match in RE_TOKENIZE.finditer(text) for k, v in match.groupdict().items() if v is not None and k != 'SPACE' and bool(re.match(r'^[a-zA-Z0-9]+$', v)) and (len(v) > 2 or v.lower() in self.special_words) and v.lower() not in self.all_stopwords and len(v) <= 24] 


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [16]:
stopwords_list_ = ["a", "about", "above", "after", "again", "against", "all", "am", "an", "and", "any", "are", "aren't", "as", "at", "be", "because", "been", "before", "being", "below", "between", "both", "but", "by", "can", "can't", "cannot", "could", "couldn't", "did", "didn't", "do", "does", "doesn't", "doing", "don't", "down", "during", "each", "few", "for", "from", "further", "had", "hadn't", "has", "hasn't", "have", "haven't", "having", "he", "he'd", "he'll", "he's", "her", "here", "here's", "hers", "herself", "him", "himself", "his", "how", "how's", "i", "i'd", "i'll", "i'm", "i've", "if", "in", "into", "is", "isn't", "it", "it's", "its", "itself", "let's", "me", "more", "most", "mustn't", "my", "myself", "no", "nor", "not", "of", "off", "on", "once", "only", "or", "other", "ought", "our", "ours", "ourselves", "out", "over", "own", "same", "shan't", "she", "she'd", "she'll", "she's", "should", "shouldn't", "so", "some", "such", "than", "that", "that's", "the", "their", "theirs", "them", "themselves", "then", "there", "there's", "these", "they", "they'd", "they'll", "they're", "they've", "this", "those", "through", "to", "too", "under", "until", "up", "very", "was", "wasn't", "we", "we'd", "we'll", "we're", "we've", "were", "weren't", "what", "what's", "when", "when's", "where", "where's", "which", "while", "who", "who's", "whom", "why", "why's", "with", "won't", "would", "wouldn't", "you", "you'd", "you'll", "you're", "you've", "your", "yours", "yourself", "yourselves", "again", "against", "ain't", "all", "almost", "alone", "along", "already", "also", "although", "always", "among", "an", "and", "another", "any", "anybody", "anyone", "anything", "anywhere", "are", "aren't", "around", "as", "at", "back", "be", "became", "because", "become", "becomes", "been", "before", "behind", "being", "below", "beside", "besides", "between", "beyond", "both", "but", "by", "can", "can't", "cannot", "canst", "certain", "chiefly", "clean", "clear", "clearly", "come", "could", "couldn't", "dare", "daren't", "de", "definitely", "did", "didn't", "different", "do", "does", "doesn't", "doing", "done", "don't", "down", "downwards", "during", "each", "either", "else", "elsewhere", "enough", "even", "ever", "every", "everybody", "everyone", "everything", "everywhere", "exactly", "except", "fairly", "far", "farther", "few", "fewer", "fifth", "first", "five", "followed", "following", "follows", "for", "forth", "four", "from", "further", "furthermore", "get", "gets", "getting", "given", "gives", "go", "goes", "going", "gone", "got", "had", "hadn't", "has", "hasn't", "have", "haven't", "having", "he", "he'd", "he'll", "he's", "hence", "her", "here", "here's", "hers", "herself", "him", "himself", "his", "how", "how's", "i", "i'd", "i'll", "i'm", "i've", "if", "in", "into", "is", "isn't", "it", "it's", "its", "itself", "just", "least", "let", "let's", "like", "likely", "little", "look", "looking", "looks", "low", "lower", "made", "make", "makes", "making", "many", "may", "mayn't", "me", "mean", "meantime", "meanwhile", "might", "mightn't", "mine", "minus", "more", "most", "mostly", "much", "must", "mustn't", "my", "myself", "namely", "need", "needn't", "neither", "never", "nevertheless", "new", "next", "nine", "no", "nobody", "none", "noone", "nor", "not", "nothing", "notwithstanding", "now", "nowhere", "of", "off", "often", "on", "once", "one", "ones", "only", "onto", "or", "other", "others", "otherwise", "ought", "oughtn't", "our", "ours", "ourselves", "out", "outside", "over", "overall", "own", "particular", "particularly", "per", "perhaps", "placed", "please", "plus", "possible", "present", "presumably", "probably", "provided", "provides", "que", "quite", "qv", "rather", "rd", "re", "really", "reasonably", "recent", "recently", "regarding", "regardless", "regards", "relatively", "respectively", "right", "round", "said", "same", "saw", "say", "saying", "says", "second", "secondly", "see", "seeing", "seem", "seemed", "seeming", "seems", "seen", "self", "selves", "sensible", "sent", "serious", "seriously", "seven", "several", "shall", "shan't", "she", "she'd", "she'll", "she's", "should", "shouldn't", "since", "six", "so", "some", "somebody", "someday", "somehow", "someone", "something", "sometime", "sometimes", "somewhat", "somewhere", "soon", "sorry", "specified", "specify", "specifying", "still", "sub", "such", "sup", "sure", "take", "taken", "taking", "tell", "tends", "th", "than", "thank", "thanks", "thanx", "that", "that's", "thats", "the", "their", "theirs", "them", "themselves", "then", "thence", "there", "there's", "thereafter", "thereby", "therefore", "therein", "theres", "thereupon", "these", "they", "they'd", "they'll", "they're", "they've", "think", "third", "this", "thorough", "thoroughly", "those", "though", "three", "through", "throughout", "thru", "thus", "to", "together", "too", "took", "toward", "towards", "tried", "tries", "truly", "try", "trying", "twice", "two", "under", "unfortunately", "unless", "unlikely", "until", "unto", "up", "upon", "us", "use", "used", "useful", "uses", "using", "usually", "value", "various", "very", "via", "viz", "vs", "want", "wants", "was", "wasn't", "way", "we", "we'd", "we'll", "we're", "we've", "welcome", "well", "went", "were", "weren't", "what", "what's", "whatever", "when", "when's", "whence", "whenever", "where", "where's", "whereafter", "whereas", "whereby", "wherein", "whereupon", "wherever", "whether", "which", "while", "whither", "who", "who's", "whoever", "whole", "whom", "whose", "why", "why's", "will", "willing", "wish", "with", "within", "without", "won't", "wonder", "would", "would", "wouldn't", "x", "y", "yes", "yet", "you", "you'd", "you'll", "you're", "you've", "your", "yours", "yourself", "yourselves", "z", "zero", "aboard", "about", "above", "absent", "across", "after", "against", "along", "alongside", "amid", "among", "amongst", "an", "and", "around", "as", "aside", "astride", "at", "atop", "before", "behind", "below", "beneath", "beside", "between", "betwixt", "beyond", "by", "despite", "down", "during", "except", "for", "from", "in", "inside", "into", "like", "near", "next", "of", "off", "on", "onto", "opposite", "out", "outside", "over", "past", "per", "plus", "round", "save", "since", "through", "throughout", "to", "toward", "towards", "under", "underneath", "until", "unto", "up", "upon", "with", "within", "without", "worth", "abst", "accordance", "according", "accordingly", "across", "act", "actually", "added", "adj", "affected", "affecting", "affects", "afterwards", "ah", "almost", "alone", "along", "already", "also", "although", "always", "among", "amongst", "announce", "another", "anybody", "anyhow", "anymore", "anyone", "anything", "anyway", "anyways", "anywhere", "apparently", "approximately", "arent", "arise", "around", "aside", "ask", "asking", "auth", "available", "away", "awfully", "back", "became", "become", "becomes", "becoming", "beforehand", "begin", "beginning", "beginnings", "begins", "behind", "believe", "beside", "besides", "beyond", "biol", "brief", "briefly", "c'mon", "c's", "ca", "came", "cannot", "can't", "cause", "causes", "certain", "certainly", "co", "com", "come", "comes", "contain", "containing", "contains", "couldnt", "date", "different", "done", "downwards", "due", "e", "ed", "edu", "effect", "eg", "eight", "eighty", "either", "else", "elsewhere", "end", "ending", "enough", "especially", "et", "etc", "even", "ever", "every", "everybody", "everyone", "everything", "everywhere", "ex", "except", "far", "ff", "fifth", "first", "five", "fix", "followed", "following", "follows", "former", "formerly", "forth", "found", "four", "furthermore", "gave", "get", "gets", "getting", "give", "given", "gives", "giving", "go", "goes", "gone", "got", "gotten", "happens", "hardly", "hed", "hence", "hereafter", "hereby", "herein", "heres", "hereupon", "hes", "hi", "hid", "hither", "home", "howbeit", "however", "hundred", "id", "ie", "im", "immediate", "immediately", "importance", "important", "inc", "indeed", "index", "information", "instead", "invention", "inward", "itd", "it'll", "keep", "keeps", "kept", "kg", "know", "known", "knows", "largely", "last", "lately", "later", "latter", "latterly", "least", "less", "lest", "let", "lets", "like", "liked", "likely", "line", "little", "look", "looking", "looks", "ltd", "made", "mainly", "make", "makes", "many", "may", "maybe", "mean", "means", "meantime", "meanwhile", "merely", "mg", "might", "million", "miss", "ml", "moreover", "mostly", "mr", "mrs", "much", "mug", "must", "name", "namely", "nay", "nd", "near", "nearly", "necessarily", "necessary", "need", "needs", "neither", "never", "nevertheless", "new", "next", "nine", "ninety", "nobody", "non", "none", "nonetheless", "noone", "normally", "nos", "noted", "nothing", "nowhere", "obtain", "obtained", "obviously", "often", "oh", "ok", "okay", "old", "omitted", "one", "ones", "onto", "ord", "others", "otherwise", "outside", "overall", "owing", "page", "pages", "part", "particular", "particularly", "past", "per", "perhaps", "placed", "please", "plus", "poorly", "possible", "possibly", "potentially", "pp", "predominantly", "present", "previously", "primarily", "probably", "promptly", "proud", "provides", "put", "que", "quickly", "quite", "qv", "ran", "rather", "rd", "readily", "really", "recent", "recently", "ref", "refs", "regarding", "regardless", "regards", "related", "relatively", "research", "respectively", "resulted", "resulting", "results", "right", "run", "said", "saw", "say", "saying", "says", "sec", "section", "see", "seeing", "seem", "seemed", "seeming", "seems", "seen", "self", "selves", "sent", "seven", "several", "shall", "shed", "shes", "show", "showed", "shown", "showns", "shows", "significant", "significantly", "similar", "similarly", "since", "six", "slightly", "somebody", "somehow", "someone", "somethan", "something", "sometime", "sometimes", "somewhat", "somewhere", "soon", "sorry", "specifically", "specified", "specify", "specifying", "still", "stop", "strongly", "sub", "substantially", "successfully", "sufficiently", "suggest", "sup", "sure", "take", "taken", "taking", "tell", "tends", "th", "thank", "thanks", "thanx", "thats", "that've", "thence", "thereafter", "thereby", "thered", "therefore", "therein", "there'll", "thereof", "therere", "theres", "thereto", "thereupon", "there've", "theyd", "theyre", "think", "thou", "though", "thoughh", "thousand", "throug", "throughout", "thru", "thus", "til", "tip", "together", "took", "toward", "towards", "tried", "tries", "truly", "try", "trying", "ts", "twice", "two", "u", "un", "unfortunately", "unless", "unlike", "unlikely", "unto", "upon", "ups", "us", "use", "used", "useful", "usefully", "usefulness", "uses", "using", "usually", "v", "value", "various", "'ve", "via", "viz", "vol", "vols", "vs", "w", "want", "wants", "wasnt", "way", "wed", "welcome", "went", "werent", "whatever", "what'll", "whats", "whence", "whenever", "whereafter", "whereas", "whereby", "wherein", "wheres", "whereupon", "wherever", "whether", "whim", "whither", "whod", "whoever", "whole", "who'll", "whomever", "whos", "whose", "widely", "willing", "wish", "within", "without", "wont", "words", "world", "wouldnt", "www", "x", "yes", "yet", "youd", "youre", "z", "zero", "a", "about", "above", "across", "after", "afterwards", "again", "against", "all", "almost", "alone", "along", "already", "also", "although", "always", "am", "among", "amongst", "amoungst", "amount", "an", "and", "another", "any", "anyhow", "anyone", "anything", "anyway", "anywhere", "are", "around", "as", "at", "back", "be", "became", "because", "become", "becomes", "becoming", "been", "before", "beforehand", "behind", "being", "below", "beside", "besides", "between", "beyond", "bill", "both", "bottom", "but", "by", "call", "can", "cannot", "cant", "co", "con", "could", "couldnt", "cry", "de", "describe", "detail", "do", "done", "down", "due", "during", "each", "eg", "eight", "either", "eleven", "else", "elsewhere", "empty", "enough", "etc", "even", "ever", "every", "everyone", "everything", "everywhere", "except", "few", "fifteen", "fify", "fill", "find", "fire", "first", "five", "for", "former", "formerly", "forty", "found", "four", "from", "front", "full", "further", "get", "give", "go", "had", "has", "hasnt", "have", "he", "hence", "her", "here", "hereafter", "hereby", "herein", "hereupon", "hers", "herself", "him", "himself", "his", "how", "however", "hundred", "ie", "if", "in", "inc", "indeed", "interest", "into", "is", "it", "its", "itself", "keep", "last", "latter", "latterly", "least", "less", "ltd", "made", "many", "may", "me", "meanwhile", "might", "mill", "mine", "more", "moreover", "most", "mostly", "move", "much", "must", "my", "myself", "name", "namely", "neither", "never", "nevertheless", "next", "nine", "no", "nobody", "none", "noone", "nor", "not", "nothing", "now", "nowhere", "of", "off", "often", "on", "once", "one", "only", "onto", "or", "other", "others", "otherwise", "our", "ours", "ourselves", "out", "over", "own", "part", "per", "perhaps", "please", "put", "rather", "re", "same", "see", "seem", "seemed", "seeming", "seems", "serious", "several", "she", "should", "show", "side", "since", "sincere", "six", "sixty", "so", "some", "somehow", "someone", "something", "sometime", "sometimes", "somewhere", "still", "such", "system", "take", "ten", "than", "that", "the", "their", "them", "themselves", "then", "thence", "there", "thereafter", "thereby", "therefore", "therein", "thereupon", "these", "they", "thick", "thin", "third", "this", "those", "though", "three", "through", "throughout", "thru", "thus", "to", "together", "too", "top", "toward", "towards", "twelve", "twenty", "two", "un", "under", "until", "up", "upon", "us", "very", "via", "was", "we", "well", "were", "what", "whatever", "when", "whence", "whenever", "where", "whereafter", "whereas", "whereby", "wherein", "whereupon", "wherever", "whether", "which", "while", "whither", "who", "whoever", "whole", "whom", "whose", "why", "will", "with", "within", "without", "would", "yet", "you", "your", "yours", "yourself", "yourselves"]
corpus_stopwords_ = set(stopwords_list_)

### Pipeline Code to Generate the Index ###

In [11]:
tokenizer = Tokenizer()

In [13]:
test = 'is a dsfsfsdfsdfdsfdsfdsewlkjkljkljlkjlkjkljkjkr ll pp the Spiderman September 3D 11 11 11 5 0 4 09 attacks attack gpu PDF 1997 "Who is considered the father of USA?"  bioinformatics september PDF 3D 4K television mawtānā d sharʿūṭā genetics video games'

In [14]:

test = tokenizer.tokenize(test)

test

['spiderman',
 'septemb',
 '3d',
 '11',
 '11',
 '11',
 '09',
 'attack',
 'attack',
 'gpu',
 'pdf',
 '1997',
 'consid',
 'father',
 'usa',
 'bioinformat',
 'septemb',
 'pdf',
 '3d',
 '4k',
 'televis',
 'genet',
 'video',
 'game']

In [22]:
res = word_count(test, 55410)

In [23]:
res

[('spiderman', (55410, 1)),
 ('septemb', (55410, 2)),
 ('3d', (55410, 2)),
 ('11', (55410, 3)),
 ('09', (55410, 1)),
 ('attack', (55410, 2)),
 ('gpu', (55410, 1)),
 ('pdf', (55410, 2)),
 ('1997', (55410, 1)),
 ('consid', (55410, 1)),
 ('father', (55410, 1)),
 ('usa', (55410, 1)),
 ('bioinformat', (55410, 1)),
 ('4k', (55410, 1)),
 ('televis', (55410, 1)),
 ('genet', (55410, 1)),
 ('video', (55410, 1)),
 ('game', (55410, 1))]

In [66]:
PRINT(f'Number of stopwords for body index -> {len(tokenizer.all_stopwords)}')

--------------------------------------------------------------------------------
Number of stopwords for body index -> 797
--------------------------------------------------------------------------------


In [31]:
PRINT(f'Number of stopwords for title index -> {len(tokenizer.all_stopwords)}')

--------------------------------------------------------------------------------
Number of stopwords for title index -> 216
--------------------------------------------------------------------------------


In [21]:
tokenizer = Tokenizer()

NUM_BUCKETS = 124

def token2bucket_id(token):
  return int(_hash(token),16) % NUM_BUCKETS


def word_count(text, id):
  ''' 
  Count the frequency of each word in `text` (tf) that is not included in 
  `all_stopwords` and return entries that will go into our posting lists. 
  Parameters:
    text: str ,Text of one document
    id: int ,Document id
  Returns: List of tuples, A list of (token, (doc_id, tf)) pairs 
  [token.group() for token in RE_WORD.finditer(text.lower())]
  '''

  tokens = tokenizer.tokenize(text)
  c = Counter([tok for tok in tokens])
  return [(item[0],(id,item[1])) for item in c.items()]


def reduce_word_counts(unsorted_pl):
  ''' 
  Returns a sorted posting list by wiki_id.
  Parameters: unsorted_pl: list of (wiki_id, tf) tuples 
  Returns: A sorted posting list.
  '''
  return sorted(unsorted_pl)

def calculate_df(postings):
  ''' 
  Takes a posting list RDD and calculate the df for each token.
  Parameters: postings: RDD ,An RDD where each element is a (token, posting_list) pair.
  Returns:RDD ,An RDD where each element is a (token, df) pair.
  '''
  return postings.map(lambda x: (x[0],len(x[1])))


def partition_postings_and_write(postings, base_dir_, bucket_name_):
  ''' 
  A function that partitions the posting lists into buckets, writes out 
  all posting lists in a bucket to disk, and returns the posting locations for 
  each bucket. Partitioning done by using `token2bucket`.
  Parameters:
  
  base_dir : string - Name for index directory
  bucket_name : string - Name of the bucket we want to store our inverted index directory (base_dir)
  -----------
    postings: RDD , An RDD where each item is a (w, posting_list) pair.
  Returns: RDD
      An RDD where each item is a posting locations dictionary for a bucket. The
      posting locations maintain a list for each word of file locations and 
      offsets its posting list was written to.
  '''
  return postings.map(lambda x: (token2bucket_id(x[0]),x)).groupByKey().map(lambda x: InvertedIndex.write_a_posting_list(x,
                                                                                                                         base_dir= base_dir_,
                                                                                                                         bucket_name=bucket_name_))

### Generate Inverted Index For Title & Id Pairs ###

In [45]:
title_id_pairs = parquetFile.select("title", "id").rdd

In [46]:
count_pairs = title_id_pairs.count()
print(f'Number of title & id pairs -> {count_pairs}')



Number of title & id pairs -> 6348910


                                                                                

#### Generate Posting Lists for Title Index & Save them ####

In [48]:
# time the index creation time
t_start = time()

# word counts map
word_counts = title_id_pairs.flatMap(lambda x: word_count(x[0], x[1]))
postings = word_counts.groupByKey().mapValues(reduce_word_counts)

# filtering postings and calculate df
postings_filtered = postings.filter(lambda x: len(x[1])>50)
w2df = calculate_df(postings_filtered)
w2df_dict = w2df.collectAsMap()

# partition posting lists and write out
_ = partition_postings_and_write(postings_filtered,
                                 base_dir_='title_index_directory_final',
                                 bucket_name_='inverted_indexes_bucket').collect()

index_const_time = time() - t_start

Exception in thread "serve RDD 35" java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.socketAccept(Native Method)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.security.SocketAuthServer$$anon$1.run(SocketAuthServer.scala:64)
                                                                                

In [49]:
PRINT(f'The total amount of time required to create the index is {(index_const_time/60):.2f} minutes')

--------------------------------------------------------------------------------
The total amount of time required to create the index is 4.68 minutes
--------------------------------------------------------------------------------


In [50]:
blobs_test = client.list_blobs(bucket_or_name='inverted_indexes_bucket',
                              prefix='title_index_directory_final')
count = 0
# Iterate over the blobs
for blob in blobs_test:
    if count == 10: break
    count+=1
    print(blob.name)
    
PRINT('Done')

title_index_directory_final/0_000.bin
title_index_directory_final/0_posting_locs.pickle
title_index_directory_final/100_000.bin
title_index_directory_final/100_posting_locs.pickle
title_index_directory_final/101_000.bin
title_index_directory_final/101_posting_locs.pickle
title_index_directory_final/102_000.bin
title_index_directory_final/102_posting_locs.pickle
title_index_directory_final/103_000.bin
title_index_directory_final/103_posting_locs.pickle
--------------------------------------------------------------------------------
Done
--------------------------------------------------------------------------------


In [51]:
# collect all posting lists locations into one super-set
super_posting_locs = defaultdict(list)

for blob in client.list_blobs(bucket_or_name='inverted_indexes_bucket',
                             prefix='title_index_directory_final'):
  if not blob.name.endswith("pickle"):
    continue
  with blob.open("rb") as f:
    posting_locs = pickle.load(f)
    for k, v in posting_locs.items():
      super_posting_locs[k].extend(v)
    
PRINT('Done.')

--------------------------------------------------------------------------------
Done.
--------------------------------------------------------------------------------


Putting it all together

In [52]:
# Create inverted index instance
inverted = InvertedIndex()

# Adding the posting locations dictionary to the inverted index
inverted.posting_locs = super_posting_locs

# Add the token - df dictionary to the inverted index
inverted.df = w2df_dict

# write the global stats out
inverted.write_index(base_dir='project_final_indexes', 
                     name='title_index_final',
                     bucket_name='inverted_indexes_bucket')

PRINT('Done')

--------------------------------------------------------------------------------
Done
--------------------------------------------------------------------------------


#### Visualize the Index ####

In [28]:
def load_index(base_dir, name, bucket_name):
    # Initialize the client
    client = storage.Client()
    
    # Get the bucket
    bucket = client.get_bucket(bucket_name)
    
    # Define the path to the index file in the bucket
    index_path = f'{base_dir}/{name}.pkl'
    
    # Get the blob (file) from the bucket
    blob = bucket.blob(index_path)
    
    # Download the blob into memory
    index_data = blob.download_as_string()
    
    # Load the index from the downloaded data
    inverted_index = pickle.loads(index_data)
    
    return inverted_index

In [38]:
inverted_index = load_index(base_dir='project_final_indexes', 
                            name='title_stem_index',
                            bucket_name='inverted_indexes_bucket')

In [39]:
posting_locs = inverted_index.posting_locs
df_data = inverted_index.df

# Convert the posting_locs dictionary to a DataFrame
posting_locs_df = pd.DataFrame.from_dict(posting_locs, orient='index', columns=['Posting List'])

# Convert the df_data dictionary to a DataFrame
df_data_df = pd.DataFrame.from_dict(df_data, orient='index', columns=['Document Frequency'])

In [40]:
df_data_df_sorted = df_data_df.sort_values(by='Document Frequency', ascending=True)

In [41]:
PRINT(f'Data Frame shape -> {df_data_df_sorted.shape}')

--------------------------------------------------------------------------------
Data Frame shape -> (25230, 1)
--------------------------------------------------------------------------------


In [73]:
PRINT(f'Data Frame shape -> {df_data_df_sorted.shape}')

--------------------------------------------------------------------------------
Data Frame shape -> (26981, 1)
--------------------------------------------------------------------------------


In [42]:
tesla_frequency = df_data_df_sorted.loc['bioinformat', 'Document Frequency']

In [43]:
tesla_frequency

66

In [54]:
df_data_df_sorted.head(10)

Unnamed: 0,Document Frequency
membership,101
jena,101
deux,101
frye,101
adkin,101
tesla,101
kingsburi,101
staunton,101
liceo,101
gaspard,101


In [55]:
PRINT('Done with Title Index !')

--------------------------------------------------------------------------------
Done with Title Index !
--------------------------------------------------------------------------------


### Generate Inverted Idex For Body & Id Pairs ###

In [28]:
parquetFile = spark.read.parquet(*paths)
body_id_pairs = parquetFile.select("text", "id").rdd

                                                                                

In [29]:
# time the index creation time
t_start = time()

# word counts map
word_counts = body_id_pairs.flatMap(lambda x: word_count(x[0], x[1]))
postings = word_counts.groupByKey().mapValues(reduce_word_counts)

# filtering postings and calculate df
postings_filtered = postings.filter(lambda x: len(x[1])>50)
w2df = calculate_df(postings_filtered)
w2df_dict = w2df.collectAsMap()

# partition posting lists and write out
_ = partition_postings_and_write(postings_filtered,
                                 base_dir_='body_index_directory_final',
                                 bucket_name_='inverted_indexes_bucket').collect()

index_const_time = time() - t_start

                                                                                

In [30]:
PRINT(f'The total amount of time required to create the index is {index_const_time/60/60:.3f} hours')

--------------------------------------------------------------------------------
The total amount of time required to create the index is 3.725 hours
--------------------------------------------------------------------------------


In [31]:
blobs_test = client.list_blobs(bucket_or_name='inverted_indexes_bucket',
                              prefix='body_index_directory_final')
count = 0
# Iterate over the blobs
for blob in blobs_test:
    if count == 10:break
    count+=1
    print(blob.name)
    
PRINT('Done')

body_index_directory_final/0_000.bin
body_index_directory_final/0_001.bin
body_index_directory_final/0_002.bin
body_index_directory_final/0_003.bin
body_index_directory_final/0_004.bin
body_index_directory_final/0_005.bin
body_index_directory_final/0_006.bin
body_index_directory_final/0_007.bin
body_index_directory_final/0_008.bin
body_index_directory_final/0_009.bin
--------------------------------------------------------------------------------
Done
--------------------------------------------------------------------------------


In [32]:
# collect all posting lists locations into one super-set
super_posting_locs = defaultdict(list)

for blob in client.list_blobs(bucket_or_name='inverted_indexes_bucket',
                             prefix='body_index_directory_final'):
  if not blob.name.endswith("pickle"):
    continue
  with blob.open("rb") as f:
    posting_locs = pickle.load(f)
    for k, v in posting_locs.items():
      super_posting_locs[k].extend(v)
    
PRINT('Done.')

--------------------------------------------------------------------------------
Done.
--------------------------------------------------------------------------------


In [33]:
# Create inverted index instance
inverted = InvertedIndex()

# Adding the posting locations dictionary to the inverted index
inverted.posting_locs = super_posting_locs

# Add the token - df dictionary to the inverted index
inverted.df = w2df_dict

# write the global stats out
inverted.write_index(base_dir='project_final_indexes', 
                     name='body_index_final',
                     bucket_name='inverted_indexes_bucket')

PRINT('Done')

--------------------------------------------------------------------------------
Done
--------------------------------------------------------------------------------


## Generate Document Lenghs for Body & Title ##

### Generate Body Document Length Dictionary ###

In [11]:
parquetFile = spark.read.parquet(*paths)
body_id_rdd = parquetFile.select("text", "id").rdd

                                                                                

In [12]:
body_dictionary_lenght = {} # maps (key,value)=(doc_id, body_doc_lenght)

In [14]:
def calculate_doc_length(row):
    doc_id = row['id']
    text = row['text']
    text_tok = tokenize(text)
    return (doc_id, len(text_tok))

# Apply the function to each row of the RDD and map it to (doc_id, doc_length) pairs
doc_lengths_rdd = body_id_rdd.map(calculate_doc_length)

# Collect the results as a dictionary
body_dictionary_lenght = dict(doc_lengths_rdd.collect())

                                                                                

#### Save it to our Bucket ####

In [15]:
base_dir='project_final_indexes' 
dl_name='body_dl_'
bucket_name='inverted_indexes_bucket'

In [16]:
def get_bucket(bucket_name):
    return storage.Client('irprojectilayvictor').bucket(bucket_name)

In [17]:
path = str(Path(base_dir) / f'{dl_name}.pkl')

In [18]:
path = str(Path(base_dir) / f'{dl_name}.pkl')
bucket = None if bucket_name is None else get_bucket(bucket_name)

Path(base_dir).mkdir(parents=True, exist_ok=True)

blob = bucket.blob(path)
pickle.dump(body_dictionary_lenght, open(path, 'wb'))
blob.upload_from_filename(path)

#### Verify that the Dictionary Saved Successfully ####

In [19]:
client = storage.Client()
bucket = client.get_bucket(bucket_name)

# Download the pickled file from the bucket
blob = bucket.blob(path)
blob.download_to_filename(path)

with open(path, 'rb') as f:
    loaded_dict = pickle.load(f)

In [20]:
# final
for key, value in list(loaded_dict.items())[:3]:
    PRINT(f"Key: {key}, Value: {value}")

--------------------------------------------------------------------------------
Key: 4045403, Value: 1665
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Key: 4045413, Value: 241
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Key: 4045419, Value: 584
--------------------------------------------------------------------------------


### Generate Title Document Length Dictionary ###

In [15]:
english_stopwords = frozenset(stopwords.words('english'))
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){1,24}""", re.UNICODE)
stemmer = PorterStemmer()

def tokenize(text):
    clean_text = []
    text = text.lower()
    for token in RE_WORD.finditer(text):
        stemmed_token = stemmer.stem(token.group())
        if stemmed_token not in english_stopwords:
            clean_text.append(stemmed_token)
    return clean_text

In [14]:
parquetFile = spark.read.parquet(*paths)
title_id_rdd = parquetFile.select("title", "id").rdd

                                                                                

In [15]:
title_dictionary_length = {} # maps (key, value)=(doc_id, title_length)

In [19]:
def calculate_title_length(row):
    doc_id = row['id']
    title = row['title']
    title_token = tokenize(title)
    return (doc_id, len(title_token))

# Apply the function to each row of the RDD and map it to (doc_id, title) pairs
title_rdd = title_id_rdd.map(calculate_title_length)

title_dictionary_length = dict(title_rdd.collect())

                                                                                

In [20]:
base_dir='project_final_indexes' 
dl_name='title_DL_'
bucket_name='inverted_indexes_bucket'

In [23]:
path = str(Path(base_dir) / f'{dl_name}.pkl')
bucket = None if bucket_name is None else get_bucket(bucket_name)

Path(base_dir).mkdir(parents=True, exist_ok=True)

blob = bucket.blob(path)
pickle.dump(title_dictionary_length, open(path, 'wb'))
blob.upload_from_filename(path)

PRINT('Done.')

--------------------------------------------------------------------------------
Done.
--------------------------------------------------------------------------------


In [24]:
client = storage.Client()
bucket = client.get_bucket(bucket_name)

# Download the pickled file from the bucket
blob = bucket.blob(path)
blob.download_to_filename(path)

with open(path, 'rb') as f:
    loaded_dict = pickle.load(f)

In [25]:
for key, value in list(loaded_dict.items())[:3]:
    PRINT(f"Key: {key}, Value: {value}")

--------------------------------------------------------------------------------
Key: 4045403, Value: 4
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Key: 4045413, Value: 2
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Key: 4045419, Value: 2
--------------------------------------------------------------------------------


### Generate Title Document Dictionary ###

In [83]:
title_dictionary = {} # maps (key, value)=(doc_id, title_name)

In [26]:
parquetFile = spark.read.parquet(*paths)
title_id_rdd = parquetFile.select("title", "id").rdd

                                                                                

In [12]:
def calculate_title(row):
    doc_id = row['id']
    title = row['title']
    return (doc_id, title)

# Apply the function to each row of the RDD and map it to (doc_id, title) pairs
title_rdd = title_id_rdd.map(calculate_title)

title_dictionary = dict(title_rdd.collect())

                                                                                

In [12]:
for key, value in list(title_dictionary.items())[:3]:
    PRINT(f"Key: {key}, Value: {value}")

--------------------------------------------------------------------------------
Key: 4045403, Value: Foster Air Force Base
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Key: 4045413, Value: Torino Palavela
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Key: 4045419, Value: Mad About the Boy
--------------------------------------------------------------------------------


#### Save it to our Bucket ####

In [19]:
base_dir='project_final_indexes' 
dl_name='doc_norm'
bucket_name='inverted_indexes_bucket'

In [None]:
path = str(Path(base_dir) / f'{dl_name}.pkl')
bucket = None if bucket_name is None else get_bucket(bucket_name)

Path(base_dir).mkdir(parents=True, exist_ok=True)

blob = bucket.blob(path)
pickle.dump(doc_norm, open(path, 'wb'))
blob.upload_from_filename(path)

## Execute PageRank & Save ##

In [23]:
def generate_graph(pages):
  ''' 
  Compute the directed graph generated by wiki links.
  Parameters: An RDD where each row consists of one wikipedia articles with 'id' and 
      'anchor_text'.
  Returns: 
   - edges: RDD
      An RDD where each row represents an edge in the directed graph created by
      the wikipedia links. The first entry should the source page id and the 
      second entry is the destination page id. No duplicates should be present. 
   - vertices: RDD
      An RDD where each row represents a vetrix (node) in the directed graph 
      created by the wikipedia links. No duplicates should be present. 
  '''
  edges_w_dup = pages.flatMap(lambda page: [(page[0],anchor[0]) for anchor in page[1]])
  edges = edges_w_dup.distinct()
  vertices = edges.flatMap(lambda x: x)
  vertices = vertices.distinct()
  vertices = vertices.map(lambda x: Row(x)) #converting entries to a format that fits toDF() func up next
  return edges, vertices

In [None]:
t_start = time()

pages_links = parquetFile.select("id", "anchor_text").rdd

# construct the graph 
edges, vertices = generate_graph(pages_links)

# compute PageRank
edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
verticesDF = vertices.toDF(['id']).repartition(124, 'id')

g = GraphFrame(verticesDF, edgesDF)
pr_results = g.pageRank(resetProbability=0.15, maxIter=6)

pr = pr_results.vertices.select("id", "pagerank")
pr = pr.sort(col('pagerank').desc())

pr_time = time() - t_start

In [27]:
pr.show()



+-------+------------------+
|     id|          pagerank|
+-------+------------------+
|3434750| 9913.728782160779|
|  10568| 5385.349263642038|
|  32927| 5282.081575765276|
|  30680|  5128.23370960412|
|5843419| 4957.567686263868|
|  68253| 4769.278265355157|
|  31717| 4486.350180548311|
|  11867|4146.4146509127695|
|  14533|3996.4664408855006|
| 645042| 3531.627089803743|
|  17867|3246.0983906041415|
|5042916| 2991.945739166178|
|4689264| 2982.324883041747|
|  14532| 2934.746829203171|
|  25391|   2903.5462235134|
|   5405| 2891.416329154635|
|4764461| 2834.366987332661|
|  15573|2783.8651181588384|
|   9316|2782.0396464137702|
|8569916|2775.2861918400154|
+-------+------------------+
only showing top 20 rows



                                                                                

In [None]:
pr_dict = pr.collect()
pr_dict = {row['id']: row['pagerank'] for row in pr_dict}

In [9]:
base_dir='project_pageRank' 
page_rank ='pageRank'
bucket_name='bucket_for_index'

In [35]:
path = str(Path(base_dir) / f'{page_rank}.pkl')
bucket = None if bucket_name is None else get_bucket(bucket_name)

Path(base_dir).mkdir(parents=True, exist_ok=True)

blob = bucket.blob(path)
pickle.dump(pr_dict, open(path, 'wb'))
blob.upload_from_filename(path)

In [37]:
for key, value in list(pr_dict.items())[:10]:
    PRINT(f"Key: {key}, doc page rank: {value}")

--------------------------------------------------------------------------------
Key: 3434750, doc page rank: 9913.728782160779
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Key: 10568, doc page rank: 5385.349263642038
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Key: 32927, doc page rank: 5282.081575765276
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Key: 30680, doc page rank: 5128.23370960412
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------
Key: 5843419, doc page rank: 4957.567686263868
-------------------------------------------

# Generate Word Embedding #

## Ilay dont forget to save the model in our bucket as '.pkl' so we can load it and use it at runtime

In [43]:
parquetFile = spark.read.parquet(*paths)
rdd = parquetFile.rdd

# Convert RDD to DataFrame to view column names


                                                                                

['id', 'title', 'text', 'anchor_text']


In [30]:
tokenizer = Tokenizer() # Run the cell of Tokenizer() class before (the class somewhen in the middle)

## Train Word2Ven Model ##

In [31]:
from pyspark.ml.feature import Word2Vec


tokenize_udf = udf(lambda text: tokenizer.tokenize(text), ArrayType(StringType()))
df = rdd.withColumn("tokens", tokenize_udf("text"))

# Train Word2Vec model
word2Vec = Word2Vec(vectorSize=100, minCount=10, windowSize=10, inputCol="tokens", outputCol="vectors")
model = word2Vec.fit(df)

AttributeError: 'RDD' object has no attribute 'withColumn'

## Save it first thing after training to avoid kernel issues ##

In [None]:
base_dir='Work2Vec_dir' 
page_rank ='word2vec'
bucket_name='bucket_for_index'

In [None]:
def get_bucket(bucket_name):
    return storage.Client('irprojectilayvictor').bucket(bucket_name)

In [None]:
path = str(Path(base_dir) / f'{page_rank}.pkl')
bucket = None if bucket_name is None else get_bucket(bucket_name)

Path(base_dir).mkdir(parents=True, exist_ok=True)

blob = bucket.blob(path)
pickle.dump(pr_dict, open(path, 'wb'))
blob.upload_from_filename(path)

print('Saved !')

In [34]:
anchor_text_first_row = parquetFile.select("anchor_text").first()[0]
print(anchor_text_first_row)



[Row(id=1176764, text='Tactical Air Command'), Row(id=91416, text='Victoria County'), Row(id=136747, text='Victoria, Texas'), Row(id=23814944, text='USGS'), Row(id=185235, text='Air Force Base'), Row(id=32090, text='United States Air Force'), Row(id=29810, text='Texas'), Row(id=91416, text='Victoria County'), Row(id=136747, text='Victoria'), Row(id=32927, text='World War II'), Row(id=1176764, text='Tactical Air Command'), Row(id=325329, text='Cold War'), Row(id=136815, text='Georgetown'), Row(id=23869026, text='U.S. Army Air Corps'), Row(id=1875937, text='Brooks Field'), Row(id=53848, text='San Antonio'), Row(id=763002, text='AT-6'), Row(id=1484086, text='Guadalupe River'), Row(id=18717338, text='$'), Row(id=373962, text='War Department'), Row(id=1029642, text='flight officers'), Row(id=6120180, text='warrant officers'), Row(id=201930, text='second lieutenants'), Row(id=60098, text='Pearl Harbor'), Row(id=1468292, text='WACs'), Row(id=763002, text='AT-6'), Row(id=7211, text='P-40'), Ro

                                                                                

In [45]:
from pyspark.sql.functions import explode, col, struct

exploded_df = parquetFile.select(col("title"), explode("anchor_text").alias("anchor_text"))

# Create a dictionary where each title is a key and the corresponding second elements of the tuple are the values
title_anchor_dict = exploded_df.rdd.map(lambda row: (row['title'], row['anchor_text'][1])).groupByKey().mapValues(list).collectAsMap()


24/03/09 20:02:24 WARN YarnAllocator: Container from a bad node: container_1710004378772_0001_01_000108 on host: cluster-6ed3-w-1.us-central1-a.c.irprojectilayvictor.internal. Exit status: 143. Diagnostics: [2024-03-09 20:02:24.715]Container killed on request. Exit code is 143
[2024-03-09 20:02:24.715]Container exited with a non-zero exit code 143. 
[2024-03-09 20:02:24.715]Killed by external signal
.
24/03/09 20:02:24 ERROR YarnScheduler: Lost executor 106 on cluster-6ed3-w-1.us-central1-a.c.irprojectilayvictor.internal: Container from a bad node: container_1710004378772_0001_01_000108 on host: cluster-6ed3-w-1.us-central1-a.c.irprojectilayvictor.internal. Exit status: 143. Diagnostics: [2024-03-09 20:02:24.715]Container killed on request. Exit code is 143
[2024-03-09 20:02:24.715]Container exited with a non-zero exit code 143. 
[2024-03-09 20:02:24.715]Killed by external signal
.
24/03/09 20:02:24 WARN TaskSetManager: Lost task 9.0 in stage 22.0 (TID 649) (cluster-6ed3-w-1.us-central

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 22.0 failed 4 times, most recent failure: Lost task 5.3 in stage 22.0 (TID 681) (cluster-6ed3-w-0.us-central1-a.c.irprojectilayvictor.internal executor 119): ExecutorLostFailure (executor 119 exited caused by one of the running tasks) Reason: Container from a bad node: container_1710004378772_0001_01_000121 on host: cluster-6ed3-w-0.us-central1-a.c.irprojectilayvictor.internal. Exit status: 143. Diagnostics: [2024-03-09 20:09:10.565]Container killed on request. Exit code is 143
[2024-03-09 20:09:10.565]Container exited with a non-zero exit code 143. 
[2024-03-09 20:09:10.565]Killed by external signal
.
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2717)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2653)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2652)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2652)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1189)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1189)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1189)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2913)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2855)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2844)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:959)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2314)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2333)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2358)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [None]:
base_dir='title_anchor_dict_dir' 
page_rank ='title_anchor_dict'
bucket_name='bucket_for_index'
def get_bucket(bucket_name):
    return storage.Client('irprojectilayvictor').bucket(bucket_name)
path = str(Path(base_dir) / f'{page_rank}.pkl')
bucket = None if bucket_name is None else get_bucket(bucket_name)

#Path(base_dir).mkdir(parents=True, exist_ok=True)
p = Path(base_dir)
pv_clean = f'{p.stem}.pkl'

blob = bucket.blob(path)
with open(pv_clean, 'wb') as f:
  pickle.dump(title_anchor_dict, f)
#pickle.dump(pr_dict, open(path, 'wb'))



24/03/09 20:09:11 WARN TaskSetManager: Lost task 32.0 in stage 22.0 (TID 684) (cluster-6ed3-w-0.us-central1-a.c.irprojectilayvictor.internal executor 122): TaskKilled (Stage cancelled)
24/03/09 20:09:12 WARN TaskSetManager: Lost task 30.1 in stage 22.0 (TID 686) (cluster-6ed3-w-1.us-central1-a.c.irprojectilayvictor.internal executor 123): TaskKilled (Stage cancelled)
24/03/09 20:09:12 WARN TaskSetManager: Lost task 28.1 in stage 22.0 (TID 683) (cluster-6ed3-w-1.us-central1-a.c.irprojectilayvictor.internal executor 118): TaskKilled (Stage cancelled)
24/03/09 20:09:12 WARN TaskSetManager: Lost task 33.0 in stage 22.0 (TID 685) (cluster-6ed3-w-2.us-central1-a.c.irprojectilayvictor.internal executor 120): TaskKilled (Stage cancelled)
24/03/09 20:09:12 WARN TaskSetManager: Lost task 31.0 in stage 22.0 (TID 682) (cluster-6ed3-w-2.us-central1-a.c.irprojectilayvictor.internal executor 121): TaskKilled (Stage cancelled)
