From e3372360e8048f74959bc5cad08d31ad19805cda Mon Sep 17 00:00:00 2001 From: Dustin Sallings Date: Fri, 17 Feb 2012 00:13:39 -0800 Subject: [PATCH] Report unhandled bytes. --- .gitignore | 3 +++ pktreader.go | 55 ++++++++++++++++++++++++++++++++++++---------------- 2 files changed, 41 insertions(+), 17 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c1452d4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +#* +*~ +/pktreader diff --git a/pktreader.go b/pktreader.go index a8938f3..4492ca3 100644 --- a/pktreader.go +++ b/pktreader.go @@ -21,9 +21,17 @@ var timeScale *float64 = flag.Float64("timescale", 1.0, const channelSize = 10000 +var childrenWG = sync.WaitGroup{} + +type reportMsg struct { + final bool + op gomemcached.CommandCode + dnu uint64 +} + type bytesource struct { ch <-chan []byte - reporter chan<- gomemcached.CommandCode + reporter chan<- reportMsg current []byte } @@ -40,40 +48,43 @@ func (b *bytesource) Read(out []byte) (int, error) { return copied, nil } -func NewByteSource(from <-chan []byte, rchan chan<- gomemcached.CommandCode) *bytesource { +func NewByteSource(from <-chan []byte, rchan chan<- reportMsg) *bytesource { return &bytesource{ch: from, reporter: rchan} } -func readUntil(r *bufio.Reader, b byte) error { +func readUntil(r *bufio.Reader, b byte) (skipped uint64, err error) { one := []byte{0} for { + var bytes []byte bytes, err := r.Peek(1) if err != nil { - return err + return skipped, err } if len(bytes) == 1 && bytes[0] == b { - return nil + return skipped, nil } n, err := r.Read(one) if err != nil { - return err + return skipped, err } + skipped += uint64(n) if n == 1 && one[0] == b { - return nil + return skipped, nil } } panic("Unreachable") - return nil } func processRequest(name string, ch *bytesource, req *gomemcached.MCRequest) { // fmt.Printf("from %v: %v\n", name, pkt) - ch.reporter <- req.Opcode + ch.reporter <- reportMsg{op: req.Opcode} } func consumer(name string, ch *bytesource) { + defer childrenWG.Done() msgs := 0 rd := bufio.NewReader(ch) + dnu := uint64(0) ever := true for ever { pkt, err := memcached.ReadPacket(rd) @@ -82,7 +93,7 @@ func consumer(name string, ch *bytesource) { switch pkt.Opcode { case gomemcached.GET, gomemcached.SET, gomemcached.GETQ, gomemcached.SETQ, gomemcached.DELETE: - if len(pkt.Key) == 0 { + if len(pkt.Key) > 16 || len(pkt.Key) < 4 { fmt.Printf("Weird invalid looking packet: %v\n", pkt) } else { processRequest(name, ch, &pkt) @@ -94,7 +105,8 @@ func consumer(name string, ch *bytesource) { msgs++ default: // fmt.Printf("recovering from error: %v\n", err) - err = readUntil(rd, gomemcached.REQ_MAGIC) + skipped, err := readUntil(rd, gomemcached.REQ_MAGIC) + dnu += skipped if err != nil { ever = false if err != io.EOF { @@ -105,13 +117,13 @@ func consumer(name string, ch *bytesource) { ever = false } } - dnu := uint64(0) // Just read the thing to completion. for bytes := range ch.ch { dnu += uint64(len(bytes)) } fmt.Printf("Completed %d messages, did not understand %s from %s\n", msgs, humanize.Bytes(dnu), name) + ch.reporter <- reportMsg{final: true, dnu: dnu} } func syncTime(pktTime, firstPacket, localStart time.Time) { @@ -125,7 +137,7 @@ func syncTime(pktTime, firstPacket, localStart time.Time) { } } -func stream(filename string, rchan chan<- gomemcached.CommandCode) { +func stream(filename string, rchan chan<- reportMsg) { h, err := pcap.Openoffline(filename) if h == nil { fmt.Printf("Openoffline(%s) failed: %s\n", filename, err) @@ -160,6 +172,7 @@ func stream(filename string, rchan chan<- gomemcached.CommandCode) { ch := clients[sender] if ch == nil { ch = make(chan []byte, channelSize) + childrenWG.Add(1) go consumer(sender, NewByteSource(ch, rchan)) clients[sender] = ch // fmt.Printf("Inferred connect from " + sender + "\n") @@ -182,10 +195,15 @@ func stream(filename string, rchan chan<- gomemcached.CommandCode) { } } -func report(ch <-chan gomemcached.CommandCode, wg *sync.WaitGroup) { +func report(ch <-chan reportMsg, wg *sync.WaitGroup) { counts := [256]uint64{} - for cc := range ch { - counts[int(cc)]++ + var dnu uint64 + for msg := range ch { + if msg.final { + dnu += msg.dnu + } else { + counts[int(msg.op)]++ + } } tw := tabwriter.NewWriter(os.Stdout, 8, 4, 2, ' ', 0) @@ -197,16 +215,19 @@ func report(ch <-chan gomemcached.CommandCode, wg *sync.WaitGroup) { } tw.Flush() + fmt.Printf("Did not understand %s bytes\n", humanize.Bytes(dnu)) + wg.Done() } func main() { flag.Parse() - reportchan := make(chan gomemcached.CommandCode, 100000) + reportchan := make(chan reportMsg, 100000) wg := sync.WaitGroup{} wg.Add(1) go report(reportchan, &wg) stream(flag.Arg(0), reportchan) + childrenWG.Wait() close(reportchan) wg.Wait() }