diff --git a/.github/workflows/ci-test.yml b/.github/workflows/ci-test.yml
index 955a5d13..56a7c474 100644
--- a/.github/workflows/ci-test.yml
+++ b/.github/workflows/ci-test.yml
@@ -2,9 +2,9 @@ name: ci-test
 
 on:
   push:
-    branches: [master,dev]
+    branches: [master, dev]
   pull_request:
-    branches: [master,dev]
+    branches: [master, dev]
 
 jobs:
   unit-test:
@@ -18,4 +18,50 @@ jobs:
         go-version: v1.16
 
     - name: Unit Test
-      run: ./tests/test-go-unit.sh
\ No newline at end of file
+      run: ./tests/test-go-unit.sh
+
+  system-test:
+    runs-on: ubuntu-latest
+    steps:
+    - name: Checkout Source
+      uses: actions/checkout@v2
+
+    - uses: actions/setup-go@v2
+      with:
+        go-version: v1.16
+
+    - name: Setup Env
+      run: |
+        # install kernel-headers
+        sudo apt-get update
+        sudo apt-get install -y linux-headers-$(uname -r)
+        # install kubeadm
+        sudo apt-get update
+        sudo apt-get install -y apt-transport-https curl
+        curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add -
+        sudo touch /etc/apt/sources.list.d/kubernetes.list
+        echo "deb http://apt.kubernetes.io/ kubernetes-xenial main" | sudo tee -a /etc/apt/sources.list.d/kubernetes.list
+        sudo apt-get update
+        sudo apt-get install -y kubelet=1.21.3-00 kubeadm=1.21.3-00 kubectl=1.21.3-00
+        sudo apt-mark hold kubelet kubeadm kubectl
+        # install apparmor and auditd
+        sudo apt-get install -y apparmor apparmor-utils auditd
+        sudo systemctl start apparmor; sudo systemctl start auditd
+        # turn off swap
+        sudo swapoff -a
+        # initialize kubernetes
+        sudo kubeadm init --pod-network-cidr=10.244.0.0/16 | tee -a ~/k8s_init.log
+        # copy k8s config
+        mkdir -p $HOME/.kube
+        sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
+        sudo chown $USER:$USER $HOME/.kube/config
+        export KUBECONFIG=$HOME/.kube/config
+        echo "export KUBECONFIG=$HOME/.kube/config" | tee -a ~/.bashrc
+        # install flannel
+        kubectl apply -f https://raw.githubusercontent.com/coreos/flannel/v0.13.0/Documentation/kube-flannel.yml
+        # disable master isolation
+        kubectl taint nodes --all node-role.kubernetes.io/master-
+        # Install grpcurl
+        go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest
+    - name: Run Test Script
+      run: ./tests/test-scenarios-local.sh
diff --git a/deployments b/deployments
index abf06e27..6834f042 160000
--- a/deployments
+++ b/deployments
@@ -1 +1 @@
-Subproject commit abf06e27340256d279874de5a8dbadfe4380df6d
+Subproject commit 6834f042b396bd4002bfaaf31a87f4b46af10442
diff --git a/src/conf/local.yaml b/src/conf/local.yaml
index f69cd2e3..7ef1e2d5 100644
--- a/src/conf/local.yaml
+++ b/src/conf/local.yaml
@@ -5,7 +5,7 @@ application:
     operation-trigger: 100
     cron-job-time-interval: "0h0m10s" # format: XhYmZs
     network-log-limit: 100000
-    network-log-from: "hubble" # db|hubble
+    network-log-from: "kafka" # db|hubble|kafka
     network-log-file: "./flow.json" # file path
     network-policy-to: "db|file" # db, file
     network-policy-dir: "./"
@@ -13,7 +13,7 @@ application:
     operation-mode: 1 # 1: cronjob | 2: one-time-job
     operation-trigger: 100
     cron-job-time-interval: "0h0m10s" # format: XhYmZs
-    system-log-from: "kubearmor" # db|kubearmor
+    system-log-from: "kafka" # db|kubearmor|kafka
     system-log-limit: 100000
     system-log-file: "./log.json" # file path
     system-policy-to: "db|file" # db, file
diff --git a/src/config/configManager_test.go b/src/config/configManager_test.go
index f4d91090..fde85832 100644
--- a/src/config/configManager_test.go
+++ b/src/config/configManager_test.go
@@ -2,11 +2,8 @@ package config
 
 import (
 	"bytes"
-	"encoding/json"
 	"testing"
 
-
"github.com/DATA-DOG/go-sqlmock" - "github.com/accuknox/knoxAutoPolicy/src/libs" types "github.com/accuknox/knoxAutoPolicy/src/types" "github.com/spf13/viper" "github.com/stretchr/testify/assert" @@ -29,7 +26,6 @@ func TestLoadConfigDB(t *testing.T) { assert.NotEmpty(t, cfg.DBHost, "DB host should not be empty") assert.NotEmpty(t, cfg.DBPort, "DB host should not be empty") - assert.NotEmpty(t, cfg.TableConfiguration, "Table configuration should not be empty") assert.NotEmpty(t, cfg.TableNetworkLog, "Table network_log should not be empty") assert.NotEmpty(t, cfg.TableNetworkPolicy, "Table network_policy should not be empty") assert.NotEmpty(t, cfg.TableSystemLog, "Table system_log should not be empty") @@ -93,255 +89,3 @@ func TestSetLogFile(t *testing.T) { assert.Equal(t, CurrentCfg.ConfigNetPolicy.NetworkLogFile, "test_log.log", "network log file should be \"test_log.log\"") } - -func TestAddConfiguration(t *testing.T) { - // prepare mock mysql - _, mock := libs.NewMock() - - newCfg := types.Configuration{} - newCfg.ConfigName = "test_config" - newCfg.ConfigNetPolicy.NetPolicyCIDRBits = 32 - - configDBPtr := &newCfg.ConfigDB - configDB, _ := json.Marshal(configDBPtr) - - configHubblePtr := &newCfg.ConfigCiliumHubble - configCilium, _ := json.Marshal(configHubblePtr) - - configKubeArmorPtr := &newCfg.ConfigKubeArmorRelay - configKubeArmor, _ := json.Marshal(configKubeArmorPtr) - - configFilterPtr := &newCfg.ConfigNetPolicy.NetLogFilters - configFilter, _ := json.Marshal(configFilterPtr) - - prep := mock.ExpectPrepare("INSERT INTO auto_policy_config") - prep.ExpectExec().WithArgs( - "test_config", //config_name - 0, //status - configDB, //config_db - configCilium, //config_cilium_hubble - configKubeArmor, //config_kubearmor_relay - 0, //network_operation_mode - "", //network_cronjob_time_interval - "", //network_one_time_job_time_selection - "", //network_log_from - "", //network_log_file - "", //network_policy_to - "", //network_policy_dir - configFilter, //network_policy_log_filters - 0, //network_policy_types - 0, //network_policy_rule_types - 32, //network_policy_cidr_bits - 0, //network_policy_l3_level - 0, //network_policy_l4_level - 0, //network_policy_l7_level - 0, //system_operation_mode - "", //system_cronjob_time_interval - "", //system_one_time_job_time_selection - "", //system_log_from - "", //system_log_file - "", //system_policy_to - "", //system_policy_dir - 0, //system_policy_types - configFilter, //system_policy_log_filters - false, //system_policy_proc_fromsource - false, //system_policy_file_fromsource - "", //cluster_info_from - "", //cluster_mgmt_url - ).WillReturnResult(sqlmock.NewResult(0, 1)) - - // add configuration - err := AddConfiguration(newCfg) - assert.NoError(t, err) - - if err = mock.ExpectationsWereMet(); err != nil { - t.Errorf("unmet expectation error: %s", err) - } -} - -func TestGetConfigurations(t *testing.T) { - // prepare mock mysql - _, mock := libs.NewMock() - - testCfg := types.Configuration{} - testCfg.ConfigName = "test_config" - testCfg.ConfigNetPolicy.NetPolicyCIDRBits = 32 - - configDBPtr := &testCfg.ConfigDB - configDB, _ := json.Marshal(configDBPtr) - - configHubblePtr := &testCfg.ConfigCiliumHubble - configCilium, _ := json.Marshal(configHubblePtr) - - configKubeArmorPtr := &testCfg.ConfigKubeArmorRelay - configKubeArmor, _ := json.Marshal(configKubeArmorPtr) - - configFilterPtr := &testCfg.ConfigNetPolicy.NetLogFilters - configFilter, _ := json.Marshal(configFilterPtr) - - rows := mock.NewRows([]string{ - "id", - "config_name", - 
"status", - "config_db", - "config_cilium_hubble", - "config_kubearmor_relay", - "network_operation_mode", - "network_cronjob_time_interval", - "network_one_time_job_time_selection", - "network_log_from", - "network_log_file", - "network_policy_to", - "network_policy_dir", - "network_policy_log_filters", - "network_policy_types", - "network_policy_rule_types", - "network_policy_cidr_bits", - "network_policy_l3_level", - "network_policy_l4_level", - "network_policy_l7_level", - "system_operation_mode", - "system_cronjob_time_interval", - "system_one_time_job_time_selection", - "system_log_from", - "system_log_file", - "system_policy_to", - "system_policy_dir", - "system_policy_types", - "system_policy_log_filters", - "system_policy_proc_fromsource", - "system_policy_file_fromsource", - "cluster_info_from", - "cluster_mgmt_url"}). - AddRow( - 1, //id - "test_config", //config_name - 0, //status - configDB, //config_db - configCilium, //config_cilium_hubble - configKubeArmor, //config_kubearmor_relay - 0, //network_operation_mode - "", //network_cronjob_time_interval - "", //network_one_time_job_time_selection - "", //network_log_from - "", //network_log_file - "", //network_policy_to - "", //network_policy_dir - configFilter, //network_policy_log_filters - 0, //network_policy_types - 0, //network_policy_rule_types - 32, //network_policy_cidr_bits - 0, //network_policy_l3_level - 0, //network_policy_l4_level - 0, //network_policy_l7_level - 0, //system_operation_mode - "", //system_cronjob_time_interval - "", //system_one_time_job_time_selection - "", //system_log_from - "", //system_log_file - "", //system_policy_to - "", //system_policy_dir - 0, //system_policy_types - configFilter, //system_policy_log_filters - false, //system_policy_proc_fromsource - false, //system_policy_file_fromsource - "", //cluster_info_from - "", //cluster_mgmt_url - ) - - query := "SELECT (.+) FROM auto_policy_config WHERE config_name = ?" 
- mock.ExpectQuery(query).WillReturnRows(rows) - - // get configuration by name - results, err := GetConfigurations(testCfg.ConfigName) - assert.NoError(t, err) - assert.Equal(t, results[0].ConfigName, "test_config") - - if err = mock.ExpectationsWereMet(); err != nil { - t.Errorf("unmet expectation error: %s", err) - } -} - -func TestUpdateConfiguration(t *testing.T) { - // prepare mock mysql - _, mock := libs.NewMock() - - testCfg := types.Configuration{} - testCfg.ConfigName = "test_config" - testCfg.ConfigNetPolicy.NetPolicyCIDRBits = 24 - - configDBPtr := &testCfg.ConfigDB - configDB, _ := json.Marshal(configDBPtr) - - configHubblePtr := &testCfg.ConfigCiliumHubble - configCilium, _ := json.Marshal(configHubblePtr) - - configKubeArmorPtr := &testCfg.ConfigKubeArmorRelay - configKubeArmor, _ := json.Marshal(configKubeArmorPtr) - - configFilterPtr := &testCfg.ConfigNetPolicy.NetLogFilters - configFilter, _ := json.Marshal(configFilterPtr) - - prep := mock.ExpectPrepare("UPDATE auto_policy_config") - prep.ExpectExec().WithArgs( - configDB, //config_db - configCilium, //config_cilium_hubble - configKubeArmor, //config_kubearmor_relay - 0, //network_operation_mode - "", //network_cronjob_time_interval - "", //network_one_time_job_time_selection - "", //network_log_from - "", //network_log_file - "", //network_policy_to - "", //network_policy_dir - configFilter, //network_policy_log_filters - 0, //network_policy_types - 0, //network_policy_rule_types - 24, //network_policy_cidr_bits - 0, //network_policy_l3_level - 0, //network_policy_l4_level - 0, //network_policy_l7_level - 0, //system_operation_mode - "", //system_cronjob_time_interval - "", //system_one_time_job_time_selection - "", //system_log_from - "", //system_log_file - "", //system_policy_to - "", //system_policy_dir - 0, //system_policy_types - configFilter, //system_policy_log_filters - false, //system_policy_proc_fromsource - false, //system_policy_file_fromsource - "", //cluster_info_from - "", //cluster_mgmt_url). - "test_config", //config_name - ).WillReturnResult(sqlmock.NewResult(0, 1)) - - // update configuration by name - err := UpdateConfiguration("test_config", testCfg) - assert.NoError(t, err) - - if err = mock.ExpectationsWereMet(); err != nil { - t.Errorf("unmet expectation error: %s", err) - } -} - -func TestDeleteConfiguration(t *testing.T) { - // prepare mock mysql - _, mock := libs.NewMock() - - testCfg := types.Configuration{} - testCfg.ConfigName = "test_config" - - prep := mock.ExpectPrepare("DELETE FROM auto_policy_config") - prep.ExpectExec().WithArgs("test_config"). 
- WillReturnResult(sqlmock.NewResult(0, 1)) - - // update configuration by name - err := DeleteConfiguration("test_config") - assert.NoError(t, err) - - if err = mock.ExpectationsWereMet(); err != nil { - t.Errorf("unmet expectation error: %s", err) - } -} diff --git a/src/feedconsumer/consumer.go b/src/feedconsumer/consumer.go index 4e5d0a40..6f03c784 100644 --- a/src/feedconsumer/consumer.go +++ b/src/feedconsumer/consumer.go @@ -2,7 +2,6 @@ package feedconsumer import ( "encoding/json" - "errors" "strconv" "sync" "time" @@ -15,7 +14,11 @@ import ( cfg "github.com/accuknox/knoxAutoPolicy/src/config" "github.com/accuknox/knoxAutoPolicy/src/libs" logger "github.com/accuknox/knoxAutoPolicy/src/logging" + "github.com/accuknox/knoxAutoPolicy/src/plugin" types "github.com/accuknox/knoxAutoPolicy/src/types" + cilium "github.com/cilium/cilium/api/v1/flow" + pb "github.com/kubearmor/KubeArmor/protobuf" + "google.golang.org/protobuf/types/known/timestamppb" ) const ( // status @@ -24,7 +27,7 @@ const ( // status ) // ====================== // -// == Gloabl Variables == // +// == Global Variables == // // ====================== // var numOfConsumers int @@ -188,13 +191,19 @@ func (cfc *KnoxFeedConsumer) processNetworkLogMessage(message []byte) error { } clusterName := eventMap["cluster_name"] + clusterNameStr := "" if err := json.Unmarshal(clusterName, &clusterNameStr); err != nil { + log.Error().Stack().Msg(err.Error()) return err } - flowEvent := eventMap["flow"] + flowEvent, exists := eventMap["flow"] + if !exists { + return nil + } if err := json.Unmarshal(flowEvent, &event); err != nil { + log.Error().Msg(err.Error()) return err } @@ -205,9 +214,38 @@ func (cfc *KnoxFeedConsumer) processNetworkLogMessage(message []byte) error { if cfc.netLogEventsCount == cfc.eventsBuffer { if len(cfc.netLogEvents) > 0 { - isSuccess := cfc.PushNetworkLogToDB() - if !isSuccess { - return errors.New("error saving to DB") + for _, netLog := range cfc.netLogEvents { + time, _ := strconv.ParseInt(netLog.Time, 10, 64) + flow := &cilium.Flow{ + TrafficDirection: cilium.TrafficDirection(plugin.TrafficDirection[netLog.TrafficDirection]), + PolicyMatchType: uint32(netLog.PolicyMatchType), + DropReason: uint32(netLog.DropReason), + Verdict: cilium.Verdict(plugin.Verdict[netLog.Verdict]), + Time: ×tamppb.Timestamp{ + Seconds: time, + }, + EventType: &cilium.CiliumEventType{}, + Source: &cilium.Endpoint{}, + Destination: &cilium.Endpoint{}, + IP: &cilium.IP{}, + L4: &cilium.Layer4{}, + L7: &cilium.Layer7{}, + } + + plugin.GetFlowData(netLog.EventType, flow.EventType) + plugin.GetFlowData(netLog.Source, flow.Source) + plugin.GetFlowData(netLog.Destination, flow.Destination) + plugin.GetFlowData(netLog.IP, flow.IP) + plugin.GetFlowData(netLog.L4, flow.L4) + plugin.GetFlowData(netLog.L7, flow.L7) + + knoxFlow, valid := plugin.ConvertCiliumFlowToKnoxNetworkLog(flow) + if valid { + knoxFlow.ClusterName = netLog.ClusterName + plugin.CiliumFlowsKafkaMutex.Lock() + plugin.CiliumFlowsKafka = append(plugin.CiliumFlowsKafka, &knoxFlow) + plugin.CiliumFlowsKafkaMutex.Unlock() + } } cfc.netLogEvents = nil cfc.netLogEvents = make([]types.NetworkLogEvent, 0, cfc.eventsBuffer) @@ -244,9 +282,24 @@ func (cfc *KnoxFeedConsumer) processSystemLogMessage(message []byte) error { if cfc.syslogEventsCount == cfc.eventsBuffer { if len(cfc.syslogEvents) > 0 { - isSuccess := cfc.PushSystemLogToDB() - if !isSuccess { - return errors.New("error saving to DB") + for _, syslog := range cfc.syslogEvents { + log := pb.Log{ + ClusterName: 
syslog.ClusterName, + HostName: syslog.HostName, + NamespaceName: syslog.NamespaceName, + PodName: syslog.PodName, + Source: syslog.Source, + Operation: syslog.Operation, + Resource: syslog.Resource, + Data: syslog.Data, + Result: syslog.Result, + } + + knoxLog := plugin.ConvertKubeArmorLogToKnoxSystemLog(&log) + knoxLog.ClusterName = syslog.Clustername + plugin.KubeArmorKafkaLogsMutex.Lock() + plugin.KubeArmorKafkaLogs = append(plugin.KubeArmorKafkaLogs, &knoxLog) + plugin.KubeArmorKafkaLogsMutex.Unlock() } cfc.syslogEvents = nil cfc.syslogEvents = make([]types.SystemLogEvent, 0, cfc.eventsBuffer) diff --git a/src/networkpolicy/helperFunctions.go b/src/networkpolicy/helperFunctions.go index e94dc96c..9bb9105a 100644 --- a/src/networkpolicy/helperFunctions.go +++ b/src/networkpolicy/helperFunctions.go @@ -221,6 +221,22 @@ func getNetworkLogs() []types.KnoxNetworkLog { networkLogs = append(networkLogs, log) } } + } else if NetworkLogFrom == "kafka" { + // ================== // + // == Cilium kafka == // + // ================== // + log.Info().Msg("Get network log from the kafka consumer") + + // get flows from kafka consumer + flows := plugin.GetCiliumFlowsFromKafka(OperationTrigger) + if len(flows) == 0 || len(flows) < OperationTrigger { + return nil + } + + // convert hubble flows -> network logs (but, in this case, no flow id) + for _, flow := range flows { + networkLogs = append(networkLogs, *flow) + } } else if NetworkLogFrom == "file" { // =============================== // // == File (.json) for testing == // diff --git a/src/networkpolicy/networkPolicy.go b/src/networkpolicy/networkPolicy.go index c89796be..6280f999 100644 --- a/src/networkpolicy/networkPolicy.go +++ b/src/networkpolicy/networkPolicy.go @@ -10,6 +10,7 @@ import ( "github.com/accuknox/knoxAutoPolicy/src/cluster" cfg "github.com/accuknox/knoxAutoPolicy/src/config" + "github.com/accuknox/knoxAutoPolicy/src/feedconsumer" "github.com/accuknox/knoxAutoPolicy/src/libs" logger "github.com/accuknox/knoxAutoPolicy/src/logging" "github.com/accuknox/knoxAutoPolicy/src/plugin" @@ -1607,6 +1608,9 @@ func StartNetworkCronJob() { if cfg.GetCfgNetworkLogFrom() == "hubble" { go plugin.StartHubbleRelay(NetworkStopChan, &NetworkWaitG, cfg.GetCfgCiliumHubble()) NetworkWaitG.Add(1) + } else if cfg.GetCfgNetworkLogFrom() == "kafka" { + go feedconsumer.StartConsumer() + NetworkWaitG.Add(1) } // init cron job diff --git a/src/plugin/cilium.go b/src/plugin/cilium.go index 75285e4f..c86de6b7 100644 --- a/src/plugin/cilium.go +++ b/src/plugin/cilium.go @@ -58,6 +58,8 @@ var Verdict = map[string]int{ var CiliumFlows []*cilium.Flow var CiliumFlowsMutex *sync.Mutex +var CiliumFlowsKafka []*types.KnoxNetworkLog +var CiliumFlowsKafkaMutex *sync.Mutex var log *zerolog.Logger @@ -65,6 +67,8 @@ func init() { log = logger.GetInstance() CiliumFlowsMutex = &sync.Mutex{} KubeArmorRelayLogsMutex = &sync.Mutex{} + CiliumFlowsKafkaMutex = &sync.Mutex{} + KubeArmorKafkaLogsMutex = &sync.Mutex{} } // ====================== // @@ -373,6 +377,17 @@ func ConvertCiliumNetworkLogsToKnoxNetworkLogs(dbDriver string, docs []map[strin } } +func GetFlowData(netLogEventType []byte, flowEventType interface{}) error { + if netLogEventType == nil { + return nil + } + err := json.Unmarshal(netLogEventType, flowEventType) + if err != nil { + log.Error().Msg("error while unmarshing event type :" + err.Error()) + } + return err +} + // ============================== // // == Network Policy Convertor == // // ============================== // @@ -754,3 +769,26 @@ func 
StartHubbleRelay(StopChan chan struct{}, wg *sync.WaitGroup, cfg types.Conf log.Error().Msg("Unable to stream network flow: " + err.Error()) } } + +func GetCiliumFlowsFromKafka(trigger int) []*types.KnoxNetworkLog { + results := []*types.KnoxNetworkLog{} + + CiliumFlowsKafkaMutex.Lock() + defer CiliumFlowsKafkaMutex.Unlock() + if len(CiliumFlowsKafka) == 0 { + log.Info().Msgf("Cilium kafka traffic flow not exist") + return results + } + + if len(CiliumFlowsKafka) < trigger { + log.Info().Msgf("The number of cilium kafka traffic flow [%d] is less than trigger [%d]", len(CiliumFlowsKafka), trigger) + return results + } + + results = CiliumFlowsKafka // copy + CiliumFlowsKafka = []*types.KnoxNetworkLog{} // reset + + log.Info().Msgf("The total number of cilium kafka traffic flow: [%d]", len(results)) + + return results +} diff --git a/src/plugin/kubearmor.go b/src/plugin/kubearmor.go index e2f10b25..880bf0c3 100644 --- a/src/plugin/kubearmor.go +++ b/src/plugin/kubearmor.go @@ -17,6 +17,9 @@ import ( var KubeArmorRelayLogs []*pb.Log var KubeArmorRelayLogsMutex *sync.Mutex +var KubeArmorKafkaLogs []*types.KnoxSystemLog +var KubeArmorKafkaLogsMutex *sync.Mutex + func ConvertKnoxSystemPolicyToKubeArmorPolicy(knoxPolicies []types.KnoxSystemPolicy) []types.KubeArmorPolicy { results := []types.KubeArmorPolicy{} @@ -100,7 +103,7 @@ func ConvertKubeArmorSystemLogsToKnoxSystemLogs(dbDriver string, docs []map[stri return []types.KnoxSystemLog{} } -func ConvertKubeArmorRelayLogToKnoxSystemLog(relayLog *pb.Log) types.KnoxSystemLog { +func ConvertKubeArmorLogToKnoxSystemLog(relayLog *pb.Log) types.KnoxSystemLog { sources := strings.Split(relayLog.Source, " ") source := "" @@ -250,3 +253,25 @@ func StartKubeArmorRelay(StopChan chan struct{}, wg *sync.WaitGroup, cfg types.C } }() } + +func GetSystemLogsFromKafkaConsumer(trigger int) []*types.KnoxSystemLog { + results := []*types.KnoxSystemLog{} + KubeArmorKafkaLogsMutex.Lock() + defer KubeArmorKafkaLogsMutex.Unlock() + if len(KubeArmorKafkaLogs) == 0 { + log.Info().Msgf("KubeArmor kafka traffic flow not exist") + return results + } + + if len(KubeArmorKafkaLogs) < trigger { + log.Info().Msgf("The number of KubeArmor traffic flow [%d] is less than trigger [%d]", len(KubeArmorKafkaLogs), trigger) + return results + } + + results = KubeArmorKafkaLogs // copy + KubeArmorKafkaLogs = []*types.KnoxSystemLog{} // reset + + log.Info().Msgf("The total number of KubeArmor kafka traffic flow: [%d]", len(results)) + + return results +} diff --git a/src/server/grpcServer.go b/src/server/grpcServer.go index 1dfdfe72..b956e5f7 100644 --- a/src/server/grpcServer.go +++ b/src/server/grpcServer.go @@ -42,23 +42,30 @@ type workerServer struct { func (s *workerServer) Start(ctx context.Context, in *wpb.WorkerRequest) (*wpb.WorkerResponse, error) { log.Info().Msg("Start worker called") + response := "" + if in.GetReq() == "dbclear" { libs.ClearDBTables(core.CurrentCfg.ConfigDB) + response += "Cleared DB ," } if in.GetLogfile() != "" { core.SetLogFile(in.GetLogfile()) + response += "Log File Set ," } - if in.GetPolicytype() == "network" { - networker.StartNetworkWorker() - } else if in.GetPolicytype() == "system" { - sysworker.StartSystemWorker() + if in.GetPolicytype() != "" { + if in.GetPolicytype() == "network" { + networker.StartNetworkWorker() + } else if in.GetPolicytype() == "system" { + sysworker.StartSystemWorker() + } + response += "Starting " + in.GetPolicytype() + " policy discovery" } else { - return &wpb.WorkerResponse{Res: "No policy type, choose 'network' or 
'system', not [" + in.GetPolicytype() + "]"}, nil + response += "No policy type provided, choose 'network' or 'system' to start policy discovery" } - return &wpb.WorkerResponse{Res: "ok starting " + in.GetPolicytype() + " policy discovery"}, nil + return &wpb.WorkerResponse{Res: response}, nil } func (s *workerServer) Stop(ctx context.Context, in *wpb.WorkerRequest) (*wpb.WorkerResponse, error) { diff --git a/src/systempolicy/systemPolicy.go b/src/systempolicy/systemPolicy.go index 08f81483..b761edb0 100644 --- a/src/systempolicy/systemPolicy.go +++ b/src/systempolicy/systemPolicy.go @@ -10,6 +10,7 @@ import ( "github.com/accuknox/knoxAutoPolicy/src/cluster" cfg "github.com/accuknox/knoxAutoPolicy/src/config" + "github.com/accuknox/knoxAutoPolicy/src/feedconsumer" "github.com/accuknox/knoxAutoPolicy/src/libs" logger "github.com/accuknox/knoxAutoPolicy/src/logging" "github.com/accuknox/knoxAutoPolicy/src/plugin" @@ -165,10 +166,10 @@ type SysPath struct { func getSystemLogs() []types.KnoxSystemLog { systemLogs := []types.KnoxSystemLog{} - // =============== // - // == Database == // - // =============== // if SystemLogFrom == "db" { + // ============== // + // == Database == // + // ============== // log.Info().Msg("Get system log from the database") // get system logs from db @@ -236,9 +237,22 @@ func getSystemLogs() []types.KnoxSystemLog { // convert kubearmor relay logs -> knox system logs for _, relayLog := range relayLogs { - log := plugin.ConvertKubeArmorRelayLogToKnoxSystemLog(relayLog) + log := plugin.ConvertKubeArmorLogToKnoxSystemLog(relayLog) systemLogs = append(systemLogs, log) } + } else if SystemLogFrom == "kafka" { + log.Info().Msg("Get system log from kafka consumer") + + // get system logs from kafka consumer + sysLogs := plugin.GetSystemLogsFromKafkaConsumer(OperationTrigger) + if len(sysLogs) == 0 || len(sysLogs) < OperationTrigger { + return nil + } + + // convert kubearmor system logs -> knox system logs + for _, sysLog := range sysLogs { + systemLogs = append(systemLogs, *sysLog) + } } else { log.Error().Msgf("System log from not correct: %s", SystemLogFrom) return nil @@ -659,6 +673,8 @@ func StartSystemCronJob() { if cfg.GetCfgSystemLogFrom() == "kubearmor" { go plugin.StartKubeArmorRelay(SystemStopChan, &SystemWaitG, cfg.GetCfgKubeArmor()) SystemWaitG.Add(1) + } else if cfg.GetCfgSystemLogFrom() == "kafka" { + go feedconsumer.StartConsumer() } // init cron job diff --git a/tests/conf/local.yaml b/tests/conf/local.yaml new file mode 100644 index 00000000..5882ac36 --- /dev/null +++ b/tests/conf/local.yaml @@ -0,0 +1,34 @@ +application: + name: knoxautopolicy + network: + operation-mode: 2 # 1: cronjob | 2: one-time-job + network-log-from: "file" # db|hubble + network-log-file: "./test-flow.json" # file path + network-policy-to: "db|file" # db, file + network-policy-dir: "./" + system: + operation-mode: 2 # 1: cronjob | 2: one-time-job + system-log-from: "file" # db|hubble + system-log-file: "./test-flow.json" # file path + system-policy-to: "db|file" # db, file + system-policy-dir: "./" + cluster: + cluster-info-from: "k8sclient" # k8sclient|accuknox + #cluster-mgmt-url: "http://cluster-management-service.accuknox-dev-cluster-mgmt.svc.cluster.local/cm" + cluster-mgmt-url: "http://localhost:8080" + +database: + driver: mysql + host: 127.0.0.1 + port: 3306 + user: root + password: password + dbname: accuknox + table-network-log: network_log + table-network-policy: network_policy + table-system-log: system_log + table-system-alert: system_alert + table-system-policy: 
system_policy + +logging: + level: "INFO" diff --git a/tests/test-scenarios-local.sh b/tests/test-scenarios-local.sh index d587df37..7822762d 100755 --- a/tests/test-scenarios-local.sh +++ b/tests/test-scenarios-local.sh @@ -1,5 +1,7 @@ #!/bin/bash +shopt -s extglob + RED='\033[0;31m' GREEN='\033[0;32m' ORANGE='\033[0;33m' @@ -7,32 +9,34 @@ BLUE='\033[0;34m' MAGENTA='\033[0;35m' NC='\033[0m' -AUTOPOL_HOME=`dirname $(realpath "$0")`/.. -AUTOPOL_POLICY=$AUTOPOL_HOME/policies +AUTOPOL_HOME=$(dirname $(realpath "$0"))/.. AUTOPOL_SRC_HOME=$AUTOPOL_HOME/src -TEST_HOME=`dirname $(realpath "$0")` -TEST_SCRIPTS=`dirname $(realpath "$0")`/scripts +TEST_HOME=$(dirname $(realpath "$0")) +TEST_SCRIPTS=$(dirname $(realpath "$0"))/scripts PASSED_TESTS=() FAILED_TESTS=() COUNT_TESTS=0 -## ==================== ## +## ==================== ## ## == KnoxAutoPolicy == ## ## ==================== ## res_start_service=0 function start_and_wait_for_KnoxAutoPolicy_initialization() { - $AUTOPOL_HOME/scripts/start_service.sh &> /dev/null & + export CONF_FILE_NAME=local + + $AUTOPOL_HOME/src/knoxAutoPolicy -config-path=$AUTOPOL_HOME/tests/conf & + echo $? if [ $? != 0 ]; then res_start_service=1 exit 1 fi - sleep 1 - + sleep 3 + LISTEN=$(ps -e | grep knoxAutoPolicy | wc -l) if [ $LISTEN != 1 ]; then res_start_service=1 @@ -43,30 +47,16 @@ function start_and_wait_for_KnoxAutoPolicy_initialization() { function stop_and_wait_for_KnoxAutoPolicy_termination() { ps -e | grep knoxAutoPolicy | awk '{print $1}' | xargs -I {} kill {} - for (( ; ; )) - do - ps -e | grep knoxAutoPolicy &> /dev/null + for (( ; ; )); do + ps -e | grep knoxAutoPolicy &>/dev/null if [ $? != 0 ]; then break fi - sleep 1 + sleep 3 done } -function replace_discovery_mode() { - if [[ $1 == *"_egress_ingress_"* ]]; then - sed -i "s/DISCOVERY_POLICY_TYPES=1/DISCOVERY_POLICY_TYPES=3/" $AUTOPOL_HOME/scripts/start_service.sh - sed -i "s/DISCOVERY_POLICY_TYPES=2/DISCOVERY_POLICY_TYPES=3/" $AUTOPOL_HOME/scripts/start_service.sh - elif [[ $1 == *"_egress_"* ]]; then - sed -i "s/DISCOVERY_POLICY_TYPES=2/DISCOVERY_POLICY_TYPES=1/" $AUTOPOL_HOME/scripts/start_service.sh - sed -i "s/DISCOVERY_POLICY_TYPES=3/DISCOVERY_POLICY_TYPES=1/" $AUTOPOL_HOME/scripts/start_service.sh - else - sed -i "s/DISCOVERY_POLICY_TYPES=1/DISCOVERY_POLICY_TYPES=2/" $AUTOPOL_HOME/scripts/start_service.sh - sed -i "s/DISCOVERY_POLICY_TYPES=3/DISCOVERY_POLICY_TYPES=2/" $AUTOPOL_HOME/scripts/start_service.sh - fi -} - ## ============== ## ## == Database == ## ## ============== ## @@ -75,9 +65,8 @@ function start_and_wait_for_mysql_initialization() { cd $TEST_HOME/mysql docker-compose up -d - for (( ; ; )) - do - docker logs mysql-example > ./logs 2>&1 + for (( ; ; )); do + docker logs mysql-example >./logs 2>&1 log=$(cat $TEST_HOME/mysql/logs) if [[ $log == *"Ready for start up"* ]]; then break @@ -107,12 +96,11 @@ function apply_and_wait_for_microservice_creation() { return fi - for (( ; ; )) - do + for (( ; ; )); do RAW=$(kubectl get pods -n multiubuntu | wc -l) - ALL=`expr $RAW - 1` - READY=`kubectl get pods -n multiubuntu | grep Running | wc -l` + ALL=$(expr $RAW - 1) + READY=$(kubectl get pods -n multiubuntu | grep Running | wc -l) if [ $ALL == $READY ]; then break @@ -139,26 +127,25 @@ function delete_and_wait_for_microserivce_deletion() { function add_label_pods() { if [[ $2 == "TC_16" ]] || [[ $2 == "TC_30" ]]; then cd $1 - /bin/bash ./add_label.sh $> /dev/null + /bin/bash ./add_label.sh $ >/dev/null fi } function del_label_pods() { if [[ $2 == "TC_16" ]] || [[ $2 == "TC_30" ]]; then 
cd $1 - /bin/bash ./del_label.sh $> /dev/null + /bin/bash ./del_label.sh $ >/dev/null fi } function run_test_case() { cd $1 - ACTUAL_YAML_FILE=cilium_policies_$2.yaml + ACTUAL_YAML_FILE=@(cilium|kubearmor)_policies*.yaml + + for JSON_FILE in $(ls -r $TC_*.json); do + for EXPECTED_YAML_FILE in $(ls -r $TC_*.yaml); do - for JSON_FILE in $(ls -r $TC_*.json) - do - for EXPECTED_YAML_FILE in $(ls -r $TC_*.yaml) - do # check before / after if [[ $JSON_FILE == *"before"* ]] && [[ $EXPECTED_YAML_FILE != *"before"* ]]; then continue @@ -167,10 +154,16 @@ function run_test_case() { fi echo -e "${GREEN}[INFO] Discovering from $JSON_FILE" - if [[ $JSON_FILE == *"after"* ]]; then - $TEST_SCRIPTS/startTest.sh $1/$JSON_FILE &> /dev/null + cp $1/$JSON_FILE test-flow.json + + # start knoxAutoPolicy + echo -e "${ORANGE}[INFO] Starting KnoxAutoPolicy${NC}" + start_and_wait_for_KnoxAutoPolicy_initialization + if [ $res_start_service != 0 ]; then + echo -e "${RED}[FAIL] Failed to start KnoxAutoPolicy${NC}" + exit 1 else - $TEST_SCRIPTS/startTest.sh $1/$JSON_FILE clear &> /dev/null + echo "[INFO] Started KnoxAutoPolicy" fi if [ $? != 0 ]; then @@ -181,10 +174,28 @@ function run_test_case() { echo "[INFO] Discovered policies from $JSON_FILE" echo -e "${GREEN}[INFO] Comparing $EXPECTED_YAML_FILE and $ACTUAL_YAML_FILE${NC}" - python3 $TEST_SCRIPTS/diff.py $AUTOPOL_POLICY/$ACTUAL_YAML_FILE $1/$EXPECTED_YAML_FILE + fail=0 + python3 $TEST_SCRIPTS/diff.py $1/$ACTUAL_YAML_FILE $1/$EXPECTED_YAML_FILE if [ $? != 0 ]; then echo -e "${RED}[FAIL] Failed $3${NC}" FAILED_TESTS+=($testcase) + fail=1 + fi + + # Clear DB for next policy generation + if [[ $JSON_FILE == *"before"* ]]; then + echo "[INFO] Not clearing DB" + else + echo "[INFO] Clear DB" + grpcurl -plaintext -d '{"req": "dbclear"}' localhost:9089 v1.worker.Worker.Start + fi + + # stop knoxAutoPolicy + echo -e "${ORANGE}[INFO] Stopping KnoxAutoPolicy${NC}" + stop_and_wait_for_KnoxAutoPolicy_termination + echo "[INFO] Stopped KnoxAutoPolicy" + + if [ $fail != 0 ]; then return fi done @@ -204,7 +215,8 @@ cd $AUTOPOL_SRC_HOME if [ ! -f KnoxAutoPolicy ]; then echo -e "${ORANGE}[INFO] Building KnoxAutoPolicy${NC}" - make clean > /dev/null ; make > /dev/null + make clean >/dev/null + make >/dev/null echo "[INFO] Built KnoxAutoPolicy" fi @@ -225,29 +237,15 @@ apply_and_wait_for_microservice_creation $microservice if [ $res_microservice == 0 ]; then echo "[INFO] Applied $microservice" - echo "[INFO] Wait for initialization" - sleep 30 echo "[INFO] Started to run testcases" -## Step 4. Run all the test cases + ## Step 4. 
Run all the test cases cd $TEST_HOME/$microservice/test-cases - for testcase in $(ls -d $TC_*) - do + for testcase in $(ls -d $TC_*); do add_label_pods $TEST_HOME/$microservice/test-cases/$testcase $testcase # replace configuration JSON_FILE=$(ls $TEST_HOME/$microservice/test-cases/$testcase/*.json) - replace_discovery_mode $JSON_FILE - - # start knoxAutoPolicy - echo -e "${ORANGE}[INFO] Starting KnoxAutoPolicy${NC}" - start_and_wait_for_KnoxAutoPolicy_initialization - if [ $res_start_service != 0 ]; then - echo -e "${RED}[FAIL] Failed to start KnoxAutoPolicy${NC}" - exit 1 - else - echo "[INFO] Started KnoxAutoPolicy" - fi # run a test case res_case=0 @@ -260,15 +258,10 @@ if [ $res_microservice == 0 ]; then ((COUNT_TESTS++)) fi - # stop knoxAutoPolicy - echo -e "${ORANGE}[INFO] Stopping KnoxAutoPolicy${NC}" - stop_and_wait_for_KnoxAutoPolicy_termination - echo "[INFO] Stopped KnoxAutoPolicy" - del_label_pods $TEST_HOME/$microservice/test-cases/$testcase $testcase done -## Step 6. Delete Microservice + ## Step 6. Delete Microservice res_delete=0 echo -e "${ORANGE}[INFO] Deleting $microservice${NC}" delete_and_wait_for_microserivce_deletion $microservice @@ -291,10 +284,9 @@ echo " - Completed Cases: " $COUNT_TESTS echo " - Passed Cases: " ${#PASSED_TESTS[@]} echo -n " - Failed Cases: " ${#FAILED_TESTS[@]} -if (( ${#FAILED_TESTS[@]} > 0 )); then +if ((${#FAILED_TESTS[@]} > 0)); then echo -n " (" - for casenumber in ${FAILED_TESTS[@]} - do + for casenumber in ${FAILED_TESTS[@]}; do echo -n " "$casenumber done echo ")"
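
Note on the new Kafka consumer path introduced above: GetCiliumFlowsFromKafka and GetSystemLogsFromKafkaConsumer follow the same pattern — the feed consumer appends converted logs to a mutex-guarded slice (CiliumFlowsKafka / KubeArmorKafkaLogs), and the cron job drains that slice only once at least OperationTrigger entries have accumulated, otherwise getNetworkLogs()/getSystemLogs() return nil and skip the discovery run. The standalone Go sketch below (not part of the patch; kafkaBuffer and drainIfReady are hypothetical stand-ins for those buffers) illustrates just that drain-on-trigger behaviour.

package main

import (
	"fmt"
	"sync"
)

// kafkaBuffer stands in for plugin.CiliumFlowsKafka / plugin.KubeArmorKafkaLogs:
// the consumer appends entries, the cron job drains them in batches.
var (
	kafkaBuffer      []string
	kafkaBufferMutex sync.Mutex
)

// drainIfReady returns and resets the buffered entries only when at least
// `trigger` entries have accumulated; below the trigger it leaves the buffer untouched.
func drainIfReady(trigger int) []string {
	kafkaBufferMutex.Lock()
	defer kafkaBufferMutex.Unlock()

	if len(kafkaBuffer) < trigger {
		return nil // below the operation trigger: nothing handed to discovery yet
	}

	drained := kafkaBuffer
	kafkaBuffer = nil // reset for the next batch
	return drained
}

func main() {
	kafkaBuffer = append(kafkaBuffer, "flow-1", "flow-2", "flow-3")

	fmt.Println(drainIfReady(5)) // [] - fewer than 5 buffered, discovery is skipped
	fmt.Println(drainIfReady(2)) // [flow-1 flow-2 flow-3] - buffer drained and reset
}

Returning an empty batch below the trigger is what lets the cron job poll cheaply on every tick and only run policy discovery once enough Kafka events have arrived.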