In [1]:
from typing import List, Tuple, Dict

# A prime number chosen as a base for exponentiation, larger than 171,476 (estimated vocab size of English words)
RABIN_KARP_PRIME = 172001

# A prime number chosen as a base for hashing characters, larger than 256 (UTF-8 character space)
CHAR_PRIME = 401

hash_token_cache: Dict[str, int] = {}


class RabinKarpHash:
    """
    Rabin-Karp rolling hash for hashing sequences of integers.

    Initial hash:
        For a List[int] i_0...i_n with a window w where w <= n,
        the hash is calculated as:
        i_0 * RABIN_KARP_PRIME^(w-1) + i_1 * RABIN_KARP_PRIME^(w-2) + ... + i_{w-1}

    high_coefficient:
        RABIN_KARP_PRIME^(w-1)
    """

    def __init__(self, int_sequence: List[int], window_size: int, mod: int = 8554560727166512717):
        self.window_start = 0
        self.window_end = window_size
        self.mod = mod
        self.high_coefficient = pow(RABIN_KARP_PRIME, window_size - 1, mod)
        self.current_hash = 0
        self.int_sequence = int_sequence

        for i in range(window_size):
            self.current_hash = (self.current_hash * RABIN_KARP_PRIME + int_sequence[i]) % mod
            print(self.current_hash)
        if self.current_hash < 0:
            self.current_hash += mod

    def mult_mod(self, a: int, b: int, k: int) -> int:
        return (a * b) % k

    def update(self) -> None:
        """
        Update the rolling hash value when the window slides by one element.

        - To efficiently update the hash:
        1. Subtract the contribution of the element moving out of the window (start_piece).
        2. Multiply the result by RABIN_KARP_PRIME.
        3. Add the contribution of the new element entering the window.

        Then update the window indices
        """
        start_piece = self.mult_mod(self.int_sequence[self.window_start], self.high_coefficient, self.mod)
        self.current_hash = self.mult_mod(self.current_hash - start_piece, RABIN_KARP_PRIME, self.mod)
        self.current_hash = (self.current_hash + self.int_sequence[self.window_end]) % self.mod
        if self.current_hash < 0:
            self.current_hash += self.mod
        self.window_start += 1
        self.window_end += 1


def compute_hashes(int_sequence: List[int], window_size: int) -> List[Tuple[int, int, int]]:
    """
    Computes Rabin-Karp rolling hashes for a list of integers.

    Args:
        int_sequence (List[int]): List of integers.
        window_size (int): Size of the rolling window.

    Returns:
        List[Tuple[int, int, int]]: List of tuples containing hash value, window start, and window end.
    """
    if len(int_sequence) < window_size:
        return []
    rk_hash = RabinKarpHash(int_sequence, window_size)
    hashes = [(rk_hash.current_hash, rk_hash.window_start, rk_hash.window_end)]
    for i in range(len(int_sequence) - window_size):
        rk_hash.update()
        hash_info = (rk_hash.current_hash, rk_hash.window_start, rk_hash.window_end)
        hashes.append(hash_info)

    return hashes


# mod chosen as large prime of similar size to rabin karp, but distinct
def hash_token(token: str, mod: int = 8554560727166512181):
    """
    Hashes a string token into an integer.

    Args:
        token (str): Input token.
        mod (int): Modulus for hashing.

    Returns:
        int: Hash value.
    """

    # assume mod is constant for space/time efficiency
    if token in hash_token_cache:
        return hash_token_cache[token]

    hash_value = 0
    for ch in token:
        hash_value = (hash_value * CHAR_PRIME + ord(ch)) % mod

    hash_token_cache[token] = hash_value
    return hash_value


def get_ngram_hashes(tokens: List[str], n: int):
    """
    Computes Rabin-Karp rolling hashes for a list of string tokens.

    Args:
        tokens (List[str]): List of string tokens.
        n (int): Size of the rolling window.

    Returns:
        List[Tuple[int, int, int]]: List of tuples containing hash value, window start, and window end.
    """
    hashed_tokens = [hash_token(token) for token in tokens]
    return compute_hashes(hashed_tokens, n)


In [2]:
false_positive = list((
    'to', 'our', 'community', 'because', 'of', 'the', 'scale', '”', 'finding', 'returns', 'in', 'property', 'investments'
))

len_value = 1

hash_value = 3812146999411452660

hash_tuple = list((
    '87', '13', 'insofar', 'as', 'the', 'exporter', 'concerned', 'can', 'supply', 'proof', 'to', 'the', 'satisfaction'
))

In [3]:
get_ngram_hashes(false_positive, 13)

46627
8037786569
351858060723599987
5384235220600376257
2761537620149420601
3802388127059388614
1283531987620620650
1236716748178741252
7790112233239456582
6722575650850594459
3979273764239678652
4783122840134100499
3812146999411452660


[(3812146999411452660, 0, 13)]

In [4]:
get_ngram_hashes(hash_tuple, 13)

22511
3871934211
438381162509136565
2100083488355267939
133375988459995335
3831051992726420191
4424638033667372081
2780457907210419916
7378802008997725833
6694870097362757463
3486693334536117437
5815016265546463590
3812146999411452660


[(3812146999411452660, 0, 13)]

In [5]:
[hash_token_cache[token] for token in false_positive]

[46627,
 17895942,
 350475553395945418,
 408516676589972328,
 44613,
 18694721,
 2979949864120,
 8221,
 425189736904079535,
 475042321965897587,
 42215,
 7565619716682252300,
 2560213424686030768]

In [6]:
[hash_token_cache[token] for token in hash_tuple]

[22511,
 19700,
 437715185952910354,
 39012,
 18694721,
 6359531145655911850,
 354529278809664966,
 15958306,
 1195426288685085,
 2903348449750,
 46627,
 18694721,
 5885116323627867993]