Skip to content
This repository has been archived by the owner on Jun 12, 2020. It is now read-only.

Commit

Permalink
refactor Query interface, add MultiGet
Browse files Browse the repository at this point in the history
  • Loading branch information
carloscm committed May 13, 2012
1 parent 0bb3565 commit 8e90a2f
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 113 deletions.
8 changes: 6 additions & 2 deletions src/gossie/mapping.go
Expand Up @@ -41,7 +41,7 @@ var (
type RowProvider interface {

// Key returns the row key
Key() []byte
Key() ([]byte, error)

// NextColumn returns the next column in the row, and advances the column pointer
NextColumn() (*Column, error)
Expand Down Expand Up @@ -237,7 +237,11 @@ func (m *sparseMapping) startUnmap(destination interface{}, provider RowProvider

// unmarshal key field
if f, found := si.goFields[m.key]; found {
err = f.unmarshalValue(provider.Key(), v)
key, err := provider.Key()
if err != nil {
return nil, nil, err
}
err = f.unmarshalValue(key, v)
if err != nil {
return nil, nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions src/gossie/mapping_test.go
Expand Up @@ -72,8 +72,8 @@ type testProvider struct {
limit int
}

func (t *testProvider) Key() []byte {
return t.row.Key
func (t *testProvider) Key() ([]byte, error) {
return t.row.Key, nil
}

func (t *testProvider) NextColumn() (*Column, error) {
Expand Down
200 changes: 125 additions & 75 deletions src/gossie/query.go
Expand Up @@ -5,25 +5,18 @@ import (
)

/*
todo:
autopaging?
Search() and interface(s) for indexed get
multiget
still need to think abstraction
or just with a direct call that accepts interface{} and assumes Go slices? what about composites?
*/

var (
Done = errors.New("No more results found")
)

const (
DEFAULT_LIMIT = 10000
DEFAULT_COLUMN_LIMIT = 10000
DEFAULT_ROW_LIMIT = 500
)

// Query is a high level interface for Cassandra queries
Expand All @@ -34,21 +27,29 @@ type Query interface {
// pool options value.
ConsistencyLevel(int) Query

// Limit sets the column limit to buffer at once.
Limit(int) Query
// Limit sets the column and rows to buffer at once.
Limit(columns, rows int) Query

// Reverse set to true will reverse the order of the columns in the result.
Reversed(bool) Query

// Get looks up a row with the given key (and optionally components for a
// composite) and returns a Result to it. If the row uses a composite
// comparator and you only specify the key and zero or more comparator
// components the Result will allow you to iterate over the entire row.
Get(key interface{}, components ...interface{}) (Result, error)
// Components buils a column slice for Get operations with fixed values
// for the passed components. It fills those components in the same order
// as in the method arguments.
Components(components ...interface{}) Query

// Between allows to pass different values for the last components of the
// Start and End columns in the column slice for Get operations. Do not
// pass the last component to Component() when usign Between(). start is
// inclusive, end is exclusive.
Between(start, end interface{}) Query

// GetBetween is like Get, but the last two passed components values are
// used as the last values for the slice Start and End composite values.
GetBetween(key interface{}, components ...interface{}) (Result, error)
// Get looks up a row with the given key. If the row uses a composite
// column names the Result will allow you to iterate over the entire row.
Get(key interface{}) (Result, error)

// MultiGet looks up multiple rows given the keys.
MultiGet(keys []interface{}) (Result, error)
}

// Result reads Query results into Go objects, internally buffering them.
Expand All @@ -64,15 +65,21 @@ type query struct {
pool *connectionPool
mapping Mapping
consistencyLevel int
limit int
columnLimit int
rowLimit int
reversed bool
components []interface{}
betweenStart interface{}
betweenEnd interface{}
}

func newQuery(cp *connectionPool, m Mapping) *query {
return &query{
pool: cp,
mapping: m,
limit: DEFAULT_LIMIT,
pool: cp,
mapping: m,
columnLimit: DEFAULT_COLUMN_LIMIT,
rowLimit: DEFAULT_ROW_LIMIT,
components: make([]interface{}, 0),
}
}

Expand All @@ -81,8 +88,9 @@ func (q *query) ConsistencyLevel(c int) Query {
return q
}

func (q *query) Limit(l int) Query {
q.limit = l
func (q *query) Limit(columns, rows int) Query {
q.columnLimit = columns
q.rowLimit = rows
return q
}

Expand All @@ -91,10 +99,32 @@ func (q *query) Reversed(r bool) Query {
return q
}

func (q *query) Get(key interface{}, components ...interface{}) (Result, error) {
keyB, err := q.mapping.MarshalKey(key)
if err != nil {
return nil, err
func (q *query) Components(components ...interface{}) Query {
q.components = components
return q
}

func (q *query) Between(start, end interface{}) Query {
q.betweenStart = start
q.betweenEnd = end
return q
}

func (q *query) Get(key interface{}) (Result, error) {
return q.MultiGet([]interface{}{key})
}

func (q *query) MultiGet(keys []interface{}) (Result, error) {
var err error

keysB := make([][]byte, 0)

for _, key := range keys {
keyB, err := q.mapping.MarshalKey(key)
if err != nil {
return nil, err
}
keysB = append(keysB, keyB)
}

reader := q.pool.Reader().Cf(q.mapping.Cf())
Expand All @@ -103,49 +133,48 @@ func (q *query) Get(key interface{}, components ...interface{}) (Result, error)
reader.ConsistencyLevel(q.consistencyLevel)
}

return &result{*q, keyB, reader, components, nil, 0, nil}, nil
}
q.buildSlice(reader)

func (q *query) GetBetween(key interface{}, components ...interface{}) (Result, error) {
if len(components) < 2 {
return nil, errors.New("GetBetween requires at least 2 component values")
}
r, err := q.Get(key, components[:len(components)-1]...)
if err != nil {
return nil, err
}
r.(*result).between = components[len(components)-1]
return r, nil
}
rows := make([]*Row, 0)

type result struct {
query
key []byte
reader Reader
components []interface{}
row *Row
position int
between interface{}
}
if len(keysB) == 1 {
row, err := reader.Get(keysB[0])
if err != nil {
return nil, err
}
if row != nil {
rows = []*Row{row}
}
} else {
rows, err = reader.MultiGet(keysB)
if err != nil {
return nil, err
}
}

func (r *result) Key() []byte {
return r.key
return &result{query: *q, buffer: rows}, nil
}

func (r *result) buildFixedSlice() error {
func (q *query) buildSlice(reader Reader) error {
start := make([]byte, 0)
end := make([]byte, 0)
if len(r.components) > 0 {
last := len(r.components) - 1
for i, c := range r.components {
b, err := r.mapping.MarshalComponent(c, i)

components := q.components
if q.betweenStart != nil {
components = append(components, q.betweenStart)
}

if len(components) > 0 {
last := len(components) - 1
for i, c := range components {
b, err := q.mapping.MarshalComponent(c, i)
if err != nil {
return err
}
start = append(start, packComposite(b, eocEquals)...)
if i == last {
if r.between != nil {
b, err := r.mapping.MarshalComponent(r.between, i)
if q.betweenEnd != nil {
b, err := q.mapping.MarshalComponent(q.betweenEnd, i)
if err != nil {
return err
}
Expand All @@ -158,28 +187,43 @@ func (r *result) buildFixedSlice() error {
}
}
}
r.reader.Slice(&Slice{Start: start, End: end, Count: r.limit, Reversed: r.reversed})

reader.Slice(&Slice{Start: start, End: end, Count: q.columnLimit, Reversed: q.reversed})
return nil
}

func (r *result) NextColumn() (*Column, error) {
type result struct {
query
buffer []*Row
row *Row
position int
}

func (r *result) feedRow() error {
if r.row == nil {
err := r.buildFixedSlice()
if err != nil {
return nil, err
if len(r.buffer) <= 0 {
return Done
}
row, err := r.reader.Get(r.key)
if err != nil {
return nil, err
}
if row == nil {
return nil, Done
}
r.row = row
r.row = r.buffer[0]
r.position = 0
r.buffer = r.buffer[1:len(r.buffer)]
}
return nil
}

func (r *result) Key() ([]byte, error) {
if err := r.feedRow(); err != nil {
return nil, err
}
return r.row.Key, nil
}

func (r *result) NextColumn() (*Column, error) {
if err := r.feedRow(); err != nil {
return nil, err
}
if r.position >= len(r.row.Columns) {
if r.position >= r.limit {
if r.position >= r.columnLimit {
return nil, EndAtLimit
} else {
return nil, EndBeforeLimit
Expand All @@ -198,5 +242,11 @@ func (r *result) Rewind() {
}

func (r *result) Next(destination interface{}) error {
return r.mapping.Unmap(destination, r)
err := r.mapping.Unmap(destination, r)
if err == Done {
// force new row feed and try again, just once
r.row = nil
err = r.mapping.Unmap(destination, r)
}
return err
}

0 comments on commit 8e90a2f

Please sign in to comment.