From 4a77decc52f61e738ad576513b18d460bf326b9c Mon Sep 17 00:00:00 2001 From: Rahul Jadhav Date: Thu, 12 Aug 2021 11:21:52 +0530 Subject: [PATCH 01/18] submodule moved ahead --- deployments | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deployments b/deployments index abf06e27..65b312c9 160000 --- a/deployments +++ b/deployments @@ -1 +1 @@ -Subproject commit abf06e27340256d279874de5a8dbadfe4380df6d +Subproject commit 65b312c9157b2024aa13530bb8c608c1f6cd1c92 From cf61c175fb77316d91a7ed8914f2accdfec8fc84 Mon Sep 17 00:00:00 2001 From: Rahul Jadhav Date: Thu, 12 Aug 2021 22:29:12 +0530 Subject: [PATCH 02/18] updated deployment --- deployments | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deployments b/deployments index 65b312c9..c2b4ac69 160000 --- a/deployments +++ b/deployments @@ -1 +1 @@ -Subproject commit 65b312c9157b2024aa13530bb8c608c1f6cd1c92 +Subproject commit c2b4ac69453f87f8cb24b2f6fe251f20f3c1841b From 6e952c0c9c6ce4b4737df3fe1484161d5fbeae06 Mon Sep 17 00:00:00 2001 From: Rahul Jadhav Date: Thu, 19 Aug 2021 14:51:27 +0530 Subject: [PATCH 03/18] conf fix --- deployments | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deployments b/deployments index c2b4ac69..6834f042 160000 --- a/deployments +++ b/deployments @@ -1 +1 @@ -Subproject commit c2b4ac69453f87f8cb24b2f6fe251f20f3c1841b +Subproject commit 6834f042b396bd4002bfaaf31a87f4b46af10442 From 0fbe3964fde8e39f5a487afc9e402277eac5ddd0 Mon Sep 17 00:00:00 2001 From: daemon1024 Date: Mon, 23 Aug 2021 16:40:15 +0530 Subject: [PATCH 04/18] Fix system tests to support updated service --- tests/conf/local.yaml | 28 ++++++++ tests/test-scenarios-local.sh | 128 +++++++++++----------------------- 2 files changed, 70 insertions(+), 86 deletions(-) create mode 100644 tests/conf/local.yaml diff --git a/tests/conf/local.yaml b/tests/conf/local.yaml new file mode 100644 index 00000000..c74ee1c8 --- /dev/null +++ b/tests/conf/local.yaml @@ -0,0 +1,28 @@ +application: + name: knoxautopolicy + network: + operation-mode: 2 # 1: cronjob | 2: one-time-job + network-log-from: "file" # db|hubble + network-log-file: "./test-flow.json" # file path + network-policy-to: "db|file" # db, file + network-policy-dir: "./" + cluster: + cluster-info-from: "k8sclient" # k8sclient|accuknox + #cluster-mgmt-url: "http://cluster-management-service.accuknox-dev-cluster-mgmt.svc.cluster.local/cm" + cluster-mgmt-url: "http://localhost:8080" + +database: + driver: mysql + host: 127.0.0.1 + port: 3306 + user: root + password: password + dbname: accuknox + table-network-log: network_log + table-network-policy: network_policy + table-system-log: system_log + table-system-alert: system_alert + table-system-policy: system_policy + +logging: + level: "INFO" diff --git a/tests/test-scenarios-local.sh b/tests/test-scenarios-local.sh index d587df37..0b5a1952 100755 --- a/tests/test-scenarios-local.sh +++ b/tests/test-scenarios-local.sh @@ -7,32 +7,34 @@ BLUE='\033[0;34m' MAGENTA='\033[0;35m' NC='\033[0m' -AUTOPOL_HOME=`dirname $(realpath "$0")`/.. -AUTOPOL_POLICY=$AUTOPOL_HOME/policies +AUTOPOL_HOME=$(dirname $(realpath "$0"))/.. AUTOPOL_SRC_HOME=$AUTOPOL_HOME/src -TEST_HOME=`dirname $(realpath "$0")` -TEST_SCRIPTS=`dirname $(realpath "$0")`/scripts +TEST_HOME=$(dirname $(realpath "$0")) +TEST_SCRIPTS=$(dirname $(realpath "$0"))/scripts PASSED_TESTS=() FAILED_TESTS=() COUNT_TESTS=0 -## ==================== ## +## ==================== ## ## == KnoxAutoPolicy == ## ## ==================== ## res_start_service=0 function start_and_wait_for_KnoxAutoPolicy_initialization() { - $AUTOPOL_HOME/scripts/start_service.sh &> /dev/null & + export CONF_FILE_NAME=local + + $AUTOPOL_HOME/src/knoxAutoPolicy -config-path=$AUTOPOL_HOME/tests/conf & + echo $? if [ $? != 0 ]; then res_start_service=1 exit 1 fi - sleep 1 - + sleep 3 + LISTEN=$(ps -e | grep knoxAutoPolicy | wc -l) if [ $LISTEN != 1 ]; then res_start_service=1 @@ -43,30 +45,16 @@ function start_and_wait_for_KnoxAutoPolicy_initialization() { function stop_and_wait_for_KnoxAutoPolicy_termination() { ps -e | grep knoxAutoPolicy | awk '{print $1}' | xargs -I {} kill {} - for (( ; ; )) - do - ps -e | grep knoxAutoPolicy &> /dev/null + for (( ; ; )); do + ps -e | grep knoxAutoPolicy &>/dev/null if [ $? != 0 ]; then break fi - sleep 1 + sleep 3 done } -function replace_discovery_mode() { - if [[ $1 == *"_egress_ingress_"* ]]; then - sed -i "s/DISCOVERY_POLICY_TYPES=1/DISCOVERY_POLICY_TYPES=3/" $AUTOPOL_HOME/scripts/start_service.sh - sed -i "s/DISCOVERY_POLICY_TYPES=2/DISCOVERY_POLICY_TYPES=3/" $AUTOPOL_HOME/scripts/start_service.sh - elif [[ $1 == *"_egress_"* ]]; then - sed -i "s/DISCOVERY_POLICY_TYPES=2/DISCOVERY_POLICY_TYPES=1/" $AUTOPOL_HOME/scripts/start_service.sh - sed -i "s/DISCOVERY_POLICY_TYPES=3/DISCOVERY_POLICY_TYPES=1/" $AUTOPOL_HOME/scripts/start_service.sh - else - sed -i "s/DISCOVERY_POLICY_TYPES=1/DISCOVERY_POLICY_TYPES=2/" $AUTOPOL_HOME/scripts/start_service.sh - sed -i "s/DISCOVERY_POLICY_TYPES=3/DISCOVERY_POLICY_TYPES=2/" $AUTOPOL_HOME/scripts/start_service.sh - fi -} - ## ============== ## ## == Database == ## ## ============== ## @@ -75,9 +63,8 @@ function start_and_wait_for_mysql_initialization() { cd $TEST_HOME/mysql docker-compose up -d - for (( ; ; )) - do - docker logs mysql-example > ./logs 2>&1 + for (( ; ; )); do + docker logs mysql-example >./logs 2>&1 log=$(cat $TEST_HOME/mysql/logs) if [[ $log == *"Ready for start up"* ]]; then break @@ -107,12 +94,11 @@ function apply_and_wait_for_microservice_creation() { return fi - for (( ; ; )) - do + for (( ; ; )); do RAW=$(kubectl get pods -n multiubuntu | wc -l) - ALL=`expr $RAW - 1` - READY=`kubectl get pods -n multiubuntu | grep Running | wc -l` + ALL=$(expr $RAW - 1) + READY=$(kubectl get pods -n multiubuntu | grep Running | wc -l) if [ $ALL == $READY ]; then break @@ -136,41 +122,24 @@ function delete_and_wait_for_microserivce_deletion() { ## == Test Functions == ## ## ==================== ## -function add_label_pods() { - if [[ $2 == "TC_16" ]] || [[ $2 == "TC_30" ]]; then - cd $1 - /bin/bash ./add_label.sh $> /dev/null - fi -} - -function del_label_pods() { - if [[ $2 == "TC_16" ]] || [[ $2 == "TC_30" ]]; then - cd $1 - /bin/bash ./del_label.sh $> /dev/null - fi -} - function run_test_case() { cd $1 ACTUAL_YAML_FILE=cilium_policies_$2.yaml - for JSON_FILE in $(ls -r $TC_*.json) - do - for EXPECTED_YAML_FILE in $(ls -r $TC_*.yaml) - do - # check before / after - if [[ $JSON_FILE == *"before"* ]] && [[ $EXPECTED_YAML_FILE != *"before"* ]]; then - continue - elif [[ $JSON_FILE == *"after"* ]] && [[ $EXPECTED_YAML_FILE != *"after"* ]]; then - continue - fi - + for JSON_FILE in $(ls -r $TC_*.json); do + for EXPECTED_YAML_FILE in $(ls -r $TC_*.yaml); do echo -e "${GREEN}[INFO] Discovering from $JSON_FILE" - if [[ $JSON_FILE == *"after"* ]]; then - $TEST_SCRIPTS/startTest.sh $1/$JSON_FILE &> /dev/null + cp $1/$JSON_FILE test-flow.json + + # start knoxAutoPolicy + echo -e "${ORANGE}[INFO] Starting KnoxAutoPolicy${NC}" + start_and_wait_for_KnoxAutoPolicy_initialization + if [ $res_start_service != 0 ]; then + echo -e "${RED}[FAIL] Failed to start KnoxAutoPolicy${NC}" + exit 1 else - $TEST_SCRIPTS/startTest.sh $1/$JSON_FILE clear &> /dev/null + echo "[INFO] Started KnoxAutoPolicy" fi if [ $? != 0 ]; then @@ -181,7 +150,7 @@ function run_test_case() { echo "[INFO] Discovered policies from $JSON_FILE" echo -e "${GREEN}[INFO] Comparing $EXPECTED_YAML_FILE and $ACTUAL_YAML_FILE${NC}" - python3 $TEST_SCRIPTS/diff.py $AUTOPOL_POLICY/$ACTUAL_YAML_FILE $1/$EXPECTED_YAML_FILE + python3 $TEST_SCRIPTS/diff.py $1/$ACTUAL_YAML_FILE $1/$EXPECTED_YAML_FILE if [ $? != 0 ]; then echo -e "${RED}[FAIL] Failed $3${NC}" FAILED_TESTS+=($testcase) @@ -204,7 +173,8 @@ cd $AUTOPOL_SRC_HOME if [ ! -f KnoxAutoPolicy ]; then echo -e "${ORANGE}[INFO] Building KnoxAutoPolicy${NC}" - make clean > /dev/null ; make > /dev/null + make clean >/dev/null + make >/dev/null echo "[INFO] Built KnoxAutoPolicy" fi @@ -225,29 +195,14 @@ apply_and_wait_for_microservice_creation $microservice if [ $res_microservice == 0 ]; then echo "[INFO] Applied $microservice" - echo "[INFO] Wait for initialization" - sleep 30 echo "[INFO] Started to run testcases" -## Step 4. Run all the test cases + ## Step 4. Run all the test cases cd $TEST_HOME/$microservice/test-cases - for testcase in $(ls -d $TC_*) - do - add_label_pods $TEST_HOME/$microservice/test-cases/$testcase $testcase - + for testcase in $(ls -d $TC_*); do + cd # replace configuration JSON_FILE=$(ls $TEST_HOME/$microservice/test-cases/$testcase/*.json) - replace_discovery_mode $JSON_FILE - - # start knoxAutoPolicy - echo -e "${ORANGE}[INFO] Starting KnoxAutoPolicy${NC}" - start_and_wait_for_KnoxAutoPolicy_initialization - if [ $res_start_service != 0 ]; then - echo -e "${RED}[FAIL] Failed to start KnoxAutoPolicy${NC}" - exit 1 - else - echo "[INFO] Started KnoxAutoPolicy" - fi # run a test case res_case=0 @@ -260,15 +215,17 @@ if [ $res_microservice == 0 ]; then ((COUNT_TESTS++)) fi + # Clear DB for next policy generation + echo "[INFO] Clear DB" + grpcurl -plaintext -d '{"req": "dbclear"}' localhost:9089 v1.worker.Worker.Start + # stop knoxAutoPolicy echo -e "${ORANGE}[INFO] Stopping KnoxAutoPolicy${NC}" stop_and_wait_for_KnoxAutoPolicy_termination echo "[INFO] Stopped KnoxAutoPolicy" - - del_label_pods $TEST_HOME/$microservice/test-cases/$testcase $testcase done -## Step 6. Delete Microservice + ## Step 6. Delete Microservice res_delete=0 echo -e "${ORANGE}[INFO] Deleting $microservice${NC}" delete_and_wait_for_microserivce_deletion $microservice @@ -291,10 +248,9 @@ echo " - Completed Cases: " $COUNT_TESTS echo " - Passed Cases: " ${#PASSED_TESTS[@]} echo -n " - Failed Cases: " ${#FAILED_TESTS[@]} -if (( ${#FAILED_TESTS[@]} > 0 )); then +if ((${#FAILED_TESTS[@]} > 0)); then echo -n " (" - for casenumber in ${FAILED_TESTS[@]} - do + for casenumber in ${FAILED_TESTS[@]}; do echo -n " "$casenumber done echo ")" From ccb630b0a54d3e4eee734ca4471a1024980ad7f1 Mon Sep 17 00:00:00 2001 From: daemon1024 Date: Mon, 23 Aug 2021 16:42:30 +0530 Subject: [PATCH 05/18] add system tests github workflow --- .github/workflows/ci-test.yml | 52 +++++++++++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci-test.yml b/.github/workflows/ci-test.yml index 955a5d13..56a7c474 100644 --- a/.github/workflows/ci-test.yml +++ b/.github/workflows/ci-test.yml @@ -2,9 +2,9 @@ name: ci-test on: push: - branches: [master,dev] + branches: [master, dev] pull_request: - branches: [master,dev] + branches: [master, dev] jobs: unit-test: @@ -18,4 +18,50 @@ jobs: go-version: v1.16 - name: Unit Test - run: ./tests/test-go-unit.sh \ No newline at end of file + run: ./tests/test-go-unit.sh + + system-test: + runs-on: ubuntu-latest + steps: + - name: Checkout Source + uses: actions/checkout@v2 + + - uses: actions/setup-go@v2 + with: + go-version: v1.16 + + - name: Setup Env + run: | + # install kernel-headers + sudo apt-get update + sudo apt-get install -y linux-headers-$(uname -r) + # install kubeadm + sudo apt-get update + sudo apt-get install -y apt-transport-https curl + curl -s https://packages.cloud.google.com/apt/doc/apt-key.gpg | sudo apt-key add - + sudo touch /etc/apt/sources.list.d/kubernetes.list + echo "deb http://apt.kubernetes.io/ kubernetes-xenial main" | sudo tee -a /etc/apt/sources.list.d/kubernetes.list + sudo apt-get update + sudo apt-get install -y kubelet=1.21.3-00 kubeadm=1.21.3-00 kubectl=1.21.3-00 + sudo apt-mark hold kubelet kubeadm kubectl + # install apparmor and auditd + sudo apt-get install -y apparmor apparmor-utils auditd + sudo systemctl start apparmor; sudo systemctl start auditd + # turn off swap + sudo swapoff -a + # initialize kubernetes + sudo kubeadm init --pod-network-cidr=10.244.0.0/16 | tee -a ~/k8s_init.log + # copy k8s config + mkdir -p $HOME/.kube + sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config + sudo chown $USER:$USER $HOME/.kube/config + export KUBECONFIG=$HOME/.kube/config + echo "export KUBECONFIG=$HOME/.kube/config" | tee -a ~/.bashrc + # install flannel + kubectl apply -f https://raw.githubusercontent.com/coreos/flannel/v0.13.0/Documentation/kube-flannel.yml + # disable master isolation + kubectl taint nodes --all node-role.kubernetes.io/master- + # Install grpcurl + go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest + - name: Run Test Script + run: ./tests/test-scenarios-local.sh From af0f648294798d8bfab69a0720f1a92d2d6b9298 Mon Sep 17 00:00:00 2001 From: daemon1024 Date: Mon, 23 Aug 2021 16:54:52 +0530 Subject: [PATCH 06/18] update start worker response --- src/server/grpcServer.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/server/grpcServer.go b/src/server/grpcServer.go index 1dfdfe72..b956e5f7 100644 --- a/src/server/grpcServer.go +++ b/src/server/grpcServer.go @@ -42,23 +42,30 @@ type workerServer struct { func (s *workerServer) Start(ctx context.Context, in *wpb.WorkerRequest) (*wpb.WorkerResponse, error) { log.Info().Msg("Start worker called") + response := "" + if in.GetReq() == "dbclear" { libs.ClearDBTables(core.CurrentCfg.ConfigDB) + response += "Cleared DB ," } if in.GetLogfile() != "" { core.SetLogFile(in.GetLogfile()) + response += "Log File Set ," } - if in.GetPolicytype() == "network" { - networker.StartNetworkWorker() - } else if in.GetPolicytype() == "system" { - sysworker.StartSystemWorker() + if in.GetPolicytype() != "" { + if in.GetPolicytype() == "network" { + networker.StartNetworkWorker() + } else if in.GetPolicytype() == "system" { + sysworker.StartSystemWorker() + } + response += "Starting " + in.GetPolicytype() + " policy discovery" } else { - return &wpb.WorkerResponse{Res: "No policy type, choose 'network' or 'system', not [" + in.GetPolicytype() + "]"}, nil + response += "No policy type provided, choose 'network' or 'system' to start policy discovery" } - return &wpb.WorkerResponse{Res: "ok starting " + in.GetPolicytype() + " policy discovery"}, nil + return &wpb.WorkerResponse{Res: response}, nil } func (s *workerServer) Stop(ctx context.Context, in *wpb.WorkerRequest) (*wpb.WorkerResponse, error) { From 58be73c6dd4a529992927fd165d1c31d8cc01a2e Mon Sep 17 00:00:00 2001 From: daemon1024 Date: Mon, 23 Aug 2021 17:02:38 +0530 Subject: [PATCH 07/18] Update unit tests for code change #314 --- src/config/configManager_test.go | 256 ------------------------------- 1 file changed, 256 deletions(-) diff --git a/src/config/configManager_test.go b/src/config/configManager_test.go index f4d91090..fde85832 100644 --- a/src/config/configManager_test.go +++ b/src/config/configManager_test.go @@ -2,11 +2,8 @@ package config import ( "bytes" - "encoding/json" "testing" - "github.com/DATA-DOG/go-sqlmock" - "github.com/accuknox/knoxAutoPolicy/src/libs" types "github.com/accuknox/knoxAutoPolicy/src/types" "github.com/spf13/viper" "github.com/stretchr/testify/assert" @@ -29,7 +26,6 @@ func TestLoadConfigDB(t *testing.T) { assert.NotEmpty(t, cfg.DBHost, "DB host should not be empty") assert.NotEmpty(t, cfg.DBPort, "DB host should not be empty") - assert.NotEmpty(t, cfg.TableConfiguration, "Table configuration should not be empty") assert.NotEmpty(t, cfg.TableNetworkLog, "Table network_log should not be empty") assert.NotEmpty(t, cfg.TableNetworkPolicy, "Table network_policy should not be empty") assert.NotEmpty(t, cfg.TableSystemLog, "Table system_log should not be empty") @@ -93,255 +89,3 @@ func TestSetLogFile(t *testing.T) { assert.Equal(t, CurrentCfg.ConfigNetPolicy.NetworkLogFile, "test_log.log", "network log file should be \"test_log.log\"") } - -func TestAddConfiguration(t *testing.T) { - // prepare mock mysql - _, mock := libs.NewMock() - - newCfg := types.Configuration{} - newCfg.ConfigName = "test_config" - newCfg.ConfigNetPolicy.NetPolicyCIDRBits = 32 - - configDBPtr := &newCfg.ConfigDB - configDB, _ := json.Marshal(configDBPtr) - - configHubblePtr := &newCfg.ConfigCiliumHubble - configCilium, _ := json.Marshal(configHubblePtr) - - configKubeArmorPtr := &newCfg.ConfigKubeArmorRelay - configKubeArmor, _ := json.Marshal(configKubeArmorPtr) - - configFilterPtr := &newCfg.ConfigNetPolicy.NetLogFilters - configFilter, _ := json.Marshal(configFilterPtr) - - prep := mock.ExpectPrepare("INSERT INTO auto_policy_config") - prep.ExpectExec().WithArgs( - "test_config", //config_name - 0, //status - configDB, //config_db - configCilium, //config_cilium_hubble - configKubeArmor, //config_kubearmor_relay - 0, //network_operation_mode - "", //network_cronjob_time_interval - "", //network_one_time_job_time_selection - "", //network_log_from - "", //network_log_file - "", //network_policy_to - "", //network_policy_dir - configFilter, //network_policy_log_filters - 0, //network_policy_types - 0, //network_policy_rule_types - 32, //network_policy_cidr_bits - 0, //network_policy_l3_level - 0, //network_policy_l4_level - 0, //network_policy_l7_level - 0, //system_operation_mode - "", //system_cronjob_time_interval - "", //system_one_time_job_time_selection - "", //system_log_from - "", //system_log_file - "", //system_policy_to - "", //system_policy_dir - 0, //system_policy_types - configFilter, //system_policy_log_filters - false, //system_policy_proc_fromsource - false, //system_policy_file_fromsource - "", //cluster_info_from - "", //cluster_mgmt_url - ).WillReturnResult(sqlmock.NewResult(0, 1)) - - // add configuration - err := AddConfiguration(newCfg) - assert.NoError(t, err) - - if err = mock.ExpectationsWereMet(); err != nil { - t.Errorf("unmet expectation error: %s", err) - } -} - -func TestGetConfigurations(t *testing.T) { - // prepare mock mysql - _, mock := libs.NewMock() - - testCfg := types.Configuration{} - testCfg.ConfigName = "test_config" - testCfg.ConfigNetPolicy.NetPolicyCIDRBits = 32 - - configDBPtr := &testCfg.ConfigDB - configDB, _ := json.Marshal(configDBPtr) - - configHubblePtr := &testCfg.ConfigCiliumHubble - configCilium, _ := json.Marshal(configHubblePtr) - - configKubeArmorPtr := &testCfg.ConfigKubeArmorRelay - configKubeArmor, _ := json.Marshal(configKubeArmorPtr) - - configFilterPtr := &testCfg.ConfigNetPolicy.NetLogFilters - configFilter, _ := json.Marshal(configFilterPtr) - - rows := mock.NewRows([]string{ - "id", - "config_name", - "status", - "config_db", - "config_cilium_hubble", - "config_kubearmor_relay", - "network_operation_mode", - "network_cronjob_time_interval", - "network_one_time_job_time_selection", - "network_log_from", - "network_log_file", - "network_policy_to", - "network_policy_dir", - "network_policy_log_filters", - "network_policy_types", - "network_policy_rule_types", - "network_policy_cidr_bits", - "network_policy_l3_level", - "network_policy_l4_level", - "network_policy_l7_level", - "system_operation_mode", - "system_cronjob_time_interval", - "system_one_time_job_time_selection", - "system_log_from", - "system_log_file", - "system_policy_to", - "system_policy_dir", - "system_policy_types", - "system_policy_log_filters", - "system_policy_proc_fromsource", - "system_policy_file_fromsource", - "cluster_info_from", - "cluster_mgmt_url"}). - AddRow( - 1, //id - "test_config", //config_name - 0, //status - configDB, //config_db - configCilium, //config_cilium_hubble - configKubeArmor, //config_kubearmor_relay - 0, //network_operation_mode - "", //network_cronjob_time_interval - "", //network_one_time_job_time_selection - "", //network_log_from - "", //network_log_file - "", //network_policy_to - "", //network_policy_dir - configFilter, //network_policy_log_filters - 0, //network_policy_types - 0, //network_policy_rule_types - 32, //network_policy_cidr_bits - 0, //network_policy_l3_level - 0, //network_policy_l4_level - 0, //network_policy_l7_level - 0, //system_operation_mode - "", //system_cronjob_time_interval - "", //system_one_time_job_time_selection - "", //system_log_from - "", //system_log_file - "", //system_policy_to - "", //system_policy_dir - 0, //system_policy_types - configFilter, //system_policy_log_filters - false, //system_policy_proc_fromsource - false, //system_policy_file_fromsource - "", //cluster_info_from - "", //cluster_mgmt_url - ) - - query := "SELECT (.+) FROM auto_policy_config WHERE config_name = ?" - mock.ExpectQuery(query).WillReturnRows(rows) - - // get configuration by name - results, err := GetConfigurations(testCfg.ConfigName) - assert.NoError(t, err) - assert.Equal(t, results[0].ConfigName, "test_config") - - if err = mock.ExpectationsWereMet(); err != nil { - t.Errorf("unmet expectation error: %s", err) - } -} - -func TestUpdateConfiguration(t *testing.T) { - // prepare mock mysql - _, mock := libs.NewMock() - - testCfg := types.Configuration{} - testCfg.ConfigName = "test_config" - testCfg.ConfigNetPolicy.NetPolicyCIDRBits = 24 - - configDBPtr := &testCfg.ConfigDB - configDB, _ := json.Marshal(configDBPtr) - - configHubblePtr := &testCfg.ConfigCiliumHubble - configCilium, _ := json.Marshal(configHubblePtr) - - configKubeArmorPtr := &testCfg.ConfigKubeArmorRelay - configKubeArmor, _ := json.Marshal(configKubeArmorPtr) - - configFilterPtr := &testCfg.ConfigNetPolicy.NetLogFilters - configFilter, _ := json.Marshal(configFilterPtr) - - prep := mock.ExpectPrepare("UPDATE auto_policy_config") - prep.ExpectExec().WithArgs( - configDB, //config_db - configCilium, //config_cilium_hubble - configKubeArmor, //config_kubearmor_relay - 0, //network_operation_mode - "", //network_cronjob_time_interval - "", //network_one_time_job_time_selection - "", //network_log_from - "", //network_log_file - "", //network_policy_to - "", //network_policy_dir - configFilter, //network_policy_log_filters - 0, //network_policy_types - 0, //network_policy_rule_types - 24, //network_policy_cidr_bits - 0, //network_policy_l3_level - 0, //network_policy_l4_level - 0, //network_policy_l7_level - 0, //system_operation_mode - "", //system_cronjob_time_interval - "", //system_one_time_job_time_selection - "", //system_log_from - "", //system_log_file - "", //system_policy_to - "", //system_policy_dir - 0, //system_policy_types - configFilter, //system_policy_log_filters - false, //system_policy_proc_fromsource - false, //system_policy_file_fromsource - "", //cluster_info_from - "", //cluster_mgmt_url). - "test_config", //config_name - ).WillReturnResult(sqlmock.NewResult(0, 1)) - - // update configuration by name - err := UpdateConfiguration("test_config", testCfg) - assert.NoError(t, err) - - if err = mock.ExpectationsWereMet(); err != nil { - t.Errorf("unmet expectation error: %s", err) - } -} - -func TestDeleteConfiguration(t *testing.T) { - // prepare mock mysql - _, mock := libs.NewMock() - - testCfg := types.Configuration{} - testCfg.ConfigName = "test_config" - - prep := mock.ExpectPrepare("DELETE FROM auto_policy_config") - prep.ExpectExec().WithArgs("test_config"). - WillReturnResult(sqlmock.NewResult(0, 1)) - - // update configuration by name - err := DeleteConfiguration("test_config") - assert.NoError(t, err) - - if err = mock.ExpectationsWereMet(); err != nil { - t.Errorf("unmet expectation error: %s", err) - } -} From 76cb304a6dde4203cb13edf65426dc0d1b3ec2fd Mon Sep 17 00:00:00 2001 From: daemon1024 Date: Tue, 24 Aug 2021 01:56:59 +0530 Subject: [PATCH 08/18] revert add/del label func & after/before check --- tests/test-scenarios-local.sh | 35 ++++++++++++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/tests/test-scenarios-local.sh b/tests/test-scenarios-local.sh index 0b5a1952..dbbe2bba 100755 --- a/tests/test-scenarios-local.sh +++ b/tests/test-scenarios-local.sh @@ -122,6 +122,20 @@ function delete_and_wait_for_microserivce_deletion() { ## == Test Functions == ## ## ==================== ## +function add_label_pods() { + if [[ $2 == "TC_16" ]] || [[ $2 == "TC_30" ]]; then + cd $1 + /bin/bash ./add_label.sh $ >/dev/null + fi +} + +function del_label_pods() { + if [[ $2 == "TC_16" ]] || [[ $2 == "TC_30" ]]; then + cd $1 + /bin/bash ./del_label.sh $ >/dev/null + fi +} + function run_test_case() { cd $1 @@ -129,6 +143,14 @@ function run_test_case() { for JSON_FILE in $(ls -r $TC_*.json); do for EXPECTED_YAML_FILE in $(ls -r $TC_*.yaml); do + + # check before / after + if [[ $JSON_FILE == *"before"* ]] && [[ $EXPECTED_YAML_FILE != *"before"* ]]; then + continue + elif [[ $JSON_FILE == *"after"* ]] && [[ $EXPECTED_YAML_FILE != *"after"* ]]; then + continue + fi + echo -e "${GREEN}[INFO] Discovering from $JSON_FILE" cp $1/$JSON_FILE test-flow.json @@ -200,7 +222,8 @@ if [ $res_microservice == 0 ]; then ## Step 4. Run all the test cases cd $TEST_HOME/$microservice/test-cases for testcase in $(ls -d $TC_*); do - cd + add_label_pods $TEST_HOME/$microservice/test-cases/$testcase $testcase + # replace configuration JSON_FILE=$(ls $TEST_HOME/$microservice/test-cases/$testcase/*.json) @@ -216,13 +239,19 @@ if [ $res_microservice == 0 ]; then fi # Clear DB for next policy generation - echo "[INFO] Clear DB" - grpcurl -plaintext -d '{"req": "dbclear"}' localhost:9089 v1.worker.Worker.Start + if [[ $JSON_FILE == *"before"* ]]; then + echo "[INFO] Not clearing DB" + else + echo "[INFO] Clear DB" + grpcurl -plaintext -d '{"req": "dbclear"}' localhost:9089 v1.worker.Worker.Start + fi # stop knoxAutoPolicy echo -e "${ORANGE}[INFO] Stopping KnoxAutoPolicy${NC}" stop_and_wait_for_KnoxAutoPolicy_termination echo "[INFO] Stopped KnoxAutoPolicy" + + del_label_pods $TEST_HOME/$microservice/test-cases/$testcase $testcase done ## Step 6. Delete Microservice From 9d3dcb8e68c5013b2b5154d96abc73315e669ad2 Mon Sep 17 00:00:00 2001 From: daemon1024 Date: Tue, 24 Aug 2021 17:26:15 +0530 Subject: [PATCH 09/18] consider system policy tests in system tests --- tests/conf/local.yaml | 6 ++++++ tests/test-scenarios-local.sh | 35 +++++++++++++++++++++-------------- 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/tests/conf/local.yaml b/tests/conf/local.yaml index c74ee1c8..5882ac36 100644 --- a/tests/conf/local.yaml +++ b/tests/conf/local.yaml @@ -6,6 +6,12 @@ application: network-log-file: "./test-flow.json" # file path network-policy-to: "db|file" # db, file network-policy-dir: "./" + system: + operation-mode: 2 # 1: cronjob | 2: one-time-job + system-log-from: "file" # db|hubble + system-log-file: "./test-flow.json" # file path + system-policy-to: "db|file" # db, file + system-policy-dir: "./" cluster: cluster-info-from: "k8sclient" # k8sclient|accuknox #cluster-mgmt-url: "http://cluster-management-service.accuknox-dev-cluster-mgmt.svc.cluster.local/cm" diff --git a/tests/test-scenarios-local.sh b/tests/test-scenarios-local.sh index dbbe2bba..7822762d 100755 --- a/tests/test-scenarios-local.sh +++ b/tests/test-scenarios-local.sh @@ -1,5 +1,7 @@ #!/bin/bash +shopt -s extglob + RED='\033[0;31m' GREEN='\033[0;32m' ORANGE='\033[0;33m' @@ -139,7 +141,7 @@ function del_label_pods() { function run_test_case() { cd $1 - ACTUAL_YAML_FILE=cilium_policies_$2.yaml + ACTUAL_YAML_FILE=@(cilium|kubearmor)_policies*.yaml for JSON_FILE in $(ls -r $TC_*.json); do for EXPECTED_YAML_FILE in $(ls -r $TC_*.yaml); do @@ -172,10 +174,28 @@ function run_test_case() { echo "[INFO] Discovered policies from $JSON_FILE" echo -e "${GREEN}[INFO] Comparing $EXPECTED_YAML_FILE and $ACTUAL_YAML_FILE${NC}" + fail=0 python3 $TEST_SCRIPTS/diff.py $1/$ACTUAL_YAML_FILE $1/$EXPECTED_YAML_FILE if [ $? != 0 ]; then echo -e "${RED}[FAIL] Failed $3${NC}" FAILED_TESTS+=($testcase) + fail=1 + fi + + # Clear DB for next policy generation + if [[ $JSON_FILE == *"before"* ]]; then + echo "[INFO] Not clearing DB" + else + echo "[INFO] Clear DB" + grpcurl -plaintext -d '{"req": "dbclear"}' localhost:9089 v1.worker.Worker.Start + fi + + # stop knoxAutoPolicy + echo -e "${ORANGE}[INFO] Stopping KnoxAutoPolicy${NC}" + stop_and_wait_for_KnoxAutoPolicy_termination + echo "[INFO] Stopped KnoxAutoPolicy" + + if [ $fail != 0 ]; then return fi done @@ -238,19 +258,6 @@ if [ $res_microservice == 0 ]; then ((COUNT_TESTS++)) fi - # Clear DB for next policy generation - if [[ $JSON_FILE == *"before"* ]]; then - echo "[INFO] Not clearing DB" - else - echo "[INFO] Clear DB" - grpcurl -plaintext -d '{"req": "dbclear"}' localhost:9089 v1.worker.Worker.Start - fi - - # stop knoxAutoPolicy - echo -e "${ORANGE}[INFO] Stopping KnoxAutoPolicy${NC}" - stop_and_wait_for_KnoxAutoPolicy_termination - echo "[INFO] Stopped KnoxAutoPolicy" - del_label_pods $TEST_HOME/$microservice/test-cases/$testcase $testcase done From 666c5b4bc8a603e46b063677578f200ad04920bd Mon Sep 17 00:00:00 2001 From: Akshat Agarwal Date: Mon, 30 Aug 2021 21:29:23 +0530 Subject: [PATCH 10/18] use kafka consumer to get logs from feeder-service Co-authored-by: Barun Acharya <47106543+daemon1024@users.noreply.github.com> --- src/conf/local.yaml | 4 +-- src/feedconsumer/consumer.go | 25 ++++++++---------- src/networkpolicy/helperFunctions.go | 18 +++++++++++++ src/networkpolicy/networkPolicy.go | 4 +++ src/plugin/cilium.go | 38 ++++++++++++++++++++++++++++ src/plugin/kubearmor.go | 31 ++++++++++++++++++++++- src/systempolicy/systemPolicy.go | 26 ++++++++++++++++--- 7 files changed, 124 insertions(+), 22 deletions(-) diff --git a/src/conf/local.yaml b/src/conf/local.yaml index f69cd2e3..7ef1e2d5 100644 --- a/src/conf/local.yaml +++ b/src/conf/local.yaml @@ -5,7 +5,7 @@ application: operation-trigger: 100 cron-job-time-interval: "0h0m10s" # format: XhYmZs network-log-limit: 100000 - network-log-from: "hubble" # db|hubble + network-log-from: "kafka" # db|hubble|kafka network-log-file: "./flow.json" # file path network-policy-to: "db|file" # db, file network-policy-dir: "./" @@ -13,7 +13,7 @@ application: operation-mode: 1 # 1: cronjob | 2: one-time-job operation-trigger: 100 cron-job-time-interval: "0h0m10s" # format: XhYmZs - system-log-from: "kubearmor" # db|kubearmor + system-log-from: "kafka" # db|kubearmor|kafka system-log-limit: 100000 system-log-file: "./log.json" # file path system-policy-to: "db|file" # db, file diff --git a/src/feedconsumer/consumer.go b/src/feedconsumer/consumer.go index 4e5d0a40..d554a985 100644 --- a/src/feedconsumer/consumer.go +++ b/src/feedconsumer/consumer.go @@ -187,28 +187,27 @@ func (cfc *KnoxFeedConsumer) processNetworkLogMessage(message []byte) error { return err } - clusterName := eventMap["cluster_name"] - clusterNameStr := "" - if err := json.Unmarshal(clusterName, &clusterNameStr); err != nil { - return err - } - + // FIXME + // clusterName := eventMap["cluster_name"] + // clusterNameStr := "" + // if err := json.Unmarshal(clusterName, &clusterNameStr); err != nil { + // log.Error().Stack().Msg(err.Error()) + // return err + // } + + // FIXME if eventMap has flow field unmarshal it flowEvent := eventMap["flow"] if err := json.Unmarshal(flowEvent, &event); err != nil { return err } // add cluster_name to the event - event.ClusterName = clusterNameStr + // event.ClusterName = clusterNameStr FIXME no idea what to do with this? cfc.netLogEvents = append(cfc.netLogEvents, event) cfc.netLogEventsCount++ if cfc.netLogEventsCount == cfc.eventsBuffer { if len(cfc.netLogEvents) > 0 { - isSuccess := cfc.PushNetworkLogToDB() - if !isSuccess { - return errors.New("error saving to DB") - } cfc.netLogEvents = nil cfc.netLogEvents = make([]types.NetworkLogEvent, 0, cfc.eventsBuffer) } @@ -244,10 +243,6 @@ func (cfc *KnoxFeedConsumer) processSystemLogMessage(message []byte) error { if cfc.syslogEventsCount == cfc.eventsBuffer { if len(cfc.syslogEvents) > 0 { - isSuccess := cfc.PushSystemLogToDB() - if !isSuccess { - return errors.New("error saving to DB") - } cfc.syslogEvents = nil cfc.syslogEvents = make([]types.SystemLogEvent, 0, cfc.eventsBuffer) } diff --git a/src/networkpolicy/helperFunctions.go b/src/networkpolicy/helperFunctions.go index e94dc96c..7f97b654 100644 --- a/src/networkpolicy/helperFunctions.go +++ b/src/networkpolicy/helperFunctions.go @@ -215,6 +215,24 @@ func getNetworkLogs() []types.KnoxNetworkLog { return nil } + // convert hubble flows -> network logs (but, in this case, no flow id) + for _, flow := range flows { + if log, valid := plugin.ConvertCiliumFlowToKnoxNetworkLog(flow); valid { + networkLogs = append(networkLogs, log) + } + } + } else if NetworkLogFrom == "kafka" { + // ================== // + // == Cilium kafka == // + // ================== // + log.Info().Msg("Get network log from the kafka consumer") + + // get flows from kafka consumer + flows := plugin.GetCiliumFlowsFromKafka(OperationTrigger) + if len(flows) == 0 || len(flows) < OperationTrigger { + return nil + } + // convert hubble flows -> network logs (but, in this case, no flow id) for _, flow := range flows { if log, valid := plugin.ConvertCiliumFlowToKnoxNetworkLog(flow); valid { diff --git a/src/networkpolicy/networkPolicy.go b/src/networkpolicy/networkPolicy.go index c89796be..6280f999 100644 --- a/src/networkpolicy/networkPolicy.go +++ b/src/networkpolicy/networkPolicy.go @@ -10,6 +10,7 @@ import ( "github.com/accuknox/knoxAutoPolicy/src/cluster" cfg "github.com/accuknox/knoxAutoPolicy/src/config" + "github.com/accuknox/knoxAutoPolicy/src/feedconsumer" "github.com/accuknox/knoxAutoPolicy/src/libs" logger "github.com/accuknox/knoxAutoPolicy/src/logging" "github.com/accuknox/knoxAutoPolicy/src/plugin" @@ -1607,6 +1608,9 @@ func StartNetworkCronJob() { if cfg.GetCfgNetworkLogFrom() == "hubble" { go plugin.StartHubbleRelay(NetworkStopChan, &NetworkWaitG, cfg.GetCfgCiliumHubble()) NetworkWaitG.Add(1) + } else if cfg.GetCfgNetworkLogFrom() == "kafka" { + go feedconsumer.StartConsumer() + NetworkWaitG.Add(1) } // init cron job diff --git a/src/plugin/cilium.go b/src/plugin/cilium.go index 75285e4f..55f89816 100644 --- a/src/plugin/cilium.go +++ b/src/plugin/cilium.go @@ -58,6 +58,8 @@ var Verdict = map[string]int{ var CiliumFlows []*cilium.Flow var CiliumFlowsMutex *sync.Mutex +var CiliumFlowsKafka []*cilium.Flow +var CiliumFlowsKafkaMutex *sync.Mutex var log *zerolog.Logger @@ -65,6 +67,8 @@ func init() { log = logger.GetInstance() CiliumFlowsMutex = &sync.Mutex{} KubeArmorRelayLogsMutex = &sync.Mutex{} + CiliumFlowsKafkaMutex = &sync.Mutex{} + KubeArmorKafkaLogsMutex = &sync.Mutex{} } // ====================== // @@ -754,3 +758,37 @@ func StartHubbleRelay(StopChan chan struct{}, wg *sync.WaitGroup, cfg types.Conf log.Error().Msg("Unable to stream network flow: " + err.Error()) } } + +func GetCiliumFlowsFromKafka(trigger int) []*cilium.Flow { + results := []*cilium.Flow{} + + CiliumFlowsKafkaMutex.Lock() + if len(CiliumFlowsKafka) == 0 { + log.Info().Msgf("Cilium kafka traffic flow not exist") + CiliumFlowsKafkaMutex.Unlock() + return results + } + + if len(CiliumFlowsKafka) < trigger { + log.Info().Msgf("The number of cilium kafka traffic flow [%d] is less than trigger [%d]", len(CiliumFlowsKafka), trigger) + CiliumFlowsKafkaMutex.Unlock() + return results + } + + results = CiliumFlowsKafka // copy + CiliumFlowsKafka = []*cilium.Flow{} // reset + CiliumFlowsKafkaMutex.Unlock() + + firstDoc := results[0] + lastDoc := results[len(results)-1] + + // id/time filter update + startTime := firstDoc.Time.Seconds + endTime := lastDoc.Time.Seconds + + log.Info().Msgf("The total number of cilium kafka traffic flow: [%d] from %s ~ to %s", len(results), + time.Unix(startTime, 0).Format(libs.TimeFormSimple), + time.Unix(endTime, 0).Format(libs.TimeFormSimple)) + + return results +} diff --git a/src/plugin/kubearmor.go b/src/plugin/kubearmor.go index e2f10b25..087c6a28 100644 --- a/src/plugin/kubearmor.go +++ b/src/plugin/kubearmor.go @@ -17,6 +17,9 @@ import ( var KubeArmorRelayLogs []*pb.Log var KubeArmorRelayLogsMutex *sync.Mutex +var KubeArmorKafkaLogs []*pb.Log +var KubeArmorKafkaLogsMutex *sync.Mutex + func ConvertKnoxSystemPolicyToKubeArmorPolicy(knoxPolicies []types.KnoxSystemPolicy) []types.KubeArmorPolicy { results := []types.KubeArmorPolicy{} @@ -100,7 +103,7 @@ func ConvertKubeArmorSystemLogsToKnoxSystemLogs(dbDriver string, docs []map[stri return []types.KnoxSystemLog{} } -func ConvertKubeArmorRelayLogToKnoxSystemLog(relayLog *pb.Log) types.KnoxSystemLog { +func ConvertKubeArmorLogToKnoxSystemLog(relayLog *pb.Log) types.KnoxSystemLog { sources := strings.Split(relayLog.Source, " ") source := "" @@ -250,3 +253,29 @@ func StartKubeArmorRelay(StopChan chan struct{}, wg *sync.WaitGroup, cfg types.C } }() } + +func GetSystemLogsFromKafkaConsumer(trigger int) []*pb.Log { + results := []*pb.Log{} + KubeArmorKafkaLogsMutex.Lock() + if len(KubeArmorKafkaLogs) == 0 { + log.Info().Msgf("KubeArmor kafka traffic flow not exist") + KubeArmorKafkaLogsMutex.Unlock() + return results + } + + if len(KubeArmorKafkaLogs) < trigger { + log.Info().Msgf("The number of KubeArmor traffic flow [%d] is less than trigger [%d]", len(KubeArmorKafkaLogs), trigger) + KubeArmorKafkaLogsMutex.Unlock() + return results + } + + results = KubeArmorKafkaLogs // copy + KubeArmorKafkaLogs = []*pb.Log{} // reset + KubeArmorKafkaLogsMutex.Unlock() + + log.Info().Msgf("The total number of KubeArmor kafka traffic flow: [%d] from %s ~ to %s", len(results), + time.Unix(int64(results[0].Timestamp), 0).Format(libs.TimeFormSimple), + time.Unix(int64(results[len(results)-1].Timestamp), 0).Format(libs.TimeFormSimple)) + + return results +} diff --git a/src/systempolicy/systemPolicy.go b/src/systempolicy/systemPolicy.go index 08f81483..fa600df7 100644 --- a/src/systempolicy/systemPolicy.go +++ b/src/systempolicy/systemPolicy.go @@ -10,6 +10,7 @@ import ( "github.com/accuknox/knoxAutoPolicy/src/cluster" cfg "github.com/accuknox/knoxAutoPolicy/src/config" + "github.com/accuknox/knoxAutoPolicy/src/feedconsumer" "github.com/accuknox/knoxAutoPolicy/src/libs" logger "github.com/accuknox/knoxAutoPolicy/src/logging" "github.com/accuknox/knoxAutoPolicy/src/plugin" @@ -165,10 +166,10 @@ type SysPath struct { func getSystemLogs() []types.KnoxSystemLog { systemLogs := []types.KnoxSystemLog{} - // =============== // - // == Database == // - // =============== // if SystemLogFrom == "db" { + // ============== // + // == Database == // + // ============== // log.Info().Msg("Get system log from the database") // get system logs from db @@ -236,13 +237,28 @@ func getSystemLogs() []types.KnoxSystemLog { // convert kubearmor relay logs -> knox system logs for _, relayLog := range relayLogs { - log := plugin.ConvertKubeArmorRelayLogToKnoxSystemLog(relayLog) + log := plugin.ConvertKubeArmorLogToKnoxSystemLog(relayLog) + systemLogs = append(systemLogs, log) + } + } else if SystemLogFrom == "kafka" { + log.Info().Msg("Get system log from kafka consumer") + + // get system logs from kafka consumer + sysLogs := plugin.GetSystemLogsFromKafkaConsumer(OperationTrigger) + if len(sysLogs) == 0 || len(sysLogs) < OperationTrigger { + return nil + } + + // convert kubearmor system logs -> knox system logs + for _, sysLog := range sysLogs { + log := plugin.ConvertKubeArmorLogToKnoxSystemLog(sysLog) systemLogs = append(systemLogs, log) } } else { log.Error().Msgf("System log from not correct: %s", SystemLogFrom) return nil } + print(systemLogs) return systemLogs } @@ -659,6 +675,8 @@ func StartSystemCronJob() { if cfg.GetCfgSystemLogFrom() == "kubearmor" { go plugin.StartKubeArmorRelay(SystemStopChan, &SystemWaitG, cfg.GetCfgKubeArmor()) SystemWaitG.Add(1) + } else if cfg.GetCfgSystemLogFrom() == "kafka" { + go feedconsumer.StartConsumer() } // init cron job From e394565a70f7f21c1c6d49fc8126298f46716e4d Mon Sep 17 00:00:00 2001 From: daemon1024 Date: Tue, 31 Aug 2021 01:21:28 +0530 Subject: [PATCH 11/18] process kafka msg to compatible event Process Network Log Event to Cilium Flows Process System Log Event to Kubearmor Relay --- src/feedconsumer/consumer.go | 104 ++++++++++++++++++++++++++++++++--- 1 file changed, 97 insertions(+), 7 deletions(-) diff --git a/src/feedconsumer/consumer.go b/src/feedconsumer/consumer.go index d554a985..c845669b 100644 --- a/src/feedconsumer/consumer.go +++ b/src/feedconsumer/consumer.go @@ -2,7 +2,6 @@ package feedconsumer import ( "encoding/json" - "errors" "strconv" "sync" "time" @@ -15,7 +14,11 @@ import ( cfg "github.com/accuknox/knoxAutoPolicy/src/config" "github.com/accuknox/knoxAutoPolicy/src/libs" logger "github.com/accuknox/knoxAutoPolicy/src/logging" + "github.com/accuknox/knoxAutoPolicy/src/plugin" types "github.com/accuknox/knoxAutoPolicy/src/types" + cilium "github.com/cilium/cilium/api/v1/flow" + pb "github.com/kubearmor/KubeArmor/protobuf" + "google.golang.org/protobuf/types/known/timestamppb" ) const ( // status @@ -24,7 +27,7 @@ const ( // status ) // ====================== // -// == Gloabl Variables == // +// == Global Variables == // // ====================== // var numOfConsumers int @@ -187,27 +190,98 @@ func (cfc *KnoxFeedConsumer) processNetworkLogMessage(message []byte) error { return err } - // FIXME + // FIXME: Couldn't find any field cluster_name in the received network log + // Error Msg: unexpected end of JSON input + // // clusterName := eventMap["cluster_name"] // clusterNameStr := "" // if err := json.Unmarshal(clusterName, &clusterNameStr); err != nil { // log.Error().Stack().Msg(err.Error()) // return err // } + // add cluster_name to the event + // event.ClusterName = clusterNameStr //Refer above comment - // FIXME if eventMap has flow field unmarshal it - flowEvent := eventMap["flow"] + flowEvent, exists := eventMap["flow"] + if !exists { + return nil + } if err := json.Unmarshal(flowEvent, &event); err != nil { return err } - // add cluster_name to the event - // event.ClusterName = clusterNameStr FIXME no idea what to do with this? cfc.netLogEvents = append(cfc.netLogEvents, event) cfc.netLogEventsCount++ if cfc.netLogEventsCount == cfc.eventsBuffer { if len(cfc.netLogEvents) > 0 { + for _, netLog := range cfc.netLogEvents { + time, _ := strconv.ParseInt(netLog.Time, 10, 64) + flow := cilium.Flow{ + TrafficDirection: cilium.TrafficDirection(plugin.TrafficDirection[netLog.TrafficDirection]), + PolicyMatchType: uint32(netLog.PolicyMatchType), + DropReason: uint32(netLog.DropReason), + Verdict: cilium.Verdict(plugin.Verdict[netLog.Verdict]), + Time: ×tamppb.Timestamp{ + Seconds: time, + }, + } + var err error + if netLog.EventType != nil { + err = json.Unmarshal(netLog.EventType, &flow.EventType) + if err != nil { + log.Error().Msg("Error while unmarshing event type :" + err.Error()) + continue + } + } + + if netLog.Source != nil { + err = json.Unmarshal(netLog.Source, &flow.Source) + if err != nil { + log.Error().Msg("Error while unmarshing source :" + err.Error()) + continue + } + } + + if netLog.Destination != nil { + err = json.Unmarshal(netLog.Destination, &flow.Destination) + if err != nil { + log.Error().Msg("Error while unmarshing destination :" + err.Error()) + continue + } + } + + if netLog.IP != nil { + err = json.Unmarshal(netLog.IP, &flow.IP) + if err != nil { + log.Error().Msg("Error while unmarshing ip :" + err.Error()) + continue + } + } + + if netLog.L4 != nil { + err = json.Unmarshal(netLog.L4, &flow.L4) + if err != nil { + log.Error().Msg("Error while unmarshing l4 :" + err.Error()) + continue + } + } + + if netLog.L7 != nil { + l7Byte := netLog.L7 + if len(l7Byte) != 0 { + err = json.Unmarshal(l7Byte, &flow.L7) + if err != nil { + log.Error().Msg("Error while unmarshing l7 :" + err.Error()) + continue + } + } + } + + plugin.CiliumFlowsKafkaMutex.Lock() + plugin.CiliumFlowsKafka = append(plugin.CiliumFlowsKafka, &flow) + plugin.CiliumFlowsKafkaMutex.Unlock() + } cfc.netLogEvents = nil cfc.netLogEvents = make([]types.NetworkLogEvent, 0, cfc.eventsBuffer) } @@ -243,6 +317,22 @@ func (cfc *KnoxFeedConsumer) processSystemLogMessage(message []byte) error { if cfc.syslogEventsCount == cfc.eventsBuffer { if len(cfc.syslogEvents) > 0 { + for _, syslog := range cfc.syslogEvents { + log := pb.Log{ + ClusterName: syslog.ClusterName, + HostName: syslog.HostName, + NamespaceName: syslog.NamespaceName, + PodName: syslog.PodName, + Source: syslog.Source, + Operation: syslog.Operation, + Resource: syslog.Resource, + Data: syslog.Data, + Result: syslog.Result, + } + plugin.KubeArmorKafkaLogsMutex.Lock() + plugin.KubeArmorKafkaLogs = append(plugin.KubeArmorKafkaLogs, &log) + plugin.KubeArmorKafkaLogsMutex.Unlock() + } cfc.syslogEvents = nil cfc.syslogEvents = make([]types.SystemLogEvent, 0, cfc.eventsBuffer) } From 3db28904c59504f5172beaf6abd2c03323844426 Mon Sep 17 00:00:00 2001 From: Akshat Agarwal Date: Tue, 31 Aug 2021 19:15:06 +0530 Subject: [PATCH 12/18] uncomment the cluster_name changes Acked-by: Barun Acharya <47106543+daemon1024@users.noreply.github.com> --- src/feedconsumer/consumer.go | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/feedconsumer/consumer.go b/src/feedconsumer/consumer.go index c845669b..a9edf81f 100644 --- a/src/feedconsumer/consumer.go +++ b/src/feedconsumer/consumer.go @@ -190,26 +190,25 @@ func (cfc *KnoxFeedConsumer) processNetworkLogMessage(message []byte) error { return err } - // FIXME: Couldn't find any field cluster_name in the received network log - // Error Msg: unexpected end of JSON input - // - // clusterName := eventMap["cluster_name"] - // clusterNameStr := "" - // if err := json.Unmarshal(clusterName, &clusterNameStr); err != nil { - // log.Error().Stack().Msg(err.Error()) - // return err - // } - // add cluster_name to the event - // event.ClusterName = clusterNameStr //Refer above comment + clusterName := eventMap["cluster_name"] + + clusterNameStr := "" + if err := json.Unmarshal(clusterName, &clusterNameStr); err != nil { + log.Error().Stack().Msg(err.Error()) + return err + } flowEvent, exists := eventMap["flow"] if !exists { return nil } if err := json.Unmarshal(flowEvent, &event); err != nil { + log.Error().Msg(err.Error()) return err } + // add cluster_name to the event + event.ClusterName = clusterNameStr cfc.netLogEvents = append(cfc.netLogEvents, event) cfc.netLogEventsCount++ From abf54cd3728df9d5d3e1b471ecf12bd27b6052be Mon Sep 17 00:00:00 2001 From: daemon1024 Date: Tue, 31 Aug 2021 20:01:18 +0530 Subject: [PATCH 13/18] consider clustername in kafka policy gen converting ciliumFlows to knoxNetworkLog beforehand and appending clusterName if it's a valid Flow --- src/feedconsumer/consumer.go | 10 +++++++--- src/networkpolicy/helperFunctions.go | 4 +--- src/plugin/cilium.go | 21 ++++++--------------- 3 files changed, 14 insertions(+), 21 deletions(-) diff --git a/src/feedconsumer/consumer.go b/src/feedconsumer/consumer.go index a9edf81f..bceecbcf 100644 --- a/src/feedconsumer/consumer.go +++ b/src/feedconsumer/consumer.go @@ -277,9 +277,13 @@ func (cfc *KnoxFeedConsumer) processNetworkLogMessage(message []byte) error { } } - plugin.CiliumFlowsKafkaMutex.Lock() - plugin.CiliumFlowsKafka = append(plugin.CiliumFlowsKafka, &flow) - plugin.CiliumFlowsKafkaMutex.Unlock() + knoxFlow, valid := plugin.ConvertCiliumFlowToKnoxNetworkLog(&flow) + if valid { + knoxFlow.ClusterName = netLog.ClusterName + plugin.CiliumFlowsKafkaMutex.Lock() + plugin.CiliumFlowsKafka = append(plugin.CiliumFlowsKafka, &knoxFlow) + plugin.CiliumFlowsKafkaMutex.Unlock() + } } cfc.netLogEvents = nil cfc.netLogEvents = make([]types.NetworkLogEvent, 0, cfc.eventsBuffer) diff --git a/src/networkpolicy/helperFunctions.go b/src/networkpolicy/helperFunctions.go index 7f97b654..9bb9105a 100644 --- a/src/networkpolicy/helperFunctions.go +++ b/src/networkpolicy/helperFunctions.go @@ -235,9 +235,7 @@ func getNetworkLogs() []types.KnoxNetworkLog { // convert hubble flows -> network logs (but, in this case, no flow id) for _, flow := range flows { - if log, valid := plugin.ConvertCiliumFlowToKnoxNetworkLog(flow); valid { - networkLogs = append(networkLogs, log) - } + networkLogs = append(networkLogs, *flow) } } else if NetworkLogFrom == "file" { // =============================== // diff --git a/src/plugin/cilium.go b/src/plugin/cilium.go index 55f89816..e9c9922a 100644 --- a/src/plugin/cilium.go +++ b/src/plugin/cilium.go @@ -58,7 +58,7 @@ var Verdict = map[string]int{ var CiliumFlows []*cilium.Flow var CiliumFlowsMutex *sync.Mutex -var CiliumFlowsKafka []*cilium.Flow +var CiliumFlowsKafka []*types.KnoxNetworkLog var CiliumFlowsKafkaMutex *sync.Mutex var log *zerolog.Logger @@ -759,8 +759,8 @@ func StartHubbleRelay(StopChan chan struct{}, wg *sync.WaitGroup, cfg types.Conf } } -func GetCiliumFlowsFromKafka(trigger int) []*cilium.Flow { - results := []*cilium.Flow{} +func GetCiliumFlowsFromKafka(trigger int) []*types.KnoxNetworkLog { + results := []*types.KnoxNetworkLog{} CiliumFlowsKafkaMutex.Lock() if len(CiliumFlowsKafka) == 0 { @@ -775,20 +775,11 @@ func GetCiliumFlowsFromKafka(trigger int) []*cilium.Flow { return results } - results = CiliumFlowsKafka // copy - CiliumFlowsKafka = []*cilium.Flow{} // reset + results = CiliumFlowsKafka // copy + CiliumFlowsKafka = []*types.KnoxNetworkLog{} // reset CiliumFlowsKafkaMutex.Unlock() - firstDoc := results[0] - lastDoc := results[len(results)-1] - - // id/time filter update - startTime := firstDoc.Time.Seconds - endTime := lastDoc.Time.Seconds - - log.Info().Msgf("The total number of cilium kafka traffic flow: [%d] from %s ~ to %s", len(results), - time.Unix(startTime, 0).Format(libs.TimeFormSimple), - time.Unix(endTime, 0).Format(libs.TimeFormSimple)) + log.Info().Msgf("The total number of cilium kafka traffic flow: [%d]", len(results)) return results } From d8d61422cd931d4d57c15af594bd1d8847672cd3 Mon Sep 17 00:00:00 2001 From: daemon1024 Date: Tue, 31 Aug 2021 20:18:18 +0530 Subject: [PATCH 14/18] consider clustername in kafka system policy gen converting KubeArmor Log to knoxSystemLog beforehand and appending clusterName --- src/feedconsumer/consumer.go | 5 ++++- src/plugin/kubearmor.go | 14 ++++++-------- src/systempolicy/systemPolicy.go | 3 +-- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/feedconsumer/consumer.go b/src/feedconsumer/consumer.go index bceecbcf..5d9a884d 100644 --- a/src/feedconsumer/consumer.go +++ b/src/feedconsumer/consumer.go @@ -332,8 +332,11 @@ func (cfc *KnoxFeedConsumer) processSystemLogMessage(message []byte) error { Data: syslog.Data, Result: syslog.Result, } + + knoxLog := plugin.ConvertKubeArmorLogToKnoxSystemLog(&log) + knoxLog.ClusterName = syslog.Clustername plugin.KubeArmorKafkaLogsMutex.Lock() - plugin.KubeArmorKafkaLogs = append(plugin.KubeArmorKafkaLogs, &log) + plugin.KubeArmorKafkaLogs = append(plugin.KubeArmorKafkaLogs, &knoxLog) plugin.KubeArmorKafkaLogsMutex.Unlock() } cfc.syslogEvents = nil diff --git a/src/plugin/kubearmor.go b/src/plugin/kubearmor.go index 087c6a28..7342bf22 100644 --- a/src/plugin/kubearmor.go +++ b/src/plugin/kubearmor.go @@ -17,7 +17,7 @@ import ( var KubeArmorRelayLogs []*pb.Log var KubeArmorRelayLogsMutex *sync.Mutex -var KubeArmorKafkaLogs []*pb.Log +var KubeArmorKafkaLogs []*types.KnoxSystemLog var KubeArmorKafkaLogsMutex *sync.Mutex func ConvertKnoxSystemPolicyToKubeArmorPolicy(knoxPolicies []types.KnoxSystemPolicy) []types.KubeArmorPolicy { @@ -254,8 +254,8 @@ func StartKubeArmorRelay(StopChan chan struct{}, wg *sync.WaitGroup, cfg types.C }() } -func GetSystemLogsFromKafkaConsumer(trigger int) []*pb.Log { - results := []*pb.Log{} +func GetSystemLogsFromKafkaConsumer(trigger int) []*types.KnoxSystemLog { + results := []*types.KnoxSystemLog{} KubeArmorKafkaLogsMutex.Lock() if len(KubeArmorKafkaLogs) == 0 { log.Info().Msgf("KubeArmor kafka traffic flow not exist") @@ -269,13 +269,11 @@ func GetSystemLogsFromKafkaConsumer(trigger int) []*pb.Log { return results } - results = KubeArmorKafkaLogs // copy - KubeArmorKafkaLogs = []*pb.Log{} // reset + results = KubeArmorKafkaLogs // copy + KubeArmorKafkaLogs = []*types.KnoxSystemLog{} // reset KubeArmorKafkaLogsMutex.Unlock() - log.Info().Msgf("The total number of KubeArmor kafka traffic flow: [%d] from %s ~ to %s", len(results), - time.Unix(int64(results[0].Timestamp), 0).Format(libs.TimeFormSimple), - time.Unix(int64(results[len(results)-1].Timestamp), 0).Format(libs.TimeFormSimple)) + log.Info().Msgf("The total number of KubeArmor kafka traffic flow: [%d]", len(results)) return results } diff --git a/src/systempolicy/systemPolicy.go b/src/systempolicy/systemPolicy.go index fa600df7..f9a11b19 100644 --- a/src/systempolicy/systemPolicy.go +++ b/src/systempolicy/systemPolicy.go @@ -251,8 +251,7 @@ func getSystemLogs() []types.KnoxSystemLog { // convert kubearmor system logs -> knox system logs for _, sysLog := range sysLogs { - log := plugin.ConvertKubeArmorLogToKnoxSystemLog(sysLog) - systemLogs = append(systemLogs, log) + systemLogs = append(systemLogs, *sysLog) } } else { log.Error().Msgf("System log from not correct: %s", SystemLogFrom) From c8d7586bec89b2bc966c2d64c2523c13eaa550ee Mon Sep 17 00:00:00 2001 From: daemon1024 Date: Wed, 1 Sep 2021 14:42:30 +0530 Subject: [PATCH 15/18] Continue policy gen even if some fields missing --- src/feedconsumer/consumer.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/feedconsumer/consumer.go b/src/feedconsumer/consumer.go index 5d9a884d..906983d9 100644 --- a/src/feedconsumer/consumer.go +++ b/src/feedconsumer/consumer.go @@ -230,7 +230,6 @@ func (cfc *KnoxFeedConsumer) processNetworkLogMessage(message []byte) error { err = json.Unmarshal(netLog.EventType, &flow.EventType) if err != nil { log.Error().Msg("Error while unmarshing event type :" + err.Error()) - continue } } @@ -238,7 +237,6 @@ func (cfc *KnoxFeedConsumer) processNetworkLogMessage(message []byte) error { err = json.Unmarshal(netLog.Source, &flow.Source) if err != nil { log.Error().Msg("Error while unmarshing source :" + err.Error()) - continue } } @@ -246,7 +244,6 @@ func (cfc *KnoxFeedConsumer) processNetworkLogMessage(message []byte) error { err = json.Unmarshal(netLog.Destination, &flow.Destination) if err != nil { log.Error().Msg("Error while unmarshing destination :" + err.Error()) - continue } } @@ -254,7 +251,6 @@ func (cfc *KnoxFeedConsumer) processNetworkLogMessage(message []byte) error { err = json.Unmarshal(netLog.IP, &flow.IP) if err != nil { log.Error().Msg("Error while unmarshing ip :" + err.Error()) - continue } } @@ -262,7 +258,6 @@ func (cfc *KnoxFeedConsumer) processNetworkLogMessage(message []byte) error { err = json.Unmarshal(netLog.L4, &flow.L4) if err != nil { log.Error().Msg("Error while unmarshing l4 :" + err.Error()) - continue } } @@ -272,7 +267,6 @@ func (cfc *KnoxFeedConsumer) processNetworkLogMessage(message []byte) error { err = json.Unmarshal(l7Byte, &flow.L7) if err != nil { log.Error().Msg("Error while unmarshing l7 :" + err.Error()) - continue } } } From 4cb2dcc74de0cdaba23728fe77d0058a30b208d0 Mon Sep 17 00:00:00 2001 From: daemon1024 Date: Wed, 1 Sep 2021 16:41:17 +0530 Subject: [PATCH 16/18] refactor cilium flow conversion using reflection --- src/feedconsumer/consumer.go | 60 +++++++++--------------------------- src/plugin/cilium.go | 11 +++++++ 2 files changed, 25 insertions(+), 46 deletions(-) diff --git a/src/feedconsumer/consumer.go b/src/feedconsumer/consumer.go index 906983d9..6f03c784 100644 --- a/src/feedconsumer/consumer.go +++ b/src/feedconsumer/consumer.go @@ -216,7 +216,7 @@ func (cfc *KnoxFeedConsumer) processNetworkLogMessage(message []byte) error { if len(cfc.netLogEvents) > 0 { for _, netLog := range cfc.netLogEvents { time, _ := strconv.ParseInt(netLog.Time, 10, 64) - flow := cilium.Flow{ + flow := &cilium.Flow{ TrafficDirection: cilium.TrafficDirection(plugin.TrafficDirection[netLog.TrafficDirection]), PolicyMatchType: uint32(netLog.PolicyMatchType), DropReason: uint32(netLog.DropReason), @@ -224,54 +224,22 @@ func (cfc *KnoxFeedConsumer) processNetworkLogMessage(message []byte) error { Time: ×tamppb.Timestamp{ Seconds: time, }, - } - var err error - if netLog.EventType != nil { - err = json.Unmarshal(netLog.EventType, &flow.EventType) - if err != nil { - log.Error().Msg("Error while unmarshing event type :" + err.Error()) - } - } - - if netLog.Source != nil { - err = json.Unmarshal(netLog.Source, &flow.Source) - if err != nil { - log.Error().Msg("Error while unmarshing source :" + err.Error()) - } - } - - if netLog.Destination != nil { - err = json.Unmarshal(netLog.Destination, &flow.Destination) - if err != nil { - log.Error().Msg("Error while unmarshing destination :" + err.Error()) - } + EventType: &cilium.CiliumEventType{}, + Source: &cilium.Endpoint{}, + Destination: &cilium.Endpoint{}, + IP: &cilium.IP{}, + L4: &cilium.Layer4{}, + L7: &cilium.Layer7{}, } - if netLog.IP != nil { - err = json.Unmarshal(netLog.IP, &flow.IP) - if err != nil { - log.Error().Msg("Error while unmarshing ip :" + err.Error()) - } - } - - if netLog.L4 != nil { - err = json.Unmarshal(netLog.L4, &flow.L4) - if err != nil { - log.Error().Msg("Error while unmarshing l4 :" + err.Error()) - } - } - - if netLog.L7 != nil { - l7Byte := netLog.L7 - if len(l7Byte) != 0 { - err = json.Unmarshal(l7Byte, &flow.L7) - if err != nil { - log.Error().Msg("Error while unmarshing l7 :" + err.Error()) - } - } - } + plugin.GetFlowData(netLog.EventType, flow.EventType) + plugin.GetFlowData(netLog.Source, flow.Source) + plugin.GetFlowData(netLog.Destination, flow.Destination) + plugin.GetFlowData(netLog.IP, flow.IP) + plugin.GetFlowData(netLog.L4, flow.L4) + plugin.GetFlowData(netLog.L7, flow.L7) - knoxFlow, valid := plugin.ConvertCiliumFlowToKnoxNetworkLog(&flow) + knoxFlow, valid := plugin.ConvertCiliumFlowToKnoxNetworkLog(flow) if valid { knoxFlow.ClusterName = netLog.ClusterName plugin.CiliumFlowsKafkaMutex.Lock() diff --git a/src/plugin/cilium.go b/src/plugin/cilium.go index e9c9922a..296fec34 100644 --- a/src/plugin/cilium.go +++ b/src/plugin/cilium.go @@ -377,6 +377,17 @@ func ConvertCiliumNetworkLogsToKnoxNetworkLogs(dbDriver string, docs []map[strin } } +func GetFlowData(netLogEventType []byte, flowEventType interface{}) error { + if netLogEventType == nil { + return nil + } + err := json.Unmarshal(netLogEventType, flowEventType) + if err != nil { + log.Error().Msg("error while unmarshing event type :" + err.Error()) + } + return err +} + // ============================== // // == Network Policy Convertor == // // ============================== // From a76c3e9fddb5701bbe78c6fbe4a4f0a08f095c0c Mon Sep 17 00:00:00 2001 From: daemon1024 Date: Wed, 1 Sep 2021 19:36:40 +0530 Subject: [PATCH 17/18] remove unnecessary print --- src/systempolicy/systemPolicy.go | 1 - 1 file changed, 1 deletion(-) diff --git a/src/systempolicy/systemPolicy.go b/src/systempolicy/systemPolicy.go index f9a11b19..b761edb0 100644 --- a/src/systempolicy/systemPolicy.go +++ b/src/systempolicy/systemPolicy.go @@ -257,7 +257,6 @@ func getSystemLogs() []types.KnoxSystemLog { log.Error().Msgf("System log from not correct: %s", SystemLogFrom) return nil } - print(systemLogs) return systemLogs } From a72a37c9a4ce9bc2b2ee217e09a02e0b8aefc283 Mon Sep 17 00:00:00 2001 From: daemon1024 Date: Wed, 1 Sep 2021 19:58:27 +0530 Subject: [PATCH 18/18] use defer Mutex.Unlock() --- src/plugin/cilium.go | 4 +--- src/plugin/kubearmor.go | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/src/plugin/cilium.go b/src/plugin/cilium.go index 296fec34..c86de6b7 100644 --- a/src/plugin/cilium.go +++ b/src/plugin/cilium.go @@ -774,21 +774,19 @@ func GetCiliumFlowsFromKafka(trigger int) []*types.KnoxNetworkLog { results := []*types.KnoxNetworkLog{} CiliumFlowsKafkaMutex.Lock() + defer CiliumFlowsKafkaMutex.Unlock() if len(CiliumFlowsKafka) == 0 { log.Info().Msgf("Cilium kafka traffic flow not exist") - CiliumFlowsKafkaMutex.Unlock() return results } if len(CiliumFlowsKafka) < trigger { log.Info().Msgf("The number of cilium kafka traffic flow [%d] is less than trigger [%d]", len(CiliumFlowsKafka), trigger) - CiliumFlowsKafkaMutex.Unlock() return results } results = CiliumFlowsKafka // copy CiliumFlowsKafka = []*types.KnoxNetworkLog{} // reset - CiliumFlowsKafkaMutex.Unlock() log.Info().Msgf("The total number of cilium kafka traffic flow: [%d]", len(results)) diff --git a/src/plugin/kubearmor.go b/src/plugin/kubearmor.go index 7342bf22..880bf0c3 100644 --- a/src/plugin/kubearmor.go +++ b/src/plugin/kubearmor.go @@ -257,21 +257,19 @@ func StartKubeArmorRelay(StopChan chan struct{}, wg *sync.WaitGroup, cfg types.C func GetSystemLogsFromKafkaConsumer(trigger int) []*types.KnoxSystemLog { results := []*types.KnoxSystemLog{} KubeArmorKafkaLogsMutex.Lock() + defer KubeArmorKafkaLogsMutex.Unlock() if len(KubeArmorKafkaLogs) == 0 { log.Info().Msgf("KubeArmor kafka traffic flow not exist") - KubeArmorKafkaLogsMutex.Unlock() return results } if len(KubeArmorKafkaLogs) < trigger { log.Info().Msgf("The number of KubeArmor traffic flow [%d] is less than trigger [%d]", len(KubeArmorKafkaLogs), trigger) - KubeArmorKafkaLogsMutex.Unlock() return results } results = KubeArmorKafkaLogs // copy KubeArmorKafkaLogs = []*types.KnoxSystemLog{} // reset - KubeArmorKafkaLogsMutex.Unlock() log.Info().Msgf("The total number of KubeArmor kafka traffic flow: [%d]", len(results))