Skip to content

Commit

Permalink
Refactor logging in pgsql module (elastic#12151)
Browse files Browse the repository at this point in the history
Guard debug logging statements with "isDebug" checks. And switch the module over to using named loggers.

Fixes elastic#12150

(cherry picked from commit 056d921)
  • Loading branch information
andrewkroh committed May 21, 2019
1 parent b498b33 commit 40a2a9d
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 66 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Expand Up @@ -59,6 +59,7 @@ https://github.com/elastic/beats/compare/v6.7.2...6.8[Check the HEAD diff]
*Packetbeat*

- Fixed a memory leak when using process monitoring under Windows. {pull}12100[12100]
- Improved debug logging efficiency in PGQSL module. {issue}12150[12150]

*Winlogbeat*

Expand Down
98 changes: 47 additions & 51 deletions packetbeat/protos/pgsql/parse.go
Expand Up @@ -22,7 +22,6 @@ import (
"strings"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

var (
Expand All @@ -34,7 +33,7 @@ var (
)

func (pgsql *pgsqlPlugin) pgsqlMessageParser(s *pgsqlStream) (bool, bool) {
debugf("pgsqlMessageParser, off=%v", s.parseOffset)
pgsql.debugf("pgsqlMessageParser, off=%v", s.parseOffset)

var ok, complete bool

Expand All @@ -46,22 +45,22 @@ func (pgsql *pgsqlPlugin) pgsqlMessageParser(s *pgsqlStream) (bool, bool) {
case pgsqlExtendedQueryState:
ok, complete = pgsql.parseMessageExtendedQuery(s)
default:
logp.Critical("Pgsql invalid parser state")
pgsql.log.Error("Pgsql invalid parser state")
}

detailedf("pgsqlMessageParser return: ok=%v, complete=%v, off=%v",
pgsql.detailf("pgsqlMessageParser return: ok=%v, complete=%v, off=%v",
ok, complete, s.parseOffset)

return ok, complete
}

func (pgsql *pgsqlPlugin) parseMessageStart(s *pgsqlStream) (bool, bool) {
detailedf("parseMessageStart")
pgsql.detailf("parseMessageStart")

m := s.message

for len(s.data[s.parseOffset:]) >= 5 {
isSpecial, length, command := isSpecialPgsqlCommand(s.data[s.parseOffset:])
isSpecial, length, command := pgsql.isSpecialCommand(s.data[s.parseOffset:])
if !isSpecial {
return pgsql.parseCommand(s)
}
Expand All @@ -71,7 +70,7 @@ func (pgsql *pgsqlPlugin) parseMessageStart(s *pgsqlStream) (bool, bool) {

// check buffer available
if len(s.data[s.parseOffset:]) <= length {
detailedf("Wait for more data 1")
pgsql.detailf("Wait for more data 1")
return true, false
}

Expand Down Expand Up @@ -103,7 +102,7 @@ func (pgsql *pgsqlPlugin) parseCommand(s *pgsqlStream) (bool, bool) {
m := s.message

// one byte reply to SSLRequest
detailedf("Reply for SSLRequest %c", typ)
pgsql.detailf("Reply for SSLRequest %c", typ)
m.start = s.parseOffset
s.parseOffset++
m.end = s.parseOffset
Expand All @@ -118,15 +117,15 @@ func (pgsql *pgsqlPlugin) parseCommand(s *pgsqlStream) (bool, bool) {
length := readLength(s.data[s.parseOffset+1:])
if length < 4 {
// length should include the size of itself (int32)
detailedf("Invalid pgsql command length.")
pgsql.detailf("Invalid pgsql command length.")
return false, false
}
if len(s.data[s.parseOffset:]) <= length {
detailedf("Wait for more data")
pgsql.detailf("Wait for more data")
return true, false
}

detailedf("Pgsql type %c, length=%d", typ, length)
pgsql.detailf("Pgsql type %c, length=%d", typ, length)

switch typ {
case 'Q':
Expand All @@ -147,7 +146,7 @@ func (pgsql *pgsqlPlugin) parseCommand(s *pgsqlStream) (bool, bool) {
return pgsql.parseExtResp(s, length)
default:
if !pgsqlValidType(typ) {
detailedf("invalid frame type: '%c'", typ)
pgsql.detailf("invalid frame type: '%c'", typ)
return false, false
}
return pgsql.parseSkipMessage(s, length)
Expand All @@ -172,7 +171,7 @@ func (pgsql *pgsqlPlugin) parseSimpleQuery(s *pgsqlStream, length int) (bool, bo
m.query = query

m.toExport = true
detailedf("Simple Query: %s", m.query)
pgsql.detailf("Simple Query: %s", m.query)
return true, true
}

Expand All @@ -184,12 +183,12 @@ func (pgsql *pgsqlPlugin) parseRowDescription(s *pgsqlStream, length int) (bool,
m.isOK = true
m.toExport = true

err := pgsqlFieldsParser(s, s.data[s.parseOffset+5:s.parseOffset+length+1])
err := pgsql.parseFields(s, s.data[s.parseOffset+5:s.parseOffset+length+1])
if err != nil {
detailedf("fields parse failed with: %v", err)
pgsql.detailf("parseFields failed with: %v", err)
return false, false
}
detailedf("Fields: %s", m.fields)
pgsql.detailf("Fields: %s", m.fields)

s.parseOffset++ //type
s.parseOffset += length //length
Expand Down Expand Up @@ -218,7 +217,7 @@ func (pgsql *pgsqlPlugin) parseEmptyQueryResponse(s *pgsqlStream) (bool, bool) {

m := s.message

detailedf("EmptyQueryResponse")
pgsql.detailf("EmptyQueryResponse")
m.start = s.parseOffset
m.isOK = true
m.isRequest = false
Expand All @@ -245,7 +244,7 @@ func (pgsql *pgsqlPlugin) parseCommandComplete(s *pgsqlStream, length int) (bool
return false, false
}

detailedf("CommandComplete length=%d, tag=%s", length, name)
pgsql.detailf("CommandComplete length=%d, tag=%s", length, name)

s.parseOffset += length
m.end = s.parseOffset
Expand All @@ -269,7 +268,7 @@ func (pgsql *pgsqlPlugin) parseReadyForQuery(s *pgsqlStream, length int) (bool,

func (pgsql *pgsqlPlugin) parseErrorResponse(s *pgsqlStream, length int) (bool, bool) {
// ErrorResponse
detailedf("ErrorResponse")
pgsql.detailf("ErrorResponse")

m := s.message
m.start = s.parseOffset
Expand All @@ -278,7 +277,7 @@ func (pgsql *pgsqlPlugin) parseErrorResponse(s *pgsqlStream, length int) (bool,
m.toExport = true

s.parseOffset++ //type
pgsqlErrorParser(s, s.data[s.parseOffset+4:s.parseOffset+length])
pgsql.parseError(s, s.data[s.parseOffset+4:s.parseOffset+length])

s.parseOffset += length //length
m.end = s.parseOffset
Expand All @@ -289,7 +288,7 @@ func (pgsql *pgsqlPlugin) parseErrorResponse(s *pgsqlStream, length int) (bool,

func (pgsql *pgsqlPlugin) parseExtReq(s *pgsqlStream, length int) (bool, bool) {
// Ready for query -> Parse for an extended query request
detailedf("Parse")
pgsql.detailf("Parse")

m := s.message
m.start = s.parseOffset
Expand All @@ -303,11 +302,11 @@ func (pgsql *pgsqlPlugin) parseExtReq(s *pgsqlStream, length int) (bool, bool) {

query, err := common.ReadString(s.data[m.start+6:])
if err != nil {
detailedf("Invalid extended query request")
pgsql.detailf("Invalid extended query request")
return false, false
}
m.query = query
detailedf("Parse in an extended query request: %s", m.query)
pgsql.detailf("Parse in an extended query request: %s", m.query)

// Ignore SET statement
if strings.HasPrefix(m.query, "SET ") {
Expand All @@ -319,7 +318,7 @@ func (pgsql *pgsqlPlugin) parseExtReq(s *pgsqlStream, length int) (bool, bool) {

func (pgsql *pgsqlPlugin) parseExtResp(s *pgsqlStream, length int) (bool, bool) {
// Sync -> Parse completion for an extended query response
detailedf("ParseCompletion")
pgsql.detailf("ParseCompletion")

m := s.message
m.start = s.parseOffset
Expand All @@ -329,7 +328,7 @@ func (pgsql *pgsqlPlugin) parseExtResp(s *pgsqlStream, length int) (bool, bool)

s.parseOffset++ //type
s.parseOffset += length
detailedf("Parse completion in an extended query response")
pgsql.detailf("Parse completion in an extended query response")
s.parseState = pgsqlGetDataState
return pgsql.parseMessageData(s)
}
Expand All @@ -349,7 +348,7 @@ func (pgsql *pgsqlPlugin) parseSkipMessage(s *pgsqlStream, length int) (bool, bo
return true, true
}

func pgsqlFieldsParser(s *pgsqlStream, buf []byte) error {
func (pgsql *pgsqlPlugin) parseFields(s *pgsqlStream, buf []byte) error {
m := s.message

if len(buf) < 2 {
Expand All @@ -359,7 +358,6 @@ func pgsqlFieldsParser(s *pgsqlStream, buf []byte) error {
// read field count (int16)
off := 2
fieldCount := readCount(buf)
detailedf("Row Description field count=%d", fieldCount)

fields := []string{}
fieldsFormat := []byte{}
Expand Down Expand Up @@ -400,8 +398,6 @@ func pgsqlFieldsParser(s *pgsqlStream, buf []byte) error {
format := common.BytesNtohs(buf[off : off+2])
off += 2
fieldsFormat = append(fieldsFormat, byte(format))

detailedf("Field name=%s, format=%d", fieldName, format)
}

if off < len(buf) {
Expand All @@ -411,13 +407,13 @@ func pgsqlFieldsParser(s *pgsqlStream, buf []byte) error {
m.fields = fields
m.fieldsFormat = fieldsFormat
if m.numberOfFields != fieldCount {
logp.Err("Missing fields from RowDescription. Expected %d. Received %d",
pgsql.log.Errorf("Missing fields from RowDescription. Expected %d. Received %d",
fieldCount, m.numberOfFields)
}
return nil
}

func pgsqlErrorParser(s *pgsqlStream, buf []byte) {
func (pgsql *pgsqlPlugin) parseError(s *pgsqlStream, buf []byte) {
m := s.message
off := 0
for off < len(buf) {
Expand All @@ -430,7 +426,7 @@ func pgsqlErrorParser(s *pgsqlStream, buf []byte) {
// read field value(string)
val, err := common.ReadString(buf[off+1:])
if err != nil {
logp.Err("Failed to read the column field")
pgsql.log.Error("Failed to read the column field")
break
}
off += len(val) + 2
Expand All @@ -444,11 +440,11 @@ func pgsqlErrorParser(s *pgsqlStream, buf []byte) {
m.errorSeverity = val
}
}
detailedf("%s %s %s", m.errorSeverity, m.errorCode, m.errorInfo)
pgsql.detailf("%s %s %s", m.errorSeverity, m.errorCode, m.errorInfo)
}

func (pgsql *pgsqlPlugin) parseMessageData(s *pgsqlStream) (bool, bool) {
detailedf("parseMessageData")
pgsql.detailf("parseMessageData")

// The response to queries that return row sets contains:
// RowDescription
Expand All @@ -466,12 +462,12 @@ func (pgsql *pgsqlPlugin) parseMessageData(s *pgsqlStream) (bool, bool) {
length := readLength(s.data[s.parseOffset+1:])
if length < 4 {
// length should include the size of itself (int32)
detailedf("Invalid pgsql command length.")
pgsql.detailf("Invalid pgsql command length.")
return false, false
}
if len(s.data[s.parseOffset:]) <= length {
// wait for more
detailedf("Wait for more data")
pgsql.detailf("Wait for more data")
return true, false
}

Expand All @@ -491,17 +487,17 @@ func (pgsql *pgsqlPlugin) parseMessageData(s *pgsqlStream) (bool, bool) {

name, err := pgsqlString(s.data[s.parseOffset+4:], length-4)
if err != nil {
detailedf("pgsql string invalid")
pgsql.detailf("pgsql string invalid")
return false, false
}

detailedf("CommandComplete length=%d, tag=%s", length, name)
pgsql.detailf("CommandComplete length=%d, tag=%s", length, name)
s.parseOffset += length
m.end = s.parseOffset
m.size = uint64(m.end - m.start)
s.parseState = pgsqlStartState

detailedf("Rows: %s", m.rows)
pgsql.detailf("Rows: %s", m.rows)

return true, true
case '2':
Expand All @@ -515,7 +511,7 @@ func (pgsql *pgsqlPlugin) parseMessageData(s *pgsqlStream) (bool, bool) {
return pgsql.parseRowDescription(s, length)
default:
// shouldn't happen -> return error
logp.Warn("Pgsql parser expected data message, but received command of type %v", typ)
pgsql.log.Warnf("Pgsql parser expected data message, but received command of type %v", typ)
s.parseState = pgsqlStartState
return false, false
}
Expand All @@ -530,7 +526,7 @@ func (pgsql *pgsqlPlugin) parseDataRow(s *pgsqlStream, buf []byte) error {
// read field count (int16)
off := 2
fieldCount := readCount(buf)
detailedf("DataRow field count=%d", fieldCount)
pgsql.detailf("DataRow field count=%d", fieldCount)

rows := []string{}
rowLength := 0
Expand All @@ -545,7 +541,7 @@ func (pgsql *pgsqlPlugin) parseDataRow(s *pgsqlStream, buf []byte) error {
off += 4

if columnLength > 0 && columnLength > len(buf[off:]) {
logp.Err("Pgsql invalid column_length=%v, buffer_length=%v, i=%v",
pgsql.log.Errorf("Pgsql invalid column_length=%v, buffer_length=%v, i=%v",
columnLength, len(buf[off:]), i)
return errInvalidLength
}
Expand All @@ -568,7 +564,7 @@ func (pgsql *pgsqlPlugin) parseDataRow(s *pgsqlStream, buf []byte) error {
rowLength += len(columnValue)
}

detailedf("Value %s, length=%d, off=%d", string(columnValue), columnLength, off)
pgsql.detailf("Value %s, length=%d, off=%d", string(columnValue), columnLength, off)
}

if off < len(buf) {
Expand All @@ -584,7 +580,7 @@ func (pgsql *pgsqlPlugin) parseDataRow(s *pgsqlStream, buf []byte) error {
}

func (pgsql *pgsqlPlugin) parseMessageExtendedQuery(s *pgsqlStream) (bool, bool) {
detailedf("parseMessageExtendedQuery")
pgsql.detailf("parseMessageExtendedQuery")

// An extended query request contains:
// Parse
Expand All @@ -603,12 +599,12 @@ func (pgsql *pgsqlPlugin) parseMessageExtendedQuery(s *pgsqlStream) (bool, bool)
length := readLength(s.data[s.parseOffset+1:])
if length < 4 {
// length should include the size of itself (int32)
detailedf("Invalid pgsql command length.")
pgsql.detailf("Invalid pgsql command length.")
return false, false
}
if len(s.data[s.parseOffset:]) <= length {
// wait for more
detailedf("Wait for more data")
pgsql.detailf("Wait for more data")
return true, false
}

Expand Down Expand Up @@ -647,7 +643,7 @@ func (pgsql *pgsqlPlugin) parseMessageExtendedQuery(s *pgsqlStream) (bool, bool)
return true, true
default:
// shouldn't happen -> return error
logp.Warn("Pgsql parser expected extended query message, but received command of type %v", typ)
pgsql.log.Warnf("Pgsql parser expected extended query message, but received command of type %v", typ)
s.parseState = pgsqlStartState
return false, false
}
Expand All @@ -656,7 +652,7 @@ func (pgsql *pgsqlPlugin) parseMessageExtendedQuery(s *pgsqlStream) (bool, bool)
return true, false
}

func isSpecialPgsqlCommand(data []byte) (bool, int, int) {
func (pgsql *pgsqlPlugin) isSpecialCommand(data []byte) (bool, int, int) {
if len(data) < 8 {
// 8 bytes required
return false, 0, 0
Expand All @@ -670,15 +666,15 @@ func isSpecialPgsqlCommand(data []byte) (bool, int, int) {

if length == 16 && code == 80877102 {
// Cancel Request
logp.Debug("pgsqldetailed", "Cancel Request, length=%d", length)
pgsql.debugf("Cancel Request, length=%d", length)
return true, length, cancelRequest
} else if length == 8 && code == 80877103 {
// SSL Request
logp.Debug("pgsqldetailed", "SSL Request, length=%d", length)
pgsql.debugf("SSL Request, length=%d", length)
return true, length, sslRequest
} else if code == 196608 {
// Startup Message
logp.Debug("pgsqldetailed", "Startup Message, length=%d", length)
pgsql.debugf("Startup Message, length=%d", length)
return true, length, startupMessage
}
return false, 0, 0
Expand Down

0 comments on commit 40a2a9d

Please sign in to comment.