Skip to content

Commit

Permalink
borkbork
Browse files Browse the repository at this point in the history
  • Loading branch information
calmh committed Jul 8, 2014
1 parent dd27dac commit 3b581ac
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 17 deletions.
24 changes: 24 additions & 0 deletions files/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,30 @@ func ldbWithHave(db *leveldb.DB, repo, node []byte, since uint64, fn fileIterato
return maxTs
}

// ldbWithAll iterates over every repo/node/file entry stored for the given
// repo, across all nodes, invoking fn with the entry's timestamp, the raw
// node ID bytes from the key, and the decoded file record. It reads from a
// consistent snapshot and panics on database or decoding errors.
func ldbWithAll(db *leveldb.DB, repo []byte, fn func(ts uint64, node []byte, f scanner.File)) {
	// Build the key range covering every entry for this repo: from the
	// empty node/name up to the maximal 32-byte node ID with an all-0xff
	// name prefix, so the scan stops before the next repo's keys.
	maxNode := make([]byte, 32)
	for i := range maxNode {
		maxNode[i] = 0xff
	}
	maxName := make([]byte, 4)
	for i := range maxName {
		maxName[i] = 0xff
	}
	start := nodeKey(repo, nil, nil)
	limit := nodeKey(repo, maxNode, maxName)

	snap, err := db.GetSnapshot()
	if err != nil {
		panic(err)
	}
	defer snap.Release()

	dbi := snap.NewIterator(&util.Range{Start: start, Limit: limit}, nil)
	defer dbi.Release()

	for dbi.Next() {
		// The node portion of the key sits at bytes [65, 97).
		// NOTE(review): offset assumed to match nodeKey's layout — confirm.
		nodeID := dbi.Key()[65 : 65+32]
		val := dbi.Value()
		// Value layout: 8-byte big-endian timestamp, then the XDR-encoded file.
		stamp := binary.BigEndian.Uint64(val[:8])
		var file scanner.File
		if err := file.UnmarshalXDR(val[8:]); err != nil {
			panic(err)
		}
		fn(stamp, nodeID, file)
	}
}

func ldbGet(db *leveldb.DB, repo, node, file []byte) scanner.File {
nk := nodeKey(repo, node, file)
bs, err := db.Get(nk, nil)
Expand Down
9 changes: 9 additions & 0 deletions files/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,15 @@ func NewSet(repo string, db *leveldb.DB) *Set {
repo: repo,
db: db,
}

var nodeID protocol.NodeID
ldbWithAll(db, []byte(repo), func(ts uint64, node []byte, f scanner.File) {
copy(nodeID[:], node)
if ts > s.changes[nodeID] {
s.changes[nodeID] = ts
}
})

return &s
}

Expand Down
51 changes: 36 additions & 15 deletions model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -74,6 +75,9 @@ type Model struct {

addedRepo bool
started bool

lastChange map[string]uint64
lcMut sync.Mutex
}

var (
Expand Down Expand Up @@ -101,6 +105,7 @@ func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVers
rawConn: make(map[protocol.NodeID]io.Closer),
nodeVer: make(map[protocol.NodeID]string),
sup: suppressor{threshold: int64(cfg.Options.MaxChangeKbps)},
lastChange: make(map[string]uint64),
}

var timeout = 20 * 60 // seconds
Expand Down Expand Up @@ -276,8 +281,17 @@ func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) {

// NeedSize returns the number and total byte size of currently needed files
// for the given repo. It counts both regular and deleted entries reported by
// WithNeed; an unknown repo yields zero counts.
func (m *Model) NeedSize(repo string) (files int, bytes int64) {
	// The scraped diff interleaved the removed pre-change body (the old
	// sizeOf(m.NeedFilesRepo(...)) call) with the new implementation; only
	// the new, lock-protected accumulation below belongs in the function.
	m.rmut.RLock()
	defer m.rmut.RUnlock()
	if rf, ok := m.repoFiles[repo]; ok {
		rf.WithNeed(protocol.LocalNodeID, func(f scanner.File) bool {
			fs, de, by := sizeOfFile(f)
			files += fs + de
			bytes += by
			return true // keep iterating; we want the full total
		})
	}
	return
}

// NeedFiles returns the list of currently needed files
Expand All @@ -291,7 +305,7 @@ func (m *Model) NeedFilesRepo(repo string, max int) []scanner.File {
}
rf.WithNeed(protocol.LocalNodeID, func(f scanner.File) bool {
fs = append(fs, f)
return len(fs) < max
return max <= 0 || len(fs) < max
})
if r := m.repoCfgs[repo].FileRanker(); r != nil {
files.SortBy(r).Sort(fs)
Expand Down Expand Up @@ -411,6 +425,15 @@ func (m *Model) Close(node protocol.NodeID, err error) {
delete(m.rawConn, node)
delete(m.nodeVer, node)
m.pmut.Unlock()

nodeStr := node.String()
m.lcMut.Lock()
for k := range m.lastChange {
if strings.HasPrefix(k, nodeStr) {
m.lastChange[k] = 0
}
}
m.lcMut.Unlock()
}

// Request returns the specified data segment by reading it from local disk.
Expand Down Expand Up @@ -523,8 +546,9 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection)

// updateLocal records a single locally-changed file in the repo's file set.
// The repo map is only read under the lock; the potentially slow Update call
// happens after the lock is released to minimize time spent holding rmut.
func (m *Model) updateLocal(repo string, f scanner.File) {
	// The scraped diff left the removed pre-change line (Update invoked while
	// still holding the read lock) next to the new body; only the
	// fetch-then-unlock-then-update sequence belongs in the function.
	m.rmut.RLock()
	fs := m.repoFiles[repo]
	m.rmut.RUnlock()
	fs.Update(protocol.LocalNodeID, []scanner.File{f})
}

func (m *Model) requestGlobal(nodeID protocol.NodeID, repo, name string, offset int64, size int, hash []byte) ([]byte, error) {
Expand All @@ -544,9 +568,6 @@ func (m *Model) requestGlobal(nodeID protocol.NodeID, repo, name string, offset
}

func (m *Model) broadcastIndexLoop() {
var lastChange = map[string]uint64{}
var lcMut sync.Mutex

for {
time.Sleep(5 * time.Second)

Expand All @@ -561,11 +582,11 @@ func (m *Model) broadcastIndexLoop() {
repo := repo
fs := fs
conn := conn
key := fmt.Sprintf("%s/%s", repo, nodeID)
key := fmt.Sprintf("%s/%s", nodeID, repo)

lcMut.Lock()
sentChange, ok := lastChange[key]
lcMut.Unlock()
m.lcMut.Lock()
sentChange, ok := m.lastChange[key]
m.lcMut.Unlock()

if debug {
l.Debugf("broadcast index %s sentChange=%d curChange=%d ok=%v", key, sentChange, curChange, ok)
Expand All @@ -576,9 +597,9 @@ func (m *Model) broadcastIndexLoop() {
go func() {
initial := sentChange == 0
maxTs := m.sendIndexTo(sentChange, repo, fs, conn, initial)
lcMut.Lock()
lastChange[key] = maxTs
lcMut.Unlock()
m.lcMut.Lock()
m.lastChange[key] = maxTs
m.lcMut.Unlock()
if debug {
l.Debugf("broadcast index %s maxTs=%d", key, maxTs)
}
Expand Down Expand Up @@ -614,7 +635,7 @@ func (m *Model) sendIndexTo(since uint64, repo string, fs *files.Set, conn proto
}
conn.IndexUpdate(repo, batch)
}
batch = batch[:0]
batch = make([]protocol.FileInfo, 0, indexTxBatch)
}
return true
})
Expand Down
8 changes: 6 additions & 2 deletions model/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,14 @@ func (p *puller) run() {
default:
}

if v := p.model.Version(p.repoCfg.ID); v > prevVer {
curVer := p.model.Version(p.repoCfg.ID)
if debug {
l.Debugf("curVer=%d prevVer=%d", curVer, prevVer)
}
if curVer > prevVer {
// Queue more blocks to fetch, if any
p.queueNeededBlocks()
prevVer = v
prevVer = curVer
}
}
}
Expand Down

0 comments on commit 3b581ac

Please sign in to comment.