From 8e90a2fc2d78d79e4f7c78bcca287f8bcd96a6cd Mon Sep 17 00:00:00 2001 From: Carlos Carrasco Date: Sun, 13 May 2012 21:31:46 +0200 Subject: [PATCH] refactor Query interface, add MultiGet --- src/gossie/mapping.go | 8 +- src/gossie/mapping_test.go | 4 +- src/gossie/query.go | 200 +++++++++++++++++++++++-------------- src/gossie/query_test.go | 127 +++++++++++++++++------ src/gossie/uuid.go | 6 +- 5 files changed, 232 insertions(+), 113 deletions(-) diff --git a/src/gossie/mapping.go b/src/gossie/mapping.go index 68ced68..881c8ec 100644 --- a/src/gossie/mapping.go +++ b/src/gossie/mapping.go @@ -41,7 +41,7 @@ var ( type RowProvider interface { // Key returns the row key - Key() []byte + Key() ([]byte, error) // NextColumn returns the next column in the row, and advances the column pointer NextColumn() (*Column, error) @@ -237,7 +237,11 @@ func (m *sparseMapping) startUnmap(destination interface{}, provider RowProvider // unmarshal key field if f, found := si.goFields[m.key]; found { - err = f.unmarshalValue(provider.Key(), v) + key, err := provider.Key() + if err != nil { + return nil, nil, err + } + err = f.unmarshalValue(key, v) if err != nil { return nil, nil, err } diff --git a/src/gossie/mapping_test.go b/src/gossie/mapping_test.go index 8231d88..3da8d29 100644 --- a/src/gossie/mapping_test.go +++ b/src/gossie/mapping_test.go @@ -72,8 +72,8 @@ type testProvider struct { limit int } -func (t *testProvider) Key() []byte { - return t.row.Key +func (t *testProvider) Key() ([]byte, error) { + return t.row.Key, nil } func (t *testProvider) NextColumn() (*Column, error) { diff --git a/src/gossie/query.go b/src/gossie/query.go index 91fc7a8..d5d50b5 100644 --- a/src/gossie/query.go +++ b/src/gossie/query.go @@ -5,17 +5,9 @@ import ( ) /* - todo: - autopaging? - Search() and interface(s) for indexed get - - multiget - still need to think abstraction - or just with a direct call that accepts interface{} and assumes Go slices? what about composites? - */ var ( @@ -23,7 +15,8 @@ var ( ) const ( - DEFAULT_LIMIT = 10000 + DEFAULT_COLUMN_LIMIT = 10000 + DEFAULT_ROW_LIMIT = 500 ) // Query is a high level interface for Cassandra queries @@ -34,21 +27,29 @@ type Query interface { // pool options value. ConsistencyLevel(int) Query - // Limit sets the column limit to buffer at once. - Limit(int) Query + // Limit sets the column and rows to buffer at once. + Limit(columns, rows int) Query // Reverse set to true will reverse the order of the columns in the result. Reversed(bool) Query - // Get looks up a row with the given key (and optionally components for a - // composite) and returns a Result to it. If the row uses a composite - // comparator and you only specify the key and zero or more comparator - // components the Result will allow you to iterate over the entire row. - Get(key interface{}, components ...interface{}) (Result, error) + // Components buils a column slice for Get operations with fixed values + // for the passed components. It fills those components in the same order + // as in the method arguments. + Components(components ...interface{}) Query + + // Between allows to pass different values for the last components of the + // Start and End columns in the column slice for Get operations. Do not + // pass the last component to Component() when usign Between(). start is + // inclusive, end is exclusive. + Between(start, end interface{}) Query - // GetBetween is like Get, but the last two passed components values are - // used as the last values for the slice Start and End composite values. - GetBetween(key interface{}, components ...interface{}) (Result, error) + // Get looks up a row with the given key. If the row uses a composite + // column names the Result will allow you to iterate over the entire row. + Get(key interface{}) (Result, error) + + // MultiGet looks up multiple rows given the keys. + MultiGet(keys []interface{}) (Result, error) } // Result reads Query results into Go objects, internally buffering them. @@ -64,15 +65,21 @@ type query struct { pool *connectionPool mapping Mapping consistencyLevel int - limit int + columnLimit int + rowLimit int reversed bool + components []interface{} + betweenStart interface{} + betweenEnd interface{} } func newQuery(cp *connectionPool, m Mapping) *query { return &query{ - pool: cp, - mapping: m, - limit: DEFAULT_LIMIT, + pool: cp, + mapping: m, + columnLimit: DEFAULT_COLUMN_LIMIT, + rowLimit: DEFAULT_ROW_LIMIT, + components: make([]interface{}, 0), } } @@ -81,8 +88,9 @@ func (q *query) ConsistencyLevel(c int) Query { return q } -func (q *query) Limit(l int) Query { - q.limit = l +func (q *query) Limit(columns, rows int) Query { + q.columnLimit = columns + q.rowLimit = rows return q } @@ -91,10 +99,32 @@ func (q *query) Reversed(r bool) Query { return q } -func (q *query) Get(key interface{}, components ...interface{}) (Result, error) { - keyB, err := q.mapping.MarshalKey(key) - if err != nil { - return nil, err +func (q *query) Components(components ...interface{}) Query { + q.components = components + return q +} + +func (q *query) Between(start, end interface{}) Query { + q.betweenStart = start + q.betweenEnd = end + return q +} + +func (q *query) Get(key interface{}) (Result, error) { + return q.MultiGet([]interface{}{key}) +} + +func (q *query) MultiGet(keys []interface{}) (Result, error) { + var err error + + keysB := make([][]byte, 0) + + for _, key := range keys { + keyB, err := q.mapping.MarshalKey(key) + if err != nil { + return nil, err + } + keysB = append(keysB, keyB) } reader := q.pool.Reader().Cf(q.mapping.Cf()) @@ -103,49 +133,48 @@ func (q *query) Get(key interface{}, components ...interface{}) (Result, error) reader.ConsistencyLevel(q.consistencyLevel) } - return &result{*q, keyB, reader, components, nil, 0, nil}, nil -} + q.buildSlice(reader) -func (q *query) GetBetween(key interface{}, components ...interface{}) (Result, error) { - if len(components) < 2 { - return nil, errors.New("GetBetween requires at least 2 component values") - } - r, err := q.Get(key, components[:len(components)-1]...) - if err != nil { - return nil, err - } - r.(*result).between = components[len(components)-1] - return r, nil -} + rows := make([]*Row, 0) -type result struct { - query - key []byte - reader Reader - components []interface{} - row *Row - position int - between interface{} -} + if len(keysB) == 1 { + row, err := reader.Get(keysB[0]) + if err != nil { + return nil, err + } + if row != nil { + rows = []*Row{row} + } + } else { + rows, err = reader.MultiGet(keysB) + if err != nil { + return nil, err + } + } -func (r *result) Key() []byte { - return r.key + return &result{query: *q, buffer: rows}, nil } -func (r *result) buildFixedSlice() error { +func (q *query) buildSlice(reader Reader) error { start := make([]byte, 0) end := make([]byte, 0) - if len(r.components) > 0 { - last := len(r.components) - 1 - for i, c := range r.components { - b, err := r.mapping.MarshalComponent(c, i) + + components := q.components + if q.betweenStart != nil { + components = append(components, q.betweenStart) + } + + if len(components) > 0 { + last := len(components) - 1 + for i, c := range components { + b, err := q.mapping.MarshalComponent(c, i) if err != nil { return err } start = append(start, packComposite(b, eocEquals)...) if i == last { - if r.between != nil { - b, err := r.mapping.MarshalComponent(r.between, i) + if q.betweenEnd != nil { + b, err := q.mapping.MarshalComponent(q.betweenEnd, i) if err != nil { return err } @@ -158,28 +187,43 @@ func (r *result) buildFixedSlice() error { } } } - r.reader.Slice(&Slice{Start: start, End: end, Count: r.limit, Reversed: r.reversed}) + + reader.Slice(&Slice{Start: start, End: end, Count: q.columnLimit, Reversed: q.reversed}) return nil } -func (r *result) NextColumn() (*Column, error) { +type result struct { + query + buffer []*Row + row *Row + position int +} + +func (r *result) feedRow() error { if r.row == nil { - err := r.buildFixedSlice() - if err != nil { - return nil, err + if len(r.buffer) <= 0 { + return Done } - row, err := r.reader.Get(r.key) - if err != nil { - return nil, err - } - if row == nil { - return nil, Done - } - r.row = row + r.row = r.buffer[0] r.position = 0 + r.buffer = r.buffer[1:len(r.buffer)] + } + return nil +} + +func (r *result) Key() ([]byte, error) { + if err := r.feedRow(); err != nil { + return nil, err + } + return r.row.Key, nil +} + +func (r *result) NextColumn() (*Column, error) { + if err := r.feedRow(); err != nil { + return nil, err } if r.position >= len(r.row.Columns) { - if r.position >= r.limit { + if r.position >= r.columnLimit { return nil, EndAtLimit } else { return nil, EndBeforeLimit @@ -198,5 +242,11 @@ func (r *result) Rewind() { } func (r *result) Next(destination interface{}) error { - return r.mapping.Unmap(destination, r) + err := r.mapping.Unmap(destination, r) + if err == Done { + // force new row feed and try again, just once + r.row = nil + err = r.mapping.Unmap(destination, r) + } + return err } diff --git a/src/gossie/query_test.go b/src/gossie/query_test.go index bd455c8..660f28b 100644 --- a/src/gossie/query_test.go +++ b/src/gossie/query_test.go @@ -8,6 +8,7 @@ import ( /* todo: + more MultiGet tests refactor tests into something resembling maintenable code */ @@ -59,7 +60,11 @@ type CompositeFull struct { } func createTimeseries(t *testing.T, cp ConnectionPool) int { - cp.Writer().Delete("Timeseries", []byte("testuser")).Run() + cp.Writer(). + Delete("Timeseries", []byte("testuser")). + Delete("Timeseries", []byte("testuser2")). + Delete("Timeseries", []byte("testuser3")). + Run() mT, err := NewMapping(&Timeseries{}) @@ -67,13 +72,20 @@ func createTimeseries(t *testing.T, cp ConnectionPool) int { seqBase := int(time.Now().UnixNano() % 1e6) w := cp.Writer() - for i := 0; i < 100; i++ { + for i := 0; i < 300; i++ { u, err := NewTimeUUID() if err != nil { t.Fatal("Error generating TimeUUID:", err) } + key := "testuser" + if i >= 100 { + key = "testuser2" + } + if i >= 200 { + key = "testuser3" + } r := &Timeseries{ - Username: "testuser", + Username: key, TimeUUID: u, Seq: seqBase + i, } @@ -226,10 +238,9 @@ func TestQueryGet(t *testing.T) { ///// - q0 := cp.Query(m0) r0 := &ReasonableZero{} - res, err := q0.Get("nope") + res, err := cp.Query(m0).Get("nope") if err != nil { t.Fatal("Query get error:", err) } @@ -237,8 +248,12 @@ func TestQueryGet(t *testing.T) { if err != Done { t.Fatal("Result Next is not Done:", err) } + err = res.Next(r0) + if err != Done { + t.Fatal("Result Next is not Done:", err) + } - res, err = q0.Get("testuser") + res, err = cp.Query(m0).Get("testuser") if err != nil { t.Fatal("Query get error:", err) } @@ -253,13 +268,16 @@ func TestQueryGet(t *testing.T) { if err != Done { t.Fatal("Result Next is not Done:", err) } + err = res.Next(r0) + if err != Done { + t.Fatal("Result Next is not Done:", err) + } ///// - q1 := cp.Query(m1) r1 := &ReasonableOne{} - res, err = q1.Get("nope") + res, err = cp.Query(m1).Get("nope") if err != nil { t.Fatal("Query get error:", err) } @@ -267,8 +285,12 @@ func TestQueryGet(t *testing.T) { if err != Done { t.Fatal("Result Next is not Done:", err) } + err = res.Next(r1) + if err != Done { + t.Fatal("Result Next is not Done:", err) + } - res, err = q1.Get("testuser") + res, err = cp.Query(m1).Get("testuser") if err != nil { t.Fatal("Query get error:", err) } @@ -292,8 +314,12 @@ func TestQueryGet(t *testing.T) { if err != Done { t.Fatal("Result Next is not Done:", err) } + err = res.Next(r1) + if err != Done { + t.Fatal("Result Next is not Done:", err) + } - res, err = q1.GetBetween("testuser", int64(100000000000050), int64(100000000000070)) + res, err = cp.Query(m1).Between(int64(100000000000050), int64(100000000000070)).Get("testuser") if err != nil { t.Fatal("Query get error:", err) } @@ -318,8 +344,12 @@ func TestQueryGet(t *testing.T) { if err != Done { t.Fatal("Result Next is not Done:", err) } + err = res.Next(r1) + if err != Done { + t.Fatal("Result Next is not Done:", err) + } - res, err = q1.Get("testuser", int64(100000000000010)) + res, err = cp.Query(m1).Components(int64(100000000000010)).Get("testuser") if err != nil { t.Fatal("Query get error:", err) } @@ -334,14 +364,17 @@ func TestQueryGet(t *testing.T) { if err != Done { t.Fatal("Result Next is not Done:", err) } + err = res.Next(r1) + if err != Done { + t.Fatal("Result Next is not Done:", err) + } ///// seqBase := createTimeseries(t, cp) - qT := cp.Query(mT) rT := &Timeseries{} - res, err = qT.Get("testuser") + res, err = cp.Query(mT).Get("testuser") if err != nil { t.Fatal("Query get error:", err) } @@ -351,16 +384,12 @@ func TestQueryGet(t *testing.T) { t.Fatal("Result next error:", err) } if rT.Seq != seqBase+i { - t.Log(rT) t.Error("Read does not match Write") } } seqBase = createTimeseries(t, cp) - - qT.Reversed(true) - - res, err = qT.Get("testuser") + res, err = cp.Query(mT).Reversed(true).Get("testuser") if err != nil { t.Fatal("Query get error:", err) } @@ -370,18 +399,35 @@ func TestQueryGet(t *testing.T) { t.Fatal("Result next error:", err) } if rT.Seq != seqBase+i { - t.Log(seqBase + i) - t.Log(rT) t.Error("Read does not match Write") } } + seqBase = createTimeseries(t, cp) + res, err = cp.Query(mT).Reversed(true).MultiGet([]interface{}{"testuser", "testuser2", "testuser3"}) + if err != nil { + t.Fatal("Query get error:", err) + } + for i := 0; i < 300; i++ { + err = res.Next(rT) + if err != nil { + t.Fatal("Result next error:", err) + } + } + err = res.Next(rT) + if err != Done { + t.Fatal("Result Next is not Done:", err) + } + err = res.Next(rT) + if err != Done { + t.Fatal("Result Next is not Done:", err) + } + ///// - q2 := cp.Query(m2) r2 := &ReasonableTwo{} - res, err = q2.Get("nope") + res, err = cp.Query(m2).Get("nope") if err != nil { t.Fatal("Query get error:", err) } @@ -389,8 +435,12 @@ func TestQueryGet(t *testing.T) { if err != Done { t.Fatal("Result Next is not Done:", err) } + err = res.Next(r2) + if err != Done { + t.Fatal("Result Next is not Done:", err) + } - res, err = q2.Get("testuser") + res, err = cp.Query(m2).Get("testuser") if err != nil { t.Fatal("Query get error:", err) } @@ -417,8 +467,12 @@ func TestQueryGet(t *testing.T) { if err != Done { t.Fatal("Result Next is not Done:", err) } + err = res.Next(r2) + if err != Done { + t.Fatal("Result Next is not Done:", err) + } - res, err = q2.GetBetween("testuser", int64(100000000000002), int64(100000000000007)) + res, err = cp.Query(m2).Between(int64(100000000000002), int64(100000000000007)).Get("testuser") if err != nil { t.Fatal("Query get error:", err) } @@ -445,8 +499,12 @@ func TestQueryGet(t *testing.T) { if err != Done { t.Fatal("Result Next is not Done:", err) } + err = res.Next(r2) + if err != Done { + t.Fatal("Result Next is not Done:", err) + } - res, err = q2.Get("testuser", int64(100000000000003)) + res, err = cp.Query(m2).Components(int64(100000000000003)).Get("testuser") if err != nil { t.Fatal("Query get error:", err) } @@ -471,8 +529,12 @@ func TestQueryGet(t *testing.T) { if err != Done { t.Fatal("Result Next is not Done:", err) } + err = res.Next(r2) + if err != Done { + t.Fatal("Result Next is not Done:", err) + } - res, err = q2.GetBetween("testuser", int64(100000000000003), int64(30006), int64(30009)) + res, err = cp.Query(m2).Components(int64(100000000000003)).Between(int64(30006), int64(30009)).Get("testuser") if err != nil { t.Fatal("Query get error:", err) } @@ -497,8 +559,12 @@ func TestQueryGet(t *testing.T) { if err != Done { t.Fatal("Result Next is not Done:", err) } + err = res.Next(r2) + if err != Done { + t.Fatal("Result Next is not Done:", err) + } - res, err = q2.Get("testuser", int64(100000000000005), int64(50003)) + res, err = cp.Query(m2).Components(int64(100000000000005), int64(50003)).Get("testuser") if err != nil { t.Fatal("Query get error:", err) } @@ -513,15 +579,18 @@ func TestQueryGet(t *testing.T) { if err != Done { t.Fatal("Result Next is not Done:", err) } + err = res.Next(r2) + if err != Done { + t.Fatal("Result Next is not Done:", err) + } ///// seqBase = createCompositeFull(t, cp) - qC := cp.Query(mC) rC := &CompositeFull{} - res, err = qC.Get("testuser") + res, err = cp.Query(mC).Get("testuser") if err != nil { t.Fatal("Query get error:", err) } diff --git a/src/gossie/uuid.go b/src/gossie/uuid.go index 2e74625..7d5e130 100644 --- a/src/gossie/uuid.go +++ b/src/gossie/uuid.go @@ -12,11 +12,7 @@ import ( /* to do: - implement concurrent get with atomic increment of prev timestamp, like in Java - check correctness of NewTimeUUID - comparison - comp func - NewTimeUUIDMax and NewTimeUUIDMin for building slice comps + comp func */ type UUID [16]byte