Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Slightly dubious transaction isoliation for Find(), via LevelDB snaps…

…hots
  • Loading branch information...
commit 464a294bedd095c8e5c3a341e32f3d67d408cbc8 1 parent 35c2304
Brendon Hogger authored
115 src/loge/leveldb.go
@@ -44,6 +44,8 @@ type levelDBResultSet struct {
44 44
45 45 type levelDBContext struct {
46 46 ldbStore *levelDBStore
  47 + snapshot *levigo.Snapshot
  48 + readOptions *levigo.ReadOptions
47 49 batch []levelDBWriteEntry
48 50 result chan error
49 51 }
@@ -54,8 +56,8 @@ type levelDBWriteEntry struct {
54 56 Delete bool
55 57 }
56 58
57   -var writeOptions = levigo.NewWriteOptions()
58   -var readOptions = levigo.NewReadOptions()
  59 +var defaultWriteOptions = levigo.NewWriteOptions()
  60 +var defaultReadOptions = levigo.NewReadOptions()
59 61
60 62 func NewLevelDBStore(basePath string) LogeStore {
61 63
@@ -111,7 +113,7 @@ func (store *levelDBStore) registerType(typ *logeType) {
111 113 panic(fmt.Sprintf("Error encoding type %s: %v", vt.Name, err))
112 114 }
113 115
114   - err = store.db.Put(writeOptions, keyVal, typeVal)
  116 + err = store.db.Put(defaultWriteOptions, keyVal, typeVal)
115 117
116 118 if err != nil {
117 119 panic(fmt.Sprintf("Couldn't write type metadata: %v\n", err))
@@ -125,11 +127,11 @@ func (store *levelDBStore) GetType(typeName string) *spack.VersionedType {
125 127 }
126 128
127 129 func (store *levelDBStore) Put(key []byte, val []byte) error {
128   - return store.db.Put(writeOptions, key, val)
  130 + return store.db.Put(defaultWriteOptions, key, val)
129 131 }
130 132
131 133 func (store *levelDBStore) Delete(key []byte) error {
132   - return store.db.Delete(writeOptions, key)
  134 + return store.db.Delete(defaultWriteOptions, key)
133 135 }
134 136
135 137
@@ -137,7 +139,7 @@ func (store *levelDBStore) get(typ *logeType, key LogeKey) interface{} {
137 139 var vt = store.types.Type(typ.Name)
138 140 var encKey = vt.EncodeKey(string(key))
139 141
140   - val, err := store.db.Get(readOptions, encKey)
  142 + val, err := store.db.Get(defaultReadOptions, encKey)
141 143
142 144 if err != nil {
143 145 panic(fmt.Sprintf("Read error: %v\n", err))
@@ -166,7 +168,7 @@ func (store *levelDBStore) getLinks(typ *logeType, linkName string, objKey LogeK
166 168
167 169 var key = encodeTaggedKey([]uint16{ldb_LINK_TAG, vt.Tag, linkInfo.Tag}, string(objKey))
168 170
169   - val, err := store.db.Get(readOptions, key)
  171 + val, err := store.db.Get(defaultReadOptions, key)
170 172
171 173 if err != nil {
172 174 panic(fmt.Sprintf("Read error: %v\n", err))
@@ -194,32 +196,8 @@ func (store *levelDBStore) storeLinks(obj *logeObject) error {
194 196 // Search
195 197 // -----------------------------------------------
196 198
197   -func (store *levelDBStore) find(typ *logeType, linkName string, target LogeKey) ResultSet {
198   - var vt = store.types.Type(typ.Name)
199   - var linkInfo = typ.Links[linkName]
200   -
201   - var prefix = append(
202   - encodeTaggedKey([]uint16{ldb_INDEX_TAG, vt.Tag, linkInfo.Tag}, string(target)),
203   - 0)
204   -
205   - var it = store.iteratePrefix(prefix)
206   - if !it.Valid() {
207   - it.Close()
208   - return &levelDBResultSet {
209   - closed: true,
210   - }
211   - }
212   -
213   - var prefixLen = len(prefix)
214   -
215   - var next = string(it.Key()[prefixLen:])
216   -
217   - return &levelDBResultSet{
218   - it: it,
219   - prefixLen: prefixLen,
220   - next: next,
221   - closed: false,
222   - }
  199 +func (store *levelDBStore) find(typ *logeType, linkName string, key LogeKey) ResultSet {
  200 + return ldb_find(store, defaultReadOptions, typ, linkName, key)
223 201 }
224 202
225 203 func (rs *levelDBResultSet) Valid() bool {
@@ -240,6 +218,14 @@ func (rs *levelDBResultSet) Next() LogeKey {
240 218 return LogeKey(next)
241 219 }
242 220
  221 +func (rs *levelDBResultSet) All() []LogeKey {
  222 + var keys = make([]LogeKey, 0)
  223 + for rs.Valid() {
  224 + keys = append(keys, rs.Next())
  225 + }
  226 + return keys
  227 +}
  228 +
243 229 func (rs *levelDBResultSet) Close() {
244 230 rs.it.Close()
245 231 rs.closed = true
@@ -247,12 +233,17 @@ func (rs *levelDBResultSet) Close() {
247 233
248 234
249 235 // -----------------------------------------------
250   -// Write Batches
  236 +// Transaction Contexts
251 237 // -----------------------------------------------
252 238
253 239 func (store *levelDBStore) newContext() transactionContext {
  240 + var snapshot = store.db.NewSnapshot()
  241 + var options = levigo.NewReadOptions()
  242 + options.SetSnapshot(snapshot)
254 243 return &levelDBContext{
255 244 ldbStore: store,
  245 + readOptions: options,
  246 + snapshot: snapshot,
256 247 batch: make([]levelDBWriteEntry, 0),
257 248 result: make(chan error),
258 249 }
@@ -287,7 +278,17 @@ func (context *levelDBContext) Delete(key []byte) error {
287 278
288 279 func (context *levelDBContext) commit() error {
289 280 context.ldbStore.writeQueue <- context
290   - return <-context.result
  281 + var err = <-context.result
  282 + context.cleanup()
  283 + return err
  284 +}
  285 +
  286 +func (context *levelDBContext) rollback() {
  287 + context.cleanup()
  288 +}
  289 +
  290 +func (context *levelDBContext) cleanup() {
  291 + context.ldbStore.db.ReleaseSnapshot(context.snapshot)
291 292 }
292 293
293 294 func (context *levelDBContext) Write() error {
@@ -301,7 +302,7 @@ func (context *levelDBContext) Write() error {
301 302 }
302 303 }
303 304
304   - return context.ldbStore.db.Write(writeOptions, wb)
  305 + return context.ldbStore.db.Write(defaultWriteOptions, wb)
305 306 }
306 307
307 308 func (context *levelDBContext) store(obj *logeObject) error {
@@ -313,7 +314,7 @@ func (context *levelDBContext) storeLinks(obj *logeObject) error {
313 314 }
314 315
315 316 func (context *levelDBContext) find(typ *logeType, linkName string, target LogeKey) ResultSet {
316   - return context.ldbStore.find(typ, linkName, target)
  317 + return ldb_find(context.ldbStore, context.readOptions, typ, linkName, target)
317 318 }
318 319
319 320 func (context *levelDBContext) get(typ *logeType, key LogeKey) interface{} {
@@ -331,7 +332,7 @@ func (context *levelDBContext) getLinks(typ *logeType, linkName string, key Loge
331 332 func (store *levelDBStore) loadTypeMetadata() {
332 333 var typeType = store.types.Type("_type")
333 334 var tag = typeType.EncodeTag()
334   - var it = store.iteratePrefix(tag)
  335 + var it = store.iteratePrefix(tag, defaultReadOptions)
335 336 defer it.Close()
336 337
337 338 for it = it; it.Valid(); it.Next() {
@@ -347,7 +348,7 @@ func (store *levelDBStore) loadTypeMetadata() {
347 348
348 349 func (store *levelDBStore) tagVersions(vt *spack.VersionedType, typ *logeType) {
349 350 var prefix = encodeTaggedKey([]uint16{ldb_LINK_INFO_TAG, vt.Tag}, "")
350   - var it = store.iteratePrefix(prefix)
  351 + var it = store.iteratePrefix(prefix, defaultReadOptions)
351 352 defer it.Close()
352 353
353 354 for it = it; it.Valid(); it.Next() {
@@ -375,7 +376,7 @@ func (store *levelDBStore) tagVersions(vt *spack.VersionedType, typ *logeType) {
375 376 var key = encodeTaggedKey([]uint16{ldb_LINK_INFO_TAG, vt.Tag}, info.Name)
376 377 enc, _ := spack.EncodeToBytes(info, linkInfoSpec)
377 378 fmt.Printf("Updating link: %s::%s (%d)\n", typ.Name, info.Name, info.Tag)
378   - var err = store.db.Put(writeOptions, key, enc)
  379 + var err = store.db.Put(defaultWriteOptions, key, enc)
379 380 if err != nil {
380 381 panic(fmt.Sprintf("Write error: %v\n", err))
381 382 }
@@ -411,6 +412,38 @@ func encodeIndexKey(prefix []byte, target string, source string) []byte {
411 412 // Levigo interaction
412 413 // -----------------------------------------------
413 414
  415 +func ldb_find(store *levelDBStore, readOptions *levigo.ReadOptions,
  416 + typ *logeType, linkName string, target LogeKey) ResultSet {
  417 +
  418 + fmt.Printf("Find options %v\n", readOptions)
  419 +
  420 + var vt = store.types.Type(typ.Name)
  421 + var linkInfo = typ.Links[linkName]
  422 +
  423 + var prefix = append(
  424 + encodeTaggedKey([]uint16{ldb_INDEX_TAG, vt.Tag, linkInfo.Tag}, string(target)),
  425 + 0)
  426 +
  427 + var it = store.iteratePrefix(prefix, readOptions)
  428 + if !it.Valid() {
  429 + it.Close()
  430 + return &levelDBResultSet {
  431 + closed: true,
  432 + }
  433 + }
  434 +
  435 + var prefixLen = len(prefix)
  436 +
  437 + var next = string(it.Key()[prefixLen:])
  438 +
  439 + return &levelDBResultSet{
  440 + it: it,
  441 + prefixLen: prefixLen,
  442 + next: next,
  443 + closed: false,
  444 + }
  445 +}
  446 +
414 447 func ldb_store(writer levelDBWriter, obj *logeObject) error {
415 448 var vt = writer.GetType(obj.Type.Name)
416 449 var key = vt.EncodeKey(string(obj.Key))
@@ -473,7 +506,7 @@ type prefixIterator struct {
473 506 Finished bool
474 507 }
475 508
476   -func (store *levelDBStore) iteratePrefix(prefix []byte) *prefixIterator {
  509 +func (store *levelDBStore) iteratePrefix(prefix []byte, readOptions *levigo.ReadOptions) *prefixIterator {
477 510 var it = store.db.NewIterator(readOptions)
478 511 it.Seek(prefix)
479 512
5 src/loge/storage.go
@@ -9,6 +9,7 @@ type LogeStore interface {
9 9 }
10 10
11 11 type ResultSet interface {
  12 + All() []LogeKey
12 13 Next() LogeKey
13 14 Valid() bool
14 15 Close()
@@ -25,6 +26,7 @@ type storeContext interface {
25 26 type transactionContext interface {
26 27 storeContext
27 28 commit() error
  29 + rollback()
28 30 }
29 31
30 32 type objectMap map[string]map[LogeKey]interface{}
@@ -165,6 +167,9 @@ func (context *memContext) commit() error {
165 167 return nil
166 168 }
167 169
  170 +func (context *memContext) rollback() {
  171 +}
  172 +
168 173
169 174 func memStoreLinkKey(typeName string, linkName string) string {
170 175 return "^" + typeName + "^" + linkName
13 src/loge/transactions.go
@@ -19,6 +19,7 @@ const (
19 19
20 20 type Transaction struct {
21 21 db *LogeDB
  22 + context transactionContext
22 23 versions map[string]*objectVersion
23 24 state TransactionState
24 25 }
@@ -26,6 +27,7 @@ type Transaction struct {
26 27 func NewTransaction(db *LogeDB) *Transaction {
27 28 return &Transaction{
28 29 db: db,
  30 + context: db.store.newContext(),
29 31 versions: make(map[string]*objectVersion),
30 32 state: ACTIVE,
31 33 }
@@ -93,6 +95,10 @@ func (t *Transaction) SetLinks(typeName string, linkName string, key LogeKey, ta
93 95 t.getLink(makeLinkRef(typeName, linkName, key), true, true).Set(stringTargets)
94 96 }
95 97
  98 +func (t *Transaction) Find(typeName string, linkName string, target LogeKey) ResultSet {
  99 + return t.context.find(t.db.types[typeName], linkName, target)
  100 +}
  101 +
96 102 // -----------------------------------------------
97 103 // Internals
98 104 // -----------------------------------------------
@@ -186,15 +192,16 @@ func (t *Transaction) tryCommit() bool {
186 192 }
187 193 }
188 194
189   - var batch = t.db.store.newContext()
  195 + var context = t.context
  196 +
190 197 for _, version := range t.versions {
191 198 version.LogeObj.RefCount--
192 199 if version.Dirty {
193   - version.LogeObj.applyVersion(version, batch)
  200 + version.LogeObj.applyVersion(version, context)
194 201 }
195 202 }
196 203
197   - var err = batch.commit()
  204 + var err = context.commit()
198 205 if err != nil {
199 206 t.state = ERROR
200 207 fmt.Printf("Commit error: %v\n", err)
24 src/logetest/bench.go
@@ -11,16 +11,23 @@ const TOTAL = 1000000
11 11 const BATCH_SIZE = 10000
12 12
13 13 func WriteBench() {
14   - var db = loge.NewLogeDB(loge.NewLevelDBStore("data/bench"))
15   -
16   - defer db.Close()
17   -
18   - db.CreateType("person", 1, &Person{}, nil)
19   -
20 14 var cores = runtime.NumCPU()
21 15 fmt.Printf("Using %d cores\n", cores)
22 16 runtime.GOMAXPROCS(cores)
23 17
  18 + DoWrite(cores, 0)
  19 + DoWrite(cores, 1)
  20 + DoWrite(cores, 2)
  21 + DoWrite(cores, 3)
  22 + DoWrite(cores, 4)
  23 +
  24 +}
  25 +
  26 +func DoWrite(cores int, idx int) {
  27 + var db = loge.NewLogeDB(loge.NewLevelDBStore(fmt.Sprintf("data/bench%d", idx)))
  28 + defer db.Close()
  29 + db.CreateType("person", 1, &Person{}, nil)
  30 +
24 31 var startTime = time.Now()
25 32
26 33 var tokens = make(chan bool, cores)
@@ -42,8 +49,11 @@ func WriteBench() {
42 49 }
43 50
44 51 fmt.Printf("Done in %v\n", time.Since(startTime))
45   -}
  52 + db.FlushCache()
  53 +
  54 + fmt.Printf("Flushed\n")
46 55
  56 +}
47 57
48 58 func WritePeopleBatch(db *loge.LogeDB, start int, end int, tokens chan bool) {
49 59 db.Transact(func(t *loge.Transaction) {
4 src/logetest/main.go
@@ -13,8 +13,8 @@ type Pet struct {
13 13
14 14 func main() {
15 15 //LinkBench()
16   - LinkSandbox()
17   - //WriteBench()
  16 + //LinkSandbox()
  17 + WriteBench()
18 18 //Sandbox()
19 19 //Example()
20 20 }
40 src/logetest/sandbox.go
@@ -8,28 +8,20 @@ import (
8 8 func Sandbox() {
9 9 var db = loge.NewLogeDB(loge.NewLevelDBStore("data/sandbox"))
10 10
11   - db.CreateType("person", 1, &Person{}, nil)
12   -
13   - db.CreateType("pet", 1, &Pet{}, loge.LinkSpec{
14   - "owner": "person",
15   - "friend": "pet",
16   - })
17   -
18   - db.Transact(func(trans *loge.Transaction) {
19   - var prev = trans.Read("person", "brendon").(*Person)
20   -
21   - fmt.Printf("Previous: %v\n", prev)
22   -
23   - var brend = Person{
24   - Name: "Brendon",
25   - Age: 32,
26   - Bits: []uint16{1,4,3},
27   - }
28   -
29   - trans.Set("person", "brendon", &brend)
30   - }, 0)
31   -
32   - db.Transact(func(trans *loge.Transaction) {
33   - trans.Delete("person", "brendon")
34   - }, 0)
  11 + db.CreateType("person", 1, &Person{}, loge.LinkSpec{ "friend": "person" })
  12 +
  13 + var trans1 = db.CreateTransaction()
  14 + var trans2 = db.CreateTransaction()
  15 +
  16 + trans1.Set("person", "Brendon", &Person{ "Brendon", 31, nil })
  17 + trans1.Set("person", "Mike", &Person{ "Mike", 38, nil })
  18 + trans1.SetLinks("person", "friend", "Brendon", []loge.LogeKey{ "Mike" })
  19 +
  20 + fmt.Printf("%v\n", trans1.Find("person", "friend", "Mike").All())
  21 +
  22 + trans1.Commit()
  23 +
  24 + fmt.Printf("%v\n", trans2.Find("person", "friend", "Mike").All())
  25 +
  26 + fmt.Printf("%v\n", db.Find("person", "friend", "Mike").All())
35 27 }

0 comments on commit 464a294

Please sign in to comment.
Something went wrong with that request. Please try again.