Skip to content

Commit

Permalink
Merge pull request #242 from seungsoo-lee/dev
Browse files Browse the repository at this point in the history
Updated consumers
  • Loading branch information
seungsoo-lee authored Jul 27, 2021
2 parents 1cbb667 + ea814be commit ffab2df
Showing 1 changed file with 33 additions and 23 deletions.
56 changes: 33 additions & 23 deletions src/feedconsumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ var Status string
var waitG sync.WaitGroup
var stopChan chan struct{}

var netLogEvents []types.NetworkLogEvent
var netLogEventsCount int
// var netLogEvents []types.NetworkLogEvent
// var netLogEventsCount int

var syslogEvents []types.SystemLogEvent
var syslogEventsCount int
// var syslogEvents []types.SystemLogEvent
// var syslogEventsCount int

var log *zerolog.Logger

Expand All @@ -64,6 +64,12 @@ type KnoxFeedConsumer struct {
kafkaConfig kafka.ConfigMap
topics []string
eventsBuffer int

netLogEvents []types.NetworkLogEvent
netLogEventsCount int

syslogEvents []types.SystemLogEvent
syslogEventsCount int
}

func (cfc *KnoxFeedConsumer) setupKafkaConfig() {
Expand All @@ -76,8 +82,8 @@ func (cfc *KnoxFeedConsumer) setupKafkaConfig() {
cfc.topics = viper.GetStringSlice("feed-consumer.kafka.topics")
cfc.eventsBuffer = viper.GetInt("feed-consumer.kafka.events.buffer")

netLogEvents = make([]types.NetworkLogEvent, 0, cfc.eventsBuffer)
syslogEvents = make([]types.SystemLogEvent, 0, cfc.eventsBuffer)
cfc.netLogEvents = make([]types.NetworkLogEvent, 0, cfc.eventsBuffer)
cfc.syslogEvents = make([]types.SystemLogEvent, 0, cfc.eventsBuffer)

sslEnabled := viper.GetBool("feed-consumer.kafka.ssl.enabled")
securityProtocol := viper.GetString("feed-consumer.kafka.security.protocol")
Expand Down Expand Up @@ -203,34 +209,36 @@ func (cfc *KnoxFeedConsumer) processNetworkLogMessage(message []byte) error {

// add cluster_name to the event
event.ClusterName = clusterNameStr
netLogEvents = append(netLogEvents, event)
netLogEventsCount++
cfc.netLogEvents = append(cfc.netLogEvents, event)
cfc.netLogEventsCount++

if netLogEventsCount == cfc.eventsBuffer {
if len(netLogEvents) > 0 {
if cfc.netLogEventsCount == cfc.eventsBuffer {
if len(cfc.netLogEvents) > 0 {
isSuccess := cfc.PushNetworkLogToDB()
if !isSuccess {
return errors.New("error saving to DB")
}
netLogEvents = nil
netLogEvents = make([]types.NetworkLogEvent, 0, cfc.eventsBuffer)
cfc.netLogEvents = nil
cfc.netLogEvents = make([]types.NetworkLogEvent, 0, cfc.eventsBuffer)
}

netLogEventsCount = 0
cfc.netLogEventsCount = 0
}

return nil
}

func (cfc *KnoxFeedConsumer) PushNetworkLogToDB() bool {
if err := libs.InsertNetworkLogToDB(cfg.GetCfgDB(), netLogEvents); err != nil {
if err := libs.InsertNetworkLogToDB(cfg.GetCfgDB(), cfc.netLogEvents); err != nil {
log.Error().Msgf("InsertNetworkFlowToDB err: %s", err.Error())
return false
}

return true
}

// == //

func (cfc *KnoxFeedConsumer) processSystemLogMessage(message []byte) error {
syslogEvent := types.SystemLogEvent{}

Expand All @@ -240,27 +248,27 @@ func (cfc *KnoxFeedConsumer) processSystemLogMessage(message []byte) error {
return err
}

syslogEvents = append(syslogEvents, syslogEvent)
syslogEventsCount++
cfc.syslogEvents = append(cfc.syslogEvents, syslogEvent)
cfc.syslogEventsCount++

if syslogEventsCount == cfc.eventsBuffer {
if len(syslogEvents) > 0 {
if cfc.syslogEventsCount == cfc.eventsBuffer {
if len(cfc.syslogEvents) > 0 {
isSuccess := cfc.PushSystemLogToDB()
if !isSuccess {
return errors.New("error saving to DB")
}
syslogEvents = nil
syslogEvents = make([]types.SystemLogEvent, 0, cfc.eventsBuffer)
cfc.syslogEvents = nil
cfc.syslogEvents = make([]types.SystemLogEvent, 0, cfc.eventsBuffer)
}

syslogEventsCount = 0
cfc.syslogEventsCount = 0
}

return nil
}

func (cfc *KnoxFeedConsumer) PushSystemLogToDB() bool {
if err := libs.InsertSystemLogToDB(cfg.GetCfgDB(), syslogEvents); err != nil {
if err := libs.InsertSystemLogToDB(cfg.GetCfgDB(), cfc.syslogEvents); err != nil {
log.Error().Msgf("InsertSystemLogToDB err: %s", err.Error())
return false
}
Expand All @@ -281,7 +289,9 @@ func StartConsumer() {
n := 0
for n < numOfConsumers {
c := &KnoxFeedConsumer{
id: n + 1,
id: n + 1,
netLogEvents: []types.NetworkLogEvent{},
syslogEvents: []types.SystemLogEvent{},
}

c.setupKafkaConfig()
Expand Down

0 comments on commit ffab2df

Please sign in to comment.