# Text Deduplicating using Apache Spark

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
spark

23/12/18 01:05:41 WARN Utils: Your hostname, andromeda resolves to a loopback address: 127.0.1.1; using 192.168.240.99 instead (on interface enp8s0f1)
23/12/18 01:05:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/12/18 01:05:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Loading data

In [2]:
from datasets import load_dataset

In [3]:
rows = 1000
cc100 = load_dataset(
    'cc100',
    lang='uk',
    split='train',
    streaming=True,
).take(rows)

In [4]:
cc100_rdd = sc.parallelize(cc100, numSlices=5)

In [5]:
cc100_rdd.take(20)

                                                                                

[{'id': '0', 'text': "5-Б клас гімназії №3 «СУЗІР'Я»\n"},
 {'id': '1', 'text': 'ПОСИЛАННЯ (наукові сайти)\n'},
 {'id': '2', 'text': 'ПОСИЛАННЯ (виховна робота)\n'},
 {'id': '3',
  'text': 'ПОСИЛАННЯ (Інтернет-ресурси України і Запорізького регіону)\n'},
 {'id': '4',
  'text': 'ПОСИЛАННЯ (контактна інф. органів виконавчої влади України)\n'},
 {'id': '5',
  'text': 'ПОСИЛАННЯ (телефонні "гарячі лінії" міністерств України)\n'},
 {'id': '6',
  'text': 'ПОСИЛАННЯ (громадянам України /довідкова інформація/)\n'},
 {'id': '7', 'text': 'Про організацію навчально-виховного процесу\n'},
 {'id': '8', 'text': 'Про децентралізацію у загальній середній освіті\n'},
 {'id': '9',
  'text': 'Учням 1-4 класів спрощено систему навчання (наказ МОН)\n'},
 {'id': '10', 'text': 'Про розвиток української мови в Запорізькій області\n'},
 {'id': '11', 'text': 'Навчальні програми для 1-4 класів\n'},
 {'id': '12',
  'text': 'Практичний психолог і соціальний педагог навчального закладу\n'},
 {'id': '13', 'text': 'Де

## Minhash

In [44]:
from typing import Callable
import hashlib
import struct
import re

import numpy as np

In [20]:
MAX_HASH = np.uint64((1 << 32) - 1)
MERSENNE_PRIME = np.uint64((1 << 61) - 1)

In [8]:
def tokenize(text: str) -> list[str]:
    return [token for token in re.split(r'\W+', text) if token]

In [9]:
def ngrams(tokens: list[str], n: int) -> list[tuple[str]]:
    if n > len(tokens):
        return []
    
    _ngrams = []
    for i in range(len(tokens) - n + 1):
        _ngrams.append(tuple(tokens[i:i+n]))
    return _ngrams

In [10]:
ngrams(tokenize("!hello world, we are going to test ngrams splitting!"), n=2)

[('hello', 'world'),
 ('world', 'we'),
 ('we', 'are'),
 ('are', 'going'),
 ('going', 'to'),
 ('to', 'test'),
 ('test', 'ngrams'),
 ('ngrams', 'splitting')]

In [55]:
def init_permutations(num_perm: int, seed: int = 0) -> tuple[np.ndarray, np.ndarray]:
    gen = np.random.RandomState(seed)
    return (
        gen.randint(1, MERSENNE_PRIME, size=num_perm, dtype=np.uint64),
        gen.randint(0, MERSENNE_PRIME, size=num_perm, dtype=np.uint64)
    )

In [60]:
def hash_func(payload: bytes):
    return struct.unpack('<I', hashlib.sha1(payload).digest()[:4])[0]

In [83]:
def embed(
    entry: dict,
    ngram_size: int,
    hash_func: Callable,
    permutations: np.ndarray
) -> np.ndarray:
    
    text = entry['text']
    
    # shape each: (num_perm,)
    a, b = permutations
    num_perm = a.shape[0]
    tokens = tokenize(text)
    
    # shape: (ngrams,)
    hashvalues = np.array([
        hash_func(' '.join(ngram_tokens).encode('utf-8'))
        for ngram_tokens in ngrams(tokens, ngram_size)
    ], dtype=np.uint64)
    
    # output shape: (ngrams, num_perm)
    signature_per_ngram = np.bitwise_and(
        (np.outer(hashvalues, a) + b) % MERSENNE_PRIME,
        MAX_HASH
    )
    
    masks = np.full(shape=num_perm, dtype=np.uint64, fill_value=MAX_HASH)
    
    signature = np.vstack((signature_per_ngram, masks)).min(axis=0)
    return {"id": entry["id"], "signature": signature}

In [84]:
permutations = init_permutations(num_perm=32)

In [87]:
a1 = embed(
    {
        "id": "a1",
        "text": "hello world, we are going to test ngrams splitting!"
    },
    2,
    hash_func,
    permutations
)
a1

{'id': 'a1',
 'signature': array([ 233880495, 1790894235,  202142312,  655645340,  948344410,
         808589115,  186933008,  311201738,  115794231,  662775714,
          86100601,  540493554,   95405598,  267980497, 1069398562,
          82106972, 1655492649,   52210820, 1570493133,  703051650,
         144122945, 1479076262,   53265824,  270343758,   24204102,
          33557877,  645627348,  287501491,  232856318,  107996767,
        1588273853,  153499437], dtype=uint64)}

In [88]:
a2 = embed(
    {
        "id": "a2",
        "text": "hello world, what about we test ngrams splitting!"
    },
    2,
    hash_func,
    permutations
)
a2

{'id': 'a2',
 'signature': array([ 233880495,  530117518,  812736762,  655645340, 1415361717,
         336754984,  186933008,  161821783,  750602565,  747642867,
          64588450,  388555747,   95405598,   12570438,  479281602,
         446652527, 1854945348,   52210820,  518797166,  703051650,
        1063784949, 1123812759,   53265824,  270343758,   24204102,
          33557877,  987344689,  304637770,  232856318, 1423976841,
        1022681342,  314905519], dtype=uint64)}

In [90]:
np.count_nonzero(a1['signature'] == a2['signature']) / 32

0.34375

---

In [91]:
from functools import partial

In [92]:
cc100_embedded = cc100_rdd.map(
    partial(
        embed,
        ngram_size=2,
        hash_func=hash_func,
        permutations=permutations
    )
)

In [94]:
cc100_embedded.take(5)

[{'id': '0',
  'signature': array([ 443099278, 1902230491,  400808915,  632125985,  712493998,
          211675878,  178638339,  248196949,  725912459,  761080947,
          458425793,  523973521,  830357500,  284530911, 1119222011,
           85359596, 1568075707,  148583629,   50368138,  348143240,
         1563055107,  575197625,  359488729,  319597852, 1091967692,
          488626365, 1683174590, 1129833888,   57517324,  509923556,
         1487974346,  460568138], dtype=uint64)},
 {'id': '1',
  'signature': array([  76171711,   25084861,  367387321,  913353708, 1486430108,
          177687564,  583314983,  169761035,  927951015,  398857155,
          495485259,  740783294, 2448683780, 1861502244, 1352887525,
          398463736, 3827733729, 1965012471, 2667796076, 2521359723,
         2891046593,  897208591,  253404536, 1222524544, 2771146346,
         1575795916,  688568419,  282769784, 1110845084, 2796036938,
         2121931771, 1040502586], dtype=uint64)},
 {'id': '2',
  'sign

## Local Sensitivity Hash

In [96]:
MAX_HASH.byteswap()

18446744069414584320

In [109]:
def lsh(entry: dict, b: int = 16):
    rows = entry['signature'].shape[0]
    r = rows // b
    bands = [
        entry['signature'][start:start+r]
        for start in range(0, rows - r + 1, r)
    ]
    return {
        'id': entry['id'],
        'bands': bands,
    }

In [110]:
cc100_lsh = cc100_embedded.map(lsh)

In [111]:
cc100_lsh.take(5)

[{'id': '0',
  'bands': [array([ 443099278, 1902230491], dtype=uint64),
   array([400808915, 632125985], dtype=uint64),
   array([712493998, 211675878], dtype=uint64),
   array([178638339, 248196949], dtype=uint64),
   array([725912459, 761080947], dtype=uint64),
   array([458425793, 523973521], dtype=uint64),
   array([830357500, 284530911], dtype=uint64),
   array([1119222011,   85359596], dtype=uint64),
   array([1568075707,  148583629], dtype=uint64),
   array([ 50368138, 348143240], dtype=uint64),
   array([1563055107,  575197625], dtype=uint64),
   array([359488729, 319597852], dtype=uint64),
   array([1091967692,  488626365], dtype=uint64),
   array([1683174590, 1129833888], dtype=uint64),
   array([ 57517324, 509923556], dtype=uint64),
   array([1487974346,  460568138], dtype=uint64)]},
 {'id': '1',
  'bands': [array([76171711, 25084861], dtype=uint64),
   array([367387321, 913353708], dtype=uint64),
   array([1486430108,  177687564], dtype=uint64),
   array([583314983, 1697610

## Bucketing

## Duplicates

In [20]:
???

Object `?` not found.
