Skip to content
This repository has been archived by the owner on Jan 26, 2022. It is now read-only.

Correctly handle closed TCP connection #57

Merged
merged 2 commits into from
Sep 3, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
## 1.1.0 (unreleased)
= [PR #57](https://github.com/ekanite/ekanite/pull/57): Correctly handle closed TCP connections.
- [PR #37](https://github.com/ekanite/ekanite/pull/37): Move debug var support to dedicated HTTP server.
- [PR #36](https://github.com/ekanite/ekanite/pull/36): Remove noisy log message.
- [PR #52](https://github.com/ekanite/ekanite/pull/52): Close file descriptor after listing shards in directory.
Expand Down
37 changes: 37 additions & 0 deletions cmd/ekanited/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,43 @@ func (s *testSystem) IngestConn() net.Conn {
return conn
}

// Test_SingleLineTimeout ensures a log line is detected without closing the connection.
func Test_SingleLineTimeout(t *testing.T) {
path := tempPath()
defer os.RemoveAll(path)
sys := NewSystem(path)
ingestConn := sys.IngestConn()

line := "<33>5 1985-04-12T23:20:50.52Z test.com cron 304 - password rejected"
n, err := ingestConn.Write([]byte(line))
if err != nil {
t.Fatalf("failed to write '%s' to Collector: %s", line, err.Error())
}
if n != len(line) {
t.Fatalf("insufficient bytes written to Collector, exp: %d, wrote: %d", len(line), n)
}
sys.e.waitForCount(1)
}

// Test_SingleLineClose ensures a log line is detected after closing a connection.
func Test_SingleLineClose(t *testing.T) {
path := tempPath()
defer os.RemoveAll(path)
sys := NewSystem(path)
ingestConn := sys.IngestConn()

line := "<33>5 1985-04-12T23:20:50.52Z test.com cron 304 - password rejected"
n, err := ingestConn.Write([]byte(line))
if err != nil {
t.Fatalf("failed to write '%s' to Collector: %s", line, err.Error())
}
if n != len(line) {
t.Fatalf("insufficient bytes written to Collector, exp: %d, wrote: %d", len(line), n)
}
ingestConn.Close()
sys.e.waitForCount(1)
}

// Test_EndToEnd ensures a complete system operates as expected.
func Test_EndToEnd(t *testing.T) {
path := tempPath()
Expand Down
11 changes: 9 additions & 2 deletions input/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,18 +123,20 @@ func (s *TCPCollector) handleConnection(conn net.Conn, c chan<- *Event) {
stats.Add("tcpConnReadError", 1)
if neterr, ok := err.(net.Error); ok && neterr.Timeout() {
stats.Add("tcpConnReadTimeout", 1)
log, match = delimiter.Vestige()
} else if err == io.EOF {
stats.Add("tcpConnReadEOF", 1)
log, match = delimiter.Vestige()
} else {
stats.Add("tcpConnUnrecoverError", 1)
return
}

log, match = delimiter.Vestige()
} else {
stats.Add("tcpBytesRead", 1)
log, match = delimiter.Push(b)
}

// Log line available?
if match {
stats.Add("tcpEventsRx", 1)
if s.parser.Parse(bytes.NewBufferString(log).Bytes()) {
Expand All @@ -147,6 +149,11 @@ func (s *TCPCollector) handleConnection(conn net.Conn, c chan<- *Event) {
}
}
}

// Was the connection closed?
if err == io.EOF {
return
}
}
}

Expand Down