
ctable: where_terms & groupby + tests #62

Closed

Conversation

FrancescElies
Member

Dear Bcolz-team,

First of all, thank you for your efforts creating and maintaining this project; my colleague @CarstVaartjes and I are very interested in this amazing data container.
We went ahead and implemented some extra functionality we would like to have (see #57 & the related issue #61).

We tried our best, but since we are neither C nor Python experts we know the solution we are proposing here can be optimized. We tried to make a ctable_ext.pyx file for this extra functionality but ran into issues (the lack of a pxd file for carrays made it really difficult); that is the main reason for having extended the carray_ext.pyx file.
Apologies in advance for any code or other things not meeting the standards of the project; we are open to criticism and would love to improve the code.

Basically we added two things:

  • a where_terms clause that can handle in selects
  • a groupby clause (that can also use where & where_terms filters) that sums measures over columns

The groupby clause is largely based upon cytoolz; its performance is okay but still far off from Pandas'. So this works functionally, but for future improvements it might be nice to look at Pandas' hashing & grouping to improve the code.
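
For reference, the intended usage is roughly like this (the toy ctable below is just for illustration):

import bcolz
import numpy as np

# toy ctable with the columns used in the timings further down
states = np.array(['IL', 'CA', 'NY', 'IL'], dtype='S2')
cost = np.array([1.0, 2.0, 3.0, 4.0])
cost2 = np.array([5.0, 6.0, 7.0, 8.0])
fact_bcolz = bcolz.ctable((states, cost, cost2), names=['state', 'cost', 'cost2'])

# where_terms: filter with an 'in' (or 'not in') over a list of values
filtered = fact_bcolz.where_terms([('state', 'in', ['IL', 'CA'])])

# groupby: group on one or more columns and sum the given measure columns,
# optionally combined with a where_terms filter
result = fact_bcolz.groupby(['state'], ['cost', 'cost2'],
                            where_terms=[('state', 'not in', ['IL', 'CA'])])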

Please let us know any comments you could have about this request.

Thanks & best regards.

Francesc & Carst

@FrancescAlted
Member

I have had a quick look at your PR (btw, thanks for sending it). I need some time to digest it (I am swamped lately), but some quick comments:

  1. The 'in' functionality could be implemented in numexpr, and everything could probably be faster, but especially more elegant. For an example of how that can be tackled, see a recently accepted PR: the contains() operator for string columns (pydata/numexpr#135). See the quick sketch after these points.

  2. The Cython code is peppered with many low-level Python calls. I suppose this is because the code comes from CyToolz, but frankly, I would not like to replicate their code in bcolz (especially code involving reference counting ;). The goal of bcolz is to use CyToolz on top of its data containers (especially ctable). Why is this not enough? If it is performance, can you point out what the advantage of this PR is over bcolz + CyToolz?

  3. You said that this PR's performance still lags behind pandas. Can you provide some numbers? Just curious.
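
To illustrate point 1, numexpr already lets you do things along these lines with contains() (written from memory, so the exact dtype handling may differ):

import numpy as np
import numexpr as ne

states = np.array([b'IL', b'CA', b'NY', b'IL'])
# substring test, evaluated chunk-wise inside numexpr's virtual machine
mask = ne.evaluate("contains(states, b'CA')")

An 'in' operator over a list of values could probably be added following the same pattern.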

At any rate, I will try to put more time into this PR later, but first I would like to be convinced that we are not reinventing the wheel ;)

@CarstVaartjes
Contributor

Happy to contribute!

  1. Completely agree that numexpr would be the (much) better place, but it seemed much more daunting to us when looking at the code base there. We will peer into it and see if we can do it (if not, Francesc E. and I can maybe ask another colleague to jump in); it would definitely be the better-performing & more elegant solution (with all filtering in the same where clause). Pandas actually has this issue too and makes you use the isin(list) functionality (as numexpr cannot handle in/not in list expressions yet).

  2. It's a combination of syntax and performance. This syntax is much closer to what people with a traditional data background would be used to, and it's easier to actually use in scenarios where you do not know the columns until execution of the code (we have a use case with a dynamic set of groupby columns and measure columns, which makes the binop lambda really hard to use); all of that said, the pandas solution kicks its butt (see next point).

  3. The where_terms as we implemented it is relatively okay (numexpr would be even faster and most likely outperform pandas); the (cytoolz-based) groupby is around 5-10x slower (depending on the data set).

On an 800k-row data set:

In [36]: %timeit df['state'].isin(['IL', 'CA'])
100 loops, best of 3: 4 ms per loop

In [37]: %timeit fact_bcolz.where_terms([('state', 'in', ['IL', 'CA'])])
100 loops, best of 3: 7.03 ms per loop

In [38]: %timeit df.groupby(['state'])['cost', 'cost2'].sum()
100 loops, best of 3: 7.42 ms per loop

In [39]: %timeit fact_bcolz.groupby(['state'], ['cost', 'cost2'])
10 loops, best of 3: 74.7 ms per loop

In [40]: %timeit df[df['state'].isin(['IL', 'CA'])].groupby(['state'])['cost', 'cost2'].sum()
100 loops, best of 3: 14.9 ms per loop

In [41]: %timeit fact_bcolz.groupby(['state'], ['cost', 'cost2'], where_terms=[('state', 'in', ['IL', 'CA'])])
10 loops, best of 3: 69.9 ms per loop

In [42]: %timeit df[-df['state'].isin(['IL', 'CA'])].groupby(['state'])['cost', 'cost2'].sum()
100 loops, best of 3: 8.09 ms per loop

In [43]: %timeit fact_bcolz.groupby(['state'], ['cost', 'cost2'], where_terms=[('state', 'not in', ['IL', 'CA'])])
10 loops, best of 3: 21.6 ms per loop

On a 40 million row data set:

In [65]: %timeit df['state'].isin(['IL', 'CA'])
1 loops, best of 3: 2.02 s per loop

In [66]: %timeit fact_bcolz.where_terms([('state', 'in', ['IL', 'CA'])])
1 loops, best of 3: 2.8 s per loop

In [67]: %timeit df.groupby(['state'])['cost', 'cost2'].sum()
1 loops, best of 3: 2.48 s per loop

In [68]: %timeit fact_bcolz.groupby(['state'], ['cost', 'cost2'])
1 loops, best of 3: 36.1 s per loop

In [69]: %timeit df[df['state'].isin(['IL', 'CA'])].groupby(['state'])['cost', 'cost2'].sum()
1 loops, best of 3: 6.52 s per loop

In [70]: %timeit fact_bcolz.groupby(['state'], ['cost', 'cost2'], where_terms=[('state', 'in', ['IL', 'CA'])])
1 loops, best of 3: 33.3 s per loop

In [71]: %timeit df[-df['state'].isin(['IL', 'CA'])].groupby(['state'])['cost', 'cost2'].sum()
1 loops, best of 3: 2.64 s per loop

In [72]: %timeit fact_bcolz.groupby(['state'], ['cost', 'cost2'], where_terms=[('state', 'not in', ['IL', 'CA'])])
1 loops, best of 3: 8.76 s per loop

We tried quite a few other approaches too, but the cytoolz one was relatively quick, though still much slower than Pandas (which is really, really quick). So given a bit more time, we might be able to learn from Pandas and improve this here.

Maybe we can take both of these step by step...

Edit: it was a disk-based bcolz actually (which makes it rather impressive); an in-memory bcolz was a bit, but not much, quicker.
Edit2: we use the same cythonized isin functionality as pandas, as it's a lot faster than numpy's in1d; for an example see: http://stackoverflow.com/questions/15939748/check-if-each-element-in-a-numpy-array-is-in-another-array
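
For reference, the plain numpy baseline we compared against (toy data):

import numpy as np

states = np.array(['IL', 'CA', 'NY', 'IL'])
mask = np.in1d(states, ['IL', 'CA'])   # -> array([ True,  True, False,  True])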

@esc
Member

esc commented Oct 3, 2014

I had a look at this code a couple of times over the last few days and I am finding it fairly tricky to digest and review, mainly due to the size of the implementation and, to a lesser extent, the structure.
Anyway, let me articulate my current thoughts.

When I think of 'groupby implementation' I usually think of the following article:

http://wesmckinney.com/blog/?p=489

And it explains quite well how groupby is a combination of -- loosely speaking -- counting sort and factorize. Your groupby implementation sort of seems to be doing this, or at least it seems to be composed of several steps. Perhaps one way to make this code easier to comprehend, which is in the interest of long-term maintenance, would be to refactor the individual steps into separate functions? This would decrease coupling and increase the potential for reuse of code within bcolz itself.
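
To make that concrete, here is a deliberately naive NumPy sketch of factorize plus a one-pass aggregation over the resulting labels (just to pin down the terminology, not how bcolz should implement it):

import numpy as np

def naive_factorize(values):
    # map each distinct value to a dense integer label
    uniques = {}
    labels = np.empty(len(values), dtype=np.int64)
    for i, v in enumerate(values):
        labels[i] = uniques.setdefault(v, len(uniques))
    # recover the distinct values in label order
    return labels, sorted(uniques, key=uniques.get)

def naive_groupby_sum(labels, measure, ngroups):
    # one linear pass: scatter-add each measure value into its group's slot
    out = np.zeros(ngroups)
    np.add.at(out, labels, measure)
    return out

states = np.array(['IL', 'CA', 'IL', 'NY'])
cost = np.array([1.0, 2.0, 3.0, 4.0])
labels, uniques = naive_factorize(states)              # labels -> [0, 1, 0, 2]
sums = naive_groupby_sum(labels, cost, len(uniques))   # -> [4., 2., 4.]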

Also, the groupby only implements 'sum' at the moment. From what I understand, pandas has a GroupBy object that you can then call the desired aggregation on. That would be one way to go here too for implementing other aggregations.

Some food for thought: how might we implement counting sort efficiently on in-memory but also out-of-core chunked compressed storage? Obviously, the first step of counting sort, which just goes through linearly and counts how many items there are in each category using a hashtable, is pretty trivial, assuming there are significantly fewer categories than items. However, the second step, which is also O(n) and actually sorts the elements, will be somewhat trickier than if you had a contiguous memory buffer, because you need random access for the output. Perhaps one needs to preallocate compressed chunks with zeros (which will compress super nicely) and then use a clever system of caching decompressed chunks. So, for example, several 'open' chunks might reside decompressed in memory in a type of cache. If another chunk must be decompressed to be written into, one of the decompressed chunks is 'closed', meaning that it will be compressed and put in 'storage' (in-mem or disk). The selection of a chunk for cache 'eviction' will of course be interesting; several approaches (LRU, current fill status, etc.) come to mind.
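
A toy sketch of that 'open chunks' cache idea; decompress and compress_and_store are placeholders for whatever the real chunk machinery would provide, this only illustrates the eviction policy:

from collections import OrderedDict

class OpenChunkCache:
    # keeps a bounded number of chunks decompressed and writable;
    # the least recently used chunk gets compressed back to storage
    def __init__(self, decompress, compress_and_store, max_open=8):
        self.decompress = decompress
        self.compress_and_store = compress_and_store
        self.max_open = max_open
        self.open_chunks = OrderedDict()   # chunk_id -> decompressed buffer

    def get(self, chunk_id):
        if chunk_id in self.open_chunks:
            # mark as most recently used
            chunk = self.open_chunks.pop(chunk_id)
            self.open_chunks[chunk_id] = chunk
            return chunk
        if len(self.open_chunks) >= self.max_open:
            # 'close' the least recently used chunk
            old_id, old_chunk = self.open_chunks.popitem(last=False)
            self.compress_and_store(old_id, old_chunk)
        chunk = self.decompress(chunk_id)
        self.open_chunks[chunk_id] = chunk
        return chunk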

The where_terms implementation seems to include some sort of syntax parser? I am not sure that is such a good idea, and if it is, perhaps it should exist in its own right, again for purposes of reuse.

Lastly, on a personal note, I think it will be very interesting to see how an efficient groupby implementation for bcolz will work, so keep on pushing! 😄

@CarstVaartjes
Contributor

edit: this is a good read too: http://www.slideshare.net/wesm/a-look-at-pandas-design-and-development (page 32 and on)

  • So I think we all agree on where_terms :) We made it for "in/not in list/set/tuple", but it would be way better if it were in numexpr, as where would then handle all typical filtering directly. We will try to implement it there; let's see if we fumble or not :)
  • On the groupby measure_col input: our idea was that in the end the syntax should ideally be something like the following (see the normalization sketch after this list):
    • {'newcol1': ('sum', 'oldcol1'), 'newcol2': ('mean', 'oldcol1'), 'newcol3': ('sum', 'oldcol2'), etc.}.
    • {'newcol1': 'oldcol1'} # assuming a sum if the key/value pair gives a direct string -> translating it to {'newcol1': ('sum', 'oldcol1')}
    • ['oldcol1', 'oldcol2'] # assuming sums for each column without a "newcol1" name change -> translating it to {'oldcol1': ('sum', 'oldcol1'), 'oldcol2': ('sum', 'oldcol2')}
  • we tried some alternative routes to the cytoolz one, but they were slower (in hindsight this might be because we wrote to carrays in a non-sequential order, which probably triggers compression each time)
  • Some checks:
    • Wes McKinney doesn't directly hash the combination of the groupby columns, for performance reasons, right? (so instead, vectorized column-by-column hashing first) (from what I understand from the post; will look into github later on)
    • Then he does a cartesian join of all combinations. Also, on "assuming there are significantly fewer categories than items": we work with point-of-sale systems for instance, where you have to group by the transaction id for certain analyses. From 2 billion rows to sort on, you go back to 600 million baskets in that case.
    • This is why sparsity over unique combinations of independent columns really scares me.
    • but then also looking at "Otherwise, for arbitrary user-defined aggregations, we have two options for chopping up the data set: sorting it (O(N) using groupsort!) or create index arrays indicating which observations belong to each group. The latter would be preferable in many cases to sorting a large data set (reordering the data, even though linear time, can be very costly)." -> I understand, but is looping through huge amounts of index lookups better than sorting? :/
  • we might perhaps implement something like this, if the row-by-row cartesian join really is much faster normally:
    • First do a test (whether the number of K_1 … K_p groups observed >= the length of the ctable input) and then have two separate optimization paths
    • If the total number of groups observed is limited, do the cartesian join pattern
    • otherwise do something like a sorted groupby selection:
      • hash the combination of the groupby cols for each row and write the row index and hash to a new index array/ctable
      • then group sort that index array/ctable
      • iterate through the rows building an index list until you get to a new hash value or the end of the array
      • for each selection do the sum/mean/whatever for each measure_col, retrieve the groupby col combination from the first row and append that to a result ctable
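
A quick sketch of that measure_col normalization (only the translation rules described above, not actual bcolz code):

def normalize_measure_cols(spec):
    # normalize all three proposed input forms to
    # {output_name: (operation, input_column)}
    if isinstance(spec, (list, tuple)):
        # ['oldcol1', 'oldcol2'] -> sum each column, keep its name
        return dict((col, ('sum', col)) for col in spec)
    normalized = {}
    for out_name, value in spec.items():
        if isinstance(value, str):
            # {'newcol1': 'oldcol1'} -> default operation is sum
            normalized[out_name] = ('sum', value)
        else:
            # {'newcol1': ('mean', 'oldcol1')} -> already explicit
            normalized[out_name] = tuple(value)
    return normalized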

Any ideas/comments/feedback are really welcome. I'm aware that this might be a bit delirious and that there might be better approaches.

@CarstVaartjes
Contributor

Not wanting to spam everyone, but at least for me & @FrancescElies I tried to follow Pandas' logic to understand it better (and I think it's good for the discussion to put it in here). As you'll see, Francesc Elies & I are not the greatest cython/c experts, but I think we can improve on the current push.

Pandas is a bit complicated in terms of its construction. (One general note: the fact that Pandas removes NaN groupby column values is something I do not think should normally happen.)

It imports klib:
https://github.com/pydata/pandas/tree/master/pandas/src/klib
It has a pxd file to make it available for cython:
https://github.com/pydata/pandas/blob/master/pandas/src/khash.pxd
There the types of hashtables (ints, floats & strings) are defined:

ctypedef struct kh_int64_t:
    khint_t n_buckets, size, n_occupied, upper_bound
    uint32_t *flags
    int64_t *keys
    size_t *vals

inline kh_int64_t* kh_init_int64()
inline void kh_destroy_int64(kh_int64_t*)
inline void kh_clear_int64(kh_int64_t*)
inline khint_t kh_get_int64(kh_int64_t*, int64_t)
inline void kh_resize_int64(kh_int64_t*, khint_t)
inline khint_t kh_put_int64(kh_int64_t*, int64_t, int*)
inline void kh_del_int64(kh_int64_t*, khint_t)

bint kh_exist_int64(kh_int64_t*, khiter_t)

Then there are the pandas hashtable routines:
https://github.com/pydata/pandas/blob/master/pandas/hashtable.pyx
Basically it has routines for ints, floats & objects, with factorize looking like this:

def factorize(self, ndarray[object] values):
    cdef:
        Py_ssize_t i, n = len(values)
        ndarray[int64_t] labels = np.empty(n, dtype=np.int64)
        dict reverse = {}
        Py_ssize_t idx, count = 0
        int ret = 0
        object val
        char *buf
        khiter_t k

    for i in range(n):
        val = values[i]
        buf = util.get_c_string(val)
        k = kh_get_str(self.table, buf)
        if k != self.table.n_buckets:
            idx = self.table.vals[k]
            labels[i] = idx
        else:
            k = kh_put_str(self.table, buf, &ret)
            # print 'putting %s, %s' % (val, count)

            self.table.vals[k] = count
            reverse[count] = val
            labels[i] = count
            count += 1

    # return None
    return reverse, labels

The self.table is the hashtable from khash (to which the current type corresponds):

  • it puts the value (val/buf) into the hashtable which gives back an integer hash (k)
  • count is the integer index in the hashtable (from what I gather at least, a slightly mystifying name)
  • an ndarray labels that states for each row to which count/integer index it relates
  • a dict reverse that states for each count/integer index what the original value was

So it's a nice way to get, for each row, an idea of what the unique values are, how often they occur, and which row relates to which value. For an out-of-core implementation the dict would have to be replaced by an array (containing, for each integer index, the original value belonging to that index), which theoretically would start with the size of the length of the input (because each row could have a unique value) and then would have to be resized at the end to the actual size.
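
In pure Python the same logic (with a plain dict standing in for the khash table) and its output would be:

import numpy as np

values = np.array(['CA', 'IL', 'CA', 'AR'], dtype=object)

labels = np.empty(len(values), dtype=np.int64)
reverse, seen, count = {}, {}, 0
for i, val in enumerate(values):
    if val in seen:
        labels[i] = seen[val]
    else:
        seen[val] = count
        reverse[count] = val
        labels[i] = count
        count += 1
# reverse -> {0: 'CA', 1: 'IL', 2: 'AR'}, labels -> array([0, 1, 0, 2])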

So everything is peachy / understandable up to here, but then we get into groupby.py territory and stuff gets really complicated/long (3700 lines with lots of references, series vs df vs panel paths, etc.).
So the groupby goes to a BaseGrouper per column and creates an index for each key:
ax = Index(obj[key], name=key) # from https://github.com/pydata/pandas/blob/master/pandas/core/index.py
So then we basically have a factorized label_list for each column. It then combines the indices in a zip-like fashion:

def _get_indices_dict(label_list, keys):
    shape = list(map(len, keys))
    ngroups = np.prod(shape)

    group_index = get_group_index(label_list, shape)
    sorter = _get_group_index_sorter(group_index, ngroups)

    sorted_labels = [lab.take(sorter) for lab in label_list]
    group_index = group_index.take(sorter)

    return lib.indices_fast(sorter, group_index, keys, sorted_labels)

and we get something like:

In [26]: df_tmp.groupby(['name', 'state']).grouper.indices
Out[26]: 
{('build roads', 'CA'): array([0, 4]),
 ('fight crime', 'IL'): array([1, 5]),
 ('help farmers', 'AR'): array([7]),
 ('help farmers', 'CA'): array([3]),
 ('help farmers', 'IL'): array([2, 6])}

Later on Pandas again re-factorizes them in a smart way (it calculates the theoretical position in a cartesian product and filters out non-existing combinations):

def get_group_index(label_list, shape):
    """
    For the particular label_list, gets the offsets into the hypothetical list
    representing the totally ordered cartesian product of all possible label
    combinations.
    """
    if len(label_list) == 1:
        return label_list[0]

    n = len(label_list[0])
    group_index = np.zeros(n, dtype=np.int64)
    mask = np.zeros(n, dtype=bool)
    for i in range(len(shape)):
        stride = np.prod([x for x in shape[i + 1:]], dtype=np.int64)
        group_index += com._ensure_int64(label_list[i]) * stride
        mask |= label_list[i] < 0

    np.putmask(group_index, mask, -1)
    return group_index
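
A worked toy example of that stride arithmetic (two key columns with 3 and 2 unique values respectively):

import numpy as np

name_labels  = np.array([0, 1, 2, 2, 0])   # 3 unique names
state_labels = np.array([0, 0, 1, 0, 0])   # 2 unique states
shape = [3, 2]

# stride for column 0 is prod(shape[1:]) == 2, for the last column it is 1
group_index = name_labels * 2 + state_labels * 1
# -> array([0, 2, 5, 4, 0]): each row's position in the 3x2 cartesian product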

nb: I don't completely understand why the actual sparse matrix needs to be made here -> you could just zip through the columns together and calculate the group_index for each row...
Pandas then compresses them with _compress_group_index(group_index), basically by re-factorizing to lower the integer count (especially for sparse matrices), from the theoretical maximum to the actual count.
It then counting-sorts them with groupsort_indexer so that a bit later on you can do vectorized lookups for each combination:

@cython.boundscheck(False)
@cython.wraparound(False)
def groupsort_indexer(ndarray[int64_t] index, Py_ssize_t ngroups):
    cdef:
        Py_ssize_t i, loc, label, n
        ndarray[int64_t] counts, where, result

    # count group sizes, location 0 for NA
    counts = np.zeros(ngroups + 1, dtype=np.int64)
    n = len(index)
    for i from 0 <= i < n:
        counts[index[i] + 1] += 1

    # mark the start of each contiguous group of like-indexed data
    where = np.zeros(ngroups + 1, dtype=np.int64)
    for i from 1 <= i < ngroups + 1:
        where[i] = where[i - 1] + counts[i - 1]

    # this is our indexer
    result = np.zeros(n, dtype=np.int64)
    for i from 0 <= i < n:
        label = index[i] + 1
        result[where[label]] = i
        where[label] += 1

    return result, counts

Then a bit later on in the code we get to the aggregation where we do this: agg_func(result, counts, values, comp_ids), basically:

out_shape = (self.ngroups,) + values.shape[1:]
result = np.empty(out_shape, dtype=out_dtype)
counts = np.zeros(self.ngroups, dtype=np.int64)
result = self._aggregate(result, counts, values, how, is_numeric)

agg_func, dtype = self._get_aggregate_function(how, values)
comp_ids, _, ngroups = self.group_info
agg_func(result, counts, values, comp_ids)    

The agg_funcs are in algo.py, but I'm not exactly sure how it loops through the comp_ids (though I guess that by looping zipped through the comp_ids and counts you can sub-select the values each time using indexes).
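
My guess is that the aggregation loop is conceptually something like this (simplified; the real versions are generated Cython in pandas):

def group_add_guess(result, counts, values, comp_ids):
    # result: (ngroups,) output, counts: (ngroups,), values: (n,) measure,
    # comp_ids: (n,) group label per row (-1 for dropped/NaN groups)
    for i in range(len(values)):
        lab = comp_ids[i]
        if lab < 0:
            continue
        counts[lab] += 1
        result[lab] += values[i]
    return result, counts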

@esc
Member

esc commented Oct 8, 2014

This is all quite a lot to digest in a single sitting. I'm just scratching the surface, but I did make a factorize implementation here: #63

Regarding the removal of the NaN values, I think it depends on what semantics you want. For example, it is common for datasets to be messy and for certain values to be missing. If you then try to compute things with such a sequence you run into issues; for example, a sum of a sequence with a single NaN in Numpy will yield NaN as the answer.
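
For example:

import numpy as np

np.array([1.0, 2.0, np.nan]).sum()       # -> nan
np.nansum(np.array([1.0, 2.0, np.nan]))  # -> 3.0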

@FrancescElies
Member Author

We had a closer look at Wes McKinney's blog (the link you posted before); pandas uses klib (https://github.com/attractivechaos/klib) for its (hash table-based) factorize, and we proceeded to try to do the same.

At the moment we are trying to make the latest version of klib available to Python as an independent implementation (pandas' implementation uses an older version of klib, but everything we are doing is based on their work).

It is still not finished, but in case you would like to have a look at what we are doing at the moment you can go to https://github.com/CarstVaartjes/khash

@CarstVaartjes
Contributor

@esc Hi Valentin, the khash implementation from https://github.com/CarstVaartjes/khash is now working (thx to @javiBi4 and @FrancescElies); you can try it like this:

import numpy as np
from khash.hashtable import Int64HashTable

example_array = np.array([100, 200, 300], dtype=np.int64)
a = Int64HashTable(len(example_array))
a.get_labels_groupby(example_array)

It needs some code cleanup and the API is still a bit too Pandas-specific, but it works as a separate module now, and we upgraded it to klib 0.28 (which should be faster than the klib 0.26 that is currently in Pandas, due to a switch to quadratic probing; I'll drop them a mail that this might be interesting to them too).
So this works and can function as a base for the groupby; it should be at least as fast as Pandas (as it basically is Pandas, but with an upgraded library ;)

@CarstVaartjes CarstVaartjes mentioned this pull request Oct 10, 2014
@CarstVaartjes
Contributor

@esc would it be okay with you if we do this step by step? So first we make a numpy-based solution (using that khash factorize function + counting sort with an aggregation function) and then we figure out how to make it out-of-core carray based.
As we are nowhere near your level of understanding when it comes to the chunks & compression, it might be nice to have the basics working first in numpy arrays and then see how that can be put efficiently into carrays without re-triggering compression all the time, but instead compressing only at the end of writing an entire chunk.

@esc
Member

esc commented Oct 10, 2014

@CarstVaartjes sure, choose whatever path is best for you. We can always clean up the git history before the final merge of the feature. In parallel, I'll continue exploring ideas I have for the factorize and groupsort algorithms on compressed arrays. I think there are some things there that we can optimize.

@FrancescElies FrancescElies mentioned this pull request Oct 16, 2014
@CarstVaartjes
Contributor

I think this is now superseded by #63 btw

@esc
Member

esc commented Oct 22, 2014

@CarstVaartjes okay, I'll close this issue.

@esc esc closed this Oct 22, 2014