Skip to content

Commit

Permalink
Add DataFrame.countBy operation
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Hulette committed Jan 15, 2018
1 parent 2f4a349 commit 6719147
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 8 deletions.
26 changes: 22 additions & 4 deletions js/perf/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,23 @@ for (let { name, buffers} of config) {
suites.push(getByIndexSuite, iterateSuite, sliceSuite, parseSuite);
}

for (let {name, buffers, tests} of require('./table_config')) {
for (let {name, buffers, countBys, counts} of require('./table_config')) {
const table = Table.from(buffers);

const dfCountBySuite = new Benchmark.Suite(`DataFrame Count By "${name}"`, { async: true });
for (countBy of countBys) {
dfCountBySuite.add(createDataFrameCountByTest(table, countBy));
}

const dfFilterCountSuite = new Benchmark.Suite(`DataFrame Filter-Scan Count "${name}"`, { async: true });
const dfDirectCountSuite = new Benchmark.Suite(`DataFrame Direct Count "${name}"`, { async: true });
const table = Table.from(buffers);

for (test of tests) {
for (test of counts) {
dfFilterCountSuite.add(createDataFrameFilterCountTest(table, test.col, test.test, test.value))
dfDirectCountSuite.add(createDataFrameDirectCountTest(table, test.col, test.test, test.value))
}

suites.push(dfFilterCountSuite, dfDirectCountSuite)
suites.push(dfCountBySuite, dfFilterCountSuite, dfDirectCountSuite)
}

console.log('Running apache-arrow performance tests...\n');
Expand Down Expand Up @@ -167,6 +173,18 @@ function createDataFrameDirectCountTest(table, column, test, value) {
};
}

function createDataFrameCountByTest(table, column) {
let colidx = table.columns.findIndex((c)=>c.name === column);

return {
async: true,
name: `name: '${column}', length: ${table.length}, type: ${table.columns[colidx].type}`,
fn() {
table.countBy(col(column));
}
};
}

function createDataFrameFilterCountTest(table, column, test, value) {
let colidx = table.columns.findIndex((c)=>c.name === column);
let df;
Expand Down
10 changes: 7 additions & 3 deletions js/perf/table_config.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ const glob = require('glob');
const config = [];
const filenames = glob.sync(path.resolve(__dirname, `../test/data/tables/`, `*.arrow`));

tests = {
countBys = {
"tracks": ['origin', 'destination']
}
counts = {
"tracks": [
{col: 'lat', test: 'gteq', value: 0 },
{col: 'lng', test: 'gteq', value: 0 },
Expand All @@ -32,11 +35,12 @@ tests = {

for (const filename of filenames) {
const { name } = path.parse(filename);
if (name in tests) {
if (name in counts) {
config.push({
name,
buffers: [fs.readFileSync(filename)],
tests: tests[name]
countBys: countBys[name],
counts: counts[name],
});
}
}
Expand Down
73 changes: 72 additions & 1 deletion js/src/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
// under the License.

import { Vector } from './vector/vector';
import { DictionaryVector } from './vector/dictionary';
import { Uint32Vector } from './vector/numeric';
import { read, readAsync } from './reader/arrow';
import { Predicate } from './predicate';
import { Col, Predicate } from './predicate';

export type NextFunc = (idx: number, cols: Vector[]) => void;

Expand All @@ -40,6 +42,7 @@ export interface DataFrame {
filter(predicate: Predicate): DataFrame;
scan(next: NextFunc): void;
count(): number;
countBy(col: (Col|string)): Table;
}

function columnsFromBatches(batches: Vector[][]) {
Expand Down Expand Up @@ -111,6 +114,40 @@ export class Table implements DataFrame {
count(): number {
return this.lengths.reduce((acc, val) => acc + val);
}
countBy(count_by: (Col|string)): Table {
if (count_by instanceof String) {
count_by = new Col(count_by);
}

// the last batch will have the most complete dictionary, use it's data
// vector as our count by keys
count_by.bind(this.batches[this.batches.length - 1]);
if (!(count_by.vector instanceof DictionaryVector)) {
throw new Error("countBy currently only supports dictionary-encoded columns");
}

let keys: Vector = (count_by.vector as DictionaryVector<any>).data;
// TODO: Adjust array byte width based on overall length
// (e.g. if this.length <= 255 use Uint8Array, etc...)
let counts: Uint32Array = new Uint32Array(keys.length);


for (let batch = -1; ++batch < this.lengths.length;) {
const length = this.lengths[batch];

// load batches
const columns = this.batches[batch];
count_by.bind(columns);

// yield all indices
for (let idx = -1; ++idx < length;) {
let key = (count_by.vector as DictionaryVector<any>).getKey(idx)
if (key !== null) { counts[key]++; }
}
}

return new Table({batches: [[keys, new Uint32Vector({data: counts})]]})
}
*[Symbol.iterator]() {
for (let batch = -1; ++batch < this.lengths.length;) {
const length = this.lengths[batch];
Expand Down Expand Up @@ -177,4 +214,38 @@ class FilteredDataFrame implements DataFrame {
this.predicate.and(predicate)
);
}

countBy(count_by: (Col|string)): Table {
if (count_by instanceof String) {
count_by = new Col(count_by);
}

// the last batch will have the most complete dictionary, use it's data
// vector as our count by keys
count_by.bind(this.parent.batches[this.parent.batches.length - 1]);
if (!(count_by.vector instanceof DictionaryVector)) {
throw new Error("countBy currently only supports dictionary-encoded columns");
}

let keys: Vector = (count_by.vector as DictionaryVector<any>).data;
let counts: Uint32Array = new Uint32Array(keys.length);


for (let batch = -1; ++batch < this.parent.lengths.length;) {
const length = this.parent.lengths[batch];

// load batches
const columns = this.parent.batches[batch];
const predicate = this.predicate.bind(columns);
count_by.bind(columns);

// yield all indices
for (let idx = -1; ++idx < length;) {
let key = (count_by.vector as DictionaryVector<any>).getKey(idx)
if (key !== null && predicate(idx, columns)) { counts[key]++; }
}
}

return new Table({batches: [[keys, new Uint32Vector({data: counts})]]})
}
}

0 comments on commit 6719147

Please sign in to comment.