Skip to content

Commit

Permalink
separate sqlite concerns
Browse files Browse the repository at this point in the history
  • Loading branch information
faddat committed Apr 5, 2024
1 parent e3ae6b8 commit 7eb2952
Show file tree
Hide file tree
Showing 3 changed files with 267 additions and 224 deletions.
224 changes: 0 additions & 224 deletions sqlite.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package db

import (
"bytes"
"database/sql"
"fmt"
"log"
Expand Down Expand Up @@ -207,226 +206,3 @@ func (*SQLiteDB) Compact(_, _ []byte) error {
// SQLite does not support manual compaction, so this is a no-op.
return nil
}

// ============ BATCH ===============

var _ Batch = (*sqliteBatch)(nil)

type sqliteBatch struct {
db *SQLiteDB
tx *sql.Tx
ops []operation
}

func newSQLiteBatch(db *SQLiteDB) *sqliteBatch {
return &sqliteBatch{
db: db,
ops: []operation{},
}
}

// Set implements Batch.
func (b *sqliteBatch) Set(key, value []byte) error {
if len(key) == 0 {
return errKeyEmpty
}
if value == nil {
return errValueNil
}
b.ops = append(b.ops, operation{opTypeSet, key, value})
return nil
}

// Delete implements Batch.
func (b *sqliteBatch) Delete(key []byte) error {
if len(key) == 0 {
return errKeyEmpty
}
b.ops = append(b.ops, operation{opTypeDelete, key, nil})
return nil
}

// Write implements Batch.
func (b *sqliteBatch) Write() error {
return b.write(false)
}

// WriteSync implements Batch.
func (b *sqliteBatch) WriteSync() error {
return b.write(true)
}

func (b *sqliteBatch) write(sync bool) error {
if b.tx == nil {
return fmt.Errorf("cannot write to closed batch")
}

for _, op := range b.ops {
switch op.opType {
case opTypeSet:
_, err := b.tx.Exec("INSERT OR REPLACE INTO kv (key, value) VALUES (?, ?)", op.key, op.value)
if err != nil {
return err
}
case opTypeDelete:
_, err := b.tx.Exec("DELETE FROM kv WHERE key = ?", op.key)
if err != nil {
return err
}
default:
return fmt.Errorf("unknown operation type: %v", op.opType)
}
}

// Clear the batch after writing
b.ops = nil

if sync {
return b.tx.Commit()
} else {
return nil
}
}

// Close implements Batch.
func (b *sqliteBatch) Close() error {
if b.tx != nil {
err := b.tx.Rollback()
b.tx = nil
if err != nil {
return err
}
}
b.ops = nil
return nil
}

// =========== ITERATOR ================

var _ Iterator = (*sqliteIterator)(nil)

type sqliteIterator struct {
rows *sql.Rows
start, end []byte
isReverse bool
isInvalid bool
key, value []byte
}

func newSQLiteIterator(rows *sql.Rows, start, end []byte, isReverse bool) *sqliteIterator {
itr := &sqliteIterator{
rows: rows,
start: start,
end: end,
isReverse: isReverse,
isInvalid: false,
}
if isReverse {
itr.last()
} else {
itr.first()
}
return itr
}

func (itr *sqliteIterator) Domain() ([]byte, []byte) {
return itr.start, itr.end
}

func (itr *sqliteIterator) Valid() bool {
// Once invalid, forever invalid.
if itr.isInvalid {
return false
}

// If source errors, invalid.
if err := itr.Error(); err != nil {
itr.isInvalid = true
return false
}

// If key is end or past it, invalid.
start := itr.start
end := itr.end
key := itr.key
if itr.isReverse {
if start != nil && bytes.Compare(key, start) < 0 {
itr.isInvalid = true
return false
}
} else {
if end != nil && bytes.Compare(end, key) <= 0 {
itr.isInvalid = true
return false
}
}

// Valid
return true
}

func (itr *sqliteIterator) Key() []byte {
itr.assertIsValid()
return cp(itr.key)
}

func (itr *sqliteIterator) Value() []byte {
itr.assertIsValid()
return cp(itr.value)
}

func (itr *sqliteIterator) Next() {
itr.assertIsValid()
if itr.isReverse {
itr.prev()
} else {
itr.next()
}
}

func (itr *sqliteIterator) Error() error {
return itr.rows.Err()
}

func (itr *sqliteIterator) Close() error {
return itr.rows.Close()
}

func (itr *sqliteIterator) assertIsValid() {
if !itr.Valid() {
panic("iterator is invalid")
}
}

func (itr *sqliteIterator) first() {
if itr.rows.Next() {
itr.scanRow()
} else {
itr.isInvalid = true
}
}

func (itr *sqliteIterator) last() {
for itr.rows.Next() {
itr.scanRow()
}
}

func (itr *sqliteIterator) next() {
if itr.rows.Next() {
itr.scanRow()
} else {
itr.isInvalid = true
}
}

func (itr *sqliteIterator) prev() {
itr.isInvalid = true
}

func (itr *sqliteIterator) scanRow() {
err := itr.rows.Scan(&itr.key, &itr.value)
if err != nil {
itr.isInvalid = true
}
}
129 changes: 129 additions & 0 deletions sqlite_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package db

import (
"database/sql"
"fmt"

_ "github.com/glebarez/go-sqlite"
)

// ============ BATCH ===============

var _ Batch = (*sqliteBatch)(nil)

type sqliteBatch struct {
db *SQLiteDB
tx *sql.Tx
ops []operation
}

func newSQLiteBatch(db *SQLiteDB) *sqliteBatch {
return &sqliteBatch{
db: db,
ops: []operation{},
}
}

// Set implements Batch.
func (b *sqliteBatch) Set(key, value []byte) error {
if len(key) == 0 {
return errKeyEmpty
}
if value == nil {
return errValueNil
}
b.ops = append(b.ops, operation{opTypeSet, key, value})
return nil
}

// Delete implements Batch.
func (b *sqliteBatch) Delete(key []byte) error {
if len(key) == 0 {
return errKeyEmpty
}
b.ops = append(b.ops, operation{opTypeDelete, key, nil})
return nil
}

// Write implements Batch.
func (b *sqliteBatch) Write() error {
if b.tx != nil {
return fmt.Errorf("batch already written or not properly closed")
}
tx, err := b.db.db.Begin()
if err != nil {
return err
}
b.tx = tx
err = b.write(false)
if err != nil {
_ = b.tx.Rollback()
b.tx = nil
return err
}
return b.tx.Commit()
}

// WriteSync implements Batch.
func (b *sqliteBatch) WriteSync() error {
if b.tx != nil {
return fmt.Errorf("batch already written or not properly closed")
}
tx, err := b.db.db.Begin()
if err != nil {
return err
}
b.tx = tx
err = b.write(true)
if err != nil {
_ = b.tx.Rollback()
b.tx = nil
return err
}
return b.tx.Commit()
}

func (b *sqliteBatch) write(sync bool) error {
if b.tx == nil {
return fmt.Errorf("cannot write to closed batch")
}

for _, op := range b.ops {
switch op.opType {
case opTypeSet:
_, err := b.tx.Exec("INSERT OR REPLACE INTO kv (key, value) VALUES (?, ?)", op.key, op.value)
if err != nil {
return err
}
case opTypeDelete:
_, err := b.tx.Exec("DELETE FROM kv WHERE key = ?", op.key)
if err != nil {
return err
}
default:
return fmt.Errorf("unknown operation type: %v", op.opType)
}
}

// Clear the batch after writing
b.ops = nil

if sync {
return b.tx.Commit()
} else {
return nil
}
}

// Close implements Batch.
func (b *sqliteBatch) Close() error {
if b.tx != nil {
err := b.tx.Rollback()
b.tx = nil
if err != nil {
return err
}
}
b.ops = nil
return nil
}
Loading

0 comments on commit 7eb2952

Please sign in to comment.