In [1]:
import numpy as np
from numba import jit
import pandas as pd
from tqdm.notebook import trange

from groupby import GroupBy
from merge.merge_functions import _inner_merge_jit, _inner_merge

In [2]:
def make_data(n,m,i):
    arr = np.random.randint(i, size=n*m).reshape((n,m))
    df = pd.DataFrame(arr)
    return arr, df

In [176]:
ar1, df1 = make_data(1000, 10, 10)
ar2, df2 = make_data(1000, 10, 10)
# ar1, df1 = make_data(10, 5, 3)
# ar2, df2 = make_data(10, 5, 3)

# l_on, r_on = [2,3,4], [6,8,9]
l_on, r_on = [0,1,2], [0,1,2]
gb1 = GroupBy(ar1, l_on)
gb2 = GroupBy(ar2, r_on)

df1.merge(df2, how='left', left_on=l_on, right_on=r_on)

Unnamed: 0,0,1,2,3_x,4_x,5_x,6_x,7_x,8_x,9_x,3_y,4_y,5_y,6_y,7_y,8_y,9_y
0,0,0,0,7,8,6,0,1,8,7,8.0,9.0,0.0,9.0,7.0,8.0,6.0
1,0,0,0,3,1,2,8,0,9,1,8.0,9.0,0.0,9.0,7.0,8.0,6.0
2,0,0,0,0,5,6,6,7,1,6,8.0,9.0,0.0,9.0,7.0,8.0,6.0
3,0,0,1,5,1,1,7,1,5,3,8.0,0.0,3.0,8.0,4.0,8.0,1.0
4,0,0,1,5,1,1,7,1,5,3,8.0,6.0,2.0,0.0,2.0,6.0,5.0
5,0,0,3,9,9,9,4,1,0,5,,,,,,,
6,0,0,3,1,5,9,6,2,4,2,,,,,,,
7,0,0,4,3,5,8,2,9,9,1,7.0,1.0,0.0,4.0,0.0,9.0,2.0
8,0,0,4,3,5,8,2,9,9,1,1.0,2.0,0.0,9.0,5.0,6.0,1.0
9,0,0,4,5,6,0,8,7,0,4,7.0,1.0,0.0,4.0,0.0,9.0,2.0


In [177]:
class Merge:
    def __init__(self, l, r, l_on, r_on):
        """
            The Merge object implements different types of merges between two arrays. It utilizes
            the sorting an group-finding characterstics of the GroupBy object to facilitate the
            merges, as well as functions from NumPy's setops suite.
        """
        self.l_on = l_on
        self.r_on = r_on
        
        # Merge supports l and r parameters to be either arrays, or GroupBy objects. 
        # If they are arrays, we need to perform GroupBy initialization to get keys
        if isinstance(l, GroupBy) and (l.by == l_on):
            self.l_gb = l
        else:
            self.l_gb = GroupBy(l, l_on)
            
        if isinstance(r, GroupBy) and (r.by == r_on):
            self.r_gb = r
        else:
            self.r_gb = GroupBy(r, r_on)

        if isinstance(self.l_gb.idx.dtype, np.floating) and isinstance(self.r_gb.idx.dtype, np.floating):
            self.null_val = np.nan
        else:
            self.null_val = -99999
      
      
    def _merge_prefix(self, l_gb_keys, r_gb_keys):
        """ A shared function between several merge types """
        # We'll need contiguous arrays to get the proper view of our keys
        l_keyc, r_keyc = np.ascontiguousarray(l_gb_keys), np.ascontiguousarray(r_gb_keys)
        dtype = [(f'{i}', l_keyc.dtype) for i in range(l_keyc.shape[1])] # l_gb.on and r_gb.on should be of same length
        # Get a view of the keys (the one stored in the GroupBy objects may have differently named fields)
        l_keyv, r_keyv = l_keyc.view(dtype)[:, 0], r_keyc.view(dtype)[:, 0]
        
        # Find the intersection between the two key-sets and the indices in each set for those intersections
        intersect, l_idx, r_idx = np.intersect1d(l_keyv, r_keyv, assume_unique=True, 
                                                 return_indices=True)
        return intersect, l_idx, r_idx
    
    
    def inner(self, jitted=True):
        """ inner join on the specified columns of the Merge object """
        
        intersect, l_idx, r_idx = self._merge_prefix(self.l_gb.keys, self.r_gb.keys)

        # The creation of the return array is spedup with numba no-python mode
        if jitted:
            res = _inner_merge_jit(idx=self.l_gb.idx, l_vals=self.l_gb.vals, r_vals=self.r_gb.vals, 
                                    n_intersect=intersect.shape[0], l_gr_idx=self.l_gb.gr_idx,
                                    r_gr_idx=self.r_gb.gr_idx, l_idx=l_idx, r_idx=r_idx)
        else:
            res = _inner_merge(idx=self.l_gb.idx, l_vals=self.l_gb.vals, r_vals=self.r_gb.vals, 
                               n_intersect=intersect.shape[0], l_gr_idx=self.l_gb.gr_idx,
                               r_gr_idx=self.r_gb.gr_idx, l_idx=l_idx, r_idx=r_idx)
        return res
    
    
    def left(self, null_val=None):
        """ left outer join on specified columns """
        if null_val is None:
            null_val = self.null_val
        return self._lr_merge(l_gb=self.l_gb, r_gb=self.r_gb, how='left', null_val=null_val)
    
    
    def right(self, null_val=None):
        """ right outer join on specified columns """
        if null_val is None:
            null_val = self.null_val
        return self._lr_merge(l_gb=self.r_gb, r_gb=self.l_gb, how='right', null_val=null_val)
    
    
    def _lr_merge(self, l_gb, r_gb, how, null_val):
        """
            As left and right outer joins are symmetrical, we can reuse code, and just appropriately set the 
            left and right data objects to return the desired result. The default is left merge.
            
            The numpy null value is a floating point, so we need to designate what will signify null values 
            for other datatypes, such as integers. Default is -99,999.
        """
        if how == 'left':
            how = 0
        elif how == 'right':
            how = 1
            
        intersect, l_idx, r_idx = self._merge_prefix(l_gb.keys, r_gb.keys)
        
        return _lr_merge_constr_res(idx=l_gb.idx, l_vals=l_gb.vals, r_vals=r_gb.vals, 
                                    n_intersect=intersect.shape[0], l_gr_idx=l_gb.gr_idx,
                                    r_gr_idx=r_gb.gr_idx, l_idx=l_idx, r_idx=r_idx, 
                                    null_val=null_val, how=how)
        

In [199]:
@jit
def _lr_merge_constr_res(idx, l_vals, r_vals, n_intersect, 
                         l_gr_idx, r_gr_idx, l_idx, r_idx, 
                         null_val, how=0):
    """ Helper jitted function for inner merge """
    # Make the results object of the proper proportions
    # get counts per group
    l_counts = np.diff(l_gr_idx)
    r_counts_match = np.diff(r_gr_idx)[r_idx]
    
    r_counts = np.ones(l_counts.shape[0], np.int32)
    r_counts[l_idx] = r_counts_match

    # Multiplying these counts elementwise gives us the number of elements for each group
    group_n = l_counts * r_counts
    res_group_i = np.concatenate((np.array([0]), np.cumsum(group_n)))

    n = res_group_i[-1] # number of total rows is the dot product of the each merge-group's lengths
    m = idx.shape[1] + l_vals.shape[1] + r_vals.shape[1]

    # Make an empty array to be filled in piece by piece
    res = np.empty((n,m), dtype=idx.dtype)

    idx_end = idx.shape[1]
    if how == 0:
        l_begin, l_end = idx_end, idx_end + l_vals.shape[1]
        r_begin, r_end = l_end, l_end + r_vals.shape[1]
    elif how == 1:
        r_begin, r_end = idx_end, idx_end + r_vals.shape[1]
        l_begin, l_end = r_end, r_end + r_vals.shape[1]
    
    # We want to know when the group on the left has a match in the right
    idx_isin_r = np.full(l_counts.shape[0], False)
    idx_isin_r[l_idx] = True
    
    # We also need to know where the right group matches start and stop
    l2r_gr_idx = np.empty(l_gr_idx.shape[0], np.int32)
    l2r_gr_idx[l_idx] = r_idx
#     r_gr_idx_match = np.empty(l_gr_idx.shape[0], np.int32)
#     r_gr_idx_match[l_idx] = r_gr_idx[r_idx]
#     r_gr_idx_match[l_idx + 1] = r_gr_idx[r_idx + 1]

    # We iterate through each group in the left data object
    for i in range(l_counts.shape[0]):

        index = idx[l_gr_idx[i]] # a single row representation of the index columns (for left position)

        l_arr = l_vals[l_gr_idx[i]:l_gr_idx[i+1]]
        if idx_isin_r[i]:
#             r_arr = r_vals[r_gr_idx_match[i]:r_gr_idx_match[i+1]]
            r_arr = r_vals[r_gr_idx[l2r_gr_idx[i]]:r_gr_idx[l2r_gr_idx[i]+1]]
        else:
            r_arr = np.full(r_vals.shape[1], null_val)

        res_gr = res[res_group_i[i]:res_group_i[i+1]] # Get a view of the rows to be changed at this iteration

        res_gr[:, :idx_end] = index

        l_group_i = np.arange(l_counts[i]+1) * r_counts[i] # positions of each new row of l_arr in res_gr

        for j in range(l_counts[i]):
            res_gr[l_group_i[j]:l_group_i[j+1], l_begin:l_end] = l_arr[j]
            res_gr[l_group_i[j]:l_group_i[j+1], r_begin:r_end] = r_arr

    return res

In [202]:
Merge(gb1, gb2, l_on, r_on).left()

array([[     0,      0,      0, ...,      7,      8,      6],
       [     0,      0,      0, ...,      7,      8,      6],
       [     0,      0,      0, ...,      7,      8,      6],
       ...,
       [     9,      9,      5, ..., -99999, -99999, -99999],
       [     9,      9,      7, ...,      2,      9,      1],
       [     9,      9,      9, ..., -99999, -99999, -99999]])

In [197]:
df1.merge(df2,  how='right', left_on=l_on, right_on=r_on)

Unnamed: 0,0,1,2,3_x,4_x,5_x,6_x,7_x,8_x,9_x,3_y,4_y,5_y,6_y,7_y,8_y,9_y
0,0,0,0,7.0,8.0,6.0,0.0,1.0,8.0,7.0,8,9,0,9,7,8,6
1,0,0,0,3.0,1.0,2.0,8.0,0.0,9.0,1.0,8,9,0,9,7,8,6
2,0,0,0,0.0,5.0,6.0,6.0,7.0,1.0,6.0,8,9,0,9,7,8,6
3,0,0,1,5.0,1.0,1.0,7.0,1.0,5.0,3.0,8,0,3,8,4,8,1
4,0,0,1,5.0,1.0,1.0,7.0,1.0,5.0,3.0,8,6,2,0,2,6,5
5,0,0,4,3.0,5.0,8.0,2.0,9.0,9.0,1.0,7,1,0,4,0,9,2
6,0,0,4,5.0,6.0,0.0,8.0,7.0,0.0,4.0,7,1,0,4,0,9,2
7,0,0,4,7.0,3.0,1.0,2.0,2.0,4.0,4.0,7,1,0,4,0,9,2
8,0,0,4,3.0,5.0,8.0,2.0,9.0,9.0,1.0,1,2,0,9,5,6,1
9,0,0,4,5.0,6.0,0.0,8.0,7.0,0.0,4.0,1,2,0,9,5,6,1


In [530]:
2      4
3      6
4      7
0_x    0
1_x    4
2_x    4
3_x    6
4_x    7
5_x    2
6_x    4
7_x    5
8_x    5
9_x    9
0_y    9
1_y    2
2_y    6
3_y    6
4_y    3
5_y    9
6_y    4
7_y    3
8_y    6
9_y    7

SyntaxError: invalid syntax (<ipython-input-530-be2c9f7202e1>, line 1)

In [526]:
%prun Merge(gb1, gb2, l_on, r_on).inner()#.shape

 

In [525]:
# %timeit Merge(ar1, ar2, l_on, r_on).inner()
%timeit Merge(gb1, gb2, l_on, r_on).inner()#.shape

567 µs ± 5.41 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


In [524]:
%timeit df1.merge(df2,  how='inner', left_on=l_on, right_on=r_on)

3.87 ms ± 24.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
