***Important*** DO NOT CLEAR THE OUTPUT OF THIS NOTEBOOK AFTER EXECUTION!!!

In [1]:
# if the following command generates an error, you probably didn't enable 
# the cluster security option "Allow API access to all Google Cloud services"
# under Manage Security → Project Access when setting up the cluster
!gcloud dataproc clusters list --region us-central1

NAME          PLATFORM  PRIMARY_WORKER_COUNT  SECONDARY_WORKER_COUNT  STATUS   ZONE           SCHEDULED_DELETE
cluster-d522  GCE       4                                             RUNNING  us-central1-a


In [2]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

[0m

In [3]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import numpy as np
from google.cloud import storage

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [4]:
# if nothing prints here you forgot to include the initialization script when starting the cluster
!ls -l /usr/lib/spark/jars/graph*

-rw-r--r-- 1 root root 247882 Mar 10 20:55 /usr/lib/spark/jars/graphframes-0.8.2-spark3.1-s_2.12.jar


In [5]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [6]:
spark

In [None]:
# Put your bucket name below and make sure you can access it without an error
bucket_name = 'bucket_209470236' 
full_path = f"gs://{bucket_name}/"
paths=[]

client = storage.Client()
blobs = client.list_blobs(bucket_name)
for b in blobs:
    if b.name != 'graphframes.sh':
        paths.append(full_path+b.name)

# Building an inverted index

In [8]:
parquetFile = spark.read.parquet(*paths)
doc_text_pairs_title = parquetFile.select("title", "id").rdd
doc_text_pairs_anchor = parquetFile.select("anchor_text", "id").rdd
doc_text_pairs_body = parquetFile.select("text", "id").rdd


                                                                                

In [9]:
# if nothing prints here you forgot to upload the file inverted_index_gcp.py to the home dir
%cd -q /home/dataproc
!ls inverted_index_gcp.py

inverted_index_gcp.py


In [10]:
# adding our python module to the cluster
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

In [11]:
from inverted_index_gcp import InvertedIndex

In [None]:
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links", 
                    "may", "first", "see", "history", "people", "one", "two", 
                    "part", "thumb", "including", "second", "following", 
                    "many", "however", "would", "became"]

all_stopwords = english_stopwords.union(corpus_stopwords)
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

NUM_BUCKETS = 124
def token2bucket_id(token):
    return int(_hash(token),16) % NUM_BUCKETS

# PLACE YOUR CODE HERE
def tokenize_and_count(text):
    tokens = [token.group() for token in RE_WORD.finditer(text.lower()) if token.group() not in all_stopwords]
    return len(tokens)

def word_count(text, id):
    tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
    counter = Counter(tokens)
    tf_pairs = [(token, (id, count)) for token, count in counter.items() if token not in all_stopwords]
    return tf_pairs

def reduce_word_counts(unsorted_pl):
    sorted_pl = sorted(unsorted_pl, key=lambda x: x[0])
    return sorted_pl

def calculate_df(postings):
    df_rdd = postings.map(lambda x: (x[0], len(x[1])))
    return df_rdd

def partition_postings_and_write(postings):
    partitioned_postings = postings.groupBy(lambda x: token2bucket_id(x[0]))
    posting_locations = partitioned_postings.map(lambda x: InvertedIndex.write_a_posting_list(x, 'title_real_index' ,bucket_name))
    return posting_locations


# title inverted index

In [None]:
# time the index creation time
t_start = time()
# word counts map
word_counts = doc_text_pairs_title.flatMap(lambda x: word_count(x[0], x[1]))
postings = word_counts.groupByKey().mapValues(reduce_word_counts)

id_with_length = doc_text_pairs_title.map(lambda doc: (doc[1], tokenize_and_count(doc[0])))
doc_lengths_dict = id_with_length.collectAsMap()

total_sum = 0
for value in doc_lengths_dict.values():
    total_sum += value
# total = doc_lengths_dict.map(lambda x: x[1]).mean()
avg = total_sum/len(doc_lengths_dict)


# filtering postings and calculate df
postings_filtered = postings.filter(lambda x: len(x[1])>50)
w2df = calculate_df(postings_filtered)
w2df_dict = w2df.collectAsMap()
# partition posting lists and write out
_ = partition_postings_and_write(postings_filtered).collect()
index_const_time = time() - t_start

In [None]:
# collect all posting lists locations into one super-set
super_posting_locs = defaultdict(list)
# for blob in client.list_blobs(bucket_name, prefix='postings_gcp'):
for blob in client.list_blobs(bucket_name):
# for blob in client.list_blobs(bucket_name, prefix='title_real_index'):
    if not blob.name.endswith("pickle"):
        continue
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            super_posting_locs[k].extend(v)
            
#TO CHECK NOT EMPTY
if not super_posting_locs:
    print("im empty")
else:
    count = 0
    for x,y in super_posting_locs.items():
        print(x , ":" , y)
        count += 1
        if count == 5:
            break

Putting it all together

In [None]:
# Create inverted index instance
inverted = InvertedIndex()
inverted.num_of_doc = parquetFile.count()
inverted.avg = avg
inverted.dict_docID_countWord = doc_lengths_dict

# Adding the posting locations dictionary to the inverted index
inverted.posting_locs = super_posting_locs
# Add the token - df dictionary to the inverted index
inverted.df = w2df_dict
# write the global stats out
inverted.write_index('.', 'title_index',bucket_name)
# upload to gs
index_src = "gs://bucket_318820123/title_index.pkl"
index_dst = f'gs://{bucket_name}/title_real_index/title_index.pkl'
!gsutil cp $index_src $index_dst

In [None]:
!gsutil ls -lh $index_dst

# text inverted index

In [None]:
word_counts1 = doc_text_pairs_body.flatMap(lambda x: word_count(x[0], x[1]))
# word_counts1.take(1)
postings1 = word_counts1.groupByKey().mapValues(reduce_word_counts)
# print("ll")

In [None]:
id_with_length = doc_text_pairs_body.map(lambda doc: (doc[1], tokenize_and_count(doc[0])))
doc_lengths_dict = id_with_length.collectAsMap()


In [None]:
total_sum = 0
for value in doc_lengths_dict.values():
    total_sum += value
# total = doc_lengths_dict.map(lambda x: x[1]).mean()
avg = total_sum/len(doc_lengths_dict)


In [None]:
# filtering postings and calculate df
postings_filtered = postings1.filter(lambda x: len(x[1])>50)
w2df = calculate_df(postings_filtered)
w2df_dict = w2df.collectAsMap()
# partition posting lists and write out
_ = partition_postings_and_write(postings_filtered).collect()
# index_const_time = time() - t_start

In [None]:
# collect all posting lists locations into one super-set
super_posting_locs = defaultdict(list)
# for blob in client.list_blobs(bucket_name, prefix='postings_gcp'):
for blob in client.list_blobs(bucket_name):
# for blob in client.list_blobs(bucket_name, prefix='title_real_index'):
    if not blob.name.endswith("pickle"):
        continue
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            super_posting_locs[k].extend(v)
            
if not super_posting_locs:
    print("im empty")
else:
    count = 0
    for x,y in super_posting_locs.items():
        print(x , ":" , y)
        count += 1
        if count == 5:
            break

In [None]:
# Create inverted index instance
inverted = InvertedIndex()
inverted.num_of_doc = parquetFile1.count()
inverted.avg = avg
inverted.dict_docID_countWord = doc_lengths_dict

# Adding the posting locations dictionary to the inverted index
inverted.posting_locs = super_posting_locs
# Add the token - df dictionary to the inverted index
inverted.df = w2df_dict
# write the global stats out
inverted.write_index('.', 'text_index',bucket_name)
# upload to gs
index_src = "gs://bucket_209470236/text_index.pkl"
index_dst = f'gs://{bucket_name}/text_real_index/text_index.pkl'
!gsutil cp $index_src $index_dst

In [None]:
!gsutil ls -lh $index_dst

# stemming inverted index for title


In [18]:
import math
import statistics
bucket_name = 'bucket_209470236'
stemmer = PorterStemmer()
parquetFile = spark.read.parquet(*paths)
doc_text_pairs_title = parquetFile.select("title", "id").rdd
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links", 
                    "may", "first", "see", "history", "people", "one", "two", 
                    "part", "thumb", "including", "second", "following", 
                    "many", "however", "would", "became"]

all_stopwords = english_stopwords.union(corpus_stopwords)
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

NUM_BUCKETS = 124
def token2bucket_id(token):
    return int(_hash(token),16) % NUM_BUCKETS

# PLACE YOUR CODE HERE
def tokenize_and_count(text):
    tokens = [stemmer.stem(token.group()) for token in RE_WORD.finditer(text.lower()) if token.group() not in all_stopwords]
    return len(tokens)

def word_count_stemming(text, id):
    
    tokens = [stemmer.stem(token.group()) for token in RE_WORD.finditer(text.lower()) if token.group() not in all_stopwords]
    map = {}
    for w in tokens:
        if w in all_stopwords:
            continue
        
        map[w] = map.get(w, 0) + 1
    return [(k,(id,v)) for k,v in map.items()]


def reduce_word_counts(unsorted_pl):
    sorted_pl = sorted(unsorted_pl, key=lambda x: x[0])
    return sorted_pl

def calculate_df(postings):
    df_rdd = postings.map(lambda x: (x[0], len(x[1])))
    return df_rdd

def partition_postings_and_write(postings):
    partitioned_postings = postings.groupBy(lambda x: token2bucket_id(x[0]))
    posting_locations = partitioned_postings.map(lambda x: InvertedIndex.write_a_posting_list(x, 'text_real_index_stemming' ,bucket_name))
    return posting_locations

In [13]:
word_counts_stemming = doc_text_pairs_title.flatMap(lambda x: word_count_stemming(x[0], x[1]))
# word_counts1.take(1)
postings_stemming = word_counts_stemming.groupByKey().mapValues(reduce_word_counts)
# print("ll")

In [16]:
word_counts_stemming.take(5)

                                                                                

[('foster', (4045403, 31)),
 ('air', (4045403, 51)),
 ('forc', (4045403, 27)),
 ('base', (4045403, 31)),
 ('1941', (4045403, 15))]

In [14]:
id_with_length = doc_text_pairs_title.map(lambda doc: (doc[1], tokenize_and_count(doc[0])))
doc_lengths_dict = id_with_length.collectAsMap()


                                                                                

In [15]:
total_sum = 0
for value in doc_lengths_dict.values():
    total_sum += value
# total = doc_lengths_dict.map(lambda x: x[1]).mean()
avg = total_sum/len(doc_lengths_dict)


In [16]:
# filtering postings and calculate df
postings_filtered = postings_stemming.filter(lambda x: len(x[1])>50)
w2df = calculate_df(postings_filtered)
w2df_dict = w2df.collectAsMap()


24/03/09 16:08:24 WARN TaskSetManager: Lost task 4.0 in stage 7.0 (TID 562) (cluster-7ea3-w-2.us-central1-a.c.echzor3.internal executor 17): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_17692/78734506.py", line 45, in <lambda>
  File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1709979640550_0004/container_1709979640550_0004_01_000018/inverted_index_gcp.py", line 222, in write_a_posting_list
    with closing(MultiFileWriter(base_d

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 7.0 failed 4 times, most recent failure: Lost task 7.3 in stage 7.0 (TID 583) (cluster-7ea3-w-2.us-central1-a.c.echzor3.internal executor 17): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_17692/78734506.py", line 45, in <lambda>
  File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1709979640550_0004/container_1709979640550_0004_01_000018/inverted_index_gcp.py", line 222, in write_a_posting_list
    with closing(MultiFileWriter(base_dir, bucket_id, bucket_name)) as writer:
  File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1709979640550_0004/container_1709979640550_0004_01_000018/inverted_index_gcp.py", line 45, in __init__
    self._bucket = None if bucket_name is None else get_bucket(bucket_name)
  File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1709979640550_0004/container_1709979640550_0004_01_000018/inverted_index_gcp.py", line 22, in get_bucket
    return storage.Client(PROJECT_ID).bucket(bucket_name)
  File "/opt/conda/miniconda3/lib/python3.10/site-packages/google/cloud/storage/client.py", line 296, in bucket
    return Bucket(client=self, name=bucket_name, user_project=user_project)
  File "/opt/conda/miniconda3/lib/python3.10/site-packages/google/cloud/storage/bucket.py", line 667, in __init__
    name = _validate_name(name)
  File "/opt/conda/miniconda3/lib/python3.10/site-packages/google/cloud/storage/_helpers.py", line 91, in _validate_name
    if not all([name[0].isalnum(), name[-1].isalnum()]):
IndexError: string index out of range

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:552)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:758)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:740)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:505)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2333)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1505)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2717)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2653)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2652)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2652)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1189)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1189)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1189)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2913)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2855)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2844)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:959)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2314)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2333)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2358)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_17692/78734506.py", line 45, in <lambda>
  File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1709979640550_0004/container_1709979640550_0004_01_000018/inverted_index_gcp.py", line 222, in write_a_posting_list
    with closing(MultiFileWriter(base_dir, bucket_id, bucket_name)) as writer:
  File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1709979640550_0004/container_1709979640550_0004_01_000018/inverted_index_gcp.py", line 45, in __init__
    self._bucket = None if bucket_name is None else get_bucket(bucket_name)
  File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1709979640550_0004/container_1709979640550_0004_01_000018/inverted_index_gcp.py", line 22, in get_bucket
    return storage.Client(PROJECT_ID).bucket(bucket_name)
  File "/opt/conda/miniconda3/lib/python3.10/site-packages/google/cloud/storage/client.py", line 296, in bucket
    return Bucket(client=self, name=bucket_name, user_project=user_project)
  File "/opt/conda/miniconda3/lib/python3.10/site-packages/google/cloud/storage/bucket.py", line 667, in __init__
    name = _validate_name(name)
  File "/opt/conda/miniconda3/lib/python3.10/site-packages/google/cloud/storage/_helpers.py", line 91, in _validate_name
    if not all([name[0].isalnum(), name[-1].isalnum()]):
IndexError: string index out of range

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:552)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:758)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:740)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:505)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2333)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1505)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [19]:
# partition posting lists and write out
_ = partition_postings_and_write(postings_filtered).collect()
# index_const_time = time() - t_start

                                                                                

In [20]:
# collect all posting lists locations into one super-set
super_posting_locs = defaultdict(list)
for blob in client.list_blobs(bucket_name):
    if not blob.name.endswith("pickle"):
        continue
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            super_posting_locs[k].extend(v)
            
if not super_posting_locs:
    print("im empty")
else:
    count = 0
    for x,y in super_posting_locs.items():
        print(x , ":" , y)
        count += 1
        if count == 5:
            break

eun-bi : [('text_real_index_stemming/0_000.bin', 0)]
kenli : [('text_real_index_stemming/0_000.bin', 984)]
argentino : [('text_real_index_stemming/0_000.bin', 1926)]
vandamm : [('text_real_index_stemming/0_000.bin', 25200)]
ocsp : [('text_real_index_stemming/0_000.bin', 26322)]


In [27]:
# Create inverted index instance
inverted = InvertedIndex()
inverted.num_of_doc = parquetFile.count()
inverted.avg = avg
inverted.dict_docID_countWord = doc_lengths_dict
bucket_name = 'bucket_209470236'
# Adding the posting locations dictionary to the inverted index
inverted.posting_locs = super_posting_locs
# Add the token - df dictionary to the inverted index
inverted.df = w2df_dict
# write the global stats out
inverted.write_index('.', 'title_index_stemming',bucket_name)
# upload to gs
index_src = f'gs://{bucket_name}/title_index_stemming.pkl'
index_dst = f'gs://{bucket_name}/title_real_index_stemming/title_index_stemming.pkl'
!gsutil cp $index_src $index_dst


# inverted_title.write_index('.', 'title_index')

# # upload to gs
# index_src = "title_index.pkl"
# index_dst = f'gs://{bucket_name}/title_index/{index_src}'
# !gsutil cp $index_src $index_dst

                                                                                

Copying gs://bucket_20943188/text_index_stemming.pkl [Content-Type=application/octet-stream]...
/ [1 files][ 60.2 MiB/ 60.2 MiB]                                                
Operation completed over 1 objects/60.2 MiB.                                     


In [28]:
!gsutil ls -lh $index_dst

 60.24 MiB  2024-03-09T16:46:12Z  gs://bucket_20943188/text_real_index_stemming/text_index_stemming.pkl
TOTAL: 1 objects, 63168870 bytes (60.24 MiB)


# stemming inverted index for text


In [18]:
import math
import statistics
bucket_name = 'bucket_20943188'
stemmer = PorterStemmer()
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links", 
                    "may", "first", "see", "history", "people", "one", "two", 
                    "part", "thumb", "including", "second", "following", 
                    "many", "however", "would", "became"]

all_stopwords = english_stopwords.union(corpus_stopwords)
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

NUM_BUCKETS = 124
def token2bucket_id(token):
    return int(_hash(token),16) % NUM_BUCKETS

# PLACE YOUR CODE HERE
def tokenize_and_count(text):
    tokens = [stemmer.stem(token.group()) for token in RE_WORD.finditer(text.lower()) if token.group() not in all_stopwords]
    return len(tokens)

def word_count_stemming(text, id):
    
    tokens = [stemmer.stem(token.group()) for token in RE_WORD.finditer(text.lower()) if token.group() not in all_stopwords]
    map = {}
    for w in tokens:
        if w in all_stopwords:
            continue
        
        map[w] = map.get(w, 0) + 1
    return [(k,(id,v)) for k,v in map.items()]


def reduce_word_counts(unsorted_pl):
    sorted_pl = sorted(unsorted_pl, key=lambda x: x[0])
    return sorted_pl

def calculate_df(postings):
    df_rdd = postings.map(lambda x: (x[0], len(x[1])))
    return df_rdd

def partition_postings_and_write(postings):
    partitioned_postings = postings.groupBy(lambda x: token2bucket_id(x[0]))
    posting_locations = partitioned_postings.map(lambda x: InvertedIndex.write_a_posting_list(x, 'text_real_index_stemming' ,bucket_name))
    return posting_locations

In [13]:
word_counts_stemming = doc_text_pairs_body.flatMap(lambda x: word_count_stemming(x[0], x[1]))
# word_counts1.take(1)
postings_stemming = word_counts_stemming.groupByKey().mapValues(reduce_word_counts)
# print("ll")

In [16]:
word_counts_stemming.take(5)

                                                                                

[('foster', (4045403, 31)),
 ('air', (4045403, 51)),
 ('forc', (4045403, 27)),
 ('base', (4045403, 31)),
 ('1941', (4045403, 15))]

In [14]:
id_with_length = doc_text_pairs_body.map(lambda doc: (doc[1], tokenize_and_count(doc[0])))
doc_lengths_dict = id_with_length.collectAsMap()


                                                                                

In [15]:
total_sum = 0
for value in doc_lengths_dict.values():
    total_sum += value
# total = doc_lengths_dict.map(lambda x: x[1]).mean()
avg = total_sum/len(doc_lengths_dict)


In [16]:
# filtering postings and calculate df
postings_filtered = postings_stemming.filter(lambda x: len(x[1])>50)
w2df = calculate_df(postings_filtered)
w2df_dict = w2df.collectAsMap()


24/03/09 16:08:24 WARN TaskSetManager: Lost task 4.0 in stage 7.0 (TID 562) (cluster-7ea3-w-2.us-central1-a.c.echzor3.internal executor 17): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_17692/78734506.py", line 45, in <lambda>
  File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1709979640550_0004/container_1709979640550_0004_01_000018/inverted_index_gcp.py", line 222, in write_a_posting_list
    with closing(MultiFileWriter(base_d

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 7.0 failed 4 times, most recent failure: Lost task 7.3 in stage 7.0 (TID 583) (cluster-7ea3-w-2.us-central1-a.c.echzor3.internal executor 17): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_17692/78734506.py", line 45, in <lambda>
  File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1709979640550_0004/container_1709979640550_0004_01_000018/inverted_index_gcp.py", line 222, in write_a_posting_list
    with closing(MultiFileWriter(base_dir, bucket_id, bucket_name)) as writer:
  File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1709979640550_0004/container_1709979640550_0004_01_000018/inverted_index_gcp.py", line 45, in __init__
    self._bucket = None if bucket_name is None else get_bucket(bucket_name)
  File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1709979640550_0004/container_1709979640550_0004_01_000018/inverted_index_gcp.py", line 22, in get_bucket
    return storage.Client(PROJECT_ID).bucket(bucket_name)
  File "/opt/conda/miniconda3/lib/python3.10/site-packages/google/cloud/storage/client.py", line 296, in bucket
    return Bucket(client=self, name=bucket_name, user_project=user_project)
  File "/opt/conda/miniconda3/lib/python3.10/site-packages/google/cloud/storage/bucket.py", line 667, in __init__
    name = _validate_name(name)
  File "/opt/conda/miniconda3/lib/python3.10/site-packages/google/cloud/storage/_helpers.py", line 91, in _validate_name
    if not all([name[0].isalnum(), name[-1].isalnum()]):
IndexError: string index out of range

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:552)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:758)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:740)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:505)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2333)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1505)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2717)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2653)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2652)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2652)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1189)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1189)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1189)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2913)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2855)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2844)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:959)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2314)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2333)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2358)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_17692/78734506.py", line 45, in <lambda>
  File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1709979640550_0004/container_1709979640550_0004_01_000018/inverted_index_gcp.py", line 222, in write_a_posting_list
    with closing(MultiFileWriter(base_dir, bucket_id, bucket_name)) as writer:
  File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1709979640550_0004/container_1709979640550_0004_01_000018/inverted_index_gcp.py", line 45, in __init__
    self._bucket = None if bucket_name is None else get_bucket(bucket_name)
  File "/hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1709979640550_0004/container_1709979640550_0004_01_000018/inverted_index_gcp.py", line 22, in get_bucket
    return storage.Client(PROJECT_ID).bucket(bucket_name)
  File "/opt/conda/miniconda3/lib/python3.10/site-packages/google/cloud/storage/client.py", line 296, in bucket
    return Bucket(client=self, name=bucket_name, user_project=user_project)
  File "/opt/conda/miniconda3/lib/python3.10/site-packages/google/cloud/storage/bucket.py", line 667, in __init__
    name = _validate_name(name)
  File "/opt/conda/miniconda3/lib/python3.10/site-packages/google/cloud/storage/_helpers.py", line 91, in _validate_name
    if not all([name[0].isalnum(), name[-1].isalnum()]):
IndexError: string index out of range

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:552)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:758)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:740)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:505)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2333)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1505)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


In [19]:
# partition posting lists and write out
_ = partition_postings_and_write(postings_filtered).collect()
# index_const_time = time() - t_start

                                                                                

In [20]:
# collect all posting lists locations into one super-set
super_posting_locs = defaultdict(list)
# for blob in client.list_blobs(bucket_name, prefix='postings_gcp'):
for blob in client.list_blobs(bucket_name):
# for blob in client.list_blobs(bucket_name, prefix='title_real_index'):
    if not blob.name.endswith("pickle"):
        continue
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            super_posting_locs[k].extend(v)
            
if not super_posting_locs:
    print("im empty")
else:
    count = 0
    for x,y in super_posting_locs.items():
        print(x , ":" , y)
        count += 1
        if count == 5:
            break

eun-bi : [('text_real_index_stemming/0_000.bin', 0)]
kenli : [('text_real_index_stemming/0_000.bin', 984)]
argentino : [('text_real_index_stemming/0_000.bin', 1926)]
vandamm : [('text_real_index_stemming/0_000.bin', 25200)]
ocsp : [('text_real_index_stemming/0_000.bin', 26322)]


In [27]:
# Create inverted index instance
inverted = InvertedIndex()
inverted.num_of_doc = parquetFile.count()
inverted.avg = avg
inverted.dict_docID_countWord = doc_lengths_dict
bucket_name = 'bucket_20943188'
# Adding the posting locations dictionary to the inverted index
inverted.posting_locs = super_posting_locs
# Add the token - df dictionary to the inverted index
inverted.df = w2df_dict
# write the global stats out
inverted.write_index('.', 'text_index_stemming',bucket_name)
# upload to gs
index_src = f'gs://{bucket_name}/text_index_stemming.pkl'
index_dst = f'gs://{bucket_name}/text_real_index_stemming/text_index_stemming.pkl'
!gsutil cp $index_src $index_dst


# inverted_title.write_index('.', 'title_index')

# # upload to gs
# index_src = "title_index.pkl"
# index_dst = f'gs://{bucket_name}/title_index/{index_src}'
# !gsutil cp $index_src $index_dst

                                                                                

Copying gs://bucket_20943188/text_index_stemming.pkl [Content-Type=application/octet-stream]...
/ [1 files][ 60.2 MiB/ 60.2 MiB]                                                
Operation completed over 1 objects/60.2 MiB.                                     


In [28]:
!gsutil ls -lh $index_dst

 60.24 MiB  2024-03-09T16:46:12Z  gs://bucket_20943188/text_real_index_stemming/text_index_stemming.pkl
TOTAL: 1 objects, 63168870 bytes (60.24 MiB)


# anchor inverted index


In [None]:
word_counts1 = doc_text_pairs_anchor.flatMap(lambda x: word_count(x[0], x[1]))
# word_counts1.take(1)
postings1 = word_counts1.groupByKey().mapValues(reduce_word_counts)
# print("ll")

In [None]:
id_with_length = doc_text_pairs_anchor.map(lambda doc: (doc[1], tokenize_and_count(doc[0])))
doc_lengths_dict = id_with_length.collectAsMap()


In [None]:
total_sum = 0
for value in doc_lengths_dict.values():
    total_sum += value
# total = doc_lengths_dict.map(lambda x: x[1]).mean()
avg = total_sum/len(doc_lengths_dict)


In [None]:
# filtering postings and calculate df
postings_filtered = postings1.filter(lambda x: len(x[1])>50)
w2df = calculate_df(postings_filtered)
w2df_dict = w2df.collectAsMap()
# partition posting lists and write out
_ = partition_postings_and_write(postings_filtered).collect()
# index_const_time = time() - t_start

In [None]:
# collect all posting lists locations into one super-set
super_posting_locs = defaultdict(list)
# for blob in client.list_blobs(bucket_name, prefix='postings_gcp'):
for blob in client.list_blobs(bucket_name):
# for blob in client.list_blobs(bucket_name, prefix='title_real_index'):
    if not blob.name.endswith("pickle"):
        continue
    with blob.open("rb") as f:
        posting_locs = pickle.load(f)
        for k, v in posting_locs.items():
            super_posting_locs[k].extend(v)
            
if not super_posting_locs:
    print("im empty")
else:
    count = 0
    for x,y in super_posting_locs.items():
        print(x , ":" , y)
        count += 1
        if count == 5:
            break

In [None]:
# Create inverted index instance
inverted = InvertedIndex()
inverted.num_of_doc = parquetFile1.count()
inverted.avg = avg
inverted.dict_docID_countWord = doc_lengths_dict

# Create inverted index instance
inverted_anchor = InvertedIndex()
# Adding the posting locations dictionary to the inverted index
inverted_anchor.posting_locs = super_posting_locs_anchor
inverted_anchor.df = w2df_anchor
# write the global stats out
inverted_anchor.write_index('.', 'anchor_index')

# upload to gs
index_src = "anchor_index.pkl"
index_dst = f'gs://{bucket_name}/anchor_index/{index_src}'
!gsutil cp $index_src $index_dst

# count words in doc Dictionary

In [None]:
parquetFile1 = spark.read.parquet(*paths)
doc_text_pairs1 = parquetFile1.select("text", "id").rdd
# doc_text_pairs1.take(5)
# if nothing prints here you forgot to upload the file inverted_index_gcp.py to the home dir
%cd -q /home/dataproc
!ls inverted_index_gcp.py
# adding our python module to the cluster
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())
from inverted_index_gcp import InvertedIndex
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links",
                    "may", "first", "see", "history", "people", "one", "two",
                    "part", "thumb", "including", "second", "following",
                    "many", "however", "would", "became"]

all_stopwords = english_stopwords.union(corpus_stopwords)
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

NUM_BUCKETS = 124
def token2bucket_id(token):
    return int(_hash(token),16) % NUM_BUCKETS

# PLACE YOUR CODE HERE
def tokenize_and_count(text):
    tokens = [token.group() for token in RE_WORD.finditer(text.lower()) if token.group() not in all_stopwords]
    return len(tokens)

id_with_length = doc_text_pairs1.map(lambda doc: (doc[1], tokenize_and_count(doc[0])))
doc_lengths_dict = id_with_length.collectAsMap()

In [None]:
# Specify the bucket name and file name in the bucket
bucket_name = 'bucket_20943188'
file_name = 'dict_docID_countWords.pkl'

# Serialize the defaultdict object to a pickle file
with open(file_name, 'wb') as f:
    pickle.dump(doc_lengths_dict, f)

# Upload the pickle file to Google Cloud Storage
client = storage.Client()
bucket = client.bucket(bucket_name)
blob = bucket.blob(file_name)
blob.upload_from_filename(file_name)

# title to doc Dictionary

In [None]:
id_title_dict = parquetFile.select("id","title").rdd.collectAsMap()
with open("id_title_dict.pkl", 'wb') as f:
    pickle.dump(id_title_dict, f)

In [None]:
index_src = "id_title_dict.pkl"
index_dst = f'gs://{bucket_name}/{index_src}'

In [None]:
!gsutil cp $index_src $index_dst

# Page Rank Dictionary

In [None]:
t_start = time()
pages_links = spark.read.parquet("gs://wikidata_preprocessed/*").select("id", "anchor_text").rdd
# construct the graph 
edges, vertices = generate_graph(pages_links)
# compute PageRank
edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
verticesDF = vertices.toDF(['id']).repartition(124, 'id')
g = GraphFrame(verticesDF, edgesDF)
pr_results = g.pageRank(resetProbability=0.15, maxIter=6)
pr = pr_results.vertices.select("id", "pagerank")
pr = pr.sort(col('pagerank').desc())
pr.repartition(1).write.csv(f'gs://{bucket_name}/pr', compression="gzip")
pr_time = time() - t_start
pr.show()

In [None]:
#after unzip the file that in the bucket
bucket_name = 'bucket_318820123'
pr_URL = 'pr_part-00000-2c654e78-7660-4927-9c12-36d456f9ac1f-c000.csv'
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob_pr = bucket.blob(pr_URL)
contents_pr = blob_pr.download_as_bytes()
csv_buffer = io.BytesIO(contents_pr)
df = pd.read_csv(csv_buffer, header=None)
first_column = df.iloc[:, 0]
second_column = df.iloc[:, 1]
pr_dict = dict(zip(first_column, second_column))
max_pr_value = max(pr_dict.values())
norm_pr = {doc_id: view/max_pr_value for doc_id, view in pr_dict.items()}
# pr_pkl_url = 'page_rank/'
path =  'PageRank/page_rank_dict.pkl'
bucket = None if bucket_name is None else get_bucket(bucket_name)
with _open(path, 'wb', bucket) as f:
      pickle.dump(norm_pr,f)

# Page views dict

In [None]:
# Paths
# Using user page views (as opposed to spiders and automated traffic) for the
# month of August 2021
pv_path = 'https://dumps.wikimedia.org/other/pageview_complete/monthly/2021/2021-08/pageviews-202108-user.bz2'
p = Path(pv_path)
pv_name = p.name
pv_temp = f'{p.stem}-4dedup.txt'
pv_clean = f'{p.stem}.pkl'
# Download the file (2.3GB)
!wget -N $pv_path
# Filter for English pages, and keep just two fields: article ID (3) and monthly
# total number of page views (5). Then, remove lines with article id or page
# view values that are not a sequence of digits.
!bzcat $pv_name | grep "^en\.wikipedia" | cut -d' ' -f3,5 | grep -P "^\d+\s\d+$" > $pv_temp
# Create a Counter (dictionary) that sums up the pages views for the same
# article, resulting in a mapping from article id to total page views.
wid2pv = Counter()
with open(pv_temp, 'rt') as f:
    for line in f:
        parts = line.split(' ')
        wid2pv.update({int(parts[0]): int(parts[1])})
# write out the counter as binary file (pickle it)
with open(pv_clean, 'wb') as f:
    pickle.dump(wid2pv, f)
# read in the counter
with open(pv_clean, 'rb') as f:
    wid2pv = pickle.loads(f.read())