Permalink
Browse files

sane query and result interfaces

  • Loading branch information...
carloscm committed May 11, 2012
1 parent 6e03459 commit b23d895a6d326ff15a2c47e619e908117ac3dd99
Showing with 252 additions and 145 deletions.
  1. +7 −12 README.md
  2. +4 −4 src/gossie/connection.go
  3. +89 −47 src/gossie/mapping.go
  4. +33 −4 src/gossie/mapping_test.go
  5. +76 −47 src/gossie/query.go
  6. +43 −31 src/gossie/query_test.go
View
@@ -102,12 +102,11 @@ When calling NewMapping() you can tag your struct fiels with `name`, `type` and
The tags `cf`, `key`, `cols` and `value` can be used in any field in the struct to document a mapping. `cf` is the column family name. `key` is the field name in the struct that stores the Cassandra row key value. `cols` is a list of struct fiels that build up the composite column name, if there is any. `value` is the field that stores the column value for compact storage rows.
-### Query and Result interfaces (planned)
+### Query and Result
-High level queries with transparent paging and buffering. This is still WIP, a possible example:
+Query allows to look up mapped structs over Cassandra rows. Pass to `Query.Get` the row key, followed by zero or more composite keys, to get a Result. `Result.Next` reads a single struct from the Cassandra row, and returns `Done` when no more structs can be read.
```Go
-
query := pool.Query(TweetMapping)
// a single tweet, since we pass the row key and all possible composite values
@@ -116,25 +115,21 @@ result, err := query.Get("username", 10000000000004)
// all tweets for a given user
result, err := query.Get("username")
-// all tweets for a given user, starting at a certain TweetID
-result, err := query.Where("TweetID", ">=", 10000000000004).Get("username")
-
// iterating over results
for {
- var t Tweet
- err := result.Next(&t)
+ t := &Tweet{}
+ err := result.Next(t)
if err != nil {
break
}
- ...
}
````
# Planned features
-- Query: range reads for composites with buffering and paging
-- Query: secondary index read with buffering and paging
-- Query: multiget reads with buffering and paging
+- Error passing overhaul, to be based on typing
+- Query: secondary index read with buffering
+- Query: multiget reads with buffering
- A higher level abstraction for writes (Batch interface)
- High level mapping for Go slices
- High level mapping for Go maps
View
@@ -36,7 +36,7 @@ type ConnectionPool interface {
Writer() Writer
// Query returns a high level interface for read operations over structs
- //Query() Query
+ Query(Mapping) Query
// Close all the connections in the pool
Close()
@@ -318,9 +318,9 @@ func (cp *connectionPool) Writer() Writer {
return newWriter(cp, cp.options.WriteConsistency)
}
-//func (cp *connectionPool) Cursor() Cursor {
-// return newCursor(cp)
-//}
+func (cp *connectionPool) Query(m Mapping) Query {
+ return newQuery(cp, m)
+}
func (cp *connectionPool) Keyspace() string {
return cp.keyspace
View
@@ -26,10 +26,19 @@ type Mapping interface {
// Map converts a Go object compatible with this Mapping into a Row
Map(source interface{}) (*Row, error)
- // Ummap fills the passed Go object with data from the row, starting at the
- // offset column. It returns the count of consumed columns, or -1 if there
- // wasn't enough columns to fill the Go object
- Unmap(destination interface{}, offset int, row *Row) (int, error)
+ // Ummap fills the passed Go object with data from a row
+ Unmap(destination interface{}, provider RowProvider) error
+}
+
+var (
+ EndBeforeLimit = errors.New("No more results found before reaching the limit")
+ EndAtLimit = errors.New("No more results found but reached the limit")
+)
+
+type RowProvider interface {
+ Key() []byte
+ NextColumn() (*Column, error)
+ Rewind()
}
var (
@@ -211,15 +220,15 @@ func (m *sparseMapping) Map(source interface{}) (*Row, error) {
return row, nil
}
-func (m *sparseMapping) startUnmap(destination interface{}, row *Row) (*reflect.Value, *structInspection, error) {
+func (m *sparseMapping) startUnmap(destination interface{}, provider RowProvider) (*reflect.Value, *structInspection, error) {
v, si, err := validateAndInspectStruct(destination)
if err != nil {
return nil, nil, err
}
// unmarshal key field
if f, found := si.goFields[m.key]; found {
- err = f.unmarshalValue(row.Key, v)
+ err = f.unmarshalValue(provider.Key(), v)
if err != nil {
return nil, nil, err
}
@@ -245,74 +254,98 @@ func (m *sparseMapping) unmapComponents(v *reflect.Value, si *structInspection,
return nil
}
-func (m *sparseMapping) extractComponents(column *Column, v *reflect.Value, biasN int) ([][]byte, error) {
+func (m *sparseMapping) extractComponents(column *Column, v *reflect.Value, bias int) ([][]byte, error) {
var components [][]byte
if len(m.components) > 0 {
components = unpackComposite(column.Name)
} else {
components = [][]byte{column.Name}
}
- if len(components) != (len(m.components) + biasN) {
+ if len(components) != (len(m.components) + bias) {
return components, errors.New(fmt.Sprint("Returned number of components in composite column name does not match struct mapping in struct ", v.Type().Name()))
}
return components, nil
}
-func (m *sparseMapping) Unmap(destination interface{}, offset int, row *Row) (int, error) {
- readColumns := 0
+// TODO: speed this up
+func (m *sparseMapping) isNewComponents(prev, next [][]byte, bias int) bool {
+ if len(prev) != len(next) {
+ return true
+ }
+ for i := 0; i < len(prev)-bias; i++ {
+ p := prev[i]
+ n := next[i]
+ if len(p) != len(n) {
+ return true
+ }
+ for j := 0; j < len(p); j++ {
+ if p[j] != n[j] {
+ return true
+ }
+ }
+ }
+ return false
+}
- v, si, err := m.startUnmap(destination, row)
+func (m *sparseMapping) Unmap(destination interface{}, provider RowProvider) error {
+ v, si, err := m.startUnmap(destination, provider)
if err != nil {
- return readColumns, err
+ return err
}
compositeFieldsAreSet := false
+ var previousComponents [][]byte
+
+ for {
+ column, err := provider.NextColumn()
+ if err == Done {
+ return Done
+ } else if err == EndBeforeLimit {
+ if compositeFieldsAreSet {
+ break
+ } else {
+ return Done
+ }
+ } else if err == EndAtLimit {
+ return Done
+ } else if err != nil {
+ return err
+ }
- // FIXME: change this code to NOT expect a fixed number of columns and
- // instead adapt itself to the data by assuming the first column composite
- // to be uniform for all the struct values (except field name), then
- // request column by column with some kind of interface that does
- // buffering reads on demand from an underlying query
- min := len(si.goFields) - len(m.components) - 1
- if min > len(row.Columns) {
- return -1, nil
- }
-
- columns := row.Columns[offset : offset+min]
- for _, column := range columns {
- readColumns++
components, err := m.extractComponents(column, v, 1)
if err != nil {
- return readColumns, err
+ return err
}
-
- // FIXME: it is possible for a row to contain multiple composite
- // values instead of an uniform one, indicating that a new "object"
- // started. assume that is not the case for now!
- // iterate over composite components, just once, to set the composite
- // fields
if !compositeFieldsAreSet {
+ // first column
if err := m.unmapComponents(v, si, components); err != nil {
- return readColumns, err
+ return err
}
compositeFieldsAreSet = true
+ } else {
+ if m.isNewComponents(previousComponents, components, 1) {
+ provider.Rewind()
+ break
+ }
}
// lookup field by name
var name string
err = Unmarshal(components[len(components)-1], UTF8Type, &name)
if err != nil {
- return readColumns, errors.New(fmt.Sprint("Error unmarshaling composite field as UTF8Type for field name in struct ", v.Type().Name(), ", error: ", err))
+ return errors.New(fmt.Sprint("Error unmarshaling composite field as UTF8Type for field name in struct ", v.Type().Name(), ", error: ", err))
}
if f, found := si.cassandraFields[name]; found {
err := f.unmarshalValue(column.Value, v)
if err != nil {
- return readColumns, errors.New(fmt.Sprint("Error unmarshaling column: ", name, " value: ", err))
+ return errors.New(fmt.Sprint("Error unmarshaling column: ", name, " value: ", err))
}
}
+
+ previousComponents = components
}
- return readColumns, nil
+ return nil
}
func newCompactMapping(si *structInspection, cf string, keyField string, valueField string, componentFields ...string) Mapping {
@@ -348,29 +381,38 @@ func (m *compactMapping) Map(source interface{}) (*Row, error) {
return row, nil
}
-func (m *compactMapping) Unmap(destination interface{}, offset int, row *Row) (int, error) {
- v, si, err := m.startUnmap(destination, row)
+func (m *compactMapping) Unmap(destination interface{}, provider RowProvider) error {
+ v, si, err := m.startUnmap(destination, provider)
if err != nil {
- return 0, err
+ return err
}
- if len(row.Columns) <= 0 {
- return -1, nil
+
+ column, err := provider.NextColumn()
+ if err == Done {
+ return Done
+ } else if err == EndBeforeLimit {
+ return Done
+ } else if err == EndAtLimit {
+ return Done
+ } else if err != nil {
+ return err
}
- column := row.Columns[offset]
+
components, err := m.extractComponents(column, v, 0)
if err != nil {
- return 1, err
+ return err
}
if err := m.unmapComponents(v, si, components); err != nil {
- return 1, err
+ return err
}
if f, found := si.goFields[m.value]; found {
err := f.unmarshalValue(column.Value, v)
if err != nil {
- return 1, errors.New(fmt.Sprint("Error unmarshaling column for compact value: ", err))
+ return errors.New(fmt.Sprint("Error unmarshaling column for compact value: ", err))
}
} else {
- return 1, errors.New(fmt.Sprint("Mapping value field ", m.value, " not found in passed struct of type ", v.Type().Name()))
+ return errors.New(fmt.Sprint("Mapping value field ", m.value, " not found in passed struct of type ", v.Type().Name()))
}
- return 1, nil
+
+ return nil
}
View
@@ -8,6 +8,7 @@ import (
/*
todo:
deeper tests, over more methods, and over all internal types
+ compact mapping
*/
type everythingComp struct {
@@ -65,14 +66,42 @@ func (shell *structTestShell) checkMap(t *testing.T, m Mapping, expectedStruct i
}
}
+type testProvider struct {
+ row *Row
+ pos int
+ limit int
+}
+
+func (t *testProvider) Key() []byte {
+ return t.row.Key
+}
+
+func (t *testProvider) NextColumn() (*Column, error) {
+ if t.pos >= len(t.row.Columns) {
+ if t.pos >= t.limit {
+ return nil, EndAtLimit
+ } else {
+ return nil, EndBeforeLimit
+ }
+ }
+ c := t.row.Columns[t.pos]
+ t.pos++
+ return c, nil
+}
+
+func (t *testProvider) Rewind() {
+ t.pos--
+ if t.pos < 0 {
+ t.pos = 0
+ }
+}
+
func (shell *structTestShell) checkUnmap(t *testing.T, m Mapping) interface{} {
- n, err := m.Unmap(shell.resultStruct, 0, shell.expectedRow)
+ tp := &testProvider{shell.expectedRow, 0, 10000}
+ err := m.Unmap(shell.resultStruct, tp)
if err != nil {
t.Error("Error unmapping struct: ", err)
}
- if n != len(shell.expectedRow.Columns) {
- t.Error("Wrong number of columns consumed when unmapping struct")
- }
if !reflect.DeepEqual(shell.resultStruct, shell.expectedStruct) {
t.Error("Unmapped struct ", shell.name, " does not match expected instance ", shell.expectedStruct, " actual ", shell.resultStruct)
}
Oops, something went wrong.

0 comments on commit b23d895

Please sign in to comment.