From a6a8dea5eea1829b05b0b93ff98f323f6f2e29f8 Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Tue, 15 Jan 2019 10:02:43 +0100 Subject: [PATCH 01/26] ci: update Travis CI test suite to gonum/gonum standards Fixes gonum/exp#18. --- .travis.yml | 17 ++++++++++++++--- .travis/check-copyright.sh | 4 ++++ .travis/check-imports.sh | 4 ++++ .travis/test-coverage.sh | 6 +++--- 4 files changed, 25 insertions(+), 6 deletions(-) create mode 100755 .travis/check-copyright.sh create mode 100755 .travis/check-imports.sh diff --git a/.travis.yml b/.travis.yml index 2e481be..89e8c9a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,13 +9,24 @@ go: - 1.11.x - master +env: + - TAGS="" + - TAGS="-tags bounds" + - TAGS="-tags noasm" + - TAGS="-tags appengine" + matrix: + fast_finish: true allow_failures: - go: master before_install: # Required for format check. - go get golang.org/x/tools/cmd/goimports + # Required for imports check. + - go get gonum.org/v1/tools/cmd/check-imports + # Required for copyright header check. + - go get gonum.org/v1/tools/cmd/check-copyright # Required for coverage. - go get golang.org/x/tools/cmd/cover - go get github.com/mattn/goveralls @@ -25,14 +36,14 @@ go_import_path: gonum.org/v1/exp # Get deps, build, test, and ensure the code is gofmt'ed. # If we are building as gonum, then we have access to the coveralls api key, so we can run coverage as well. script: + - ${TRAVIS_BUILD_DIR}/.travis/check-copyright.sh - ${TRAVIS_BUILD_DIR}/.travis/check-formatting.sh - go get -d -t -v ./... - go build -v ./... - go test -v ./... - - go test -a -tags bounds -x -v ./... - - go test -a -tags noasm -x -v ./... - - go test -a -tags appengine -x -v ./... + - go test -a $TAGS -v ./... - if [[ $TRAVIS_SECURE_ENV_VARS = "true" ]]; then bash ./.travis/test-coverage.sh; fi + - ${TRAVIS_BUILD_DIR}/.travis/check-imports.sh # This is run last since it alters the tree. - ${TRAVIS_BUILD_DIR}/.travis/check-generate.sh diff --git a/.travis/check-copyright.sh b/.travis/check-copyright.sh new file mode 100755 index 0000000..209dca3 --- /dev/null +++ b/.travis/check-copyright.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +set -e +check-copyright -notice "Copyright ©20[0-9]{2} The Gonum Authors\. All rights reserved\." diff --git a/.travis/check-imports.sh b/.travis/check-imports.sh new file mode 100755 index 0000000..f88fc5a --- /dev/null +++ b/.travis/check-imports.sh @@ -0,0 +1,4 @@ +#!/bin/bash + +set -e +check-imports -b "math/rand,github.com/gonum/.*" diff --git a/.travis/test-coverage.sh b/.travis/test-coverage.sh index 1415518..ab31ab9 100644 --- a/.travis/test-coverage.sh +++ b/.travis/test-coverage.sh @@ -13,7 +13,7 @@ testCover() { # switch to the directory to check pushd $d > /dev/null # create the coverage profile - coverageresult=`go test -v -coverprofile=$PROFILE_OUT` + coverageresult=`go test -v $TAGS -coverprofile=$PROFILE_OUT` # output the result so we can check the shell output echo ${coverageresult} # append the results to acc.out if coverage didn't fail, else set the retval to 1 (failed) @@ -27,8 +27,8 @@ testCover() { # Init acc.out echo "mode: set" > $ACC_OUT -# Run test coverage on all directories containing go files except testlapack and testblas. -find . -type d -not -path '*testlapack*' -and -not -path '*testblas*' | while read d; do testCover $d || exit; done +# Run test coverage on all directories containing go files except testlapack testblas and testgraph. +find . -type d -not -path '*testlapack*' -and -not -path '*testblas*' -and -not -path '*testgraph*' | while read d; do testCover $d || exit; done # Upload the coverage profile to coveralls.io [ -n "$COVERALLS_TOKEN" ] && goveralls -coverprofile=$ACC_OUT -service=travis-ci -repotoken $COVERALLS_TOKEN From 157435f52acbfe1babc7bf5882ad9c6604b251b9 Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Tue, 15 Jan 2019 10:05:21 +0100 Subject: [PATCH 02/26] ci: reduce size of CI matrix --- .travis.yml | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/.travis.yml b/.travis.yml index 89e8c9a..0eaaf01 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,12 +9,6 @@ go: - 1.11.x - master -env: - - TAGS="" - - TAGS="-tags bounds" - - TAGS="-tags noasm" - - TAGS="-tags appengine" - matrix: fast_finish: true allow_failures: @@ -41,7 +35,9 @@ script: - go get -d -t -v ./... - go build -v ./... - go test -v ./... - - go test -a $TAGS -v ./... + - go test -a -tags bounds -v ./... + - go test -a -tags noasm -v ./... + - go test -a -tags appengine -v ./... - if [[ $TRAVIS_SECURE_ENV_VARS = "true" ]]; then bash ./.travis/test-coverage.sh; fi - ${TRAVIS_BUILD_DIR}/.travis/check-imports.sh # This is run last since it alters the tree. From 0bc07f5b339ce29167e69a0ae1f3e391b875e3a4 Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Tue, 15 Jan 2019 10:22:14 +0100 Subject: [PATCH 03/26] linsolve: fix copyrights --- linsolve/cg_example_test.go | 2 +- linsolve/gmres.go | 2 +- linsolve/internal/mmarket/reader.go | 2 +- linsolve/internal/triplet/triplet.go | 2 +- linsolve/iterative_test.go | 2 +- linsolve/linsolve_test.go | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/linsolve/cg_example_test.go b/linsolve/cg_example_test.go index ab67902..869afc0 100644 --- a/linsolve/cg_example_test.go +++ b/linsolve/cg_example_test.go @@ -1,4 +1,4 @@ -// Copyright ©2016 The gonum Authors. All rights reserved. +// Copyright ©2016 The Gonum Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. diff --git a/linsolve/gmres.go b/linsolve/gmres.go index 2fec7e3..ad781ba 100644 --- a/linsolve/gmres.go +++ b/linsolve/gmres.go @@ -1,4 +1,4 @@ -// Copyright ©2017 The gonum Authors. All rights reserved. +// Copyright ©2017 The Gonum Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. diff --git a/linsolve/internal/mmarket/reader.go b/linsolve/internal/mmarket/reader.go index e67823e..eaf92d4 100644 --- a/linsolve/internal/mmarket/reader.go +++ b/linsolve/internal/mmarket/reader.go @@ -1,4 +1,4 @@ -// Copyright ©2017 The gonum Authors. All rights reserved. +// Copyright ©2017 The Gonum Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. diff --git a/linsolve/internal/triplet/triplet.go b/linsolve/internal/triplet/triplet.go index 9405a5e..bed1fd3 100644 --- a/linsolve/internal/triplet/triplet.go +++ b/linsolve/internal/triplet/triplet.go @@ -1,4 +1,4 @@ -// Copyright ©2017 The gonum Authors. All rights reserved. +// Copyright ©2017 The Gonum Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. diff --git a/linsolve/iterative_test.go b/linsolve/iterative_test.go index f736763..6cb96ef 100644 --- a/linsolve/iterative_test.go +++ b/linsolve/iterative_test.go @@ -1,4 +1,4 @@ -// Copyright ©2017 The Gonum authors. All rights reserved. +// Copyright ©2017 The Gonum Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. diff --git a/linsolve/linsolve_test.go b/linsolve/linsolve_test.go index 5680e93..178554d 100644 --- a/linsolve/linsolve_test.go +++ b/linsolve/linsolve_test.go @@ -1,4 +1,4 @@ -// Copyright ©2017 The Gonum authors. All rights reserved. +// Copyright ©2017 The Gonum Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. From c2bbe049c82c9696f787f102df0a2494b545d77f Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Tue, 8 Jan 2019 18:54:55 +0100 Subject: [PATCH 04/26] exp: add godoc, coverage badges --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d5d7952..5967eeb 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Gonum exp [![Build Status](https://travis-ci.org/gonum/exp.svg?branch=master)](https://travis-ci.org/gonum/exp) +# Gonum exp [![Build Status](https://travis-ci.org/gonum/exp.svg?branch=master)](https://travis-ci.org/gonum/exp) [![Coverage Status](https://coveralls.io/repos/gonum/exp/badge.svg?branch=master&service=github)](https://coveralls.io/github/gonum/exp?branch=master) [![GoDoc](https://godoc.org/gonum.org/v1/exp?status.svg)](https://godoc.org/gonum.org/v1/exp) [![Go Report Card](https://goreportcard.com/badge/github.com/gonum/exp)](https://goreportcard.com/report/github.com/gonum/exp) ## Issues From 539d5cbca658daa9a518320a1c3de2546f92aacb Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Tue, 8 Jan 2019 19:11:26 +0100 Subject: [PATCH 05/26] dframe: first stab to dframe proposal --- dframe/README.md | 66 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 dframe/README.md diff --git a/dframe/README.md b/dframe/README.md new file mode 100644 index 0000000..88fd506 --- /dev/null +++ b/dframe/README.md @@ -0,0 +1,66 @@ +# dframe + +`dframe` is a work-in-progress [Data Frame](https://en.wikipedia.org/wiki/Pandas_%28software%29) a-la [pandas](https://pandas.pydata.org/pandas-docs/stable/index.html). + +`dframe` is leveraging [Apache Arrow](https://arrow.apache.org/) and its [Go backend](https://godoc.org/github.com/apache/arrow/go/arrow). + +## Proposal + +We propose to introduce a new `Frame` type inside the `dframe` package: a 2-dim data structure to handle: + +- tabular data with heterogeneous columns (like a `SQL` table) +- arbitrary matrix data with row and column labels +- any other form of observational/statistical dataset. + +For a good cross-pollination and integration with the Gonum and Go scientific ecosystem, it is expected for other "companion" packages tailored for a few focused operations to appear: + +- integration with `gonum/plot`, +- integration with `gonum/stat`, +- integration with `gonum/mat` (_e.g.:_ creation of `dframe.Frame`s from `gonum/mat.Vector` or `gonum/mat.Matrix`, and vice versa) +- `hdf5` loading/saving of `dframe.Frame`s, +- integration with `encoding/csv` or `npyio`, +- integration with `database/sql`, +- etc... + +### Previous work + +The data frame concept comes from `R`'s `data.frame` and Python's `pandas.DataFrame`: + +- https://www.rdocumentation.org/packages/base/versions/3.4.3/topics/data.frame +- https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html + +A few data frame-like implementations in Go have also been investigated: + +- [kniren/gota](https://github.com/kniren/gota) +- [tobgu/qframe](https://github.com/tobgu/qframe) + +Some inspiration from this previous body of work will be drawn, both in terms of API and performance hindsight. + +### API + +The main type should be: + +```go +package dframe + +type Frame struct { + // contains filtered or unexported fields +} +``` + +It is expected to build `dframe.Frame` on top of `arrow/array.Interface` and/or `arrow/tensor.Interface` to re-use the SIMD optimized operations and zero-copy optimization that are implemented within these packages. +Using Arrow should also allow seamless interoperability with other data wrangling systems, possibly written in other languages than Go. + +```go +// Open opens an already existing Frame using the provided driver technology, +// located at the provided source. +// +// Possible drivers: hdf5, npyio, csv, json, hdfs, spark, sql, ... +func Open(drv, src string) (*Frame, error) { ... } + +// Create creates a new Frame, using the provided driver technology +func Create(drv, dst string, schema *arrow.Schema) (*Frame, error) { ... } + +// New creates a new in-memory data frame with the provided memory schema. +func New(schema *arrow.Schema) (*Frame, error) { ... } +``` From 4b2f72bcb2a87ee2f49ae6b5f66c94631c66f490 Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Thu, 10 Jan 2019 09:50:41 +0100 Subject: [PATCH 06/26] dframe: flesh out proposal --- dframe/README.md | 100 +++++++++++++++++++++++++++++++++++++++++++++-- dframe/dframe.go | 63 +++++++++++++++++++++++++++++ 2 files changed, 160 insertions(+), 3 deletions(-) diff --git a/dframe/README.md b/dframe/README.md index 88fd506..7be870b 100644 --- a/dframe/README.md +++ b/dframe/README.md @@ -36,7 +36,7 @@ A few data frame-like implementations in Go have also been investigated: Some inspiration from this previous body of work will be drawn, both in terms of API and performance hindsight. -### API +### dframe The main type should be: @@ -46,11 +46,83 @@ package dframe type Frame struct { // contains filtered or unexported fields } + +// Err returns the first error encountered during operations on a Frame. +func (df Frame) Err() error { ... } + +// NumRows returns the number of rows of this Frame. +func (df Frame) NumRows() int + +// NumCols returns the number of columns of this Frame. +func (df Frame) NumCols() int + +// Column returns the i-th column of this Frame. +func (df Frame) Column(i int) *array.Column + +// ColumnNames returns the list of column names of this Frame. +func (df Frame) ColumnNames() []string ``` It is expected to build `dframe.Frame` on top of `arrow/array.Interface` and/or `arrow/tensor.Interface` to re-use the SIMD optimized operations and zero-copy optimization that are implemented within these packages. Using Arrow should also allow seamless interoperability with other data wrangling systems, possibly written in other languages than Go. +`tobgu/qframe` presents a `QFrame` type that is essentially immutable. +Operations on a `QFrame`, such as copying columns, dropping columns, sorting them or applying some kind of operation on columns, return a new `QFrame`, leaving the original untouched. + +Arrow uses a ref-counting mechanism for all the types that involve memory allocation (mainly to address workloads involving memory allocated on a GPGPU, by a SQL database or a mmap-file.) +This ref-counting mechanism is presented to the user as a pair of methods `Retain/Release` that resp. increment and decrement that reference count. +At first, it would seem this mechanism would prevent to expose an API with "chained methods", as the intermediate `Frame` would be "leaked": + +```go +o := df.Slice(0, 10).Select("col1", "col2").Apply("col1 + col2") +``` + +If we want an immutable `Frame`, the code above should be rewritten as: + +```go +sli := df.Slice(0, 10) +defer sli.Release() + +sel := sli.Select("col1", "col2") +defer sel.Release() + +o := sel.Apply("col1 + col2") +defer o.Release() +``` +It is not clear (to me!) yet whether an immutable `Frame` makes much sense in Go and with this ref-counting mechanism coming from Arrow. + +But, immutable or not, one could recoup the nice "chained methods" API by introducing a `dframe.Tx` transaction: + +```go +func (df *Frame) Exec(f func(tx *Tx) error) error + +func example(df *dframe.Frame) { + err := df.Exec(func(tx *dframe.Tx) error { + tx.Slice(0, 10).Select("col1", "col2").Apply("col1 + col2") + return nil + }) + if err != nil { + log.Fatal(err) + } +} +``` + +Or, without a "chained methods" API: + +```go +func example(df *dframe.Frame) { + err := df.Exec(func(tx *dframe.Tx) error { + tx.Slice(0, 10) + tx.Select("col1", "col2") + tx.Apply("col1 + col2") + return nil + }) + if err != nil { + log.Fatal(err) + } +} +``` + ```go // Open opens an already existing Frame using the provided driver technology, // located at the provided source. @@ -59,8 +131,30 @@ Using Arrow should also allow seamless interoperability with other data wranglin func Open(drv, src string) (*Frame, error) { ... } // Create creates a new Frame, using the provided driver technology -func Create(drv, dst string, schema *arrow.Schema) (*Frame, error) { ... } +func Create(drv, dst string, schema *arrow.Schema, opts ...Option) (*Frame, error) { ... } // New creates a new in-memory data frame with the provided memory schema. -func New(schema *arrow.Schema) (*Frame, error) { ... } +func New(schema *arrow.Schema, opts ...Option) (*Frame, error) { ... } + +// FromTable creates a new data frame from the provided arrow table. +func FromTable(tbl arrow.Table, opts ...Option) (*Frame, error) { ... } ``` + +### Operations + +One should be able to carry the following operations on a `dframe.Frame`: + +```go +/// Slice returns a new Frame consisting of rows from beg to end-1. +// +// The returned Frame must be Release()'d after use. +func (df *Frame) Slice(beg, end int) *Frame + +// Drop +``` + +- retrieve the list of columns that a `Frame` is made of, +- create new columns that are the result of an operation on a set of already existing columns of that `Frame`, +- drop columns from a `Frame` +- append new data to a `Frame`, +- select a subset of columns from a `Frame` diff --git a/dframe/dframe.go b/dframe/dframe.go index 73ed59f..c591557 100644 --- a/dframe/dframe.go +++ b/dframe/dframe.go @@ -11,3 +11,66 @@ // Apache Arrow: // - https://github.com/apache/arrow package dframe + +import ( + "sync" + + "github.com/apache/arrow/go/arrow/array" +) + +type Frame struct { + mu sync.RWMutex + tbl array.Table + err error +} + +// Err returns the first error encountered during operations on a Frame. +func (df *Frame) Err() error { + return df.err +} + +// NumRows returns the number of rows of this Frame. +func (df *Frame) NumRows() int64 { + return df.tbl.NumRows() +} + +// NumCols returns the number of columns of this Frame. +func (df *Frame) NumCols() int64 { + return df.tbl.NumCols() +} + +// Column returns the i-th column of this Frame. +func (df *Frame) Column(i int) *array.Column { + return df.tbl.Column(i) +} + +// ColumnNames returns the list of column names of this Frame. +func (df *Frame) ColumnNames() []string { + names := make([]string, df.NumCols()) + for i := range names { + names[i] = df.Column(i).Name() + } + return names +} + +func (df *Frame) Exec(f func(tx *Tx) error) error { + df.mu.Lock() + defer df.mu.Unlock() + + if df.err != nil { + return df.err + } + + tx := &Tx{df: df} + return f(tx) +} + +// Tx represents a read-only or read/write transaction on a data frame. +type Tx struct { + df *Frame + err error +} + +func (tx *Tx) Err() error { + return tx.err +} From affe29c2dda28d1532df74b3eaad89bec3ca19f3 Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Thu, 10 Jan 2019 12:33:13 +0100 Subject: [PATCH 07/26] cleanup --- dframe/README.md | 2 +- dframe/dframe.go | 286 +++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 278 insertions(+), 10 deletions(-) diff --git a/dframe/README.md b/dframe/README.md index 7be870b..9f66971 100644 --- a/dframe/README.md +++ b/dframe/README.md @@ -137,7 +137,7 @@ func Create(drv, dst string, schema *arrow.Schema, opts ...Option) (*Frame, erro func New(schema *arrow.Schema, opts ...Option) (*Frame, error) { ... } // FromTable creates a new data frame from the provided arrow table. -func FromTable(tbl arrow.Table, opts ...Option) (*Frame, error) { ... } +func FromTable(tbl array.Table, opts ...Option) (*Frame, error) { ... } ``` ### Operations diff --git a/dframe/dframe.go b/dframe/dframe.go index c591557..bf7192e 100644 --- a/dframe/dframe.go +++ b/dframe/dframe.go @@ -9,19 +9,128 @@ // - https://github.com/tobgu/qframe // Ultimately, dframe should also allow for a good inter-operability with // Apache Arrow: -// - https://github.com/apache/arrow +// - https://godoc.org/github.com/apache/arrow/go/arrow package dframe import ( + "fmt" "sync" + "sync/atomic" + "github.com/apache/arrow/go/arrow" "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/memory" + "github.com/pkg/errors" ) +// Frame is a Go-based data frame built on top of Apache Arrow. type Frame struct { - mu sync.RWMutex - tbl array.Table - err error + mu sync.RWMutex // serialize creation of transactions + + refs int64 // reference count + err error // first error encountered + mem memory.Allocator + schema *arrow.Schema + + cols []array.Column + rows int64 +} + +// FromCols creates a new data frame from the provided columns. +func FromCols(cols []array.Column, opts ...Option) (*Frame, error) { + df := &Frame{ + refs: 1, + mem: memory.NewGoAllocator(), + cols: cols, + rows: -1, + } + + for _, opt := range opts { + err := opt(df) + if err != nil { + return nil, err + } + } + + if df.rows < 0 { + switch len(df.cols) { + case 0: + df.rows = 0 + default: + df.rows = int64(df.cols[0].Len()) + } + } + + if df.schema == nil { + fields := make([]arrow.Field, len(cols)) + for i, col := range cols { + fields[i].Name = col.Name() + fields[i].Type = col.DataType() + } + df.schema = arrow.NewSchema(fields, nil) + } + + // validate the data frame and its constituents. + // note we retain the columns after having validated the data frame + // in case the validation fails and panics (and would otherwise leak + // a ref-count on the columns.) + df.validate() + + for i := range df.cols { + df.cols[i].Retain() + } + + return df, nil +} + +// FromTable creates a new data frame from the provided arrow table. +func FromTable(tbl array.Table, opts ...Option) (*Frame, error) { + df := &Frame{ + refs: 1, + mem: memory.NewGoAllocator(), + schema: tbl.Schema(), + rows: tbl.NumRows(), + } + + for _, opt := range opts { + err := opt(df) + if err != nil { + return nil, err + } + } + + df.cols = make([]array.Column, tbl.NumCols()) + for i := range df.cols { + df.cols[i] = *tbl.Column(i).NewSlice(0, -1) + } + + return df, nil +} + +func (df *Frame) validate() { + if len(df.cols) != len(df.schema.Fields()) { + panic(errors.New("dframe: table schema mismatch")) + } + for i, col := range df.cols { + if !col.Field().Equal(df.schema.Field(i)) { + panic(fmt.Errorf("dframe: column field %q is inconsistent with schema", col.Name())) + } + + if int64(col.Len()) < df.rows { + panic(fmt.Errorf("dframe: column %q expected length >= %d but got length %d", col.Name(), df.rows, col.Len())) + } + } +} + +// Option configures an aspect of a data frame. +type Option func(*Frame) error + +// WithMemAllocator configures a data frame to use the provided memory allocator. +func WithMemAllocator(mem memory.Allocator) Option { + return func(df *Frame) error { + df.mem = mem + return nil + } } // Err returns the first error encountered during operations on a Frame. @@ -29,19 +138,46 @@ func (df *Frame) Err() error { return df.err } +// Retain increases the reference count by 1. +// Retain may be called simultaneously from multiple goroutines. +func (df *Frame) Retain() { + atomic.AddInt64(&df.refs, 1) +} + +// Release decreases the reference count by 1. +// When the reference count goes to zero, the memory is freed. +// Release may be called simultaneously from multiple goroutines. +func (df *Frame) Release() { + if atomic.LoadInt64(&df.refs) <= 0 { + panic("dframe: too many releases") + } + + if atomic.AddInt64(&df.refs, -1) == 0 { + for i := range df.cols { + df.cols[i].Release() + } + df.cols = nil + } +} + +// Schema returns the schema of this Frame. +func (df *Frame) Schema() *arrow.Schema { + return df.schema +} + // NumRows returns the number of rows of this Frame. func (df *Frame) NumRows() int64 { - return df.tbl.NumRows() + return df.rows } // NumCols returns the number of columns of this Frame. func (df *Frame) NumCols() int64 { - return df.tbl.NumCols() + return int64(len(df.cols)) } // Column returns the i-th column of this Frame. func (df *Frame) Column(i int) *array.Column { - return df.tbl.Column(i) + return &df.cols[i] } // ColumnNames returns the list of column names of this Frame. @@ -61,8 +197,42 @@ func (df *Frame) Exec(f func(tx *Tx) error) error { return df.err } - tx := &Tx{df: df} - return f(tx) + tx := newTx(df) + defer tx.Close() + + err := f(tx) + if err != nil { + return err + } + if tx.Err() != nil { + return tx.Err() + } + + df.swap(tx.df) + return nil +} + +func (lhs *Frame) swap(rhs *Frame) { + rhs.refs = atomic.SwapInt64(&lhs.refs, rhs.refs) + lhs.mem, rhs.mem = rhs.mem, lhs.mem + lhs.schema, rhs.schema = rhs.schema, lhs.schema + lhs.rows, rhs.rows = rhs.rows, lhs.rows + lhs.cols, rhs.cols = rhs.cols, lhs.cols +} + +func (df *Frame) clone() *Frame { + o := &Frame{ + refs: 1, + mem: df.mem, + schema: df.schema, + cols: make([]array.Column, len(df.cols)), + rows: df.rows, + } + copy(o.cols, df.cols) + for i := range o.cols { + o.cols[i].Retain() + } + return o } // Tx represents a read-only or read/write transaction on a data frame. @@ -71,6 +241,104 @@ type Tx struct { err error } +func newTx(df *Frame) *Tx { + tx := &Tx{df: df.clone()} + return tx +} + +func (tx *Tx) Close() error { + if tx.err != nil { + return tx.err + } + + tx.df.Release() + return nil +} + func (tx *Tx) Err() error { return tx.err } + +// Copy copies the content of the column named src to the column named dst. +// +// Copy fails if src does not exist. +// Copy fails if dst already exist. +func (tx *Tx) Copy(dst, src string) *Tx { + if tx.err != nil { + return tx + } + + if tx.df.Schema().HasField(dst) { + tx.err = errors.Errorf("dframe: column %q already exists", dst) + return tx + } + if !tx.df.Schema().HasField(src) { + tx.err = errors.Errorf("dframe: no column named %q", src) + return tx + } + + isrc := tx.df.Schema().FieldIndex(src) + idst := len(tx.df.Schema().Fields()) + + fields := make([]arrow.Field, len(tx.df.Schema().Fields())+1) + copy(fields, tx.df.Schema().Fields()) + + fields[idst] = fields[isrc] + fields[idst].Name = dst + + md := tx.df.Schema().Metadata() + tx.df.schema = arrow.NewSchema(fields, &md) + + col := array.NewColumn(fields[idst], tx.df.cols[isrc].Data()) + tx.df.cols = append(tx.df.cols, *col) + return tx +} + +func (tx *Tx) Slice(beg, end int) *Tx { + if tx.err != nil { + return tx + } + + cols := make([]array.Column, tx.df.NumCols()) + for i := range cols { + cols[i] = *tx.df.Column(i).NewSlice(int64(beg), int64(end)) + } + + for _, col := range tx.df.cols { + col.Release() + } + + tx.df.cols = cols + return tx +} + +func (tx *Tx) Drop(cols ...string) *Tx { + if tx.err != nil || len(cols) == 0 { + return tx + } + + set := make(map[string]struct{}, len(cols)) + for _, col := range cols { + set[col] = struct{}{} + } + + cs := make([]array.Column, 0, len(tx.df.cols)-len(cols)) + fs := make([]arrow.Field, 0, len(tx.df.Schema().Fields())-len(cols)) + + for i := range tx.df.cols { + col := &tx.df.cols[i] + if _, ok := set[col.Name()]; ok { + col.Release() + continue + } + cs = append(cs, *col) + fs = append(fs, tx.df.Schema().Field(i)) + } + + md := tx.df.Schema().Metadata() // FIXME(sbinet): also remove metadata of removed cols. + sc := arrow.NewSchema(fs, &md) + + tx.df.cols = cs + tx.df.schema = sc + return tx +} From b3690153716f1f48fd63cea9d87500f1c8c828ce Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Thu, 10 Jan 2019 12:45:20 +0100 Subject: [PATCH 08/26] dframe: add example --- dframe/README.md | 21 +++----- dframe/dframe.go | 4 +- dframe/dframe_example_test.go | 90 +++++++++++++++++++++++++++++++++++ 3 files changed, 99 insertions(+), 16 deletions(-) create mode 100644 dframe/dframe_example_test.go diff --git a/dframe/README.md b/dframe/README.md index 9f66971..86e7e46 100644 --- a/dframe/README.md +++ b/dframe/README.md @@ -48,19 +48,19 @@ type Frame struct { } // Err returns the first error encountered during operations on a Frame. -func (df Frame) Err() error { ... } +func (df *Frame) Err() error { ... } // NumRows returns the number of rows of this Frame. -func (df Frame) NumRows() int +func (df *Frame) NumRows() int // NumCols returns the number of columns of this Frame. -func (df Frame) NumCols() int +func (df *Frame) NumCols() int // Column returns the i-th column of this Frame. -func (df Frame) Column(i int) *array.Column +func (df *Frame) Column(i int) *array.Column // ColumnNames returns the list of column names of this Frame. -func (df Frame) ColumnNames() []string +func (df *Frame) ColumnNames() []string ``` It is expected to build `dframe.Frame` on top of `arrow/array.Interface` and/or `arrow/tensor.Interface` to re-use the SIMD optimized operations and zero-copy optimization that are implemented within these packages. @@ -144,17 +144,8 @@ func FromTable(tbl array.Table, opts ...Option) (*Frame, error) { ... } One should be able to carry the following operations on a `dframe.Frame`: -```go -/// Slice returns a new Frame consisting of rows from beg to end-1. -// -// The returned Frame must be Release()'d after use. -func (df *Frame) Slice(beg, end int) *Frame - -// Drop -``` - - retrieve the list of columns that a `Frame` is made of, - create new columns that are the result of an operation on a set of already existing columns of that `Frame`, - drop columns from a `Frame` -- append new data to a `Frame`, +- append new data to a `Frame`, (either a new column or a new row) - select a subset of columns from a `Frame` diff --git a/dframe/dframe.go b/dframe/dframe.go index bf7192e..37dde6f 100644 --- a/dframe/dframe.go +++ b/dframe/dframe.go @@ -101,7 +101,9 @@ func FromTable(tbl array.Table, opts ...Option) (*Frame, error) { df.cols = make([]array.Column, tbl.NumCols()) for i := range df.cols { - df.cols[i] = *tbl.Column(i).NewSlice(0, -1) + col := tbl.Column(i) + end := int64(col.Len()) + df.cols[i] = *col.NewSlice(0, end) } return df, nil diff --git a/dframe/dframe_example_test.go b/dframe/dframe_example_test.go new file mode 100644 index 0000000..f932cb5 --- /dev/null +++ b/dframe/dframe_example_test.go @@ -0,0 +1,90 @@ +// Copyright ©2019 The Gonum Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package dframe_test + +import ( + "fmt" + "log" + + "github.com/apache/arrow/go/arrow" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/memory" + "gonum.org/v1/exp/dframe" +) + +func ExampleFrame() { + + pool := memory.NewGoAllocator() + + schema := arrow.NewSchema( + []arrow.Field{ + arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32}, + arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64}, + }, + nil, + ) + + b := array.NewRecordBuilder(pool, schema) + defer b.Release() + + b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3, 4, 5, 6}, nil) + b.Field(0).(*array.Int32Builder).AppendValues([]int32{7, 8, 9, 10}, []bool{true, true, false, true}) + b.Field(1).(*array.Float64Builder).AppendValues([]float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + + rec1 := b.NewRecord() + defer rec1.Release() + + b.Field(0).(*array.Int32Builder).AppendValues([]int32{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, nil) + b.Field(1).(*array.Float64Builder).AppendValues([]float64{11, 12, 13, 14, 15, 16, 17, 18, 19, 20}, nil) + + rec2 := b.NewRecord() + defer rec2.Release() + + tbl := array.NewTableFromRecords(schema, []array.Record{rec1, rec2}) + defer tbl.Release() + + df, err := dframe.FromTable(tbl) + if err != nil { + log.Fatal(err) + } + defer df.Release() + + fmt.Printf("cols: %v\n", df.ColumnNames()) + + err = df.Exec(func(tx *dframe.Tx) error { + tx.Drop("f1-i32") + tx.Copy("fx-f64", "f2-f64") + return nil + }) + if err != nil { + log.Fatal(err) + } + + fmt.Printf("cols: %v\n", df.ColumnNames()) + + tr := array.NewTableReader(df, 5) + defer tr.Release() + + n := 0 + for tr.Next() { + rec := tr.Record() + for i, col := range rec.Columns() { + fmt.Printf("rec[%d][%q]: %v\n", n, rec.ColumnName(i), col) + } + n++ + } + + // Output: + // cols: [f1-i32 f2-f64] + // cols: [f2-f64 fx-f64] + // rec[0]["f2-f64"]: [1 2 3 4 5] + // rec[0]["fx-f64"]: [1 2 3 4 5] + // rec[1]["f2-f64"]: [6 7 8 9 10] + // rec[1]["fx-f64"]: [6 7 8 9 10] + // rec[2]["f2-f64"]: [11 12 13 14 15] + // rec[2]["fx-f64"]: [11 12 13 14 15] + // rec[3]["f2-f64"]: [16 17 18 19 20] + // rec[3]["fx-f64"]: [16 17 18 19 20] +} From 6de2ece0b2c990eccb51d2714d2d61d2956761f0 Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Thu, 10 Jan 2019 13:55:27 +0100 Subject: [PATCH 09/26] dframe: cosmetics --- dframe/README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/dframe/README.md b/dframe/README.md index 86e7e46..e539aa3 100644 --- a/dframe/README.md +++ b/dframe/README.md @@ -51,16 +51,16 @@ type Frame struct { func (df *Frame) Err() error { ... } // NumRows returns the number of rows of this Frame. -func (df *Frame) NumRows() int +func (df *Frame) NumRows() int { ... } // NumCols returns the number of columns of this Frame. -func (df *Frame) NumCols() int +func (df *Frame) NumCols() int { ... } // Column returns the i-th column of this Frame. -func (df *Frame) Column(i int) *array.Column +func (df *Frame) Column(i int) *array.Column { ... } // ColumnNames returns the list of column names of this Frame. -func (df *Frame) ColumnNames() []string +func (df *Frame) ColumnNames() []string { ... } ``` It is expected to build `dframe.Frame` on top of `arrow/array.Interface` and/or `arrow/tensor.Interface` to re-use the SIMD optimized operations and zero-copy optimization that are implemented within these packages. From aedf124c0e19c691c40e71a0e06fbf75e7f20b25 Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Thu, 10 Jan 2019 18:09:30 +0100 Subject: [PATCH 10/26] dframe: add FromArrays, FromCols --- dframe/README.md | 6 ++ dframe/dframe.go | 84 +++++++++++++++++++--- dframe/dframe_example_test.go | 132 +++++++++++++++++++++++++++++++++- 3 files changed, 213 insertions(+), 9 deletions(-) diff --git a/dframe/README.md b/dframe/README.md index e539aa3..be29eaa 100644 --- a/dframe/README.md +++ b/dframe/README.md @@ -136,6 +136,12 @@ func Create(drv, dst string, schema *arrow.Schema, opts ...Option) (*Frame, erro // New creates a new in-memory data frame with the provided memory schema. func New(schema *arrow.Schema, opts ...Option) (*Frame, error) { ... } +// FromArrays creates a new data frame from the provided schema and arrays. +func FromArrays(schema *arrow.Schema, arrs []array.Interface, opts ...Option) (*Frame, error) { ... } + +// FromCols creates a new data frame from the provided schema and columns. +func FromCols(cols []array.Column, opts ...Option) (*Frame, error) { ... } + // FromTable creates a new data frame from the provided arrow table. func FromTable(tbl array.Table, opts ...Option) (*Frame, error) { ... } ``` diff --git a/dframe/dframe.go b/dframe/dframe.go index 37dde6f..8850768 100644 --- a/dframe/dframe.go +++ b/dframe/dframe.go @@ -13,7 +13,6 @@ package dframe import ( - "fmt" "sync" "sync/atomic" @@ -36,7 +35,65 @@ type Frame struct { rows int64 } -// FromCols creates a new data frame from the provided columns. +// FromArrays creates a new data frame from the provided schema and arrays. +func FromArrays(schema *arrow.Schema, arrs []array.Interface, opts ...Option) (*Frame, error) { + df := &Frame{ + refs: 1, + mem: memory.NewGoAllocator(), + schema: schema, + rows: -1, + } + + for _, opt := range opts { + err := opt(df) + if err != nil { + return nil, err + } + } + + if df.rows < 0 { + switch len(arrs) { + case 0: + df.rows = 0 + default: + df.rows = int64(arrs[0].Len()) + } + } + + if df.schema == nil { + return nil, errors.Errorf("dframe: nil schema") + } + + if len(df.schema.Fields()) != len(arrs) { + return nil, errors.Errorf("dframe: inconsistent schema/arrays") + } + + for i, arr := range arrs { + ft := df.schema.Field(i) + if arr.DataType() != ft.Type { + return nil, errors.Errorf("dframe: column %q is inconsitent with schema", ft.Name) + } + + if int64(arr.Len()) < df.rows { + return nil, errors.Errorf("dframe: column %q expected length >= %d but got length %d", ft.Name, df.rows, arr.Len()) + } + } + + df.cols = make([]array.Column, len(arrs)) + for i := range arrs { + func(i int) { + chunk := array.NewChunked(arrs[i].DataType(), []array.Interface{arrs[i]}) + defer chunk.Release() + + col := array.NewColumn(df.schema.Field(i), chunk) + df.cols[i] = *col + }(i) + } + + return df, nil +} + +// FromCols creates a new data frame from the provided schema and columns. func FromCols(cols []array.Column, opts ...Option) (*Frame, error) { df := &Frame{ refs: 1, @@ -61,7 +118,7 @@ func FromCols(cols []array.Column, opts ...Option) (*Frame, error) { } } - if df.schema == nil { + { fields := make([]arrow.Field, len(cols)) for i, col := range cols { fields[i].Name = col.Name() @@ -74,7 +131,10 @@ func FromCols(cols []array.Column, opts ...Option) (*Frame, error) { // note we retain the columns after having validated the data frame // in case the validation fails and panics (and would otherwise leak // a ref-count on the columns.) - df.validate() + err := df.validate() + if err != nil { + return nil, err + } for i := range df.cols { df.cols[i].Retain() @@ -109,19 +169,20 @@ func FromTable(tbl array.Table, opts ...Option) (*Frame, error) { return df, nil } -func (df *Frame) validate() { +func (df *Frame) validate() error { if len(df.cols) != len(df.schema.Fields()) { - panic(errors.New("dframe: table schema mismatch")) + return errors.New("dframe: table schema mismatch") } for i, col := range df.cols { if !col.Field().Equal(df.schema.Field(i)) { - panic(fmt.Errorf("dframe: column field %q is inconsistent with schema", col.Name())) + return errors.Errorf("dframe: column field %q is inconsistent with schema", col.Name()) } if int64(col.Len()) < df.rows { - panic(fmt.Errorf("dframe: column %q expected length >= %d but got length %d", col.Name(), df.rows, col.Len())) + return errors.Errorf("dframe: column %q expected length >= %d but got length %d", col.Name(), df.rows, col.Len()) } } + return nil } // Option configures an aspect of a data frame. @@ -296,11 +357,17 @@ func (tx *Tx) Copy(dst, src string) *Tx { return tx } +// Slice creates a new frame consisting of rows[beg:end]. func (tx *Tx) Slice(beg, end int) *Tx { if tx.err != nil { return tx } + if int64(end) > tx.df.rows || beg > end { + tx.err = errors.Errorf("dframe: index out of range") + return tx + } + cols := make([]array.Column, tx.df.NumCols()) for i := range cols { cols[i] = *tx.df.Column(i).NewSlice(int64(beg), int64(end)) @@ -311,6 +378,7 @@ func (tx *Tx) Slice(beg, end int) *Tx { } tx.df.cols = cols + tx.df.rows = int64(end - beg) return tx } diff --git a/dframe/dframe_example_test.go b/dframe/dframe_example_test.go index f932cb5..e13dd33 100644 --- a/dframe/dframe_example_test.go +++ b/dframe/dframe_example_test.go @@ -14,7 +14,7 @@ import ( "gonum.org/v1/exp/dframe" ) -func ExampleFrame() { +func ExampleFrame_fromTable() { pool := memory.NewGoAllocator() @@ -88,3 +88,133 @@ func ExampleFrame() { // rec[3]["f2-f64"]: [16 17 18 19 20] // rec[3]["fx-f64"]: [16 17 18 19 20] } + +func ExampleFrame_fromArrays() { + + pool := memory.NewGoAllocator() + + schema := arrow.NewSchema( + []arrow.Field{ + arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32}, + arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64}, + }, + nil, + ) + + b := array.NewRecordBuilder(pool, schema) + defer b.Release() + + b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + b.Field(1).(*array.Float64Builder).AppendValues([]float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + + rec := b.NewRecord() + defer rec.Release() + + df, err := dframe.FromArrays(schema, rec.Columns()) + if err != nil { + log.Fatal(err) + } + defer df.Release() + + fmt.Printf("cols: %v\n", df.ColumnNames()) + + err = df.Exec(func(tx *dframe.Tx) error { + tx.Drop("f1-i32") + tx.Copy("fx-f64", "f2-f64") + tx.Slice(3, 8) + return nil + }) + if err != nil { + log.Fatal(err) + } + fmt.Printf("cols: %v\n", df.ColumnNames()) + + tr := array.NewTableReader(df, -1) + defer tr.Release() + + n := 0 + for tr.Next() { + rec := tr.Record() + for i, col := range rec.Columns() { + fmt.Printf("rec[%d][%q]: %v\n", n, rec.ColumnName(i), col) + } + n++ + } + + // Output: + // cols: [f1-i32 f2-f64] + // cols: [f2-f64 fx-f64] + // rec[0]["f2-f64"]: [4 5 6 7 8] + // rec[0]["fx-f64"]: [4 5 6 7 8] +} + +func ExampleFrame_fromCols() { + + pool := memory.NewGoAllocator() + + schema := arrow.NewSchema( + []arrow.Field{ + arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32}, + arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64}, + }, + nil, + ) + + b := array.NewRecordBuilder(pool, schema) + defer b.Release() + + b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + b.Field(1).(*array.Float64Builder).AppendValues([]float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + + rec := b.NewRecord() + defer rec.Release() + + cols := func() []array.Column { + var cols []array.Column + for i, field := range schema.Fields() { + chunk := array.NewChunked(field.Type, []array.Interface{rec.Column(i)}) + defer chunk.Release() + col := array.NewColumn(field, chunk) + cols = append(cols, *col) + } + return cols + }() + + df, err := dframe.FromCols(cols) + if err != nil { + log.Fatal(err) + } + defer df.Release() + + fmt.Printf("cols: %v\n", df.ColumnNames()) + + err = df.Exec(func(tx *dframe.Tx) error { + tx.Drop("f1-i32") + tx.Copy("fx-f64", "f2-f64") + tx.Slice(3, 8) + return nil + }) + if err != nil { + log.Fatal(err) + } + + fmt.Printf("cols: %v\n", df.ColumnNames()) + + tr := array.NewTableReader(df, 5) + defer tr.Release() + + n := 0 + for tr.Next() { + rec := tr.Record() + for i, col := range rec.Columns() { + fmt.Printf("rec[%d][%q]: %v\n", n, rec.ColumnName(i), col) + } + n++ + } + + // Output: + // cols: [f1-i32 f2-f64] + // cols: [f2-f64 fx-f64] + // rec[0]["f2-f64"]: [4 5 6 7 8] + // rec[0]["fx-f64"]: [4 5 6 7 8] +} From 99a7d50f6b4f939fc4e8fc3fdd2fc74ab599e016 Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Thu, 10 Jan 2019 18:37:56 +0100 Subject: [PATCH 11/26] dframe: add FromFrame --- dframe/README.md | 6 +++ dframe/dframe.go | 13 +++++++ dframe/dframe_example_test.go | 70 +++++++++++++++++++++++++++++++++++ 3 files changed, 89 insertions(+) diff --git a/dframe/README.md b/dframe/README.md index be29eaa..1ee1aa5 100644 --- a/dframe/README.md +++ b/dframe/README.md @@ -144,6 +144,10 @@ func FromCols(cols []array.Column, opts ...Option) (*Frame, error) { ... } // FromTable creates a new data frame from the provided arrow table. func FromTable(tbl array.Table, opts ...Option) (*Frame, error) { ... } + +// FromFrame returns a new data frame created by applying the provided +// transaction on the provided frame. +func FromFrame(df *Frame, f func(tx *Tx) error) (*Frame, error) { ... } ``` ### Operations @@ -155,3 +159,5 @@ One should be able to carry the following operations on a `dframe.Frame`: - drop columns from a `Frame` - append new data to a `Frame`, (either a new column or a new row) - select a subset of columns from a `Frame` +- create different versions of a `Frame: _e.g._ create `sub` from `Frame` `df` where `sub` is a subset of `df`. + diff --git a/dframe/dframe.go b/dframe/dframe.go index 8850768..a3be66d 100644 --- a/dframe/dframe.go +++ b/dframe/dframe.go @@ -169,6 +169,19 @@ func FromTable(tbl array.Table, opts ...Option) (*Frame, error) { return df, nil } +// FromFrame returns a new data frame created by applying the provided +// transaction on the provided frame. +func FromFrame(df *Frame, f func(tx *Tx) error) (*Frame, error) { + out := df.clone() + err := out.Exec(f) + if err != nil { + out.Release() + return nil, err + } + + return out, nil +} + func (df *Frame) validate() error { if len(df.cols) != len(df.schema.Fields()) { return errors.New("dframe: table schema mismatch") diff --git a/dframe/dframe_example_test.go b/dframe/dframe_example_test.go index e13dd33..b444bd6 100644 --- a/dframe/dframe_example_test.go +++ b/dframe/dframe_example_test.go @@ -218,3 +218,73 @@ func ExampleFrame_fromCols() { // rec[0]["f2-f64"]: [4 5 6 7 8] // rec[0]["fx-f64"]: [4 5 6 7 8] } + +func ExampleFrame_fromFrame() { + + pool := memory.NewGoAllocator() + + schema := arrow.NewSchema( + []arrow.Field{ + arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32}, + arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64}, + }, + nil, + ) + + b := array.NewRecordBuilder(pool, schema) + defer b.Release() + + b.Field(0).(*array.Int32Builder).AppendValues([]int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + b.Field(1).(*array.Float64Builder).AppendValues([]float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, nil) + + rec := b.NewRecord() + defer rec.Release() + + df, err := dframe.FromArrays(schema, rec.Columns()) + if err != nil { + log.Fatal(err) + } + defer df.Release() + + fmt.Printf("cols: %v\n", df.ColumnNames()) + + sub, err := dframe.FromFrame(df, func(tx *dframe.Tx) error { + tx.Drop("f1-i32") + tx.Copy("fx-f64", "f2-f64") + tx.Slice(3, 8) + return nil + }) + if err != nil { + log.Fatal(err) + } + defer sub.Release() + + fmt.Printf("sub: %v\n", sub.ColumnNames()) + fmt.Printf("cols: %v\n", df.ColumnNames()) + + for i, df := range []*dframe.Frame{df, sub} { + fmt.Printf("--- frame %d ---\n", i) + tr := array.NewTableReader(df, -1) + defer tr.Release() + + n := 0 + for tr.Next() { + rec := tr.Record() + for i, col := range rec.Columns() { + fmt.Printf("rec[%d][%q]: %v\n", n, rec.ColumnName(i), col) + } + n++ + } + } + + // Output: + // cols: [f1-i32 f2-f64] + // sub: [f2-f64 fx-f64] + // cols: [f1-i32 f2-f64] + // --- frame 0 --- + // rec[0]["f1-i32"]: [1 2 3 4 5 6 7 8 9 10] + // rec[0]["f2-f64"]: [1 2 3 4 5 6 7 8 9 10] + // --- frame 1 --- + // rec[0]["f2-f64"]: [4 5 6 7 8] + // rec[0]["fx-f64"]: [4 5 6 7 8] +} From 964a9ce86330056c99e73425b92fa4fba337bdd3 Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Thu, 10 Jan 2019 18:50:52 +0100 Subject: [PATCH 12/26] dframe: make sure *Frame implements array.Table --- dframe/dframe.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dframe/dframe.go b/dframe/dframe.go index a3be66d..450c756 100644 --- a/dframe/dframe.go +++ b/dframe/dframe.go @@ -425,3 +425,7 @@ func (tx *Tx) Drop(cols ...string) *Tx { tx.df.schema = sc return tx } + +var ( + _ array.Table = (*Frame)(nil) +) From 3ec4386bcf1b13650711219e87d12a50c1b5bb84 Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Thu, 10 Jan 2019 19:26:45 +0100 Subject: [PATCH 13/26] dframe: add FromMem --- dframe/README.md | 3 + dframe/dframe.go | 128 ++++++++++++++++++++++++++++++++++ dframe/dframe_example_test.go | 53 ++++++++++++++ 3 files changed, 184 insertions(+) diff --git a/dframe/README.md b/dframe/README.md index 1ee1aa5..2b9f5bb 100644 --- a/dframe/README.md +++ b/dframe/README.md @@ -136,6 +136,9 @@ func Create(drv, dst string, schema *arrow.Schema, opts ...Option) (*Frame, erro // New creates a new in-memory data frame with the provided memory schema. func New(schema *arrow.Schema, opts ...Option) (*Frame, error) { ... } +// FromMem creates a new data frame from the provided in-memory data. +func FromMem(cols Map, opts ...Option) (*Frame, error) { ... } + // FromArrays creates a new data frame from the provided schema and arrays. func FromArrays(schema *arrow.Schema, arrs []array.Interface, opts ...Option) (*Frame, error) { ... } diff --git a/dframe/dframe.go b/dframe/dframe.go index 450c756..d71f60b 100644 --- a/dframe/dframe.go +++ b/dframe/dframe.go @@ -35,6 +35,134 @@ type Frame struct { rows int64 } +type Map map[string]interface{} + +// FromMem creates a new data frame from the provided in-memory data. +func FromMem(cols Map, opts ...Option) (*Frame, error) { + var ( + err error + mem = memory.NewGoAllocator() + arrs = make([]array.Interface, 0, len(cols)) + fields = make([]arrow.Field, 0, len(cols)) + ) + + for k, v := range cols { + func(k string, v interface{}) { + var ( + arr array.Interface + ) + switch v := v.(type) { + case []bool: + bld := array.NewBooleanBuilder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []int8: + bld := array.NewInt8Builder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []int16: + bld := array.NewInt16Builder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []int32: + bld := array.NewInt32Builder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []int64: + bld := array.NewInt64Builder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []uint8: + bld := array.NewUint8Builder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []uint16: + bld := array.NewUint16Builder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []uint32: + bld := array.NewUint32Builder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []uint64: + bld := array.NewUint64Builder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []float32: + bld := array.NewFloat32Builder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []float64: + bld := array.NewFloat64Builder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + case []string: + bld := array.NewStringBuilder(mem) + defer bld.Release() + + bld.AppendValues(v, nil) + arr = bld.NewArray() + + default: + if err == nil { + err = errors.Errorf("dframe: invalid data type for %q (%T)", k, v) + return + } + } + + arrs = append(arrs, arr) + fields = append(fields, arrow.Field{Name: k, Type: arr.DataType()}) + + }(k, v) + } + + defer func() { + for i := range arrs { + arrs[i].Release() + } + }() + + if err != nil { + return nil, err + } + + schema := arrow.NewSchema(fields, nil) + return FromArrays(schema, arrs, opts...) +} + // FromArrays creates a new data frame from the provided schema and arrays. func FromArrays(schema *arrow.Schema, arrs []array.Interface, opts ...Option) (*Frame, error) { df := &Frame{ diff --git a/dframe/dframe_example_test.go b/dframe/dframe_example_test.go index b444bd6..4bfac1b 100644 --- a/dframe/dframe_example_test.go +++ b/dframe/dframe_example_test.go @@ -288,3 +288,56 @@ func ExampleFrame_fromFrame() { // rec[0]["f2-f64"]: [4 5 6 7 8] // rec[0]["fx-f64"]: [4 5 6 7 8] } + +func ExampleFrame_fromMem() { + df, err := dframe.FromMem(dframe.Map{ + "f1-i32": []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + "f2-f64": []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + }) + if err != nil { + log.Fatal(err) + } + defer df.Release() + + fmt.Printf("cols: %v\n", df.ColumnNames()) + + sub, err := dframe.FromFrame(df, func(tx *dframe.Tx) error { + tx.Drop("f1-i32") + tx.Copy("fx-f64", "f2-f64") + tx.Slice(3, 8) + return nil + }) + if err != nil { + log.Fatal(err) + } + defer sub.Release() + + fmt.Printf("sub: %v\n", sub.ColumnNames()) + fmt.Printf("cols: %v\n", df.ColumnNames()) + + for i, df := range []*dframe.Frame{df, sub} { + fmt.Printf("--- frame %d ---\n", i) + tr := array.NewTableReader(df, -1) + defer tr.Release() + + n := 0 + for tr.Next() { + rec := tr.Record() + for i, col := range rec.Columns() { + fmt.Printf("rec[%d][%q]: %v\n", n, rec.ColumnName(i), col) + } + n++ + } + } + + // Output: + // cols: [f1-i32 f2-f64] + // sub: [f2-f64 fx-f64] + // cols: [f1-i32 f2-f64] + // --- frame 0 --- + // rec[0]["f1-i32"]: [1 2 3 4 5 6 7 8 9 10] + // rec[0]["f2-f64"]: [1 2 3 4 5 6 7 8 9 10] + // --- frame 1 --- + // rec[0]["f2-f64"]: [4 5 6 7 8] + // rec[0]["fx-f64"]: [4 5 6 7 8] +} From 3ffc95af206b0e12ccadfcd50a9d7a49b7ccfc65 Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Fri, 11 Jan 2019 10:37:25 +0100 Subject: [PATCH 14/26] dframe: rename Map to Dict --- dframe/README.md | 2 +- dframe/dframe.go | 11 ++++++----- dframe/dframe_example_test.go | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/dframe/README.md b/dframe/README.md index 2b9f5bb..938607b 100644 --- a/dframe/README.md +++ b/dframe/README.md @@ -137,7 +137,7 @@ func Create(drv, dst string, schema *arrow.Schema, opts ...Option) (*Frame, erro func New(schema *arrow.Schema, opts ...Option) (*Frame, error) { ... } // FromMem creates a new data frame from the provided in-memory data. -func FromMem(cols Map, opts ...Option) (*Frame, error) { ... } +func FromMem(dict Dict, opts ...Option) (*Frame, error) { ... } // FromArrays creates a new data frame from the provided schema and arrays. func FromArrays(schema *arrow.Schema, arrs []array.Interface, opts ...Option) (*Frame, error) { ... } diff --git a/dframe/dframe.go b/dframe/dframe.go index d71f60b..3bd9eac 100644 --- a/dframe/dframe.go +++ b/dframe/dframe.go @@ -35,18 +35,19 @@ type Frame struct { rows int64 } -type Map map[string]interface{} +// Dict is a map of string to array of data. +type Dict map[string]interface{} // FromMem creates a new data frame from the provided in-memory data. -func FromMem(cols Map, opts ...Option) (*Frame, error) { +func FromMem(dict Dict, opts ...Option) (*Frame, error) { var ( err error mem = memory.NewGoAllocator() - arrs = make([]array.Interface, 0, len(cols)) - fields = make([]arrow.Field, 0, len(cols)) + arrs = make([]array.Interface, 0, len(dict)) + fields = make([]arrow.Field, 0, len(dict)) ) - for k, v := range cols { + for k, v := range dict { func(k string, v interface{}) { var ( arr array.Interface diff --git a/dframe/dframe_example_test.go b/dframe/dframe_example_test.go index 4bfac1b..ae7d5dc 100644 --- a/dframe/dframe_example_test.go +++ b/dframe/dframe_example_test.go @@ -290,7 +290,7 @@ func ExampleFrame_fromFrame() { } func ExampleFrame_fromMem() { - df, err := dframe.FromMem(dframe.Map{ + df, err := dframe.FromMem(dframe.Dict{ "f1-i32": []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, "f2-f64": []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, }) From 974c4db6181dc2aa0b62dcc48c9afd2e6e097ac6 Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Fri, 11 Jan 2019 17:01:08 +0100 Subject: [PATCH 15/26] dframe: add support for []int and []uint in FromMem --- dframe/dframe.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/dframe/dframe.go b/dframe/dframe.go index 3bd9eac..5ea7162 100644 --- a/dframe/dframe.go +++ b/dframe/dframe.go @@ -137,6 +137,30 @@ func FromMem(dict Dict, opts ...Option) (*Frame, error) { bld.AppendValues(v, nil) arr = bld.NewArray() + case []uint: + bld := array.NewUint64Builder(mem) + defer bld.Release() + + vs := make([]uint64, len(v)) + for i, e := range v { + vs[i] = uint64(e) + } + + bld.AppendValues(vs, nil) + arr = bld.NewArray() + + case []int: + bld := array.NewInt64Builder(mem) + defer bld.Release() + + vs := make([]int64, len(v)) + for i, e := range v { + vs[i] = int64(e) + } + + bld.AppendValues(vs, nil) + arr = bld.NewArray() + default: if err == nil { err = errors.Errorf("dframe: invalid data type for %q (%T)", k, v) From 67124b5cc14bbfef59fd4ce4bb2fc64b49ffdb5c Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Fri, 11 Jan 2019 17:13:15 +0100 Subject: [PATCH 16/26] dframe: introduce read/only and read/write transactions --- dframe/README.md | 14 ++++++++++++-- dframe/dframe.go | 46 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/dframe/README.md b/dframe/README.md index 938607b..4e94534 100644 --- a/dframe/README.md +++ b/dframe/README.md @@ -94,7 +94,9 @@ It is not clear (to me!) yet whether an immutable `Frame` makes much sense in Go But, immutable or not, one could recoup the nice "chained methods" API by introducing a `dframe.Tx` transaction: ```go -func (df *Frame) Exec(f func(tx *Tx) error) error +// Exec runs the provided function inside an atomic read/write transaction, +// applied on this Frame. +func (df *Frame) Exec(f func(tx *Tx) error) error { ... } func example(df *dframe.Frame) { err := df.Exec(func(tx *dframe.Tx) error { @@ -151,6 +153,14 @@ func FromTable(tbl array.Table, opts ...Option) (*Frame, error) { ... } // FromFrame returns a new data frame created by applying the provided // transaction on the provided frame. func FromFrame(df *Frame, f func(tx *Tx) error) (*Frame, error) { ... } + +// Exec runs the provided function inside an atomic read/write transaction, +// applied on this Frame. +func (df *Frame) Exec(f func(tx *Tx) error) error { ... } + +// RExec runs the provided function inside an atomic read-only transaction, +// applied on this Frame. +func (df *Frame) RExec(f func(tx *Tx) error) error { ... } ``` ### Operations @@ -162,5 +172,5 @@ One should be able to carry the following operations on a `dframe.Frame`: - drop columns from a `Frame` - append new data to a `Frame`, (either a new column or a new row) - select a subset of columns from a `Frame` -- create different versions of a `Frame: _e.g._ create `sub` from `Frame` `df` where `sub` is a subset of `df`. +- create different versions of a `Frame`: _e.g._ create `sub` from `Frame` `df` where `sub` is a subset of `df`. diff --git a/dframe/dframe.go b/dframe/dframe.go index 5ea7162..f8e9320 100644 --- a/dframe/dframe.go +++ b/dframe/dframe.go @@ -418,6 +418,8 @@ func (df *Frame) ColumnNames() []string { return names } +// Exec runs the provided function inside an atomic read-write transaction, +// applied on this Frame. func (df *Frame) Exec(f func(tx *Tx) error) error { df.mu.Lock() defer df.mu.Unlock() @@ -441,6 +443,27 @@ func (df *Frame) Exec(f func(tx *Tx) error) error { return nil } +// RExec runs the provided function inside an atomic read-only transaction, +// applied on this Frame. +func (df *Frame) RExec(f func(tx *Tx) error) error { + df.mu.RLock() + defer df.mu.RUnlock() + + if df.err != nil { + return df.err + } + + tx := newRTx(df) + defer tx.Close() + + err := f(tx) + if err != nil { + return err + } + + return tx.Err() +} + func (lhs *Frame) swap(rhs *Frame) { rhs.refs = atomic.SwapInt64(&lhs.refs, rhs.refs) lhs.mem, rhs.mem = rhs.mem, lhs.mem @@ -467,11 +490,17 @@ func (df *Frame) clone() *Frame { // Tx represents a read-only or read/write transaction on a data frame. type Tx struct { df *Frame + rw bool // read-write access err error } func newTx(df *Frame) *Tx { - tx := &Tx{df: df.clone()} + tx := &Tx{df: df.clone(), rw: true} + return tx +} + +func newRTx(df *Frame) *Tx { + tx := &Tx{df: df.clone(), rw: false} return tx } @@ -497,6 +526,11 @@ func (tx *Tx) Copy(dst, src string) *Tx { return tx } + if !tx.rw { + tx.err = errors.Errorf("dframe: r/w operation on read-only transaction") + return tx + } + if tx.df.Schema().HasField(dst) { tx.err = errors.Errorf("dframe: column %q already exists", dst) return tx @@ -529,6 +563,11 @@ func (tx *Tx) Slice(beg, end int) *Tx { return tx } + if !tx.rw { + tx.err = errors.Errorf("dframe: r/w operation on read-only transaction") + return tx + } + if int64(end) > tx.df.rows || beg > end { tx.err = errors.Errorf("dframe: index out of range") return tx @@ -553,6 +592,11 @@ func (tx *Tx) Drop(cols ...string) *Tx { return tx } + if !tx.rw { + tx.err = errors.Errorf("dframe: r/w operation on read-only transaction") + return tx + } + set := make(map[string]struct{}, len(cols)) for _, col := range cols { set[col] = struct{}{} From 8b840c336b7cca70d2ba165c6bab3fd877013792 Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Fri, 11 Jan 2019 17:52:16 +0100 Subject: [PATCH 17/26] dframe: add import clause --- dframe/dframe.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dframe/dframe.go b/dframe/dframe.go index f8e9320..425c9db 100644 --- a/dframe/dframe.go +++ b/dframe/dframe.go @@ -10,7 +10,7 @@ // Ultimately, dframe should also allow for a good inter-operability with // Apache Arrow: // - https://godoc.org/github.com/apache/arrow/go/arrow -package dframe +package dframe // import "gonum.org/v1/exp/dframe" import ( "sync" From a2278f672fc671e6baefd46f6c8d6c369c8c93ea Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Fri, 11 Jan 2019 17:52:37 +0100 Subject: [PATCH 18/26] dframe/dfmat: first import --- dframe/dfmat/dfmat.go | 142 +++++++++++++++++++++++++++++ dframe/dfmat/dfmat_example_test.go | 98 ++++++++++++++++++++ 2 files changed, 240 insertions(+) create mode 100644 dframe/dfmat/dfmat.go create mode 100644 dframe/dfmat/dfmat_example_test.go diff --git a/dframe/dfmat/dfmat.go b/dframe/dfmat/dfmat.go new file mode 100644 index 0000000..0ddc6f0 --- /dev/null +++ b/dframe/dfmat/dfmat.go @@ -0,0 +1,142 @@ +// Copyright ©2019 The Gonum Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package dfmat provides a set of tools to easily leverage gonum/mat types +// from exp/dframe (and vice versa.) +// +// This is still a WIP package, building on the experience from: +// - https://github.com/kniren/gota +// - https://github.com/tobgu/qframe +// Ultimately, dframe should also allow for a good inter-operability with +// Apache Arrow: +// - https://godoc.org/github.com/apache/arrow/go/arrow +package dfmat // import "gonum.org/v1/exp/dframe/dfmat" + +import ( + "fmt" + + "github.com/apache/arrow/go/arrow" + "github.com/apache/arrow/go/arrow/array" + "github.com/apache/arrow/go/arrow/memory" + "gonum.org/v1/exp/dframe" + "gonum.org/v1/gonum/mat" +) + +type Option func(c *config) + +// WithNames configures a dframe.Frame with the provided set of column names. +func WithNames(names ...string) Option { + return func(c *config) { + c.names = make([]string, len(names)) + copy(c.names, names) + } +} + +type config struct { + names []string +} + +func newConfig(n int) *config { + cfg := config{names: make([]string, n)} + for i := range cfg.names { + cfg.names[i] = fmt.Sprintf("col-%03d", i+1) + } + return &cfg +} + +// FromMatrix creates a new dframe.Frame from a gonum/mat.Matrix. +func FromMatrix(m mat.Matrix, opts ...Option) *dframe.Frame { + var ( + mem = memory.NewGoAllocator() + bld = array.NewFloat64Builder(mem) + + r, c = m.Dims() + arrs = make([]array.Interface, c) + fields = make([]arrow.Field, c) + ) + defer bld.Release() + + cfg := newConfig(c) + for _, opt := range opts { + opt(cfg) + } + + switch m := m.(type) { + case mat.RawColViewer: + for i := 0; i < c; i++ { + col := m.RawColView(i) + bld.AppendValues(col, nil) + arrs[i] = bld.NewArray() + fields[i] = arrow.Field{ + Name: cfg.names[i], + Type: arrs[i].DataType(), + } + } + default: + for i := 0; i < c; i++ { + for j := 0; j < r; j++ { + bld.Append(m.At(j, i)) + } + arrs[i] = bld.NewArray() + fields[i] = arrow.Field{ + Name: cfg.names[i], + Type: arrs[i].DataType(), + } + } + } + + schema := arrow.NewSchema(fields, nil) + df, err := dframe.FromArrays(schema, arrs) + if err != nil { + panic(err) + } + + return df +} + +// FromVector creates a new dframe.Frame from a gonum/mat.Vector. +func FromVector(vec mat.Vector, opts ...Option) *dframe.Frame { + var ( + mem = memory.NewGoAllocator() + bld = array.NewFloat64Builder(mem) + + rows = vec.Len() + arrs = make([]array.Interface, 1) + fields = make([]arrow.Field, 1) + ) + defer bld.Release() + + cfg := newConfig(1) + for _, opt := range opts { + opt(cfg) + } + + switch vec := vec.(type) { + case mat.RawColViewer: + col := vec.RawColView(0) + bld.AppendValues(col, nil) + arrs[0] = bld.NewArray() + fields[0] = arrow.Field{ + Name: cfg.names[0], + Type: arrs[0].DataType(), + } + default: + for i := 0; i < rows; i++ { + bld.Append(vec.AtVec(i)) + } + arrs[0] = bld.NewArray() + fields[0] = arrow.Field{ + Name: cfg.names[0], + Type: arrs[0].DataType(), + } + } + + schema := arrow.NewSchema(fields, nil) + df, err := dframe.FromArrays(schema, arrs) + if err != nil { + panic(err) + } + + return df +} diff --git a/dframe/dfmat/dfmat_example_test.go b/dframe/dfmat/dfmat_example_test.go new file mode 100644 index 0000000..25140a7 --- /dev/null +++ b/dframe/dfmat/dfmat_example_test.go @@ -0,0 +1,98 @@ +// Copyright ©2019 The Gonum Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package dfmat_test + +import ( + "fmt" + + "github.com/apache/arrow/go/arrow/array" + + "gonum.org/v1/exp/dframe/dfmat" + "gonum.org/v1/gonum/mat" +) + +func Example_fromMatrix() { + m := mat.NewDense(3, 2, []float64{ + 1, 2, + 3, 4, + 5, 6, + }) + + { + df := dfmat.FromMatrix(m, dfmat.WithNames("x", "y")) + defer df.Release() + + fmt.Printf("cols: %v\n", df.ColumnNames()) + + tr := array.NewTableReader(df, -1) + defer tr.Release() + + n := 0 + for tr.Next() { + rec := tr.Record() + for i, col := range rec.Columns() { + fmt.Printf("rec[%d][%q]: %v\n", n, rec.ColumnName(i), col) + } + n++ + } + } + + { + df := dfmat.FromMatrix(m.T(), dfmat.WithNames("x", "y", "z")) + defer df.Release() + + fmt.Printf("cols: %v\n", df.ColumnNames()) + + tr := array.NewTableReader(df, -1) + defer tr.Release() + + n := 0 + for tr.Next() { + rec := tr.Record() + for i, col := range rec.Columns() { + fmt.Printf("rec[%d][%q]: %v\n", n, rec.ColumnName(i), col) + } + n++ + } + } + + // Output: + // cols: [x y] + // rec[0]["x"]: [1 3 5] + // rec[0]["y"]: [2 4 6] + // cols: [x y z] + // rec[0]["x"]: [1 2] + // rec[0]["y"]: [3 4] + // rec[0]["z"]: [5 6] +} + +func Example_fromVector() { + m := mat.NewVecDense(6, []float64{ + 1, 2, + 3, 4, + 5, 6, + }) + + df := dfmat.FromVector(m, dfmat.WithNames("x")) + defer df.Release() + + fmt.Printf("cols: %v\n", df.ColumnNames()) + + tr := array.NewTableReader(df, -1) + defer tr.Release() + + n := 0 + for tr.Next() { + rec := tr.Record() + for i, col := range rec.Columns() { + fmt.Printf("rec[%d][%q]: %v\n", n, rec.ColumnName(i), col) + } + n++ + } + + // Output: + // cols: [x] + // rec[0]["x"]: [1 2 3 4 5 6] +} From 40e9cec1bec507c714c4b75c325f6eebb3391271 Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Fri, 11 Jan 2019 18:50:03 +0100 Subject: [PATCH 19/26] all: apply gofmt simplify --- dframe/dframe_example_test.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/dframe/dframe_example_test.go b/dframe/dframe_example_test.go index ae7d5dc..f5ba738 100644 --- a/dframe/dframe_example_test.go +++ b/dframe/dframe_example_test.go @@ -20,8 +20,8 @@ func ExampleFrame_fromTable() { schema := arrow.NewSchema( []arrow.Field{ - arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32}, - arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64}, + {Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32}, + {Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64}, }, nil, ) @@ -95,8 +95,8 @@ func ExampleFrame_fromArrays() { schema := arrow.NewSchema( []arrow.Field{ - arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32}, - arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64}, + {Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32}, + {Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64}, }, nil, ) @@ -154,8 +154,8 @@ func ExampleFrame_fromCols() { schema := arrow.NewSchema( []arrow.Field{ - arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32}, - arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64}, + {Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32}, + {Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64}, }, nil, ) @@ -225,8 +225,8 @@ func ExampleFrame_fromFrame() { schema := arrow.NewSchema( []arrow.Field{ - arrow.Field{Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32}, - arrow.Field{Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64}, + {Name: "f1-i32", Type: arrow.PrimitiveTypes.Int32}, + {Name: "f2-f64", Type: arrow.PrimitiveTypes.Float64}, }, nil, ) From ea4d8e7f5316f71df231fb7c0c4afef91d4140b4 Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Fri, 11 Jan 2019 18:56:19 +0100 Subject: [PATCH 20/26] dframe: explain the reason for dframe.Tx (ie: ref-counting) --- dframe/README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dframe/README.md b/dframe/README.md index 4e94534..a4edfe2 100644 --- a/dframe/README.md +++ b/dframe/README.md @@ -91,7 +91,8 @@ defer o.Release() ``` It is not clear (to me!) yet whether an immutable `Frame` makes much sense in Go and with this ref-counting mechanism coming from Arrow. -But, immutable or not, one could recoup the nice "chained methods" API by introducing a `dframe.Tx` transaction: +But, immutable or not, the ref-counting mechanism exposed by Arrow needs to be addressed. +A possible solution is investigated by introducing a `dframe.Tx` transaction: ```go // Exec runs the provided function inside an atomic read/write transaction, From 76f4b621861e3ec20734b1e52b27426641218e73 Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Fri, 11 Jan 2019 18:56:50 +0100 Subject: [PATCH 21/26] dframe: cosmetics --- dframe/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dframe/README.md b/dframe/README.md index a4edfe2..72f697c 100644 --- a/dframe/README.md +++ b/dframe/README.md @@ -92,7 +92,7 @@ defer o.Release() It is not clear (to me!) yet whether an immutable `Frame` makes much sense in Go and with this ref-counting mechanism coming from Arrow. But, immutable or not, the ref-counting mechanism exposed by Arrow needs to be addressed. -A possible solution is investigated by introducing a `dframe.Tx` transaction: +A possible solution is investigated by introducing a `dframe.Tx` transaction: ```go // Exec runs the provided function inside an atomic read/write transaction, From 853ae0a0ec71135e2916c8c83c1c47479c790e6a Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Mon, 14 Jan 2019 10:12:07 +0100 Subject: [PATCH 22/26] PS 2 --- dframe/README.md | 2 +- dframe/dfmat/dfmat.go | 10 ++++++---- dframe/dframe.go | 18 +++++++----------- dframe/dframe_example_test.go | 4 ---- 4 files changed, 14 insertions(+), 20 deletions(-) diff --git a/dframe/README.md b/dframe/README.md index 72f697c..0a055dd 100644 --- a/dframe/README.md +++ b/dframe/README.md @@ -70,7 +70,7 @@ Using Arrow should also allow seamless interoperability with other data wranglin Operations on a `QFrame`, such as copying columns, dropping columns, sorting them or applying some kind of operation on columns, return a new `QFrame`, leaving the original untouched. Arrow uses a ref-counting mechanism for all the types that involve memory allocation (mainly to address workloads involving memory allocated on a GPGPU, by a SQL database or a mmap-file.) -This ref-counting mechanism is presented to the user as a pair of methods `Retain/Release` that resp. increment and decrement that reference count. +This ref-counting mechanism is presented to the user as a pair of methods `Retain`/`Release` that increment and decrement that reference count. At first, it would seem this mechanism would prevent to expose an API with "chained methods", as the intermediate `Frame` would be "leaked": ```go diff --git a/dframe/dfmat/dfmat.go b/dframe/dfmat/dfmat.go index 0ddc6f0..b730643 100644 --- a/dframe/dfmat/dfmat.go +++ b/dframe/dfmat/dfmat.go @@ -49,19 +49,20 @@ func newConfig(n int) *config { func FromMatrix(m mat.Matrix, opts ...Option) *dframe.Frame { var ( mem = memory.NewGoAllocator() - bld = array.NewFloat64Builder(mem) r, c = m.Dims() arrs = make([]array.Interface, c) fields = make([]arrow.Field, c) ) - defer bld.Release() cfg := newConfig(c) for _, opt := range opts { opt(cfg) } + bld := array.NewFloat64Builder(mem) + defer bld.Release() + switch m := m.(type) { case mat.RawColViewer: for i := 0; i < c; i++ { @@ -99,19 +100,20 @@ func FromMatrix(m mat.Matrix, opts ...Option) *dframe.Frame { func FromVector(vec mat.Vector, opts ...Option) *dframe.Frame { var ( mem = memory.NewGoAllocator() - bld = array.NewFloat64Builder(mem) rows = vec.Len() arrs = make([]array.Interface, 1) fields = make([]arrow.Field, 1) ) - defer bld.Release() cfg := newConfig(1) for _, opt := range opts { opt(cfg) } + bld := array.NewFloat64Builder(mem) + defer bld.Release() + switch vec := vec.(type) { case mat.RawColViewer: col := vec.RawColView(0) diff --git a/dframe/dframe.go b/dframe/dframe.go index 425c9db..b563b20 100644 --- a/dframe/dframe.go +++ b/dframe/dframe.go @@ -35,6 +35,8 @@ type Frame struct { rows int64 } +var _ array.Table = (*Frame)(nil) + // Dict is a map of string to array of data. type Dict map[string]interface{} @@ -271,14 +273,12 @@ func FromCols(cols []array.Column, opts ...Option) (*Frame, error) { } } - { - fields := make([]arrow.Field, len(cols)) - for i, col := range cols { - fields[i].Name = col.Name() - fields[i].Type = col.DataType() - } - df.schema = arrow.NewSchema(fields, nil) + fields := make([]arrow.Field, len(cols)) + for i, col := range cols { + fields[i].Name = col.Name() + fields[i].Type = col.DataType() } + df.schema = arrow.NewSchema(fields, nil) // validate the data frame and its constituents. // note we retain the columns after having validated the data frame @@ -622,7 +622,3 @@ func (tx *Tx) Drop(cols ...string) *Tx { tx.df.schema = sc return tx } - -var ( - _ array.Table = (*Frame)(nil) -) diff --git a/dframe/dframe_example_test.go b/dframe/dframe_example_test.go index f5ba738..fd339e5 100644 --- a/dframe/dframe_example_test.go +++ b/dframe/dframe_example_test.go @@ -15,7 +15,6 @@ import ( ) func ExampleFrame_fromTable() { - pool := memory.NewGoAllocator() schema := arrow.NewSchema( @@ -90,7 +89,6 @@ func ExampleFrame_fromTable() { } func ExampleFrame_fromArrays() { - pool := memory.NewGoAllocator() schema := arrow.NewSchema( @@ -149,7 +147,6 @@ func ExampleFrame_fromArrays() { } func ExampleFrame_fromCols() { - pool := memory.NewGoAllocator() schema := arrow.NewSchema( @@ -220,7 +217,6 @@ func ExampleFrame_fromCols() { } func ExampleFrame_fromFrame() { - pool := memory.NewGoAllocator() schema := arrow.NewSchema( From 23a82f4fee367dfeeb5c0556b6bd86143438704b Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Mon, 14 Jan 2019 10:48:05 +0100 Subject: [PATCH 23/26] PS 3 --- dframe/dfmat/dfmat_example_test.go | 15 +++++++++++--- dframe/dframe.go | 10 +++------- dframe/dframe_example_test.go | 32 +++++++++++++++++++----------- 3 files changed, 35 insertions(+), 22 deletions(-) diff --git a/dframe/dfmat/dfmat_example_test.go b/dframe/dfmat/dfmat_example_test.go index 25140a7..0701f18 100644 --- a/dframe/dfmat/dfmat_example_test.go +++ b/dframe/dfmat/dfmat_example_test.go @@ -9,10 +9,19 @@ import ( "github.com/apache/arrow/go/arrow/array" + "gonum.org/v1/exp/dframe" "gonum.org/v1/exp/dframe/dfmat" "gonum.org/v1/gonum/mat" ) +func columnNames(df *dframe.Frame) []string { + names := make([]string, df.NumCols()) + for i := range names { + names[i] = df.Name(i) + } + return names +} + func Example_fromMatrix() { m := mat.NewDense(3, 2, []float64{ 1, 2, @@ -24,7 +33,7 @@ func Example_fromMatrix() { df := dfmat.FromMatrix(m, dfmat.WithNames("x", "y")) defer df.Release() - fmt.Printf("cols: %v\n", df.ColumnNames()) + fmt.Printf("cols: %v\n", columnNames(df)) tr := array.NewTableReader(df, -1) defer tr.Release() @@ -43,7 +52,7 @@ func Example_fromMatrix() { df := dfmat.FromMatrix(m.T(), dfmat.WithNames("x", "y", "z")) defer df.Release() - fmt.Printf("cols: %v\n", df.ColumnNames()) + fmt.Printf("cols: %v\n", columnNames(df)) tr := array.NewTableReader(df, -1) defer tr.Release() @@ -78,7 +87,7 @@ func Example_fromVector() { df := dfmat.FromVector(m, dfmat.WithNames("x")) defer df.Release() - fmt.Printf("cols: %v\n", df.ColumnNames()) + fmt.Printf("cols: %v\n", columnNames(df)) tr := array.NewTableReader(df, -1) defer tr.Release() diff --git a/dframe/dframe.go b/dframe/dframe.go index b563b20..6d3e85a 100644 --- a/dframe/dframe.go +++ b/dframe/dframe.go @@ -409,13 +409,9 @@ func (df *Frame) Column(i int) *array.Column { return &df.cols[i] } -// ColumnNames returns the list of column names of this Frame. -func (df *Frame) ColumnNames() []string { - names := make([]string, df.NumCols()) - for i := range names { - names[i] = df.Column(i).Name() - } - return names +// Name returns the name of the i-th column of this Frame. +func (df *Frame) Name(i int) string { + return df.Column(i).Name() } // Exec runs the provided function inside an atomic read-write transaction, diff --git a/dframe/dframe_example_test.go b/dframe/dframe_example_test.go index fd339e5..280ef7f 100644 --- a/dframe/dframe_example_test.go +++ b/dframe/dframe_example_test.go @@ -14,6 +14,14 @@ import ( "gonum.org/v1/exp/dframe" ) +func columnNames(df *dframe.Frame) []string { + names := make([]string, df.NumCols()) + for i := range names { + names[i] = df.Name(i) + } + return names +} + func ExampleFrame_fromTable() { pool := memory.NewGoAllocator() @@ -50,7 +58,7 @@ func ExampleFrame_fromTable() { } defer df.Release() - fmt.Printf("cols: %v\n", df.ColumnNames()) + fmt.Printf("cols: %v\n", columnNames(df)) err = df.Exec(func(tx *dframe.Tx) error { tx.Drop("f1-i32") @@ -61,7 +69,7 @@ func ExampleFrame_fromTable() { log.Fatal(err) } - fmt.Printf("cols: %v\n", df.ColumnNames()) + fmt.Printf("cols: %v\n", columnNames(df)) tr := array.NewTableReader(df, 5) defer tr.Release() @@ -114,7 +122,7 @@ func ExampleFrame_fromArrays() { } defer df.Release() - fmt.Printf("cols: %v\n", df.ColumnNames()) + fmt.Printf("cols: %v\n", columnNames(df)) err = df.Exec(func(tx *dframe.Tx) error { tx.Drop("f1-i32") @@ -125,7 +133,7 @@ func ExampleFrame_fromArrays() { if err != nil { log.Fatal(err) } - fmt.Printf("cols: %v\n", df.ColumnNames()) + fmt.Printf("cols: %v\n", columnNames(df)) tr := array.NewTableReader(df, -1) defer tr.Release() @@ -183,7 +191,7 @@ func ExampleFrame_fromCols() { } defer df.Release() - fmt.Printf("cols: %v\n", df.ColumnNames()) + fmt.Printf("cols: %v\n", columnNames(df)) err = df.Exec(func(tx *dframe.Tx) error { tx.Drop("f1-i32") @@ -195,7 +203,7 @@ func ExampleFrame_fromCols() { log.Fatal(err) } - fmt.Printf("cols: %v\n", df.ColumnNames()) + fmt.Printf("cols: %v\n", columnNames(df)) tr := array.NewTableReader(df, 5) defer tr.Release() @@ -242,7 +250,7 @@ func ExampleFrame_fromFrame() { } defer df.Release() - fmt.Printf("cols: %v\n", df.ColumnNames()) + fmt.Printf("cols: %v\n", columnNames(df)) sub, err := dframe.FromFrame(df, func(tx *dframe.Tx) error { tx.Drop("f1-i32") @@ -255,8 +263,8 @@ func ExampleFrame_fromFrame() { } defer sub.Release() - fmt.Printf("sub: %v\n", sub.ColumnNames()) - fmt.Printf("cols: %v\n", df.ColumnNames()) + fmt.Printf("sub: %v\n", columnNames(sub)) + fmt.Printf("cols: %v\n", columnNames(df)) for i, df := range []*dframe.Frame{df, sub} { fmt.Printf("--- frame %d ---\n", i) @@ -295,7 +303,7 @@ func ExampleFrame_fromMem() { } defer df.Release() - fmt.Printf("cols: %v\n", df.ColumnNames()) + fmt.Printf("cols: %v\n", columnNames(df)) sub, err := dframe.FromFrame(df, func(tx *dframe.Tx) error { tx.Drop("f1-i32") @@ -308,8 +316,8 @@ func ExampleFrame_fromMem() { } defer sub.Release() - fmt.Printf("sub: %v\n", sub.ColumnNames()) - fmt.Printf("cols: %v\n", df.ColumnNames()) + fmt.Printf("sub: %v\n", columnNames(sub)) + fmt.Printf("cols: %v\n", columnNames(df)) for i, df := range []*dframe.Frame{df, sub} { fmt.Printf("--- frame %d ---\n", i) From 3d4f163fca6776074b9d4184503dc681bbe36a75 Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Mon, 14 Jan 2019 12:21:55 +0100 Subject: [PATCH 24/26] PS 4 --- dframe/README.md | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/dframe/README.md b/dframe/README.md index 0a055dd..b52549c 100644 --- a/dframe/README.md +++ b/dframe/README.md @@ -63,21 +63,25 @@ func (df *Frame) Column(i int) *array.Column { ... } func (df *Frame) ColumnNames() []string { ... } ``` -It is expected to build `dframe.Frame` on top of `arrow/array.Interface` and/or `arrow/tensor.Interface` to re-use the SIMD optimized operations and zero-copy optimization that are implemented within these packages. -Using Arrow should also allow seamless interoperability with other data wrangling systems, possibly written in other languages than Go. +It is expected to build `dframe.Frame` on top of the `arrow/array.Interface`. +Leveraging [Arrow](https://arrow.apache.org) for `dframe` enables interoperability with many analysis frameworks, possibly written in other languages than Go. +Arrow arrays are well specified: their memory layout is standardized and the IPC mechanism to send or receive them over the wire is also specified. +This increases the confidence the data we are writing or the analysis pipelines we build with Arrow could be migrated to something else (another language, another framework) if the need should arise. +The Go Arrow package is not feature complete yet with regard to the other language implementations (C++, Java.) +However, the Go implementation already ships with SIMD optimized operations and has the infrastructure for zero-copy support. `tobgu/qframe` presents a `QFrame` type that is essentially immutable. Operations on a `QFrame`, such as copying columns, dropping columns, sorting them or applying some kind of operation on columns, return a new `QFrame`, leaving the original untouched. Arrow uses a ref-counting mechanism for all the types that involve memory allocation (mainly to address workloads involving memory allocated on a GPGPU, by a SQL database or a mmap-file.) This ref-counting mechanism is presented to the user as a pair of methods `Retain`/`Release` that increment and decrement that reference count. -At first, it would seem this mechanism would prevent to expose an API with "chained methods", as the intermediate `Frame` would be "leaked": +It would seem this mechanism prevents from exposing an API with "chained methods": ```go o := df.Slice(0, 10).Select("col1", "col2").Apply("col1 + col2") ``` - -If we want an immutable `Frame`, the code above should be rewritten as: +Each intermediate `Frame` -- the one returned by `Slice`, the one returned by `Select`, ... -- would be "leaked" as it is missing a call to `Release()` to correctly decrement its reference count. +If we want an immutable `Frame` -- without leaking memory, the code above should instead be rewritten as: ```go sli := df.Slice(0, 10) @@ -91,8 +95,8 @@ defer o.Release() ``` It is not clear (to me!) yet whether an immutable `Frame` makes much sense in Go and with this ref-counting mechanism coming from Arrow. -But, immutable or not, the ref-counting mechanism exposed by Arrow needs to be addressed. -A possible solution is investigated by introducing a `dframe.Tx` transaction: +However, introducing a `dframe.Tx` transaction could tackle the memory leak. +One can achieve the above goal if one only allows modifications of the underlying `Frame` through a transaction, where all operations are applied to a single temporary `Frame`: ```go // Exec runs the provided function inside an atomic read/write transaction, @@ -125,6 +129,9 @@ func example(df *dframe.Frame) { } } ``` +Introducing a transaction has another nice feature: if the set of operations fails for some reason, one can rollback to the original state of the `Frame`. + +Finally, with a transaction context, one can build some kind of AST of operations that should be applied to a `Frame` and optionally optimize it behind the scene as one knows the complete set of operations to be carried. ```go // Open opens an already existing Frame using the provided driver technology, From 15d72fa9d63b48fa70bea6611dd949c83e228ad5 Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Tue, 15 Jan 2019 09:49:47 +0100 Subject: [PATCH 25/26] ci: drop 1.9.x because of Apache Arrow --- .travis.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 0eaaf01..fecd6d5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,6 @@ language: go # Versions of go that are explicitly supported by gonum plus go tip. go: - - 1.9.x - 1.10.x - 1.11.x - master From 7741eff781142976fb10a022046fdf4603788598 Mon Sep 17 00:00:00 2001 From: Sebastien Binet Date: Tue, 15 Jan 2019 11:20:12 +0100 Subject: [PATCH 26/26] dframe: create stable order of columns for FromMem --- dframe/dframe.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/dframe/dframe.go b/dframe/dframe.go index 6d3e85a..a14173d 100644 --- a/dframe/dframe.go +++ b/dframe/dframe.go @@ -13,6 +13,7 @@ package dframe // import "gonum.org/v1/exp/dframe" import ( + "sort" "sync" "sync/atomic" @@ -49,7 +50,13 @@ func FromMem(dict Dict, opts ...Option) (*Frame, error) { fields = make([]arrow.Field, 0, len(dict)) ) - for k, v := range dict { + keys := make([]string, 0, len(dict)) + for k := range dict { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + v := dict[k] func(k string, v interface{}) { var ( arr array.Interface