Skip to content

Commit

Permalink
More fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
buger committed Mar 7, 2023
1 parent 1834e69 commit 4d60617
Show file tree
Hide file tree
Showing 11 changed files with 69 additions and 56 deletions.
4 changes: 3 additions & 1 deletion pkg/binary/output_binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const (
initialDynamicWorkers = 10
)

var outputLogger = log.With().Str("component", "output_binary").Logger()

// BinaryOutputConfig struct for holding binary output configuration
type BinaryOutputConfig struct {
Workers int `json:"output-binary-workers"`
Expand Down Expand Up @@ -162,7 +164,7 @@ func (o *BinaryOutput) sendRequest(client *TCPClient, msg *plugin.Message) {
stop := time.Now()

if err != nil {
log.Error().Err(err).Msg("Request error")
outputLogger.Error().Err(err).Msg("Request error")
}

if o.config.TrackResponses {
Expand Down
28 changes: 13 additions & 15 deletions pkg/binary/tcp_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"net"
"syscall"
"time"

"github.com/rs/zerolog/log"
)

const (
Expand Down Expand Up @@ -77,7 +75,7 @@ func (c *TCPClient) Disconnect() {
c.conn.Close()
c.conn = nil

log.Warn().Msgf("Disconnected: %s", c.baseURL)
outputLogger.Warn().Msgf("Disconnected: %s", c.baseURL)
}
}

Expand All @@ -91,10 +89,10 @@ func (c *TCPClient) isAlive() bool {
if err == nil {
return true
} else if err == io.EOF {
log.Warn().Msg("connection closed, reconnecting")
outputLogger.Warn().Msg("connection closed, reconnecting")
return false
} else if err == syscall.EPIPE {
log.Warn().Msg("broken pipe, reconnecting")
outputLogger.Warn().Msg("broken pipe, reconnecting")
return false
}

Expand All @@ -106,18 +104,18 @@ func (c *TCPClient) Send(data []byte) (response []byte, err error) {
// Don't exit on panic
defer func() {
if r := recover(); r != nil {
log.Error().Msgf("PANIC: pkg: %v", r)
outputLogger.Error().Msgf("PANIC: pkg: %v", r)

if _, ok := r.(error); !ok {
log.Error().Stack().Msgf("faile to send request: %s", string(data))
outputLogger.Error().Stack().Msgf("failed to send request: %s", string(data))
}
}
}()

if c.conn == nil || !c.isAlive() {
log.Info().Msgf("Connecting: %s", c.baseURL)
outputLogger.Info().Msgf("Connecting: %s", c.baseURL)
if err = c.Connect(); err != nil {
log.Error().Err(err).Msgf("Connection error: %s", c.baseURL)
outputLogger.Error().Err(err).Msgf("Connection error: %s", c.baseURL)
return
}
}
Expand All @@ -127,11 +125,11 @@ func (c *TCPClient) Send(data []byte) (response []byte, err error) {
c.conn.SetWriteDeadline(timeout)

if c.config.Debug {
log.Debug().Msgf("Sending: %s", string(data))
outputLogger.Debug().Msgf("Sending: %s", string(data))
}

if _, err = c.conn.Write(data); err != nil {
log.Error().Err(err).Msgf("Write error: %s", c.baseURL)
outputLogger.Error().Err(err).Msgf("Write error: %s", c.baseURL)
return
}

Expand Down Expand Up @@ -162,15 +160,15 @@ func (c *TCPClient) Send(data []byte) (response []byte, err error) {
if err == io.EOF {
break
} else if err != nil {
log.Error().Err(err).Msgf("Read error: %s", c.baseURL)
outputLogger.Error().Err(err).Msgf("Read error: %s", c.baseURL)
break
}

readBytes += int(n)
}

if readBytes >= maxResponseSize {
log.Error().Msgf("Body is more than the max size: %d", maxResponseSize)
outputLogger.Error().Msgf("Body is more than the max size: %d", maxResponseSize)
break
}

Expand All @@ -179,7 +177,7 @@ func (c *TCPClient) Send(data []byte) (response []byte, err error) {
}

if err != nil {
log.Error().Err(err).Msgf("Response read error")
outputLogger.Error().Err(err).Msgf("Response read error")
return
}

Expand All @@ -191,7 +189,7 @@ func (c *TCPClient) Send(data []byte) (response []byte, err error) {
copy(payload, c.respBuf[:readBytes])

if c.config.Debug {
log.Debug().Msgf("Received: %s", string(payload))
outputLogger.Debug().Msgf("Received: %s", string(payload))
}

return payload, err
Expand Down
10 changes: 6 additions & 4 deletions pkg/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/rs/zerolog/log"
)

var logger = log.With().Str("component", "elasticsearch").Logger()

type ESUriErorr struct{}

func (e *ESUriErorr) Error() string {
Expand Down Expand Up @@ -89,7 +91,7 @@ func (p *ESPlugin) Init(URI string) {
err, p.Index = parseURI(URI)

if err != nil {
log.Fatal().Err(err).Msg("[ES] Can't initialize ElasticSearch plugin.")
logger.Fatal().Err(err).Msg("Can't initialize ElasticSearch plugin.")
}

p.eConn = elastigo.NewConn()
Expand All @@ -102,7 +104,7 @@ func (p *ESPlugin) Init(URI string) {

go p.ErrorHandler()

log.Info().Msg("[ES] Initialized Elasticsearch Plugin")
logger.Info().Msg("Initialized Elasticsearch Plugin")
return
}

Expand All @@ -114,7 +116,7 @@ func (p *ESPlugin) IndexerShutdown() {
func (p *ESPlugin) ErrorHandler() {
for {
errBuf := <-p.indexor.ErrorChannel
log.Error().Err(errBuf.Err).Msg("[ES] Error indexing document")
logger.Error().Err(errBuf.Err).Msg("Error indexing document")
}
}

Expand Down Expand Up @@ -161,7 +163,7 @@ func (p *ESPlugin) ResponseAnalyze(req, resp []byte, start, stop time.Time) {

j, err := json.Marshal(&esResp)
if err != nil {
log.Error().Err(err).Msg("[ES] Error marshaling ESRequestResponse")
logger.Error().Err(err).Msg("Error marshaling ESRequestResponse")
} else {
p.indexor.Index(p.Index, "RequestResponse", "", "", "", &t, j)
}
Expand Down
16 changes: 9 additions & 7 deletions pkg/emitter/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/rs/zerolog/log"
)

var logger = log.With().Str("component", "emitter").Logger()

// Emitter represents an object to manage plugin communication
type Emitter struct {
sync.WaitGroup
Expand Down Expand Up @@ -68,7 +70,7 @@ func (e *Emitter) Start(plugins *plugin.InOutPlugins) {
go func() {
defer e.Done()
if err := e.CopyMulty(middleware, plugins.Outputs...); err != nil {
log.Error().Err(err).Msg("error during copy")
logger.Error().Err(err).Msg("error during copy")
}
}()
} else {
Expand All @@ -77,7 +79,7 @@ func (e *Emitter) Start(plugins *plugin.InOutPlugins) {
go func(in plugin.Reader) {
defer e.Done()
if err := e.CopyMulty(in, plugins.Outputs...); err != nil {
log.Error().Err(err).Msg("error during copy")
logger.Error().Err(err).Msg("error during copy")
}
}(in)
}
Expand Down Expand Up @@ -118,24 +120,24 @@ func (e *Emitter) CopyMulty(src plugin.Reader, writers ...plugin.Writer) error {
}
meta := proto.PayloadMeta(msg.Meta)
if len(meta) < 3 {
log.Warn().Msgf("[EMITTER] Found malformed record %q from %q", msg.Meta, src)
logger.Warn().Msgf("Found malformed record %q from %q", msg.Meta, src)
continue
}
requestID := meta[1]
// start a subroutine only when necessary
if log.Logger.GetLevel() == zerolog.DebugLevel {
log.Debug().Msgf("[EMITTER] input: %s from: %s", byteutils.SliceToString(msg.Meta[:len(msg.Meta)-1]), src)
logger.Debug().Msgf("input: %s from: %s", byteutils.SliceToString(msg.Meta[:len(msg.Meta)-1]), src)
}
if modifier != nil {
log.Debug().Msgf("[EMITTER] modifier: %s from: %s", requestID, src)
logger.Debug().Msgf("modifier: %s from: %s", requestID, src)
if proto.IsRequestPayload(msg.Meta) {
msg.Data = modifier.Rewrite(msg.Data)
// If modifier tells to skip request
if len(msg.Data) == 0 {
filteredRequests.Set(requestID, []byte{}, 60) //
continue
}
log.Debug().Msgf("[EMITTER] Rewritten input: %s from: %s", requestID, src)
logger.Debug().Msgf("Rewritten input: %s from: %s", requestID, src)
} else {
_, err := filteredRequests.Get(requestID)
if err == nil {
Expand All @@ -155,7 +157,7 @@ func (e *Emitter) CopyMulty(src plugin.Reader, writers ...plugin.Writer) error {
if e.config.SplitOutput {
if e.config.RecognizeTCPSessions {
if !pro.PRO {
log.Fatal().Msg("Detailed TCP sessions work only with PRO license")
logger.Fatal().Msg("Detailed TCP sessions work only with PRO license")
}
hasher := fnv.New32a()
hasher.Write(meta[1])
Expand Down
18 changes: 10 additions & 8 deletions pkg/file/input_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/rs/zerolog/log"
)

var inputLogger = log.With().Str("component", "input_file").Logger()

// InputFileConfig contains config of input file
type InputFileConfig struct {
InputFileLoop bool `json:"input-file-loop"`
Expand Down Expand Up @@ -92,7 +94,7 @@ func (f *fileInputReader) parse(init chan struct{}) error {

if err != nil {
if err != io.EOF {
log.Logger.Error().Err(err).Msg("Error reading file")
inputLogger.Error().Err(err).Msg("Error reading file")
}

f.Close()
Expand All @@ -110,7 +112,7 @@ func (f *fileInputReader) parse(init chan struct{}) error {
meta := proto.PayloadMeta(asBytes)

if len(meta) < 3 {
log.Warn().Msgf("Found malformed record, file: %s, line %d", f.path, lineNum)
inputLogger.Warn().Msgf("Found malformed record, file: %s, line %d", f.path, lineNum)
buffer = bytes.Buffer{}
continue
}
Expand Down Expand Up @@ -187,15 +189,15 @@ func newFileInputReader(path string, readDepth int, dryRun bool) *fileInputReade
}

if err != nil {
log.Error().Err(err).Msg("Error opening file")
inputLogger.Error().Err(err).Msg("Error opening file")
return nil
}

r := &fileInputReader{path: path, file: file, closed: 0, readDepth: readDepth, dryRun: dryRun}
if strings.HasSuffix(path, ".gz") {
gzReader, err := gzip.NewReader(file)
if err != nil {
log.Error().Err(err).Msg("Error opening compressed file")
inputLogger.Error().Err(err).Msg("Error opening compressed file")
return nil
}
r.reader = bufio.NewReader(gzReader)
Expand Down Expand Up @@ -269,20 +271,20 @@ func (i *FileInput) init() (err error) {

resp, err := svc.ListObjects(params)
if err != nil {
log.Error().Err(err).Msgf("Error while retrieving list of files from S3: %s", i.path)
inputLogger.Error().Err(err).Msgf("Error while retrieving list of files from S3: %s", i.path)
return err
}

for _, c := range resp.Contents {
matches = append(matches, "s3://"+bucket+"/"+(*c.Key))
}
} else if matches, err = filepath.Glob(i.path); err != nil {
log.Error().Err(err).Msgf("Error while retrieving list of files: %s", i.path)
inputLogger.Error().Err(err).Msgf("Error while retrieving list of files: %s", i.path)
return
}

if len(matches) == 0 {
log.Error().Msgf("No files match pattern: %s", i.path)
inputLogger.Error().Msgf("No files match pattern: %s", i.path)
return errors.New("no matching files")
}

Expand Down Expand Up @@ -432,7 +434,7 @@ func (i *FileInput) emit() {
i.stats.Set("max_wait", time.Duration(maxWait))
i.stats.Set("min_wait", time.Duration(minWait))

log.Info().Msgf("FileInput: end of file '%s'", i.path)
inputLogger.Info().Msgf("FileInput: end of file '%s'", i.path)

if i.dryRun {
fmt.Printf("Records found: %v\nFiles processed: %v\nBytes processed: %v\nMax wait: %v\nMin wait: %v\nFirst wait: %v\nIt will take `%v` to replay at current speed.\nFound %v records with out of order timestamp\n",
Expand Down
7 changes: 4 additions & 3 deletions pkg/file/output_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/rs/zerolog/log"
)

var outputLogger = log.With().Str("component", "output_file").Logger()
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
var instanceID string

Expand Down Expand Up @@ -240,7 +241,7 @@ func (o *FileOutput) PluginWrite(msg *plugin.Message) (n int, err error) {
}

if err != nil {
log.Fatal().Err(err).Str("file", o.currentName).Msg("Cannot open file")
outputLogger.Fatal().Err(err).Str("file", o.currentName).Msg("Cannot open file")
}

o.QueueLength = 0
Expand Down Expand Up @@ -268,7 +269,7 @@ func (o *FileOutput) flush() {
// Don't exit on panic
defer func() {
if r := recover(); r != nil {
log.Error().Stack().Msgf("PANIC while file flush: %v", r)
outputLogger.Error().Stack().Msgf("PANIC while file flush: %v", r)
}
}()

Expand All @@ -285,7 +286,7 @@ func (o *FileOutput) flush() {
if stat, err := o.file.Stat(); err == nil {
o.currentFileSize = int(stat.Size())
} else {
log.Error().Err(err).Msgf("Error accessing file size")
outputLogger.Error().Err(err).Msgf("Error accessing file size")
}
}
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/file/output_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/rs/zerolog/log"
)

var s3Logger = log.With().Str("component", "s3").Logger()

// S3Output output plugin
type S3Output struct {
pathTemplate string
Expand All @@ -32,7 +34,7 @@ type S3Output struct {
// NewS3Output constructor for FileOutput, accepts path
func NewS3Output(pathTemplate string, config *FileOutputConfig) *S3Output {
if !pro.PRO {
log.Fatal().Msg("Using S3 output and input requires PRO license")
s3Logger.Fatal().Msg("Using S3 output and input requires PRO license")
return nil
}

Expand Down Expand Up @@ -66,7 +68,7 @@ func NewS3Output(pathTemplate string, config *FileOutputConfig) *S3Output {
func (o *S3Output) connect() {
if o.session == nil {
o.session = session.Must(session.NewSession(awsConfig()))
log.Info().Msg("[S3 Output] S3 connection successfully initialized")
s3Logger.Info().Msg("[S3 Output] S3 connection successfully initialized")
}
}

Expand Down Expand Up @@ -113,7 +115,7 @@ func (o *S3Output) onBufferUpdate(path string) {

file, err := os.Open(path)
if err != nil {
log.Error().Err(err).Msgf("[S3 Output] Failed to open file %q", path)
s3Logger.Error().Err(err).Msgf("[S3 Output] Failed to open file %q", path)
return
}
defer os.Remove(path)
Expand All @@ -124,7 +126,7 @@ func (o *S3Output) onBufferUpdate(path string) {
Key: aws.String(key),
})
if err != nil {
log.Error().Err(err).Msgf("[S3 Output] Failed to upload data to %q/%q", bucket, key)
s3Logger.Error().Err(err).Msgf("[S3 Output] Failed to upload data to %q/%q", bucket, key)
return
}

Expand Down

0 comments on commit 4d60617

Please sign in to comment.