diff --git a/src/cluster/k8sClientHandler.go b/src/cluster/k8sClientHandler.go index a42c81fe..da8451a3 100644 --- a/src/cluster/k8sClientHandler.go +++ b/src/cluster/k8sClientHandler.go @@ -459,3 +459,28 @@ func GetDeploymentsFromK8sClient() []types.Deployment { } return results } + +func GetKubearmorRelayURL() string { + var namespace string + client := ConnectK8sClient() + if client == nil { + return "" + } + + // get pods from k8s api client + pods, err := client.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{ + LabelSelector: "kubearmor-app", + }) + if err != nil { + log.Error().Msg(err.Error()) + return "" + } + // for _, pod := range pods.Items { + // if pod.Labels["kubearmor-app"] == "kubearmor" { + // namespace = pod.Namespace + // } + // } + namespace = pods.Items[0].Namespace + url := "kubearmor." + namespace + ".svc.cluster.local" + return url +} \ No newline at end of file diff --git a/src/config/configManager.go b/src/config/configManager.go index c4ceb9d3..fd0da85d 100644 --- a/src/config/configManager.go +++ b/src/config/configManager.go @@ -1,20 +1,11 @@ package config import ( - "context" - "flag" - "io/ioutil" "os" - "path/filepath" "strconv" types "github.com/accuknox/auto-policy-discovery/src/types" - "github.com/rs/zerolog/log" "github.com/spf13/viper" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/clientcmd" ) // operation mode: cronjob: 1 @@ -49,132 +40,6 @@ var CurrentCfg types.Configuration var NetworkPlugIn string var IgnoringNetworkNamespaces []string var HTTPUrlThreshold int -var parsed bool = false -var kubeconfig *string - -func isInCluster() bool { - if _, ok := os.LookupEnv("KUBERNETES_PORT"); ok { - return true - } - - return false -} - -func ConnectLocalAPIClient() *kubernetes.Clientset { - if !parsed { - homeDir := "" - if h := os.Getenv("HOME"); h != "" { - homeDir = h - } else { - homeDir = os.Getenv("USERPROFILE") // windows - } - - envKubeConfig := os.Getenv("KUBECONFIG") - if envKubeConfig != "" { - kubeconfig = &envKubeConfig - } else { - if home := homeDir; home != "" { - kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file") - } else { - kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file") - } - flag.Parse() - } - - parsed = true - } - - // use the current context in kubeconfig - config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig) - if err != nil { - log.Error().Msg(err.Error()) - return nil - } - - // creates the clientset - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - log.Error().Msg(err.Error()) - return nil - } - - return clientset -} - -func ConnectInClusterAPIClient() *kubernetes.Clientset { - host := "" - port := "" - token := "" - - if val, ok := os.LookupEnv("KUBERNETES_SERVICE_HOST"); ok { - host = val - } else { - host = "127.0.0.1" - } - - if val, ok := os.LookupEnv("KUBERNETES_PORT_443_TCP_PORT"); ok { - port = val - } else { - port = "6443" - } - - read, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token") - if err != nil { - log.Error().Msg(err.Error()) - return nil - } - - token = string(read) - - // create the configuration by token - kubeConfig := &rest.Config{ - Host: "https://" + host + ":" + port, - BearerToken: token, - TLSClientConfig: rest.TLSClientConfig{ - Insecure: true, - }, - } - - if client, err := kubernetes.NewForConfig(kubeConfig); err != nil { - log.Error().Msg(err.Error()) - return nil - } else { - return client - } -} - -func ConnectK8sClient() *kubernetes.Clientset { - if isInCluster() { - return ConnectInClusterAPIClient() - } - - return ConnectLocalAPIClient() -} - -func GetKubearmorRelayURL() string { - var namespace string - client := ConnectK8sClient() - if client == nil { - return "" - } - - // get pods from k8s api client - pods, err := client.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{ - LabelSelector: "kubearmor-app", - }) - if err != nil { - log.Error().Msg(err.Error()) - return "" - } - // for _, pod := range pods.Items { - // if pod.Labels["kubearmor-app"] == "kubearmor" { - // namespace = pod.Namespace - // } - // } - namespace = pods.Items[0].Namespace - url := "kubearmor." + namespace + ".svc.cluster.local" - return url -} func init() { IgnoringNetworkNamespaces = []string{"kube-system"} @@ -232,7 +97,6 @@ func LoadConfigCiliumHubble() types.ConfigCiliumHubble { func LoadConfigKubeArmor() types.ConfigKubeArmorRelay { cfgKubeArmor := types.ConfigKubeArmorRelay{} cfgKubeArmor.KubeArmorRelayURL = viper.GetString("kubearmor.url") - /* addr, err := net.LookupIP(cfgKubeArmor.KubeArmorRelayURL) if err == nil { diff --git a/src/plugin/kubearmor.go b/src/plugin/kubearmor.go index a5af65f4..7e7d885b 100644 --- a/src/plugin/kubearmor.go +++ b/src/plugin/kubearmor.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "errors" + "fmt" "net" "path/filepath" "strconv" @@ -29,8 +30,6 @@ var KubeArmorRelayLogsMutex *sync.Mutex var KubeArmorFCLogs []*types.KnoxSystemLog var KubeArmorFCLogsMutex *sync.Mutex -var SystemStopChan chan struct{} - func generateProcessPaths(fromSrc []types.KnoxFromSource) []string { var processpaths []string for _, locfrmsrc := range fromSrc { @@ -376,10 +375,12 @@ func ignoreLogFromRelayWithNamespace(nsFilter, nsNotFilter []string, namespace s var KubeArmorRelayStarted = false -func StartKubeArmorRelay(StopChan chan struct{}, cfg types.ConfigKubeArmorRelay) { +func StartKubeArmorRelay(StopChan chan struct{}, cfg types.ConfigKubeArmorRelay) chan error { + error := make(chan error) + if KubeArmorRelayStarted { // log.Info().Msg("kubearmor relay already started") - return + return nil } KubeArmorRelayStarted = true conn := ConnectKubeArmorRelay(cfg) @@ -402,10 +403,7 @@ func StartKubeArmorRelay(StopChan chan struct{}, cfg types.ConfigKubeArmorRelay) stream, err := client.WatchLogs(context.Background(), &req) if err != nil { log.Error().Msg("unable to stream systems logs: " + err.Error()) - log.Info().Msg("watch for kubearmor relay") - url := config.GetKubearmorRelayURL() - cfg.KubeArmorRelayURL = url - StartKubeArmorRelay(SystemStopChan, cfg) + error <- fmt.Errorf("unable to stream systems logs: " + err.Error()) return } for { @@ -486,6 +484,7 @@ func StartKubeArmorRelay(StopChan chan struct{}, cfg types.ConfigKubeArmorRelay) stream, err := client.WatchAlerts(context.Background(), &req) if err != nil { log.Error().Msg("unable to stream systems alerts: " + err.Error()) + error <- fmt.Errorf("unable to stream systems alerts: " + err.Error()) return } for { @@ -530,6 +529,7 @@ func StartKubeArmorRelay(StopChan chan struct{}, cfg types.ConfigKubeArmorRelay) } } }() + return error } func GetSystemLogsFromFeedConsumer(trigger int) []*types.KnoxSystemLog { diff --git a/src/systempolicy/systemPolicy.go b/src/systempolicy/systemPolicy.go index 0ac291f7..7fcab1c1 100644 --- a/src/systempolicy/systemPolicy.go +++ b/src/systempolicy/systemPolicy.go @@ -32,6 +32,7 @@ import ( ) var log *zerolog.Logger +var kubearmorRelayURL types.ConfigKubeArmorRelay func init() { log = logger.GetInstance() @@ -1467,7 +1468,13 @@ func DiscoverSystemPolicyMain() { func StartSystemLogRcvr() { for { if cfg.GetCfgSystemLogFrom() == "kubearmor" { - plugin.StartKubeArmorRelay(SystemStopChan, cfg.GetCfgKubeArmor()) + err := plugin.StartKubeArmorRelay(SystemStopChan, cfg.GetCfgKubeArmor()) + if val, ok := <-err; ok && val != nil { + url := cluster.GetKubearmorRelayURL() + kubearmorRelayURL.KubeArmorRelayURL = url + kubearmorRelayURL.KubeArmorRelayPort = "32767" + _ = plugin.StartKubeArmorRelay(SystemStopChan, kubearmorRelayURL) + } } else if cfg.GetCfgSystemLogFrom() == "feed-consumer" { fc.ConsumerMutex.Lock() fc.StartConsumer()