
Aggregator in datatable #1077

Closed
lelandwilkinson opened this issue Jun 1, 2018 · 25 comments · Fixed by #1156

@lelandwilkinson

  • Is there something datatable can't do just yet, but you think it'd be nice if it did?
    Aggregate

  • Is it related to some problem you're trying to solve?
    Solve slow reading of NFF format files.

  • What do you think the API for your feature should be?
    See the API in the Java code. The required methods are in the base class DataSource.

See Java code in https://github.com/h2oai/vis-data-server/blob/master/library/src/main/java/com/h2o/data/Aggregator.java

Plus other classes in that package for support. All of this should be done in C++.

@st-pasha st-pasha added the improve Improvement of an existing functionality label Jun 1, 2018
@nikhilshekhar

@st-pasha @oleksiyskononenko any thoughts on how to go about this? We will have to decide on an interface for how the datatable aggregator interacts with the Java visual server.

@oleksiyskononenko
Contributor

oleksiyskononenko commented Jun 6, 2018

@nikhilshekhar The idea is to implement the Aggregator in C++ in datatable and make it accessible from Python. Sure, we can discuss the interaction with the Java Visual Server.

@oleksiyskononenko
Copy link
Contributor

oleksiyskononenko commented Jun 12, 2018

Here is a preliminary list of functions that are going to be implemented in datatable (a rough sketch of the dispatch logic follows the list):

Exposed function (Python):

  • aggregate(df, options)
    • Input:
      • df: frame to aggregate
      • options: optional parameter to control the aggregation through the number of bins, radius, minimum number of rows to aggregate, maximum number of dimensions, seed, etc.
    • Output:
      • df_aggregated: the original frame grouped into "observation clusters".
    • Implementation: perform the aggregation based on the supplied dataframe and options, invoking one of the following internal methods depending on the case:
      • aggregate1DCategorical(df, options)
      • aggregate2DCategorical(df, options)
      • aggregate1DContinuous(df, options)
      • aggregate2DContinuous(df, options)
      • aggregate2DMixed(df, options)
      • aggregateND(df, options)

Internal functions (C++):

  • aggregate1DCategorical(df, options)
    • Implementation: uniform 1D binning for a categorical column.
  • aggregate2DCategorical(df, options)
    • Implementation: uniform 2D binning for two categorical columns.
  • aggregate1DContinuous(df, options)
    • Implementation: uniform 1D binning for a continuous column.
  • aggregate2DContinuous(df, options)
    • Implementation: uniform 2D binning for two continuous columns.
  • aggregate2DMixed(df, options)
    • Implementation: uniform 2D binning for two columns, one continuous and one categorical.
  • aggregateND(df, options)
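
For concreteness, a minimal Python sketch of how the exposed aggregate() entry point could dispatch to the internal kernels listed above. The kernel names come from the list; the branching logic and the helpers default_options() and is_categorical() are assumptions for illustration, not the final implementation:

def aggregate(df, options=None):
    # Hypothetical dispatcher: choose an internal kernel based on the
    # number of columns and their types (categorical vs continuous).
    options = options or default_options()      # hypothetical defaults helper
    if df.ncols == 1:
        if is_categorical(df, 0):               # hypothetical type check
            return aggregate1DCategorical(df, options)
        return aggregate1DContinuous(df, options)
    if df.ncols == 2:
        cat = [is_categorical(df, i) for i in range(2)]
        if all(cat):
            return aggregate2DCategorical(df, options)
        if not any(cat):
            return aggregate2DContinuous(df, options)
        return aggregate2DMixed(df, options)
    return aggregateND(df, options)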

@st-pasha
Contributor

Looks good, except the output should be the original frame grouped into "observation clusters".

We will also add convenience functions to extract the [exemplar row + nobs] from the grouped frame, as well as the list of indices of rows within each group.

@nikhilshekhar

@oleksiyskononenko I would like to point out a couple of things:

  • Ideally, the output of the Aggregator call should be tabular data comprising the aggregated rows. If the output is as stated above, the visual server, being a Java implementation, will have to re-parse the NFF file using a Java parser to get back the aggregated rows. As there will be multiple calls to the Aggregator to compute each visualization, the NFF files would need to be re-parsed every time a result is returned. This means the overhead of maintaining a Java NFF parser, and the repeated re-parsing of NFF files in Java would become a bottleneck/performance hit. @mmalohlava your thoughts on this?

  • If multiple column names are passed in options, ideally the aggregation should be done over each of the columns in parallel and the results returned to the visual server. For example, if I pass in the file on which aggregation needs to be done along with 100 column names from that file, the aggregator should be able to compute the 100 aggregations in parallel and return the respective aggregated frames. @oleksiyskononenko is this the right expectation of the aggregator?

Just to put things in perspective, to visualize one dataset, there can be more than a few hundred/thousand calls to the aggregator depending on the number of columns in the dataset. And the latency from the first aggregator call to the response being picked up by the visual server for all the subsequent aggregation calls is expected to be in seconds.

Everything else looks good, but we need to finalize the interaction between the visual server (which is a Java library) and the aggregator. @oleksiyskononenko @lelandwilkinson @mmalohlava

The initial chain of calls that @mmalohlava and I discussed is as follows:
The visual server (Java library) invokes a system call to a Python script, which in turn calls the Aggregator in datatable. The Aggregator then returns a response containing the aggregated rows. @oleksiyskononenko @mmalohlava Any thoughts/ideas on how these aggregated rows can be returned to the visual server (Java process) would be very helpful.

@lo5 @mmalohlava this is the new proposed architecture of the visual server/aggregator interaction. Kindly have a look before we start development on this.

@st-pasha
Contributor

We already have a channel of communication between the java VizServer and the python DAI. Why can't we piggy-back on that? In terms of retrieving data on Java's end, the data has to get there from C/Python somehow, and I don't think there's a faster way than through the NFF Jay file.

As for doing multiple aggregations at once -- it is certainly doable, but will require a separate method. Such a method could take a list of frames to aggregate and produce, again, a list of aggregated/reduced frames. Under the hood, all those aggregations would be carried out in parallel via OMP.
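
For illustration, the batch variant might look something like this from the Python side; the name aggregate_many and the binding _cpp_aggregate_batch are hypothetical, and the OMP parallelism would live entirely in the C++ layer:

def aggregate_many(frames, options_list):
    # Hypothetical batch entry point: one options object per input frame.
    # A single blocking call from Python; the C++ layer would run the
    # individual aggregations in parallel via OMP.
    assert len(frames) == len(options_list)
    return _cpp_aggregate_batch(frames, options_list)  # hypothetical C++ binding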

@nikhilshekhar

@st-pasha The Visual Server as of today communicates with the python DAI only via JSON-RPC request/response. There is no actual transfer of data happening there. And if we want to rely on the Visual Server to read data from the Jay file, it would need a Java parser for reading the Jay file, and that parser would have to be written and maintained.
@mmalohlava would like to know what you think would be the optimal way of doing this.

@oleksiyskononenko
Contributor

oleksiyskononenko commented Jun 12, 2018

@nikhilshekhar Yes, we can add a convenience function that returns only the aggregated rows. As @st-pasha said (I've updated my original comment according to his suggestion), we can keep the data internally as the grouped frame, so that we can also return the members or any other relevant information, if required.

As for the way of communication between the visual server and datatable, that is probably an orthogonal issue. Maybe @arnocandel has some insights.

@mmalohlava
Member

My Comments:

  • VisServer integration:
    - VisServer makes a request to aggregate data; the request contains the location of the data to aggregate and the aggregation columns (technically I suggested calling System.exec for the first integration; later we can think about JNI integration)
    - datatable.aggregate receives the request, aggregates the data, and saves it as an NFF ???Jay??? (what is that???) file
    - VisServer receives a response which contains the location of the aggregated files

The problem:

  • VisServer triggers multiple aggregations for different data columns.
    Solution:
    (A) We can solve parallelization of multiple requests on the Java side, which in the end means triggering datatable.aggregator multiple times (we pay the penalty of a process fork + multiple mmaps per datatable invocation), OR
    (B) We solve parallelization directly in datatable.aggregate (the same dt as input, but different options) - for example, the method can compute multiple aggregations in a single pass (following @st-pasha's proposal)

I would prefer (B), but (A) is easier in the short term.

WDYT @oleksiyskononenko @st-pasha ?

@st-pasha
Contributor

@mmalohlava
If I understand you correctly, you are suggesting the following mechanism:

  • Whenever a VisServer needs to aggregate a data frame (which exists as a file on disk, referenced by name), it launches a new system process invoking the Python interpreter.
  • You pass that process, via STDIN, a Python program that goes approximately like this:
import datatable as dt
from datatable.extras.autovis import aggregate

df = dt.open(name_of_dataset)              # open the frame from its NFF file
for params in ...:                         # one options set per requested aggregation
    dfagg = aggregate(df, params)
    dfagg.save(params.name_of_output_dataset)
  • When the process finishes, you will have a certain number of files created on the file system, which will then be loaded into Java.
  • If VizServer wants to do things in parallel, it will launch multiple Python processes instead and distribute the workload among those processes manually.
  • VizServer will ensure that Python is launched within the correct virtual environment (with datatable installed), as well as handle all error messages that may be returned from Python.

@lo5
Member

lo5 commented Jun 13, 2018

The call sequence is Browser -> DAI -> Procsy -> VisServer.

DAI (python) already knows what it wants, hence can -

  1. get aggregated data from pydatatable (py-py function call).
  2. send it to visdataserver via Procsy to get the vis data.

I don't see a strong reason to have Java read anything from disk.

@lo5
Member

lo5 commented Jun 13, 2018

From talking to @nikhilshekhar, it turns out that there is a need to aggregate all columns in a dataset for autoviz, so (2) above would possibly be via a file.

@nikhilshekhar will check with @lelandwilkinson if it's possible to eagerly aggregate all columns and hand the vis server this data, as opposed to the vis server initiating calls to pydatatable (convoluted).

@lelandwilkinson
Author

Thanks for all these comments and deep thinking. A couple of points:

vis-data-server is not allowed to see any data object except an aggregated Table. Please take a look at my latest code, where an aggregation happens when you see this line:

aggregatedTable = dataSource.getAggregatedTable(continuousVariableIndices);

For all purposes, vis-data-server has no idea (and doesn't care) whether the data were aggregated or not. Take a look at where this happens in DatatableDataSource. I don't have options like number of bins, radius, minimum number of rows to aggregate, maximum number of dimensions, seed because I don't need them and they add unnecessary complications to the algorithm. These are things we should consider for V2. Adding them at this point will only complicate the development process. The whole design of Aggregator, if you look at the Java code, is to require only one pass through the data, unlike the one Arno and I did for H2O.

I agree with Nikhil that we ought to consider parallelization inside Datatable if it will help with performance. It's not a difficult algorithm to parallelize.

A fundamental component of the contract is that vis-data-server ought to be able to access a data table as fast as other parts of DAI do. This implies that the aggregator in datatable has to be super fast and, equally important, vis-data-server cannot read or even know about NFF files. But aggregated Tables are tiny, so if you decide it is better to provide a storage mechanism for them, they shouldn't take a lot of resources. That would mean that datatable has some sort of buffering mechanism to handle repeated queries for aggregated Tables. Again, however, it is not the responsibility of vis-data-server to decide how it accesses an aggregated Table. There is only one way it can see data (see the call above).

Take a look at the Table class. All the expensive functions are inside there:

getMemberIndices(), getCategories(), getRow(), getDataLimits(), getNumValues()

We don't want any of these inside datatable. This is because these functions are not expensive when applied to small datasets that are typical for aggregations. Recall that the entire design of vis-data-server is based on computing weighted statistics, where the weights are in the last column ("counts").

Keep in mind also that vis-data-server is row oriented. I know column-oriented is all the rage today, but for statistical calculations (Mahalanobis distance, etc.), row-oriented is more efficient when applied to files containing a small number of rows. Keep in mind that rows contain elements which are Java Objects. These Objects can be dates, numbers, strings, etc. One needn't know their types in order to process them.

A small point: The statement "Whenever a VisServer needs to aggregate a data frame" should be changed to "Whenever vis-data-server needs to see data." Again, vis-data-server knows nothing about aggregation. It thinks all datasets have a "counts" column or otherwise, all counts are 1. The Aggregator (inside datatable) will decide when a file has too few rows to merit aggregation.

The aggregation algorithm is not a cluster analysis. So we should avoid terminology like "observation clusters" and instead use the terminology in the paper, namely, "exemplars" (aggregated rows) and "members" (lists of row indices for each exemplar).

The statement, "If multiple column names are passed in options, ideally the aggregation should be done over each of the columns in parallel and the result should be returned back to the visual server. For example, if I pass in the file on which aggregation needs to be done along with 100 column names in the file, the aggregator should be able to compute the 100 aggregations in parallel and return the respective aggregated frame." is not quite true. You can't parallelize across columns because the ND aggregation is row-wise and the Euclidean distance calculation has to be computed across columns. You CAN parallelize across rows and that would gain us some traction with deep files.

The statement, "We already have a channel of communication between the java VizServer and the python DAI. Why can't we piggy-back on that? In terms of retrieving data on Java's end, the data has to get there from C/Python somehow, and I don't think there's faster way than through the NFF Jay file" is not quite true. Take a look at NFFDataSource, which Nikhil designed for improving performance in V2. We thought it would be faster, but it wasn't. I don't think you want to store NFF files and parse them every time vis-data-server wants to see a Table. Whatever DAI does to get data should be the mechanism for vis-data-server getting the same data, except vis-data-server gets an aggregated Table rather than whole column(s). If DAI deals with memory data objects, then vis-data-server has to do the same. I liked the comment, "I don't see a strong reason to have Java read anything from disk."

We've been talking Python throughout, and that's fine. But keep in mind that the primary visualization client is JavaScript, not Python. If we're obliged to make everything accessible to Python (not sure why), then let's be sure we pay no performance penalty for doing that.

Again, thanks for the thoughts. It would appear that our main task is figuring out how to talk to Java. Even if an object is in memory, we have to get Java DataSource to see it as a Java Table. I suggested JNI, but this can be inefficient. Maybe one of you knows some magic that is even faster for letting C++ in datatable hand over a Table.

@nikhilshekhar

@lo5 circled around with @lelandwilkinson and the code - eager aggregation is not possible for all visualizations.
For example, in the Visual Server the first call is always get_autoviz(), so the aggregations needed by this method can be pre-computed. No issues here with pre-computing aggressively.
But the subsequent calls to visualization methods like parallel coordinate plots, bar charts, etc. need to perform aggregations based on the return values from get_autoviz(). There is no way of computing these aggregations beforehand. Aggregation over a different subset of columns results in different aggregated rows.

@nikhilshekhar

Trying to work out what a good chain of calls would be after the discussion on this thread, in sync with @lelandwilkinson @lo5 @mmalohlava. Kindly have a look at the below and point out changes/fixes, or suggest an alternative.

Browser makes a request for visualization --> python DAI --> procsy --> Visual Server calls the corresponding methods. Each of these methods needs some aggregations to be computed --> the call goes back via procsy to --> python DAI --> calls the datatable aggregator, which writes CSV files (one per aggregation request) --> the response with the aggregated file paths is returned via procsy --> the Visual Server now reads the aggregated CSV files and computes the values needed for visualization --> returns the computed values via procsy --> python DAI --> the UI renders and plots them

The above chain of calls will be invoked multiple times before all the visualizations can be rendered and shown in the UI.

@lelandwilkinson
Author

Been thinking this through a bit more, especially after talking with @nikhilshekhar. My CSV parser inside vis-data-server is almost as fast as datatable's and produces a Table. So it might be cleaner for datatable to aggregate and output a CSV file that can be read (from disk or memory) by vis-data-server. Because the aggregated file is so small, this should be fairly quick. The credit_card.CSV file in my tests is 2.9 MB and is read in less than a second, and it is considerably larger than a typical aggregated file. This approach would be the cleanest. Whether it is the fastest requires further testing.

@lelandwilkinson
Author

If datatable outputs an aggregated CSV file, it would also be more useful to those Python users who are not using it with DAI (because it's open source).

@oleksiyskononenko
Contributor

oleksiyskononenko commented Jun 13, 2018

@lo5 The Aggregator will aggregate all the columns for the supplied dataframe.

@lelandwilkinson The options parameter will be optional (added this explicitly to the original comment). If it is not passed to the Aggregator, the default values from https://github.com/h2oai/vis-data-server/blob/master/library/src/main/java/com/h2o/Globals.java will be used.

The Aggregator will output an aggregated/grouped dataframe that, according to @st-pasha, can be easily saved as CSV; however, I leave it to you guys to decide on the best way to communicate with vis-data-server.

@arnocandel
Member

My comments:

  1. Writing a .CSV of a thousand rows is definitely fast enough; nothing we do here is disk I/O or memory-bandwidth limited (those are both > GB/s). It's always the logic (sequential blocking calls to serial algorithms, etc.) that's the reason why AutoVis on a 100 MB dataset would take more than 100 ms.
  2. @lelandwilkinson The H2O-3 aggregator is only multi-pass in order to get the number of exemplars "just right"; otherwise it's the same single-pass algo as in Lee's original Java implementation. The only differences are: a) parallelism and repeated aggregation of epsilon-bubbles, leading to slightly larger epsilon-bubbles, and b) different categorical handling (poorer in h2o-3).
  3. @lelandwilkinson When Nikhil talked about 100 aggregations over columns in parallel, I believe he meant 100 calls to the Aggregator for different X/Y scatterplots etc., not parallelization over columns for a single aggregation. This is one of the reasons AutoVis1 (H2O-3 aggregation) was faster than AutoVis2: all those 100 calls were submitted at once and executed simultaneously by H2O-3, versus one at a time in AutoVis2.

@mmalohlava
Member

@nikhilshekhar why do you need the hop from VisDataServer "back via procsy to --> python DAI --> calls datatable"?
VisDataServer can call datatable (via exec of a Python wrapper) directly, no?

@lelandwilkinson CSV is an option, but to avoid re-parsing, NFF/Jay still seems more suitable.

@lelandwilkinson I meant parallelization over multiple invocations of the aggregator (as @arnocandel describes above) - the motivation is that datatable is the one accessing the "big data", so it can do a better job of parallelizing the computation than we can in VisDataServer.

@st-pasha
Contributor

I think the problem still remains of figuring out exactly which aggregations to perform, and who is going to control them. If a user loads an n-column input Frame into DAI, the following tasks can possibly be carried out:

  • Aggregate the entire frame at once (NDAggregation);
  • Aggregate each column separately (1DAggregation), resulting in n aggregated frames;
  • Aggregate each pair of columns separately (2DAggregation), resulting in n^2 aggregated frames.

Overall, that's n^2 + n + 1 output frames; for n = 100 that is already 10,101 frames. However, considering that DAI supports up to 10000 columns, this approach is not tenable. Someone must take the responsibility of deciding exactly which aggregations to carry out.

@nikhilshekhar

@mmalohlava yes, the Visual Server can definitely call datatable via an exec'd Python wrapper. It will save a few method calls. But to preserve the server-client architecture, I thought it would be best if every call is routed via the server. But if calling datatable directly from the visual server is a better way, surely we can do that.

@arnocandel
Member

It probably always does a full aggregation first and then, based on calculations on that, asks for up to ~100 more (hopefully all at once).

oleksiyskononenko pushed a commit that referenced this issue Jun 21, 2018
Includes a general Python/C++ layout and implementations of
	- 1D continuous aggregation
	- 2D continuous aggregation
	- `count()` reduce function
	- wrappers to enable the usage of the `first()` reducer

1D categorical aggregation can now be done directly from Python through
`groupby/count`. It will be implemented in C++ along with the remaining
2D and ND aggregators.
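
As an aside, the groupby/count route mentioned in the commit message can be sketched in datatable's Python expression syntax (shown here in present-day syntax, which may differ from what was available at the time of this commit):

import datatable as dt
from datatable import f, by, count

df = dt.Frame(A=["a", "b", "a", "c", "a"])
agg = df[:, count(), by(f.A)]   # one exemplar row per category, plus its count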
@lelandwilkinson
Author

A few things:

  1. Please let's use the tag "vis-data-server" in our paths. The word "autovis" is a descriptor for only one small part of the visualization server. Our task is not to serve autovis; it's to serve vis-data-server. Every class in there (BarCharts, BoxPlots, LoessRegressions, etc.) needs to look at the aggregated CSV file produced by datatable.
  2. @st-pasha , datatable needs to produce aggregated files on demand. It can't possibly produce all single column or pairwise column aggregations a priori, for obvious reasons you stated. Instead, datatable needs to produce them when vis-data-server requests a particular aggregation. If you want to precalculate some of these, the problem gets much more complex because only vis-data-server knows which ones will be needed. For Version 1, I'd suggest we stick with aggregation on demand. At the moment, DatatableDataSource points to a datatable NFF file and an aggregation is requested by a list of column names in that file over which the aggregation is to be performed. Getting that list to datatable itself may require some sort of serialization of that list of strings or use of Python. Not my area of expertise.
  3. @arnocandel , thanks for your observations on the H2O aggregator. You're correct regarding the superior performance of the H2O aggregator on large files because of the use of MapReduce. When I spoke of multiple passes, that was only in reference to the option of specifying in advance the size of the aggregated file. We no longer need to do this. It would be nice if we could use a distributed algorithm inside datatable the way you did for H2O and, especially, to leverage GPUs. For a baseline, though, I'd recommend starting with the simple single-threaded algorithm I gave @oleksiyskononenko and then going to distributed for V2. That's only a recommendation, however.
  4. I strongly believe that we should produce CSV files for the aggregated exemplars and corresponding aggregated members list. I've already checked in a super-fast importer for these files (which should have fewer than 500 rows). Following this architecture will give us more portability for the R and Python interfaces.
  5. The aggregated exemplar file looks like this (a small invented example of both file formats follows this list):
    Separator: comma (,)
    fields: every field is a String
    embedded quotes in a field are represented by "" (double quotes)
    If a field has no commas or embedded quotes, it need not be surrounded by quotes
    Header records:
    record1: Column names (separated by commas)
    record2: Column types (Strings): "integer", "real", "string", "date" (can add more types later)
    record3 and subsequent records: data values represented as Strings
    Missing value for a data field: two adjacent commas, whether String or numeric field
    Do not use NULL, null, NaN, or other schemes for representing missing values.
    I could parse those (plus a bunch of other symbols like "*", "?", "." used by stat packages), but that
    would slow things down.
    The vis-data-server CSV parser will pad short data rows (rows with fewer fields than record1) with missing values and truncate long rows.
    Numeric data may be formatted as fixed decimal or scientific notation.
    Dates are formatted as MM/DD/YYYY. The vis-data-server will parse them into a Java Calendar/Date and output results according to locale.
  6. The aggregated members file looks like this:
    Comma separator
    for each row in the exemplar file, a list of long integers
    Rows in the exemplar file are one-to-one with rows in the members file.
    The members file has rows of different lengths (1 or more elements). No member row can be empty (by definition of the aggregation algorithm).
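
To make items 5 and 6 concrete, here is a small invented example of the two files. Column names and values are made up for illustration; the members file lists, for each exemplar row, the row indices of that exemplar's members in the original table:

Exemplar file:

name,age,city
string,real,string
alice,34,"Des Moines, IA"
bob,27,Chicago
carol,,Boston

Members file:

0,4,7
1
2,3,5,6

Note the missing age in the third data row (two adjacent commas) and the quoted field containing an embedded comma, per the rules above.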

@lelandwilkinson
Author

One more thing:
The datatable aggregator will add an extra column at the end, with a "counts" label. The entries in this column will be positive long integers represented as Strings. The "counts" label will be added to the list in record1, and a "real" type will be added to the end of record2.

@st-pasha st-pasha added this to Done in Aggregator Jul 19, 2018
@st-pasha st-pasha added this to the Release 0.7.0 milestone Jan 29, 2020