Skip to content
This repository has been archived by the owner on Dec 3, 2018. It is now read-only.

Commit

Permalink
fixes for review
Browse files Browse the repository at this point in the history
  • Loading branch information
Christopher Schinnerl committed Oct 23, 2017
1 parent 288d9bd commit 5e2f8b2
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 172 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
# IntelliJ IDEA
.idea

# Vim
*.swp
*.swo
13 changes: 8 additions & 5 deletions consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,24 @@ const (
)

const (
// The following enumeration defines the different possible pageStatus
// values
pageStatusInvalid = iota
pageStatusOther
pageStatusWritten
pageStatusComitted
pageStatusApplied
)

// Metadata contains the header and version of the data being stored.
type Metadata struct {
Header, Version string
}

var (
// metadata defines the WAL v1.0.0 metadata
metadata = Metadata{
Header: "WAL",
Version: "1.0",
}
)

// Metadata contains the header and version of the data being stored.
type Metadata struct {
Header, Version string
}
1 change: 1 addition & 0 deletions dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type (
}
)

// prodDependencies is a passthrough to the standard library calls
type prodDependencies struct{}

func (prodDependencies) disrupt(string) bool { return false }
Expand Down
32 changes: 17 additions & 15 deletions page.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ import (
type page struct {
// offset is the offset in the file that this page has.
offset uint64 // Is NOT marshalled to disk.

// transactionChecksum is the hash of all the pages and data in the
// committed transaction, including page status, nextPage, the payloads,
// and the transaction number. The checksum is only used internal to the
// WAL.
transactionChecksum [crypto.HashSize]byte

// pageStatus should never be set to '0' since this is the default value
// and would indicate an incorrectly initialized page
//
Expand Down Expand Up @@ -47,12 +54,6 @@ type page struct {
// it is marshalled to math.MaxUint64
transactionNumber uint64 // Gets marshalled to disk.

// transactionChecksum is the hash of all the pages and data in the
// committed transaction, including page status, nextPage, the payloads,
// and the transaction number. The checksum is only used internal to the
// WAL.
transactionChecksum [crypto.HashSize]byte // Gets marshalled to disk for the first page only.

// payload contains the marshalled update, which may be spread over multiple
// pages if it is large. If spread over multiple pages, the full payload
// can be assembled by appending the separate payloads together. To keep the
Expand All @@ -63,24 +64,25 @@ type page struct {

// Marshal marshals a page.
func (p page) Marshal() ([]byte, error) {
var nextPage uint64
if p.pageStatus == pageStatusInvalid {
panic(errors.New("Sanity check failed. Page was marshalled with invalid PageStatus"))
}

var nextPagePosition uint64
if p.nextPage != nil {
nextPage = p.nextPage.offset
nextPagePosition = p.nextPage.offset
} else {
nextPage = math.MaxUint64
nextPagePosition = math.MaxUint64
}

buffer := new(bytes.Buffer)

// write checksum, pageStatus, transactionNumber and nextPage
_, err1 := buffer.Write(p.transactionChecksum[:])
err2 := binary.Write(buffer, binary.LittleEndian, p.pageStatus)
if p.pageStatus == pageStatusInvalid {
panic(errors.New("Sanity check failed. Page was marshalled with invalid PageStatus"))
}

err3 := binary.Write(buffer, binary.LittleEndian, p.transactionNumber)
err4 := binary.Write(buffer, binary.LittleEndian, nextPage)
err4 := binary.Write(buffer, binary.LittleEndian, nextPagePosition)

// write payloadSize and payload
err5 := binary.Write(buffer, binary.LittleEndian, uint64(len(p.payload)))
Expand All @@ -100,8 +102,8 @@ func (p page) Marshal() ([]byte, error) {
return buffer.Bytes(), nil
}

// Write writes the page to disk
func (p page) Write(f file) error {
// WriteToFile writes the page to disk
func (p page) writeToFile(f file) error {
data, err := p.Marshal()
if err != nil {
return build.ExtendErr("Marshalling the page failed", err)
Expand Down
193 changes: 77 additions & 116 deletions transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,23 +74,15 @@ type Transaction struct {
commitComplete bool
releaseComplete bool

// firstPage and finalPage can be the same page. When committing the
// transaction, the first page is updated to indicate that the commitment
// is complete, and the final page is updated to have an update number (so
// that when loaded later, the pages can be returned in the correct
// order) and the checksum on the final page is added which covers all of
// the data of the whole transaction.
//
// The middle pages do not change after being written.
// firstPage is the first page of the transaction. It is the last page that
// is written when finalizing a commit and when releasing a transaction.
firstPage *page
finalPage *page

// Updates defines the set of updates that compose the transaction.
Updates []Update

// The wal that was used to create the transaction
wal *WAL

// An internal channel to signal that the transaction was initialized and
// can be committed
initComplete chan error
Expand All @@ -115,18 +107,16 @@ func (t Transaction) checksum() ([crypto.HashSize]byte, error) {
return crypto.HashBytes(data), nil
}

//commit commits a transaction by setting the correct status and checksum
// commit commits a transaction by setting the correct status and checksum
func (t *Transaction) commit(done chan error) {
// Signal completion of the commit
defer close(done)

// Make sure that the initialization of the transaction finished
select {
case err := <-t.initComplete:
if err != nil {
done <- err
return
}
err := <-t.initComplete
if err != nil {
done <- err
return
}

// set the status of the first page first
Expand All @@ -138,22 +128,20 @@ func (t *Transaction) commit(done chan error) {
// calculate the checksum and write it to the first page
checksum, err := t.checksum()
if err != nil {
// Don't return here to avoid a deadlock. Do it after the counter was increased
done <- build.ExtendErr("Unable to create checksum of transaction", err)
return
}
t.firstPage.transactionChecksum = checksum

// Finalize the commit by writing the first page with the updated status if
// there have been no errors so far.
if t.wal.deps.disrupt("CommitFail") {
// Disk failure causes the commit to fail
err = errors.New("Write failed on purpose")
}

if err == nil {
err = t.firstPage.Write(t.wal.logFile)
done <- errors.New("Write failed on purpose")
return
}

err = t.firstPage.writeToFile(t.wal.logFile)
if err != nil {
done <- build.ExtendErr("Writing the first page failed", err)
return
Expand Down Expand Up @@ -242,7 +230,7 @@ func unmarshalUpdates(data []byte) ([]Update, error) {
// transactions's updates into a payload and splits the payload equally among
// the pages. Once finished those pages are written to disk and the transaction
// is committed.
func threadedInitTransaction(t *Transaction) {
func initTransaction(t *Transaction) {
defer close(t.initComplete)

// Marshal all the updates to get their total length on disk
Expand All @@ -252,60 +240,23 @@ func threadedInitTransaction(t *Transaction) {
return
}

// Find out how many pages are needed for the update
requiredPages := uint64(float64(len(data))/float64(maxPayloadSize+8) + 1)

// Get the pages from the wal
reservedPages := t.wal.reservePages(requiredPages)

// Set the fields of each page
pages := make([]page, requiredPages)
for i := uint64(0); i < requiredPages; i++ {
// Set offset according to the index in reservedPages
pages[i].offset = reservedPages[i]

// Set nextPage if the current page isn't the last one
// otherwise let it be nil
if i+1 < requiredPages {
pages[i].nextPage = &pages[i+1]
}

// Set pageStatus of the first page to pageStatusWritten
if i == 0 {
pages[i].pageStatus = pageStatusWritten
} else {
pages[i].pageStatus = pageStatusOther
}

// Copy part of the update into the payload
payloadsize := maxPayloadSize
if len(data[i*maxPayloadSize:]) < payloadsize {
payloadsize = len(data[i*maxPayloadSize:])
}
pages[i].payload = make([]byte, payloadsize)
copy(pages[i].payload, data[i*maxPayloadSize:])
}
pages := t.wal.managedReservePages(data)

// Set the first and final page of the transaction
t.firstPage = &pages[0]
t.finalPage = &pages[len(pages)-1]

// write the pages to disk and set the pageStatus
page := t.firstPage
for page != nil {
err := page.Write(t.wal.logFile)
if err != nil {
t.initComplete <- build.ExtendErr("Couldn't write the page to file", err)
return
}
page = page.nextPage
if err := t.writeToFile(); err != nil {
t.initComplete <- build.ExtendErr("Couldn't write the page to file", err)
return
}
}

// validateChecksum checks if a transaction has been corrupted by computing a hash
// and comparing it to the one in the firstPage of the transaction
func (t Transaction) validateChecksum() error {
// Check if finalPage is set
// Check if firstPage is set
if t.firstPage == nil {
return errors.New("Couldn't verify checksum. firstPage is nil")
}
Expand All @@ -319,60 +270,41 @@ func (t Transaction) validateChecksum() error {
return nil
}

// NewTransaction creates a transaction from a set of updates
func (w *WAL) NewTransaction(updates []Update) *Transaction {
// Create new transaction
newTransaction := Transaction{
Updates: updates,
wal: w,
initComplete: make(chan error),
}

// Initialize the transaction by splitting up the payload among free pages
// and writing them to disk.
go threadedInitTransaction(&newTransaction)

return &newTransaction
}

// SignalUpdatesApplied informs the WAL that it is safe to free the used pages to reuse them in a new transaction
func (t *Transaction) SignalUpdatesApplied() <-chan error {
func (t *Transaction) SignalUpdatesApplied() error {
if !t.setupComplete || !t.commitComplete || t.releaseComplete {
panic("misuse of transaction - call each of the signaling methods exactly once, in serial, in order")
}
t.releaseComplete = true
notifyChannel := make(chan error)

go func() {
// Set the page status to applied
t.firstPage.pageStatus = pageStatusApplied

// Write the page to disk
var err error
if t.wal.deps.disrupt("ReleaseFail") {
// Disk failure causes the commit to fail
err = errors.New("Write failed on purpose")
} else {
err = t.firstPage.Write(t.wal.logFile)
}

if err != nil {
notifyChannel <- build.ExtendErr("Couldn't write the page to file", err)
return
}
t.wal.fSync()
// Update the wallets available pages
page := t.firstPage
t.wal.mu.Lock()
for page != nil {
// Append the index of the freed page
t.wal.availablePages = append(t.wal.availablePages, page.offset)
page = page.nextPage
}
t.wal.mu.Unlock()
notifyChannel <- nil
}()
return notifyChannel
// Set the page status to applied
t.firstPage.pageStatus = pageStatusApplied

// Write the page to disk
var err error
if t.wal.deps.disrupt("ReleaseFail") {
// Disk failure causes the commit to fail
err = errors.New("Write failed on purpose")
} else {
err = t.firstPage.writeToFile(t.wal.logFile)
}

if err != nil {
return build.ExtendErr("Couldn't write the page to file", err)
}

t.wal.fSync()
// Update the wallets available pages
page := t.firstPage
t.wal.mu.Lock()
for page != nil {
// Append the index of the freed page
t.wal.availablePages = append(t.wal.availablePages, page.offset)
page = page.nextPage
}
t.wal.mu.Unlock()

return nil
}

// SignalSetupComplete will signal to the WAL that any required setup has
Expand All @@ -383,10 +315,39 @@ func (t *Transaction) SignalSetupComplete() <-chan error {
panic("misuse of transaction - call each of the signaling methods exactly ones, in serial, in order")
}
t.setupComplete = true
done := make(chan error)

// Commit the transaction non-blocking
done := make(chan error)
go t.commit(done)

return done
}

// NewTransaction creates a transaction from a set of updates
func (w *WAL) NewTransaction(updates []Update) *Transaction {
// Create new transaction
newTransaction := Transaction{
Updates: updates,
wal: w,
initComplete: make(chan error),
}

// Initialize the transaction by splitting up the payload among free pages
// and writing them to disk.
go initTransaction(&newTransaction)

return &newTransaction
}

// writeToFile writes all the pages of the transaction to disk
func (t *Transaction) writeToFile() error {
page := t.firstPage
for page != nil {
err := page.writeToFile(t.wal.logFile)
if err != nil {
return err
}
page = page.nextPage
}

return nil
}
Loading

0 comments on commit 5e2f8b2

Please sign in to comment.