Skip to content
This repository has been archived by the owner on Oct 18, 2021. It is now read-only.

Commit

Permalink
initial port to gouchstore
Browse files Browse the repository at this point in the history
  • Loading branch information
mschoch committed Mar 3, 2014
1 parent 27c4ad6 commit d4ff584
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 68 deletions.
5 changes: 2 additions & 3 deletions cache_test.go
Expand Up @@ -4,7 +4,7 @@ import (
"testing"
"time"

"github.com/dustin/go-couchstore"
"github.com/mschoch/gouchstore"
)

func benchCacheSize(b *testing.B, num int) {
Expand All @@ -17,8 +17,7 @@ func benchCacheSize(b *testing.B, num int) {

startTime := time.Now()
for i := 0; i < num; i++ {
di := couchstore.NewDocInfo(startTime.Format(time.RFC3339Nano),
0)
di := gouchstore.NewDocumentInfo(startTime.Format(time.RFC3339Nano))
p.infos = append(p.infos, di)
}

Expand Down
48 changes: 19 additions & 29 deletions database.go
Expand Up @@ -9,7 +9,7 @@ import (
"sync"
"time"

"github.com/dustin/go-couchstore"
"github.com/mschoch/gouchstore"
)

type dbOperation uint8
Expand All @@ -34,7 +34,7 @@ type dbWriter struct {
dbname string
ch chan dbqitem
quit chan bool
db *couchstore.Couchstore
db *gouchstore.Gouchstore
}

var errClosed = errors.New("closed")
Expand Down Expand Up @@ -71,17 +71,17 @@ func dbBase(n string) string {
return n[left:right]
}

func dbopen(name string) (*couchstore.Couchstore, error) {
func dbopen(name string) (*gouchstore.Gouchstore, error) {
path := dbPath(name)
db, err := couchstore.Open(dbPath(name), false)
db, err := gouchstore.Open(dbPath(name), 0)
if err == nil {
recordDBConn(path, db)
}
return db, err
}

func dbcreate(path string) error {
db, err := couchstore.Open(path, true)
db, err := gouchstore.Open(path, gouchstore.OPEN_CREATE)
if err != nil {
return err
}
Expand Down Expand Up @@ -120,8 +120,8 @@ func dblist(root string) []string {
return rv
}

func dbCompact(dq *dbWriter, bulk couchstore.BulkWriter, queued int,
qi dbqitem) (couchstore.BulkWriter, error) {
func dbCompact(dq *dbWriter, bulk gouchstore.BulkWriter, queued int,
qi dbqitem) (gouchstore.BulkWriter, error) {
start := time.Now()
if queued > 0 {
bulk.Commit()
Expand All @@ -134,7 +134,7 @@ func dbCompact(dq *dbWriter, bulk couchstore.BulkWriter, queued int,
dbn := dbPath(dq.dbname)
queued = 0
start = time.Now()
err := dq.db.CompactTo(dbn + ".compact")
err := dq.db.Compact(dbn + ".compact")
if err != nil {
log.Printf("Error compacting: %v", err)
return dq.db.Bulk(), err
Expand Down Expand Up @@ -186,13 +186,12 @@ func dbWriteLoop(dq *dbWriter) {
liveOps++
switch qi.op {
case opStoreItem:
bulk.Set(couchstore.NewDocInfo(qi.k,
couchstore.DocIsCompressed),
couchstore.NewDocument(qi.k, qi.data))
bulk.Set(gouchstore.NewDocumentInfo(qi.k),
gouchstore.NewDocument(qi.k, qi.data))
queued++
case opDeleteItem:
queued++
bulk.Delete(couchstore.NewDocInfo(qi.k, 0))
bulk.Delete(gouchstore.NewDocumentInfo(qi.k))
case opCompact:
var err error
bulk, err = dbCompact(dq, bulk, queued, qi)
Expand Down Expand Up @@ -301,11 +300,11 @@ func dbGetDoc(dbname, id string) ([]byte, error) {
}
defer closeDBConn(db)

doc, _, err := db.Get(id)
doc, err := db.DocumentById(id)
if err != nil {
return nil, err
}
return doc.Value(), err
return doc.Body, err
}

func dbwalk(dbname, from, to string, f func(k string, v []byte) error) error {
Expand All @@ -316,13 +315,9 @@ func dbwalk(dbname, from, to string, f func(k string, v []byte) error) error {
}
defer closeDBConn(db)

return db.WalkDocs(from, func(d *couchstore.Couchstore,
di *couchstore.DocInfo, doc *couchstore.Document) error {
if to != "" && di.ID() >= to {
return couchstore.StopIteration
}

return f(di.ID(), doc.Value())
return db.WalkDocs(from, to, func(d *gouchstore.Gouchstore,
di *gouchstore.DocumentInfo, doc *gouchstore.Document) error {
return f(di.ID, doc.Body)
})
}

Expand All @@ -334,14 +329,9 @@ func dbwalkKeys(dbname, from, to string, f func(k string) error) error {
}
defer closeDBConn(db)

return db.Walk(from, func(d *couchstore.Couchstore,
di *couchstore.DocInfo) error {
if to != "" && di.ID() >= to {
return couchstore.StopIteration
}

return f(di.ID())
})
return db.AllDocuments(from, to, func(db *gouchstore.Gouchstore, documentInfo *gouchstore.DocumentInfo, userContext interface{}) error {
return f(documentInfo.ID)
}, nil)
}

func parseKey(s string) int64 {
Expand Down
8 changes: 4 additions & 4 deletions debug.go
Expand Up @@ -8,7 +8,7 @@ import (
"runtime"
"sync"

"github.com/dustin/go-couchstore"
"github.com/mschoch/gouchstore"
)

type frameSnap []uintptr
Expand All @@ -30,17 +30,17 @@ type dbOpenState struct {
}

var openConnLock = sync.Mutex{}
var openConns = map[*couchstore.Couchstore]dbOpenState{}
var openConns = map[*gouchstore.Gouchstore]dbOpenState{}

func recordDBConn(path string, db *couchstore.Couchstore) {
func recordDBConn(path string, db *gouchstore.Gouchstore) {
callers := make([]uintptr, 32)
n := runtime.Callers(2, callers)
openConnLock.Lock()
openConns[db] = dbOpenState{path, frameSnap(callers[:n-1])}
openConnLock.Unlock()
}

func closeDBConn(db *couchstore.Couchstore) {
func closeDBConn(db *gouchstore.Gouchstore) {
db.Close()
openConnLock.Lock()
_, ok := openConns[db]
Expand Down
8 changes: 4 additions & 4 deletions handlers.go
Expand Up @@ -12,9 +12,9 @@ import (
"strings"
"time"

"github.com/dustin/go-couchstore"
"github.com/dustin/go-humanize"
"github.com/dustin/gojson"
"github.com/mschoch/gouchstore"
)

func serverInfo(parts []string, w http.ResponseWriter, req *http.Request) {
Expand Down Expand Up @@ -269,7 +269,7 @@ func deleteBulk(args []string, w http.ResponseWriter, req *http.Request) {

err = dbwalkKeys(args[0], from, to, func(k string) error {

bulk.Delete(couchstore.NewDocInfo(k, 0))
bulk.Delete(gouchstore.NewDocumentInfo(k))
deleteCount++
if deleteCount >= commitThreshold {
bulk.Commit()
Expand Down Expand Up @@ -433,11 +433,11 @@ func dbInfo(args []string, w http.ResponseWriter, req *http.Request) {
}
defer closeDBConn(db)

inf, err := db.Info()
inf, err := db.DatabaseInfo()
if err == nil {
mustEncode(200, w, map[string]interface{}{
"last_seq": inf.LastSeq,
"doc_count": inf.DocCount,
"doc_count": inf.DocumentCount,
"deleted_count": inf.DeletedCount,
"space_used": inf.SpaceUsed,
"header_pos": inf.HeaderPosition,
Expand Down
40 changes: 18 additions & 22 deletions query.go
Expand Up @@ -9,15 +9,15 @@ import (
"sync/atomic"
"time"

"github.com/dustin/go-couchstore"
"github.com/dustin/go-jsonpointer"
"github.com/dustin/gojson"
"github.com/mschoch/gouchstore"
)

var errTimeout = errors.New("query timed out")

type ptrval struct {
di *couchstore.DocInfo
di *gouchstore.DocumentInfo
val interface{}
included bool
}
Expand All @@ -40,8 +40,8 @@ type processIn struct {
cacheKey string
dbname string
key int64
infos []*couchstore.DocInfo
nextInfo *couchstore.DocInfo
infos []*gouchstore.DocumentInfo
nextInfo *gouchstore.DocumentInfo
ptrs []string
reds []string
before time.Time
Expand Down Expand Up @@ -83,7 +83,7 @@ func resolveFetch(j []byte, keys []string) map[string]interface{} {
return rv
}

func processDoc(di *couchstore.DocInfo, chs []chan ptrval,
func processDoc(di *gouchstore.DocumentInfo, chs []chan ptrval,
doc []byte, ptrs []string,
filters []string, filtervals []string,
included bool) {
Expand Down Expand Up @@ -130,7 +130,7 @@ func processDoc(di *couchstore.DocInfo, chs []chan ptrval,
for i, p := range ptrs {
val := fetched[p]
if p == "_id" {
val = di.ID()
val = di.ID
}
switch x := val.(type) {
case int, uint, int64, float64, uint64, bool:
Expand Down Expand Up @@ -174,10 +174,10 @@ func processDocs(pi *processIn) {
go func() {
defer closeAll(chans)

dodoc := func(di *couchstore.DocInfo, included bool) {
doc, err := db.GetFromDocInfo(di)
dodoc := func(di *gouchstore.DocumentInfo, included bool) {
doc, err := db.DocumentByDocumentInfo(di)
if err == nil {
processDoc(di, chans, doc.Value(), pi.ptrs,
processDoc(di, chans, doc.Body, pi.ptrs,
pi.filters, pi.filtervals, included)
} else {
for i := range pi.ptrs {
Expand Down Expand Up @@ -226,8 +226,8 @@ func docProcessor(ch <-chan *processIn) {
}
}

func fetchDocs(dbname string, key int64, infos []*couchstore.DocInfo,
nextInfo *couchstore.DocInfo, ptrs []string, reds []string,
func fetchDocs(dbname string, key int64, infos []*gouchstore.DocumentInfo,
nextInfo *gouchstore.DocumentInfo, ptrs []string, reds []string,
filters []string, filtervals []string,
before time.Time, out chan<- *processOut) {

Expand Down Expand Up @@ -257,17 +257,13 @@ func runQuery(q *queryIn) {

chunk := int64(time.Duration(q.group) * time.Millisecond)

infos := []*couchstore.DocInfo{}
infos := []*gouchstore.DocumentInfo{}
g := int64(0)
nextg := ""

err = db.Walk(q.from, func(d *couchstore.Couchstore,
di *couchstore.DocInfo) error {
kstr := di.ID()
err = db.AllDocuments(q.from, q.to, func(db *gouchstore.Gouchstore, di *gouchstore.DocumentInfo, userContext interface{}) error {
kstr := di.ID
var err error
if q.to != "" && kstr >= q.to {
err = couchstore.StopIteration
}

atomic.AddInt32(&q.totalKeys, 1)

Expand All @@ -278,7 +274,7 @@ func runQuery(q *queryIn) {
q.ptrs, q.reds, q.filters, q.filtervals,
q.before, q.out)

infos = make([]*couchstore.DocInfo, 0, len(infos))
infos = make([]*gouchstore.DocumentInfo, 0, len(infos))
}

k := parseKey(kstr)
Expand All @@ -290,7 +286,7 @@ func runQuery(q *queryIn) {
infos = append(infos, di)

return err
})
}, nil)

if err == nil && len(infos) > 0 {
atomic.AddInt32(&q.started, 1)
Expand Down Expand Up @@ -374,7 +370,7 @@ func convertTofloat64Rate(in chan ptrval) chan float64 {
case string:
x, err := strconv.ParseFloat(value, 64)
if err == nil {
prevts = parseKey(v.di.ID())
prevts = parseKey(v.di.ID)
preval = x
break FIND_USABLE
}
Expand All @@ -388,7 +384,7 @@ func convertTofloat64Rate(in chan ptrval) chan float64 {
case string:
x, err := strconv.ParseFloat(value, 64)
if err == nil {
thists := parseKey(v.di.ID())
thists := parseKey(v.di.ID)

val := ((x - preval) /
(float64(thists-prevts) / 1e9))
Expand Down
12 changes: 6 additions & 6 deletions query_test.go
Expand Up @@ -7,7 +7,7 @@ import (
"testing"
"time"

"github.com/dustin/go-couchstore"
"github.com/mschoch/gouchstore"
)

var testInput = []interface{}{}
Expand Down Expand Up @@ -40,11 +40,11 @@ func streamCollection(s []interface{}) chan ptrval {
for _, r := range s {
t = t.Add(time.Second)
ts := t.Format(time.RFC3339Nano)
ch <- ptrval{couchstore.NewDocInfo(ts, 0), r, true}
ch <- ptrval{gouchstore.NewDocumentInfo(ts), r, true}
}
t = t.Add(time.Second)
ts := t.Format(time.RFC3339Nano)
ch <- ptrval{couchstore.NewDocInfo(ts, 0), nextValue, false}
ch <- ptrval{gouchstore.NewDocumentInfo(ts), nextValue, false}
}()
return ch
}
Expand Down Expand Up @@ -76,12 +76,12 @@ func TestPairRateConversion(t *testing.T) {

tm := time.Now().UTC()
val1 := "20"
ch <- ptrval{couchstore.NewDocInfo(tm.Format(time.RFC3339Nano), 0),
ch <- ptrval{gouchstore.NewDocumentInfo(tm.Format(time.RFC3339Nano)),
val1, true}

tm = tm.Add(5 * time.Second)
val2 := "25"
ch <- ptrval{couchstore.NewDocInfo(tm.Format(time.RFC3339Nano), 0),
ch <- ptrval{gouchstore.NewDocumentInfo(tm.Format(time.RFC3339Nano)),
val2, false}

close(ch)
Expand Down Expand Up @@ -241,7 +241,7 @@ func TestNilReducers(t *testing.T) {

func TestPointers(t *testing.T) {
docID := "2013-02-22T16:29:19.750264Z"
di := couchstore.NewDocInfo(docID, 0)
di := gouchstore.NewDocumentInfo(docID)
tests := []struct {
pointer string
exp interface{}
Expand Down

0 comments on commit d4ff584

Please sign in to comment.