Skip to content
This repository has been archived by the owner on Mar 9, 2019. It is now read-only.

Spill to dirty pages, write to disk #3

Merged
merged 1 commit into from
Jan 31, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions branch.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (

// branch represents a temporary in-memory branch page.
type branch struct {
pgid pgid
depth int
parent *branch
items branchItems
}
Expand Down Expand Up @@ -42,11 +44,11 @@ func (b *branch) put(id pgid, newid pgid, key []byte, replace bool) {
}

// read initializes the item data from an on-disk page.
func (b *branch) read(page *page) {
ncount := int(page.count)
b.items = make(branchItems, ncount)
bnodes := (*[maxNodesPerPage]bnode)(unsafe.Pointer(&page.ptr))
for i := 0; i < ncount; i++ {
func (b *branch) read(p *page) {
b.pgid = p.id
b.items = make(branchItems, int(p.count))
bnodes := (*[maxNodesPerPage]bnode)(unsafe.Pointer(&p.ptr))
for i := 0; i < int(p.count); i++ {
bnode := &bnodes[i]
item := &b.items[i]
item.pgid = bnode.pgid
Expand Down Expand Up @@ -109,6 +111,12 @@ func (b *branch) split(pageSize int) []*branch {
return branches
}

type branches []*branch

func (s branches) Len() int { return len(s) }
func (s branches) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s branches) Less(i, j int) bool { return s[i].depth < s[j].depth }

type branchItems []branchItem

type branchItem struct {
Expand Down
2 changes: 2 additions & 0 deletions bucket.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package bolt

const MaxBucketNameSize = 255

type Bucket struct {
*bucket
name string
Expand Down
14 changes: 9 additions & 5 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,9 @@ func (db *DB) mmap() error {
} else if int(info.Size()) < db.pageSize*2 {
return &Error{"file size too small", err}
}
size := int(info.Size())

// TEMP(benbjohnson): Set max size to 1MB.
size := 2 << 20

// Memory-map the data file as a byte slice.
if db.data, err = db.syscall.Mmap(int(db.file.Fd()), 0, size, syscall.PROT_READ, syscall.MAP_SHARED); err != nil {
Expand Down Expand Up @@ -159,7 +161,9 @@ func (db *DB) init() error {
m.pageSize = uint32(db.pageSize)
m.version = Version
m.free = 2
m.sys.root = 3
m.sys = 3
m.pgid = 4
m.txnid = txnid(i)
}

// Write an empty freelist at page 3.
Expand All @@ -171,7 +175,7 @@ func (db *DB) init() error {
// Write an empty leaf page at page 4.
p = db.pageInBuffer(buf[:], pgid(3))
p.id = pgid(3)
p.flags = p_leaf
p.flags = p_sys
p.count = 0

// Write the buffer to our data file.
Expand Down Expand Up @@ -206,7 +210,7 @@ func (db *DB) Transaction() (*Transaction, error) {

// Create a transaction associated with the database.
t := &Transaction{}
t.init(db, db.meta())
t.init(db)

return t, nil
}
Expand All @@ -230,7 +234,7 @@ func (db *DB) RWTransaction() (*RWTransaction, error) {
branches: make(map[pgid]*branch),
leafs: make(map[pgid]*leaf),
}
t.init(db, db.meta())
t.init(db)

return t, nil
}
Expand Down
7 changes: 4 additions & 3 deletions leaf.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

// leaf represents an in-memory, deserialized leaf page.
type leaf struct {
pgid pgid
parent *branch
items leafItems
}
Expand Down Expand Up @@ -39,10 +40,10 @@ func (l *leaf) put(key []byte, value []byte) {

// read initializes the item data from an on-disk page.
func (l *leaf) read(p *page) {
ncount := int(p.count)
l.items = make(leafItems, ncount)
l.pgid = p.id
l.items = make(leafItems, int(p.count))
lnodes := (*[maxNodesPerPage]lnode)(unsafe.Pointer(&p.ptr))
for i := 0; i < ncount; i++ {
for i := 0; i < int(p.count); i++ {
lnode := &lnodes[i]
item := &l.items[i]
item.key = lnode.key()
Expand Down
14 changes: 13 additions & 1 deletion meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ type meta struct {
pageSize uint32
pgid pgid
free pgid
sys pgid
txnid txnid
sys bucket
}

// validate checks the marker bytes and version of the meta page to ensure it matches this binary.
Expand All @@ -30,8 +30,20 @@ func (m *meta) validate() error {

// copy copies one meta object to another.
func (m *meta) copy(dest *meta) {
dest.magic = m.magic
dest.version = m.version
dest.pageSize = m.pageSize
dest.pgid = m.pgid
dest.free = m.free
dest.txnid = m.txnid
dest.sys = m.sys
}

// write writes the meta onto a page.
func (m *meta) write(p *page) {
// Page id is either going to be 0 or 1 which we can determine by the Txn ID.
p.id = pgid(m.txnid % 2)
p.flags |= p_meta

m.copy(p.meta())
}
9 changes: 8 additions & 1 deletion page.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ const (
p_branch = 0x01
p_leaf = 0x02
p_meta = 0x04
p_freelist = 0x08
p_sys = 0x08
p_freelist = 0x10
)

type pgid uint64
Expand Down Expand Up @@ -56,3 +57,9 @@ func (p *page) bnodes() []bnode {
func (p *page) freelist() []pgid {
return ((*[maxNodesPerPage]pgid)(unsafe.Pointer(&p.ptr)))[0:p.count]
}

type pages []*page

func (s pages) Len() int { return len(s) }
func (s pages) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s pages) Less(i, j int) bool { return s[i].id < s[j].id }
167 changes: 148 additions & 19 deletions rwtransaction.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bolt

import (
"sort"
"unsafe"
)

Expand All @@ -12,31 +13,43 @@ type RWTransaction struct {
leafs map[pgid]*leaf
}

// init initializes the transaction.
func (t *RWTransaction) init(db *DB) {
t.Transaction.init(db)
t.pages = make(map[pgid]*page)

// Copy the meta and increase the transaction id.
t.meta = &meta{}
db.meta().copy(t.meta)
t.meta.txnid += txnid(2)
}

// CreateBucket creates a new bucket.
func (t *RWTransaction) CreateBucket(name string) error {
// Check if bucket already exists.
if b := t.Bucket(name); b != nil {
return &Error{"bucket already exists", nil}
} else if len(name) == 0 {
return &Error{"bucket name cannot be blank", nil}
} else if len(name) > MaxBucketNameSize {
return &Error{"bucket name too large", nil}
}

// Create a new bucket entry.
var buf [unsafe.Sizeof(bucket{})]byte
var raw = (*bucket)(unsafe.Pointer(&buf[0]))
raw.root = 0
// Create a blank root leaf page.
p := t.allocate(1)
p.flags = p_leaf

// Move cursor to insertion location.
c := t.sys.Cursor()
c.Goto([]byte(name))

// Load the leaf node from the cursor and insert the key/value.
t.leaf(c).put([]byte(name), buf[:])
// Add bucket to system page.
t.sys.put(name, &bucket{root: p.id})

return nil
}

// DropBucket deletes a bucket.
func (t *RWTransaction) DeleteBucket(b *Bucket) error {
// TODO: Remove from main DB.
func (t *RWTransaction) DeleteBucket(name string) error {
// Remove from system page.
t.sys.del(name)

// TODO: Delete entry from system bucket.
// TODO: Free all pages.
// TODO: Remove cursor.
Expand Down Expand Up @@ -74,13 +87,31 @@ func (t *RWTransaction) Delete(key []byte) error {
return nil
}

// Commit writes all changes to disk.
func (t *RWTransaction) Commit() error {
// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.

// TODO: Flush data.
// TODO: Rebalance.

// Spill data onto dirty pages.
t.spill()

// TODO: Update meta.
// TODO: Write meta.
// Spill system page.
p := t.allocate((t.sys.size() / t.db.pageSize) + 1)
t.sys.write(p)

// Write dirty pages to disk.
if err := t.write(); err != nil {
return err
}

// Update the meta.
t.meta.sys = p.id

// Write meta to disk.
if err := t.writeMeta(); err != nil {
return err
}

return nil
}
Expand All @@ -99,10 +130,107 @@ func (t *RWTransaction) close() error {
}

// allocate returns a contiguous block of memory starting at a given page.
func (t *RWTransaction) allocate(size int) (*page, error) {
// TODO: Find a continuous block of free pages.
// TODO: If no free pages are available, resize the mmap to allocate more.
return nil, nil
func (t *RWTransaction) allocate(count int) *page {
// TODO(benbjohnson): Use pages from the freelist.

// Allocate a set of contiguous pages from the end of the file.
buf := make([]byte, count*t.db.pageSize)
p := (*page)(unsafe.Pointer(&buf[0]))
p.id = t.meta.pgid
p.overflow = uint32(count - 1)

// Increment the last page id.
t.meta.pgid += pgid(count)

// Save it in our page cache.
t.pages[p.id] = p

return p
}

// spill writes all the leafs and branches to dirty pages.
func (t *RWTransaction) spill() {
// Spill leafs first.
for _, l := range t.leafs {
t.spillLeaf(l)
}

// Sort branches by highest depth first.
branches := make(branches, 0, len(t.branches))
for _, b := range t.branches {
branches = append(branches, b)
}
sort.Sort(branches)

// Spill branches by deepest first.
for _, b := range branches {
t.spillBranch(b)
}
}

// spillLeaf writes a leaf to one or more dirty pages.
func (t *RWTransaction) spillLeaf(l *leaf) {
parent := l.parent

// Split leaf, if necessary.
leafs := l.split(t.db.pageSize)

// TODO: If this is a root leaf and we split then add a parent branch.

// Process each resulting leaf.
previd := leafs[0].pgid
for index, l := range leafs {
// Allocate contiguous space for the leaf.
p := t.allocate((l.size() / t.db.pageSize) + 1)

// Write the leaf to the page.
l.write(p)

// Insert or replace the node in the parent branch with the new pgid.
if parent != nil {
parent.put(previd, p.id, l.items[0].key, (index == 0))
previd = l.pgid
}
}
}

// spillBranch writes a branch to one or more dirty pages.
func (t *RWTransaction) spillBranch(l *branch) {
warn("[pending] RWTransaction.spillBranch()") // TODO
}

// write writes any dirty pages to disk.
func (t *RWTransaction) write() error {
// TODO(benbjohnson): If our last page id is greater than the mmap size then lock the DB and resize.

// Sort pages by id.
pages := make(pages, 0, len(t.pages))
for _, p := range t.pages {
pages = append(pages, p)
}
sort.Sort(pages)

// Write pages to disk in order.
for _, p := range pages {
size := (int(p.overflow) + 1) * t.db.pageSize
buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size]
t.db.file.WriteAt(buf, int64(p.id)*int64(t.db.pageSize))
}

return nil
}

// writeMeta writes the meta to the disk.
func (t *RWTransaction) writeMeta() error {
// Create a temporary buffer for the meta page.
buf := make([]byte, t.db.pageSize)
p := t.db.pageInBuffer(buf, 0)
t.meta.write(p)

// Write the meta page to file.
t.db.metafile.WriteAt(buf, int64(p.id)*int64(t.db.pageSize))

return nil
}

// leaf retrieves a leaf object based on the current position of a cursor.
Expand Down Expand Up @@ -141,6 +269,7 @@ func (t *RWTransaction) branch(stack []elem) *branch {
// Otherwise create a branch and cache it.
b := &branch{}
b.read(t.page(id))
b.depth = len(stack) - 1
b.parent = t.branch(stack[:len(stack)-1])
t.branches[id] = b

Expand Down
Loading