Skip to content

Commit

Permalink
new changes
Browse files Browse the repository at this point in the history
Signed-off-by: Prateeknandle <prateeknandle@gmail.com>
  • Loading branch information
Prateeknandle committed Feb 22, 2023
1 parent 2dc91fc commit b0197cb
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 145 deletions.
25 changes: 25 additions & 0 deletions src/cluster/k8sClientHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,3 +459,28 @@ func GetDeploymentsFromK8sClient() []types.Deployment {
}
return results
}

func GetKubearmorRelayURL() string {
var namespace string
client := ConnectK8sClient()
if client == nil {
return ""
}

// get pods from k8s api client
pods, err := client.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{
LabelSelector: "kubearmor-app",
})
if err != nil {
log.Error().Msg(err.Error())
return ""
}
// for _, pod := range pods.Items {
// if pod.Labels["kubearmor-app"] == "kubearmor" {
// namespace = pod.Namespace
// }
// }
namespace = pods.Items[0].Namespace
url := "kubearmor." + namespace + ".svc.cluster.local"
return url
}
136 changes: 0 additions & 136 deletions src/config/configManager.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,11 @@
package config

import (
"context"
"flag"
"io/ioutil"
"os"
"path/filepath"
"strconv"

types "github.com/accuknox/auto-policy-discovery/src/types"
"github.com/rs/zerolog/log"
"github.com/spf13/viper"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

// operation mode: cronjob: 1
Expand Down Expand Up @@ -49,132 +40,6 @@ var CurrentCfg types.Configuration
var NetworkPlugIn string
var IgnoringNetworkNamespaces []string
var HTTPUrlThreshold int
var parsed bool = false
var kubeconfig *string

func isInCluster() bool {
if _, ok := os.LookupEnv("KUBERNETES_PORT"); ok {
return true
}

return false
}

func ConnectLocalAPIClient() *kubernetes.Clientset {
if !parsed {
homeDir := ""
if h := os.Getenv("HOME"); h != "" {
homeDir = h
} else {
homeDir = os.Getenv("USERPROFILE") // windows
}

envKubeConfig := os.Getenv("KUBECONFIG")
if envKubeConfig != "" {
kubeconfig = &envKubeConfig
} else {
if home := homeDir; home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
}

parsed = true
}

// use the current context in kubeconfig
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
log.Error().Msg(err.Error())
return nil
}

// creates the clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Error().Msg(err.Error())
return nil
}

return clientset
}

func ConnectInClusterAPIClient() *kubernetes.Clientset {
host := ""
port := ""
token := ""

if val, ok := os.LookupEnv("KUBERNETES_SERVICE_HOST"); ok {
host = val
} else {
host = "127.0.0.1"
}

if val, ok := os.LookupEnv("KUBERNETES_PORT_443_TCP_PORT"); ok {
port = val
} else {
port = "6443"
}

read, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/token")
if err != nil {
log.Error().Msg(err.Error())
return nil
}

token = string(read)

// create the configuration by token
kubeConfig := &rest.Config{
Host: "https://" + host + ":" + port,
BearerToken: token,
TLSClientConfig: rest.TLSClientConfig{
Insecure: true,
},
}

if client, err := kubernetes.NewForConfig(kubeConfig); err != nil {
log.Error().Msg(err.Error())
return nil
} else {
return client
}
}

func ConnectK8sClient() *kubernetes.Clientset {
if isInCluster() {
return ConnectInClusterAPIClient()
}

return ConnectLocalAPIClient()
}

func GetKubearmorRelayURL() string {
var namespace string
client := ConnectK8sClient()
if client == nil {
return ""
}

// get pods from k8s api client
pods, err := client.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{
LabelSelector: "kubearmor-app",
})
if err != nil {
log.Error().Msg(err.Error())
return ""
}
// for _, pod := range pods.Items {
// if pod.Labels["kubearmor-app"] == "kubearmor" {
// namespace = pod.Namespace
// }
// }
namespace = pods.Items[0].Namespace
url := "kubearmor." + namespace + ".svc.cluster.local"
return url
}

func init() {
IgnoringNetworkNamespaces = []string{"kube-system"}
Expand Down Expand Up @@ -232,7 +97,6 @@ func LoadConfigCiliumHubble() types.ConfigCiliumHubble {
func LoadConfigKubeArmor() types.ConfigKubeArmorRelay {
cfgKubeArmor := types.ConfigKubeArmorRelay{}
cfgKubeArmor.KubeArmorRelayURL = viper.GetString("kubearmor.url")

/*
addr, err := net.LookupIP(cfgKubeArmor.KubeArmorRelayURL)
if err == nil {
Expand Down
16 changes: 8 additions & 8 deletions src/plugin/kubearmor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"net"
"path/filepath"
"strconv"
Expand All @@ -29,8 +30,6 @@ var KubeArmorRelayLogsMutex *sync.Mutex
var KubeArmorFCLogs []*types.KnoxSystemLog
var KubeArmorFCLogsMutex *sync.Mutex

var SystemStopChan chan struct{}

func generateProcessPaths(fromSrc []types.KnoxFromSource) []string {
var processpaths []string
for _, locfrmsrc := range fromSrc {
Expand Down Expand Up @@ -376,10 +375,12 @@ func ignoreLogFromRelayWithNamespace(nsFilter, nsNotFilter []string, namespace s

var KubeArmorRelayStarted = false

func StartKubeArmorRelay(StopChan chan struct{}, cfg types.ConfigKubeArmorRelay) {
func StartKubeArmorRelay(StopChan chan struct{}, cfg types.ConfigKubeArmorRelay) chan error {
error := make(chan error)

if KubeArmorRelayStarted {
// log.Info().Msg("kubearmor relay already started")
return
return nil
}
KubeArmorRelayStarted = true
conn := ConnectKubeArmorRelay(cfg)
Expand All @@ -402,10 +403,7 @@ func StartKubeArmorRelay(StopChan chan struct{}, cfg types.ConfigKubeArmorRelay)
stream, err := client.WatchLogs(context.Background(), &req)
if err != nil {
log.Error().Msg("unable to stream systems logs: " + err.Error())
log.Info().Msg("watch for kubearmor relay")
url := config.GetKubearmorRelayURL()
cfg.KubeArmorRelayURL = url
StartKubeArmorRelay(SystemStopChan, cfg)
error <- fmt.Errorf("unable to stream systems logs: " + err.Error())
return
}
for {
Expand Down Expand Up @@ -486,6 +484,7 @@ func StartKubeArmorRelay(StopChan chan struct{}, cfg types.ConfigKubeArmorRelay)
stream, err := client.WatchAlerts(context.Background(), &req)
if err != nil {
log.Error().Msg("unable to stream systems alerts: " + err.Error())
error <- fmt.Errorf("unable to stream systems alerts: " + err.Error())
return
}
for {
Expand Down Expand Up @@ -530,6 +529,7 @@ func StartKubeArmorRelay(StopChan chan struct{}, cfg types.ConfigKubeArmorRelay)
}
}
}()
return error
}

func GetSystemLogsFromFeedConsumer(trigger int) []*types.KnoxSystemLog {
Expand Down
9 changes: 8 additions & 1 deletion src/systempolicy/systemPolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
)

var log *zerolog.Logger
var kubearmorRelayURL types.ConfigKubeArmorRelay

func init() {
log = logger.GetInstance()
Expand Down Expand Up @@ -1467,7 +1468,13 @@ func DiscoverSystemPolicyMain() {
func StartSystemLogRcvr() {
for {
if cfg.GetCfgSystemLogFrom() == "kubearmor" {
plugin.StartKubeArmorRelay(SystemStopChan, cfg.GetCfgKubeArmor())
err := plugin.StartKubeArmorRelay(SystemStopChan, cfg.GetCfgKubeArmor())
if val, ok := <-err; ok && val != nil {
url := cluster.GetKubearmorRelayURL()
kubearmorRelayURL.KubeArmorRelayURL = url
kubearmorRelayURL.KubeArmorRelayPort = "32767"
_ = plugin.StartKubeArmorRelay(SystemStopChan, kubearmorRelayURL)
}
} else if cfg.GetCfgSystemLogFrom() == "feed-consumer" {
fc.ConsumerMutex.Lock()
fc.StartConsumer()
Expand Down

0 comments on commit b0197cb

Please sign in to comment.