Skip to content
This repository has been archived by the owner on Feb 11, 2021. It is now read-only.

Commit

Permalink
Merge pull request goraft#150 from xiangli-cmu/sync_fix
Browse files Browse the repository at this point in the history
Fix file issue
  • Loading branch information
xiang90 committed Jan 15, 2014
2 parents d08ec1d + 8ca39ae commit 2c0c8f0
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 11 deletions.
28 changes: 18 additions & 10 deletions log.go
Expand Up @@ -206,6 +206,11 @@ func (l *Log) close() {
l.entries = make([]*LogEntry, 0)
}

// sync to disk
func (l *Log) sync() error {
return l.file.Sync()
}

//--------------------------------------
// Entries
//--------------------------------------
Expand Down Expand Up @@ -477,7 +482,7 @@ func (l *Log) appendEntries(entries []*LogEntry) error {
startPosition += size
}
w.Flush()
err = l.file.Sync()
err = l.sync()

if err != nil {
panic(err)
Expand Down Expand Up @@ -573,7 +578,8 @@ func (l *Log) compact(index uint64, term uint64) error {
}

// create a new log file and add all the entries
file, err := os.OpenFile(l.path+".new", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
new_file_path := l.path + ".new"
file, err := os.OpenFile(new_file_path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
if err != nil {
return err
}
Expand All @@ -582,25 +588,27 @@ func (l *Log) compact(index uint64, term uint64) error {
entry.Position = position

if _, err = entry.encode(file); err != nil {
file.Close()
os.Remove(new_file_path)
return err
}
}
// close the current log file
l.file.Close()
file.Sync()

// remove the current log file to .bak
err = os.Remove(l.path)
if err != nil {
return err
}
old_file := l.file

// rename the new log file
err = os.Rename(l.path+".new", l.path)
err = os.Rename(new_file_path, l.path)
if err != nil {
file.Close()
os.Remove(new_file_path)
return err
}
l.file = file

// close the old log file
old_file.Close()

// compaction the in memory log
l.entries = entries
l.startIndex = index
Expand Down
4 changes: 3 additions & 1 deletion server.go
Expand Up @@ -960,6 +960,8 @@ func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
committedIndex := s.log.commitIndex

if commitIndex > committedIndex {
// leader needs to do a fsync before committing log entries
s.log.sync()
s.log.setCommitIndex(commitIndex)
s.debugln("commit index ", commitIndex)
}
Expand Down Expand Up @@ -1329,7 +1331,7 @@ func (s *server) writeConf() {
confPath := path.Join(s.path, "conf")
tmpConfPath := path.Join(s.path, "conf.tmp")

err := ioutil.WriteFile(tmpConfPath, b, 0600)
err := writeFileSynced(tmpConfPath, b, 0600)

if err != nil {
panic(err)
Expand Down
31 changes: 31 additions & 0 deletions util.go
@@ -0,0 +1,31 @@
package raft

import (
"io"
"os"
)

// WriteFile writes data to a file named by filename.
// If the file does not exist, WriteFile creates it with permissions perm;
// otherwise WriteFile truncates it before writing.
// This is copied from ioutil.WriteFile with the addition of a Sync call to
// ensure the data reaches the disk.
func writeFileSynced(filename string, data []byte, perm os.FileMode) error {
f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm)
if err != nil {
return err
}

n, err := f.Write(data)
if n < len(data) {
f.Close()
return io.ErrShortWrite
}

err = f.Sync()
if err != nil {
return err
}

return f.Close()
}

0 comments on commit 2c0c8f0

Please sign in to comment.