Skip to content

Commit

Permalink
store capnproto data directly on disk (instead of JSON)
Browse files Browse the repository at this point in the history
The advantage is that we now don’t have to convert capnproto to JSON
until the client asks us for the specific page. I.e., for a query with
60_000 results on 6000 pages, we previously had to convert 60_000
results, whereas now chances are we’ll only need to convert a couple
dozen results.

This leads to a 2x speed-up (measured on large queries, but should be
transferable to small queries as soon as they have >1 page of results).
  • Loading branch information
stapelberg committed Dec 17, 2014
1 parent e0b4b73 commit 466b7f3
Showing 1 changed file with 31 additions and 15 deletions.
46 changes: 31 additions & 15 deletions cmd/dcs-web/querymanager.go
Expand Up @@ -250,12 +250,19 @@ func queryBackend(queryid string, backend string, backendidx int, sourceQuery []
}

bufferedReader := bufio.NewReaderSize(conn, 65536)
bstate := state[queryid].perBackend[backendidx]
tempFileWriter := bstate.tempFileWriter
var capnbuf bytes.Buffer
var written int64

for !state[queryid].done {
conn.SetReadDeadline(time.Now().Add(10 * time.Second))

seg, err := capn.ReadFromPackedStream(bufferedReader, &capnbuf)
written = 0
tee := io.TeeReader(bufferedReader, io.MultiWriter(
tempFileWriter, countingWriter{&written}))

seg, err := capn.ReadFromPackedStream(tee, &capnbuf)
if err != nil {
if err == io.EOF {
log.Printf("[%s] [src:%s] EOF\n", queryid, backend)
Expand All @@ -270,8 +277,10 @@ func queryBackend(queryid string, backend string, backendidx int, sourceQuery []
if z.Which() == proto.Z_PROGRESSUPDATE {
storeProgress(queryid, backendidx, z.Progressupdate())
} else {
storeResult(queryid, backendidx, z.Match())
storeResult(queryid, backendidx, z.Match(), written)
}

bstate.tempFileOffset += written
}
log.Printf("[%s] [src:%s] query done, disconnecting\n", queryid, backend)
}
Expand Down Expand Up @@ -329,7 +338,7 @@ func maybeStartQuery(queryid, src, query string) bool {

for i := 0; i < len(backends); i++ {
state[queryid].filesTotal[i] = -1
path := filepath.Join(dir, fmt.Sprintf("unsorted_%d.json", i))
path := filepath.Join(dir, fmt.Sprintf("unsorted_%d.capnproto", i))
f, err := os.Create(path)
if err != nil {
log.Printf("[%s] could not create %q: %v\n", queryid, path, err)
Expand Down Expand Up @@ -480,7 +489,7 @@ func (c countingWriter) Write(p []byte) (n int, err error) {
return len(p), nil
}

func storeResult(queryid string, backendidx int, result proto.Match) {
func storeResult(queryid string, backendidx int, result proto.Match, written int64) {
// Without acquiring a lock, just check if we need to consider this result
// for the top 10 at all.
s := state[queryid]
Expand Down Expand Up @@ -535,14 +544,6 @@ func storeResult(queryid string, backendidx int, result proto.Match) {
}
}

var written int64
w := io.MultiWriter(s.perBackend[backendidx].tempFileWriter, countingWriter{&written})
if err := result.WriteJSON(w); err != nil {
log.Printf("[%s] could not write %v: %v\n", queryid, result, err)
failQuery(queryid)
return
}

bstate := s.perBackend[backendidx]
bstate.resultPointers = append(bstate.resultPointers, resultPointer{
backendidx: backendidx,
Expand All @@ -551,7 +552,6 @@ func storeResult(queryid string, backendidx int, result proto.Match) {
length: written,
pathHash: h.Sum64(),
packageName: bstate.packagePool.Get(result.Package())})
bstate.tempFileOffset += written
bstate.allPackages[result.Package()] = true
}

Expand All @@ -565,7 +565,7 @@ func failQuery(queryid string) {
}

func finishQuery(queryid string) {
log.Printf("[%s] done, closing all client channels.\n", queryid)
log.Printf("[%s] done (in %v), closing all client channels.\n", queryid, time.Since(state[queryid].started))
addEvent(queryid, []byte{}, nil)

if *influxDBHost != "" {
Expand Down Expand Up @@ -667,6 +667,9 @@ func ensureEnoughSpaceAvailable() {
}

func writeFromPointers(queryid string, f io.Writer, pointers []resultPointer) error {
var capnbuf bytes.Buffer
firstPathRank := state[queryid].FirstPathRank

state[queryid].tempFilesMu.Lock()
defer state[queryid].tempFilesMu.Unlock()

Expand All @@ -683,7 +686,20 @@ func writeFromPointers(queryid string, f io.Writer, pointers []resultPointer) er
return err
}
}
if _, err := io.CopyN(f, src, pointer.length); err != nil {
seg, err := capn.ReadFromPackedStream(src, &capnbuf)
if err != nil {
return err
}
z := proto.ReadRootZ(seg)
if z.Which() != proto.Z_MATCH {
return fmt.Errorf("Expected to find a proto.Z_MATCH, instead got %d", z.Which())
}
result := z.Match()
// We need to fix the ranking here because we persist raw results from
// the dcs-source-backend in queryBackend(), but then modify the
// ranking in storeResult().
result.SetRanking(result.Pathrank() + ((firstPathRank * 0.1) * result.Ranking()))
if err := result.WriteJSON(f); err != nil {
return err
}
}
Expand Down

0 comments on commit 466b7f3

Please sign in to comment.