In [2]:
# ===== FULL METADATA BUILD (NO index rebuild, NO PageRank recompute) =====

import os
import pickle
import re
import builtins
from google.cloud import storage

# Bucket where you already have: postings_gcp/ and pr/
BUCKET_NAME = "207400714-task3"

# New folder to store metadata (as requested: no version subfolder)
META_PREFIX = "metadata"   # will create: gs://BUCKET/metadata/*.pkl

# Local temp output (Dataproc/GCP notebook VM)
LOCAL_OUT_DIR = "/content/full_metadata_out"
os.makedirs(LOCAL_OUT_DIR, exist_ok=True)

client = storage.Client()
print("Config OK")


Config OK


In [3]:
# Read full corpus parquet (same idea as your PageRank code uses multistream*)
# NOTE: This assumes your corpus lives under gs://BUCKET/multistream*
corpus_df = spark.read.parquet(f"gs://{BUCKET_NAME}/multistream*")

# Make sure these columns exist in your corpus parquet:
# id, title, text
corpus_df.select("id", "title", "text").printSchema()

docs_rdd = corpus_df.select("id", "title", "text").rdd
print("Loaded corpus rows (approx):", corpus_df.count())


                                                                                

root
 |-- id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)





Loaded corpus rows (approx): 6348910


                                                                                

In [4]:
# Tokenization MUST match the indexing pipeline (Assignment 3 / your GCP notebook)

import nltk
from nltk.corpus import stopwords

# If stopwords are not available in the environment:
nltk.download("stopwords")

english_stopwords = frozenset(stopwords.words("english"))
corpus_stopwords = [
    "category", "references", "also", "external", "links",
    "may", "first", "see", "history", "people", "one", "two",
    "part", "thumb", "including", "second", "following",
    "many", "however", "would", "became"
]

all_stopwords = english_stopwords.union(corpus_stopwords)

# EXACT regex used in your GCP notebook:
RE_WORD = re.compile(r"""[\#\@\w](['\-]?\w){2,24}""", re.UNICODE)

def clean_tokens_for_dl(text: str):
    """Return tokens exactly like indexing: lowercase + RE_WORD + stopword removal."""
    if not text:
        return []
    tokens = []
    for m in RE_WORD.finditer(text.lower()):
        tok = m.group()
        if tok in all_stopwords:
            continue
        tokens.append(tok)
    return tokens

print("Tokenizer ready")


Tokenizer ready


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [5]:
def build_meta(rows_iter):
    """
    For each document:
      - doc_id = int(id)
      - title = string (fallback to "")
      - dl = number of tokens in text after SAME tokenization as indexing
    """
    out = []
    for r in rows_iter:
        doc_id = int(r["id"])
        title  = r["title"] if r["title"] is not None else ""
        text   = r["text"] if r["text"] is not None else ""
        dl = len(clean_tokens_for_dl(text))
        out.append((doc_id, title, dl))
    return iter(out)

meta = docs_rdd.mapPartitions(build_meta).collect()

titles = {doc_id: title for doc_id, title, dl in meta}
doc_len = {doc_id: dl    for doc_id, title, dl in meta}

with open(os.path.join(LOCAL_OUT_DIR, "titles.pkl"), "wb") as f:
    pickle.dump(titles, f, protocol=pickle.HIGHEST_PROTOCOL)

with open(os.path.join(LOCAL_OUT_DIR, "doc_len.pkl"), "wb") as f:
    pickle.dump(doc_len, f, protocol=pickle.HIGHEST_PROTOCOL)

print("Saved titles.pkl and doc_len.pkl")
print("N =", len(doc_len))


                                                                                

Saved titles.pkl and doc_len.pkl
N = 6348910


In [6]:
N = len(doc_len)
total_tokens = builtins.sum(doc_len.values())
avgdl = (total_tokens / N) if N else 0.0

corpus_stats = {
    "N": int(N),
    "total_tokens": int(total_tokens),
    "avgdl": float(avgdl),
    "note": "computed from doc_len.pkl using the SAME tokenization as indexing"
}

with open(os.path.join(LOCAL_OUT_DIR, "corpus_stats.pkl"), "wb") as f:
    pickle.dump(corpus_stats, f, protocol=pickle.HIGHEST_PROTOCOL)

print("Saved corpus_stats.pkl:", corpus_stats)


Saved corpus_stats.pkl: {'N': 6348910, 'total_tokens': 2028354650, 'avgdl': 319.4807691398996, 'note': 'computed from doc_len.pkl using the SAME tokenization as indexing'}


In [7]:
from pyspark.sql import functions as F

# Your bucket already has pr/ (from the full PageRank you computed)
pr_raw = spark.read.csv(f"gs://{BUCKET_NAME}/pr", header=True, inferSchema=True)

# Robust handling: if header is wrong/missing, fallback to header=False
cols = set(pr_raw.columns)
if not {"id", "pagerank"}.issubset(cols):
    print("PageRank CSV columns not as expected. Falling back to header=False...")
    pr_raw = spark.read.csv(f"gs://{BUCKET_NAME}/pr", header=False, inferSchema=True)
    # assume: _c0=id, _c1=pagerank
    pr_df = pr_raw.select(
        F.col("_c0").cast("int").alias("id"),
        F.col("_c1").cast("double").alias("pagerank")
    )
else:
    pr_df = pr_raw.select(
        F.col("id").cast("int").alias("id"),
        F.col("pagerank").cast("double").alias("pagerank")
    )

pr_pairs = pr_df.rdd.map(lambda r: (int(r["id"]), float(r["pagerank"]))).collect()
pagerank = dict(pr_pairs)

with open(os.path.join(LOCAL_OUT_DIR, "pagerank.pkl"), "wb") as f:
    pickle.dump(pagerank, f, protocol=pickle.HIGHEST_PROTOCOL)

print("Saved pagerank.pkl entries:", len(pagerank))


                                                                                

PageRank CSV columns not as expected. Falling back to header=False...


                                                                                

Saved pagerank.pkl entries: 6345849


In [8]:
# Upload to the bucket into: gs://BUCKET/metadata/
!gsutil -m cp "{LOCAL_OUT_DIR}/titles.pkl"       "gs://{BUCKET_NAME}/{META_PREFIX}/titles.pkl"
!gsutil -m cp "{LOCAL_OUT_DIR}/doc_len.pkl"      "gs://{BUCKET_NAME}/{META_PREFIX}/doc_len.pkl"
!gsutil -m cp "{LOCAL_OUT_DIR}/corpus_stats.pkl" "gs://{BUCKET_NAME}/{META_PREFIX}/corpus_stats.pkl"
!gsutil -m cp "{LOCAL_OUT_DIR}/pagerank.pkl"     "gs://{BUCKET_NAME}/{META_PREFIX}/pagerank.pkl"

print("✅ Uploaded metadata to:", f"gs://{BUCKET_NAME}/{META_PREFIX}/")


Copying file:///content/full_metadata_out/titles.pkl [Content-Type=application/octet-stream]...
==> NOTE: You are uploading one or more large file(s), which would run          
significantly faster if you enable parallel composite uploads. This
feature can be enabled by editing the
"parallel_composite_upload_threshold" value in your .boto
configuration file. However, note that if you do this large files will
be uploaded as `composite objects
<https://cloud.google.com/storage/docs/composite-objects>`_,which
means that any user who downloads such objects will need to have a
compiled crcmod installed (see "gsutil help crcmod"). This is because
without a compiled crcmod, computing checksums on composite objects is
so slow that gsutil disables downloads of composite objects.

| [1/1 files][168.9 MiB/168.9 MiB] 100% Done                                    
Operation completed over 1 objects/168.9 MiB.                                    
Copying file:///content/full_metadata_out/doc_len.pkl [

In [10]:
!gsutil ls -lh gs://207400714-task3/metadata/


     137 B  2026-01-06T22:12:49Z  gs://207400714-task3/metadata/corpus_stats.pkl
  44.3 MiB  2026-01-06T22:12:47Z  gs://207400714-task3/metadata/doc_len.pkl
 84.69 MiB  2026-01-06T22:12:52Z  gs://207400714-task3/metadata/pagerank.pkl
168.88 MiB  2026-01-06T22:12:44Z  gs://207400714-task3/metadata/titles.pkl
TOTAL: 4 objects, 312329495 bytes (297.86 MiB)
