
generate result pages as necessary

Previously, mostly due to the proof-of-concept design (collect all
results in memory, sort them, then write them out to disk and let nginx
serve them), we were writing the entire result set to disk twice: once
for the regular results, once for the per-package results.

For small queries, this is not a problem, but for big queries, that
consumes a significant amount of disk space and spindle time.

Therefore, we now generate (and cache!) only the pages that clients
actually request. For big queries, this is a multi-minute speed-up.
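
The core of the change is the new pagination in ResultsHandler below: the
sorted result pointers stay in memory, and a requested page is simply a
slice of them. A minimal, self-contained sketch of that idea (hypothetical
names, not the actual dcs-web code):

package main

import (
	"fmt"
	"math"
)

type resultPointer struct{ offset, length int64 }

// pageSlice returns the pointers belonging to the 0-indexed page, or an
// error when that page does not exist.
func pageSlice(pointers []resultPointer, page, perPage int) ([]resultPointer, error) {
	pages := int(math.Ceil(float64(len(pointers)) / float64(perPage)))
	if page >= pages {
		return nil, fmt.Errorf("no such page: %d", page)
	}
	start := page * perPage
	end := start + perPage
	if end > len(pointers) {
		end = len(pointers)
	}
	return pointers[start:end], nil
}

func main() {
	pointers := make([]resultPointer, 95) // e.g. 95 results at 10 per page
	p, err := pageSlice(pointers, 9, 10)
	fmt.Println(len(p), err) // 5 <nil>: the last page holds the remaining 5 results
}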
stapelberg committed Dec 4, 2014
1 parent eeb5d8e commit c744b236e29225b6658b97a1a0debfcf76eb3b1f
Showing with 140 additions and 85 deletions.
  1. +114 −5 cmd/dcs-web/dcs-web.go
  2. +19 −73 cmd/dcs-web/querymanager.go
  3. +7 −7 nginx.example
cmd/dcs-web/dcs-web.go
@@ -14,10 +14,13 @@ import (
"hash/fnv"
"io"
"log"
"math"
"net/http"
"os"
"path/filepath"
"regexp"
"runtime/pprof"
"strconv"
"strings"
"time"
)
@@ -35,6 +38,9 @@ var (
"Where to write access.log entries (in Apache Common Log Format). Disabled if empty.")
accessLog *os.File
resultsPathRe = regexp.MustCompile(`^/results/([^/]+)/(perpackage_` + strconv.Itoa(resultsPerPackage) + `_)?page_([0-9]+).json$`)
packagesPathRe = regexp.MustCompile(`^/results/([^/]+)/packages.json$`)
)
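
These patterns match the per-page results and the package list of a query,
e.g. (queryid is a placeholder):

/results/<queryid>/page_0.json
/results/<queryid>/perpackage_2_page_3.json
/results/<queryid>/packages.json
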
func InstantServer(ws *websocket.Conn) {
@@ -112,16 +118,119 @@ func InstantServer(ws *websocket.Conn) {
}
}
func startJsonResponse(w http.ResponseWriter) {
w.Header().Set("Content-Type", "application/json")
// Set cache time for one hour. The files will ideally get cached both by
// nginx and the client(s).
utc := time.Now().UTC()
cacheSince := utc.Format(http.TimeFormat)
cacheUntil := utc.Add(1 * time.Hour).Format(http.TimeFormat)
w.Header().Set("Cache-Control", "max-age=3600, public")
w.Header().Set("Last-Modified", cacheSince)
w.Header().Set("Expires", cacheUntil)
}
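
A response generated at, say, 12:00 UTC would then carry headers roughly like
(timestamps illustrative):

Cache-Control: max-age=3600, public
Last-Modified: Thu, 04 Dec 2014 12:00:00 GMT
Expires: Thu, 04 Dec 2014 13:00:00 GMT
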
func ResultsHandler(w http.ResponseWriter, r *http.Request) {
// TODO: ideally, this would also start the search in the background to avoid waiting for the round-trip to the client.
// TODO: also, what about non-javascript clients?
// While this just serves index.html, the javascript part of index.html
// realizes the path starts with /results/ and starts the search, then
// requests the specified page on search completion.
http.ServeFile(w, r, filepath.Join(*staticPath, "index.html"))
return
// Try to match /page_n.json or /perpackage_2_page_n.json
matches := resultsPathRe.FindStringSubmatch(r.URL.Path)
log.Printf("matches = %v\n", matches)
if matches == nil || len(matches) != 4 {
// See whether it’s /packages.json, then.
matches = packagesPathRe.FindStringSubmatch(r.URL.Path)
if matches == nil || len(matches) != 2 {
// While this just serves index.html, the javascript part of index.html
// realizes the path starts with /results/ and starts the search, then
// requests the specified page on search completion.
http.ServeFile(w, r, filepath.Join(*staticPath, "index.html"))
return
}
queryid := matches[1]
_, ok := state[queryid]
if !ok {
http.Error(w, "No such query.", http.StatusNotFound)
return
}
startJsonResponse(w)
packages := state[queryid].allPackagesSorted
if err := json.NewEncoder(w).Encode(struct{ Packages []string }{packages}); err != nil {
http.Error(w, fmt.Sprintf("Could not encode packages: %v", err), http.StatusInternalServerError)
}
return
}
queryid := matches[1]
page, err := strconv.Atoi(matches[3])
if err != nil {
log.Fatalf("Could not convert %q into a number: %v\n", matches[3], err)
}
perpackage := (matches[2] == "perpackage_2_")
_, ok := state[queryid]
if !ok {
http.Error(w, "No such query.", http.StatusNotFound)
return
}
if !perpackage {
resultsPerPage := 10
pointers := state[queryid].resultPointers
pages := int(math.Ceil(float64(len(pointers)) / float64(resultsPerPage)))
if page > pages {
http.Error(w, "No such page.", http.StatusNotFound)
return
}
start := page * resultsPerPage
end := (page + 1) * resultsPerPage
if end > len(pointers) {
end = len(pointers)
}
startJsonResponse(w)
if err := writeFromPointers(queryid, w, pointers[start:end]); err != nil {
http.Error(w, fmt.Sprintf("Could not return results: %v", err), http.StatusInternalServerError)
}
} else {
bypkg := state[queryid].resultPointersByPkg
packages := state[queryid].allPackagesSorted
pages := int(math.Ceil(float64(len(packages)) / float64(packagesPerPage)))
if page > pages {
http.Error(w, "No such page.", http.StatusNotFound)
return
}
start := page * packagesPerPage
end := (page + 1) * packagesPerPage
if end > len(packages) {
end = len(packages)
}
startJsonResponse(w)
w.Write([]byte("["))
for idx, pkg := range packages[start:end] {
if idx == 0 {
fmt.Fprintf(w, `{"Package": "%s", "Results":`, pkg)
} else {
fmt.Fprintf(w, `,{"Package": "%s", "Results":`, pkg)
}
if err := writeFromPointers(queryid, w, bypkg[pkg]); err != nil {
http.Error(w, fmt.Sprintf("Could not return results: %v", err), http.StatusInternalServerError)
return
}
w.Write([]byte("}"))
}
w.Write([]byte("]"))
}
}
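
For illustration, a per-package page assembled by the loop above has this
overall shape (package names and result objects made up; each Results value
is the JSON array that writeFromPointers emits):

[{"Package": "zsh", "Results": [ ... ]},{"Package": "zlib", "Results": [ ... ]}]
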
func main() {
cmd/dcs-web/querymanager.go
@@ -170,9 +170,11 @@ type queryState struct {
// One file per backend, containing JSON-serialized results. When writing,
// we keep the offsets, so that we can later sort the pointers and write
// the resulting files.
tempFiles []*os.File
packagePool *stringpool.StringPool
resultPointers []resultPointer
tempFiles []*os.File
tempFilesMu *sync.Mutex
packagePool *stringpool.StringPool
resultPointers []resultPointer
resultPointersByPkg map[string][]resultPointer
allPackages map[string]bool
allPackagesSorted []string
@@ -265,6 +267,9 @@ func maybeStartQuery(queryid, src, query string) bool {
if !s.done {
continue
}
for _, f := range s.tempFiles {
f.Close()
}
delete(state, queryid)
}
log.Printf("Garbage collection done. %d queries remaining", len(state))
@@ -279,6 +284,7 @@ func maybeStartQuery(queryid, src, query string) bool {
filesProcessed: make([]int, len(backends)),
filesMu: &sync.Mutex{},
tempFiles: make([]*os.File, len(backends)),
tempFilesMu: &sync.Mutex{},
allPackages: make(map[string]bool),
allPackagesMu: &sync.Mutex{},
packagePool: stringpool.NewStringPool(),
@@ -524,9 +530,6 @@ func finishQuery(queryid string) {
log.Printf("[%s] done, closing all client channels.\n", queryid)
stateMu.Lock()
s := state[queryid]
for _, f := range s.tempFiles {
f.Close()
}
state[queryid] = s
stateMu.Unlock()
addEvent(queryid, []byte{}, nil)
@@ -622,6 +625,9 @@ func ensureEnoughSpaceAvailable() {
}
func writeFromPointers(queryid string, f io.Writer, pointers []resultPointer) error {
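// The temp files are kept open after a query finishes (they are only closed
// by the garbage collection in maybeStartQuery) so that pages can be
// generated on demand; this mutex serializes concurrent readers.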
state[queryid].tempFilesMu.Lock()
defer state[queryid].tempFilesMu.Unlock()
if _, err := f.Write([]byte("[")); err != nil {
return err
}
@@ -685,52 +691,21 @@ func writeToDisk(queryid string) error {
state[queryid] = s
stateMu.Unlock()
log.Printf("[%s] writing, %d results.\n", queryid, len(pointers))
log.Printf("[%s] packages: %v\n", queryid, packages)
log.Printf("[%s] sorting, %d results, %d packages.\n", queryid, len(pointers), len(packages))
pointerSortingStarted := time.Now()
sort.Sort(pointerByRanking(pointers))
log.Printf("[%s] pointer sorting done (%v).\n", queryid, time.Since(pointerSortingStarted))
resultsPerPage := 10
dir := filepath.Join(*queryResultsPath, queryid)
if err := os.MkdirAll(dir, os.FileMode(0755)); err != nil {
return err
}
// TODO: it’d be so much better if we would correctly handle ESPACE errors
// in the code below (and above), but for that we need to carefully test it.
ensureEnoughSpaceAvailable()
f, err := os.Create(filepath.Join(dir, "packages.json"))
if err != nil {
return err
}
if err := json.NewEncoder(f).Encode(struct{ Packages []string }{packages}); err != nil {
return err
}
f.Close()
pages := int(math.Ceil(float64(len(pointers)) / float64(resultsPerPage)))
for page := 0; page < pages; page++ {
start := page * resultsPerPage
end := (page + 1) * resultsPerPage
if end > len(pointers) {
end = len(pointers)
}
name := filepath.Join(dir, fmt.Sprintf("page_%d.json", page))
log.Printf("[%s] writing %q\n", queryid, name)
f, err := os.Create(name)
if err != nil {
return err
}
if err := writeFromPointers(queryid, f, pointers[start:end]); err != nil {
f.Close()
return err
}
f.Close()
}
// Now save the results into their package-specific files.
byPkgSortingStarted := time.Now()
bypkg := make(map[string][]resultPointer)
for _, pointer := range pointers {
pkg := *pointer.packageName
@@ -747,41 +722,12 @@ func writeToDisk(queryid string) error {
pkgresults = append(pkgresults, pointer)
bypkg[name] = pkgresults
}
perPkgPages := int(math.Ceil(float64(len(s.allPackagesSorted)) / float64(packagesPerPage)))
for page := 0; page < perPkgPages; page++ {
start := page * packagesPerPage
end := (page + 1) * packagesPerPage
if end > len(s.allPackagesSorted) {
end = len(s.allPackagesSorted)
}
name := filepath.Join(dir, fmt.Sprintf("perpackage_2_page_%d.json", page))
log.Printf("[%s] writing %q\n", queryid, name)
f, err := os.Create(name)
if err != nil {
return err
}
f.Write([]byte("["))
for idx, pkg := range s.allPackagesSorted[start:end] {
if idx == 0 {
fmt.Fprintf(f, `{"Package": "%s", "Results":`, pkg)
} else {
fmt.Fprintf(f, `,{"Package": "%s", "Results":`, pkg)
}
if err := writeFromPointers(queryid, f, bypkg[pkg]); err != nil {
f.Close()
return err
}
f.Write([]byte("}"))
}
f.Write([]byte("]"))
f.Close()
}
log.Printf("[%s] by-pkg sorting done (%v).\n", queryid, time.Since(byPkgSortingStarted))
stateMu.Lock()
s = state[queryid]
s.resultPointers = pointers
s.resultPointersByPkg = bypkg
s.resultPages = pages
state[queryid] = s
stateMu.Unlock()
nginx.example
@@ -71,6 +71,7 @@ server {
gzip on;
gzip_comp_level 6;
gzip_types *;
gzip_proxied any;
location /nginx_status {
auth_basic off;
@@ -93,15 +94,14 @@ server {
proxy_pass http://dcsweb;
}
# We serve these perfectly cacheable JSON result files directly via nginx.
location ~ ^/results/(.*)/(.*\.json)$ {
alias /dcs-ssd/query_results/$1/$2;
autoindex on;
expires 1h;
}
location ~ ^/(perpackage-)?results/ {
limit_req zone=one burst=3 nodelay;
set $cache_key $scheme$host$uri$is_args$args$http_accept_encoding;
proxy_cache main;
proxy_cache_key $cache_key;
proxy_cache_valid 1h;
proxy_pass http://dcsweb;
}
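
For example, two requests for /results/<queryid>/page_0.json, one sent with
"Accept-Encoding: gzip" and one without, produce different $cache_key values
and therefore separate cache entries, so a gzipped body is never served to a
client that cannot decompress it. The proxy_cache_valid of one hour matches
the max-age=3600/Expires headers that dcs-web now sets on these responses.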
