Skip to content

Commit

Permalink
Send requests to a real server.
Browse files Browse the repository at this point in the history
  • Loading branch information
dustin committed Feb 17, 2012
1 parent cbf8794 commit f5e1484
Showing 1 changed file with 32 additions and 2 deletions.
34 changes: 32 additions & 2 deletions pktreader.go
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/dustin/go-humanize"
"github.com/dustin/gomemcached"
mc "github.com/dustin/gomemcached/client"
"github.com/dustin/gomemcached/server"
"github.com/dustin/gopcap"
)
Expand All @@ -23,6 +24,8 @@ var packetRecovery *bool = flag.Bool("recover", true,
"Attempt to recover from corrupt memcached streams")
var maxBodyLen *uint = flag.Uint("maxBodyLen", uint(memcached.MaxBodyLen),
"Maximum body length of a valid packet")
var server *string = flag.String("server", "localhost:11211",
"memcached server to connect to")

const channelSize = 10000

Expand Down Expand Up @@ -80,7 +83,12 @@ func readUntil(r *bufio.Reader, b byte) (skipped uint64, err error) {
panic("Unreachable")
}

func processRequest(name string, ch *bytesource, req *gomemcached.MCRequest) {
func processRequest(name string, ch *bytesource, req *gomemcached.MCRequest,
client *mc.Client) {
// log.Printf("Transmitting %v", *req)
if client != nil {
client.Transmit(req)
}
// log.Printf("from %v: %v", name, pkt)
ch.reporter <- reportMsg{op: req.Opcode}
}
Expand Down Expand Up @@ -112,8 +120,30 @@ func looksValid(req *gomemcached.MCRequest) bool {
return true
}

func mcResponseConsumer(client *mc.Client) {
for {
res := client.Receive()
if res.Status != 0 {
log.Printf("Memcached error: %v", res)
}
}
}

func consumer(name string, ch *bytesource) {
defer childrenWG.Done()

var client *mc.Client
if *server != "" {
var err error
client, err = mc.Connect("tcp", *server)
if err == nil {
defer client.Close()
go mcResponseConsumer(client)
} else {
log.Printf("Error connecting to memcached server: %v", err)
}
}

msgs := 0
rd := bufio.NewReader(ch)
dnu := uint64(0)
Expand All @@ -123,7 +153,7 @@ func consumer(name string, ch *bytesource) {
switch {
case err == nil:
if looksValid(&pkt) {
processRequest(name, ch, &pkt)
processRequest(name, ch, &pkt, client)
} else {
log.Printf("Invalid request found: %v", pkt)
}
Expand Down

0 comments on commit f5e1484

Please sign in to comment.