Skip to content
This repository has been archived by the owner on Jun 12, 2020. It is now read-only.

Commit

Permalink
working Cursor Read/Write
Browse files Browse the repository at this point in the history
  • Loading branch information
carloscm committed Apr 15, 2012
1 parent a6601cc commit 8ea771c
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 144 deletions.
32 changes: 12 additions & 20 deletions README.md
Expand Up @@ -91,37 +91,29 @@ type Timeline struct {
} }


row, err = Map(&Timeline{"userid", 10000000000004, "Author Name", "Hey this thing rocks!"}) row, err = Map(&Timeline{"userid", 10000000000004, "Author Name", "Hey this thing rocks!"})
err = pool.Mutation().Insert("Timeline", row).Run()
```` ````


### Cursors ### Cursors


As a convenient wrapper over Map/Unmap and the Query/Mutation interfaces Gossie provides the Cursor interface. This wrapper implements a classic database cursor over rows and composite row slices. Example: As a convenient wrapper over Map/Unmap and the Query/Mutation interfaces Gossie provides the Cursor interface. This wrapper implements a classic database cursor over rows and composite row slices. Example:


```Go ```Go
// initialize a cursor
type Timeline struct { cursor := pool.Cursor()
UserID string `cf:"Timeline" key:"UserID" col:"TweetID,*name" val:"*value"`
TweetID int64
Author string
Body string
}
// initialize a cursor over a struct instance. we can reuse both the cursor and the struct for all operations
tweet := &Timeline{"userid", 10000000000004, "Author Name", "Hey this thing rocks!"}
cursor := pool.Cursor(tweet)
// write a single tweet // write a single tweet
cursor.Write() tweet := &Timeline{"userid", 10000000000004, "Author Name", "Hey this thing rocks!"}
err = cursor.Write(tweet)
// read a single tweet. this will implicitly use the key field for the row key, and will add a slice operation // read a single tweet. this will implicitly use the key field for the row key, and will add a slice
// if the struct has fixed composite columns // operation if the struct has fixed composite columns
tweet.Read(1) err = tweet.Read(tweet)
// change the row key and the fixed composite filed and read a different tweet, reusing both the struct and the cursor // init a new struct instance with just the key and composite fields to read a different tweet
tweet.UserID = "anotheruser" tweet2 := &Timeline{"userid", 10000000000004}
tweet.TweetID = 20000000000001 err = cursor.Read(tweet2)
cursor.Read(1) // tweet no
// tweet now contains the just read tweet
```` ````


Comming soon: range reads for composites with buffering and paging Comming soon: range reads for composites with buffering and paging
Expand Down
6 changes: 3 additions & 3 deletions src/gossie/connection.go
Expand Up @@ -60,7 +60,7 @@ type ConnectionPool interface {
Mutation() Mutation Mutation() Mutation


// Cursor returns a high level interface for read and write operations over structs // Cursor returns a high level interface for read and write operations over structs
Cursor(interface{}) Cursor Cursor() Cursor


// Close all the connections in the pool // Close all the connections in the pool
Close() Close()
Expand Down Expand Up @@ -311,8 +311,8 @@ func (cp *connectionPool) Mutation() Mutation {
return makeMutation(cp, cp.options.WriteConsistency) return makeMutation(cp, cp.options.WriteConsistency)
} }


func (cp *connectionPool) Cursor(source interface{}) Cursor { func (cp *connectionPool) Cursor() Cursor {
return makeCursor(cp, source) return makeCursor(cp)
} }


func (cp *connectionPool) Keyspace() string { func (cp *connectionPool) Keyspace() string {
Expand Down
123 changes: 61 additions & 62 deletions src/gossie/cursor.go
Expand Up @@ -22,51 +22,42 @@ todo:
*/ */


var (
ErrorNotFound = errors.New("Row (plus any composites) not found")
)

// Cursor is a simple cursor-based interface for reading and writing structs from a Cassandra column family. // Cursor is a simple cursor-based interface for reading and writing structs from a Cassandra column family.
type Cursor interface { type Cursor interface {


// Read rows (or slices of a row) to fill up the struct. //Search(...)
//
// For limit == 1 a single get operation will be issued. If a composite is present then this operation will
// issue a slice with an exact match for the composite value
//
// For limit > 1, and for structs with no composite, a range get operation will be issued, the result buffered
// internally in the Cursor, and the first returned row unmapped into the struct. Next() is then available for
// paging inside the returned results and will issue new range get operations when neede. Please note that row
// range get is unordered in most Cassandra configurations. TO DO
//
// If the struct has a composite column name then this operation will issue a single row get, but with a slice
// predicate that allows for iteration over the row with Next()/Prev() TO DO
Read(limit int) error

//Next() bool //Next() bool
//Prev() bool //Prev() bool

//First() //First()


// Read a single row (or slice of a row) to fill up the struct.
// A single get operation will be issued. If a composite is present then this operation will issue a slice with
// an exact match for the composite value with the field values present in the struct
Read(interface{}) error

// Write a single row (or slice of a row) with the data currently in the struct. // Write a single row (or slice of a row) with the data currently in the struct.
Write() error Write(interface{}) error


//Delete() //Delete()
} }


type cursor struct { type cursor struct {
source interface{} pool *connectionPool
pool *connectionPool
//position int
//buffer []interface{}
} }


func makeCursor(cp *connectionPool, source interface{}) (c *cursor) { func makeCursor(cp *connectionPool) (c *cursor) {
return &cursor{ return &cursor{
source: source, pool: cp,
pool: cp,
} }
} }


func (c *cursor) Write() error { func (c *cursor) Write(source interface{}) error {


row, ms, err := internalMap(c.source) row, ms, err := internalMap(source)
if err != nil { if err != nil {
return err return err
} }
Expand All @@ -78,26 +69,20 @@ func (c *cursor) Write() error {
return nil return nil
} }


func (c *cursor) Read(limit int) error { func (c *cursor) Read(source interface{}) error {
if limit < 0 {
return errors.New("Limit is less than 1, nothing to read")
}


// deconstruct the source struct into a reflect.Value and a (cached) struct mapping // deconstruct the source struct into a reflect.Value and a (cached) struct mapping
ms, err := newMappedStruct(c.source) ms, err := newMappedStruct(source)
if err != nil { if err != nil {
return err return err
} }


// sanity checks // sanity checks
if !ms.sm.isStarNameColumn && !ms.sm.isSliceColumn {
return errors.New(fmt.Sprint("Struct ", ms.v.Type().Name(), " has no *name nor slice field in its col tag, nothing to read"))
}
if ms.sm.isSliceColumn { if ms.sm.isSliceColumn {
return errors.New(fmt.Sprint("Slice field in col tag is unsuported in Cursor for now, check back soon!")) return errors.New(fmt.Sprint("Slice field in col tag is unsuported in Cursor for now, check back soon!"))
} }


// marshal the key field for the key to look up // marshal the key field
key, err := ms.marshalKey() key, err := ms.marshalKey()
if err != nil { if err != nil {
return err return err
Expand All @@ -106,41 +91,55 @@ func (c *cursor) Read(limit int) error {
// start building the query // start building the query
q := c.pool.Query().Cf(ms.sm.cf) q := c.pool.Query().Cf(ms.sm.cf)


// build a slice composite comparator if needed
if ms.sm.isCompositeColumn {
// iterate over the components and set an equality comparison for every simple field
start := make([]byte, 0)
end := make([]byte, 0)
var component int
for component = 0; component < len(ms.sm.columns); component++ {
fm := ms.sm.columns[component]
if fm.fieldKind != baseTypeField {
break
}
b, err := ms.mapColumn(baseTypeField, fm, 0)
if err != nil {
return err
}
start = packComposite(start, b, eocEquals)
end = packComposite(end, b, eocGreater)
}

/*if component < len(ms.sm.columns) {
// we still got one to go, this means the last one was an iterable non-fixed type (*name or go slice)
//fm := ms.sm.columns[component]
// TODO: this will only work for *name
b := make([]byte, 0)
start = packComposite(start, b, o[6], o[7], o[8])
end = packComposite(end, b, o[9], o[10], o[11])
}*/

// TODO: fix hardcoded number of columns
q.Slice(&Slice{Start: start, End: end, Count: 100})
}

//isCompositeColumn bool //isCompositeColumn bool
//isSliceColumn bool //isSliceColumn bool
//isStarNameColumn bool //isStarNameColumn bool


if limit == 1 { row, err := q.Get(key)


if ms.sm.isCompositeColumn { if err != nil {
return errors.New(fmt.Sprint("Cursor composite support will be implemented soon!")) return err
/* }
// we only want a single result so issue an exact match composite comparator slice
start := make([]byte, 0)
start = packComposite(start, component []byte, true, true, true)
packComposite(start, component []byte, true, sliceStart, inclusive bool) []byte {


s := &Slice{ if row == nil {
Start: return ErrorNotFound
End: }
Count: 1
}
q.Slice(s)
*/


} err = Unmap(row, source)
row, err := q.Get(key) if err != nil {
if err != nil { return err
return err
}
err = Unmap(row, c.source)
if err != nil {
return err
}
} else {
return errors.New(fmt.Sprint("Limit > 1 will be implemented soon!"))
} }


return nil return nil
Expand Down
44 changes: 36 additions & 8 deletions src/gossie/cursor_test.go
Expand Up @@ -5,6 +5,14 @@ import (
"testing" "testing"
) )


/*
todo:
since most of the Cursor interface is still in flux the current tests are minimal
*/

type ReasonableOne struct { type ReasonableOne struct {
Username string `cf:"Reasonable" key:"Username" col:"TweetID,*name" val:"*value"` Username string `cf:"Reasonable" key:"Username" col:"TweetID,*name" val:"*value"`
TweetID int64 TweetID int64
Expand All @@ -28,25 +36,45 @@ func TestRead(t *testing.T) {
Body: "hey this thing appears to work, nice!", Body: "hey this thing appears to work, nice!",
} }


cursor := cp.Cursor(ro) cursor := cp.Cursor()
err = cursor.Write() err = cursor.Write(ro)
if err != nil { if err != nil {
t.Error("Writing struct:", err) t.Error("Writing struct:", err)
} }


ro2 := &ReasonableOne{ ro2 := &ReasonableOne{
Username: "testuser", Username: "testuser",
TweetID: 100000000000002, TweetID: 100000000000003,
Lat: 2.00002,
Lon: 1.11,
Body: "more words",
} }

err = cursor.Write(ro2)
cursor2 := cp.Cursor(ro2)
err = cursor2.Read(1)
if err != nil { if err != nil {
t.Error("Reading struct:", err) t.Error("Writing struct:", err)
} }


if !reflect.DeepEqual(ro, ro2) { ro3 := &ReasonableOne{
Username: "testuser",
TweetID: 100000000000002,
}
err = cursor.Read(ro3)
if err != nil {
t.Fatal("Reading struct:", err)
}
if !reflect.DeepEqual(ro, ro3) {
t.Error("Read does not match Write") t.Error("Read does not match Write")
} }


ro3 = &ReasonableOne{
Username: "testuser",
TweetID: 100000000000003,
}
err = cursor.Read(ro3)
if err != nil {
t.Fatal("Reading struct:", err)
}
if !reflect.DeepEqual(ro2, ro3) {
t.Error("Read does not match Write")
}
} }

0 comments on commit 8ea771c

Please sign in to comment.