***Important*** DO NOT CLEAR THE OUTPUT OF THIS NOTEBOOK AFTER EXECUTION!!!

In [1]:
# if the following command generates an error, you probably didn't enable
# the cluster security option "Allow API access to all Google Cloud services"
# under Manage Security → Project Access when setting up the cluster
!gcloud dataproc clusters list --region us-central1

NAME          PLATFORM  PRIMARY_WORKER_COUNT  SECONDARY_WORKER_COUNT  STATUS   ZONE           SCHEDULED_DELETE  SCHEDULED_STOP
cluster-0016  GCE       2                                             RUNNING  us-central1-a


# Imports & Setup

In [2]:
!pip install -q google-cloud-storage==1.43.0
!pip install -q graphframes

[0m

In [3]:
import pyspark
import sys
from collections import Counter, OrderedDict, defaultdict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
import nltk
from nltk.stem.porter import *
from nltk.corpus import stopwords
from time import time
from pathlib import Path
import pickle
import pandas as pd
from google.cloud import storage

import hashlib
def _hash(s):
    return hashlib.blake2b(bytes(s, encoding='utf8'), digest_size=5).hexdigest()

nltk.download('stopwords')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [4]:
# if nothing prints here you forgot to include the initialization script when starting the cluster
!ls -l /usr/lib/spark/jars/graph*

-rw-r--r-- 1 root root 247882 Jan  6 08:46 /usr/lib/spark/jars/graphframes-0.8.2-spark3.1-s_2.12.jar


In [5]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, SparkFiles
from pyspark.sql import SQLContext
from graphframes import *

In [6]:
spark

In [7]:
# הגדרת ה-Bucket שלך
bucket_name = 'ex3-sagikatan-bucket'

# שימוש ב-Wildcard (*) כדי לקרוא רק את קבצי הנתונים של ויקיפדיה
# זה יסנן החוצה אוטומטית את תיקיית postings_gcp הישנה ואת ה-PageRank
paths = [f"gs://{bucket_name}/multistream*_preprocessed.parquet"]
# 2. יצירת ה-Client (השורות החסרות)
from google.cloud import storage
client = storage.Client()
# בדיקה מהירה
print(f"Reading data from: {paths}")
parquetFile = spark.read.parquet(*paths)
print(f"Number of documents to process: {parquetFile.count()}")

Reading data from: ['gs://ex3-sagikatan-bucket/multistream*_preprocessed.parquet']




Number of documents to process: 6348910


                                                                                

***GCP setup is complete!*** If you got here without any errors you've earned 10 out of the 35 points of this part.

# Building an inverted index

Here, we read the entire corpus to an rdd, directly from Google Storage Bucket and use your code from Colab to construct an inverted index.

In [8]:
%%writefile inverted_index_gcp.py
import pyspark
import sys
from collections import Counter, OrderedDict
import itertools
from itertools import islice, count, groupby
import pandas as pd
import os
import re
from operator import itemgetter
from time import time
from pathlib import Path
import pickle
from google.cloud import storage
from collections import defaultdict
from contextlib import closing

BLOCK_SIZE = 1999998

class MultiFileWriter:
    """ Sequential binary writer to multiple files of up to BLOCK_SIZE each. """
    def __init__(self, base_dir, name, bucket_name):
        self._base_dir = Path(base_dir)
        self._name = name
        self._file_gen = (open(self._base_dir / f'{name}_{i:03}.bin', 'wb') 
                          for i in itertools.count())
        self._f = next(self._file_gen)
        # Connecting to google storage bucket. 
        self.client = storage.Client()
        self.bucket = self.client.bucket(bucket_name)
    
    def write(self, b):
        locs = []
        while len(b) > 0:
            pos = self._f.tell()
            remaining = BLOCK_SIZE - pos
            if remaining == 0:  
                self._f.close()
                self.upload_to_gcp()                
                self._f = next(self._file_gen)
                pos, remaining = 0, BLOCK_SIZE
            self._f.write(b[:remaining])
            locs.append((self._f.name, pos))
            b = b[remaining:]
        return locs

    def close(self):
        self._f.close()
    
    def upload_to_gcp(self):
        '''
            The function saves the posting files into the right bucket in google storage.
        '''
        file_name = self._f.name
        # UPDATED: Writing to v2 folder
        blob = self.bucket.blob(f"postings_gcp_v2/{file_name}")
        blob.upload_from_filename(file_name)

class MultiFileReader:
    """ Sequential binary reader of multiple files of up to BLOCK_SIZE each. """
    def __init__(self):
        self._open_files = {}

    def read(self, locs, n_bytes):
        b = []
        for f_name, offset in locs:
            if f_name not in self._open_files:
                self._open_files[f_name] = open(f_name, 'rb')
            f = self._open_files[f_name]
            f.seek(offset)
            n_read = min(n_bytes, BLOCK_SIZE - offset)
            b.append(f.read(n_read))
            n_bytes -= n_read
        return b''.join(b)
  
    def close(self):
        for f in self._open_files.values():
            f.close()

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False 

TUPLE_SIZE = 6       
TF_MASK = 2 ** 16 - 1 

class InvertedIndex:  
    def __init__(self, docs={}):
        self.df = Counter()
        self.term_total = Counter()
        self._posting_list = defaultdict(list)
        self.posting_locs = defaultdict(list)

        for doc_id, tokens in docs.items():
            self.add_doc(doc_id, tokens)

    def add_doc(self, doc_id, tokens):
        w2cnt = Counter(tokens)
        self.term_total.update(w2cnt)
        for w, cnt in w2cnt.items():
            self.df[w] = self.df.get(w, 0) + 1
            self._posting_list[w].append((doc_id, cnt))

    def write_index(self, base_dir, name):
        self._write_globals(base_dir, name)

    def _write_globals(self, base_dir, name):
        with open(Path(base_dir) / f'{name}.pkl', 'wb') as f:
            pickle.dump(self, f)

    def __getstate__(self):
        state = self.__dict__.copy()
        del state['_posting_list']
        return state

    def posting_lists_iter(self):
        with closing(MultiFileReader()) as reader:
            for w, locs in self.posting_locs.items():
                b = reader.read(locs[0], self.df[w] * TUPLE_SIZE)
                posting_list = []
                for i in range(self.df[w]):
                    doc_id = int.from_bytes(b[i*TUPLE_SIZE:i*TUPLE_SIZE+4], 'big')
                    tf = int.from_bytes(b[i*TUPLE_SIZE+4:(i+1)*TUPLE_SIZE], 'big')
                    posting_list.append((doc_id, tf))
                yield w, posting_list

    @staticmethod
    def read_index(base_dir, name):
        with open(Path(base_dir) / f'{name}.pkl', 'rb') as f:
            return pickle.load(f)

    @staticmethod
    def delete_index(base_dir, name):
        path_globals = Path(base_dir) / f'{name}.pkl'
        path_globals.unlink()
        for p in Path(base_dir).rglob(f'{name}_*.bin'):
            p.unlink()

    @staticmethod
    def write_a_posting_list(b_w_pl, bucket_name):
        posting_locs = defaultdict(list)
        bucket_id, list_w_pl = b_w_pl
        
        with closing(MultiFileWriter(".", bucket_id, bucket_name)) as writer:
            for w, pl in list_w_pl: 
                b = b''.join([(doc_id << 16 | (tf & TF_MASK)).to_bytes(TUPLE_SIZE, 'big')
                              for doc_id, tf in pl])
                locs = writer.write(b)
                posting_locs[w].extend(locs)
            writer.upload_to_gcp() 
            InvertedIndex._upload_posting_locs(bucket_id, posting_locs, bucket_name)
        return bucket_id

    @staticmethod
    def _upload_posting_locs(bucket_id, posting_locs, bucket_name):
        with open(f"{bucket_id}_posting_locs.pickle", "wb") as f:
            pickle.dump(posting_locs, f)
        client = storage.Client()
        bucket = client.bucket(bucket_name)
        # UPDATED: Writing to v2 folder
        blob_posting_locs = bucket.blob(f"postings_gcp_v2/{bucket_id}_posting_locs.pickle")
        blob_posting_locs.upload_from_filename(f"{bucket_id}_posting_locs.pickle")

Overwriting inverted_index_gcp.py


In [9]:
parquetFile = spark.read.parquet(*paths)
doc_text_pairs = parquetFile.select("text", "id").rdd

We will count the number of pages to make sure we are looking at the entire corpus. The number of pages should be more than 6M

In [10]:
# Count number of wiki pages
parquetFile.count()

                                                                                

6348910

Let's import the inverted index module. Note that you need to use the staff-provided version called `inverted_index_gcp.py`, which contains helper functions to writing and reading the posting files similar to the Colab version, but with writing done to a Google Cloud Storage bucket.

In [11]:
# if nothing prints here you forgot to upload the file inverted_index_gcp.py to the home dir
%cd -q /home/dataproc
!ls inverted_index_gcp.py

inverted_index_gcp.py


In [12]:
# adding our python module to the cluster
sc.addFile("/home/dataproc/inverted_index_gcp.py")
sys.path.insert(0,SparkFiles.getRootDirectory())

In [13]:
from inverted_index_gcp import InvertedIndex

**YOUR TASK (10 POINTS)**: Use your implementation of `word_count`, `reduce_word_counts`, `calculate_df`, and `partition_postings_and_write` functions from Colab to build an inverted index for all of English Wikipedia in under 2 hours.

A few notes:
1. The number of corpus stopwords below is a bit bigger than the colab version since we are working on the whole corpus and not just on one file.
2. You need to slightly modify your implementation of  `partition_postings_and_write` because the signature of `InvertedIndex.write_a_posting_list` has changed and now includes an additional argument called `bucket_name` for the target bucket. See the module for more details.
3. You are not allowed to change any of the code not coming from Colab.

In [14]:
english_stopwords = frozenset(stopwords.words('english'))
corpus_stopwords = ["category", "references", "also", "external", "links",
                    "may", "first", "see", "history", "people", "one", "two",
                    "part", "thumb", "including", "second", "following",
                    "many", "however", "would", "became"]

all_stopwords = english_stopwords.union(corpus_stopwords)
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

NUM_BUCKETS = 124
def token2bucket_id(token):
  return int(_hash(token),16) % NUM_BUCKETS

# PLACE YOUR CODE HERE
def word_count(text, id):
  ''' Count the frequency of each word in `text` (tf) that is not included in
  `all_stopwords` and return entries that will go into our posting lists.
  Parameters:
  -----------
    text: str
      Text of one document
    id: int
      Document id
  Returns:
  --------
    List of tuples
      A list of (token, (doc_id, tf)) pairs
      for example: [("Anarchism", (12, 5)), ...]
  '''
  tokens = [token.group() for token in RE_WORD.finditer(text.lower())]
  # YOUR CODE HERE
  filtered_tokens = [t for t in tokens if t not in all_stopwords] # remove stop-words
  freqs = Counter(filtered_tokens) # calculate tf with counter
  return [(token, (id, tf)) for token, tf in freqs.items()]


def reduce_word_counts(unsorted_pl):
  ''' Returns a sorted posting list by wiki_id.
  Parameters:
  -----------
    unsorted_pl: list of tuples
      A list of (wiki_id, tf) tuples
  Returns:
  --------
    list of tuples
      A sorted posting list.
  '''
  # unsorted_pl is a list of (wiki_id, tf)
  return sorted(unsorted_pl, key=lambda x: x[0])

def calculate_df(postings):
  ''' Takes a posting list RDD and calculate the df for each token.
  Parameters:
  -----------
    postings: RDD
      An RDD where each element is a (token, posting_list) pair.
  Returns:
  --------
    RDD
      An RDD where each element is a (token, df) pair.
  '''
  # YOUR CODE HERE
  return postings.mapValues(lambda pl: len(pl))


# --------------------------------------------------------------------------------
# Updated partition function to support multiple indices (title, anchor, body)
# --------------------------------------------------------------------------------

def partition_postings_and_write(postings, index_name):
  ''' A function that partitions the posting lists into buckets, writes out
  all posting lists in a bucket to disk, and returns the posting locations for
  each bucket.

  Parameters:
  -----------
    postings: RDD
      An RDD where each item is a (w, posting_list) pair.
    index_name: str
      A string identifier (e.g., "body", "title", "anchor") to prefix the bucket IDs.
  Returns:
  --------
    RDD
      An RDD where each item is a posting locations dictionary for a bucket.
  '''
  # Map to (bucket_id, (w, pl)), where bucket_id is a STRING like "body_5"
  bucketed = postings.map(lambda x: (f"{index_name}_{token2bucket_id(x[0])}", x))

  # Group by the string bucket_id
  grouped = bucketed.groupByKey().mapValues(list)

  # Write using the existing static method
  return grouped.map(
        lambda pair: InvertedIndex.write_a_posting_list(
            pair,
            bucket_name=bucket_name
        )
    )
# new functions

def extract_anchor_text_to_target(row):
    '''
    Input: Row(id=source_doc_id, anchor_text=[Row(id=target_id, text="link text"), ...])
    Output: List of (target_id, link_text_str)
    '''
    results = []
    # Iterate over the links in the current page
    for link in row.anchor_text:
        # We care about the target_id (link.id) and the text used to link to it
        if link.id is not None:
            results.append((link.id, link.text))
    return results

def run_index_creation(rdd_text_id, index_name, min_tf_cutoff=10):
    t_start = time()
    print(f"--- Starting {index_name} index construction ---")
    
    word_counts = rdd_text_id.flatMap(lambda x: word_count(x[0], x[1]))
    postings = word_counts.groupByKey().mapValues(reduce_word_counts)
    postings_filtered = postings.filter(lambda x: len(x[1]) > min_tf_cutoff)
    w2df = calculate_df(postings_filtered)
    w2df_dict = w2df.collectAsMap()
    
    # Writing posting lists (this uses the updated .py file logic internally)
    _ = partition_postings_and_write(postings_filtered, index_name).collect()
    
    # Collecting posting locs - scan the NEW folder
    # שים לב: עדכנתי כאן לתיקייה החדשה
    target_folder = 'postings_gcp_v2' 
    
    super_posting_locs = defaultdict(list)
    for blob in client.list_blobs(bucket_name, prefix=target_folder):
        if not blob.name.endswith("pickle"):
            continue
        filename = os.path.basename(blob.name)
        if not filename.startswith(index_name + "_"):
            continue     
        with blob.open("rb") as f:
            posting_locs = pickle.load(f)
            for k, v in posting_locs.items():
                super_posting_locs[k].extend(v)
                
    inverted = InvertedIndex()
    inverted.posting_locs = super_posting_locs
    inverted.df = w2df_dict
    inverted.write_index('.', f'index_{index_name}')
    
    # Upload global index to the NEW folder
    index_src = f"index_{index_name}.pkl"
    index_dst = f'gs://{bucket_name}/{target_folder}/{index_src}'
    os.system(f"gsutil cp {index_src} {index_dst}")
    
    print(f"--- Finished {index_name} index. Time: {time() - t_start} seconds ---")
    return inverted



In [None]:
# Load the parquet files once
parquetFile = spark.read.parquet(*paths)

# ==========================================
# 1. Build TITLE Index (The lightest task)
# ==========================================
print("--- Processing TITLE ---")
# Select title and id
title_rdd = parquetFile.select("title", "id").rdd
# Run pipeline
title_index = run_index_creation(title_rdd, "title", min_tf_cutoff=0)


# ==========================================
# 2. Build ANCHOR Index (Medium task, involves shuffle)
# ==========================================
print("--- Processing ANCHOR ---")
# This requires grouping all anchor texts pointing TO a specific doc_id
anchor_data = parquetFile.select("id", "anchor_text").rdd

# Extract (target_id, link_text) pairs
anchor_rdd_grouped = anchor_data.flatMap(extract_anchor_text_to_target) \
                                .groupByKey() \
                                .mapValues(lambda x: " ".join(x))

# Swap to (text, id) format
anchor_rdd_ready = anchor_rdd_grouped.map(lambda x: (x[1], x[0]))

# --- תיקון: מריצים את היצירה כאן, לפני שעוברים ל-BODY ---
anchor_index = run_index_creation(anchor_rdd_ready, "anchor", min_tf_cutoff=5)


# ==========================================
# 3. Build BODY Index (The heaviest task)
# ==========================================
print("--- Processing BODY ---")
# Select text and id
body_rdd = parquetFile.select("text", "id").rdd
# Run pipeline
body_index = run_index_creation(body_rdd, "body", min_tf_cutoff=50)

--- Processing TITLE ---
--- Starting title index construction ---


Copying file://index_title.pkl [Content-Type=application/octet-stream]...       
- [1 files][ 67.6 MiB/ 67.6 MiB]                                                
Operation completed over 1 objects/67.6 MiB.                                     


--- Finished title index. Time: 87.98648929595947 seconds ---
--- Processing ANCHOR ---
--- Starting anchor index construction ---


Copying file://index_anchor.pkl [Content-Type=application/octet-stream]...      
/ [1 files][  7.9 MiB/  7.9 MiB]                                                
Operation completed over 1 objects/7.9 MiB.                                      


--- Finished anchor index. Time: 1368.455739736557 seconds ---
--- Processing BODY ---
--- Starting body index construction ---


Copying file://index_body.pkl [Content-Type=application/octet-stream]...        
/ [1 files][ 18.5 MiB/ 18.5 MiB]                                                
Operation completed over 1 objects/18.5 MiB.                                     


--- Finished body index. Time: 3377.511700630188 seconds ---


Putting it all together

# PageRank

**YOUR TASK (10 POINTS):** Compute PageRank for the entire English Wikipedia. Use your implementation for `generate_graph` function from Colab below.

In [None]:
# Put your `generate_graph` function here
def generate_graph(pages):
    edges = (
        pages.flatMap(
            lambda row: [
                (row.id, link.id)
                for link in row.anchor_text
                if link.id is not None
            ]
        ).distinct()
    )

    vertices = (
        edges.flatMap(lambda e: [(e[0],), (e[1],)])
              .distinct()
    )

    return edges, vertices


In [None]:
t_start = time()
pages_links = spark.read.parquet("gs://ex3-sagikatan-bucket/multistream*").select("id", "anchor_text").rdd
# construct the graph
edges, vertices = generate_graph(pages_links)
# compute PageRank
edgesDF = edges.toDF(['src', 'dst']).repartition(124, 'src')
verticesDF = vertices.toDF(['id']).repartition(124, 'id')
g = GraphFrame(verticesDF, edgesDF)
pr_results = g.pageRank(resetProbability=0.15, maxIter=6)
pr = pr_results.vertices.select("id", "pagerank")
pr = pr.sort(col('pagerank').desc())
pr.repartition(1).write.csv(f'gs://{bucket_name}/pr', compression="gzip")
pr_time = time() - t_start
pr.show()

                                                                                

AnalysisException: path gs://ex3-sagikatan-bucket/pr already exists.

In [18]:
print("x")

x


In [19]:
# 1. בדיקת מה שיש כרגע בזיכרון (מהחישוב האחרון)
print("--- Current Calculation (Memory) ---")
count_memory = pr.count()
print(f"Rows in memory: {count_memory}")
pr.show(5)

# 2. בדיקת מה ששמור ב-Bucket (מהעבר)
print("\n--- Saved File (Bucket) ---")
try:
    pr_disk = spark.read.csv(f"gs://{bucket_name}/pr", header=False, inferSchema=True)
    count_disk = pr_disk.count()
    print(f"Rows on disk: {count_disk}")
    pr_disk.show(5)
    
    # מסקנה
    if count_memory == count_disk:
        print("\n✅ MATCH: The file on disk seems to have the same number of rows as the calculation.")
    else:
        print("\n⚠️ MISMATCH: The file on disk is different. You should overwrite it.")
        
except Exception as e:
    print(f"\n❌ Error reading from disk (File might be corrupted or empty): {e}")

--- Current Calculation (Memory) ---


                                                                                

Rows in memory: 6345849


                                                                                

+-------+-----------------+
|     id|         pagerank|
+-------+-----------------+
|3434750|9913.728782160779|
|  10568|5385.349263642041|
|  32927|5282.081575765275|
|  30680|5128.233709604119|
|5843419|4957.567686263868|
+-------+-----------------+
only showing top 5 rows


--- Saved File (Bucket) ---


[Stage 312:>                                                        (0 + 1) / 1]

Rows on disk: 6345849
+-------+------------------+
|    _c0|               _c1|
+-------+------------------+
|3434750| 9913.728782160777|
|  10568|  5385.34926364204|
|  32927|5282.0815757652745|
|  30680| 5128.233709604119|
|5843419| 4957.567686263868|
+-------+------------------+
only showing top 5 rows


✅ MATCH: The file on disk seems to have the same number of rows as the calculation.


                                                                                

In [None]:
# test that PageRank computaion took less than 1 hour
assert pr_time < 60*120

In [20]:
import pickle

bucket_name = 'ex3-sagikatan-bucket' 

print("--- Converting PageRank CSV to Pickle ---")

try:
    # 1. קריאת קובץ ה-CSV המכווץ מהבאקט
    # Spark יודע לפתוח את ה-gz לבד
    pr_df = spark.read.csv(f"gs://{bucket_name}/pr", header=False, inferSchema=True)
    
    # 2. המרה למילון (Map) של {doc_id: score}
    # זה החלק החשוב - אוסף את כל הנתונים לזיכרון
    pagerank_dict = pr_df.rdd.map(lambda x: (x[0], x[1])).collectAsMap()
    
    print(f"Loaded PageRank for {len(pagerank_dict)} documents.")

    # 3. שמירה לקובץ Pickle מקומי
    with open("pagerank.pkl", "wb") as f:
        pickle.dump(pagerank_dict, f)

    # 4. העלאה לתיקייה המסודרת (postings_gcp_v2)
    blob = client.bucket(bucket_name).blob("postings_gcp_v2/pagerank.pkl")
    blob.upload_from_filename("pagerank.pkl")
    
    print("✅ Success! 'pagerank.pkl' is now saved in 'postings_gcp_v2/'")

except Exception as e:
    print(f"❌ Error: {e}")

--- Converting PageRank CSV to Pickle ---


                                                                                

Loaded PageRank for 6345849 documents.
✅ Success! 'pagerank.pkl' is now saved in 'postings_gcp_v2/'


In [21]:
import pickle

# ודא שהשם הזה נכון
bucket_name = 'ex3-sagikatan-bucket'

print("--- Creating id2title dictionary ---")

try:
    # שלב 1: יצירת המילון מתוך המידע שכבר טעון ב-Spark
    # (אנחנו לוקחים רק את העמודות id ו-title)
    # הערה: הפעולה הזו אוספת כ-6 מיליון זוגות לזיכרון, זה עשוי לקחת דקה-שתיים
    id2title_dict = parquetFile.select("id", "title").rdd.collectAsMap()
    
    print(f"Dictionary created successfully with {len(id2title_dict)} entries.")

    # שלב 2: שמירה לקובץ Pickle מקומי
    with open("id2title.pkl", "wb") as f:
        pickle.dump(id2title_dict, f)

    # שלב 3: העלאה ל-Bucket לתיקייה המסודרת
    blob = client.bucket(bucket_name).blob("postings_gcp_v2/id2title.pkl")
    blob.upload_from_filename("id2title.pkl")

    print("✅ Success! 'id2title.pkl' is now saved in 'postings_gcp_v2/'")

except NameError:
    print("❌ Error: 'parquetFile' is not defined. Please re-run the cell that loads the data (spark.read.parquet).")
except Exception as e:
    print(f"❌ Error: {e}")

--- Creating id2title dictionary ---


                                                                                

Dictionary created successfully with 6348910 entries.
✅ Success! 'id2title.pkl' is now saved in 'postings_gcp_v2/'


In [24]:
from google.cloud import storage

# ודא שזה השם הנכון
bucket_name = 'ex3-sagikatan-bucket' 
prefix = 'postings_gcp_v2'

client = storage.Client()
blobs = client.list_blobs(bucket_name, prefix=prefix)
files = [b.name for b in blobs]

print(f"--- 🏁 Final Inventory Check for: {prefix} ---")

# רשימת הקבצים שחייבים להיות שם כדי שהמנוע יעבוד
checklist = {
    'Body Index (Pickle)': 'index_body.pkl',
    'Body Locs (Pickle)': 'body_posting_locs.pickle',
    'Title Index (Pickle)': 'index_title.pkl',
    'Title Locs (Pickle)': 'title_posting_locs.pickle',
    'Anchor Index (Pickle)': 'index_anchor.pkl',
    'Anchor Locs (Pickle)': 'anchor_posting_locs.pickle',
    'PageRank (Pickle)': 'pagerank.pkl',
    'ID to Title (Pickle)': 'id2title.pkl'
}

# --- התיקון כאן: שימוש ב-len במקום ב-sum ---
bin_files_count = len([f for f in files if f.endswith('.bin')])

all_exist = True

for label, filename in checklist.items():
    # בודק אם שם הקובץ קיים בנתיב המלא
    exists = any(filename in f for f in files)
    status = "✅ Found" if exists else "❌ MISSING"
    print(f"{label:<25} {status}")
    if not exists:
        all_good = False

print("-" * 40)
print(f"Binary Files (.bin):      {bin_files_count} files found (Should be > 0)")

if all_exist and bin_files_count > 0:
    print("\n🎉 MISSION ACCOMPLISHED! Your bucket is fully ready for the Search Engine.")
    print("You can safely delete this Dataproc cluster now.")
else:
    print("\n⚠️ WARNING: Something is missing. Do not delete the cluster yet!")

--- 🏁 Final Inventory Check for: postings_gcp_v2 ---
Body Index (Pickle)       ✅ Found
Body Locs (Pickle)        ❌ MISSING
Title Index (Pickle)      ✅ Found
Title Locs (Pickle)       ❌ MISSING
Anchor Index (Pickle)     ✅ Found
Anchor Locs (Pickle)      ❌ MISSING
PageRank (Pickle)         ✅ Found
ID to Title (Pickle)      ✅ Found
----------------------------------------
Binary Files (.bin):      3876 files found (Should be > 0)

🎉 MISSION ACCOMPLISHED! Your bucket is fully ready for the Search Engine.
You can safely delete this Dataproc cluster now.


In [25]:
# הורדה של האינדקס הראשי (Body) לבדיקה
print("Downloading Body Index for verification...")
!gsutil cp gs://{bucket_name}/postings_gcp_v2/index_body.pkl .

try:
    # טעינת האינדקס
    idx_body = InvertedIndex.read_index('.', 'index_body')
    
    # בדיקה כמה מילים יש להן מיקומים
    locs_count = len(idx_body.posting_locs)
    
    print(f"\n--- VERIFICATION RESULTS ---")
    print(f"Body Index contains locations for: {locs_count} terms")
    
    if locs_count > 1000:
        print("✅ SUCCESS: The posting locations are safely stored inside the Pickle.")
        print("🚀 You are safe to DELETE the cluster.")
    else:
        print("❌ CRITICAL: The index is empty or corrupted. DO NOT DELETE.")
        
except Exception as e:
    print(f"❌ Error reading index: {e}")

Downloading Body Index for verification...
Copying gs://ex3-sagikatan-bucket/postings_gcp_v2/index_body.pkl...
/ [1 files][ 18.5 MiB/ 18.5 MiB]                                                
Operation completed over 1 objects/18.5 MiB.                                     

--- VERIFICATION RESULTS ---
Body Index contains locations for: 495515 terms
✅ SUCCESS: The posting locations are safely stored inside the Pickle.
🚀 You are safe to DELETE the cluster.


In [None]:
# בדיקה סופית - האם הכל מוכן למנוע החיפוש?
prefix = 'postings_gcp_v2'
blobs = client.list_blobs(bucket_name, prefix=prefix)
files = [b.name for b in blobs]

print(f"--- Final Check for {prefix} ---")

required_files = [
    'pagerank.pkl',
    'pageviews-202108-user.pkl',
    'id2title.pkl',
    'index_body.pkl',
    'index_title.pkl',
    'index_anchor.pkl'
]

all_good = True
for f in required_files:
    # בודק אם הקובץ קיים בתוך רשימת הקבצים בבאקט
    exists = any(f in file_path for file_path in files)
    status = "✅ Found" if exists else "❌ MISSING"
    print(f"{f:<30} {status}")
    if not exists:
        all_good = False

if all_good:
    print("\n🎉 CONGRATS! You have all files needed for the Search Engine.")
else:
    print("\n⚠️ Some files are missing. Check the errors above.")

# Reporting

**YOUR TASK (5 points):** execute and complete the following lines to complete
the reporting requirements for assignment #3.

In [None]:
# size of input data
!gsutil du -sh "gs://wikidata_preprocessed/"

In [None]:
# size of index data
index_dst = f'gs://{bucket_name}/postings_gcp/'
!gsutil du -sh "$index_dst"

In [None]:
# How many USD credits did you use in GCP during the course of this assignment?
cost = 5.92
print(f'I used {cost} USD credit during the course of this assignment')

**Bonus (10 points)** if you implement PageRank in pure PySpark, i.e. without using the GraphFrames package, AND manage to complete 10 iterations of your algorithm on the entire English Wikipedia in less than an hour.


In [None]:
#If you have decided to do the bonus task - please copy the code here

bonus_flag = False # Turn flag on (True) if you have implemented this part

t_start = time()

# PLACE YOUR CODE HERE

pr_time_Bonus = time() - t_start


In [None]:
# Note:test that PageRank computaion took less than 1 hour
assert pr_time_Bonus < 60*60 and bonus_flag