# Setup spark

In [1]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] ="--conf spark.driver.memory=2g  pyspark-shell"

from pyspark.sql import SparkSession
from pyspark.sql.functions import flatten
from pyspark.sql.types import (StructType, StructField, StringType, 
                                FloatType, DateType, IntegerType, ArrayType)
spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("BDA assignment") \
        .getOrCreate()

# Imports

In [2]:
from typing import NamedTuple, Final, List
#from lxml import etree
import xml.etree.ElementTree as ET
from itertools import islice, chain, combinations
import argparse
import traceback
import bleach
import html
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.util import pr
import string
import random
import hashlib
import pandas as pd
import numpy as np
import hashlib
import json
import sys

# Constants

In [3]:
SHINGLE_SIZE: Final = 5
SAMPLES: Final = 1000

In [4]:
class comment_tuple(NamedTuple):
    id: int
    #owner_id: int
    post_type: int
    score: int
    text: str

class shingle_set(NamedTuple):
    id: int
    shingles: frozenset[tuple]

class similarity(NamedTuple):
    id_set1: int
    id_set2: int
    similarity: float

# Read and clean XML

In [5]:
def set_schema():
    """
    Define the schema for the DataFrame
    """
    schema_list = []
    schema_list.append(StructField("Id", IntegerType(), True))
    #schema_list.append(StructField("PostTypeId", IntegerType(), True))
    #schema_list.append(StructField("Score", IntegerType(), True))
    schema_list.append(StructField("Body", StringType(), True))
    
    return StructType(schema_list)

def parse_post(rdd):
    results = []
    root = ET.fromstring(rdd[0])

    for elem in root.findall('row'):
        rec = []
        #print("Found row")
        assert elem.text is None, "The row wasn't empty"
        rec.append(int(elem.attrib["Id"]))
        #int(elem.attrib["OwnerUserId"]),
        #rec.append(int(elem.attrib["PostTypeId"])),
        #rec.append(int(elem.attrib["Score"])),
        rec.append(bleach.clean(elem.attrib["Body"], strip=True))
        #rec.append(elem.attrib["Body"])

        #elem.clear()
        #while elem.getprevious() is not None:
        #    del elem.getparent()[0]
        results.append(rec)

    return results

In [6]:
filename = "cstheory_posts.bakxml"
chunksize = 1024

file_rdd = spark.read.text(filename, wholetext=True).rdd
dataset = file_rdd.flatMap(parse_post)

In [7]:
dataset.persist()
schema = set_schema()
df_ds = dataset.toDF(schema)
df_ds.show()

#df_posts = records_rdd.toDF(schema)
#coll = records_rdd.collect()

+---+--------------------+
| Id|                Body|
+---+--------------------+
|  2|I have a dataset ...|
|  3|A particular prog...|
|  4|What is the follo...|
|  5|Can the divide an...|
|  6|Is anyone aware o...|
|  7|In general, the q...|
|  8|If I understand t...|
|  9|<a href="http://w...|
| 10|There was recentl...|
| 11|Functional progra...|
| 12|I took a class on...|
| 13|It's possible tha...|
| 14|Other than going ...|
| 15|[This question ha...|
| 16|See the <a href="...|
| 17|it is often said ...|
| 18|In one word: No.
...|
| 19|In short, I would...|
| 20|It's easy to prov...|
| 21|Any &quot;cutting...|
+---+--------------------+
only showing top 20 rows



# Shingling

In [8]:
class shingler:
    """
    Class that contain a tokenizer and stopwords to make shingling easier.
    """
    def __init__(self):
        self.stopwords = set(stopwords.words('english'))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        return True

    def tokenize(self, text: str) -> List[str]:
        text = text.translate(str.maketrans('', '', string.punctuation))
        # text_nop = text.split()
        text_nop = word_tokenize(text)
        filtered_words = []

        for word in text_nop:
            if word not in self.stopwords:
                filtered_words.append(word.lower())
        
        return filtered_words

    #def create_shingle(self, input_comment: comment_tuple, shingle_size: int) -> frozenset[tuple]:
    #    tokens = self.tokenize(input_comment.text)
    #    comment_length = len(tokens)
    #    shingles =  frozenset(tuple(tokens[i:(i + shingle_size)]) for i in range(comment_length - shingle_size + 1))
    
    def create_shingle(self, post: str, shingle_size: int) -> list[list]:
        tokens = self.tokenize(post)
        comment_length = len(tokens)
        shingle_set = frozenset(tuple(tokens[i:(i + shingle_size - 1)]) for i in range(comment_length - shingle_size))
        shingle_list = list(shingle_set)
        #for elem in shingle_list:
        #    elem.sort()
        shingle_list.sort()
        return list(shingle_set)
        

def shingle_map(row):
    ds_shingler = shingler()
    return (row[0], ds_shingler.create_shingle(row[1], SHINGLE_SIZE)
)

def set_shingle_schema():
    """
    Define the schema for the DataFrame
    """
    schema_list = []
    schema_list.append(StructField("Id", IntegerType(), True))
    #schema_list.append(StructField("PostTypeId", IntegerType(), True))
    #schema_list.append(StructField("Score", IntegerType(), True))
    schema_list.append(StructField("Shingles", ArrayType(ArrayType(StringType()), True)))
    return StructType(schema_list)

In [9]:
schema = set_shingle_schema()
shingle_rdd = dataset.map(shingle_map)

for elem in shingle_rdd.take(1):
    print(elem)
#df_shingle = shingle_rdd.toDF(schema)
#df_shingle.show()

PicklingError: args[0] from __newobj__ args has the wrong class

# MinHashing

In [None]:
# Transform posts to characteristic matrix
# Make feature set matrix
# Minhash
# Make Minhash Matrix
# LSH
import random
SIGNATURE_SIZE: Final = 50
HASH_PRIME: Final = (1 << 31) - 1
MAX_HASH: Final = (1 << 32)
    #sys.maxsize
HASH_RANGE: Final = (1<< 32)
    #sys.maxsize
SEED: Final = 193120

generator = np.random.RandomState(SEED)
salts = [generator.randint(1, MAX_HASH) for _ in range(SIGNATURE_SIZE)]
#permutations = [generator.randint(1, HASH_PRIME) for _ in range(SIGNATURE_SIZE)]

permutations = [[generator.randint(1, MAX_HASH) for _ in range(SIGNATURE_SIZE)],
                [generator.randint(0, MAX_HASH) for _ in range(SIGNATURE_SIZE)]]

print(salts)
print(permutations)

def hash_func(data, p1, p2):
    
    return int.from_bytes(hashlib.md5(int.to_bytes(salt, 8, byteorder="big") + json.dumps(data).encode()).digest()[:8], byteorder="big")
    
def min_hasher(row):
    sig = np.full((SIGNATURE_SIZE), MAX_HASH)
    for shingle in row[1]:
        for i in range(SIGNATURE_SIZE):
            a = permutations[0][i]
            b = permutations[1][i]
            hash_val = (a * hash(shingle) + b) % HASH_PRIME
            #hash_func(tuple(shingle), salts[i]) % HASH_PRIME
            sig[i] = min(hash_val, sig[i])
    return (row[0], sig)

In [None]:
a = permutations[0][1]
b = permutations[1][1]

hash_val = (a * hash(tuple(["text", "word", "shingle", "advanced"]))+ b) % HASH_PRIME
print(hash_val)

In [None]:
hash_rdd = shingle_rdd.map(min_hasher)

for elem in hash_rdd.take(3):
    print(elem)

# LSH

In [None]:
BANDS: Final = 10
ROWS: Final = 5
THRESHOLD: Final = (1/BANDS) ** (1/ROWS)
print(f"Bands: {BANDS}, rows {ROWS}, threshold {THRESHOLD}")

# returns (doc, band, hash)
hash_band_rdd = hash_rdd.flatMap(lambda x: [[(x[0], i % BANDS), hash] for i, hash in enumerate(x[1])])

for elem in hash_band_rdd.take(5):
    print(elem)

In [None]:
bands_rdd = hash_band_rdd.map(lambda x: [(x[0][1], x[1]), x[0][0]]) 

for elem in bands_rdd.take(5):
    print(elem)

In [None]:
vector_bucket_rdd = bands_rdd.map(lambda x: [x[1], x[0][1]]).distinct()

for elem in vector_bucket_rdd.take(1):
    print(elem)

# Exit Spark

In [None]:
#spark.stop()