From 056d921dd64cc9155def5f9cb81c7a26f5bab40a Mon Sep 17 00:00:00 2001 From: Andrew Kroh Date: Fri, 10 May 2019 09:28:29 -0400 Subject: [PATCH] Refactor logging in pgsql module (#12151) Guard debug logging statements with "isDebug" checks. And switch the module over to using named loggers. Fixes #12150 --- CHANGELOG.next.asciidoc | 1 + packetbeat/protos/pgsql/parse.go | 98 +++++++++++++++----------------- packetbeat/protos/pgsql/pgsql.go | 47 ++++++++++----- 3 files changed, 80 insertions(+), 66 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index eaa4a309c6c..4dec4b62bbe 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -122,6 +122,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Prevent duplicate packet loss error messages in HTTP events. {pull}10709[10709] - Avoid reporting unknown MongoDB opcodes more than once. {pull}10878[10878] - Fixed a memory leak when using process monitoring under Windows. {pull}12100[12100] +- Improved debug logging efficiency in PGQSL module. {issue}12150[12150] *Winlogbeat* diff --git a/packetbeat/protos/pgsql/parse.go b/packetbeat/protos/pgsql/parse.go index e1bd915335f..cf7d9f72d97 100644 --- a/packetbeat/protos/pgsql/parse.go +++ b/packetbeat/protos/pgsql/parse.go @@ -22,7 +22,6 @@ import ( "strings" "github.com/elastic/beats/libbeat/common" - "github.com/elastic/beats/libbeat/logp" ) var ( @@ -34,7 +33,7 @@ var ( ) func (pgsql *pgsqlPlugin) pgsqlMessageParser(s *pgsqlStream) (bool, bool) { - debugf("pgsqlMessageParser, off=%v", s.parseOffset) + pgsql.debugf("pgsqlMessageParser, off=%v", s.parseOffset) var ok, complete bool @@ -46,22 +45,22 @@ func (pgsql *pgsqlPlugin) pgsqlMessageParser(s *pgsqlStream) (bool, bool) { case pgsqlExtendedQueryState: ok, complete = pgsql.parseMessageExtendedQuery(s) default: - logp.Critical("Pgsql invalid parser state") + pgsql.log.Error("Pgsql invalid parser state") } - detailedf("pgsqlMessageParser return: ok=%v, complete=%v, off=%v", + pgsql.detailf("pgsqlMessageParser return: ok=%v, complete=%v, off=%v", ok, complete, s.parseOffset) return ok, complete } func (pgsql *pgsqlPlugin) parseMessageStart(s *pgsqlStream) (bool, bool) { - detailedf("parseMessageStart") + pgsql.detailf("parseMessageStart") m := s.message for len(s.data[s.parseOffset:]) >= 5 { - isSpecial, length, command := isSpecialPgsqlCommand(s.data[s.parseOffset:]) + isSpecial, length, command := pgsql.isSpecialCommand(s.data[s.parseOffset:]) if !isSpecial { return pgsql.parseCommand(s) } @@ -71,7 +70,7 @@ func (pgsql *pgsqlPlugin) parseMessageStart(s *pgsqlStream) (bool, bool) { // check buffer available if len(s.data[s.parseOffset:]) <= length { - detailedf("Wait for more data 1") + pgsql.detailf("Wait for more data 1") return true, false } @@ -103,7 +102,7 @@ func (pgsql *pgsqlPlugin) parseCommand(s *pgsqlStream) (bool, bool) { m := s.message // one byte reply to SSLRequest - detailedf("Reply for SSLRequest %c", typ) + pgsql.detailf("Reply for SSLRequest %c", typ) m.start = s.parseOffset s.parseOffset++ m.end = s.parseOffset @@ -118,15 +117,15 @@ func (pgsql *pgsqlPlugin) parseCommand(s *pgsqlStream) (bool, bool) { length := readLength(s.data[s.parseOffset+1:]) if length < 4 { // length should include the size of itself (int32) - detailedf("Invalid pgsql command length.") + pgsql.detailf("Invalid pgsql command length.") return false, false } if len(s.data[s.parseOffset:]) <= length { - detailedf("Wait for more data") + pgsql.detailf("Wait for more data") return true, false } - detailedf("Pgsql type %c, length=%d", typ, length) + pgsql.detailf("Pgsql type %c, length=%d", typ, length) switch typ { case 'Q': @@ -147,7 +146,7 @@ func (pgsql *pgsqlPlugin) parseCommand(s *pgsqlStream) (bool, bool) { return pgsql.parseExtResp(s, length) default: if !pgsqlValidType(typ) { - detailedf("invalid frame type: '%c'", typ) + pgsql.detailf("invalid frame type: '%c'", typ) return false, false } return pgsql.parseSkipMessage(s, length) @@ -172,7 +171,7 @@ func (pgsql *pgsqlPlugin) parseSimpleQuery(s *pgsqlStream, length int) (bool, bo m.query = query m.toExport = true - detailedf("Simple Query: %s", m.query) + pgsql.detailf("Simple Query: %s", m.query) return true, true } @@ -184,12 +183,12 @@ func (pgsql *pgsqlPlugin) parseRowDescription(s *pgsqlStream, length int) (bool, m.isOK = true m.toExport = true - err := pgsqlFieldsParser(s, s.data[s.parseOffset+5:s.parseOffset+length+1]) + err := pgsql.parseFields(s, s.data[s.parseOffset+5:s.parseOffset+length+1]) if err != nil { - detailedf("fields parse failed with: %v", err) + pgsql.detailf("parseFields failed with: %v", err) return false, false } - detailedf("Fields: %s", m.fields) + pgsql.detailf("Fields: %s", m.fields) s.parseOffset++ //type s.parseOffset += length //length @@ -218,7 +217,7 @@ func (pgsql *pgsqlPlugin) parseEmptyQueryResponse(s *pgsqlStream) (bool, bool) { m := s.message - detailedf("EmptyQueryResponse") + pgsql.detailf("EmptyQueryResponse") m.start = s.parseOffset m.isOK = true m.isRequest = false @@ -245,7 +244,7 @@ func (pgsql *pgsqlPlugin) parseCommandComplete(s *pgsqlStream, length int) (bool return false, false } - detailedf("CommandComplete length=%d, tag=%s", length, name) + pgsql.detailf("CommandComplete length=%d, tag=%s", length, name) s.parseOffset += length m.end = s.parseOffset @@ -269,7 +268,7 @@ func (pgsql *pgsqlPlugin) parseReadyForQuery(s *pgsqlStream, length int) (bool, func (pgsql *pgsqlPlugin) parseErrorResponse(s *pgsqlStream, length int) (bool, bool) { // ErrorResponse - detailedf("ErrorResponse") + pgsql.detailf("ErrorResponse") m := s.message m.start = s.parseOffset @@ -278,7 +277,7 @@ func (pgsql *pgsqlPlugin) parseErrorResponse(s *pgsqlStream, length int) (bool, m.toExport = true s.parseOffset++ //type - pgsqlErrorParser(s, s.data[s.parseOffset+4:s.parseOffset+length]) + pgsql.parseError(s, s.data[s.parseOffset+4:s.parseOffset+length]) s.parseOffset += length //length m.end = s.parseOffset @@ -289,7 +288,7 @@ func (pgsql *pgsqlPlugin) parseErrorResponse(s *pgsqlStream, length int) (bool, func (pgsql *pgsqlPlugin) parseExtReq(s *pgsqlStream, length int) (bool, bool) { // Ready for query -> Parse for an extended query request - detailedf("Parse") + pgsql.detailf("Parse") m := s.message m.start = s.parseOffset @@ -303,11 +302,11 @@ func (pgsql *pgsqlPlugin) parseExtReq(s *pgsqlStream, length int) (bool, bool) { query, err := common.ReadString(s.data[m.start+6:]) if err != nil { - detailedf("Invalid extended query request") + pgsql.detailf("Invalid extended query request") return false, false } m.query = query - detailedf("Parse in an extended query request: %s", m.query) + pgsql.detailf("Parse in an extended query request: %s", m.query) // Ignore SET statement if strings.HasPrefix(m.query, "SET ") { @@ -319,7 +318,7 @@ func (pgsql *pgsqlPlugin) parseExtReq(s *pgsqlStream, length int) (bool, bool) { func (pgsql *pgsqlPlugin) parseExtResp(s *pgsqlStream, length int) (bool, bool) { // Sync -> Parse completion for an extended query response - detailedf("ParseCompletion") + pgsql.detailf("ParseCompletion") m := s.message m.start = s.parseOffset @@ -329,7 +328,7 @@ func (pgsql *pgsqlPlugin) parseExtResp(s *pgsqlStream, length int) (bool, bool) s.parseOffset++ //type s.parseOffset += length - detailedf("Parse completion in an extended query response") + pgsql.detailf("Parse completion in an extended query response") s.parseState = pgsqlGetDataState return pgsql.parseMessageData(s) } @@ -349,7 +348,7 @@ func (pgsql *pgsqlPlugin) parseSkipMessage(s *pgsqlStream, length int) (bool, bo return true, true } -func pgsqlFieldsParser(s *pgsqlStream, buf []byte) error { +func (pgsql *pgsqlPlugin) parseFields(s *pgsqlStream, buf []byte) error { m := s.message if len(buf) < 2 { @@ -359,7 +358,6 @@ func pgsqlFieldsParser(s *pgsqlStream, buf []byte) error { // read field count (int16) off := 2 fieldCount := readCount(buf) - detailedf("Row Description field count=%d", fieldCount) fields := []string{} fieldsFormat := []byte{} @@ -400,8 +398,6 @@ func pgsqlFieldsParser(s *pgsqlStream, buf []byte) error { format := common.BytesNtohs(buf[off : off+2]) off += 2 fieldsFormat = append(fieldsFormat, byte(format)) - - detailedf("Field name=%s, format=%d", fieldName, format) } if off < len(buf) { @@ -411,13 +407,13 @@ func pgsqlFieldsParser(s *pgsqlStream, buf []byte) error { m.fields = fields m.fieldsFormat = fieldsFormat if m.numberOfFields != fieldCount { - logp.Err("Missing fields from RowDescription. Expected %d. Received %d", + pgsql.log.Errorf("Missing fields from RowDescription. Expected %d. Received %d", fieldCount, m.numberOfFields) } return nil } -func pgsqlErrorParser(s *pgsqlStream, buf []byte) { +func (pgsql *pgsqlPlugin) parseError(s *pgsqlStream, buf []byte) { m := s.message off := 0 for off < len(buf) { @@ -430,7 +426,7 @@ func pgsqlErrorParser(s *pgsqlStream, buf []byte) { // read field value(string) val, err := common.ReadString(buf[off+1:]) if err != nil { - logp.Err("Failed to read the column field") + pgsql.log.Error("Failed to read the column field") break } off += len(val) + 2 @@ -444,11 +440,11 @@ func pgsqlErrorParser(s *pgsqlStream, buf []byte) { m.errorSeverity = val } } - detailedf("%s %s %s", m.errorSeverity, m.errorCode, m.errorInfo) + pgsql.detailf("%s %s %s", m.errorSeverity, m.errorCode, m.errorInfo) } func (pgsql *pgsqlPlugin) parseMessageData(s *pgsqlStream) (bool, bool) { - detailedf("parseMessageData") + pgsql.detailf("parseMessageData") // The response to queries that return row sets contains: // RowDescription @@ -466,12 +462,12 @@ func (pgsql *pgsqlPlugin) parseMessageData(s *pgsqlStream) (bool, bool) { length := readLength(s.data[s.parseOffset+1:]) if length < 4 { // length should include the size of itself (int32) - detailedf("Invalid pgsql command length.") + pgsql.detailf("Invalid pgsql command length.") return false, false } if len(s.data[s.parseOffset:]) <= length { // wait for more - detailedf("Wait for more data") + pgsql.detailf("Wait for more data") return true, false } @@ -491,17 +487,17 @@ func (pgsql *pgsqlPlugin) parseMessageData(s *pgsqlStream) (bool, bool) { name, err := pgsqlString(s.data[s.parseOffset+4:], length-4) if err != nil { - detailedf("pgsql string invalid") + pgsql.detailf("pgsql string invalid") return false, false } - detailedf("CommandComplete length=%d, tag=%s", length, name) + pgsql.detailf("CommandComplete length=%d, tag=%s", length, name) s.parseOffset += length m.end = s.parseOffset m.size = uint64(m.end - m.start) s.parseState = pgsqlStartState - detailedf("Rows: %s", m.rows) + pgsql.detailf("Rows: %s", m.rows) return true, true case '2': @@ -515,7 +511,7 @@ func (pgsql *pgsqlPlugin) parseMessageData(s *pgsqlStream) (bool, bool) { return pgsql.parseRowDescription(s, length) default: // shouldn't happen -> return error - logp.Warn("Pgsql parser expected data message, but received command of type %v", typ) + pgsql.log.Warnf("Pgsql parser expected data message, but received command of type %v", typ) s.parseState = pgsqlStartState return false, false } @@ -530,7 +526,7 @@ func (pgsql *pgsqlPlugin) parseDataRow(s *pgsqlStream, buf []byte) error { // read field count (int16) off := 2 fieldCount := readCount(buf) - detailedf("DataRow field count=%d", fieldCount) + pgsql.detailf("DataRow field count=%d", fieldCount) rows := []string{} rowLength := 0 @@ -545,7 +541,7 @@ func (pgsql *pgsqlPlugin) parseDataRow(s *pgsqlStream, buf []byte) error { off += 4 if columnLength > 0 && columnLength > len(buf[off:]) { - logp.Err("Pgsql invalid column_length=%v, buffer_length=%v, i=%v", + pgsql.log.Errorf("Pgsql invalid column_length=%v, buffer_length=%v, i=%v", columnLength, len(buf[off:]), i) return errInvalidLength } @@ -568,7 +564,7 @@ func (pgsql *pgsqlPlugin) parseDataRow(s *pgsqlStream, buf []byte) error { rowLength += len(columnValue) } - detailedf("Value %s, length=%d, off=%d", string(columnValue), columnLength, off) + pgsql.detailf("Value %s, length=%d, off=%d", string(columnValue), columnLength, off) } if off < len(buf) { @@ -584,7 +580,7 @@ func (pgsql *pgsqlPlugin) parseDataRow(s *pgsqlStream, buf []byte) error { } func (pgsql *pgsqlPlugin) parseMessageExtendedQuery(s *pgsqlStream) (bool, bool) { - detailedf("parseMessageExtendedQuery") + pgsql.detailf("parseMessageExtendedQuery") // An extended query request contains: // Parse @@ -603,12 +599,12 @@ func (pgsql *pgsqlPlugin) parseMessageExtendedQuery(s *pgsqlStream) (bool, bool) length := readLength(s.data[s.parseOffset+1:]) if length < 4 { // length should include the size of itself (int32) - detailedf("Invalid pgsql command length.") + pgsql.detailf("Invalid pgsql command length.") return false, false } if len(s.data[s.parseOffset:]) <= length { // wait for more - detailedf("Wait for more data") + pgsql.detailf("Wait for more data") return true, false } @@ -647,7 +643,7 @@ func (pgsql *pgsqlPlugin) parseMessageExtendedQuery(s *pgsqlStream) (bool, bool) return true, true default: // shouldn't happen -> return error - logp.Warn("Pgsql parser expected extended query message, but received command of type %v", typ) + pgsql.log.Warnf("Pgsql parser expected extended query message, but received command of type %v", typ) s.parseState = pgsqlStartState return false, false } @@ -656,7 +652,7 @@ func (pgsql *pgsqlPlugin) parseMessageExtendedQuery(s *pgsqlStream) (bool, bool) return true, false } -func isSpecialPgsqlCommand(data []byte) (bool, int, int) { +func (pgsql *pgsqlPlugin) isSpecialCommand(data []byte) (bool, int, int) { if len(data) < 8 { // 8 bytes required return false, 0, 0 @@ -670,15 +666,15 @@ func isSpecialPgsqlCommand(data []byte) (bool, int, int) { if length == 16 && code == 80877102 { // Cancel Request - logp.Debug("pgsqldetailed", "Cancel Request, length=%d", length) + pgsql.debugf("Cancel Request, length=%d", length) return true, length, cancelRequest } else if length == 8 && code == 80877103 { // SSL Request - logp.Debug("pgsqldetailed", "SSL Request, length=%d", length) + pgsql.debugf("SSL Request, length=%d", length) return true, length, sslRequest } else if code == 196608 { // Startup Message - logp.Debug("pgsqldetailed", "Startup Message, length=%d", length) + pgsql.debugf("Startup Message, length=%d", length) return true, length, startupMessage } return false, 0, 0 diff --git a/packetbeat/protos/pgsql/pgsql.go b/packetbeat/protos/pgsql/pgsql.go index f48731fee72..a16c07d64b9 100644 --- a/packetbeat/protos/pgsql/pgsql.go +++ b/packetbeat/protos/pgsql/pgsql.go @@ -30,9 +30,13 @@ import ( "github.com/elastic/beats/packetbeat/procs" "github.com/elastic/beats/packetbeat/protos" "github.com/elastic/beats/packetbeat/protos/tcp" + + "go.uber.org/zap" ) type pgsqlPlugin struct { + log, debug, detail *logp.Logger + isDebug, isDetail bool // config ports []int @@ -125,11 +129,6 @@ var ( errInvalidLength = errors.New("invalid length") ) -var ( - debugf = logp.MakeDebug("pgsql") - detailedf = logp.MakeDebug("pgsqldetailed") -) - var ( unmatchedResponses = monitoring.NewInt(nil, "pgsql.unmatched_responses") ) @@ -160,6 +159,11 @@ func New( func (pgsql *pgsqlPlugin) init(results protos.Reporter, config *pgsqlConfig) error { pgsql.setFromConfig(config) + pgsql.log = logp.NewLogger("pgsql") + pgsql.debug = logp.NewLogger("pgsql", zap.AddCallerSkip(1)) + pgsql.detail = logp.NewLogger("pgsqldetailed", zap.AddCallerSkip(1)) + pgsql.isDebug, pgsql.isDetail = logp.IsDebug("pgsql"), logp.IsDebug("pgsqldetailed") + pgsql.transactions = common.NewCache( pgsql.transactionTimeout, protos.DefaultTransactionHashSize) @@ -187,6 +191,20 @@ func (pgsql *pgsqlPlugin) getTransaction(k common.HashableTCPTuple) []*pgsqlTran return nil } +//go:inline +func (pgsql *pgsqlPlugin) debugf(format string, v ...interface{}) { + if pgsql.isDebug { + pgsql.debug.Debugf(format, v...) + } +} + +//go:inline +func (pgsql *pgsqlPlugin) detailf(format string, v ...interface{}) { + if pgsql.isDetail { + pgsql.detail.Debugf(format, v...) + } +} + func (pgsql *pgsqlPlugin) GetPorts() []int { return pgsql.ports } @@ -237,13 +255,13 @@ func (pgsql *pgsqlPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple, data: pkt.Payload, message: &pgsqlMessage{ts: pkt.Ts}, } - logp.Debug("pgsqldetailed", "New stream created") + pgsql.detailf("New stream created") } else { // concatenate bytes priv.data[dir].data = append(priv.data[dir].data, pkt.Payload...) - logp.Debug("pgsqldetailed", "Len data: %d cap data: %d", len(priv.data[dir].data), cap(priv.data[dir].data)) + pgsql.detailf("Len data: %d cap data: %d", len(priv.data[dir].data), cap(priv.data[dir].data)) if len(priv.data[dir].data) > tcp.TCPMaxDataInStream { - debugf("Stream data too large, dropping TCP stream") + pgsql.debugf("Stream data too large, dropping TCP stream") priv.data[dir] = nil return priv } @@ -262,12 +280,11 @@ func (pgsql *pgsqlPlugin) Parse(pkt *protos.Packet, tcptuple *common.TCPTuple, } ok, complete := pgsql.pgsqlMessageParser(priv.data[dir]) - //logp.Debug("pgsqldetailed", "MessageParser returned ok=%v complete=%v", ok, complete) if !ok { // drop this tcp stream. Will retry parsing with the next // segment in it priv.data[dir] = nil - debugf("Ignore Postgresql message. Drop tcp stream. Try parsing with the next segment") + pgsql.debugf("Ignore Postgresql message. Drop tcp stream. Try parsing with the next segment") return priv } @@ -333,7 +350,7 @@ func (pgsql *pgsqlPlugin) GapInStream(tcptuple *common.TCPTuple, dir uint8, // next layer but mark it as incomplete. stream := pgsqlData.data[dir] if messageHasEnoughData(stream.message) { - debugf("Message not complete, but sending to the next layer") + pgsql.debugf("Message not complete, but sending to the next layer") m := stream.message m.toExport = true m.end = stream.parseOffset @@ -378,7 +395,7 @@ func (pgsql *pgsqlPlugin) receivedPgsqlRequest(msg *pgsqlMessage) { // separated by ';' queries := pgsqlQueryParser(msg.query) - logp.Debug("pgsqldetailed", "Queries (%d) :%s", len(queries), queries) + pgsql.debugf("Queries (%d) :%s", len(queries), queries) transList := pgsql.getTransaction(tuple.Hashable()) if transList == nil { @@ -414,7 +431,7 @@ func (pgsql *pgsqlPlugin) receivedPgsqlResponse(msg *pgsqlMessage) { tuple := msg.tcpTuple transList := pgsql.getTransaction(tuple.Hashable()) if transList == nil || len(transList) == 0 { - debugf("Response from unknown transaction. Ignoring.") + pgsql.debugf("Response from unknown transaction. Ignoring.") unmatchedResponses.Add(1) return } @@ -424,7 +441,7 @@ func (pgsql *pgsqlPlugin) receivedPgsqlResponse(msg *pgsqlMessage) { // check if the request was received if trans.pgsql == nil { - debugf("Response from unknown transaction. Ignoring.") + pgsql.debugf("Response from unknown transaction. Ignoring.") unmatchedResponses.Add(1) return } @@ -449,7 +466,7 @@ func (pgsql *pgsqlPlugin) receivedPgsqlResponse(msg *pgsqlMessage) { pgsql.publishTransaction(trans) - debugf("Postgres transaction completed: %s\n%s", trans.pgsql, trans.responseRaw) + pgsql.debugf("Postgres transaction completed: %s\n%s", trans.pgsql, trans.responseRaw) } func (pgsql *pgsqlPlugin) publishTransaction(t *pgsqlTransaction) {