Skip to content

Commit

Permalink
Report unhandled bytes.
Browse files Browse the repository at this point in the history
  • Loading branch information
dustin committed Feb 17, 2012
1 parent ef602c1 commit e337236
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 17 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -0,0 +1,3 @@
#*
*~
/pktreader
55 changes: 38 additions & 17 deletions pktreader.go
Expand Up @@ -21,9 +21,17 @@ var timeScale *float64 = flag.Float64("timescale", 1.0,

const channelSize = 10000

var childrenWG = sync.WaitGroup{}

type reportMsg struct {
final bool
op gomemcached.CommandCode
dnu uint64
}

type bytesource struct {
ch <-chan []byte
reporter chan<- gomemcached.CommandCode
reporter chan<- reportMsg
current []byte
}

Expand All @@ -40,40 +48,43 @@ func (b *bytesource) Read(out []byte) (int, error) {
return copied, nil
}

func NewByteSource(from <-chan []byte, rchan chan<- gomemcached.CommandCode) *bytesource {
func NewByteSource(from <-chan []byte, rchan chan<- reportMsg) *bytesource {
return &bytesource{ch: from, reporter: rchan}
}

func readUntil(r *bufio.Reader, b byte) error {
func readUntil(r *bufio.Reader, b byte) (skipped uint64, err error) {
one := []byte{0}
for {
var bytes []byte
bytes, err := r.Peek(1)
if err != nil {
return err
return skipped, err
}
if len(bytes) == 1 && bytes[0] == b {
return nil
return skipped, nil
}
n, err := r.Read(one)
if err != nil {
return err
return skipped, err
}
skipped += uint64(n)
if n == 1 && one[0] == b {
return nil
return skipped, nil
}
}
panic("Unreachable")
return nil
}

func processRequest(name string, ch *bytesource, req *gomemcached.MCRequest) {
// fmt.Printf("from %v: %v\n", name, pkt)
ch.reporter <- req.Opcode
ch.reporter <- reportMsg{op: req.Opcode}
}

func consumer(name string, ch *bytesource) {
defer childrenWG.Done()
msgs := 0
rd := bufio.NewReader(ch)
dnu := uint64(0)
ever := true
for ever {
pkt, err := memcached.ReadPacket(rd)
Expand All @@ -82,7 +93,7 @@ func consumer(name string, ch *bytesource) {
switch pkt.Opcode {
case gomemcached.GET, gomemcached.SET, gomemcached.GETQ,
gomemcached.SETQ, gomemcached.DELETE:
if len(pkt.Key) == 0 {
if len(pkt.Key) > 16 || len(pkt.Key) < 4 {
fmt.Printf("Weird invalid looking packet: %v\n", pkt)
} else {
processRequest(name, ch, &pkt)
Expand All @@ -94,7 +105,8 @@ func consumer(name string, ch *bytesource) {
msgs++
default:
// fmt.Printf("recovering from error: %v\n", err)
err = readUntil(rd, gomemcached.REQ_MAGIC)
skipped, err := readUntil(rd, gomemcached.REQ_MAGIC)
dnu += skipped
if err != nil {
ever = false
if err != io.EOF {
Expand All @@ -105,13 +117,13 @@ func consumer(name string, ch *bytesource) {
ever = false
}
}
dnu := uint64(0)
// Just read the thing to completion.
for bytes := range ch.ch {
dnu += uint64(len(bytes))
}
fmt.Printf("Completed %d messages, did not understand %s from %s\n",
msgs, humanize.Bytes(dnu), name)
ch.reporter <- reportMsg{final: true, dnu: dnu}
}

func syncTime(pktTime, firstPacket, localStart time.Time) {
Expand All @@ -125,7 +137,7 @@ func syncTime(pktTime, firstPacket, localStart time.Time) {
}
}

func stream(filename string, rchan chan<- gomemcached.CommandCode) {
func stream(filename string, rchan chan<- reportMsg) {
h, err := pcap.Openoffline(filename)
if h == nil {
fmt.Printf("Openoffline(%s) failed: %s\n", filename, err)
Expand Down Expand Up @@ -160,6 +172,7 @@ func stream(filename string, rchan chan<- gomemcached.CommandCode) {
ch := clients[sender]
if ch == nil {
ch = make(chan []byte, channelSize)
childrenWG.Add(1)
go consumer(sender, NewByteSource(ch, rchan))
clients[sender] = ch
// fmt.Printf("Inferred connect from " + sender + "\n")
Expand All @@ -182,10 +195,15 @@ func stream(filename string, rchan chan<- gomemcached.CommandCode) {
}
}

func report(ch <-chan gomemcached.CommandCode, wg *sync.WaitGroup) {
func report(ch <-chan reportMsg, wg *sync.WaitGroup) {
counts := [256]uint64{}
for cc := range ch {
counts[int(cc)]++
var dnu uint64
for msg := range ch {
if msg.final {
dnu += msg.dnu
} else {
counts[int(msg.op)]++
}
}

tw := tabwriter.NewWriter(os.Stdout, 8, 4, 2, ' ', 0)
Expand All @@ -197,16 +215,19 @@ func report(ch <-chan gomemcached.CommandCode, wg *sync.WaitGroup) {
}
tw.Flush()

fmt.Printf("Did not understand %s bytes\n", humanize.Bytes(dnu))

wg.Done()
}

func main() {
flag.Parse()
reportchan := make(chan gomemcached.CommandCode, 100000)
reportchan := make(chan reportMsg, 100000)
wg := sync.WaitGroup{}
wg.Add(1)
go report(reportchan, &wg)
stream(flag.Arg(0), reportchan)
childrenWG.Wait()
close(reportchan)
wg.Wait()
}

0 comments on commit e337236

Please sign in to comment.