Permalink
Browse files

Batch transaction writes

  • Loading branch information...
1 parent 9298cab commit bb76c6bb4e6ee5ad63dc5973d62b4b631512e46f @brendonh committed Feb 3, 2013
Showing with 140 additions and 74 deletions.
  1. +2 −7 src/loge/database.go
  2. +62 −44 src/loge/leveldb.go
  3. +8 −2 src/loge/objects.go
  4. +59 −19 src/loge/storage.go
  5. +9 −2 src/loge/transactions.go
View
@@ -86,7 +86,6 @@ func (db *LogeDB) CreateTransaction() *Transaction {
return NewTransaction(db)
}
-
type Transactor func(*Transaction)
func (db *LogeDB) Transact(actor Transactor, timeout time.Duration) bool {
@@ -162,12 +161,8 @@ func (db *LogeDB) EnsureObj(objRef ObjRef) *LogeObject {
}
-func (db *LogeDB) StoreObj(obj *LogeObject) {
- if obj.LinkName == "" {
- db.store.Store(obj)
- } else {
- db.store.StoreLinks(obj)
- }
+func (db *LogeDB) NewWriteBatch() LogeWriteBatch {
+ return db.store.NewWriteBatch()
}
View
@@ -20,11 +20,18 @@ type LevelDBStore struct {
basePath string
db *levigo.DB
types *spack.TypeSet
- nextTypeNum int
+
linkSpec *spack.TypeSpec
linkInfoSpec *spack.TypeSpec
}
+type LevelDBWriteBatch struct {
+ store *LevelDBStore
+ batch *levigo.WriteBatch
+ count uint
+}
+
+
var writeOptions = levigo.NewWriteOptions()
var readOptions = levigo.NewReadOptions()
@@ -144,23 +151,6 @@ func (store *LevelDBStore) tagVersions(vt *spack.VersionedType, typ *LogeType) {
}
-func (store *LevelDBStore) Store(obj *LogeObject) error {
- var vt = store.types.Type(obj.Type.Name)
- var key = vt.EncodeKey(string(obj.Key))
- var val, err = vt.EncodeObj(obj.Current.Object)
-
- if err != nil {
- panic(fmt.Sprintf("Encoding error: %v\n", err))
- }
-
- err = store.db.Put(writeOptions, key, val)
- if err != nil {
- panic(fmt.Sprintf("Write error: %v\n", err))
- }
-
- return nil
-}
-
func (store *LevelDBStore) Get(typ *LogeType, key LogeKey) interface{} {
var vt = store.types.Type(typ.Name)
@@ -185,32 +175,6 @@ func (store *LevelDBStore) Get(typ *LogeType, key LogeKey) interface{} {
return obj
}
-// -----------------------------------------------
-// Links
-// -----------------------------------------------
-
-func (store *LevelDBStore) StoreLinks(linkObj *LogeObject) error {
- var set = linkObj.Current.Object.(*LinkSet)
-
- if len(set.Added) == 0 && len(set.Removed) == 0 {
- return nil
- }
-
- var vt = store.types.Type(linkObj.Type.Name)
- var linkInfo = linkObj.Type.Links[linkObj.LinkName]
-
- var key = encodeTaggedKey([]uint16{LINK_TAG, vt.Tag, linkInfo.Tag}, string(linkObj.Key))
-
- enc, _ := spack.EncodeToBytes(set.ReadKeys(), store.linkSpec)
-
- var err = store.db.Put(writeOptions, key, enc)
- if err != nil {
- panic(fmt.Sprintf("Write error: %v\n", err))
- }
-
- return nil
-}
-
func (store *LevelDBStore) GetLinks(typ *LogeType, linkName string, objKey LogeKey) Links {
var vt = store.types.Type(typ.Name)
@@ -233,6 +197,60 @@ func (store *LevelDBStore) GetLinks(typ *LogeType, linkName string, objKey LogeK
return links
}
+func (store *LevelDBStore) NewWriteBatch() LogeWriteBatch {
+ return &LevelDBWriteBatch{
+ store: store,
+ batch: levigo.NewWriteBatch(),
+ }
+}
+
+// -----------------------------------------------
+// Write Batches
+// -----------------------------------------------
+
+func (batch *LevelDBWriteBatch) Store(obj *LogeObject) error {
+ var vt = batch.store.types.Type(obj.Type.Name)
+ var key = vt.EncodeKey(string(obj.Key))
+ var val, err = vt.EncodeObj(obj.Current.Object)
+
+ if err != nil {
+ panic(fmt.Sprintf("Encoding error: %v\n", err))
+ }
+
+ batch.batch.Put(key, val)
+ batch.count++
+
+ return nil
+}
+
+func (batch *LevelDBWriteBatch) StoreLinks(linkObj *LogeObject) error {
+ var set = linkObj.Current.Object.(*LinkSet)
+
+ if len(set.Added) == 0 && len(set.Removed) == 0 {
+ return nil
+ }
+
+ var vt = batch.store.types.Type(linkObj.Type.Name)
+ var linkInfo = linkObj.Type.Links[linkObj.LinkName]
+
+ var key = encodeTaggedKey([]uint16{LINK_TAG, vt.Tag, linkInfo.Tag}, string(linkObj.Key))
+ val, _ := spack.EncodeToBytes(set.ReadKeys(), batch.store.linkSpec)
+
+ batch.batch.Put(key, val)
+ batch.count++
+
+ return nil
+}
+
+func (batch *LevelDBWriteBatch) Commit() error {
+ return batch.store.db.Write(writeOptions, batch.batch)
+}
+
+
+// -----------------------------------------------
+// Key encoding
+// -----------------------------------------------
+
func encodeTaggedKey(tags []uint16, key string) []byte {
var keyBytes = []byte(key)
var buf = bytes.NewBuffer(make([]byte, 0, len(keyBytes) + (2 * len(tags))))
View
@@ -57,9 +57,15 @@ func (obj *LogeObject) Applicable(version *LogeObjectVersion) bool {
}
-func (obj *LogeObject) ApplyVersion(version *LogeObjectVersion) {
+func (obj *LogeObject) ApplyVersion(version *LogeObjectVersion, batch LogeWriteBatch) {
obj.Current = version
- obj.DB.StoreObj(obj)
+
+ if obj.LinkName == "" {
+ batch.Store(obj)
+ } else {
+ batch.StoreLinks(obj)
+ }
+
version.Dirty = false
if obj.LinkName != "" {
version.Object.(*LinkSet).Freeze()
View
@@ -1,13 +1,19 @@
package loge
+
type LogeStore interface {
RegisterType(*LogeType)
- Store(*LogeObject) error
Get(t *LogeType, key LogeKey) interface{}
+ GetLinks(*LogeType, string, LogeKey) Links
+
+ NewWriteBatch() LogeWriteBatch
+}
+type LogeWriteBatch interface {
+ Store(*LogeObject) error
StoreLinks(*LogeObject) error
- GetLinks(*LogeType, string, LogeKey) Links
+ Commit() error
}
@@ -18,6 +24,16 @@ type MemStore struct {
objects objectMap
}
+type MemStoreWriteBatch struct {
+ store *MemStore
+ writes []MemStoreWriteEntry
+}
+
+type MemStoreWriteEntry struct {
+ TypeKey string
+ ObjKey LogeKey
+ Value interface{}
+}
func NewMemStore() *MemStore {
return &MemStore{
@@ -29,18 +45,11 @@ func NewMemStore() *MemStore {
func (store *MemStore) RegisterType(typ *LogeType) {
store.objects[typ.Name] = make(map[LogeKey]interface{})
for linkName := range typ.Links {
- var lk = store.linkKey(typ.Name, linkName)
+ var lk = memStoreLinkKey(typ.Name, linkName)
store.objects[lk] = make(map[LogeKey]interface{})
}
}
-
-func (store *MemStore) Store(obj *LogeObject) error {
- store.objects[obj.Type.Name][obj.Key] = obj.Current.Object
- return nil
-}
-
-
func (store *MemStore) Get(t *LogeType, key LogeKey) interface{} {
var objMap = store.objects[t.Name]
@@ -52,15 +61,8 @@ func (store *MemStore) Get(t *LogeType, key LogeKey) interface{} {
return object
}
-
-func (store *MemStore) StoreLinks(obj *LogeObject) error {
- var lk = store.linkKey(obj.Type.Name, obj.LinkName)
- store.objects[lk][obj.Key] = Links(obj.Current.Object.(*LinkSet).ReadKeys())
- return nil
-}
-
func (store *MemStore) GetLinks(typ *LogeType, linkName string, key LogeKey) Links {
- var lk = store.linkKey(typ.Name, linkName)
+ var lk = memStoreLinkKey(typ.Name, linkName)
links, ok := store.objects[lk][key]
if ok {
return links.(Links)
@@ -69,6 +71,44 @@ func (store *MemStore) GetLinks(typ *LogeType, linkName string, key LogeKey) Lin
return Links{}
}
-func (store *MemStore) linkKey(typeName string, linkName string) string {
+
+func (store *MemStore) NewWriteBatch() LogeWriteBatch {
+ return &MemStoreWriteBatch{
+ store: store,
+ }
+}
+
+
+func (batch *MemStoreWriteBatch) Store(obj *LogeObject) error {
+ batch.writes = append(
+ batch.writes,
+ MemStoreWriteEntry{
+ TypeKey: obj.Type.Name,
+ ObjKey: obj.Key,
+ Value: obj.Current.Object,
+ })
+ return nil
+}
+
+func (batch *MemStoreWriteBatch) StoreLinks(obj *LogeObject) error {
+ batch.writes = append(
+ batch.writes,
+ MemStoreWriteEntry{
+ TypeKey: memStoreLinkKey(obj.Type.Name, obj.LinkName),
+ ObjKey: obj.Key,
+ Value: Links(obj.Current.Object.(*LinkSet).ReadKeys()),
+ })
+ return nil
+}
+
+func (batch *MemStoreWriteBatch) Commit() error {
+ for _, entry := range batch.writes {
+ batch.store.objects[entry.TypeKey][entry.ObjKey] = entry.Value
+ }
+ return nil
+}
+
+
+func memStoreLinkKey(typeName string, linkName string) string {
return "^" + typeName + "^" + linkName
}
View
@@ -167,15 +167,22 @@ func (t *Transaction) tryCommit() bool {
return true
}
}
-
+
+ var batch = t.DB.NewWriteBatch()
for _, version := range t.Versions {
//fmt.Printf("Version %v\n", version)
version.LogeObj.RefCount--
if version.Dirty {
- version.LogeObj.ApplyVersion(version)
+ version.LogeObj.ApplyVersion(version, batch)
}
}
+ var err = batch.Commit()
+ if err != nil {
+ t.State = ERROR
+ fmt.Printf("Commit error: %v\n", err)
+ }
+
t.State = FINISHED
return true
}

0 comments on commit bb76c6b

Please sign in to comment.