In [None]:
import re
import numpy as np
import tiktoken
from bs4 import BeautifulSoup
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import ArrayType, IntegerType, StringType, StructType, StructField
from io import BytesIO
import boto3

In [None]:
# Initialize Spark, reusing a `spark` session the environment may already have
# defined instead of starting a new one.
spark = globals().get("spark")
if spark is None:
    print("Starting Spark")
    spark = SparkSession.builder.appName("MyJob").getOrCreate()
else:
    print("Spark already defined")
# Set in both branches so later cells can always rely on `sc`.
sc = spark.sparkContext

In [None]:
# Constants
N = 256                      # number of MinHash hash functions (signature length)
MAX_VAL = (1 << 32) - 1      # inclusive upper bound for the random hash coefficients
PRIME = 4294967311           # modulus of the hash family (a * x + b) % PRIME
PRIME_NP = np.uint64(PRIME)  # the same modulus as a uint64 scalar for numpy arithmetic
MIN_TEXT_LENGTH = 300        # minimum document size; compared against the token count downstream

# Seed numpy's global RNG so the MinHash coefficients are identical on every run.
# A_COEFFS must be drawn before B_COEFFS to reproduce the same values.
np.random.seed(42)
A_COEFFS = np.random.randint(0, MAX_VAL + 1, size=N, dtype=np.uint64)
B_COEFFS = np.random.randint(0, MAX_VAL + 1, size=N, dtype=np.uint64)

# Per-model header metadata for the token .bin files written by
# write_partition_to_s3: magic number, format version, and on-disk token dtype.
HEADERS_INFO = {
    "gpt-2": dict(magic=20240520, version=1, token_dtype=np.uint16),
    "llama-3": dict(magic=20240801, version=7, token_dtype=np.uint32),
}

In [None]:
def minhash_numpy(s, a_coeffs=None, b_coeffs=None, prime=None):
    """Calculate the MinHash signature of a set.

    Hash function i is h_i(x) = (a_i * x + b_i) mod prime. The signature stores,
    for each i, the minimum of h_i over all elements of ``s``.

    Products are evaluated exactly in uint64 by splitting each reduced value x
    into 16-bit halves (x = hi * 2**16 + lo), so no intermediate exceeds 2**50.
    A plain ``a * x`` can wrap around 2**64 once x is close to 2**32 (e.g. for
    ``hash()``-derived values), which would silently corrupt the signature.

    Args:
        s: Iterable of elements. Python/numpy integers are used as-is, reduced mod
            ``prime``, so negative or very large ints are valid. Any other element
            is first mapped through ``hash()``.
            NOTE(review): ``hash()`` of str/bytes is salted per process unless
            PYTHONHASHSEED is fixed, so such elements may hash differently on
            different Spark executors. The token ids used in this notebook are ints.
        a_coeffs: Optional uint64 array of multipliers (each < 2**32).
            Defaults to A_COEFFS.
        b_coeffs: Optional uint64 array of offsets (each < 2**32), the same length
            as ``a_coeffs``. Defaults to B_COEFFS.
        prime: Optional modulus (< 2**33). Defaults to PRIME.

    Returns:
        List of Python ints, one per hash function. An empty ``s`` yields the
        uint64 maximum in every position.
    """
    a = np.asarray(A_COEFFS if a_coeffs is None else a_coeffs, dtype=np.uint64)
    b = np.asarray(B_COEFFS if b_coeffs is None else b_coeffs, dtype=np.uint64)
    p_int = PRIME if prime is None else int(prime)
    p = np.uint64(p_int)

    # Reduce every element to an integer in [0, p) using exact Python-int arithmetic.
    values = np.fromiter(
        ((int(v) if isinstance(v, (int, np.integer)) else hash(v)) % p_int for v in s),
        dtype=np.uint64,
    )

    signature = np.full(a.shape[0], np.iinfo(np.uint64).max, dtype=np.uint64)
    a_col = a[:, None]
    b_col = b[:, None]
    shift = np.uint64(16)
    low_mask = np.uint64(0xFFFF)
    radix = np.uint64(1 << 16)

    # Vectorise over elements in bounded chunks so memory stays at one
    # (num_hashes, chunk_size) matrix at a time.
    chunk_size = 4096
    for begin in range(0, values.size, chunk_size):
        chunk = values[begin:begin + chunk_size]
        hi = chunk >> shift
        lo = chunk & low_mask
        # (a*hi mod p) * 2**16 + a*lo + b  is congruent to  a*x + b  (mod p).
        hashes = ((a_col * hi) % p * radix + a_col * lo + b_col) % p
        signature = np.minimum(signature, hashes.min(axis=1))

    return signature.tolist()


def bucket_range(hash_arr, start, end):
    """Build an LSH bucket key from one band of a MinHash signature.

    Args:
        hash_arr: Sequence of signature values, or None.
        start: Index of the first value in the band.
        end: Exclusive end index; clamped to ``len(hash_arr)``.

    Returns:
        The band's values joined with "-", or None when ``hash_arr`` is None or
        ``start`` lies at/after its end.
    """
    if hash_arr is not None and start < len(hash_arr):
        band = hash_arr[start:min(end, len(hash_arr))]
        return "-".join(map(str, band))
    return None


def html_to_text(html):
    """Return the visible text of an HTML document, or None if nothing remains.

    ``<script>`` and ``<style>`` elements are removed before extraction. The text
    is then normalised to one fragment per line. Each line is stripped and split
    on double spaces, each piece is stripped, and empty pieces are dropped.

    Args:
        html: HTML as ``str``, ``bytes`` or ``bytearray``. Bytes are decoded as
            UTF-8, silently dropping undecodable bytes. Falsy input yields None.

    Returns:
        The newline-joined fragments, or None when the result is empty.
    """
    if not html:
        return None
    if isinstance(html, (bytes, bytearray)):
        html = html.decode('utf-8', errors='ignore')

    soup = BeautifulSoup(html, 'lxml')
    for tag in soup(["script", "style"]):
        tag.extract()

    fragments = []
    for raw_line in soup.get_text().splitlines():
        for piece in raw_line.strip().split("  "):
            piece = piece.strip()
            if piece:
                fragments.append(piece)

    joined = '\n'.join(fragments)
    return joined or None


def html_to_text_simple(html_content):
    """Convert HTML to plain text for use inside a Spark UDF.

    Uses BeautifulSoup's ``html.parser`` backend. ``<script>``/``<style>`` blocks
    are removed, matching ``html_to_text``. Text from adjacent elements is joined
    with a single space so words from neighbouring tags are not glued together.

    Args:
        html_content: HTML as ``str``/``bytes``/``bytearray``. Bytes are decoded
            as UTF-8, ignoring undecodable bytes.

    Returns:
        The extracted text, or ``""`` for empty/None input. If parsing raises,
        the decoded input is returned unchanged as a best-effort fallback.
    """
    if not html_content:
        return ""

    if isinstance(html_content, (bytearray, bytes)):
        html_content = html_content.decode('utf-8', errors='ignore')

    try:
        soup = BeautifulSoup(html_content, 'html.parser')
        for tag in soup(["script", "style"]):
            tag.decompose()
        # Without a separator, get_text(strip=True) concatenates the strings of
        # adjacent tags, e.g. "<p>Hello</p><p>World</p>" -> "HelloWorld".
        return soup.get_text(separator=" ", strip=True)
    except Exception:
        # Best effort: keep the raw markup rather than failing the whole Spark task.
        return str(html_content)


def tokenize(text):
    """Tokenize text with the GPT-2 BPE encoding, prefixed by an end-of-text token.

    The ``<|endoftext|>`` id is prepended to every document. Special-token
    strings inside ``text`` are encoded as ordinary text (``encode_ordinary``).

    Args:
        text: The document text. Falsy input yields just ``[eot]``.

    Returns:
        List of Python ints ``[eot, t1, t2, ...]``. The error is printed and
        ``[eot]`` is returned in two cases:
        - the tokenizer raises;
        - any id falls outside [0, 2**16), the range of the uint16 dtype used for
          "gpt-2" token files.
    """
    enc = tiktoken.get_encoding("gpt2")
    # Public accessor for the <|endoftext|> id (avoids the private _special_tokens map).
    eot = enc.eot_token

    if not text:
        return [eot]

    try:
        tokens = [eot]
        tokens.extend(enc.encode_ordinary(text))
    except Exception as e:
        print(f"Error tokenizing: {e}")
        return [eot]

    # Explicit check instead of `assert`, which is stripped under `python -O`
    # and was previously swallowed by the surrounding `except`.
    if not all(0 <= t < 2**16 for t in tokens):
        print("Error tokenizing: token id out of uint16 range")
        return [eot]
    return tokens

In [None]:
def write_partition_to_s3(partition_id, rows, s3_bucket, s3_prefix, model_desc="gpt-2"):
    """Serialize one partition's tokens to a .bin file and upload it to S3.

    File layout:
    - a 256-entry int32 header: [0] magic, [1] version, [2] token count, rest zero;
    - all tokens back to back in the model's token dtype (from HEADERS_INFO).

    Partitions with no tokens are skipped, and no object is written.
    NOTE(review): the (partition_id, rows) signature and empty-iterator return
    suggest use with ``rdd.mapPartitionsWithIndex`` — confirm with the caller.

    Args:
        partition_id: Used in the object name ``tokens_{partition_id:06d}.bin``.
        rows: Iterable of rows exposing a ``tokens`` field (list of ints, or empty/None).
        s3_bucket: Destination bucket name.
        s3_prefix: Key prefix. A trailing "/" is ignored, so keys never contain "//".
        model_desc: Key into HEADERS_INFO (e.g. "gpt-2", "llama-3").

    Returns:
        An empty iterator; nothing is emitted back to Spark.

    Raises:
        KeyError: ``model_desc`` is not in HEADERS_INFO.
        ValueError: the token count does not fit the int32 header slot, or a
            token id does not fit the model's token dtype.
    """
    tokens = []
    for row in rows:
        if row.tokens:
            tokens.extend(row.tokens)

    if not tokens:
        return iter([])

    info = HEADERS_INFO[model_desc]
    token_dtype = np.dtype(info["token_dtype"])

    # The token count is stored in an int32 header slot.
    if len(tokens) > np.iinfo(np.int32).max:
        raise ValueError(
            f"Partition {partition_id}: {len(tokens):,} tokens do not fit the int32 header count"
        )

    header = np.zeros(256, dtype=np.int32)
    header[0] = info["magic"]
    header[1] = info["version"]
    header[2] = len(tokens)

    # Range-check in a wide dtype before narrowing; the narrowing cast could
    # otherwise wrap out-of-range ids into valid-looking ones.
    toks_wide = np.asarray(tokens, dtype=np.int64)
    dtype_max = np.iinfo(token_dtype).max
    if toks_wide.min() < 0 or toks_wide.max() > dtype_max:
        raise ValueError(
            f"Partition {partition_id}: token ids must lie in [0, {dtype_max}] for {model_desc!r}"
        )
    toks_np = toks_wide.astype(token_dtype)

    payload = header.tobytes() + toks_np.tobytes()

    s3_key = f"{s3_prefix.rstrip('/')}/tokens_{partition_id:06d}.bin"
    s3_client = boto3.client('s3')
    s3_client.upload_fileobj(BytesIO(payload), s3_bucket, s3_key)

    print(f"Partition {partition_id}: uploaded {len(tokens):,} tokens ({len(payload):,} bytes) to s3://{s3_bucket}/{s3_key}")

    return iter([])

In [None]:
# Wrap the Python helpers as Spark SQL UDFs, each declaring the Spark type it returns.
bucket_range_udf = F.udf(bucket_range, returnType=StringType())
html_to_text_udf = F.udf(html_to_text_simple, returnType=StringType())
tokenize_udf = F.udf(tokenize, returnType=ArrayType(IntegerType()))

In [None]:
# TODO: select your dataset.
dataset = "1k"  # "10k" "100k" "1m" "5m"

if dataset == "1k":
    input_path = "s3://10605-f25-hw5-subset-1000/*.parquet"
elif dataset == "10k":
    input_path = "s3://10605-f25-hw5-subset-10000/*.parquet"
else:
    input_path = None  # TODO: Put your s3 bucket here!!


# TODO: Fill this in
output_path = None  # TODO: Put your s3 bucket output path here!!


# Must be a plain string: the previous trailing comma made this the tuple
# ("gpt-2",), which is not a valid HEADERS_INFO key.
model_desc = "gpt-2"
# NOTE(review): not referenced by any later cell in this section.
tokens_per_file = 10**8

In [None]:
# Load the raw parquet dataset and report its size (count() runs a full Spark job).
print(f"Reading data from {input_path}")
df = spark.read.parquet(input_path)
initial_row_count = df.count()
print(f"Initial row count: {initial_row_count}")


In [None]:
# Shuffle. The fixed seed keeps the shuffled order reproducible across runs,
# like the fixed numpy seed used for the MinHash coefficients. The result goes
# into a new name so re-running this cell does not reshuffle an already
# shuffled frame.
print("Shuffling data...")
shuffled_df = df.orderBy(F.rand(seed=42))

# Extract and tokenize text
print("Extracting text from HTML...")
schema = StructType([
    StructField("id", StringType(), True),
    StructField("data", ArrayType(IntegerType()), True)
])

# Rows are read positionally: column 0 is used as the document id and column 1
# as the HTML payload. NOTE(review): confirm against the parquet schema.
text_rdd = (shuffled_df.rdd
            .map(lambda row: (row[0], html_to_text(row[1])))
            .filter(lambda pair: pair[1] is not None))

token_rdd = text_rdd.map(lambda pair: (pair[0], tokenize(pair[1])))
df_tokens = token_rdd.toDF(schema=schema)

In [None]:
# Keep only documents with at least MIN_TEXT_LENGTH tokens, then reduce each one
# to its set of distinct token ids, which is what MinHash operates on.
print("Filtering and creating token sets...")
df_split = df_tokens.where(F.size(F.col("data")) >= MIN_TEXT_LENGTH)

test_rdd = df_split.rdd.map(lambda row: (row[0], set(row[1])))

In [None]:
# Calculate MinHash
print("Calculating MinHash signatures...")
minhash_rdd = test_rdd.map(lambda pair: (pair[0], minhash_numpy(pair[1])))
minhash_df = minhash_rdd.toDF(["item_id", "hash_sequence"]).cache()

# Create LSH bands: consecutive slices of the N-value signature, BAND_SIZE values
# each. Deriving the bands from N keeps them in sync if the signature length changes.
print("Creating LSH buckets...")
BAND_SIZE = 32
bucket_ranges = [(start, start + BAND_SIZE) for start in range(0, N, BAND_SIZE)]
ranges_df = spark.createDataFrame(bucket_ranges, ["bucket_start", "bucket_end"])

# Find duplicates using LSH. Documents whose signatures agree on every value of
# a band land in the same bucket. The band start is prefixed to the key so equal
# value strings from *different* bands never share a bucket. F.concat yields null
# when the band key is null, so the null filter still drops out-of-range bands.
print("Finding duplicates...")
results_df = (minhash_df
              .crossJoin(F.broadcast(ranges_df))
              .withColumn("bucket_key",
                          F.concat(F.col("bucket_start").cast("string"),
                                   F.lit(":"),
                                   bucket_range_udf(F.col("hash_sequence"),
                                                    F.col("bucket_start"),
                                                    F.col("bucket_end"))))
              .filter(F.col("bucket_key").isNotNull())
              .groupBy("bucket_key")
              .agg(F.collect_list("item_id").alias("items_in_bucket"))
              .filter(F.size("items_in_bucket") > 1))

In [None]:
# Deduplicate buckets
# Each bucket with more than one item is a group of near-duplicate candidates.
# Keep the smallest item_id in each bucket and flag every other member as a
# duplicate. A document is dropped only if it is not the minimum of some bucket
# it belongs to. So the smallest id of any chain of overlapping buckets always
# survives, and at least one document is kept from each near-duplicate group.
duplicate_ids = (results_df
                 .select(F.array_min("items_in_bucket").alias("keep_id"),
                         F.explode("items_in_bucket").alias("item_id"))
                 .filter(F.col("item_id") != F.col("keep_id"))
                 .select(F.col("item_id").alias("id"))
                 .distinct())

# left_anti keeps only the rows of df_split whose id is NOT a flagged duplicate.
# NOTE(review): df_split already excludes documents shorter than MIN_TEXT_LENGTH
# tokens; use df_tokens instead if short documents should be kept.
df_deduped = df_split.join(duplicate_ids, on="id", how="left_anti")

result_df = df_deduped.select("id", "data")

In [None]:
if dataset in ("1k", "10k"):
    # WARNING!!! DO NOT RUN COALESCE ON YOUR FULL DATASET!!!
    # YOU WILL CRASH YOUR CLUSTER AND NEED TO REPEAT THE ENTIRE PROCESS
    # Small subsets only: coalesce(1) funnels everything into a single output file.
    (result_df.select("id", "data")
        .coalesce(1)
        .write.mode("overwrite")
        .parquet(output_path))
else:
    # Full runs keep Spark's partitioning (many output files) and write only the tokens.
    (result_df.select("data")
        .write.mode("overwrite")
        .parquet(output_path))