Permalink
Browse files

use capnproto instead of JSON between dcs-web and source backends

This reduces the amount of time we need to spend encoding/decoding JSON,
which was one of the bottlenecks.

Also, don’t use the seek syscall for getting the current offset, it is
quite expensive (second function in top5 of a dcs-web profile).
  • Loading branch information...
stapelberg committed Dec 14, 2014
1 parent fd69038 commit 8efd3b41ed17912e320d180ff1033cb7203e44e2
@@ -5,6 +5,7 @@ import (
"encoding/json"
"flag"
"fmt"
"github.com/Debian/dcs/proto"
"github.com/Debian/dcs/ranking"
"github.com/Debian/dcs/regexp"
"github.com/Debian/dcs/varz"
@@ -21,6 +22,8 @@ import (
"strings"
"sync"
"time"
capn "github.com/glycerine/go-capnproto"
)
var (
@@ -277,10 +280,23 @@ func queryIndexBackend(query string) ([]string, error) {
return filenames, nil
}
func sendProgressUpdate(conn net.Conn, connMu *sync.Mutex, filesProcessed, filesTotal int) (int64, error) {
seg := capn.NewBuffer(nil)
z := proto.NewRootZ(seg)
p := proto.NewProgressUpdate(seg)
p.SetFilesprocessed(uint64(filesProcessed))
p.SetFilestotal(uint64(filesTotal))
z.SetProgressupdate(p)
connMu.Lock()
defer connMu.Unlock()
return seg.WriteToPacked(conn)
}
// Reads a single JSON request from the TCP connection, performs the search and
// sends results back over the TCP connection as they appear.
func streamingQuery(conn net.Conn) {
defer conn.Close()
connMu := new(sync.Mutex)
logprefix := fmt.Sprintf("[%s]", conn.RemoteAddr().String())
type sourceRequest struct {
@@ -340,20 +356,7 @@ func streamingQuery(conn net.Conn) {
// Send the first progress update so that clients know how many files are
// going to be searched.
type progressUpdate struct {
// Set to “progress”.
Type string
FilesProcessed int
FilesTotal int
}
encoder := json.NewEncoder(conn)
if err := encoder.Encode(&progressUpdate{
Type: "progress",
FilesTotal: len(files),
}); err != nil {
if _, err := sendProgressUpdate(conn, connMu, 0, len(files)); err != nil {
log.Printf("%s %v\n", logprefix, err)
return
}
@@ -392,11 +395,7 @@ func streamingQuery(conn net.Conn) {
cnt += add
if time.Since(lastProgressUpdate) > progressInterval {
if err := encoder.Encode(&progressUpdate{
Type: "progress",
FilesProcessed: cnt,
FilesTotal: len(files),
}); err != nil {
if _, err := sendProgressUpdate(conn, connMu, cnt, len(files)); err != nil {
if !errorShown {
log.Printf("%s %v\n", logprefix, err)
// We need to read the 'progress' channel, so we cannot
@@ -409,11 +408,7 @@ func streamingQuery(conn net.Conn) {
}
}
if err := encoder.Encode(&progressUpdate{
Type: "progress",
FilesProcessed: len(files),
FilesTotal: len(files),
}); err != nil {
if _, err := sendProgressUpdate(conn, connMu, len(files), len(files)); err != nil {
log.Printf("%s %v\n", logprefix, err)
}
close(progress)
@@ -459,12 +454,30 @@ func streamingQuery(conn net.Conn) {
for _, match := range matches {
match.Ranking = ranking.PostRank(rankingopts, &match, &querystr)
match.PathRank = file.Ranking
match.Path = match.Path[len(*unpackedPath):]
//match.Path = match.Path[len(*unpackedPath):]
// NB: populating match.Ranking happens in
// cmd/dcs-web/querymanager because it depends on at least
// one other result.
if err := encoder.Encode(&match); err != nil {
// TODO: ideally, we’d get capn buffers from grep.File(), let’s do that after profiling the decoding performance
seg := capn.NewBuffer(nil)
z := proto.NewRootZ(seg)
m := proto.NewMatch(seg)
m.SetPath(match.Path[len(*unpackedPath):])
m.SetLine(uint32(match.Line))
m.SetPackage(m.Path()[:strings.Index(m.Path(), "/")])
m.SetCtxp2(match.Ctxp2)
m.SetCtxp1(match.Ctxp1)
m.SetContext(match.Context)
m.SetCtxn1(match.Ctxn1)
m.SetCtxn2(match.Ctxn2)
m.SetPathrank(match.PathRank)
m.SetRanking(match.Ranking)
z.SetMatch(m)
connMu.Lock()
if _, err := seg.WriteToPacked(conn); err != nil {
connMu.Unlock()
log.Printf("%s %v\n", logprefix, err)
// Drain the work channel, but without doing any work.
// This effectively exits the worker goroutine(s)
@@ -473,6 +486,7 @@ func streamingQuery(conn net.Conn) {
}
break
}
connMu.Unlock()
}
progress <- 1
Oops, something went wrong.

0 comments on commit 8efd3b4

Please sign in to comment.