In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkFiles
from pyspark.sql.functions import udf, col, lit, explode, collect_list
from pyspark.sql.types import *
import hashlib
import sys
import re
import os
import csv
import itertools
import string
import random

In [None]:
!unzip -q assignment_data.zip -d assignment_data

%cd assignment_data

!find . -type f \( -name "_SUCCESS" -o -name "*.crc" -o -not -name "*.csv" \) -delete

print("\nFinal files:")
!find . -type f -name "*.csv"

/home/dg7/assignment_data

Final files:
./assignment_gold_sample.csv/part-00000-092b60af-3a90-468c-9e7b-07a5cfb40802-c000.csv
./assignment_questions.csv/part-00012-d081a743-261e-4834-b802-e980be0f64b1-c000.csv
./assignment_questions.csv/part-00009-d081a743-261e-4834-b802-e980be0f64b1-c000.csv
./assignment_questions.csv/part-00001-d081a743-261e-4834-b802-e980be0f64b1-c000.csv
./assignment_questions.csv/part-00011-d081a743-261e-4834-b802-e980be0f64b1-c000.csv
./assignment_questions.csv/part-00000-d081a743-261e-4834-b802-e980be0f64b1-c000.csv
./assignment_questions.csv/part-00008-d081a743-261e-4834-b802-e980be0f64b1-c000.csv
./assignment_questions.csv/part-00003-d081a743-261e-4834-b802-e980be0f64b1-c000.csv
./assignment_questions.csv/part-00010-d081a743-261e-4834-b802-e980be0f64b1-c000.csv
./assignment_questions.csv/part-00004-d081a743-261e-4834-b802-e980be0f64b1-c000.csv
./assignment_questions.csv/part-00005-d081a743-261e-4834-b802-e980be0f64b1-c000.csv
./assignment_questio

In [None]:
!hdfs dfs -mkdir -p /data/assignment_questions
!hdfs dfs -mkdir -p /data/gold_sample

!hdfs dfs -put ./assignment_questions.csv/*.csv /data/assignment_questions/
!hdfs dfs -put ./assignment_gold_sample.csv/*.csv /data/gold_sample/

In [None]:
print("Questions files at HDFS:")
!hdfs dfs -ls /data/assignment_questions

print("\nGold sample at HDFS:")
!hdfs dfs -ls /data/gold_sample

Questions files at HDFS:
Found 13 items
-rw-r--r--   3 dg7 supergroup    2995447 2025-04-21 22:46 /data/assignment_questions/part-00000-d081a743-261e-4834-b802-e980be0f64b1-c000.csv
-rw-r--r--   3 dg7 supergroup    3021293 2025-04-21 22:46 /data/assignment_questions/part-00001-d081a743-261e-4834-b802-e980be0f64b1-c000.csv
-rw-r--r--   3 dg7 supergroup    3033133 2025-04-21 22:46 /data/assignment_questions/part-00002-d081a743-261e-4834-b802-e980be0f64b1-c000.csv
-rw-r--r--   3 dg7 supergroup    2998842 2025-04-21 22:46 /data/assignment_questions/part-00003-d081a743-261e-4834-b802-e980be0f64b1-c000.csv
-rw-r--r--   3 dg7 supergroup    3013925 2025-04-21 22:46 /data/assignment_questions/part-00004-d081a743-261e-4834-b802-e980be0f64b1-c000.csv
-rw-r--r--   3 dg7 supergroup    3009866 2025-04-21 22:46 /data/assignment_questions/part-00005-d081a743-261e-4834-b802-e980be0f64b1-c000.csv
-rw-r--r--   3 dg7 supergroup    2994744 2025-04-21 22:46 /data/assignment_questions/part-00006-d081a743-261

25/05/13 16:41:22 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources


In [None]:
# Create (or reuse) the session against the standalone cluster master.
spark = (
    SparkSession.builder
    .appName("LSH")
    .master("spark://master:7077")
    .config("spark.driver.memory", "1g")
    .config("spark.executor.memory", "2g")
    .getOrCreate()
)

running_version = spark.version
print(f'The PySpark {running_version} version is running...')

The PySpark 3.5.5 version is running...


The cluster consists of one master node and two worker nodes, as well as a driver. Each of these is an e2-medium virtual machine, equipped with 2 vCPUs and 4 GB of memory.

In [None]:
# Schemas for the two CSV datasets.
# Columns are declared nullable: the validation step in this notebook reports
# null ids/questions, and Spark treats columns read from files as nullable
# regardless of what the schema says, so nullable=False was only misleading.
questions_schema = StructType([
    StructField("id", IntegerType(), nullable=True),
    StructField("question", StringType(), nullable=True)
])

gold_schema = StructType([
    StructField("qid1", IntegerType(), nullable=True),
    StructField("qid2", IntegerType(), nullable=True),
    StructField("is_duplicate", IntegerType(), nullable=True)
])


In [None]:
def load_partitioned_data(base_path, schema=None):
    """Read every ``*.csv`` part file under ``base_path`` into one DataFrame.

    Args:
        base_path: Directory (e.g. an HDFS URI) that holds the CSV part files.
        schema: Schema to apply. When omitted, ``questions_schema`` is used if
            ``base_path`` contains "questions", otherwise ``gold_schema``
            (the original path-based heuristic).

    Returns:
        DataFrame read with ``header=True`` and the chosen schema.
    """
    if schema is None:
        schema = questions_schema if "questions" in base_path else gold_schema
    # Tolerate a trailing slash so the glob never becomes "dir//*.csv".
    return spark.read.schema(schema).csv(f"{base_path.rstrip('/')}/*.csv", header=True)

In [None]:
# HDFS locations of the two uploaded datasets.
questions_path, gold_sample_path = (
    f"hdfs://master:9000/data/{dataset}"
    for dataset in ("assignment_questions", "gold_sample")
)

In [None]:
# Load both datasets, then report their sizes and a few sample rows.
questions_df = load_partitioned_data(questions_path)
gold_df = load_partitioned_data(gold_sample_path)

print("\n=== Statistics ===")
question_total = questions_df.count()
print(f"Number of questions: {question_total}")
gold_total = gold_df.count()
print(f"Number of gold pairs: {gold_total}")

print("\n=== Sample questions ===")
questions_df.show(5, truncate=50)

print("\n=== Sample gold pairs ===")
gold_df.show(5)


=== Statistics ===




Number of questions: 537933




Number of gold pairs: 404288

=== Sample questions ===
+----+--------------------------------------------------+
|  id|                                          question|
+----+--------------------------------------------------+
| 221|Where can I watch gonulcelen with english subti...|
|1080|Is it necessary to unlock bootloader before roo...|
|1729|             How can constipation cause dizziness?|
|2454|How should I get motivated to hit the gym every...|
|3047|   How long will the Pokémon GO "fever" will last?|
+----+--------------------------------------------------+
only showing top 5 rows


=== Sample gold pairs ===
+----+----+------------+
|qid1|qid2|is_duplicate|
+----+----+------------+
|   1|   2|           0|
|   3|   4|           0|
|   5|   6|           0|
|   7|   8|           0|
|   9|  10|           0|
+----+----+------------+
only showing top 5 rows



In [None]:
def validate_data(df, columns):
    """Print the number of null values in each of the given columns.

    All columns are counted in a single aggregation, so the data is scanned
    once instead of once per column.

    Args:
        df: Spark DataFrame to inspect.
        columns: Names of the columns to check.
    """
    from pyspark.sql import functions as F  # local: only this helper needs count/when

    columns = list(columns)
    if not columns:
        return

    # count() skips nulls and when() without otherwise() yields null for
    # non-matching rows, so each aggregate counts exactly the null cells.
    null_counts = df.agg(
        *[F.count(F.when(F.col(name).isNull(), F.lit(1))) for name in columns]
    ).first()

    for name, null_count in zip(columns, null_counts):
        print(f"Missing values in {name}: {null_count}")

# Report null counts for every column of both datasets.
for banner, frame, checked_columns in (
    ("\n=== Questions validation ===", questions_df, ["id", "question"]),
    ("\n=== Gold sample validation ===", gold_df, ["qid1", "qid2", "is_duplicate"]),
):
    print(banner)
    validate_data(frame, checked_columns)


=== Questions validation ===




Missing values in id: 2




Missing values in question: 1

=== Gold sample validation ===
Missing values in qid1: 0
Missing values in qid2: 0
Missing values in is_duplicate: 0


We set `MAX_MOD = sys.maxsize` to ensure that all our hash values fit within a 64-bit signed integer (`LongType` in Spark). This keeps the shingle hashes and MinHash signatures in a consistent, efficient numeric range without risking overflow or loss of precision.

In [None]:
# Characters kept by preprocess(); every other character is removed from the text.
ALLOWED_CHARS = "abcdefghijklmnopqrstuvwxyz0123456789 .,!?\"'"
# Map each allowed character to a 1-based code, so code 0 is never assigned.
CHAR_TO_INT = {c: i+1 for i, c in enumerate(ALLOWED_CHARS)}
BASE = len(ALLOWED_CHARS) + 1  # 43 + 1 = 44 (26 letters + 10 digits + 7 punctuation/space)
MAX_MOD = sys.maxsize  # 9,223,372,036,854,775,807 for Int64 (value on 64-bit Python)

def preprocess(text: str) -> str:
    """Lower-case ``text``, drop every character outside ``ALLOWED_CHARS``, and trim the ends."""
    disallowed = re.compile("[^" + re.escape(ALLOWED_CHARS) + "]")
    lowered = text.lower()
    return disallowed.sub("", lowered).strip()


@udf(ArrayType(LongType()))
def shingle_hash(text: str, k: int = 3) -> list:
    """Return the distinct hashes of all character k-shingles of ``text``.

    The text is normalised with ``preprocess``; every window of ``k`` characters
    is read as a base-``BASE`` number (digits taken from ``CHAR_TO_INT``) and
    reduced modulo ``MAX_MOD``.

    Args:
        text: Raw question text; may be None for a null column value.
        k: Shingle length in characters.

    Returns:
        List of unique shingle hashes (order unspecified). Empty when the text
        is null or has fewer than ``k`` characters after cleaning.
    """
    if text is None:
        # The validation output reports a null question; calling .lower() on
        # None would raise and fail the whole Spark task.
        return []

    text_clean = preprocess(text)
    n = len(text_clean)
    if n < k:
        return []

    # Positional weights, most significant character first.
    powers = [BASE ** i for i in range(k - 1, -1, -1)]

    # A set comprehension de-duplicates while hashing each window.
    hashes = {
        sum(CHAR_TO_INT.get(c, 0) * w for c, w in zip(text_clean[i:i + k], powers)) % MAX_MOD
        for i in range(n - k + 1)
    }
    return list(hashes)

We tested both SHA-256-based hashing and simpler linear hash functions to compare their speed and accuracy in our MinHash/LSH pipeline.

With **k=3, sig_size=50, bands=5**:
- **SHA-256** took **50 min** (TP=9608, FP=7212)  
- **Linear hash** took **14 min** (TP=10115, FP=7418)  

Because the linear hash is roughly 3.5× faster and yields comparable TP/FP counts, we stick with the linear-hash method.

In [None]:
def make_hash_functions(num_hashes: int, mod: int = MAX_MOD):
    """
    Build ``num_hashes`` SHA-256 based hash functions.

    The i-th function maps x to SHA256("minhash_{i}_{x}"), read as an integer
    and reduced modulo ``mod``.
    """
    def _build(index, _mod=mod):
        # One closure per index so each function keeps its own salt.
        def _fn(x):
            digest = hashlib.sha256(f"minhash_{index}_{x}".encode()).digest()
            return int.from_bytes(digest, "big") % _mod
        return _fn

    return [_build(i) for i in range(num_hashes)]

def make_simple_hash_functions(num_hashes: int, mod: int = MAX_MOD, seed=None):
    """
    Build ``num_hashes`` random linear hash functions
    h_i(x) = (x * a_i + b_i) % mod.

    Args:
        num_hashes: Number of functions to generate.
        mod: Positive modulus applied to every hash value.
        seed: Optional seed for reproducible coefficients. When None, the
            module-level ``random`` generator is used (previous behaviour).

    Returns:
        List of callables, each mapping an integer to an integer in [0, mod).
        Falsy inputs (None, 0) map to 0.

    Raises:
        ValueError: If ``mod`` is smaller than 1.
    """
    if mod < 1:
        raise ValueError("mod must be a positive integer")

    rng = random if seed is None else random.Random(seed)

    # Draw a_i from [1, mod-1] and b_i from [0, mod-1]. A multiplier equal to
    # ``mod`` is congruent to 0 and would collapse the function to a constant.
    max_multiplier = max(mod - 1, 1)
    max_offset = mod - 1

    funcs = []
    for _ in range(num_hashes):
        multiplier = rng.randint(1, max_multiplier)
        offset = rng.randint(0, max_offset)

        # Defaults bind this iteration's coefficients to the closure.
        def _fn(shingle, _random_number_1=multiplier, _random_number_2=offset, _mod=mod):
            if not shingle:
                return 0
            return (shingle * _random_number_1 + _random_number_2) % _mod

        funcs.append(_fn)

    return funcs



def make_minhash_signature_udf(hash_funcs):
    """Create a UDF that maps a list of shingle hashes to its MinHash signature.

    Args:
        hash_funcs: Sequence of callables; the signature has one entry per
            function, the minimum of that function over all shingles.

    Returns:
        Spark UDF (array<long>) that yields the signature, or null when the
        shingle list is null or empty.
    """
    @udf(ArrayType(LongType()))
    def minhash_signature(shingles: list) -> list:
        if not shingles:
            # MinHash is undefined for an empty set. An all-zero placeholder would
            # give every such row the same signature, so they would all collide in
            # every band; null keeps them out (lsh_bands returns no bands for it).
            return None
        return [min(fn(s) for s in shingles) for fn in hash_funcs]
    return minhash_signature

def make_lsh_bands_udf(band_funcs):
    """
    Create a UDF that splits a signature into bands and hashes each band.

    band_funcs: function list [h_0, h_1, ..., h_{B-1}], one per band,
    h_i(x) = (x*a_i + b_i) % mod

    The signature is cut into B consecutive chunks of ``len(signature) // B``
    values (trailing values that do not fill a chunk are ignored). Each chunk
    is summed to one number, hashed with its band's function and emitted as
    the string "<band index>:<hash>".

    Raises:
        ValueError: If ``band_funcs`` is empty, or (inside the UDF) if the
            signature is shorter than the number of bands.
    """
    if not band_funcs:
        raise ValueError("band_funcs must contain at least one hash function")
    num_bands = len(band_funcs)

    @udf(ArrayType(StringType()))
    def lsh_bands(signature: list) -> list:
        if not signature:
            return []
        rows_per_band = len(signature) // num_bands
        if rows_per_band == 0:
            # With empty chunks every band key would be a constant, so every
            # row would land in the same bucket. Fail loudly instead.
            raise ValueError(
                f"signature of length {len(signature)} is too short for {num_bands} bands"
            )
        out = []
        for i, band_fn in enumerate(band_funcs):
            chunk = signature[i * rows_per_band:(i + 1) * rows_per_band]
            # Collapse the chunk to one number, then hash it per band.
            out.append(f"{i}:{band_fn(sum(chunk))}")
        return out
    return lsh_bands

Our 2 workers × 2 vCPUs give 4 cores in total. Using 8 partitions gives each core roughly two tasks, which helps balance the load and limits the impact of slow stragglers.

In [None]:
num_partitions = 8

# header=True mirrors load_partitioned_data, which reads these same files with
# a header; this keeps header lines from being parsed as data rows.
questions = (
    spark.read.csv(questions_path, schema=questions_schema, header=True)
    # Rows without an id cannot be paired; rows without text cannot be shingled.
    .filter(col("id").isNotNull() & col("question").isNotNull())
    .repartition(num_partitions)
    # Cache AFTER filtering/repartitioning: this is the DataFrame every experiment
    # reuses, so the shuffle is paid once instead of once per parameter set.
    .cache()
)

gold = (
    spark.read.csv(gold_sample_path, schema=gold_schema, header=True)
    .filter(col("qid1").isNotNull() & col("qid2").isNotNull())
    .cache()
)

# Each entry is (k, signature_size, num_bands).
PARAM_COMBINATIONS = [
    (4, 60, 6),
    (3, 70, 7),
    (5, 50, 10),
    (4, 80, 8),
    (5, 100, 10),
    (9, 60, 6),
    (5, 30, 10),
    (3, 50, 5)
]

25/05/13 17:37:43 WARN CacheManager: Asked to cache already cached data.
25/05/13 17:37:43 WARN CacheManager: Asked to cache already cached data.


In [None]:
def run_experiment(params):
    """Run one MinHash/LSH experiment and score its candidate pairs against ``gold``.

    Pipeline: shingle each question, build a MinHash signature, hash the
    signature into bands, group ids by band key, and emit every id pair that
    shares a bucket. Pairs found in the gold sample are counted as TP
    (is_duplicate == 1) or FP (is_duplicate == 0); pairs absent from the gold
    sample are ignored.

    Args:
        params: Tuple ``(k, signature_size, num_bands)``.

    Returns:
        Dict with keys k, signature_size, num_bands, tp, fp, precision,
        or None if the run raised (the error is printed).
    """
    k, signature_size, num_bands = params
    try:
        if num_bands < 1 or signature_size < num_bands:
            # Otherwise a band would contain no signature rows at all.
            raise ValueError("num_bands must be between 1 and signature_size")

        hash_funcs = make_simple_hash_functions(signature_size, MAX_MOD)
        minhash_udf = make_minhash_signature_udf(hash_funcs)

        band_funcs = make_simple_hash_functions(num_bands, MAX_MOD)
        lsh_bands_udf = make_lsh_bands_udf(band_funcs)

        df = (
            questions
            .withColumn("shingles", shingle_hash(col("question"), lit(k)))
            .withColumn("signature", minhash_udf(col("shingles")))
            .withColumn("band", explode(lsh_bands_udf(col("signature"))))
        )

        buckets = df.select("id", "band").groupBy("band").agg(collect_list("id").alias("ids"))
        # Candidate pairs are always emitted as (smaller_id, larger_id).
        pairs = buckets.rdd.flatMap(lambda row: itertools.combinations(sorted(row.ids), 2)).distinct()

        # Key the gold pairs the same way; otherwise a gold row with
        # qid1 > qid2 could never match a candidate pair.
        gold_pairs = gold.rdd.map(
            lambda g: ((min(g.qid1, g.qid2), max(g.qid1, g.qid2)), g.is_duplicate)
        ).distinct()

        # Evaluate TP/FP in one action. Two separate count() calls would run
        # the whole pipeline twice. An inner join is enough because candidates
        # missing from the gold sample count as neither TP nor FP.
        label_counts = (
            pairs.map(lambda p: (p, None))
            .join(gold_pairs)
            .map(lambda kv: kv[1][1])
            .countByValue()
        )
        tp = label_counts.get(1, 0)
        fp = label_counts.get(0, 0)
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0

        print(f"Results: k={k}, sig_size={signature_size}, bands={num_bands}")

        return {
            "k": k,
            "signature_size": signature_size,
            "num_bands": num_bands,
            "tp": tp,
            "fp": fp,
            "precision": precision
        }
    except Exception as e:
        # Best effort: report the failure and let the caller move on to the next parameter set.
        print(f"Error for params {params}: {str(e)}")
        return None

In [None]:
CSV      = "lsh_results.csv"
HDFS_DIR = "hdfs://master:9000/results/lsh_experiments"

# Initialize local CSV file with header if it doesn't exist yet
fieldnames = ['k','signature_size','num_bands','tp','fp','precision']
if not os.path.exists(CSV):
    with open(CSV, 'w', newline='') as f:
        csv.writer(f).writerow(fieldnames)

# Load already processed parameter combinations from the CSV
seen = set()
with open(CSV, newline='') as f:
    for r in csv.DictReader(f):
        seen.add((int(r['k']), int(r['signature_size']), int(r['num_bands'])))

# Explicit schema shared by the (optional) reset and every appended row, so all
# Parquet files in HDFS_DIR have identical column types.
schema = "k int, signature_size int, num_bands int, tp int, fp int, precision double"

# Reset the HDFS results only on a fresh start. When the CSV already holds
# results, those combinations are skipped below, so overwriting here would
# drop their rows from HDFS for good.
if not seen:
    spark.createDataFrame([], schema).write.mode("overwrite").parquet(HDFS_DIR)

# Run loop
for k, signature_size, num_bands in PARAM_COMBINATIONS:
    if (k, signature_size, num_bands) in seen:
        print(f"Skipping params {(k, signature_size, num_bands)} – already computed")
        continue

    print(f"Starting experiment for params: k={k}, signature_size={signature_size}, num_bands={num_bands}")
    res = run_experiment((k, signature_size, num_bands))
    if not isinstance(res, dict):
        print(f"Unexpected result for {(k, signature_size, num_bands)}, skipping")
        continue

    # Format the result as a row for CSV and Parquet
    row = [res['k'], res['signature_size'], res['num_bands'], res['tp'], res['fp'], res['precision']]

    # write to CSV
    with open(CSV, 'a', newline='') as f:
        csv.writer(f).writerow(row)

    # write to HDFS; precision is cast because a double column rejects the
    # integer 0 that run_experiment returns when there are no matches.
    result_df = spark.createDataFrame([(*row[:5], float(row[5]))], schema)
    result_df.write.mode("append").parquet(HDFS_DIR)

    print(f"Finished: {(k, signature_size, num_bands)} → tp={res['tp']}, fp={res['fp']}, precision={res['precision']}\n")

Skipping params (4, 60, 6) – already computed
Skipping params (3, 70, 7) – already computed
Skipping params (5, 50, 10) – already computed
Skipping params (4, 80, 8) – already computed
Skipping params (5, 100, 10) – already computed
Skipping params (9, 60, 6) – already computed
Skipping params (5, 30, 10) – already computed
Starting experiment for params: k=3, signature_size=50, num_bands=5




Results: k=3, sig_size=50, bands=5
Finished: (3, 50, 5) → tp=10115, fp=7418, precision=0.5769121085952205



In [None]:
import pandas as pd
from IPython.display import display

# Show the results rounded to three decimals, best precision first, without the index column.
df = (
    pd.read_csv("lsh_results.csv")
    .round(3)
    .sort_values(by=["precision"], ascending=False)
)
display(df.style.hide(axis="index"))

k,signature_size,num_bands,tp,fp,precision
3,50,5,10115,7418,0.577
3,70,7,11842,9103,0.565
4,60,6,7844,6718,0.539
4,80,8,8887,7867,0.53
5,50,10,21509,19879,0.52
5,30,10,43093,40344,0.516
5,100,10,7380,7351,0.501
9,60,6,2751,3172,0.464


In [None]:
# Shut down the Spark session now that the experiments and reporting are done.
spark.stop()