Skip to content

Commit

Permalink
don’t keep query results in memory, write them directly to disk
Browse files Browse the repository at this point in the history
For queries that fit in memory, the page cache will give us the
necessary performance. Queries that don’t fit in memory are enabled by
this commit :).
  • Loading branch information
stapelberg committed Dec 1, 2014
1 parent fdae522 commit d2922fe
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 50 deletions.
194 changes: 144 additions & 50 deletions cmd/dcs-web/querymanager.go
Expand Up @@ -8,7 +8,9 @@ import (
"github.com/Debian/dcs/cmd/dcs-web/common" "github.com/Debian/dcs/cmd/dcs-web/common"
"github.com/Debian/dcs/cmd/dcs-web/search" "github.com/Debian/dcs/cmd/dcs-web/search"
dcsregexp "github.com/Debian/dcs/regexp" dcsregexp "github.com/Debian/dcs/regexp"
"github.com/Debian/dcs/stringpool"
"github.com/influxdb/influxdb-go" "github.com/influxdb/influxdb-go"
"hash/fnv"
"io" "io"
"log" "log"
"math" "math"
Expand Down Expand Up @@ -115,6 +117,38 @@ func (s ByRanking) Swap(i, j int) {
s[i], s[j] = s[j], s[i] s[i], s[j] = s[j], s[i]
} }


// resultPointer references one JSON-serialized result inside a per-backend
// temporary file, so that results can be sorted and paginated later without
// keeping the result objects themselves in memory.
type resultPointer struct {
	// Index of the source backend whose temporary file contains the result.
	backendidx int
	// Combined ranking of the result; higher rankings sort first.
	ranking float32
	// Byte offset and length of the serialized result within the backend's
	// temporary file.
	offset int64
	length int64

	// Used as a tie-breaker when sorting by ranking to guarantee stable
	// results, independent of the order in which the results are returned from
	// source backends.
	pathHash uint64

	// Used for per-package results. Points into a stringpool.StringPool
	packageName *string
}

// pointerByRanking implements sort.Interface, ordering resultPointers by
// descending ranking with the path hash as a deterministic tie-breaker.
type pointerByRanking []resultPointer

// Len returns the number of result pointers.
func (s pointerByRanking) Len() int { return len(s) }

// Less places higher rankings first; when two rankings are equal it falls
// back to the (descending) path hash so that the overall order is stable,
// regardless of the order in which backends delivered their results.
func (s pointerByRanking) Less(i, j int) bool {
	a, b := s[i], s[j]
	if a.ranking != b.ranking {
		return a.ranking > b.ranking
	}
	return a.pathHash > b.pathHash
}

// Swap exchanges the result pointers at indices i and j.
func (s pointerByRanking) Swap(i, j int) {
	tmp := s[i]
	s[i] = s[j]
	s[j] = tmp
}

type queryState struct { type queryState struct {
started time.Time started time.Time
events []event events []event
Expand All @@ -132,9 +166,12 @@ type queryState struct {
resultPages int resultPages int
numResults int numResults int


// TODO: this will be deleted once we write everything directly to disk // One file per backend, containing JSON-serialized results. When writing,
allResults []Result // we keep the offsets, so that we can later sort the pointers and write
allResultsMu *sync.Mutex // the resulting files.
tempFiles []*os.File
packagePool *stringpool.StringPool
resultPointers []resultPointer


allPackages map[string]bool allPackages map[string]bool
allPackagesSorted []string allPackagesSorted []string
Expand Down Expand Up @@ -198,7 +235,7 @@ func queryBackend(queryid string, backend string, backendidx int, query string)
} }
} }
if r.Type == "result" { if r.Type == "result" {
storeResult(queryid, r) storeResult(queryid, backendidx, r)
} else if r.Type == "progress" { } else if r.Type == "progress" {
storeProgress(queryid, backendidx, r) storeProgress(queryid, backendidx, r)
} }
Expand All @@ -214,7 +251,7 @@ func maybeStartQuery(queryid, src, query string) bool {
// XXX: Starting a new query while there may still be clients reading that // XXX: Starting a new query while there may still be clients reading that
// query is not a great idea. Best fix may be to make getEvent() use a // query is not a great idea. Best fix may be to make getEvent() use a
// querystate instead of the string identifier. // querystate instead of the string identifier.
if !running || time.Since(querystate.started) > 15*time.Minute { if !running || time.Since(querystate.started) > 30*time.Minute {
backends := strings.Split(*common.SourceBackends, ",") backends := strings.Split(*common.SourceBackends, ",")
state[queryid] = queryState{ state[queryid] = queryState{
started: time.Now(), started: time.Now(),
Expand All @@ -224,13 +261,32 @@ func maybeStartQuery(queryid, src, query string) bool {
filesTotal: make([]int, len(backends)), filesTotal: make([]int, len(backends)),
filesProcessed: make([]int, len(backends)), filesProcessed: make([]int, len(backends)),
filesMu: &sync.Mutex{}, filesMu: &sync.Mutex{},
allResults: make([]Result, 0), tempFiles: make([]*os.File, len(backends)),
allResultsMu: &sync.Mutex{},
allPackages: make(map[string]bool), allPackages: make(map[string]bool),
allPackagesMu: &sync.Mutex{}, allPackagesMu: &sync.Mutex{},
packagePool: stringpool.NewStringPool(),
}

var err error
dir := filepath.Join(*queryResultsPath, queryid)
if err := os.MkdirAll(dir, os.FileMode(0755)); err != nil {
// TODO: mark the query as failed
log.Printf("[%s] could not create %q: %v\n", queryid, dir, err)
return false
} }

// TODO: it’d be so much better if we would correctly handle ENOSPC errors
// in the code below (and above), but for that we need to carefully test it.
ensureEnoughSpaceAvailable()

for i := 0; i < len(backends); i++ { for i := 0; i < len(backends); i++ {
state[queryid].filesTotal[i] = -1 state[queryid].filesTotal[i] = -1
path := filepath.Join(dir, fmt.Sprintf("unsorted_%d.json", i))
state[queryid].tempFiles[i], err = os.Create(path)
if err != nil {
log.Printf("[%s] could not create %q: %v\n", queryid, path, err)
// TODO: mark query as failed
}
} }
log.Printf("initial results = %v\n", state[queryid]) log.Printf("initial results = %v\n", state[queryid])
for idx, backend := range backends { for idx, backend := range backends {
Expand Down Expand Up @@ -260,7 +316,7 @@ func sendPaginationUpdate(queryid string, s queryState) {
} }
} }


func storeResult(queryid string, result Result) { func storeResult(queryid string, backendidx int, result Result) {
result.Type = "result" result.Type = "result"


result.Package = result.Path[:strings.Index(result.Path, "_")] result.Package = result.Path[:strings.Index(result.Path, "_")]
Expand All @@ -269,8 +325,6 @@ func storeResult(queryid string, result Result) {
// for the top 10 at all. // for the top 10 at all.
s := state[queryid] s := state[queryid]


log.Printf("[%s] (currently %d) result %v\n", queryid, len(s.allResults), result)

if s.FirstPathRank > 0 { if s.FirstPathRank > 0 {
// Now store the combined ranking of PathRanking (pre) and Ranking (post). // Now store the combined ranking of PathRanking (pre) and Ranking (post).
// We add the values because they are both percentages. // We add the values because they are both percentages.
Expand Down Expand Up @@ -302,24 +356,51 @@ func storeResult(queryid string, result Result) {
addEventMarshal(queryid, &result) addEventMarshal(queryid, &result)
} }


// TODO: as a first POC, keep all results in memory, sort them, write them out to files. tmpOffset, err := state[queryid].tempFiles[backendidx].Seek(0, os.SEEK_CUR)
if err != nil {
log.Printf("[%s] could not seek: %v\n", queryid, err)
// TODO: mark query as failed
return
}

if err := json.NewEncoder(s.tempFiles[backendidx]).Encode(result); err != nil {
log.Printf("[%s] could not write %v: %v\n", queryid, result, err)
// TODO: mark query as failed
}

offsetAfterWriting, err := state[queryid].tempFiles[backendidx].Seek(0, os.SEEK_CUR)
if err != nil {
log.Printf("[%s] could not seek: %v\n", queryid, err)
// TODO: mark query as failed
return
}

h := fnv.New64()
io.WriteString(h, result.Path)

stateMu.Lock() stateMu.Lock()
s = state[queryid] s = state[queryid]
s.allResults = append(s.allResults, result) s.resultPointers = append(s.resultPointers, resultPointer{
backendidx: backendidx,
ranking: result.Ranking,
offset: tmpOffset,
length: offsetAfterWriting - tmpOffset,
pathHash: h.Sum64(),
packageName: s.packagePool.Get(result.Package)})
s.allPackages[result.Package] = true s.allPackages[result.Package] = true
s.numResults++ s.numResults++
state[queryid] = s state[queryid] = s
stateMu.Unlock() stateMu.Unlock()

// TODO: write the result to disk, no matter what
// TODO: eventually, we’ll want to write it to unsorted.json and sort it afterwards. we could do that by reading through the file, storing (ranking, file_offset) tuples, sorting them, then writing out the sorted files. note that we can even store the (ranking, file_offset) tuples at the time when the results come in.
} }


func finishQuery(queryid string) { func finishQuery(queryid string) {
log.Printf("[%s] done, closing all client channels.\n", queryid) log.Printf("[%s] done, closing all client channels.\n", queryid)
stateMu.Lock() stateMu.Lock()
s := state[queryid] s := state[queryid]
s.done = true s.done = true
for _, f := range s.tempFiles {
f.Close()
}
state[queryid] = s state[queryid] = s
stateMu.Unlock() stateMu.Unlock()
addEvent(queryid, []byte{}, nil) addEvent(queryid, []byte{}, nil)
Expand Down Expand Up @@ -414,18 +495,48 @@ func ensureEnoughSpaceAvailable() {
} }
} }


// createFromPointers writes the JSON array file |name| by copying the
// already-serialized results referenced by |pointers| out of the per-backend
// temporary files of query |queryid|, separated by commas.
func createFromPointers(queryid string, name string, pointers []resultPointer) error {
	log.Printf("[%s] writing %q\n", queryid, name)
	dest, err := os.Create(name)
	if err != nil {
		return err
	}
	defer dest.Close()
	// The temporary files hold one JSON object per result, so wrapping the
	// comma-separated copies in brackets yields a valid JSON array.
	if _, err := dest.Write([]byte("[")); err != nil {
		return err
	}
	for i, ptr := range pointers {
		if i > 0 {
			if _, err := dest.Write([]byte(",")); err != nil {
				return err
			}
		}
		tmp := state[queryid].tempFiles[ptr.backendidx]
		if _, err := tmp.Seek(ptr.offset, os.SEEK_SET); err != nil {
			return err
		}
		if _, err := io.CopyN(dest, tmp, ptr.length); err != nil {
			return err
		}
	}
	_, err = dest.Write([]byte("]\n"))
	return err
}

func writeToDisk(queryid string) { func writeToDisk(queryid string) {
// Get the slice with results and unset it on the state so that processing can continue. // Get the slice with results and unset it on the state so that processing can continue.
stateMu.Lock() stateMu.Lock()
s := state[queryid] s := state[queryid]
results := s.allResults pointers := s.resultPointers
if len(results) == 0 { if len(pointers) == 0 {
log.Printf("[%s] not writing, no results.\n", queryid) log.Printf("[%s] not writing, no results.\n", queryid)
stateMu.Unlock() stateMu.Unlock()
finishQuery(queryid) finishQuery(queryid)
return return
} }
s.allResults = make([]Result, 0) s.resultPointers = nil
idx := 0 idx := 0
packages := make([]string, len(s.allPackages)) packages := make([]string, len(s.allPackages))
// TODO: sort by ranking as soon as we store the best ranking with each package. (at the moment it’s first result, first stored) // TODO: sort by ranking as soon as we store the best ranking with each package. (at the moment it’s first result, first stored)
Expand All @@ -437,10 +548,10 @@ func writeToDisk(queryid string) {
state[queryid] = s state[queryid] = s
stateMu.Unlock() stateMu.Unlock()


log.Printf("[%s] writing, %d results.\n", queryid, len(results)) log.Printf("[%s] writing, %d results.\n", queryid, len(pointers))
log.Printf("[%s] packages: %v\n", queryid, packages) log.Printf("[%s] packages: %v\n", queryid, packages)


sort.Sort(ByRanking(results)) sort.Sort(pointerByRanking(pointers))


resultsPerPage := 10 resultsPerPage := 10
dir := filepath.Join(*queryResultsPath, queryid) dir := filepath.Join(*queryResultsPath, queryid)
Expand All @@ -466,57 +577,40 @@ func writeToDisk(queryid string) {
} }
f.Close() f.Close()


pages := int(math.Ceil(float64(len(results)) / float64(resultsPerPage))) pages := int(math.Ceil(float64(len(pointers)) / float64(resultsPerPage)))
for page := 0; page < pages; page++ { for page := 0; page < pages; page++ {
start := page * resultsPerPage start := page * resultsPerPage
end := (page + 1) * resultsPerPage end := (page + 1) * resultsPerPage
if end > len(results) { if end > len(pointers) {
end = len(results) end = len(pointers)
} }

name := filepath.Join(dir, fmt.Sprintf("page_%d.json", page)) name := filepath.Join(dir, fmt.Sprintf("page_%d.json", page))
log.Printf("[%s] writing %q\n", queryid, name) if err := createFromPointers(queryid, name, pointers[start:end]); err != nil {
f, err := os.Create(name) log.Printf("[%s] could not create %q from pointers: %v\n", queryid, name, err)
if err != nil {
log.Printf("[%s] could not create %q: %v\n", queryid, f, err)
// TODO: mark query as failed // TODO: mark query as failed
return return
} }
encoder := json.NewEncoder(f)
if err := encoder.Encode(results[start:end]); err != nil {
log.Printf("[%s] could not write %v: %v\n", queryid, results[start:end], err)
// TODO: mark query as failed
return
}
// We don’t use defer f.Close() because that would only be executed once the function returns.
f.Close()
} }


// Now save the results into their package-specific files. // Now save the results into their package-specific files.
bypkg := make(map[string][]Result) bypkg := make(map[string][]resultPointer)
for _, result := range results { for _, pointer := range pointers {
pkgresults := bypkg[result.Package] pkgresults := bypkg[*pointer.packageName]
if len(pkgresults) >= resultsPerPackage { if len(pkgresults) >= resultsPerPackage {
continue continue
} }
pkgresults = append(pkgresults, result) pkgresults = append(pkgresults, pointer)
bypkg[result.Package] = pkgresults bypkg[*pointer.packageName] = pkgresults
} }


for pkg, pkgresults := range bypkg { for pkg, pkgresults := range bypkg {
name := filepath.Join(dir, fmt.Sprintf("pkg_%s.json", pkg)) name := filepath.Join(dir, fmt.Sprintf("pkg_%s.json", pkg))
log.Printf("[%s] writing %q\n", queryid, name) if err := createFromPointers(queryid, name, pkgresults); err != nil {
f, err := os.Create(name) log.Printf("[%s] could not create %q from pointers: %v\n", queryid, name, err)
if err != nil {
log.Printf("[%s] could not create %q: %v\n", queryid, f, err)
// TODO: mark query as failed // TODO: mark query as failed
return return
} }
if err := json.NewEncoder(f).Encode(pkgresults); err != nil {
log.Printf("[%s] could not write results: %v\n", queryid, err)
// TODO: mark query as failed
return
}
f.Close()
} }


stateMu.Lock() stateMu.Lock()
Expand Down
39 changes: 39 additions & 0 deletions stringpool/stringpool.go
@@ -0,0 +1,39 @@
// stringpool provides a pool of string pointers, ensuring that each string is
// stored only once in memory. This is useful for queries that have many
// results, as the amount of source packages is limited. So, as soon as
// len(results) > len(sourcepackages), you save memory using a stringpool.
package stringpool

import (
"sync"
)

// StringPool deduplicates strings: Get hands out exactly one canonical
// *string per distinct string value, shared by all callers, so repeated
// strings are stored in memory only once.
type StringPool struct {
	sync.RWMutex
	strings map[string]*string
}

// NewStringPool returns an empty, ready-to-use pool.
func NewStringPool() *StringPool {
	pool := &StringPool{}
	pool.strings = make(map[string]*string)
	return pool
}

// Get returns the canonical pointer for s, inserting s into the pool on
// first use.
func (pool *StringPool) Get(s string) *string {
	// Fast path: in the common case the string is already pooled and only
	// the cheaper read lock is needed.
	pool.RLock()
	if interned, ok := pool.strings[s]; ok {
		pool.RUnlock()
		return interned
	}
	pool.RUnlock()

	// Slow path: take the write lock and re-check, since another goroutine
	// may have inserted s between the RUnlock above and the Lock here.
	pool.Lock()
	defer pool.Unlock()
	if interned, ok := pool.strings[s]; ok {
		return interned
	}
	pool.strings[s] = &s
	return &s
}

0 comments on commit d2922fe

Please sign in to comment.