
Refactor storage toward providing dirty methods and using ldb snapshots

1 parent cd12097 · commit 08ca4863e39f490d3aaca833e9ce169bd68f8e43 · @brendonh committed Feb 10, 2013
Showing with 196 additions and 115 deletions.
  1. +0 −4 src/loge/database.go
  2. +130 −77 src/loge/leveldb.go
  3. +3 −3 src/loge/objects.go
  4. +59 −27 src/loge/storage.go
  5. +2 −2 src/loge/transactions.go
  6. +2 −2 src/logetest/main.go
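
The heart of the refactor is in src/loge/storage.go: the old writeBatch interface is gone, replaced by a read/write storeContext that both stores and per-transaction contexts implement, plus a transactionContext that adds commit(). Condensed from the interfaces as they stand after this commit:

    // Read and write operations shared by stores and transaction contexts.
    type storeContext interface {
        get(*logeType, LogeKey) interface{}
        getLinks(*logeType, string, LogeKey) []string
        find(*logeType, string, LogeKey) ResultSet
        store(*logeObject) error
        storeLinks(*logeObject) error
    }

    // Handed to a transaction; writes are buffered until commit().
    type transactionContext interface {
        storeContext
        commit() error
    }

    // A store is itself a storeContext and mints transaction contexts.
    type LogeStore interface {
        storeContext
        close()
        registerType(*logeType)
        newContext() transactionContext
    }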
4 src/loge/database.go
@@ -202,7 +202,3 @@ func (db *LogeDB) ensureObj(ref objRef, load bool) *logeObject {
}
-func (db *LogeDB) newWriteBatch() writeBatch {
- return db.store.newWriteBatch()
-}
-
207 src/loge/leveldb.go
@@ -16,16 +16,23 @@ const ldb_LINK_INFO_TAG uint16 = 3
const ldb_INDEX_TAG uint16 = 4
const ldb_START_TAG uint16 = 8
+var linkSpec *spack.TypeSpec = spack.MakeTypeSpec([]string{})
+var linkInfoSpec *spack.TypeSpec = spack.MakeTypeSpec(linkInfo{})
+
+type levelDBWriter interface {
+ GetType(string) *spack.VersionedType
+ Put([]byte, []byte) error
+ Delete([]byte) error
+}
+
type levelDBStore struct {
basePath string
db *levigo.DB
types *spack.TypeSet
- writeQueue chan *levelDBWriteBatch
+ writeQueue chan *levelDBContext
flushed bool
- linkSpec *spack.TypeSpec
- linkInfoSpec *spack.TypeSpec
}
type levelDBResultSet struct {
@@ -35,8 +42,8 @@ type levelDBResultSet struct {
closed bool
}
-type levelDBWriteBatch struct {
- store *levelDBStore
+type levelDBContext struct {
+ ldbStore *levelDBStore
batch []levelDBWriteEntry
result chan error
}
@@ -65,11 +72,8 @@ func NewLevelDBStore(basePath string) LogeStore {
db: db,
types: spack.NewTypeSet(),
- writeQueue: make(chan *levelDBWriteBatch),
+ writeQueue: make(chan *levelDBContext),
flushed: false,
-
- linkSpec: spack.MakeTypeSpec([]string{}),
- linkInfoSpec: spack.MakeTypeSpec(linkInfo{}),
}
store.types.LastTag = ldb_START_TAG
@@ -116,9 +120,20 @@ func (store *levelDBStore) registerType(typ *logeType) {
vt.Dirty = false
}
+func (store *levelDBStore) GetType(typeName string) *spack.VersionedType {
+ return store.types.Type(typeName)
+}
-func (store *levelDBStore) get(typ *logeType, key LogeKey) interface{} {
+func (store *levelDBStore) Put(key []byte, val []byte) error {
+ return store.db.Put(writeOptions, key, val)
+}
+func (store *levelDBStore) Delete(key []byte) error {
+ return store.db.Delete(writeOptions, key)
+}
+
+
+func (store *levelDBStore) get(typ *logeType, key LogeKey) interface{} {
var vt = store.types.Type(typ.Name)
var encKey = vt.EncodeKey(string(key))
@@ -162,11 +177,18 @@ func (store *levelDBStore) getLinks(typ *logeType, linkName string, objKey LogeK
}
var links linkList
- spack.DecodeFromBytes(&links, store.linkSpec, val)
+ spack.DecodeFromBytes(&links, linkSpec, val)
return links
}
+func (store *levelDBStore) store(obj *logeObject) error {
+ return ldb_store(store, obj)
+}
+
+func (store *levelDBStore) storeLinks(obj *logeObject) error {
+ return ldb_storeLinks(store, obj)
+}
// -----------------------------------------------
// Search
@@ -228,9 +250,9 @@ func (rs *levelDBResultSet) Close() {
// Write Batches
// -----------------------------------------------
-func (store *levelDBStore) newWriteBatch() writeBatch {
- return &levelDBWriteBatch{
- store: store,
+func (store *levelDBStore) newContext() transactionContext {
+ return &levelDBContext{
+ ldbStore: store,
batch: make([]levelDBWriteEntry, 0),
result: make(chan error),
}
@@ -239,94 +261,68 @@ func (store *levelDBStore) newWriteBatch() writeBatch {
func (store *levelDBStore) writer() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
- for batch := range store.writeQueue {
- if batch == nil {
+ for context := range store.writeQueue {
+ if context == nil {
break
}
- batch.result<- batch.Write()
+ context.result<- context.Write()
}
store.flushed = true
}
-func (batch *levelDBWriteBatch) Store(obj *logeObject) error {
- var vt = batch.store.types.Type(obj.Type.Name)
- var key = vt.EncodeKey(string(obj.Key))
-
- if !obj.Current.hasValue() {
- batch.Delete(key)
- return nil
- }
-
- var val, err = vt.EncodeObj(obj.Current.Object)
- if err != nil {
- panic(fmt.Sprintf("Encoding error: %v\n", err))
- }
-
- batch.Append(key, val)
-
- return nil
+func (context *levelDBContext) GetType(typeName string) *spack.VersionedType {
+ return context.ldbStore.types.Type(typeName)
}
-func (batch *levelDBWriteBatch) StoreLinks(linkObj *logeObject) error {
- var set = linkObj.Current.Object.(*linkSet)
-
- if len(set.Added) == 0 && len(set.Removed) == 0 {
- return nil
- }
-
- var vt = batch.store.types.Type(linkObj.Type.Name)
- var linkInfo = linkObj.Type.Links[linkObj.LinkName]
-
- var key = encodeTaggedKey([]uint16{ldb_LINK_TAG, vt.Tag, linkInfo.Tag}, string(linkObj.Key))
- val, _ := spack.EncodeToBytes(set.ReadKeys(), batch.store.linkSpec)
-
- batch.Append(key, val)
-
- var prefix = encodeTaggedKey([]uint16{ldb_INDEX_TAG, vt.Tag, linkInfo.Tag}, "")
- var source = string(linkObj.Key)
-
- for _, target := range set.Removed {
- var key = encodeIndexKey(prefix, target, source)
- batch.Delete(key)
- }
-
- for _, target := range set.Added {
- var key = encodeIndexKey(prefix, target, source)
- batch.Append(key, []byte{})
- }
-
-
+func (context *levelDBContext) Put(key []byte, val []byte) error {
+ context.batch = append(context.batch, levelDBWriteEntry{ key, val, false })
return nil
}
-func (batch *levelDBWriteBatch) Append(key []byte, val []byte) {
- batch.batch = append(batch.batch, levelDBWriteEntry{ key, val, false })
-}
-
-func (batch *levelDBWriteBatch) Delete(key []byte) {
- batch.batch = append(batch.batch, levelDBWriteEntry{ key, nil, true })
+func (context *levelDBContext) Delete(key []byte) error {
+ context.batch = append(context.batch, levelDBWriteEntry{ key, nil, true })
+ return nil
}
-func (batch *levelDBWriteBatch) Commit() error {
- batch.store.writeQueue <- batch
- return <-batch.result
+func (context *levelDBContext) commit() error {
+ context.ldbStore.writeQueue <- context
+ return <-context.result
}
-func (batch *levelDBWriteBatch) Write() error {
+func (context *levelDBContext) Write() error {
var wb = levigo.NewWriteBatch()
defer wb.Close()
- for _, entry := range batch.batch {
+ for _, entry := range context.batch {
if entry.Delete {
wb.Delete(entry.Key)
} else {
wb.Put(entry.Key, entry.Val)
}
}
- return batch.store.db.Write(writeOptions, wb)
+ return context.ldbStore.db.Write(writeOptions, wb)
+}
+
+func (context *levelDBContext) store(obj *logeObject) error {
+ return ldb_store(context, obj)
+}
+
+func (context *levelDBContext) storeLinks(obj *logeObject) error {
+ return ldb_storeLinks(context, obj)
+}
+
+func (context *levelDBContext) find(typ *logeType, linkName string, target LogeKey) ResultSet {
+ return context.ldbStore.find(typ, linkName, target)
+}
+
+func (context *levelDBContext) get(typ *logeType, key LogeKey) interface{} {
+ return context.ldbStore.get(typ, key)
}
+func (context *levelDBContext) getLinks(typ *logeType, linkName string, key LogeKey) []string {
+ return context.ldbStore.getLinks(typ, linkName, key)
+}
// -----------------------------------------------
// Internals
@@ -356,7 +352,7 @@ func (store *levelDBStore) tagVersions(vt *spack.VersionedType, typ *logeType) {
for it = it; it.Valid(); it.Next() {
var info = &linkInfo{}
- spack.DecodeFromBytes(info, store.linkInfoSpec, it.Value())
+ spack.DecodeFromBytes(info, linkInfoSpec, it.Value())
typ.Links[info.Name] = info
}
@@ -377,7 +373,7 @@ func (store *levelDBStore) tagVersions(vt *spack.VersionedType, typ *logeType) {
maxTag++
info.Tag = maxTag
var key = encodeTaggedKey([]uint16{ldb_LINK_INFO_TAG, vt.Tag}, info.Name)
- enc, _ := spack.EncodeToBytes(info, store.linkInfoSpec)
+ enc, _ := spack.EncodeToBytes(info, linkInfoSpec)
fmt.Printf("Updating link: %s::%s (%d)\n", typ.Name, info.Name, info.Tag)
var err = store.db.Put(writeOptions, key, enc)
if err != nil {
@@ -410,6 +406,63 @@ func encodeIndexKey(prefix []byte, target string, source string) []byte {
return buf
}
+
+// -----------------------------------------------
+// Levigo interaction
+// -----------------------------------------------
+
+func ldb_store(writer levelDBWriter, obj *logeObject) error {
+ var vt = writer.GetType(obj.Type.Name)
+ var key = vt.EncodeKey(string(obj.Key))
+
+ if !obj.Current.hasValue() {
+ writer.Delete(key)
+ return nil
+ }
+
+ var val, err = vt.EncodeObj(obj.Current.Object)
+
+ if err != nil {
+ panic(fmt.Sprintf("Encoding error: %v\n", err))
+ }
+
+ writer.Put(key, val)
+
+ return nil
+}
+
+func ldb_storeLinks(writer levelDBWriter, linkObj *logeObject) error {
+ var set = linkObj.Current.Object.(*linkSet)
+
+ if len(set.Added) == 0 && len(set.Removed) == 0 {
+ return nil
+ }
+
+ var vt = writer.GetType(linkObj.Type.Name)
+ var linkInfo = linkObj.Type.Links[linkObj.LinkName]
+
+ var key = encodeTaggedKey([]uint16{ldb_LINK_TAG, vt.Tag, linkInfo.Tag}, string(linkObj.Key))
+ val, _ := spack.EncodeToBytes(set.ReadKeys(), linkSpec)
+
+ writer.Put(key, val)
+
+ var prefix = encodeTaggedKey([]uint16{ldb_INDEX_TAG, vt.Tag, linkInfo.Tag}, "")
+ var source = string(linkObj.Key)
+
+ for _, target := range set.Removed {
+ var key = encodeIndexKey(prefix, target, source)
+ writer.Delete(key)
+ }
+
+ for _, target := range set.Added {
+ var key = encodeIndexKey(prefix, target, source)
+ writer.Put(key, []byte{})
+ }
+
+
+ return nil
+}
+
// -----------------------------------------------
// Prefix iterator
// -----------------------------------------------
6 src/loge/objects.go
@@ -48,13 +48,13 @@ func (obj *logeObject) newVersion() *objectVersion {
}
}
-func (obj *logeObject) applyVersion(version *objectVersion, batch writeBatch) {
+func (obj *logeObject) applyVersion(version *objectVersion, context storeContext) {
obj.Current = version
if obj.LinkName == "" {
- batch.Store(obj)
+ context.store(obj)
} else {
- batch.StoreLinks(obj)
+ context.storeLinks(obj)
}
version.Dirty = false
86 src/loge/storage.go
@@ -1,13 +1,11 @@
package loge
+
type LogeStore interface {
- get(t *logeType, key LogeKey) interface{}
- getLinks(*logeType, string, LogeKey) []string
- find(*logeType, string, LogeKey) ResultSet
+ storeContext
close()
-
registerType(*logeType)
- newWriteBatch() writeBatch
+ newContext() transactionContext
}
type ResultSet interface {
@@ -16,12 +14,18 @@ type ResultSet interface {
Close()
}
-type writeBatch interface {
- Store(*logeObject) error
- StoreLinks(*logeObject) error
- Commit() error
+type storeContext interface {
+ get(*logeType, LogeKey) interface{}
+ getLinks(*logeType, string, LogeKey) []string
+ find(*logeType, string, LogeKey) ResultSet
+ store(*logeObject) error
+ storeLinks(*logeObject) error
}
+type transactionContext interface {
+ storeContext
+ commit() error
+}
type objectMap map[string]map[LogeKey]interface{}
@@ -31,8 +35,8 @@ type memStore struct {
lock spinLock
}
-type memWriteBatch struct {
- store *memStore
+type memContext struct {
+ mstore *memStore
writes []memWriteEntry
}
@@ -61,12 +65,10 @@ func (store *memStore) registerType(typ *logeType) {
func (store *memStore) get(t *logeType, key LogeKey) interface{} {
var objMap = store.objects[t.Name]
-
object, ok := objMap[key]
if !ok {
return nil
}
-
return object
}
@@ -85,16 +87,40 @@ func (store *memStore) find(typ *logeType, linkName string, key LogeKey) ResultS
panic("Find not implemented on memstore")
}
-func (store *memStore) newWriteBatch() writeBatch {
- return &memWriteBatch{
- store: store,
+func (store *memStore) store(obj *logeObject) error {
+ obj.Lock.SpinLock()
+ defer obj.Lock.Unlock()
+ store.objects[obj.Type.Name][obj.Key] = obj.Current.Object
+ return nil
+}
+
+func (store *memStore) storeLinks(obj *logeObject) error {
+ obj.Lock.SpinLock()
+ defer obj.Lock.Unlock()
+ var typeKey = memStoreLinkKey(obj.Type.Name, obj.LinkName)
+ var val = linkList(obj.Current.Object.(*linkSet).ReadKeys())
+ store.objects[typeKey][obj.Key] = val
+ return nil
+}
+
+func (store *memStore) newContext() transactionContext {
+ return &memContext{
+ mstore: store,
}
}
-func (batch *memWriteBatch) Store(obj *logeObject) error {
- batch.writes = append(
- batch.writes,
+func (context *memContext) get(t *logeType, key LogeKey) interface{} {
+ return context.mstore.get(t, key)
+}
+
+func (context *memContext) getLinks(t *logeType, linkName string, key LogeKey) []string {
+ return context.mstore.getLinks(t, linkName, key)
+}
+
+func (context *memContext) store(obj *logeObject) error {
+ context.writes = append(
+ context.writes,
memWriteEntry{
TypeKey: obj.Type.Name,
ObjKey: obj.Key,
@@ -103,9 +129,9 @@ func (batch *memWriteBatch) Store(obj *logeObject) error {
return nil
}
-func (batch *memWriteBatch) StoreLinks(obj *logeObject) error {
- batch.writes = append(
- batch.writes,
+func (context *memContext) storeLinks(obj *logeObject) error {
+ context.writes = append(
+ context.writes,
memWriteEntry{
TypeKey: memStoreLinkKey(obj.Type.Name, obj.LinkName),
ObjKey: obj.Key,
@@ -114,11 +140,17 @@ func (batch *memWriteBatch) StoreLinks(obj *logeObject) error {
return nil
}
-func (batch *memWriteBatch) Commit() error {
- batch.store.lock.SpinLock()
- defer batch.store.lock.Unlock()
- for _, entry := range batch.writes {
- batch.store.objects[entry.TypeKey][entry.ObjKey] = entry.Value
+func (context *memContext) find(typ *logeType, linkName string, key LogeKey) ResultSet {
+ // Until I can be bothered
+ panic("Find not implemented on memstore")
+}
+
+func (context *memContext) commit() error {
+ var store = context.mstore
+ store.lock.SpinLock()
+ defer store.lock.Unlock()
+ for _, entry := range context.writes {
+ store.objects[entry.TypeKey][entry.ObjKey] = entry.Value
}
return nil
}
4 src/loge/transactions.go
@@ -187,15 +187,15 @@ func (t *Transaction) tryCommit() bool {
}
}
- var batch = t.db.newWriteBatch()
+ var batch = t.db.store.newContext()
for _, version := range t.versions {
version.LogeObj.RefCount--
if version.Dirty {
version.LogeObj.applyVersion(version, batch)
}
}
- var err = batch.Commit()
+ var err = batch.commit()
if err != nil {
t.state = ERROR
fmt.Printf("Commit error: %v\n", err)
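
Putting the pieces together, the commit path now asks the store for a context, applies dirty versions through it, and commits. A condensed view of the flow above (state handling and reference counting trimmed):

    var context = t.db.store.newContext()
    for _, version := range t.versions {
        if version.Dirty {
            // applyVersion calls context.store for plain objects and
            // context.storeLinks for link sets (see objects.go above).
            version.LogeObj.applyVersion(version, context)
        }
    }
    if err := context.commit(); err != nil {
        // For LevelDB this queues the batch on the writer goroutine and
        // returns the levigo write result; the memory store just applies
        // its buffered writes under the spinlock.
        fmt.Printf("Commit error: %v\n", err)
    }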
4 src/logetest/main.go
@@ -12,8 +12,8 @@ type Pet struct {
}
func main() {
- LinkBench()
- //LinkSandbox()
+ //LinkBench()
+ LinkSandbox()
//WriteBench()
//Sandbox()
//Example()
