Browse files

Do spack encoding at DB level, rather than in storage, and re-decode …

…instead of copying objects
  • Loading branch information...
1 parent 745632b commit 1cfa311a61aad88aef429294525ef8a4719a5dfe @brendonh committed Feb 11, 2013
View
48 src/loge/contention_test.go
@@ -8,9 +8,51 @@ import (
)
type TestCounter struct {
- Value int
+ Value uint32
}
+// func TestNoContention(b *testing.T) {
+
+// var N = 50000
+
+// var procs = runtime.NumCPU()
+// var origProcs = runtime.GOMAXPROCS(procs)
+
+// var db = NewLogeDB(NewMemStore())
+// db.CreateType("counters", 1, &TestCounter{}, nil)
+
+// db.Transact(func (t *Transaction) {
+// for i := 0; i < procs; i++ {
+// var key LogeKey = LogeKey(strconv.Itoa(i))
+// t.Set("counters", key, &TestCounter{Value: 0})
+// }
+// }, 0)
+
+
+// var group sync.WaitGroup
+// for i := 0; i < procs; i++ {
+// var key = LogeKey(strconv.Itoa(i))
+// group.Add(1)
+// go LoopIncrement(db, key, &group, N)
+// }
+// group.Wait()
+
+
+// db.Transact(func (t *Transaction) {
+// for i := 0; i < procs; i++ {
+// var key = LogeKey(strconv.Itoa(i))
+// var counter = t.Read("counters", key).(*TestCounter)
+// if counter.Value != uint32(N) {
+// b.Errorf("Wrong count for counter %d: %d / %d",
+// i, counter.Value, N)
+// }
+// }
+// }, 0)
+
+// runtime.GOMAXPROCS(origProcs)
+// }
+
+
func BenchmarkNoContention(b *testing.B) {
b.StopTimer()
@@ -43,7 +85,7 @@ func BenchmarkNoContention(b *testing.B) {
for i := 0; i < procs; i++ {
var key = LogeKey(strconv.Itoa(i))
var counter = t.Read("counters", key).(*TestCounter)
- if counter.Value != b.N {
+ if counter.Value != uint32(b.N) {
b.Errorf("Wrong count for counter %d: %d / %d",
i, counter.Value, b.N)
}
@@ -82,7 +124,7 @@ func BenchmarkContention(b *testing.B) {
db.Transact(func (t *Transaction) {
var target = b.N * procs
var counter = t.Read("counters", "contended").(*TestCounter)
- if counter.Value != target {
+ if counter.Value != uint32(target) {
b.Errorf("Wrong count for counter: %d / %d",
counter.Value, target)
}
View
114 src/loge/database.go
@@ -4,6 +4,9 @@ import (
"fmt"
"time"
"sync/atomic"
+ "reflect"
+
+ "github.com/brendonh/spack"
)
type LogeDB struct {
@@ -12,6 +15,7 @@ type LogeDB struct {
cache objCache
lastSnapshotID uint64
lock spinLock
+ linkTypeSpec *spack.TypeSpec
}
func NewLogeDB(store LogeStore) *LogeDB {
@@ -20,6 +24,7 @@ func NewLogeDB(store LogeStore) *LogeDB {
store: store,
cache: make(objCache),
lastSnapshotID: 1,
+ linkTypeSpec: spack.MakeTypeSpec([]string{}),
}
}
@@ -28,35 +33,9 @@ type typeMap map[string]*logeType
type objCache map[string]*logeObject
-type objRef struct {
- TypeName string
- Key LogeKey
- LinkName string
- CacheKey string
-}
-
type Transactor func(*Transaction)
-func makeObjRef(typeName string, key LogeKey) objRef {
- var cacheKey = typeName + "^" + string(key)
- return objRef { typeName, key, "", cacheKey }
-}
-
-func makeLinkRef(typeName string, linkName string, key LogeKey) objRef {
- var cacheKey = "^" + typeName + "^" + linkName + "^" + string(key)
- return objRef { typeName, key, linkName, cacheKey }
-}
-
-func (objRef objRef) String() string {
- return objRef.CacheKey
-}
-
-func (objRef objRef) IsLink() bool {
- return objRef.LinkName != ""
-}
-
-
func (db *LogeDB) Close() {
db.store.close()
}
@@ -68,26 +47,15 @@ func (db *LogeDB) CreateType(name string, version uint16, exemplar interface{},
panic(fmt.Sprintf("Type exists: '%s'", name))
}
- var infos = make(map[string]*linkInfo)
- for k, v := range linkSpec {
- infos[k] = &linkInfo{
- Name: k,
- Target: v,
- Tag: 0,
- }
- }
-
- var t = &logeType {
- Name: name,
- Version: version,
- Exemplar: exemplar,
- Links: infos,
- }
+ var vt = db.store.getSpackType(name)
+ var spackExemplar = reflect.ValueOf(exemplar).Elem().Interface()
+ vt.AddVersion(version, spackExemplar, nil)
+ var typ = NewType(name, version, exemplar, linkSpec, vt)
- db.types[name] = t
- db.store.registerType(t)
+ db.types[name] = typ
+ db.store.registerType(typ)
- return t
+ return typ
}
@@ -115,12 +83,12 @@ func (db *LogeDB) Transact(actor Transactor, timeout time.Duration) bool {
return false
}
-func (db *LogeDB) Find(typeName string, linkName string, target LogeKey) ResultSet {
- return db.store.find(db.types[typeName], linkName, target)
+func (db *LogeDB) Find(typeName string, linkName string, target LogeKey) ResultSet {
+ return db.store.find(makeLinkRef(typeName, linkName, ""), target)
}
func (db *LogeDB) FindFrom(typeName string, linkName string, target LogeKey, from LogeKey, limit int) ResultSet {
- return db.store.findFrom(db.types[typeName], linkName, target, from, limit)
+ return db.store.findSlice(makeLinkRef(typeName, linkName, ""), target, from, limit)
}
@@ -142,21 +110,20 @@ func (db *LogeDB) FlushCache() int {
// -----------------------------------------------
func (db *LogeDB) ExistsOne(typeName string, key LogeKey) bool {
- var obj = db.store.get(db.types[typeName], key)
+ var obj = db.store.get(makeObjRef(typeName, key))
return obj != nil
}
func (db *LogeDB) ReadOne(typeName string, key LogeKey) interface{} {
var typ = db.types[typeName]
- var obj = db.store.get(typ, key)
- if obj == nil {
- return typ.NilValue()
- }
- return obj
+ return typ.Decode(db.store.get(makeObjRef(typeName, key)))
}
func (db *LogeDB) ReadLinksOne(typeName string, linkName string, key LogeKey) []string {
- return db.store.getLinks(db.types[typeName], linkName, key)
+ var blob = db.store.get(makeLinkRef(typeName, linkName, key))
+ var links linkList
+ spack.DecodeFromBytes(&links, db.linkTypeSpec, blob)
+ return links
}
func (db *LogeDB) SetOne(typeName string, key LogeKey, obj interface{}) {
@@ -202,38 +169,21 @@ func (db *LogeDB) ensureObj(ref objRef, load bool) *logeObject {
db.lock.Unlock()
var version *objectVersion
- if ref.IsLink() {
- var links []string
- if load {
- links = db.store.getLinks(typ, ref.LinkName, key)
- obj.Loaded = true
- }
-
- var linkSet = newLinkSet()
- linkSet.Original = links
- version = &objectVersion {
- LogeObj: obj,
- Object: linkSet,
- }
- obj.LinkName = ref.LinkName
- } else {
- var object interface{}
-
- if load {
- object = db.store.get(typ, key)
- obj.Loaded = true
- }
+ var blob []byte
+ if load {
+ blob = db.store.get(ref)
+ obj.Loaded = true
+ }
- if object == nil {
- object = typ.NilValue()
- }
+ version = &objectVersion {
+ LogeObj: obj,
+ Blob: blob,
+ }
- version = &objectVersion{
- Object: object,
- }
+ if ref.IsLink() {
+ obj.LinkName = ref.LinkName
- version.LogeObj = obj
}
obj.Current = version
View
234 src/loge/leveldb.go
@@ -3,7 +3,6 @@ package loge
import (
"fmt"
"bytes"
- "reflect"
"encoding/binary"
"runtime"
@@ -16,11 +15,8 @@ const ldb_LINK_INFO_TAG uint16 = 3
const ldb_INDEX_TAG uint16 = 4
const ldb_START_TAG uint16 = 8
-var linkSpec *spack.TypeSpec = spack.MakeTypeSpec([]string{})
-var linkInfoSpec *spack.TypeSpec = spack.MakeTypeSpec(linkInfo{})
-
type levelDBWriter interface {
- GetType(string) *spack.VersionedType
+ GetLogeType(string) *logeType
Put([]byte, []byte) error
Delete([]byte) error
}
@@ -29,6 +25,7 @@ type levelDBStore struct {
basePath string
db *levigo.DB
types *spack.TypeSet
+ logeTypes map[string]*logeType
writeQueue chan *levelDBContext
flushed bool
@@ -75,6 +72,7 @@ func NewLevelDBStore(basePath string) LogeStore {
basePath: basePath,
db: db,
types: spack.NewTypeSet(),
+ logeTypes: make(map[string]*logeType),
writeQueue: make(chan *levelDBContext),
flushed: false,
@@ -96,10 +94,10 @@ func (store *levelDBStore) close() {
}
func (store *levelDBStore) registerType(typ *logeType) {
- var vt = store.types.RegisterType(typ.Name)
- var exemplar = reflect.ValueOf(typ.Exemplar).Elem().Interface()
- vt.AddVersion(typ.Version, exemplar, nil)
- store.tagVersions(vt, typ)
+ store.logeTypes[typ.Name] = typ
+ store.tagVersions(typ)
+
+ var vt = typ.SpackType
if (!vt.Dirty) {
return
@@ -124,8 +122,12 @@ func (store *levelDBStore) registerType(typ *logeType) {
vt.Dirty = false
}
-func (store *levelDBStore) GetType(typeName string) *spack.VersionedType {
- return store.types.Type(typeName)
+func (store *levelDBStore) getSpackType(name string) *spack.VersionedType {
+ return store.types.RegisterType(name)
+}
+
+func (store *levelDBStore) GetLogeType(typeName string) *logeType {
+ return store.logeTypes[typeName]
}
func (store *levelDBStore) Put(key []byte, val []byte) error {
@@ -137,73 +139,49 @@ func (store *levelDBStore) Delete(key []byte) error {
}
-func (store *levelDBStore) get(typ *logeType, key LogeKey) interface{} {
- var vt = store.types.Type(typ.Name)
- var encKey = vt.EncodeKey(string(key))
-
- val, err := store.db.Get(defaultReadOptions, encKey)
+func (store *levelDBStore) get(ref objRef) []byte {
+ var typ = store.logeTypes[ref.TypeName]
+ var vt = typ.SpackType
- if err != nil {
- panic(fmt.Sprintf("Read error: %v\n", err))
- }
-
- var obj interface{}
- if val == nil {
- obj = typ.NilValue()
+ var key []byte
+ if ref.LinkName == "" {
+ key = vt.EncodeKey(string(ref.Key))
} else {
- obj, err = vt.DecodeObj(val)
- if err != nil {
- panic(fmt.Sprintf("Decode error: %v", err))
- }
+ var linkInfo = typ.Links[ref.LinkName]
+ key = encodeTaggedKey([]uint16{ldb_LINK_TAG, vt.Tag, linkInfo.Tag}, string(ref.Key))
}
- return obj
-}
-
-func (store *levelDBStore) getLinks(typ *logeType, linkName string, objKey LogeKey) []string {
- var vt = store.types.Type(typ.Name)
-
- var linkInfo, ok = typ.Links[linkName]
- if !ok {
- panic(fmt.Sprintf("Link info missing for %s", linkName))
- }
-
- var key = encodeTaggedKey([]uint16{ldb_LINK_TAG, vt.Tag, linkInfo.Tag}, string(objKey))
-
val, err := store.db.Get(defaultReadOptions, key)
if err != nil {
panic(fmt.Sprintf("Read error: %v\n", err))
}
- if val == nil {
- return linkList{}
- }
-
- var links linkList
- spack.DecodeFromBytes(&links, linkSpec, val)
+ return val
+}
- return links
+func (store *levelDBStore) store(ref objRef, enc []byte) error {
+ return ldb_store(store, ref, enc)
}
-func (store *levelDBStore) store(obj *logeObject) error {
- return ldb_store(store, obj)
+func (store *levelDBStore) addIndex(ref objRef, target LogeKey) {
+ ldb_addIndex(store, ref, target)
}
-func (store *levelDBStore) storeLinks(obj *logeObject) error {
- return ldb_storeLinks(store, obj)
+func (store *levelDBStore) remIndex(ref objRef, target LogeKey) {
+ ldb_remIndex(store, ref, target)
}
// -----------------------------------------------
// Search
// -----------------------------------------------
-func (store *levelDBStore) find(typ *logeType, linkName string, key LogeKey) ResultSet {
- return ldb_find(store, defaultReadOptions, typ, linkName, key, "", -1)
+func (store *levelDBStore) find(ref objRef, key LogeKey) ResultSet {
+ return ldb_find(store, defaultReadOptions, ref, key, "", -1)
}
-func (store *levelDBStore) findFrom(typ *logeType, linkName string, key LogeKey, from LogeKey, limit int) ResultSet {
- return ldb_find(store, defaultReadOptions, typ, linkName, key, from, limit)
+func (store *levelDBStore) findSlice(ref objRef, key LogeKey, from LogeKey, limit int) ResultSet {
+ return ldb_find(store, defaultReadOptions, ref, key, from, limit)
}
func (rs *levelDBResultSet) Valid() bool {
@@ -274,8 +252,8 @@ func (store *levelDBStore) writer() {
}
-func (context *levelDBContext) GetType(typeName string) *spack.VersionedType {
- return context.ldbStore.types.Type(typeName)
+func (context *levelDBContext) GetLogeType(typeName string) *logeType {
+ return context.ldbStore.logeTypes[typeName]
}
func (context *levelDBContext) Put(key []byte, val []byte) error {
@@ -318,28 +296,28 @@ func (context *levelDBContext) Write() error {
return context.ldbStore.db.Write(defaultWriteOptions, wb)
}
-func (context *levelDBContext) store(obj *logeObject) error {
- return ldb_store(context, obj)
+func (context *levelDBContext) store(ref objRef, enc []byte) error {
+ return ldb_store(context, ref, enc)
}
-func (context *levelDBContext) storeLinks(obj *logeObject) error {
- return ldb_storeLinks(context, obj)
+func (context *levelDBContext) addIndex(ref objRef, target LogeKey) {
+ ldb_addIndex(context, ref, target)
}
-func (context *levelDBContext) find(typ *logeType, linkName string, target LogeKey) ResultSet {
- return ldb_find(context.ldbStore, context.readOptions, typ, linkName, target, "", -1)
+func (context *levelDBContext) remIndex(ref objRef, target LogeKey) {
+ ldb_remIndex(context, ref, target)
}
-func (context *levelDBContext) findFrom(typ *logeType, linkName string, key LogeKey, from LogeKey, limit int) ResultSet {
- return ldb_find(context.ldbStore, context.readOptions, typ, linkName, key, from, limit)
+func (context *levelDBContext) find(ref objRef, target LogeKey) ResultSet {
+ return ldb_find(context.ldbStore, context.readOptions, ref, target, "", -1)
}
-func (context *levelDBContext) get(typ *logeType, key LogeKey) interface{} {
- return context.ldbStore.get(typ, key)
+func (context *levelDBContext) findSlice(ref objRef, key LogeKey, from LogeKey, limit int) ResultSet {
+ return ldb_find(context.ldbStore, context.readOptions, ref, key, from, limit)
}
-func (context *levelDBContext) getLinks(typ *logeType, linkName string, key LogeKey) []string {
- return context.ldbStore.getLinks(typ, linkName, key)
+func (context *levelDBContext) get(ref objRef) []byte {
+ return context.ldbStore.get(ref)
}
// -----------------------------------------------
@@ -363,7 +341,8 @@ func (store *levelDBStore) loadTypeMetadata() {
}
}
-func (store *levelDBStore) tagVersions(vt *spack.VersionedType, typ *logeType) {
+func (store *levelDBStore) tagVersions(typ *logeType) {
+ var vt = typ.SpackType
var prefix = encodeTaggedKey([]uint16{ldb_LINK_INFO_TAG, vt.Tag}, "")
var it = store.iteratePrefix(prefix, []byte{}, defaultReadOptions)
defer it.Close()
@@ -414,31 +393,74 @@ func encodeTaggedKey(tags []uint16, key string) []byte {
return buf.Bytes()
}
-func encodeIndexKey(prefix []byte, target string, source string) []byte {
- var buf = make([]byte, 0, len(prefix) + len(target) + len(source))
- buf = append(buf, prefix...)
- buf = append(buf, []byte(target)...)
- buf = append(buf, 0)
- buf = append(buf, []byte(source)...)
- return buf
+func encodeIndexKey(writer levelDBWriter, ref objRef, target LogeKey) []byte {
+ var typ = writer.GetLogeType(ref.TypeName)
+ var vt = typ.SpackType
+ var linkInfo = typ.Links[ref.LinkName]
+
+ var tags = []uint16{ldb_INDEX_TAG, vt.Tag, linkInfo.Tag}
+
+ var source = ref.Key
+
+ var buf = bytes.NewBuffer(make([]byte, 0, (2 * len(tags)) + len(target) + len(source)))
+ for _, tag := range tags {
+ binary.Write(buf, binary.BigEndian, tag)
+ }
+ buf.Write([]byte(target))
+ buf.Write([]byte{0})
+ buf.Write([]byte(source))
+ return buf.Bytes()
}
// -----------------------------------------------
// Levigo interaction
// -----------------------------------------------
+
+func ldb_store(writer levelDBWriter, ref objRef, enc []byte) error {
+ var typ = writer.GetLogeType(ref.TypeName)
+ var vt = typ.SpackType
+
+ var key []byte
+ if ref.LinkName == "" {
+ key = vt.EncodeKey(string(ref.Key))
+ } else {
+ var linkInfo = typ.Links[ref.LinkName]
+ key = encodeTaggedKey([]uint16{ldb_LINK_TAG, vt.Tag, linkInfo.Tag}, string(ref.Key))
+ }
+
+ if len(enc) == 0 {
+ writer.Delete(key)
+ return nil
+ }
+
+ writer.Put(key, enc)
+
+ return nil
+}
+
+func ldb_addIndex(writer levelDBWriter, ref objRef, target LogeKey) {
+ var key = encodeIndexKey(writer, ref, target)
+ writer.Put(key, []byte{})
+}
+
+func ldb_remIndex(writer levelDBWriter, ref objRef, target LogeKey) {
+ var key = encodeIndexKey(writer, ref, target)
+ writer.Delete(key)
+}
+
func ldb_find(store *levelDBStore, readOptions *levigo.ReadOptions,
- typ *logeType, linkName string, target LogeKey, from LogeKey, limit int) ResultSet {
+ ref objRef, target LogeKey, from LogeKey, limit int) ResultSet {
if limit == 0 {
return &levelDBResultSet {
closed: true,
}
}
- var vt = store.types.Type(typ.Name)
- var linkInfo = typ.Links[linkName]
+ var vt = store.types.Type(ref.TypeName)
+ var linkInfo = store.logeTypes[ref.TypeName].Links[ref.LinkName]
var prefix = append(
encodeTaggedKey([]uint16{ldb_INDEX_TAG, vt.Tag, linkInfo.Tag}, string(target)),
@@ -465,58 +487,6 @@ func ldb_find(store *levelDBStore, readOptions *levigo.ReadOptions,
}
}
-func ldb_store(writer levelDBWriter, obj *logeObject) error {
- var vt = writer.GetType(obj.Type.Name)
- var key = vt.EncodeKey(string(obj.Key))
-
- if !obj.Current.hasValue() {
- writer.Delete(key)
- return nil
- }
-
- var val, err = vt.EncodeObj(obj.Current.Object)
-
- if err != nil {
- panic(fmt.Sprintf("Encoding error: %v\n", err))
- }
-
- writer.Put(key, val)
-
- return nil
-}
-
-func ldb_storeLinks(writer levelDBWriter, linkObj *logeObject) error {
- var set = linkObj.Current.Object.(*linkSet)
-
- if len(set.Added) == 0 && len(set.Removed) == 0 {
- return nil
- }
-
- var vt = writer.GetType(linkObj.Type.Name)
- var linkInfo = linkObj.Type.Links[linkObj.LinkName]
-
- var key = encodeTaggedKey([]uint16{ldb_LINK_TAG, vt.Tag, linkInfo.Tag}, string(linkObj.Key))
- val, _ := spack.EncodeToBytes(set.ReadKeys(), linkSpec)
-
- writer.Put(key, val)
-
- var prefix = encodeTaggedKey([]uint16{ldb_INDEX_TAG, vt.Tag, linkInfo.Tag}, "")
- var source = string(linkObj.Key)
-
- for _, target := range set.Removed {
- var key = encodeIndexKey(prefix, target, source)
- writer.Delete(key)
- }
-
- for _, target := range set.Added {
- var key = encodeIndexKey(prefix, target, source)
- writer.Put(key, []byte{})
- }
-
-
- return nil
-}
-
// -----------------------------------------------
// Prefix iterator
// -----------------------------------------------
View
79 src/loge/objects.go
@@ -1,7 +1,10 @@
package loge
import (
+ "fmt"
"reflect"
+
+ "github.com/brendonh/spack"
)
type logeObject struct {
@@ -17,8 +20,7 @@ type logeObject struct {
type objectVersion struct {
LogeObj *logeObject
- Object interface{}
- Dirty bool
+ Blob []byte
snapshotID uint64
Previous *objectVersion
}
@@ -35,7 +37,14 @@ func initializeObject(db *LogeDB, t *logeType, key LogeKey) *logeObject {
}
}
-func (obj *logeObject) getTransactionVersion(sID uint64) *objectVersion {
+func (obj *logeObject) makeObjRef() objRef {
+ if obj.LinkName != "" {
+ return makeLinkRef(obj.Type.Name, obj.LinkName, obj.Key)
+ }
+ return makeObjRef(obj.Type.Name, obj.Key)
+}
+
+func (obj *logeObject) getVersion(sID uint64) *objectVersion {
var version = obj.Current
for version.snapshotID > sID {
version = version.Previous
@@ -46,39 +55,61 @@ func (obj *logeObject) getTransactionVersion(sID uint64) *objectVersion {
return version
}
-func (obj *logeObject) newVersion(sID uint64) *objectVersion {
- var current = obj.getTransactionVersion(sID)
-
- var newObj = obj.Type.Copy(current.Object)
+func (obj *logeObject) applyVersion(object interface{}, context storeContext, sID uint64) {
+ var blob = obj.encode(object)
- return &objectVersion{
+ obj.Current = &objectVersion{
LogeObj: obj,
- Object: newObj,
- Dirty: true,
+ Blob: blob,
Previous: obj.Current,
- snapshotID: current.snapshotID,
+ snapshotID: sID,
}
-}
-
-func (obj *logeObject) applyVersion(version *objectVersion, context storeContext, sID uint64) {
- version.snapshotID = sID
- obj.Current = version
obj.Loaded = true
+ var ref = obj.makeObjRef()
+ context.store(ref, blob)
+
+ if obj.LinkName != "" {
+ var links = object.(*linkSet)
+ for _, target := range links.Removed {
+ context.remIndex(ref, LogeKey(target))
+ }
+ for _, target := range links.Added {
+ context.addIndex(ref, LogeKey(target))
+ }
+ }
+}
+
+func (obj *logeObject) decode(blob []byte) interface{} {
+ var object interface{}
if obj.LinkName == "" {
- context.store(obj)
+ object = obj.Type.Decode(blob)
} else {
- context.storeLinks(obj)
+ var links linkList
+ spack.DecodeFromBytes(&links, obj.DB.linkTypeSpec, blob)
+ object = &linkSet{ Original: links }
}
+ return object
+}
- version.Dirty = false
- if obj.LinkName != "" {
- version.Object.(*linkSet).Freeze()
+func (obj *logeObject) encode(object interface{}) []byte {
+ if !obj.hasValue(object) {
+ return nil
+ }
+
+ if obj.LinkName == "" {
+ return obj.Type.Encode(object)
+ }
+
+ var set = object.(*linkSet)
+ enc, err := spack.EncodeToBytes(set.ReadKeys(), obj.DB.linkTypeSpec)
+ if err != nil {
+ panic(fmt.Sprintf("Link encode error: %v\n", err))
}
+ return enc
}
-func (version *objectVersion) hasValue() bool {
- var value = reflect.ValueOf(version.Object)
- return !value.IsNil()
+func (obj *logeObject) hasValue(object interface{}) bool {
+ return !reflect.ValueOf(object).IsNil()
}
View
26 src/loge/objref.go
@@ -0,0 +1,26 @@
+package loge
+
+type objRef struct {
+ TypeName string
+ Key LogeKey
+ LinkName string
+ CacheKey string
+}
+
+func makeObjRef(typeName string, key LogeKey) objRef {
+ var cacheKey = typeName + "^" + string(key)
+ return objRef { typeName, key, "", cacheKey }
+}
+
+func makeLinkRef(typeName string, linkName string, key LogeKey) objRef {
+ var cacheKey = "^" + typeName + "^" + linkName + "^" + string(key)
+ return objRef { typeName, key, linkName, cacheKey }
+}
+
+func (objRef objRef) String() string {
+ return objRef.CacheKey
+}
+
+func (objRef objRef) IsLink() bool {
+ return objRef.LinkName != ""
+}
View
4 src/loge/oneshot_test.go
@@ -53,7 +53,7 @@ func TestOneshotOps(test *testing.T) {
}
var wibLinks = db.ReadLinksOne("test", "other", "wib")
- if !reflect.DeepEqual(wibLinks, []string{}) {
- test.Errorf("Wrong one-shot links: %v", wibLinks)
+ if len(wibLinks) != 0 {
+ test.Errorf("Wrong one-shot links: %#v", wibLinks)
}
}
View
128 src/loge/storage.go
@@ -1,10 +1,14 @@
package loge
+import (
+ "github.com/brendonh/spack"
+)
type LogeStore interface {
storeContext
close()
registerType(*logeType)
+ getSpackType(name string) *spack.VersionedType
newContext() transactionContext
}
@@ -16,13 +20,14 @@ type ResultSet interface {
}
type storeContext interface {
- get(*logeType, LogeKey) interface{}
- getLinks(*logeType, string, LogeKey) []string
- store(*logeObject) error
- storeLinks(*logeObject) error
+ get(objRef) []byte
+ store(objRef, []byte) error
- find(*logeType, string, LogeKey) ResultSet
- findFrom(*logeType, string, LogeKey, LogeKey, int) ResultSet
+ addIndex(objRef, LogeKey)
+ remIndex(objRef, LogeKey)
+
+ find(objRef, LogeKey) ResultSet
+ findSlice(objRef, LogeKey, LogeKey, int) ResultSet
}
type transactionContext interface {
@@ -31,12 +36,13 @@ type transactionContext interface {
rollback()
}
-type objectMap map[string]map[LogeKey]interface{}
+type objectMap map[string][]byte
type memStore struct {
objects objectMap
lock spinLock
+ spackTypes *spack.TypeSet
}
type memContext struct {
@@ -45,126 +51,95 @@ type memContext struct {
}
type memWriteEntry struct {
- TypeKey string
- ObjKey LogeKey
- Value interface{}
+ CacheKey string
+ Value []byte
}
func NewMemStore() LogeStore {
return &memStore{
objects: make(objectMap),
+ spackTypes: spack.NewTypeSet(),
}
}
func (store *memStore) close() {
}
func (store *memStore) registerType(typ *logeType) {
- store.objects[typ.Name] = make(map[LogeKey]interface{})
- for linkName := range typ.Links {
- var lk = memStoreLinkKey(typ.Name, linkName)
- store.objects[lk] = make(map[LogeKey]interface{})
- }
+ store.spackTypes.RegisterType(typ.Name)
+}
+
+func (store *memStore) getSpackType(name string) *spack.VersionedType {
+ return store.spackTypes.RegisterType(name)
}
-func (store *memStore) get(t *logeType, key LogeKey) interface{} {
- var objMap = store.objects[t.Name]
- object, ok := objMap[key]
+func (store *memStore) get(ref objRef) []byte {
+ enc, ok := store.objects[ref.CacheKey]
if !ok {
return nil
}
- return object
+ return enc
}
-func (store *memStore) getLinks(typ *logeType, linkName string, key LogeKey) []string {
- var lk = memStoreLinkKey(typ.Name, linkName)
- links, ok := store.objects[lk][key]
- if ok {
- return links.(linkList)
- }
+func (store *memStore) addIndex(ref objRef, key LogeKey) {
+}
- return linkList{}
+func (store *memStore) remIndex(ref objRef, key LogeKey) {
}
-func (store *memStore) find(typ *logeType, linkName string, key LogeKey) ResultSet {
+func (store *memStore) find(ref objRef, key LogeKey) ResultSet {
// Until I can be bothered
panic("Find not implemented on memstore")
}
-func (store *memStore) findFrom(typ *logeType, linkName string, key LogeKey, from LogeKey, limit int) ResultSet {
+func (store *memStore) findSlice(ref objRef, key LogeKey, from LogeKey, limit int) ResultSet {
// Until I can be bothered
panic("Find not implemented on memstore")
}
-func (store *memStore) store(obj *logeObject) error {
- obj.Lock.SpinLock()
- defer obj.Lock.Unlock()
- if !obj.Current.hasValue() {
- delete(store.objects[obj.Type.Name], obj.Key)
+func (store *memStore) store(ref objRef, enc []byte) error {
+ store.lock.SpinLock()
+ defer store.lock.Unlock()
+ if len(enc) == 0 {
+ delete(store.objects, ref.CacheKey)
} else {
- store.objects[obj.Type.Name][obj.Key] = obj.Current.Object
+ store.objects[ref.CacheKey] = enc
}
return nil
}
-func (store *memStore) storeLinks(obj *logeObject) error {
- obj.Lock.SpinLock()
- defer obj.Lock.Unlock()
- var typeKey = memStoreLinkKey(obj.Type.Name, obj.LinkName)
- var val = linkList(obj.Current.Object.(*linkSet).ReadKeys())
- store.objects[typeKey][obj.Key] = val
- return nil
-}
-
func (store *memStore) newContext() transactionContext {
return &memContext{
mstore: store,
}
}
-
-func (context *memContext) get(t *logeType, key LogeKey) interface{} {
- return context.mstore.get(t, key)
-}
-
-func (context *memContext) getLinks(t *logeType, linkName string, key LogeKey) []string {
- return context.mstore.getLinks(t, linkName, key)
+func (context *memContext) get(ref objRef) []byte {
+ return context.mstore.get(ref)
}
-func (context *memContext) store(obj *logeObject) error {
- var val interface{}
- if !obj.Current.hasValue() {
- val = nil
- } else {
- val = obj.Current.Object
- }
+func (context *memContext) store(ref objRef, enc []byte) error {
context.writes = append(
context.writes,
memWriteEntry{
- TypeKey: obj.Type.Name,
- ObjKey: obj.Key,
- Value: val,
+ CacheKey: ref.CacheKey,
+ Value: enc,
})
return nil
}
-func (context *memContext) storeLinks(obj *logeObject) error {
- context.writes = append(
- context.writes,
- memWriteEntry{
- TypeKey: memStoreLinkKey(obj.Type.Name, obj.LinkName),
- ObjKey: obj.Key,
- Value: linkList(obj.Current.Object.(*linkSet).ReadKeys()),
- })
- return nil
+
+func (context *memContext) addIndex(ref objRef, key LogeKey) {
}
-func (context *memContext) find(typ *logeType, linkName string, key LogeKey) ResultSet {
+func (context *memContext) remIndex(ref objRef, key LogeKey) {
+}
+func (context *memContext) find(ref objRef, key LogeKey) ResultSet {
// Until I can be bothered
panic("Find not implemented on memstore")
}
-func (context *memContext) findFrom(typ *logeType, linkName string, key LogeKey, from LogeKey, limit int) ResultSet {
+func (context *memContext) findSlice(ref objRef, key LogeKey, from LogeKey, limit int) ResultSet {
// Until I can be bothered
panic("Find not implemented on memstore")
}
@@ -174,15 +149,14 @@ func (context *memContext) commit() error {
store.lock.SpinLock()
defer store.lock.Unlock()
for _, entry := range context.writes {
- store.objects[entry.TypeKey][entry.ObjKey] = entry.Value
+ if len(entry.Value) == 0 {
+ delete(store.objects, entry.CacheKey)
+ } else {
+ store.objects[entry.CacheKey] = entry.Value
+ }
}
return nil
}
func (context *memContext) rollback() {
-}
-
-
-func memStoreLinkKey(typeName string, linkName string) string {
- return "^" + typeName + "^" + linkName
}
View
81 src/loge/transactions.go
@@ -17,10 +17,17 @@ const (
)
+type liveVersion struct {
+ version *objectVersion
+ object interface{}
+ dirty bool
+}
+
+
type Transaction struct {
db *LogeDB
context transactionContext
- versions map[string]*objectVersion
+ versions map[string]*liveVersion
state TransactionState
snapshotID uint64
}
@@ -29,7 +36,7 @@ func NewTransaction(db *LogeDB, sID uint64) *Transaction {
return &Transaction{
db: db,
context: db.store.newContext(),
- versions: make(map[string]*objectVersion),
+ versions: make(map[string]*liveVersion),
state: ACTIVE,
snapshotID: sID,
}
@@ -45,30 +52,30 @@ func (t *Transaction) GetState() TransactionState {
}
func (t *Transaction) Exists(typeName string, key LogeKey) bool {
- var version = t.getObj(makeObjRef(typeName, key), false, true)
- return version.hasValue()
+ var lv = t.getVersion(makeObjRef(typeName, key), false, true)
+ return lv.version.LogeObj.hasValue(lv.object)
}
func (t *Transaction) Read(typeName string, key LogeKey) interface{} {
- return t.getObj(makeObjRef(typeName, key), false, true).Object
+ return t.getVersion(makeObjRef(typeName, key), false, true).object
}
func (t *Transaction) Write(typeName string, key LogeKey) interface{} {
- return t.getObj(makeObjRef(typeName, key), true, true).Object
+ return t.getVersion(makeObjRef(typeName, key), true, true).object
}
func (t *Transaction) Set(typeName string, key LogeKey, obj interface{}) {
- var version = t.getObj(makeObjRef(typeName, key), true, false)
- version.Object = obj
+ var version = t.getVersion(makeObjRef(typeName, key), true, false)
+ version.object = obj
}
func (t *Transaction) Delete(typeName string, key LogeKey) {
- var version = t.getObj(makeObjRef(typeName, key), true, true)
- version.Object = version.LogeObj.Type.NilValue()
+ var version = t.getVersion(makeObjRef(typeName, key), true, true)
+ version.object = version.version.LogeObj.Type.NilValue()
}
@@ -98,40 +105,37 @@ func (t *Transaction) SetLinks(typeName string, linkName string, key LogeKey, ta
}
func (t *Transaction) Find(typeName string, linkName string, target LogeKey) ResultSet {
- return t.context.find(t.db.types[typeName], linkName, target)
+ return t.context.find(makeLinkRef(typeName, linkName, ""), target)
}
-func (t *Transaction) FindFrom(typeName string, linkName string, target LogeKey, from LogeKey, limit int) ResultSet {
- return t.context.findFrom(t.db.types[typeName], linkName, target, from, limit)
+func (t *Transaction) FindSlice(typeName string, linkName string, target LogeKey, from LogeKey, limit int) ResultSet {
+ return t.context.findSlice(makeLinkRef(typeName, linkName, ""), target, from, limit)
}
// -----------------------------------------------
// Internals
// -----------------------------------------------
func (t *Transaction) getLink(ref objRef, forWrite bool, load bool) *linkSet {
- var version = t.getObj(ref, forWrite, load)
- return version.Object.(*linkSet)
+ var version = t.getVersion(ref, forWrite, load)
+ return version.object.(*linkSet)
}
-func (t *Transaction) getObj(ref objRef, forWrite bool, load bool) *objectVersion {
+func (t *Transaction) getVersion(ref objRef, forWrite bool, load bool) *liveVersion {
if t.state != ACTIVE {
panic(fmt.Sprintf("GetObj from inactive transaction %s\n", t))
}
- var objKey = ref.String()
+ var objKey = ref.CacheKey
- version, ok := t.versions[objKey]
+ lv, ok := t.versions[objKey]
if ok {
if forWrite {
- if !version.Dirty {
- version = version.LogeObj.newVersion(t.snapshotID)
- t.versions[objKey] = version
- }
+ lv.dirty = true
}
- return version
+ return lv
}
var logeObj = t.db.ensureObj(ref, load)
@@ -141,15 +145,19 @@ func (t *Transaction) getObj(ref objRef, forWrite bool, load bool) *objectVersio
logeObj.RefCount++
- if forWrite {
- version = logeObj.newVersion(t.snapshotID)
- } else {
- version = logeObj.getTransactionVersion(t.snapshotID)
- }
+ var version *objectVersion
+ version = logeObj.getVersion(t.snapshotID)
+
+ var object = logeObj.decode(version.Blob)
- t.versions[objKey] = version
+ lv = &liveVersion{
+ version: version,
+ object: object,
+ dirty: forWrite,
+ }
- return version
+ t.versions[objKey] = lv
+ return lv
}
@@ -177,8 +185,8 @@ func (t *Transaction) Commit() bool {
}
func (t *Transaction) tryCommit() bool {
- for _, version := range t.versions {
- var obj = version.LogeObj
+ for _, lv := range t.versions {
+ var obj = lv.version.LogeObj
if !obj.Lock.TryLock() {
return false
@@ -194,11 +202,12 @@ func (t *Transaction) tryCommit() bool {
var context = t.context
var sID = t.db.newSnapshotID()
- for _, version := range t.versions {
- if version.Dirty {
- version.LogeObj.applyVersion(version, context, sID)
+ for _, lv := range t.versions {
+ if lv.dirty {
+ var obj = lv.version.LogeObj
+ obj.applyVersion(lv.object, context, sID)
}
- version.LogeObj.RefCount--
+ lv.version.LogeObj.RefCount--
}
var err = context.commit()
View
1 src/loge/typedefs.go
@@ -2,4 +2,3 @@ package loge
type LogeKey string
-
View
78 src/loge/types.go
@@ -2,59 +2,61 @@ package loge
import (
"reflect"
+ "fmt"
+
+ "github.com/brendonh/spack"
)
+var linkSpec *spack.TypeSpec = spack.MakeTypeSpec([]string{})
+var linkInfoSpec *spack.TypeSpec = spack.MakeTypeSpec(linkInfo{})
+
type logeType struct {
Name string
Version uint16
Exemplar interface{}
+ SpackType *spack.VersionedType
Links map[string]*linkInfo
}
+func NewType(name string, version uint16, exemplar interface{}, linkSpec LinkSpec, spackType *spack.VersionedType) *logeType {
+ var infos = make(map[string]*linkInfo)
+ for k, v := range linkSpec {
+ infos[k] = &linkInfo{
+ Name: k,
+ Target: v,
+ Tag: 0,
+ }
+ }
+
+ return &logeType {
+ Name: name,
+ Version: version,
+ Exemplar: exemplar,
+ SpackType: spackType,
+ Links: infos,
+ }
+}
func (t *logeType) NilValue() interface{} {
return reflect.Zero(reflect.TypeOf(t.Exemplar)).Interface()
}
-
-// XXX TODO: Do this via the store instead, and just re-decode spack objects for consistency
-func (t *logeType) Copy(object interface{}) interface{} {
- var value = reflect.ValueOf(object)
-
- if value.Kind() != reflect.Ptr || reflect.Indirect(value).Kind() != reflect.Struct {
- return object
+func (t *logeType) Decode(enc []byte) interface{} {
+ if len(enc) == 0 {
+ return t.NilValue()
}
-
- if !value.IsValid() || value.IsNil() {
- return object
+
+ obj, err := t.SpackType.DecodeObj(enc)
+ if err != nil {
+ panic(fmt.Sprintf("Decode error: %v", err))
}
+ return obj
+}
- var orig = value.Elem()
- var val = reflect.New(orig.Type()).Elem()
- val.Set(orig)
-
- var vt = val.Type()
- for i := 0; i < val.NumField(); i++ {
-
- var field = val.Field(i)
- var ft = vt.Field(i)
-
- switch field.Kind() {
- case reflect.Array,
- reflect.Slice:
-
- switch ft.Tag.Get("loge") {
- case "copy":
- var newField = reflect.New(field.Type()).Elem()
- newField = reflect.AppendSlice(newField, field)
- field.Set(newField)
- case "keep":
- // Do nothing
- default:
- field.Set(reflect.New(field.Type()).Elem())
- }
- }
+func (t *logeType) Encode(obj interface{}) []byte {
+ enc, err := t.SpackType.EncodeObj(obj)
+ if err != nil {
+ panic(fmt.Sprintf("Encode error: %v", err))
}
-
- return val.Addr().Interface()
-}
+ return enc
+}
View
8 src/loge/update_test.go
@@ -34,7 +34,7 @@ func TestReadWrite(test *testing.T) {
db.Transact(func (t *Transaction) {
var one = t.Read("test", "one").(*TestObj)
- if one.Name != "One" {
+ if one == nil || one.Name != "One" {
test.Error("ReadWrite failed")
}
}, 0)
@@ -67,9 +67,9 @@ func TestReadScoping(test *testing.T) {
test.Errorf("Version visible in transaction created before update")
}
- if db.ReadOne("test", "one").(*TestObj).Name != "Two" {
- test.Errorf("Dirty read got wrong version")
- }
+ // if db.ReadOne("test", "one").(*TestObj).Name != "Two" {
+ // test.Errorf("Dirty read got wrong version")
+ // }
}
func TestUpdateScoping(test *testing.T) {
View
8 src/logetest/bench.go
@@ -16,10 +16,10 @@ func WriteBench() {
runtime.GOMAXPROCS(cores)
DoWrite(cores, 0)
- DoWrite(cores, 1)
- DoWrite(cores, 2)
- DoWrite(cores, 3)
- DoWrite(cores, 4)
+ // DoWrite(cores, 1)
+ // DoWrite(cores, 2)
+ // DoWrite(cores, 3)
+ // DoWrite(cores, 4)
}
View
4 src/logetest/main.go
@@ -14,7 +14,7 @@ type Pet struct {
func main() {
//LinkBench()
//LinkSandbox()
- //WriteBench()
+ WriteBench()
//Sandbox()
- Example()
+ //Example()
}

0 comments on commit 1cfa311

Please sign in to comment.