In [1]:
import dask.dataframe as ddf
import fletcher
import jellyfish
import numba
import numpy
import pandas
import pyarrow
from pathlib import Path

# Root directory that holds the Kaggle data files used throughout this notebook.
# It is derived from the current user's home directory rather than spelling out one
# specific account's absolute path, so the notebook also runs on other machines.
this_data_root_path = (Path.home() / 'data' / 'kaggle').as_posix()

In [None]:
# Uncomment these lines if the parquet file needs to be re-written for some reason.
#tmp_dataframe = pandas.read_csv(f'{this_data_root_path}/test.csv')
#tmp_dataframe.to_parquet(f'{this_data_root_path}/test.parquet')

In [2]:
# Load the parquet copy of the test set back into memory as a pandas DataFrame.
# The data is fairly large: with the current default `object` dtype, the two question
# columns alone take up about half a gigabyte of RAM (see the `info` output of this cell).
dataframe = pandas.read_parquet(f'{this_data_root_path}/test.parquet')
dataframe[['question1', 'question2']].info(memory_usage='deep')

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2345796 entries, 0 to 2345795
Data columns (total 2 columns):
 #   Column     Dtype 
---  ------     ----- 
 0   question1  object
 1   question2  object
dtypes: object(2)
memory usage: 527.9 MB


In [3]:
def calculate_lev_distance_per_row(x, y):
    """Return the Levenshtein distance between ``x`` and ``y`` for a single row.

    NB: this signature is tightly coupled to the training dataset used in this
    tutorial (one pair of questions per row). If you wish to use it elsewhere,
    it should be generalized to take arbitrary datasets.

    Returns:
        The value of ``jellyfish.levenshtein_distance(x, y)``, or ``-1`` as a
        sentinel when either input is ``None``.
    """
    if x is not None and y is not None:
        return jellyfish.levenshtein_distance(x, y)
    return -1

In [None]:
# This step is for demonstrative purposes, and is not necessary to run every time this notebook is run.
%timeit dataframe.apply(lambda x: calculate_lev_distance_per_row(x['question1'], x['question2']), axis=1)

In [None]:
# A straightforward way to utilize more CPUs on a local machine is to use dask and distributed
# to spread the work among multiple cores.

# Split the previous dataframe into even chunks.
# THIS ONLY NEEDS TO BE DONE ONCE. UNCOMMENT TO WRITE THESE FILES.
# chunked_dataframes = numpy.array_split(dataframe, 32)
# for i, chunk in enumerate(chunked_dataframes):
#     chunk.to_parquet(f'{this_data_root_path}/chunked/test-{i}.parquet')

In [4]:
# Start a local cluster and load the chunked dataset into RAM.
# The chunk files are expected to exist already (see the commented-out split step earlier
# in this notebook).
# NOTE(review): the original note here said `persist()` only loads the data once a computation
# reaches this point; with a distributed Client, persist() is documented to start the work in the
# background right away — confirm which behaviour applies to the installed version.
# Either way, the small "dummy" computation at the end of this cell ensures that our dataframe
# is in memory, so that future benchmarks can be isolated to algorithm computation time.

from dask.distributed import Client, LocalCluster, wait

# NOTE(review): dashboard_address is passed as a bare port number (1000) — confirm this is intended.
cluster = LocalCluster(dashboard_address=1000)
client = Client(cluster)
# Build a distributed dataframe from every chunk file and ask the cluster to keep it around.
dask_dataframe = ddf.read_parquet(f'{this_data_root_path}/chunked/test-*.parquet', engine='pyarrow').persist()
# Dummy computation plus sanity check: the sum of `test_id` and the total number of rows.
dask_dataframe['test_id'].sum().compute(), len(dask_dataframe)

(2751378263910, 2345796)

In [5]:
%%timeit
# This time, run the same levenshtein calculation, but with distributed's future implementation.
# As stated above, we are benchmarking the computation time ONLY, by this point, the dataframe is in memory.

tasks = dask_dataframe.apply(lambda x: calculate_lev_distance_per_row(x['question1'], x['question2']), 
                             axis=1, 
                             meta=(None, 'int64'))
result = client.compute(tasks)
wait(result)

# Cleanup
result.release()

The slowest run took 481.41 times longer than the fastest. This could mean that an intermediate result is being cached.
2.17 s ± 5.23 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [None]:
# Clean up after our previous cell.
# Disconnect the client first and only then stop the cluster it was talking to.
# Doing it the other way round leaves the client sending a shutdown request to a
# scheduler that has already been closed.
client.close()
cluster.close()

In [6]:
# So far we have shown that, in purely computational terms, using jellyfish gives a
# significant performance boost thanks to its efficient C implementation.

# The main areas for improvement that we will focus on from here are:
#   - the overhead of using the PyObject structure;
#   - the overhead of the string representation of the data.

# For strings, the PyObject overhead is much smaller than in the numeric case,
# because the payload data is large compared to the size of the PyObject header.

# Rather than loading the data with pandas directly, we use fletcher's wrapper "read_parquet",
# which makes the string columns available as pyarrow.Array-backed columns.

fletcher_dataframe = fletcher.read_parquet(f'{this_data_root_path}/test.parquet', continuous=True)
fletcher_dataframe[['question1', 'question2']].info(memory_usage='deep')

# In the earlier pandas cell, using Python's string objects, these two columns took up 527.9 MB of RAM.
# Arrow's string encoding uses a little over half of that memory: 287.4 MB.

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2345796 entries, 0 to 2345795
Data columns (total 2 columns):
 #   Column     Dtype                      
---  ------     -----                      
 0   question1  fletcher_continuous[string]
 1   question2  fletcher_continuous[string]
dtypes: fletcher_continuous[string](2)
memory usage: 287.4 MB


In [7]:
# Our definition of Levenshtein distance is NOT based on the textbook definition, but is
# instead based on a C implementation of the algorithm. See evernote and the original
# tutorial at https://uwekorn.com/2020/12/08/levenshtein-distance-with-fletcher.html for the
# C implementation.

# Why pass the lengths instead of calling len(stringX)? The column-level callers in this
# notebook hand over slices that begin at a string's offset but run to the END of the shared
# data buffer, so len() of such a slice would not be the length of the string itself.
@numba.jit(nogil=True, nopython=True)
def levenshtein_distance_by_row(stringOne, stringOneLength, stringTwo, stringTwoLength):
    """
    Compute levenshtein distance for a single row.

    Only the first ``stringOneLength`` elements of ``stringOne`` and the first
    ``stringTwoLength`` elements of ``stringTwo`` are compared. A single working
    column is updated in place instead of building the full distance matrix.

    Returns the distance stored in the last cell of the working column.
    """
    # Make stringOne the shorter input: the working column is sized from its length.
    if stringTwoLength < stringOneLength:
        stringOne, stringTwo = stringTwo, stringOne
        stringOneLength, stringTwoLength = stringTwoLength, stringOneLength

    # Starts out as 0..stringOneLength, i.e. the distances from an empty prefix of stringTwo.
    column = numpy.arange(stringOneLength + 1)

    for outer in range(1, stringTwoLength + 1):
        column[0] = outer
        diagonal = outer - 1

        for inner in range(1, stringOneLength + 1):
            # Remember the value about to be overwritten: it is the diagonal of the next cell.
            previous = column[inner]
            substitution = diagonal + (0 if stringOne[inner - 1] == stringTwo[outer - 1] else 1)
            column[inner] = min(previous + 1,
                                min(column[inner - 1] + 1, substitution))
            diagonal = previous
    return column[stringOneLength]

In [31]:
from numba import prange
from fletcher.algorithms.string import _extract_string_buffers
from fletcher._algorithms import _merge_valid_bitmaps
"""
Compute levenshtein distance for a whole column without nulls.
"""
@numba.jit(nogil=True, nopython=True, parallel=True)
def levenshtein_numba_no_nulls(length,
                               offsets_buffer_a,
                               data_buffer_a,
                               offsets_buffer_b,
                               data_buffer_b,
                               out):
    """
    Fill ``out`` with the levenshtein distance of every row of two columns without nulls.

    For row ``i`` the string of each column starts at ``offsets_buffer[i]`` in its data
    buffer and its length is ``offsets_buffer[i + 1] - offsets_buffer[i]``. The rows are
    iterated with ``prange`` and each result is written to ``out[i]``; nothing is returned.
    """
    for row in prange(length):
        start_a = offsets_buffer_a[row]
        start_b = offsets_buffer_b[row]
        # The slices run to the end of each buffer, hence the explicit lengths.
        out[row] = levenshtein_distance_by_row(
            data_buffer_a[start_a:],
            offsets_buffer_a[row + 1] - start_a,
            data_buffer_b[start_b:],
            offsets_buffer_b[row + 1] - start_b,
        )

"""
Compute levenshtein distance for a whole column where nulls occur.
"""
@numba.jit(nogil=True, nopython=True, parallel=True)
def levenshtein_numba_nulls(length,
                            valid,
                            offsets_buffer_a,
                            data_buffer_a,
                            offsets_buffer_b,
                            data_buffer_b,
                            out):
    """
    Fill ``out`` with the levenshtein distance of every valid row of two columns.

    ``valid`` is read as a bitmap: row ``i`` is processed only when bit ``i % 8`` of
    byte ``i // 8`` is set. Rows whose bit is not set are skipped and their slot in
    ``out`` is left untouched. Nothing is returned.
    """
    for row in prange(length):
        # Pick out this row's bit from the validity bitmap.
        validity_bit = valid[row // 8] & numpy.uint8(1 << (row % 8))

        if validity_bit:
            start_a = offsets_buffer_a[row]
            start_b = offsets_buffer_b[row]
            out[row] = levenshtein_distance_by_row(
                data_buffer_a[start_a:],
                offsets_buffer_a[row + 1] - start_a,
                data_buffer_b[start_b:],
                offsets_buffer_b[row + 1] - start_b,
            )

"""
Compute the levenshtein distance for two fletcher.FletcherContinuousArray columns.

This method delegates to the respective methods, based on whether or not they are robust to 
null values or the ommission of the validity bitmap.
"""
def calculate_levenshtein_distance_numba(a, b):
    """
    Compute the row-wise levenshtein distance between two string arrays.

    Delegates to ``levenshtein_numba_no_nulls`` when neither input reports nulls and to
    ``levenshtein_numba_nulls`` (with the merged validity bitmap) otherwise.

    Parameters:
        a, b: equally long string arrays exposing ``null_count``. The notebook passes the
            ``.values.data`` of fletcher columns (presumably pyarrow arrays — confirm).

    Returns:
        A pyarrow array of distances. When nulls are present, the merged validity bitmap
        is attached so that rows with a null input are null in the result.

    Raises:
        ValueError: if ``a`` and ``b`` differ in length.
    """
    if len(a) != len(b):
        raise ValueError('Arrays must be of equal lengths...')

    # Explicit 64-bit integers: this buffer is handed to Arrow as int64 below, and the
    # builtin `int` dtype does not have the same width on every platform.
    out = numpy.empty(len(a), dtype=numpy.int64)

    offsets_buffer_a, data_buffer_a = _extract_string_buffers(a)
    offsets_buffer_b, data_buffer_b = _extract_string_buffers(b)

    if a.null_count == 0 and b.null_count == 0:
        # The original call was misspelled ("levenstein...") and raised NameError on this path.
        levenshtein_numba_no_nulls(len(a),
                                   offsets_buffer_a,
                                   data_buffer_a,
                                   offsets_buffer_b,
                                   data_buffer_b,
                                   out,)
        return pyarrow.array(out)

    valid = _merge_valid_bitmaps(a, b)
    levenshtein_numba_nulls(len(a),
                            valid,
                            offsets_buffer_a,
                            data_buffer_a,
                            offsets_buffer_b,
                            data_buffer_b,
                            out,)
    # Slots of `out` that belong to null rows are never written; the validity bitmap
    # passed alongside marks them as null in the resulting array.
    buffers = [pyarrow.py_buffer(x) for x in [valid, out]]
    return pyarrow.Array.from_buffers(pyarrow.int64(), len(out), buffers)

In [32]:
# Benchmark the hand-written numba implementation on the two question columns.
# NOTE(review): `.values.data` is presumably the pyarrow array behind each fletcher column — confirm.
%timeit levenshtein = calculate_levenshtein_distance_numba(fletcher_dataframe['question1'].values.data, fletcher_dataframe['question2'].values.data)

2.78 s ± 104 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [36]:
%%timeit
"""
fletcher >= 0.7 introduces this convenience method, which removes the necessity to implement the methods
- levenshtein_numba_no_nulls
- levenshtein_numba_nulls
- calculate_levenshtein_distance_numba

and wraps them, and their non-parallelized variants, in this apply_binary_str method,
which takes the dataframes to operate on, and the function to apply to each input.
"""
from fletcher.algorithms.string import apply_binary_str
apply_binary_str(fletcher_dataframe['question1'].values.data,
                      fletcher_dataframe['question2'].values.data,
                      func=levenshtein_distance_by_row,
                      output_dtype=numpy.int64,
                      parallel=True,)

2.83 s ± 125 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
