Skip to content
Browse files

Add log replay.

  • Loading branch information...
1 parent 58d744f commit 2fb264ecbfaef325bdfa964d02a5bacc01d8f135 @dustin dustin committed Jun 7, 2012
Showing with 172 additions and 15 deletions.
  1. +101 −0 replay.go
  2. +71 −15 vbmap.go
View
101 replay.go
@@ -0,0 +1,101 @@
+package main
+
+import (
+ "compress/gzip"
+ "encoding/json"
+ "io"
+ "log"
+ "os"
+ "time"
+
+ "github.com/couchbaselabs/go-couchbase"
+ "github.com/dustin/replaykit"
+)
+
+type replayEvent struct {
+ All map[string]map[string]interface{}
+ Bucket couchbase.Bucket `json:"bucket-data"`
+ Timings map[string]map[string]interface{}
+ VBDetails map[string]map[string]interface{} `json:"vbucket-details"`
+ Timestamp time.Time `json:"ts"`
+}
+
+type playbackState struct {
+ statech chan *replayEvent
+ reqch chan chan *replayEvent
+
+ st *replayEvent
+}
+
+func (c *playbackState) loop() {
+ for {
+ select {
+ case c.st = <-c.statech:
+ case req := <-c.reqch:
+ req <- c.st
+ }
+ }
+}
+
+func (c *playbackState) current() *replayEvent {
+ ch := make(chan *replayEvent)
+ c.reqch <- ch
+ return <-ch
+}
+
+var currentState playbackState
+
+func (r *replayEvent) TS() time.Time {
+ return r.Timestamp
+}
+
+type replaySource struct {
+ d *json.Decoder
+}
+
+func (r *replaySource) Next() replay.Event {
+ rv := replayEvent{}
+ err := r.d.Decode(&rv)
+ if err != nil {
+ if err != io.EOF {
+ log.Printf("Error decoding: %v", err)
+ }
+ return nil
+ }
+ return &rv
+}
+
+type replayAction struct {
+}
+
+func (r *replayAction) Process(ev replay.Event) {
+ re := ev.(*replayEvent)
+ log.Printf("Got thing as of %v", re.TS())
+ currentState.statech <- re
+}
+
+func replayFile(replaySpeed float64, path string) {
+ r := replay.New(replaySpeed)
+ f, err := os.Open(path)
+ maybefatal(err, "Error opening replay data: %v", err)
+ defer f.Close()
+ g, err := gzip.NewReader(f)
+ maybefatal(err, "Error starting decompression stream: %v", err)
+
+ rs := &replaySource{json.NewDecoder(g)}
+ a := &replayAction{}
+
+ r.Run(rs, a)
+
+}
+
+func startReplay(replaySpeed float64, path string) {
+ currentState.statech = make(chan *replayEvent)
+ currentState.reqch = make(chan chan *replayEvent)
+
+ go currentState.loop()
+
+ for {
+ replayFile(replaySpeed, path)
+ }
+}
View
86 vbmap.go
@@ -86,13 +86,7 @@ func getBucket(req *http.Request) *couchbase.Bucket {
return bucket
}
-func mapHandler(w http.ResponseWriter, req *http.Request) {
- w.Header().Set("Content-type", "application/json")
- w.Header().Set("Access-Control-Allow-Origin", "*")
-
- bucket := getBucket(req)
- defer bucket.Close()
-
+func displayMap(w http.ResponseWriter, req *http.Request, bucket *couchbase.Bucket) {
commonSuffix := bucket.CommonAddressSuffix()
commonSuffixMC := couchbase.FindCommonSuffix(bucket.VBucketServerMap.ServerList)
@@ -114,14 +108,20 @@ func mapHandler(w http.ResponseWriter, req *http.Request) {
}
}
-type vbstats map[string]map[string]interface{}
+func mapHandler(w http.ResponseWriter, req *http.Request) {
+ w.Header().Set("Content-type", "application/json")
+ w.Header().Set("Access-Control-Allow-Origin", "*")
-func getVbStats(bucket *couchbase.Bucket, commonSuffixMC string) map[string]vbstats {
+ bucket := getBucket(req)
+ defer bucket.Close()
+}
- allstats := bucket.GetStats("vbucket-details")
+type vbstats map[string]map[string]interface{}
+func processVBDetails(vbd map[string]map[string]string,
+ commonSuffixMC string) map[string]vbstats {
rv := map[string]vbstats{}
- for fullname, m := range allstats {
+ for fullname, m := range vbd {
sn := couchbase.CleanupHost(fullname, commonSuffixMC)
rv[sn] = vbstats{}
@@ -146,6 +146,10 @@ func getVbStats(bucket *couchbase.Bucket, commonSuffixMC string) map[string]vbst
return rv
}
+func getVbStats(bucket *couchbase.Bucket, commonSuffixMC string) map[string]vbstats {
+ return processVBDetails(bucket.GetStats("vbucket-details"), commonSuffixMC)
+}
+
func vbHandler(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-type", "application/json")
w.Header().Set("Access-Control-Allow-Origin", "*")
@@ -205,9 +209,51 @@ func files(contentType string, paths ...string) handler {
}
}
+func replaymapHandler(w http.ResponseWriter, req *http.Request) {
+ re := currentState.current()
+ displayMap(w, req, &re.Bucket)
+}
+
+func replayvbHandler(w http.ResponseWriter, req *http.Request) {
+ w.Header().Set("Content-type", "application/json")
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+
+ re := currentState.current()
+
+ conv := map[string]map[string]string{}
+ for s, m := range re.VBDetails {
+ out, ok := conv[s]
+ if !ok {
+ out = make(map[string]string)
+ conv[s] = out
+ }
+ for k, v := range m {
+ out[k] = fmt.Sprintf("%v", v)
+ }
+ }
+
+ vbd := processVBDetails(conv, "")
+
+ req.ParseForm()
+ var_name := req.FormValue("name")
+
+ if var_name != "" {
+ fmt.Fprintf(w, "var "+var_name+" = ")
+ }
+ err := json.NewEncoder(w).Encode(vbd)
+ maybefatal(err, "Error encoding output: %v", err)
+ if var_name != "" {
+ fmt.Fprintf(w, ";")
+ }
+}
+
func main() {
staticPath := flag.Bool("static", false,
"Interpret URL as a static path (for testing)")
+ replayPath := flag.Bool("replay", false,
+ "Provide a replay json.gz for /map and /vb")
+ replaySpeed := flag.Float64("replaySpeed", 1.0,
+ "Realtime multiplier for replay")
flag.Parse()
http.HandleFunc("/", files("text/html", "root.html"))
@@ -216,12 +262,22 @@ func main() {
})
http.Handle("/static/", http.FileServer(http.Dir(".")))
- if *staticPath {
- http.HandleFunc("/map", files("application/json", flag.Args()...))
- http.HandleFunc("/vb", files("application/json", flag.Args()...))
- } else {
+ if *staticPath && *replayPath {
+ log.Fatalf("Static and replay paths are mutually exclusive.")
+ }
+
+ switch {
+ default:
http.HandleFunc("/map", mapHandler)
http.HandleFunc("/vb", vbHandler)
+ case *staticPath:
+ http.HandleFunc("/map", files("application/json", flag.Args()...))
+ http.HandleFunc("/vb", files("application/json", flag.Args()...))
+ case *replayPath:
+ go startReplay(*replaySpeed, flag.Arg(0))
+ http.HandleFunc("/map", replaymapHandler)
+ http.HandleFunc("/vb", replayvbHandler)
}
+
log.Fatal(http.ListenAndServe(":4444", nil))
}

0 comments on commit 2fb264e

Please sign in to comment.
Something went wrong with that request. Please try again.