Skip to content

Commit

Permalink
Merge pull request #201 from buger/tcp-gc
Browse files Browse the repository at this point in the history
Improve raw input scheduler
  • Loading branch information
buger committed Sep 2, 2015
2 parents cd83e6f + f378def commit ed94985
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 73 deletions.
2 changes: 1 addition & 1 deletion http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewHTTPClient(baseURL string, config *HTTPClientConfig) *HTTPClient {
}

if config.ResponseBufferSize == 0 {
config.ResponseBufferSize = 512 * 1024 // 500kb
config.ResponseBufferSize = 100 * 1024 // 100kb
}

client := new(HTTPClient)
Expand Down
4 changes: 2 additions & 2 deletions input_raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func (i *RAWInput) Read(data []byte) (int, error) {
var header []byte

if msg.IsIncoming {
header = payloadHeader(RequestPayload, msg.UUID(), msg.Start)
header = payloadHeader(RequestPayload, msg.UUID(), msg.Start.UnixNano())
} else {
header = payloadHeader(ResponsePayload, msg.UUID(), msg.End-msg.RequestStart)
header = payloadHeader(ResponsePayload, msg.UUID(), msg.End.UnixNano()-msg.RequestStart.UnixNano())
}

copy(data[0:len(header)], header)
Expand Down
4 changes: 4 additions & 0 deletions proto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ var httpMethods []string = []string{
}

func IsHTTPPayload(payload []byte) bool {
if len(payload) < 4 {
return false
}

method := string(payload[0:4])

for _, m := range httpMethods {
Expand Down
50 changes: 33 additions & 17 deletions raw_socket_listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ type Listener struct {
// Messages ready to be send to client
messagesChan chan *TCPMessage

// Used for notifications about completed or expired messages
messageDelChan chan *TCPMessage

addr string // IP to listen
port uint16 // Port to listen

Expand All @@ -57,7 +54,7 @@ type Listener struct {
}

type request struct {
start int64
start time.Time
ack uint32
}

Expand All @@ -67,7 +64,6 @@ func NewListener(addr string, port string, expire time.Duration, captureResponse

l.packetsChan = make(chan *TCPPacket, 10000)
l.messagesChan = make(chan *TCPMessage, 10000)
l.messageDelChan = make(chan *TCPMessage, 10000)
l.quit = make(chan bool)

l.messages = make(map[string]*TCPMessage)
Expand All @@ -92,28 +88,44 @@ func NewListener(addr string, port string, expire time.Duration, captureResponse
}

func (t *Listener) listen() {
gcTicker := time.Tick(t.messageExpire / 2)

for {
select {
case <-t.quit:
t.conn.Close()
return
// If message ready for deletion it means that its also complete or expired by timeout
case message := <-t.messageDelChan:
delete(t.ackAliases, message.Ack)
delete(t.messages, message.ID)
// We need to use channels to process each packet to avoid data races
case packet := <-t.packetsChan:
t.processTCPPacket(packet)

if !message.IsIncoming {
delete(t.respAliases, message.Ack)
case <- gcTicker:
now := time.Now()

for _, message := range t.messages {
if now.Sub(message.Start) >= t.messageExpire {
t.dispatchMessage(message)
}
}
}
}
}

t.messagesChan <- message
func (t *Listener) dispatchMessage(message *TCPMessage) {
delete(t.ackAliases, message.Ack)
delete(t.messages, message.ID)

// We need to use channels to process each packet to avoid data races
case packet := <-t.packetsChan:
t.processTCPPacket(packet)
if !message.IsIncoming {
delete(t.respAliases, message.Ack)

// Do not track responses which have no associated requests
if message.RequestAck == 0 {
return
}
}
t.messagesChan <- message
}

func (t *Listener) readRAWSocket() {
conn, e := net.ListenPacket("ip4:tcp", t.addr)
t.conn = conn
Expand Down Expand Up @@ -212,8 +224,7 @@ func (t *Listener) processTCPPacket(packet *TCPPacket) {
message, ok := t.messages[mID]

if !ok {
// We sending messageDelChan channel, so message object can communicate with Listener and notify it if message completed
message = NewTCPMessage(mID, t.messageDelChan, packet.Ack, &t.messageExpire, isIncoming)
message = NewTCPMessage(mID, packet.Ack, isIncoming)
t.messages[mID] = message

if !isIncoming && responseRequest != nil {
Expand Down Expand Up @@ -246,6 +257,11 @@ func (t *Listener) processTCPPacket(packet *TCPPacket) {

// Adding packet to message
message.AddPacket(packet)

// If message contains only single packet immediately dispatch it
if !message.IsMultipart() {
t.dispatchMessage(message)
}
}

// Receive TCP messages from the listener channel
Expand Down
68 changes: 15 additions & 53 deletions raw_socket_listener/tcp_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/buger/gor/proto"
"log"
"strconv"
"sync"
"time"
)

Expand All @@ -21,47 +20,25 @@ type TCPMessage struct {
ID string // Message ID
Ack uint32
ResponseAck uint32
RequestStart int64
RequestStart time.Time
RequestAck uint32
Start int64
End int64
Start time.Time
End time.Time
IsIncoming bool

packets []*TCPPacket

timer *time.Timer // Used for expire check

delChan chan *TCPMessage

expire *time.Duration

mu sync.Mutex
}

// NewTCPMessage pointer created from a Acknowledgment number and a channel of messages readuy to be deleted
func NewTCPMessage(ID string, delChan chan *TCPMessage, Ack uint32, expire *time.Duration, IsIncoming bool) (msg *TCPMessage) {
msg = &TCPMessage{ID: ID, Ack: Ack, expire: expire, IsIncoming: IsIncoming}
msg.Start = time.Now().UnixNano()
msg.delChan = delChan // used for notifying that message completed or expired
func NewTCPMessage(ID string, Ack uint32, IsIncoming bool) (msg *TCPMessage) {
msg = &TCPMessage{ID: ID, Ack: Ack, IsIncoming: IsIncoming}
msg.Start = time.Now()

return
}

// Timeout notifies message to stop listening, close channel and message ready to be sent
func (t *TCPMessage) Timeout() {
t.mu.Lock()
if t.timer != nil {
t.timer.Stop()
}
t.mu.Unlock()

// Notify RAWListener that message is ready to be send to replay server
// Responses without requests gets discarded
if t.IsIncoming || t.RequestStart != 0 {
t.delChan <- t
}
}

// Bytes return message content
func (t *TCPMessage) Bytes() (output []byte) {
for _, p := range t.packets {
Expand All @@ -73,7 +50,9 @@ func (t *TCPMessage) Bytes() (output []byte) {

// Size returns total size of message
func (t *TCPMessage) Size() (size int) {
for _, p := range t.packets {
size += len(proto.Body(t.packets[0].Data))

for _, p := range t.packets[1:] {
size += len(p.Data)
}

Expand Down Expand Up @@ -102,29 +81,12 @@ func (t *TCPMessage) AddPacket(packet *TCPPacket) {
t.packets = append([]*TCPPacket{packet}, t.packets...)
}

t.End = time.Now().UnixNano()
}

if !t.isMultipart() {
t.Timeout()
} else {
t.mu.Lock()
// If more then 1 packet, wait for more, and set expiration
if len(t.packets) == 1 {
// Every time we receive packet we reset this timer
t.timer = time.AfterFunc(*t.expire, t.Timeout)
} else {
// Reset message timeout timer
if t.timer != nil {
t.timer.Reset(*t.expire)
}
}
t.mu.Unlock()
t.End = time.Now()
}
}

// isMultipart returns true if message contains from multiple tcp packets
func (t *TCPMessage) isMultipart() bool {
func (t *TCPMessage) IsMultipart() bool {
if len(t.packets) > 1 {
return true
}
Expand All @@ -143,7 +105,7 @@ func (t *TCPMessage) isMultipart() bool {
l, _ := strconv.Atoi(string(length))

// If content-length equal current body length
if l > 0 && l == len(proto.Body(payload)) {
if l > 0 && l == t.Size() {
return false
}
}
Expand All @@ -158,7 +120,7 @@ func (t *TCPMessage) isMultipart() bool {
l, _ := strconv.Atoi(string(length))

// If content-length equal current body length
if l > 0 && l == len(proto.Body(payload)) {
if l > 0 && l == t.Size() {
return false
}
}
Expand All @@ -171,10 +133,10 @@ func (t *TCPMessage) UUID() []byte {
var key []byte

if t.IsIncoming {
key = strconv.AppendInt(key, t.Start, 10)
key = strconv.AppendInt(key, t.Start.UnixNano(), 10)
key = strconv.AppendUint(key, uint64(t.Ack), 10)
} else {
key = strconv.AppendInt(key, t.RequestStart, 10)
key = strconv.AppendInt(key, t.RequestStart.UnixNano(), 10)
key = strconv.AppendUint(key, uint64(t.RequestAck), 10)
}

Expand Down

0 comments on commit ed94985

Please sign in to comment.