Skip to content
This repository has been archived by the owner on Mar 9, 2019. It is now read-only.

Commit

Permalink
Merge pull request #3 from benbjohnson/spill
Browse files Browse the repository at this point in the history
Spill to dirty pages, write to disk
  • Loading branch information
benbjohnson committed Jan 31, 2014
2 parents d087fb4 + 26f6fef commit d05191d
Show file tree
Hide file tree
Showing 11 changed files with 387 additions and 75 deletions.
18 changes: 13 additions & 5 deletions branch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (

// branch represents a temporary in-memory branch page.
type branch struct {
// pgid is the id of the on-disk page this branch was loaded from; read sets it.
pgid pgid
// depth is the branch's position in the cursor stack it was loaded through
// (RWTransaction.branch stores len(stack)-1); the branches sort type orders on it.
depth int
// parent is the branch one level up the same stack, as resolved by RWTransaction.branch.
parent *branch
// items holds the entries deserialized from the page, one per node.
items branchItems
}
Expand Down Expand Up @@ -42,11 +44,11 @@ func (b *branch) put(id pgid, newid pgid, key []byte, replace bool) {
}

// read initializes the item data from an on-disk page.
func (b *branch) read(page *page) {
ncount := int(page.count)
b.items = make(branchItems, ncount)
bnodes := (*[maxNodesPerPage]bnode)(unsafe.Pointer(&page.ptr))
for i := 0; i < ncount; i++ {
func (b *branch) read(p *page) {
b.pgid = p.id
b.items = make(branchItems, int(p.count))
bnodes := (*[maxNodesPerPage]bnode)(unsafe.Pointer(&p.ptr))
for i := 0; i < int(p.count); i++ {
bnode := &bnodes[i]
item := &b.items[i]
item.pgid = bnode.pgid
Expand Down Expand Up @@ -109,6 +111,12 @@ func (b *branch) split(pageSize int) []*branch {
return branches
}

// branches is a list of branches that implements sort.Interface, ordering the
// deepest branches first.
type branches []*branch

// Len returns the number of branches in the list.
func (s branches) Len() int { return len(s) }

// Swap exchanges the branches at positions i and j.
func (s branches) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

// Less reports whether the branch at i must sort before the branch at j.
// A larger depth sorts first, which is the "deepest first" order that
// RWTransaction.spill relies on when it walks the sorted list.
func (s branches) Less(i, j int) bool { return s[i].depth > s[j].depth }

// branchItems is the list of entries held in a branch's items field.
type branchItems []branchItem

type branchItem struct {
Expand Down
2 changes: 2 additions & 0 deletions bucket.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package bolt

// MaxBucketNameSize is the maximum length, in bytes, of a bucket name.
// RWTransaction.CreateBucket rejects names longer than this.
const MaxBucketNameSize = 255

type Bucket struct {
*bucket
name string
Expand Down
14 changes: 9 additions & 5 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ func (db *DB) mmap() error {
} else if int(info.Size()) < db.pageSize*2 {
return &Error{"file size too small", err}
}
size := int(info.Size())

// TEMP(benbjohnson): Set max size to 2MB.
size := 2 << 20

// Memory-map the data file as a byte slice.
if db.data, err = db.syscall.Mmap(int(db.file.Fd()), 0, size, syscall.PROT_READ, syscall.MAP_SHARED); err != nil {
Expand Down Expand Up @@ -159,7 +161,9 @@ func (db *DB) init() error {
m.pageSize = uint32(db.pageSize)
m.version = Version
m.free = 2
m.sys.root = 3
m.sys = 3
m.pgid = 4
m.txnid = txnid(i)
}

// Write an empty freelist at page 3.
Expand All @@ -171,7 +175,7 @@ func (db *DB) init() error {
// Write an empty leaf page at page 4.
p = db.pageInBuffer(buf[:], pgid(3))
p.id = pgid(3)
p.flags = p_leaf
p.flags = p_sys
p.count = 0

// Write the buffer to our data file.
Expand Down Expand Up @@ -206,7 +210,7 @@ func (db *DB) Transaction() (*Transaction, error) {

// Create a transaction associated with the database.
t := &Transaction{}
t.init(db, db.meta())
t.init(db)

return t, nil
}
Expand All @@ -230,7 +234,7 @@ func (db *DB) RWTransaction() (*RWTransaction, error) {
branches: make(map[pgid]*branch),
leafs: make(map[pgid]*leaf),
}
t.init(db, db.meta())
t.init(db)

return t, nil
}
Expand Down
7 changes: 4 additions & 3 deletions leaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

// leaf represents an in-memory, deserialized leaf page.
type leaf struct {
// pgid is the id of the on-disk page this leaf was loaded from; read sets it.
pgid pgid
// parent is the branch whose entry is updated when the leaf is spilled
// (RWTransaction.spillLeaf skips that update when parent is nil).
parent *branch
// items holds the key/value entries deserialized from the page.
items leafItems
}
Expand Down Expand Up @@ -39,10 +40,10 @@ func (l *leaf) put(key []byte, value []byte) {

// read initializes the item data from an on-disk page.
func (l *leaf) read(p *page) {
ncount := int(p.count)
l.items = make(leafItems, ncount)
l.pgid = p.id
l.items = make(leafItems, int(p.count))
lnodes := (*[maxNodesPerPage]lnode)(unsafe.Pointer(&p.ptr))
for i := 0; i < ncount; i++ {
for i := 0; i < int(p.count); i++ {
lnode := &lnodes[i]
item := &l.items[i]
item.key = lnode.key()
Expand Down
14 changes: 13 additions & 1 deletion meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ type meta struct {
pageSize uint32
pgid pgid
free pgid
sys pgid
txnid txnid
sys bucket
}

// validate checks the marker bytes and version of the meta page to ensure it matches this binary.
Expand All @@ -30,8 +30,20 @@ func (m *meta) validate() error {

// copy transfers the magic, version, pageSize, pgid, free, sys and txnid
// fields of m into dest.
func (m *meta) copy(dest *meta) {
	// File identification and layout.
	dest.magic, dest.version = m.magic, m.version
	dest.pageSize = m.pageSize

	// Page pointers.
	dest.pgid, dest.free, dest.sys = m.pgid, m.free, m.sys

	// Transaction that produced this meta.
	dest.txnid = m.txnid
}

// write stamps page p as a meta page and copies m into the page's meta area.
func (m *meta) write(p *page) {
	// The parity of the transaction id selects the page, so the id is 0 or 1.
	id := pgid(m.txnid % 2)
	p.id = id
	p.flags |= p_meta

	// Fill the meta area of the page from this meta.
	target := p.meta()
	m.copy(target)
}
9 changes: 8 additions & 1 deletion page.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ const (
p_branch = 0x01
p_leaf = 0x02
p_meta = 0x04
p_freelist = 0x08
p_sys = 0x08
p_freelist = 0x10
)

type pgid uint64
Expand Down Expand Up @@ -56,3 +57,9 @@ func (p *page) bnodes() []bnode {
// freelist reinterprets the page's data area (starting at p.ptr) as page ids
// and returns the first p.count of them.
func (p *page) freelist() []pgid {
	ids := (*[maxNodesPerPage]pgid)(unsafe.Pointer(&p.ptr))
	return ids[:p.count]
}

// pages is a list of pages that implements sort.Interface, ordering by
// ascending page id.
type pages []*page

// Len reports how many pages the list holds.
func (s pages) Len() int {
	return len(s)
}

// Swap trades the pages at indexes i and j.
func (s pages) Swap(i, j int) {
	s[j], s[i] = s[i], s[j]
}

// Less reports whether the page at i has a smaller id than the page at j.
func (s pages) Less(i, j int) bool {
	return s[j].id > s[i].id
}
167 changes: 148 additions & 19 deletions rwtransaction.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bolt

import (
"sort"
"unsafe"
)

Expand All @@ -12,31 +13,43 @@ type RWTransaction struct {
leafs map[pgid]*leaf
}

// init prepares the read-write transaction for use against db: it runs the
// base Transaction initialization, creates an empty page cache, and gives the
// transaction its own copy of the database meta with an advanced txnid.
func (t *RWTransaction) init(db *DB) {
	t.Transaction.init(db)
	t.pages = map[pgid]*page{}

	// Take a private copy of the current meta so changes stay local to this
	// transaction, then bump the transaction id.
	// NOTE(review): adding 2 keeps the parity of txnid unchanged, and
	// meta.write picks the meta page from txnid % 2 — confirm this is intended.
	t.meta = new(meta)
	db.meta().copy(t.meta)
	t.meta.txnid += 2
}

// CreateBucket creates a new bucket.
func (t *RWTransaction) CreateBucket(name string) error {
// Check if bucket already exists.
if b := t.Bucket(name); b != nil {
return &Error{"bucket already exists", nil}
} else if len(name) == 0 {
return &Error{"bucket name cannot be blank", nil}
} else if len(name) > MaxBucketNameSize {
return &Error{"bucket name too large", nil}
}

// Create a new bucket entry.
var buf [unsafe.Sizeof(bucket{})]byte
var raw = (*bucket)(unsafe.Pointer(&buf[0]))
raw.root = 0
// Create a blank root leaf page.
p := t.allocate(1)
p.flags = p_leaf

// Move cursor to insertion location.
c := t.sys.Cursor()
c.Goto([]byte(name))

// Load the leaf node from the cursor and insert the key/value.
t.leaf(c).put([]byte(name), buf[:])
// Add bucket to system page.
t.sys.put(name, &bucket{root: p.id})

return nil
}

// DeleteBucket deletes a bucket.
func (t *RWTransaction) DeleteBucket(b *Bucket) error {
// TODO: Remove from main DB.
func (t *RWTransaction) DeleteBucket(name string) error {
// Remove from system page.
t.sys.del(name)

// TODO: Delete entry from system bucket.
// TODO: Free all pages.
// TODO: Remove cursor.
Expand Down Expand Up @@ -74,13 +87,31 @@ func (t *RWTransaction) Delete(key []byte) error {
return nil
}

// Commit writes all changes to disk: it spills leafs and branches onto dirty
// pages, serializes the system page, writes the dirty pages, and finally
// writes the meta pointing at the new system page. It returns any error
// reported by write or writeMeta.
func (t *RWTransaction) Commit() error {
	// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
	// TODO: Rebalance.

	// Move the in-memory leafs and branches onto dirty pages.
	t.spill()

	// Reserve enough contiguous pages for the system page and serialize it.
	npages := (t.sys.size() / t.db.pageSize) + 1
	sysPage := t.allocate(npages)
	t.sys.write(sysPage)

	// The dirty pages go to disk before the meta does.
	if err := t.write(); err != nil {
		return err
	}

	// Point the meta at the new system page and persist it.
	t.meta.sys = sysPage.id
	return t.writeMeta()
}
Expand All @@ -99,10 +130,107 @@ func (t *RWTransaction) close() error {
}

// allocate returns a contiguous block of memory starting at a given page.
func (t *RWTransaction) allocate(size int) (*page, error) {
// TODO: Find a continuous block of free pages.
// TODO: If no free pages are available, resize the mmap to allocate more.
return nil, nil
func (t *RWTransaction) allocate(count int) *page {
	// TODO(benbjohnson): Use pages from the freelist.

	// Back the new pages with a fresh buffer of count whole pages and lay a
	// page header over its first bytes.
	mem := make([]byte, count*t.db.pageSize)
	pg := (*page)(unsafe.Pointer(&mem[0]))

	// The block starts at the meta's current last page id; overflow records
	// how many pages follow the first one.
	first := t.meta.pgid
	pg.id = first
	pg.overflow = uint32(count - 1)

	// Advance the last page id past the block just handed out.
	t.meta.pgid = first + pgid(count)

	// Remember the page in the transaction's page cache.
	t.pages[first] = pg
	return pg
}

// spill moves every cached leaf, and then every cached branch, onto dirty pages.
func (t *RWTransaction) spill() {
	// Leafs are handled before any branch.
	for _, lf := range t.leafs {
		t.spillLeaf(lf)
	}

	// Gather the cached branches and order them by depth, as defined by the
	// branches sort type.
	ordered := make(branches, 0, len(t.branches))
	for _, br := range t.branches {
		ordered = append(ordered, br)
	}
	sort.Sort(ordered)

	// Spill the branches in that sorted order.
	for i := range ordered {
		t.spillBranch(ordered[i])
	}
}

// spillLeaf writes a leaf to one or more dirty pages and, when the leaf has a
// parent branch, records the new page ids in that parent.
func (t *RWTransaction) spillLeaf(l *leaf) {
	parent := l.parent

	// Let the leaf split itself against the page size; every resulting piece
	// gets its own page allocation below.
	parts := l.split(t.db.pageSize)

	// TODO: If this is a root leaf and we split then add a parent branch.

	previd := parts[0].pgid
	for i, part := range parts {
		// Reserve contiguous pages for this piece and serialize it into them.
		npages := (part.size() / t.db.pageSize) + 1
		p := t.allocate(npages)
		part.write(p)

		// Without a parent there is no branch entry to update.
		if parent == nil {
			continue
		}

		// The first piece is put with replace=true, later pieces with
		// replace=false; each put is keyed by the piece's first item.
		parent.put(previd, p.id, part.items[0].key, i == 0)
		previd = part.pgid
	}
}

// spillBranch writes a branch to one or more dirty pages.
// It is not implemented yet: the body only calls warn with a "[pending]" notice
// and does not touch the branch it is given.
func (t *RWTransaction) spillBranch(l *branch) {
warn("[pending] RWTransaction.spillBranch()") // TODO
}

// write writes any dirty pages to disk.
//
// The cached pages are written in ascending page-id order, each at the file
// offset id*pageSize and spanning overflow+1 pages. It stops at, and returns,
// the first error reported by the file's WriteAt; nil means every page was
// handed to the file successfully.
func (t *RWTransaction) write() error {
	// TODO(benbjohnson): If our last page id is greater than the mmap size then lock the DB and resize.

	// Sort pages by id.
	dirty := make(pages, 0, len(t.pages))
	for _, p := range t.pages {
		dirty = append(dirty, p)
	}
	sort.Sort(dirty)

	// Write pages to disk in order.
	for _, p := range dirty {
		// View the page header and everything behind it as raw bytes.
		size := (int(p.overflow) + 1) * t.db.pageSize
		buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size]

		// A failed write must abort the commit rather than be dropped.
		if _, err := t.db.file.WriteAt(buf, int64(p.id)*int64(t.db.pageSize)); err != nil {
			return err
		}
	}

	return nil
}

// writeMeta writes the meta to the disk.
//
// The transaction's meta is serialized into a temporary one-page buffer and
// written to the meta file at the offset of the page id chosen by meta.write.
// It returns the error reported by WriteAt, if any.
func (t *RWTransaction) writeMeta() error {
	// Create a temporary buffer for the meta page.
	buf := make([]byte, t.db.pageSize)
	p := t.db.pageInBuffer(buf, 0)
	t.meta.write(p)

	// Write the meta page to file; a failure here must reach the caller.
	if _, err := t.db.metafile.WriteAt(buf, int64(p.id)*int64(t.db.pageSize)); err != nil {
		return err
	}

	return nil
}

// leaf retrieves a leaf object based on the current position of a cursor.
Expand Down Expand Up @@ -141,6 +269,7 @@ func (t *RWTransaction) branch(stack []elem) *branch {
// Otherwise create a branch and cache it.
b := &branch{}
b.read(t.page(id))
b.depth = len(stack) - 1
b.parent = t.branch(stack[:len(stack)-1])
t.branches[id] = b

Expand Down
Loading

0 comments on commit d05191d

Please sign in to comment.