Merge pull request #323 from humancalico/rmdb
use kafka consumer to get logs from feeder-service
nyrahul committed Sep 1, 2021
2 parents 3e4c574 + a72a37c commit 83fa44a
Showing 7 changed files with 168 additions and 16 deletions.
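
At a high level, the change adds "kafka" as a third log source. The feed consumer parses Kafka messages into network and system log events, rebuilds Cilium flow / KubeArmor log structures from them, converts those with the existing Cilium and KubeArmor converters, and buffers the results in package-level slices in the plugin package; when the log source is set to "kafka", the policy cron jobs drain those buffers instead of querying the database or the relays. A rough composite of the network-log path, stitched together from the hunks below (not a literal excerpt):

// networkPolicy.go: start the shared Kafka feed consumer when configured
if cfg.GetCfgNetworkLogFrom() == "kafka" {
	go feedconsumer.StartConsumer()
	NetworkWaitG.Add(1)
}

// helperFunctions.go: drain the buffered flows once at least OperationTrigger
// of them have accumulated, then hand them to the policy engine
flows := plugin.GetCiliumFlowsFromKafka(OperationTrigger)
for _, flow := range flows {
	networkLogs = append(networkLogs, *flow)
}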
4 changes: 2 additions & 2 deletions src/conf/local.yaml
@@ -5,15 +5,15 @@ application:
operation-trigger: 100
cron-job-time-interval: "0h0m10s" # format: XhYmZs
network-log-limit: 100000
network-log-from: "hubble" # db|hubble
network-log-from: "kafka" # db|hubble|kafka
network-log-file: "./flow.json" # file path
network-policy-to: "db|file" # db, file
network-policy-dir: "./"
system:
operation-mode: 1 # 1: cronjob | 2: one-time-job
operation-trigger: 100
cron-job-time-interval: "0h0m10s" # format: XhYmZs
system-log-from: "kubearmor" # db|kubearmor
system-log-from: "kafka" # db|kubearmor|kafka
system-log-limit: 100000
system-log-file: "./log.json" # file path
system-policy-to: "db|file" # db, file
71 changes: 62 additions & 9 deletions src/feedconsumer/consumer.go
@@ -2,7 +2,6 @@ package feedconsumer

import (
"encoding/json"
"errors"
"strconv"
"sync"
"time"
@@ -15,7 +14,11 @@ import (
cfg "github.com/accuknox/knoxAutoPolicy/src/config"
"github.com/accuknox/knoxAutoPolicy/src/libs"
logger "github.com/accuknox/knoxAutoPolicy/src/logging"
"github.com/accuknox/knoxAutoPolicy/src/plugin"
types "github.com/accuknox/knoxAutoPolicy/src/types"
cilium "github.com/cilium/cilium/api/v1/flow"
pb "github.com/kubearmor/KubeArmor/protobuf"
"google.golang.org/protobuf/types/known/timestamppb"
)

const ( // status
@@ -24,7 +27,7 @@ )
)

// ====================== //
// == Gloabl Variables == //
// == Global Variables == //
// ====================== //

var numOfConsumers int
@@ -188,13 +191,19 @@ func (cfc *KnoxFeedConsumer) processNetworkLogMessage(message []byte) error {
}

clusterName := eventMap["cluster_name"]

clusterNameStr := ""
if err := json.Unmarshal(clusterName, &clusterNameStr); err != nil {
log.Error().Stack().Msg(err.Error())
return err
}

flowEvent := eventMap["flow"]
flowEvent, exists := eventMap["flow"]
if !exists {
return nil
}
if err := json.Unmarshal(flowEvent, &event); err != nil {
log.Error().Msg(err.Error())
return err
}

@@ -205,9 +214,38 @@ func (cfc *KnoxFeedConsumer) processNetworkLogMessage(message []byte) error {

if cfc.netLogEventsCount == cfc.eventsBuffer {
if len(cfc.netLogEvents) > 0 {
isSuccess := cfc.PushNetworkLogToDB()
if !isSuccess {
return errors.New("error saving to DB")
for _, netLog := range cfc.netLogEvents {
time, _ := strconv.ParseInt(netLog.Time, 10, 64)
flow := &cilium.Flow{
TrafficDirection: cilium.TrafficDirection(plugin.TrafficDirection[netLog.TrafficDirection]),
PolicyMatchType: uint32(netLog.PolicyMatchType),
DropReason: uint32(netLog.DropReason),
Verdict: cilium.Verdict(plugin.Verdict[netLog.Verdict]),
Time: &timestamppb.Timestamp{
Seconds: time,
},
EventType: &cilium.CiliumEventType{},
Source: &cilium.Endpoint{},
Destination: &cilium.Endpoint{},
IP: &cilium.IP{},
L4: &cilium.Layer4{},
L7: &cilium.Layer7{},
}

plugin.GetFlowData(netLog.EventType, flow.EventType)
plugin.GetFlowData(netLog.Source, flow.Source)
plugin.GetFlowData(netLog.Destination, flow.Destination)
plugin.GetFlowData(netLog.IP, flow.IP)
plugin.GetFlowData(netLog.L4, flow.L4)
plugin.GetFlowData(netLog.L7, flow.L7)

knoxFlow, valid := plugin.ConvertCiliumFlowToKnoxNetworkLog(flow)
if valid {
knoxFlow.ClusterName = netLog.ClusterName
plugin.CiliumFlowsKafkaMutex.Lock()
plugin.CiliumFlowsKafka = append(plugin.CiliumFlowsKafka, &knoxFlow)
plugin.CiliumFlowsKafkaMutex.Unlock()
}
}
cfc.netLogEvents = nil
cfc.netLogEvents = make([]types.NetworkLogEvent, 0, cfc.eventsBuffer)
@@ -244,9 +282,24 @@ func (cfc *KnoxFeedConsumer) processSystemLogMessage(message []byte) error {

if cfc.syslogEventsCount == cfc.eventsBuffer {
if len(cfc.syslogEvents) > 0 {
isSuccess := cfc.PushSystemLogToDB()
if !isSuccess {
return errors.New("error saving to DB")
for _, syslog := range cfc.syslogEvents {
log := pb.Log{
ClusterName: syslog.ClusterName,
HostName: syslog.HostName,
NamespaceName: syslog.NamespaceName,
PodName: syslog.PodName,
Source: syslog.Source,
Operation: syslog.Operation,
Resource: syslog.Resource,
Data: syslog.Data,
Result: syslog.Result,
}

knoxLog := plugin.ConvertKubeArmorLogToKnoxSystemLog(&log)
knoxLog.ClusterName = syslog.Clustername
plugin.KubeArmorKafkaLogsMutex.Lock()
plugin.KubeArmorKafkaLogs = append(plugin.KubeArmorKafkaLogs, &knoxLog)
plugin.KubeArmorKafkaLogsMutex.Unlock()
}
cfc.syslogEvents = nil
cfc.syslogEvents = make([]types.SystemLogEvent, 0, cfc.eventsBuffer)
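
Both flush branches above follow the same hand-off pattern: once eventsBuffer events have been collected, each one is converted and appended to a package-level slice in the plugin package under its mutex, and the policy cron job later swaps the whole slice out under the same lock (see GetCiliumFlowsFromKafka and GetSystemLogsFromKafkaConsumer further down). A stripped-down sketch of that append-and-drain pattern, with placeholder names and the sync/types imports assumed:

var (
	buffer  []*types.KnoxNetworkLog // stands in for plugin.CiliumFlowsKafka / plugin.KubeArmorKafkaLogs
	bufLock sync.Mutex              // stands in for the matching *Mutex globals
)

// consumer side: append one converted event under the lock
func push(entry *types.KnoxNetworkLog) {
	bufLock.Lock()
	buffer = append(buffer, entry)
	bufLock.Unlock()
}

// cron-job side: return the whole batch and reset the buffer, or return
// nothing until at least `trigger` entries have accumulated
func drain(trigger int) []*types.KnoxNetworkLog {
	bufLock.Lock()
	defer bufLock.Unlock()
	if len(buffer) < trigger {
		return nil
	}
	batch := buffer
	buffer = []*types.KnoxNetworkLog{}
	return batch
}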
16 changes: 16 additions & 0 deletions src/networkpolicy/helperFunctions.go
@@ -221,6 +221,22 @@ func getNetworkLogs() []types.KnoxNetworkLog {
networkLogs = append(networkLogs, log)
}
}
} else if NetworkLogFrom == "kafka" {
// ================== //
// == Cilium kafka == //
// ================== //
log.Info().Msg("Get network log from the kafka consumer")

// get flows from kafka consumer
flows := plugin.GetCiliumFlowsFromKafka(OperationTrigger)
if len(flows) == 0 || len(flows) < OperationTrigger {
return nil
}

// convert hubble flows -> network logs (but, in this case, no flow id)
for _, flow := range flows {
networkLogs = append(networkLogs, *flow)
}
} else if NetworkLogFrom == "file" {
// =============================== //
// == File (.json) for testing == //
4 changes: 4 additions & 0 deletions src/networkpolicy/networkPolicy.go
@@ -10,6 +10,7 @@ import (

"github.com/accuknox/knoxAutoPolicy/src/cluster"
cfg "github.com/accuknox/knoxAutoPolicy/src/config"
"github.com/accuknox/knoxAutoPolicy/src/feedconsumer"
"github.com/accuknox/knoxAutoPolicy/src/libs"
logger "github.com/accuknox/knoxAutoPolicy/src/logging"
"github.com/accuknox/knoxAutoPolicy/src/plugin"
@@ -1607,6 +1608,9 @@ func StartNetworkCronJob() {
if cfg.GetCfgNetworkLogFrom() == "hubble" {
go plugin.StartHubbleRelay(NetworkStopChan, &NetworkWaitG, cfg.GetCfgCiliumHubble())
NetworkWaitG.Add(1)
} else if cfg.GetCfgNetworkLogFrom() == "kafka" {
go feedconsumer.StartConsumer()
NetworkWaitG.Add(1)
}

// init cron job
38 changes: 38 additions & 0 deletions src/plugin/cilium.go
@@ -58,13 +58,17 @@ var Verdict = map[string]int{

var CiliumFlows []*cilium.Flow
var CiliumFlowsMutex *sync.Mutex
var CiliumFlowsKafka []*types.KnoxNetworkLog
var CiliumFlowsKafkaMutex *sync.Mutex

var log *zerolog.Logger

func init() {
log = logger.GetInstance()
CiliumFlowsMutex = &sync.Mutex{}
KubeArmorRelayLogsMutex = &sync.Mutex{}
CiliumFlowsKafkaMutex = &sync.Mutex{}
KubeArmorKafkaLogsMutex = &sync.Mutex{}
}

// ====================== //
@@ -373,6 +377,17 @@ func ConvertCiliumNetworkLogsToKnoxNetworkLogs(dbDriver string, docs []map[strin
}
}

func GetFlowData(netLogEventType []byte, flowEventType interface{}) error {
if netLogEventType == nil {
return nil
}
err := json.Unmarshal(netLogEventType, flowEventType)
if err != nil {
log.Error().Msg("error while unmarshing event type :" + err.Error())
}
return err
}
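
GetFlowData is a nil-tolerant wrapper around json.Unmarshal: the consumer passes the raw byte fields of a Kafka network-log event together with the matching sub-message of the cilium.Flow being rebuilt (see consumer.go above). A minimal usage sketch with an assumed payload:

var eventType cilium.CiliumEventType
raw := []byte(`{"type": 1}`) // assumed example payload; the real bytes come straight from the Kafka event
if err := GetFlowData(raw, &eventType); err != nil {
	// unmarshalling failures are already logged inside GetFlowData
}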

// ============================== //
// == Network Policy Convertor == //
// ============================== //
@@ -754,3 +769,26 @@ func StartHubbleRelay(StopChan chan struct{}, wg *sync.WaitGroup, cfg types.Conf
log.Error().Msg("Unable to stream network flow: " + err.Error())
}
}

func GetCiliumFlowsFromKafka(trigger int) []*types.KnoxNetworkLog {
results := []*types.KnoxNetworkLog{}

CiliumFlowsKafkaMutex.Lock()
defer CiliumFlowsKafkaMutex.Unlock()
if len(CiliumFlowsKafka) == 0 {
log.Info().Msgf("Cilium kafka traffic flow not exist")
return results
}

if len(CiliumFlowsKafka) < trigger {
log.Info().Msgf("The number of cilium kafka traffic flow [%d] is less than trigger [%d]", len(CiliumFlowsKafka), trigger)
return results
}

results = CiliumFlowsKafka // copy
CiliumFlowsKafka = []*types.KnoxNetworkLog{} // reset

log.Info().Msgf("The total number of cilium kafka traffic flow: [%d]", len(results))

return results
}
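
A quick usage sketch of the drain function with hypothetical values: with three buffered flows and a trigger of 2, the call hands back all three and leaves the shared buffer empty for the next batch.

plugin.CiliumFlowsKafka = []*types.KnoxNetworkLog{{}, {}, {}} // hypothetical buffered flows
batch := plugin.GetCiliumFlowsFromKafka(2)
// len(batch) == 3, len(plugin.CiliumFlowsKafka) == 0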
27 changes: 26 additions & 1 deletion src/plugin/kubearmor.go
@@ -17,6 +17,9 @@ import (
var KubeArmorRelayLogs []*pb.Log
var KubeArmorRelayLogsMutex *sync.Mutex

var KubeArmorKafkaLogs []*types.KnoxSystemLog
var KubeArmorKafkaLogsMutex *sync.Mutex

func ConvertKnoxSystemPolicyToKubeArmorPolicy(knoxPolicies []types.KnoxSystemPolicy) []types.KubeArmorPolicy {
results := []types.KubeArmorPolicy{}

@@ -100,7 +103,7 @@ func ConvertKubeArmorSystemLogsToKnoxSystemLogs(dbDriver string, docs []map[stri
return []types.KnoxSystemLog{}
}

func ConvertKubeArmorRelayLogToKnoxSystemLog(relayLog *pb.Log) types.KnoxSystemLog {
func ConvertKubeArmorLogToKnoxSystemLog(relayLog *pb.Log) types.KnoxSystemLog {

sources := strings.Split(relayLog.Source, " ")
source := ""
@@ -250,3 +253,25 @@ func StartKubeArmorRelay(StopChan chan struct{}, wg *sync.WaitGroup, cfg types.C
}
}()
}

func GetSystemLogsFromKafkaConsumer(trigger int) []*types.KnoxSystemLog {
results := []*types.KnoxSystemLog{}
KubeArmorKafkaLogsMutex.Lock()
defer KubeArmorKafkaLogsMutex.Unlock()
if len(KubeArmorKafkaLogs) == 0 {
log.Info().Msgf("KubeArmor kafka traffic flow not exist")
return results
}

if len(KubeArmorKafkaLogs) < trigger {
log.Info().Msgf("The number of KubeArmor traffic flow [%d] is less than trigger [%d]", len(KubeArmorKafkaLogs), trigger)
return results
}

results = KubeArmorKafkaLogs // copy
KubeArmorKafkaLogs = []*types.KnoxSystemLog{} // reset

log.Info().Msgf("The total number of KubeArmor kafka traffic flow: [%d]", len(results))

return results
}
24 changes: 20 additions & 4 deletions src/systempolicy/systemPolicy.go
@@ -10,6 +10,7 @@ import (

"github.com/accuknox/knoxAutoPolicy/src/cluster"
cfg "github.com/accuknox/knoxAutoPolicy/src/config"
"github.com/accuknox/knoxAutoPolicy/src/feedconsumer"
"github.com/accuknox/knoxAutoPolicy/src/libs"
logger "github.com/accuknox/knoxAutoPolicy/src/logging"
"github.com/accuknox/knoxAutoPolicy/src/plugin"
@@ -165,10 +166,10 @@ type SysPath struct {
func getSystemLogs() []types.KnoxSystemLog {
systemLogs := []types.KnoxSystemLog{}

// =============== //
// == Database == //
// =============== //
if SystemLogFrom == "db" {
// ============== //
// == Database == //
// ============== //
log.Info().Msg("Get system log from the database")

// get system logs from db
@@ -236,9 +237,22 @@

// convert kubearmor relay logs -> knox system logs
for _, relayLog := range relayLogs {
log := plugin.ConvertKubeArmorRelayLogToKnoxSystemLog(relayLog)
log := plugin.ConvertKubeArmorLogToKnoxSystemLog(relayLog)
systemLogs = append(systemLogs, log)
}
} else if SystemLogFrom == "kafka" {
log.Info().Msg("Get system log from kafka consumer")

// get system logs from kafka consumer
sysLogs := plugin.GetSystemLogsFromKafkaConsumer(OperationTrigger)
if len(sysLogs) == 0 || len(sysLogs) < OperationTrigger {
return nil
}

// convert kubearmor system logs -> knox system logs
for _, sysLog := range sysLogs {
systemLogs = append(systemLogs, *sysLog)
}
} else {
log.Error().Msgf("System log from not correct: %s", SystemLogFrom)
return nil
@@ -659,6 +673,8 @@ func StartSystemCronJob() {
if cfg.GetCfgSystemLogFrom() == "kubearmor" {
go plugin.StartKubeArmorRelay(SystemStopChan, &SystemWaitG, cfg.GetCfgKubeArmor())
SystemWaitG.Add(1)
} else if cfg.GetCfgSystemLogFrom() == "kafka" {
go feedconsumer.StartConsumer()
}

// init cron job