# Imports + Setup
# Hardware: Mac M1, 16GB

In [1]:
from itertools import combinations
import bleach
import html
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.util import ngrams
import random
import hashlib
import numpy as np
import hashlib
from lxml import etree
import os

from pyspark.sql import SparkSession
from pyspark import SparkContext

os.environ['PYSPARK_SUBMIT_ARGS'] ="pyspark-shell"
os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'

try: 
    spark
    print("Spark application already started. Terminating existing application and starting new one")
    spark.stop()
except: 
    pass

spark = SparkSession.builder \
    .appName("App") \
    .master("local[*]") \
    .getOrCreate()
sc = SparkContext.getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# Suppress INFO, WARN, and ERROR logs for Spark
log4j = spark._jvm.org.apache.log4j
log4j.LogManager.getRootLogger().setLevel(log4j.Level.ERROR)


24/01/04 22:21:11 WARN Utils: Your hostname, Melih-M1.local resolves to a loopback address: 127.0.0.1; using 192.168.0.102 instead (on interface en0)
24/01/04 22:21:11 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/04 22:21:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Constants

In [2]:
N_SHINGLES= 5
STOPWORDS = stopwords.words('english')
XML_FILE = 'CPosts.xml'
DEBUG_MODE = True       # On True: will print additional information, SIDENOTE: TAKES A BIT OF TIME BECAUSE OF TAKE() Actions
ENABLE_LOWERING = True  # On True: will lower characters in shingles


NUM_HASH_FUNCTIONS = 60
BANDS = 12

# Read data in Memory

In [3]:
'''
    +- 20 secs
'''
posts = {}
def readData():
    
    def refineBody(body):
        unespaced = html.unescape(body)
        bleached = bleach.clean(unespaced, tags=[], attributes={}, strip=True)
        refined_body = bleached.replace('\n', '')
        return refined_body
    
    # Source: https://github.com/Networks-Learning/stackexchange-dump-to-postgres/blob/master/row_processor.py
    def parse():
        context = etree.iterparse(XML_FILE, events=("end",))
        for _, elem in context:
            if elem.tag == "row":
                assert elem.text is None, "The row wasn't empty"
                yield [int(elem.attrib['Id']), refineBody(elem.attrib['Body'])]
            elem.clear()
            while elem.getprevious() is not None:
                del elem.getparent()[0]

    postsLists = parse()
    for rows in postsLists:
        posts[rows[0]] = rows[1]

readData()

# Shingling + Hash Shingles

In [4]:
def createShingles(post):
    words = word_tokenize(post)
    if ENABLE_LOWERING:
        filteredWords = [word.lower() for word in words if word.lower() not in STOPWORDS]
    else:
        filteredWords = [word for word in words if word.lower() not in STOPWORDS]
    shingles = [ ' '.join(gram) for gram in list(ngrams(filteredWords, N_SHINGLES))]
    return shingles

def hashShingle(shingle):
    return int.from_bytes(hashlib.sha1((shingle).encode("utf-8")).digest()[:4], "little")

if DEBUG_MODE:
    print(createShingles('Why do we use a permutation table in the first step of DES algorithm and one at the end of the algorithm?'))

'''
    - Posts got read without spark in memory. We turn the dict into a RDD.
    - shinglesRDD: each post containing its shingles
    - hashedShinglesRDD: each post containing its hashed shingles
    - shingles: each post containing its shingles in memory, used when comparing candidates later on
'''
postsRDD = sc.parallelize(posts.items())
shinglesRDD = postsRDD.map(lambda post: (post[0], createShingles(post[1])))
hashedShinglesRDD = shinglesRDD.map(lambda post: (post[0], [hashShingle(shingle) for shingle in post[1]]))

# Used at latest step to fetch shingles in a fast way: 
shingles = shinglesRDD.collectAsMap()

['use permutation table first step', 'permutation table first step des', 'table first step des algorithm', 'first step des algorithm one', 'step des algorithm one end', 'des algorithm one end algorithm', 'algorithm one end algorithm ?']


                                                                                

# Generate Hash Functions + Apply MinHash

In [6]:
'''
    - generating hash functions
'''
THRESHOLD = (1 / BANDS) ** (1 / (NUM_HASH_FUNCTIONS // BANDS)) 
print(f"Size Signatures MinHash: {NUM_HASH_FUNCTIONS}\nBands: {BANDS}\nRows: {NUM_HASH_FUNCTIONS // BANDS}\nApproximation of similarity Threshold: {THRESHOLD}")

def generateHashFunctions(NUM_HASH_FUNCTIONS):
    hashFunctions = []
    prime_p = (2**61) - 1
    MAX_A_B_BIT = 2**32
    for _ in range(NUM_HASH_FUNCTIONS):
        a = random.randint(1, MAX_A_B_BIT)
        b = random.randint(0, MAX_A_B_BIT)
        hashFunctions.append((a, b, prime_p))

    return hashFunctions


hashFunctionsBroadcast = sc.broadcast(generateHashFunctions(NUM_HASH_FUNCTIONS))

def minHashSignature(shingles, hashFunctions):
    minhashValues = []
    if shingles:
        for function in hashFunctions:
            a, b, primeP = function
            minhashValue = min((a * x + b) % primeP for x in shingles)
            minhashValues.append(minhashValue)
    else:
        return np.full(NUM_HASH_FUNCTIONS, 2**62)

    return np.array(minhashValues)

hashFunctions = hashFunctionsBroadcast.value

def applyMinHash(post):
    postId, shingles = post
    minhashValues = minHashSignature(shingles, hashFunctions)
    return (postId, minhashValues)

minHashedRDD = hashedShinglesRDD.map(applyMinHash)

# Debug only: += 60 secs
if DEBUG_MODE:
    print('\n')
    testIds = [72388, 72389, 3]
    filteredRDD = minHashedRDD.filter(lambda x: x[0] in testIds)
    for i in filteredRDD.take(100):
        print(f'Postid: {i[0]}, Signatures: {[j for j in i[1]]}')
#

Size Signatures MinHash: 60
Bands: 12
Rows: 5
Approximation of similarity Threshold: 0.6083643418932058






Postid: 3, Signatures: [106883110219743081, 51222968121298566, 280316273865347745, 258626472186669527, 13822783890160771, 140220130785703180, 507968295856228581, 249030203742136621, 746471543607799722, 374967510038058935, 565756216751829396, 517107267049045672, 160588040573943852, 803627286854360650, 137182212357037921, 458109813118765759, 387823424463943829, 346839917024963632, 498185061631520344, 162228361200178892, 919933292984025601, 85513365838907616, 259768857941703830, 53680002492637895, 232213359566524942, 66050687022063765, 157995559935111223, 126386241142836839, 508356274484240760, 441954833922753414, 126646982460958606, 692460938682469117, 40315546146542106, 199754018726958829, 88459572773785059, 67622871285028387, 98715301830606789, 422165991894186025, 151781558309907233, 255256865330803851, 220944518230277393, 5813462728687191, 396218613484394889, 339056453565776196, 334237347823044311, 266717509452143255, 414972318003070446, 133503807159707677, 284738041067135626, 3571120

                                                                                

# LSH

In [7]:
'''
    - Divide list of signatures in chunks of rows size and place them into bands.
    - output = (postid, band) , corresponding signatures
'''

dividedSignaturesIntoBandsRDD = minHashedRDD.flatMap(lambda x: [[(x[0], bandIndex), choppedSignatures] for bandIndex, choppedSignatures in enumerate(np.array_split(x[1], BANDS))])

# Debug only: += 80 secs
if DEBUG_MODE:
    testIds = [72388, 72389, 3]
    filteredRDD = dividedSignaturesIntoBandsRDD.filter(lambda x: x[0][0] in testIds)
    for i in filteredRDD.take(100):
        print(f'Postid: {i[0][0]}, Band: {i[0][1]}, Signatures: {[j for j in i[1]]}')
#



Postid: 3, Band: 0, Signatures: [106883110219743081, 51222968121298566, 280316273865347745, 258626472186669527, 13822783890160771]
Postid: 3, Band: 1, Signatures: [140220130785703180, 507968295856228581, 249030203742136621, 746471543607799722, 374967510038058935]
Postid: 3, Band: 2, Signatures: [565756216751829396, 517107267049045672, 160588040573943852, 803627286854360650, 137182212357037921]
Postid: 3, Band: 3, Signatures: [458109813118765759, 387823424463943829, 346839917024963632, 498185061631520344, 162228361200178892]
Postid: 3, Band: 4, Signatures: [919933292984025601, 85513365838907616, 259768857941703830, 53680002492637895, 232213359566524942]
Postid: 3, Band: 5, Signatures: [66050687022063765, 157995559935111223, 126386241142836839, 508356274484240760, 441954833922753414]
Postid: 3, Band: 6, Signatures: [126646982460958606, 692460938682469117, 40315546146542106, 199754018726958829, 88459572773785059]
Postid: 3, Band: 7, Signatures: [67622871285028387, 98715301830606789, 42216

                                                                                

In [8]:
'''
    - Each pair had a list of hashed signatures, now we want 1 hash value for each pair (=assign to a bucket in each band).
    - Various ways to do this: sum, concatenate or tuple form and then take its hash
    - We choose sum: easiest + most used
'''

def sumHashedSignaturesPerPair(pair):
    listHashedSignatures = pair[1]
    band = pair[0][1]
    postId = pair[0][0]
    sum = 0
    for hashedSign in listHashedSignatures:
        sum += hashedSign
    return [(band, int.from_bytes(hashlib.sha1(str(sum).encode("utf-8")).digest()[:4], "little")), postId]

In [9]:
'''
    - Now we got: (band, hashedValue=bucket) , postid
    - We might have postids looking to the same pair (band, bucket) so we group them.
'''
bandBucketRDD = dividedSignaturesIntoBandsRDD.map(sumHashedSignaturesPerPair).groupByKey()

# Debug only: += 50 secs
if DEBUG_MODE:
    for i in bandBucketRDD.take(50):
        print(f'Band: {i[0][0]}, Bucket: {i[0][1]}, Postids: {[id for id in i[1]]}')
#



Band: 1, Bucket: 1749677309, Postids: [3]
Band: 3, Bucket: 1219255775, Postids: [3]
Band: 6, Bucket: 4227979876, Postids: [3]
Band: 0, Bucket: 2244350318, Postids: [6, 1282]
Band: 1, Bucket: 1392084061, Postids: [6, 1282]
Band: 5, Bucket: 57520297, Postids: [6]
Band: 6, Bucket: 774436084, Postids: [6, 1282]
Band: 7, Bucket: 419230003, Postids: [6, 1282]
Band: 1, Bucket: 4011581861, Postids: [7]
Band: 8, Bucket: 593453662, Postids: [7]
Band: 2, Bucket: 1015959528, Postids: [8]
Band: 8, Bucket: 2425340446, Postids: [8]
Band: 4, Bucket: 2475359658, Postids: [9]
Band: 5, Bucket: 983096073, Postids: [12]
Band: 10, Bucket: 957659952, Postids: [12]
Band: 0, Bucket: 3249986238, Postids: [13]
Band: 2, Bucket: 3645408672, Postids: [14]
Band: 8, Bucket: 3934473990, Postids: [15]
Band: 1, Bucket: 1110680517, Postids: [16]
Band: 1, Bucket: 18396037, Postids: [17]
Band: 1, Bucket: 2369753957, Postids: [18]
Band: 8, Bucket: 2339053110, Postids: [18]
Band: 10, Bucket: 301161808, Postids: [18]
Band: 5,

                                                                                

In [10]:
'''
    - We only want buckets containing more than 1 postid, those are possible candidates
'''
bandBucketRDD = bandBucketRDD.filter(lambda x: len(x[1]) > 1)

# Debug only: += 2 secs
if DEBUG_MODE:
    for i in bandBucketRDD.take(50):
        print(f'Band: {i[0][0]}, Bucket: {i[0][1]}, Postids: {[id for id in i[1]]}')
#

Band: 0, Bucket: 2244350318, Postids: [6, 1282]
Band: 1, Bucket: 1392084061, Postids: [6, 1282]
Band: 6, Bucket: 774436084, Postids: [6, 1282]
Band: 7, Bucket: 419230003, Postids: [6, 1282]
Band: 7, Bucket: 4010978891, Postids: [164, 196, 200, 206, 416, 532, 534, 536, 1023, 1046, 1084, 1096, 1133, 1218, 1220, 1239, 1248, 1250, 1305, 1308, 1310, 1354, 1356, 1581, 1583, 1585, 1604, 1658, 1867, 2168, 2169, 2426, 2912, 3040, 3657, 5057, 5260, 5378, 5854, 6372, 8329, 8435, 9445, 9651, 9777, 10153, 11545, 11546, 11549, 12662, 12692, 18872, 20434, 21100, 21109, 24410, 25635, 25838, 25840, 25842, 25863, 25865, 25867, 25869, 25871, 29284, 29761, 30645, 31151, 31218, 33290, 34274, 41770, 41890, 52887, 52888, 52895, 53808, 54401, 55816, 58276, 60507, 63314, 63820, 64072, 64081, 66691, 68167, 70938, 73124, 74512, 74671, 77054, 77281, 81969, 81971, 81973, 81975, 86128, 86130, 86132, 86170, 86172, 86174, 86182, 86991, 87848, 88341, 88999, 91703, 91815, 92236, 95848, 99471, 100120, 100547, 100914, 10

In [11]:
'''
    - Remove duplicate candidates
'''
bucketsWithCandidatesRDD = bandBucketRDD.map(lambda x: tuple(sorted(x[1]))).distinct()
# Debug only: += 2 secs
if DEBUG_MODE:
    for bucketWithCandidates in bucketsWithCandidatesRDD.take(50):
        print(bucketWithCandidates)
#

(674, 2646, 8241)
(2585, 54189)
(10784, 30270)
(16319, 99035)
(20326, 20653, 20962, 38131, 63125)
(26273, 91853)
(30057, 83805)
(31643, 31647)
(33628, 35378)
(33692, 40634, 50633, 77630, 77730)
(50805, 59977)
(55589, 58769)
(57577, 62085)
(62946, 64832)
(75400, 99099, 108088)
(80162, 102928)
(543, 3471, 10324, 17918, 18106, 32427, 39167, 39179, 51602, 51799, 58462, 58538, 64213, 66242, 71006, 84136, 100270, 103570, 106745, 108054)
(5803, 75899, 99743)
(8930, 43720)
(20005, 87633)
(26516, 108378)
(31695, 37381, 42158, 43299, 47153, 51757, 53158, 60306, 71554, 77755, 89727, 93391, 95822, 98125, 103336)
(48309, 87633)
(50824, 95654)
(52492, 66690)
(76033, 99045)
(85594, 85600)
(87360, 102158)
(98899, 99303)
(99003, 99004, 99006)
(2211, 61055)
(18240, 18251, 86256)
(18829, 22436, 31404, 32392, 37381, 43299, 47153, 53158, 60306, 71554, 74570, 77755, 83785, 84381, 93391, 93396, 95822, 97914, 98125, 107347)
(20994, 24640)
(44400, 89958)
(55307, 55767)
(83800, 84238)
(102565, 102569)
(9301, 10

In [12]:
'''
    - Collect all the candidates
    SIDENOTE: DEBUG_MODE
        -True : +- 1 sec
        -False: +- 45 secs
'''
bucketsWithCandidates = bucketsWithCandidatesRDD.collect()

In [13]:
'''
    - Note: set are possible candidates so false positives are in the list.
    - Make combinations of candidates in same bucket and run jaccard similarity on it.
'''
def jaccard_similarity(set1, set2):
    intersection = len(set(set1).intersection(set(set2)))
    union = len(set(set1).union(set(set2)))
    return intersection / union if union != 0 else 0.0


countTotal = 0
countTruePositive = 0

print(f'\nOnly printing candidates which have similarity above threshold, candidates below are filtered out.\n')
for candidatesPerBucket in bucketsWithCandidates:
    for combination in combinations(candidatesPerBucket, 2):
        countTotal+= 1
        postId1, postId2 = combination
        
        shingleSimilarity = jaccard_similarity(shingles.get(postId1), shingles.get(postId2))
        bodySimilarity = jaccard_similarity(posts.get(postId1), posts.get(postId2))
        if shingleSimilarity > THRESHOLD: 
            countTruePositive+= 1
            print("---------------------------")
            print(f"Candidates ids: {postId1}, {postId2}")
            print(f'{postId1} : {posts.get(postId1)}')
            print(f'{postId2} : {posts.get(postId2)}')
            print(f'Shingle Similarity: {shingleSimilarity}')
            print(f'Body    Similarity: {bodySimilarity}')


Only printing candidates which have similarity above threshold, candidates below are filtered out.

---------------------------
Candidates ids: 10784, 30270
10784 : Dual-ec-drgb is short for Dual Elliptic Curve Deterministic Random Bit Generator; a pseudorandom number generator based on the elliptic curve discrete logarithm problem. It was standardized in NIST SP 800-90A, but Dual_EC has a pretty negative image. In 2006, Dual_EC was shown to be a slow and bad random number generator. By 2007, Shumow and Ferguson raised worries about the possibility of a backdoor in the specification. In 2013, The New York Times reported internal NSA memos suggest an RNG generated by the NSA which was used in the Dual_EC_DRBG standard does indeed contain a backdoor for the NSA.
30270 : Dual-ec-drbg is short for Dual Elliptic Curve Deterministic Random Bit Generator; a pseudorandom number generator based on the elliptic curve discrete logarithm problem.It was standardized in NIST SP 800-90A, but Dual_EC

In [15]:
print(f'Amount of true positives: {countTruePositive}')
print(f'Amount of false positives: {countTotal-countTruePositive}')
print(f'On scale: {round(countTruePositive / countTotal,5)*100}% true positives.')

Amount of true positives: 104
Amount of false positives: 14227
On scale: 0.726% true positives.


In [None]:
'''
    Stop sparkSession
'''
spark.stop()