Skip to content

Question: Insert on Duplicate Transaction #1289

@swdee

Description

@swdee

Go version: 1.13.9
Badger version: 2.0.3

Having used RDBMS for many years I am trying to implement Insert on Duplicate functionality. My first implementation was to create a Badger transaction then check if the key exists and if it doesn't insert the key/value. If the key already exists then the transaction is aborted and a duplicate error is returned.

The code for this program;

package main

import (
	"fmt"
	"github.com/dgraph-io/badger/v2"
	"github.com/dgraph-io/badger/v2/options"
	"log"
	"sync"
)

type Repo struct {
	DB *badger.DB
	sync.Mutex
}

type entry struct {
	key int
	val string
}

const (
	duplicate    = true
	notDuplicate = false
)

func main() {

	opts := badger.DefaultOptions("").
		WithInMemory(true).
		WithCompression(options.Snappy)

	db, err := badger.Open(opts)

	if err != nil {
		log.Fatal("Error connecting to DB", err)
	}
	defer db.Close()

	repo := &Repo{DB: db}

	wg := sync.WaitGroup{}

	total := 16
	wg.Add(total)

	for i := 0; i < total; i++ {
		go func(gi int) {
			defer wg.Done()

			has, err := repo.insert(22, "cat")

			if err != nil {
				log.Fatal("insert error:", err)
			}

			if !has {
				fmt.Println("gi=", gi, "NOT DUPLICATE")
			} else {
				fmt.Println("gi=", gi, "was duplicate")
			}
		}(i)
	}

	wg.Wait()
	fmt.Println("done")
}

func (r *Repo) insert(key int, val string) (bool, error) {

	//r.Lock()
	//defer r.Unlock()

	tx := r.DB.NewTransaction(true)
	defer tx.Discard()

	// check if key exists before inserting
	entries, err := r.get(tx, key)

	if err != nil {
		return notDuplicate, err
	}

	if len(entries) > 0 {
		return duplicate, nil
	}

	// no key exists so insert entry
	err = tx.Set(primaryKey(key), []byte(val))

	if err != nil {
		return notDuplicate, err
	}

	return notDuplicate, tx.Commit()
}

func (r *Repo) get(tx *badger.Txn, key int) ([]entry, error) {

	it := tx.NewIterator(badger.DefaultIteratorOptions)
	defer it.Close()

	data := make([]entry, 0)

	prefix := primaryKey(key)

	for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
		item := it.Item()

		val, err := item.ValueCopy(nil)

		if err != nil {
			return nil, err
		}

		data = append(data, entry{key, string(val)})
	}

	return data, nil
}

func primaryKey(key int) []byte {
	return []byte(fmt.Sprintf("store:%d", key))
}

When running this program multiple times, occasionally we get instances where it reports successful insertion multiple times, instead of the expected once.

$ go run dbinsert-collision.go 
gi= 0 NOT DUPLICATE
gi= 15 NOT DUPLICATE
gi= 4 was duplicate
gi= 13 was duplicate
gi= 14 was duplicate
gi= 2 was duplicate
gi= 10 was duplicate
gi= 11 was duplicate
gi= 5 was duplicate
gi= 3 was duplicate
gi= 12 was duplicate
gi= 8 was duplicate
gi= 9 was duplicate
gi= 1 was duplicate
gi= 7 was duplicate
gi= 6 was duplicate
done
badger 2020/04/06 13:49:15 DEBUG: Storing value log head: {Fid:0 Len:0 Offset:0}
badger 2020/04/06 13:49:15 INFO: Got compaction priority: {level:0 score:1.73 dropPrefix:[]}
badger 2020/04/06 13:49:15 INFO: Running for level: 0
badger 2020/04/06 13:49:15 DEBUG: LOG Compact. Added 2 keys. Skipped 1 keys. Iteration took: 2.543024ms
badger 2020/04/06 13:49:15 DEBUG: Discard stats: map[]
badger 2020/04/06 13:49:15 INFO: LOG Compact 0->1, del 1 tables, add 1 tables, took 3.336583ms
badger 2020/04/06 13:49:15 INFO: Compaction for level: 0 DONE
badger 2020/04/06 13:49:15 INFO: Force compaction on level 0 done

$ go run dbinsert-collision.go 
gi= 1 NOT DUPLICATE
gi= 8 was duplicate
gi= 0 was duplicate
gi= 7 was duplicate
gi= 6 was duplicate
gi= 15 was duplicate
gi= 10 was duplicate
gi= 2 was duplicate
gi= 11 was duplicate
gi= 12 was duplicate
gi= 13 was duplicate
gi= 14 was duplicate
gi= 3 was duplicate
gi= 5 was duplicate
gi= 4 was duplicate
gi= 9 was duplicate
done
badger 2020/04/06 13:49:18 DEBUG: Storing value log head: {Fid:0 Len:0 Offset:0}
badger 2020/04/06 13:49:18 INFO: Got compaction priority: {level:0 score:1.73 dropPrefix:[]}
badger 2020/04/06 13:49:18 INFO: Running for level: 0
badger 2020/04/06 13:49:18 DEBUG: LOG Compact. Added 2 keys. Skipped 0 keys. Iteration took: 46.107076ms
badger 2020/04/06 13:49:18 DEBUG: Discard stats: map[]
badger 2020/04/06 13:49:18 INFO: LOG Compact 0->1, del 1 tables, add 1 tables, took 47.327832ms
badger 2020/04/06 13:49:18 INFO: Compaction for level: 0 DONE
badger 2020/04/06 13:49:18 INFO: Force compaction on level 0 done

$ go run dbinsert-collision.go 
gi= 15 NOT DUPLICATE
gi= 0 NOT DUPLICATE
gi= 8 was duplicate
gi= 1 was duplicate
gi= 14 was duplicate
gi= 3 was duplicate
gi= 6 was duplicate
gi= 5 was duplicate
gi= 9 was duplicate
gi= 7 was duplicate
gi= 12 was duplicate
gi= 13 was duplicate
gi= 10 was duplicate
gi= 11 was duplicate
gi= 4 was duplicate
gi= 2 was duplicate
done
badger 2020/04/06 13:49:20 DEBUG: Storing value log head: {Fid:0 Len:0 Offset:0}
badger 2020/04/06 13:49:20 INFO: Got compaction priority: {level:0 score:1.73 dropPrefix:[]}
badger 2020/04/06 13:49:20 INFO: Running for level: 0
badger 2020/04/06 13:49:20 DEBUG: LOG Compact. Added 2 keys. Skipped 1 keys. Iteration took: 2.167897ms
badger 2020/04/06 13:49:20 DEBUG: Discard stats: map[]
badger 2020/04/06 13:49:20 INFO: LOG Compact 0->1, del 1 tables, add 1 tables, took 3.182679ms
badger 2020/04/06 13:49:20 INFO: Compaction for level: 0 DONE
badger 2020/04/06 13:49:20 INFO: Force compaction on level 0 done

If we uncomment the two mutex lock lines in the insert() function, this puts a global lock on the Repo which ensures only one entry is inserted. However why does a Badger Transaction not perform as I am expecting with the above use case?

Metadata

Metadata

Assignees

Labels

kind/bugSomething is broken.priority/P1Serious issue that requires eventual attention (can wait a bit)status/acceptedWe accept to investigate or work on it.

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions