Commit 5d46a57
refactor state so that per-backend state is separate
This allows us to avoid locking stateMu on every storeResult()
call: each storeResult() call now writes only to the state of the
backend it was called for.
stapelberg committed Dec 14, 2014
1 parent f7fc49a commit 5d46a57
Showing 2 changed files with 83 additions and 60 deletions.
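The pattern at the heart of this commit is state sharding: instead of one query-wide structure guarded by stateMu, each backend gets its own perBackendState, written only by the goroutine draining that backend. What follows is a minimal, self-contained sketch of that pattern, using illustrative names (shard, shardedState) rather than the actual dcs-web types:

package main

import (
    "fmt"
    "sync"
)

// One shard per backend. Each shard is written by exactly one goroutine,
// so appends on the hot path need no lock.
type shard struct {
    results []string
}

type shardedState struct {
    perBackend []*shard
}

// storeResult is the hot path: no mutex, because only the goroutine for
// backendidx ever writes to perBackend[backendidx].
func (s *shardedState) storeResult(backendidx int, r string) {
    sh := s.perBackend[backendidx]
    sh.results = append(sh.results, r)
}

// numResults aggregates across shards; call it only once the writers are
// done, or under whatever coarse lock still guards cross-shard reads.
func (s *shardedState) numResults() int {
    n := 0
    for _, sh := range s.perBackend {
        n += len(sh.results)
    }
    return n
}

func main() {
    s := &shardedState{perBackend: []*shard{{}, {}, {}}}
    var wg sync.WaitGroup
    for i := range s.perBackend {
        wg.Add(1)
        go func(idx int) {
            defer wg.Done()
            for j := 0; j < 4; j++ {
                s.storeResult(idx, fmt.Sprintf("backend%d/result%d", idx, j))
            }
        }(i)
    }
    wg.Wait()
    fmt.Println("total results:", s.numResults())
}

Note how the diff below keeps exactly one stateMu acquisition on the result path, and only for the very first result of a query (to record FirstPathRank); every later storeResult() stays on the lock-free per-backend path.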
141 changes: 82 additions & 59 deletions cmd/dcs-web/querymanager.go
@@ -2,6 +2,7 @@ package main

import (
"bufio"
"bytes"
"encoding/json"
"flag"
"fmt"
@@ -155,6 +156,18 @@ func (s pointerByRanking) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}

type perBackendState struct {
// One file per backend, containing JSON-serialized results. When writing,
// we keep the offsets, so that we can later sort the pointers and write
// the resulting files.
tempFile *os.File
tempFileWriter *bufio.Writer
tempFileOffset int64
packagePool *stringpool.StringPool
resultPointers []resultPointer
allPackages map[string]bool
}

type queryState struct {
started time.Time
ended time.Time
@@ -171,25 +184,27 @@ type queryState struct {
filesMu *sync.Mutex

resultPages int
numResults int

// One file per backend, containing JSON-serialized results. When writing,
// we keep the offsets, so that we can later sort the pointers and write
// the resulting files.
tempFiles []*os.File
tempFilesOffset []int64
tempFilesMu *sync.Mutex
packagePool *stringpool.StringPool
// This guards concurrent access to any perBackend[].tempFile.
tempFilesMu *sync.Mutex
perBackend []*perBackendState

resultPointers []resultPointer
resultPointersByPkg map[string][]resultPointer

allPackages map[string]bool
allPackagesSorted []string
allPackagesMu *sync.Mutex

FirstPathRank float32
}

func (qs *queryState) numResults() int {
var result int
for _, bstate := range qs.perBackend {
result += len(bstate.resultPointers)
}
return result
}

var (
state = make(map[string]queryState)
stateMu sync.Mutex
@@ -279,28 +294,23 @@ func maybeStartQuery(queryid, src, query string) bool {
if !s.done {
continue
}
for _, f := range s.tempFiles {
f.Close()
for _, state := range s.perBackend {
state.tempFile.Close()
}
delete(state, queryid)
}
log.Printf("Garbage collection done. %d queries remaining", len(state))
}
backends := strings.Split(*common.SourceBackends, ",")
state[queryid] = queryState{
started: time.Now(),
query: query,
newEvent: sync.NewCond(&sync.Mutex{}),
resultMu: &sync.Mutex{},
filesTotal: make([]int, len(backends)),
filesProcessed: make([]int, len(backends)),
filesMu: &sync.Mutex{},
tempFiles: make([]*os.File, len(backends)),
tempFilesOffset: make([]int64, len(backends)),
tempFilesMu: &sync.Mutex{},
allPackages: make(map[string]bool),
allPackagesMu: &sync.Mutex{},
packagePool: stringpool.NewStringPool(),
started: time.Now(),
query: query,
newEvent: sync.NewCond(&sync.Mutex{}),
filesTotal: make([]int, len(backends)),
filesProcessed: make([]int, len(backends)),
filesMu: &sync.Mutex{},
perBackend: make([]*perBackendState, len(backends)),
tempFilesMu: &sync.Mutex{},
}

varz.Increment("active-queries")
@@ -320,12 +330,18 @@ func maybeStartQuery(queryid, src, query string) bool {
for i := 0; i < len(backends); i++ {
state[queryid].filesTotal[i] = -1
path := filepath.Join(dir, fmt.Sprintf("unsorted_%d.json", i))
state[queryid].tempFiles[i], err = os.Create(path)
f, err := os.Create(path)
if err != nil {
log.Printf("[%s] could not create %q: %v\n", queryid, path, err)
failQuery(queryid)
return false
}
state[queryid].perBackend[i] = &perBackendState{
packagePool: stringpool.NewStringPool(),
tempFile: f,
tempFileWriter: bufio.NewWriterSize(f, 65536),
allPackages: make(map[string]bool),
}
}
log.Printf("initial results = %v\n", state[queryid])

@@ -413,14 +429,13 @@ func QueryzHandler(w http.ResponseWriter, r *http.Request) {
Ended: s.ended,
StartedFromNow: time.Since(s.started),
Duration: s.ended.Sub(s.started),
NumResults: len(s.resultPointers),
NumPackages: len(s.allPackages),
NumResults: s.numResults(),
NumResultPages: s.resultPages,
FilesTotal: s.filesTotal,
FilesProcessed: s.filesProcessed,
}
if stats[idx].NumResults == 0 && stats[idx].Done {
stats[idx].NumResults = s.numResults
stats[idx].NumResults = s.numResults()
}
idx++
}
@@ -480,7 +495,13 @@ func storeResult(queryid string, backendidx int, result proto.Match) {
// different enough for each query that we can’t hardcode it).
result.SetRanking(result.Pathrank() + ((s.FirstPathRank * 0.1) * result.Ranking()))
} else {
// This code path (and lock acquisition) gets executed only on the
// first result.
stateMu.Lock()
s = state[queryid]
s.FirstPathRank = result.Pathrank()
state[queryid] = s
stateMu.Unlock()
}

worst := s.results[9]
@@ -506,7 +527,7 @@ func storeResult(queryid string, backendidx int, result proto.Match) {
}

var written int64
w := io.MultiWriter(s.tempFiles[backendidx], countingWriter{&written})
w := io.MultiWriter(s.perBackend[backendidx].tempFileWriter, countingWriter{&written})
if err := result.WriteJSON(w); err != nil {
log.Printf("[%s] could not write %v: %v\n", queryid, result, err)
failQuery(queryid)
@@ -516,20 +537,16 @@ func storeResult(queryid string, backendidx int, result proto.Match) {
h := fnv.New64()
io.WriteString(h, result.Path())

stateMu.Lock()
s = state[queryid]
s.resultPointers = append(s.resultPointers, resultPointer{
bstate := s.perBackend[backendidx]
bstate.resultPointers = append(bstate.resultPointers, resultPointer{
backendidx: backendidx,
ranking: result.Ranking(),
offset: s.tempFilesOffset[backendidx],
offset: bstate.tempFileOffset,
length: written,
pathHash: h.Sum64(),
packageName: s.packagePool.Get(result.Package())})
s.tempFilesOffset[backendidx] += written
s.allPackages[result.Package()] = true
s.numResults++
state[queryid] = s
stateMu.Unlock()
packageName: bstate.packagePool.Get(result.Package())})
bstate.tempFileOffset += written
bstate.allPackages[result.Package()] = true
}

func failQuery(queryid string) {
@@ -559,15 +576,16 @@ func finishQuery(queryid string) {
}

var seriesBatch []*influxdb.Series
s := state[queryid]
series := influxdb.Series{
Name: "query-finished.int-dcsi-web",
Columns: []string{"queryid", "searchterm", "milliseconds", "results"},
Points: [][]interface{}{
[]interface{}{
queryid,
state[queryid].query,
time.Since(state[queryid].started) / time.Millisecond,
state[queryid].numResults,
s.query,
time.Since(s.started) / time.Millisecond,
s.numResults(),
},
},
}
@@ -650,7 +668,7 @@ func writeFromPointers(queryid string, f io.Writer, pointers []resultPointer) error {
return err
}
for idx, pointer := range pointers {
src := state[queryid].tempFiles[pointer.backendidx]
src := state[queryid].perBackend[pointer.backendidx].tempFile
if _, err := src.Seek(pointer.offset, os.SEEK_SET); err != nil {
return err
}
@@ -673,32 +691,37 @@ func writeToDisk(queryid string) error {
// Get the slice with results and unset it on the state so that processing can continue.
stateMu.Lock()
s := state[queryid]
pointers := s.resultPointers
pointers := make([]resultPointer, 0, s.numResults())
for _, bstate := range s.perBackend {
pointers = append(pointers, bstate.resultPointers...)
bstate.tempFileWriter.Flush()
}
if len(pointers) == 0 {
log.Printf("[%s] not writing, no results.\n", queryid)
stateMu.Unlock()
return nil
}
s.resultPointers = nil
idx := 0

// For each full package (i3-wm_4.8-1), store only the newest version.
packageVersions := make(map[string]dpkgversion.Version)
for pkg, _ := range s.allPackages {
underscore := strings.Index(pkg, "_")
name := pkg[:underscore]
version, err := dpkgversion.Parse(pkg[underscore+1:])
if err != nil {
log.Printf("[%s] parsing version %q failed: %v\n", queryid, pkg[underscore+1:], err)
continue
}
for _, bstate := range s.perBackend {
for pkg, _ := range bstate.allPackages {
underscore := strings.Index(pkg, "_")
name := pkg[:underscore]
version, err := dpkgversion.Parse(pkg[underscore+1:])
if err != nil {
log.Printf("[%s] parsing version %q failed: %v\n", queryid, pkg[underscore+1:], err)
continue
}

if bestversion, ok := packageVersions[name]; ok {
if dpkgversion.Compare(version, bestversion) > 0 {
if bestversion, ok := packageVersions[name]; ok {
if dpkgversion.Compare(version, bestversion) > 0 {
packageVersions[name] = version
}
} else {
packageVersions[name] = version
}
} else {
packageVersions[name] = version
}
}

@@ -795,7 +818,7 @@ func storeProgress(queryid string, backendidx int, progress proto.ProgressUpdate) {
QueryId: queryid,
FilesProcessed: filesProcessed,
FilesTotal: filesTotal,
Results: s.numResults,
Results: s.numResults(),
})
if filesProcessed == filesTotal {
finishQuery(queryid)
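The temp-file bookkeeping above (tempFile, tempFileWriter, tempFileOffset, resultPointers) is an instance of a simple technique: append each serialized result to a per-backend file through a buffered writer, keep only small (offset, length) pointers in memory, then sort the pointers and copy the bytes back out in ranked order. Here is a condensed, standalone sketch of that technique, with simplified names and error handling elided; it is not the actual dcs-web code:

package main

import (
    "bufio"
    "fmt"
    "io"
    "os"
    "sort"
)

type resultPointer struct {
    ranking float32
    offset  int64
    length  int64
}

func main() {
    f, _ := os.CreateTemp("", "unsorted_*.json")
    defer os.Remove(f.Name())
    w := bufio.NewWriterSize(f, 65536)

    results := []struct {
        json    string
        ranking float32
    }{
        {`{"path":"a.go"}`, 0.2},
        {`{"path":"b.go"}`, 0.9},
        {`{"path":"c.go"}`, 0.5},
    }

    // Write phase: append JSON to the temp file, remember only pointers.
    var pointers []resultPointer
    var offset int64
    for _, r := range results {
        n, _ := io.WriteString(w, r.json)
        pointers = append(pointers, resultPointer{
            ranking: r.ranking,
            offset:  offset,
            length:  int64(n),
        })
        offset += int64(n)
    }
    // Flush before reading back, as writeToDisk() does above; otherwise
    // the tail of the file would still sit in the writer's buffer.
    w.Flush()

    // Read phase: sort the cheap pointers, not the full results, then
    // seek and copy each serialized result out in ranked order.
    sort.Slice(pointers, func(i, j int) bool {
        return pointers[i].ranking > pointers[j].ranking
    })
    for _, p := range pointers {
        f.Seek(p.offset, io.SeekStart)
        io.CopyN(os.Stdout, f, p.length)
        fmt.Println()
    }
}

Sorting pointers instead of results is what keeps memory usage flat: a resultPointer is a few dozen bytes regardless of how large the serialized match is, and the packageName field in the real struct is what later lets results be grouped by package without reparsing the JSON.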
2 changes: 1 addition & 1 deletion cmd/dcs-web/templates/queryz.html
@@ -96,7 +96,7 @@ <h3>{{.Searchterm}}</h3>
<tr><th>ended</th><td>{{.Ended}} (ran for {{.Duration}})</td></tr>
<tr><th>done</th><td>{{.Done}}</td></tr>
<tr><th>events</th><td>{{.NumEvents}}</td></tr>
<tr><th>results</th><td>{{.NumResults}} (in {{.NumPackages}} source packages) (on {{.NumResultPages}} pages)</td></tr>
<tr><th>results</th><td>{{.NumResults}} (on {{.NumResultPages}} pages)</td></tr>
<tr><th>files processed</th><td><code>{{.FilesProcessed}}</code></td></tr>
<tr><th>files total</th><td><code>{{.FilesTotal}}</code></td></tr>
</table>
