# ssdeep hash: scalable pairwise comparisons in pure Python


This notebook contains the following:
* A class called ```DocHash``` which pairs an input text string/document with an index, and an ssdeep hash value;
* A random document generator for testing the performance of ```DocHash``` comparison algorithms, below.

Three algorithms are implemented to test the scalability of pairwise ```DocHash``` comparisons:
* A naive pairwise ```DocHash``` instance comparator, which compares all n(n-1)/2 pairs in a list of n instances to establish a baseline: both the correct set of matching pairs and the time taken to compute all pairs.
* A slightly improved naive pairwise test, which tests the optimisation that ssdeep hash pairs should only be compared if the chunksizes are equal, double, or half (see https://www.intezer.com/blog/malware-analysis/intezer-community-tip-ssdeep-comparisons-with-elasticsearch/)
* An efficient ```DocHash``` ssdeep hash matching algorithm which constructs a set of all matching/similar pairs (documentation below).

In [311]:
import ssdeep

class DocHash:
    """An indexed document paired with its ssdeep hash.

    On construction the document is hashed with ``ssdeep.hash`` and the
    resulting ``chunksize:chunk:double_chunk`` signature is split into:

    * ``chunksize`` - the integer block size of the signature;
    * ``chunk_ngrams`` - the set of 7-character n-grams of the chunk part;
    * ``double_chunk_ngrams`` - the same for the double-chunk part.

    The n-gram sets are intended as a cheap pre-filter for finding
    candidate pairs before running the expensive ``ssdeep.compare``.
    Because ssdeep's comparison collapses runs of more than three
    identical characters before it looks for a common 7-character
    substring, the chunks are normalised the same way before the n-grams
    are extracted; otherwise two hashes that ssdeep considers similar
    could share no raw n-gram and be filtered out by mistake.
    """

    def __init__(self, index, doc):
        """Hash ``doc`` and pre-compute its chunk size and n-gram sets.

        Args:
            index: identifier of this document (e.g. its list position).
            doc: the text/bytes to hash with ssdeep.
        """
        self.index = index
        self.doc = doc
        self.hash = ssdeep.hash(doc)
        # signature layout is "chunksize:chunk:double_chunk"; split it once
        chunksize, chunk, double_chunk = self.hash.split(':', 2)
        self.chunksize = int(chunksize)
        self.chunk_ngrams = set(
            self._ngrams(self._eliminate_sequences(chunk), ngram_len=7))
        self.double_chunk_ngrams = set(
            self._ngrams(self._eliminate_sequences(double_chunk), ngram_len=7))

    def get_index(self):
        """Return the index this document was created with."""
        return self.index

    def get_doc(self):
        """Return the original document."""
        return self.doc

    def get_hash(self):
        """Return the full (un-normalised) ssdeep signature string."""
        return self.hash

    def get_chunksize(self):
        """Return the integer chunk size of the signature."""
        return self.chunksize

    @staticmethod
    def _eliminate_sequences(s, max_run=3):
        """Return ``s`` with every run of identical characters cut to ``max_run``.

        Mirrors the normalisation ssdeep applies to signatures before
        comparing them, so that n-grams taken from the result line up with
        the substrings ssdeep actually matches on.
        """
        kept = []
        previous = None
        run_length = 0
        for ch in s:
            run_length = run_length + 1 if ch == previous else 1
            previous = ch
            if run_length <= max_run:
                kept.append(ch)
        return ''.join(kept)

    def _ngrams(self, s, ngram_len=7):
        """Return every contiguous substring of ``s`` of length ``ngram_len``.

        A string shorter than ``ngram_len`` is returned whole (as a
        one-element list) so that short, identical chunks still share a key.
        """
        if len(s) < ngram_len:
            return [s]
        # one n-gram per valid start offset
        return [s[i:i + ngram_len] for i in range(len(s) - ngram_len + 1)]

    def get_chunk_ngrams(self):
        """Return the set of n-grams of the (normalised) chunk part."""
        return self.chunk_ngrams

    def get_double_chunk_ngrams(self):
        """Return the set of n-grams of the (normalised) double-chunk part."""
        return self.double_chunk_ngrams

    def compare(self, that):
        """Return the ``ssdeep.compare`` score of this hash against ``that``'s."""
        return ssdeep.compare(self.hash, that.get_hash())

    def comparable(self, that):
        """Return True if the two chunk sizes are equal, or one is double the other.

        Rule: only hashes whose chunk size is equal to, double, or half of
        the other's are worth passing to ``compare``.
        """
        smaller, larger = sorted((self.chunksize, that.chunksize))
        return smaller == larger or smaller * 2 == larger

    def __repr__(self):
        return str((self.doc, self.hash))

In [337]:
# Computes a list of DocHash instances based on randomly generated text.

import string
import random

# seed the random generator for reproducibility
random.seed(42)

# test parameters
# number of random documents to generate
NUM_DOCS = 10000
# base text: every generated sentence is a prefix of this string with some
# characters randomly replaced
SEED_SENTENCE = "This is a seed sentence which will be randomly permuted for input to the ssdhash comparison test."
# shortest prefix of SEED_SENTENCE that may be used as a sentence
MIN_SENTENCE_LENGTH = 1
# bounds fed to the generator's sentence range; NOTE(review): as written the
# generator iterates range(MIN, randint(MIN + 1, MAX)), so a document holds
# between 1 and MAX - MIN sentences rather than MIN..MAX - confirm intent
MIN_SENTENCES_PER_DOC = 10
MAX_SENTENCES_PER_DOC = 100
# upper bound on the number of character replacements applied per sentence
MAX_PERMUTATIONS = 5

# a random permutation method which randomly replaces up to 'permutations' characters of the input string, s
def random_permute(s, permutations):
    """Return ``s`` with ``permutations`` randomly chosen characters replaced.

    Each step picks a random position in ``s`` and overwrites it with a
    random lowercase ASCII letter. The same position may be picked more
    than once, so fewer than ``permutations`` characters may end up
    differing from the input. The length of the string never changes.

    Args:
        s: the string to permute.
        permutations: number of replacement steps; values <= 0 leave ``s``
            unchanged.

    Returns:
        The permuted string (``s`` itself when nothing was replaced).
    """
    # An empty string has no position to replace; return it untouched
    # instead of letting random.randrange(0) raise ValueError.
    if not s:
        return s

    remaining = permutations
    # Iterative rather than recursive so a large 'permutations' cannot hit
    # the recursion limit; the order of random calls per step is unchanged.
    while remaining > 0:
        # index to permute
        index = random.randrange(len(s))
        replacement = ''.join(random.choices(string.ascii_lowercase, k=1))
        # permute the string at the randomly chosen index
        s = s[:index] + replacement + s[index + 1:]
        remaining -= 1
    return s

# the random document generator
# Each document is a concatenation of "sentences"; each sentence is a random
# length prefix of SEED_SENTENCE with a random number of characters replaced.
docs = []
for i in range(NUM_DOCS):
    # NOTE: the number of sentences is the length of this range, i.e. the
    # randomly drawn upper bound minus MIN_SENTENCES_PER_DOC.
    sentence_slots = range(MIN_SENTENCES_PER_DOC, random.randint(MIN_SENTENCES_PER_DOC + 1, MAX_SENTENCES_PER_DOC))
    sentences = [
        random_permute(
            # was len(SEED_STRING): that name is never defined in this notebook
            SEED_SENTENCE[:random.randint(MIN_SENTENCE_LENGTH, len(SEED_SENTENCE))],
            random.randint(1, MAX_PERMUTATIONS),
        )
        for _ in sentence_slots
    ]
    docs.append(DocHash(i, ''.join(sentences)))

# print some stats for the randomly generated list of documents:
# how many documents ended up with each ssdeep chunk size
csmap = {}
for d in docs:
    csmap[d.chunksize] = csmap.get(d.chunksize, 0) + 1

print('Chunksizes {(chunksize: count)}: %s' % (csmap))

Chunksizes {(chunksize: count)}: {48: 5167, 24: 2044, 3: 388, 12: 1211, 96: 723, 6: 467}


In [338]:
# Test 1: Naively compare all n(n-1)/2 pairs to compute a full set of comparisons, together with the set of 
#         all (i,j) document list index pairs for matching/similar documents. 

import time

# Baseline: score every unordered pair (i, j), i < j, exactly once.
all_doc_pair_count = 0
naive_similar_doc_pairs = set()
start_time = time.time()
for i, d0 in enumerate(docs):
    for j in range(i + 1, len(docs)):
        all_doc_pair_count += 1
        compscore = d0.compare(docs[j])
        # keep only pairs whose score falls in (0, 100]
        if 0 < compscore <= 100:
            naive_similar_doc_pairs.add((i, j))

elapsed = time.time() - start_time
print("(All, Similar) pairs: (%.0f, %s) computed in %.3f seconds" % (all_doc_pair_count, len(naive_similar_doc_pairs), elapsed))

(All, Similar) pairs: (49995000, 92596) computed in 108.606 seconds


In [339]:
# Test 2: Naively compare all n(n-1)/2 pairs to compute a full set of comparisons, but skip any pairs for which 
#         the relative chunk sizes are not equal, double, or half each other.

# Same sweep as the baseline, but the ssdeep comparison is only run for
# pairs that pass the cheap chunk size check.
all_doc_pair_count = 0
faster_similar_doc_pairs = set()
start_time = time.time()
for i, d0 in enumerate(docs):
    for j in range(i + 1, len(docs)):
        d1 = docs[j]
        all_doc_pair_count += 1
        if not d0.comparable(d1):
            continue
        compscore = d0.compare(d1)
        # keep only pairs whose score falls in (0, 100]
        if 0 < compscore <= 100:
            faster_similar_doc_pairs.add((i, j))

elapsed = time.time() - start_time
print("(All, Similar) pairs: (%.0f, %s) computed in %.3f seconds" % (all_doc_pair_count, len(faster_similar_doc_pairs), elapsed))

# the filtered run must find exactly the pairs the baseline found
sets_agree = naive_similar_doc_pairs == faster_similar_doc_pairs
print("Naive/Faster pair sets match!" if sets_agree else "Naive/Faster pair sets DON'T match!")

(All, Similar) pairs: (49995000, 92596) computed in 92.600 seconds
Naive/Faster pair sets match!


In [340]:
# get_matching_pairs() is an optimised method of computing matching/similar DocHash instances. Given a list of 
# DocHash instances as input, this method will compute all matching/similar pairs by building a lookup map based 
# on chunked ngrams (of length 7) which appear common to at least two DocHash instances in the input list.

def get_matching_pairs(document_list, minimum_ssdeep_match_score=1):
    """Return the set of index pairs of matching/similar documents.

    Instead of comparing every pair, documents are bucketed by the chunk
    n-grams they contain; only documents that share at least one n-gram
    (and whose chunk sizes are comparable) are passed to the expensive
    ssdeep comparison.

    Args:
        document_list: iterable of DocHash-like objects providing
            ``get_index()``, ``get_chunk_ngrams()``,
            ``get_double_chunk_ngrams()``, ``comparable()`` and ``compare()``.
        minimum_ssdeep_match_score: lowest comparison score (inclusive) for
            a pair to be reported.

    Returns:
        A set of ``(index_a, index_b)`` tuples with ``index_a < index_b``,
        one per pair whose score lies in
        ``[minimum_ssdeep_match_score, 100]``.
    """
    # Single pass: remember each document's combined n-gram set, map every
    # n-gram to the indices of the documents containing it, and map every
    # index back to its document. Candidates are later resolved through
    # docs_by_index, so indices need not coincide with list positions.
    docs_by_index = {}
    docs_with_ngrams = []
    indices_by_ngram = {}
    for d in document_list:
        index = d.get_index()
        ngrams = d.get_chunk_ngrams().union(d.get_double_chunk_ngrams())
        docs_by_index[index] = d
        docs_with_ngrams.append((d, ngrams))
        for ngram in ngrams:
            indices_by_ngram.setdefault(ngram, set()).add(index)

    # An n-gram seen in a single document can never produce a candidate
    # pair, so keep only the n-grams shared by at least two documents.
    shared_ngrams = {
        ngram: indices
        for ngram, indices in indices_by_ngram.items()
        if len(indices) > 1
    }

    similar_doc_pairs = set()
    for d0, ngrams in docs_with_ngrams:
        d0_index = d0.get_index()

        # every document sharing at least one n-gram with d0 is a candidate
        potential_match_indices = set()
        for ngram in ngrams:
            indices = shared_ngrams.get(ngram)
            if indices:
                potential_match_indices |= indices

        for candidate_index in potential_match_indices:
            # only compute pairs in one direction, e.g. (1,2) but not (2,1),
            # and never compare a document with itself
            if candidate_index <= d0_index:
                continue
            d1 = docs_by_index[candidate_index]
            # cheap chunk size test before the expensive comparison
            if not d0.comparable(d1):
                continue
            compscore = d0.compare(d1)
            if minimum_ssdeep_match_score <= compscore <= 100:
                similar_doc_pairs.add((d0_index, candidate_index))

    # all pairs whose score reached the requested minimum
    return similar_doc_pairs

In [341]:
# Test 3: Execute the get_matching_pairs() over the test list of documents to compare performance to the naive
#         comparison methods.

# Time the n-gram indexed search over the same documents.
start_time = time.time()
optim_similar_doc_pairs = get_matching_pairs(docs, minimum_ssdeep_match_score=1)
elapsed = time.time() - start_time

# n(n-1)/2: the number of pairs a naive sweep would have had to score
total_pairs = (len(docs) * (len(docs) - 1)) / 2
print("(All, Similar) pairs: (%.0f, %s) computed in %.3f seconds" % (total_pairs, len(optim_similar_doc_pairs), elapsed))

# the optimised search must report exactly the pairs the baseline found
sets_agree = naive_similar_doc_pairs == optim_similar_doc_pairs
print("Naive/Faster/Optim similar pair sets match!" if sets_agree else "Naive/Faster/Optim similar pair sets DON'T match!")

(All, Similar) pairs: (49995000, 92596) computed in 3.667 seconds
Naive/Faster/Optim similar pair sets match!
