diff --git a/src/libs/common.go b/src/libs/common.go index c9adc9f2..99c48664 100644 --- a/src/libs/common.go +++ b/src/libs/common.go @@ -4,12 +4,14 @@ import ( "bytes" "crypto/rand" "flag" + "fmt" "math/big" "net" "os" "os/exec" "os/signal" "reflect" + "sort" "strings" "syscall" "time" @@ -584,3 +586,56 @@ func ConvertStrToUnixTime(strTime string) int64 { t, _ := time.Parse(TimeFormSimple, strTime) return t.UTC().Unix() } + +// IsLabelMapSubset check whether m2 is a subset of m1 +func IsLabelMapSubset(m1, m2 types.LabelMap) bool { + match := true + for k, v := range m2 { + if m1[k] != v { + match = false + break + } + } + return match +} + +// LabelMapFromLabelArray converts []string to map[string]string +func LabelMapFromLabelArray(labels []string) types.LabelMap { + labelMap := types.LabelMap{} + for _, label := range labels { + kvPair := strings.FieldsFunc(label, labelKVSplitter) + if len(kvPair) != 2 { + continue + } + labelMap[kvPair[0]] = kvPair[1] + } + return labelMap +} + +// LabelMapToLabelArray converts map[string]string to sorted []string +func LabelMapToLabelArray(labelMap types.LabelMap) (labels []string) { + for k, v := range labelMap { + labels = append(labels, fmt.Sprintf("%s=%s", k, v)) + } + + sort.Strings(labels) + return +} + +// LabelMapToString converts map[string]string to string +func LabelMapToString(lm types.LabelMap) string { + return strings.Join(LabelMapToLabelArray(lm), ",") +} + +// LabelMapFromString converts string to map[string]string +func LabelMapFromString(labels string) types.LabelMap { + return LabelMapFromLabelArray(strings.FieldsFunc(labels, labelArrSplitter)) +} + +func labelKVSplitter(r rune) bool { + return r == ':' || r == '=' +} + +func labelArrSplitter(r rune) bool { + return r == ',' || r == ';' +} diff --git a/src/libs/consumer.go b/src/libs/consumer.go new file mode 100644 index 00000000..d20020d0 --- /dev/null +++ b/src/libs/consumer.go @@ -0,0 +1,180 @@ +package libs + +import ( + "sync" + + "github.com/accuknox/auto-policy-discovery/src/types" + "google.golang.org/grpc" + + dpb "github.com/accuknox/auto-policy-discovery/src/protobuf/v1/discovery" +) + +// PolicyConsumer stores filter information provided in v1.Discovery.GetFlow RPC request +type PolicyConsumer struct { + policyType []string + Kind []string + Filter types.PolicyFilter + Events chan *types.PolicyYaml +} + +func (pc *PolicyConsumer) IsTypeNetwork() bool { + return ContainsElement(pc.policyType, types.PolicyTypeNetwork) +} + +func (pc *PolicyConsumer) IsTypeSystem() bool { + return ContainsElement(pc.policyType, types.PolicyTypeSystem) +} + +func NewPolicyConsumer(req *dpb.GetPolicyRequest) *PolicyConsumer { + kind := req.GetKind() + return &PolicyConsumer{ + Kind: kind, + policyType: getPolicyTypeFromKind(kind), + Filter: convertGrpcRequestToPolicyFilter(req), + Events: make(chan *types.PolicyYaml, 64), + } +} + +func getPolicyTypeFromKind(kind []string) []string { + isTypeNetwork := false + isTypeSystem := false + + for _, k := range kind { + switch k { + case types.KindCiliumNetworkPolicy, + types.KindK8sNetworkPolicy, + types.KindCiliumClusterwideNetworkPolicy: + isTypeNetwork = true + case types.KindKubeArmorPolicy, + types.KindKubeArmorHostPolicy: + isTypeSystem = true + } + } + + var res []string + if isTypeNetwork { + res = append(res, types.PolicyTypeNetwork) + } + if isTypeSystem { + res = append(res, types.PolicyTypeSystem) + } + + return res +} + +// PolicyStore is used for support v1.Discovery.GetFlow RPC requests +type PolicyStore struct { + Consumers map[*PolicyConsumer]struct{} + Mutex sync.Mutex +} + +// AddConsumer adds a new PolicyConsumer to the store +func (pc *PolicyStore) AddConsumer(c *PolicyConsumer) { + pc.Mutex.Lock() + defer pc.Mutex.Unlock() + + pc.Consumers[c] = struct{}{} + return +} + +// RemoveConsumer removes a PolicyConsumer from the store +func (pc *PolicyStore) RemoveConsumer(c *PolicyConsumer) { + pc.Mutex.Lock() + defer pc.Mutex.Unlock() + + delete(pc.Consumers, c) +} + +// Publish converts the given KnoxPolicy to PolicyYaml and pushes to consumer's channels +func (pc *PolicyStore) Publish(policy *types.PolicyYaml) { + pc.Mutex.Lock() + defer pc.Mutex.Unlock() + + for consumer := range pc.Consumers { + if matchPolicyYaml(policy, consumer) { + consumer.Events <- policy + } + } +} + +func FilterPolicyYamls(policyYamls []types.PolicyYaml, consumer *PolicyConsumer) []types.PolicyYaml { + result := []types.PolicyYaml{} + + for i := range policyYamls { + if matchPolicyYaml(&policyYamls[i], consumer) { + result = append(result, policyYamls[i]) + } + } + + return result +} + +func matchPolicyYaml(p *types.PolicyYaml, c *PolicyConsumer) bool { + filter := c.Filter + + if filter.Cluster != "" && filter.Cluster != p.Cluster { + return false + } + + if filter.Namespace != "" && filter.Cluster != p.Namespace { + return false + } + + if len(filter.Labels) != 0 && !IsLabelMapSubset(p.Labels, filter.Labels) { + return false + } + + if !ContainsElement(c.Kind, p.Kind) { + return false + } + + return true +} + +func convertGrpcRequestToPolicyFilter(req *dpb.GetPolicyRequest) types.PolicyFilter { + return types.PolicyFilter{ + Cluster: req.GetCluster(), + Namespace: req.GetNamespace(), + Labels: LabelMapFromLabelArray(req.GetLabel()), + } +} + +func convertPolicyYamlToGrpcResponse(p *types.PolicyYaml) *dpb.GetPolicyResponse { + return &dpb.GetPolicyResponse{ + Kind: p.Kind, + Name: p.Name, + Cluster: p.Cluster, + Namespace: p.Namespace, + Label: LabelMapToLabelArray(p.Labels), + Yaml: p.Yaml, + } +} + +func SendPolicyYamlInGrpcStream(stream grpc.ServerStream, policy *types.PolicyYaml) error { + resp := convertPolicyYamlToGrpcResponse(policy) + err := stream.SendMsg(resp) + if err != nil { + log.Error().Msgf("sending network policy yaml in grpc stream failed err=%v", err.Error()) + return err + } + return nil +} + +func RelayPolicyEventToGrpcStream(stream grpc.ServerStream, consumer *PolicyConsumer) error { + for { + select { + case <-stream.Context().Done(): + // client disconnected + return nil + case policy, ok := <-consumer.Events: + if !ok { + // channel closed and all items are consumed + return nil + } + err := SendPolicyYamlInGrpcStream(stream, policy) + if err != nil { + return err + } + } + } +} diff --git a/src/libs/dbHandler.go b/src/libs/dbHandler.go index 6560880f..e9125c13 100644 --- a/src/libs/dbHandler.go +++ b/src/libs/dbHandler.go @@ -87,6 +87,12 @@ func UpdateOutdatedNetworkPolicy(cfg types.ConfigDB, outdatedPolicy string, late } } +func UpdateNetworkPolicies(cfg types.ConfigDB, policies []types.KnoxNetworkPolicy) { + for _, policy := range policies { + UpdateNetworkPolicy(cfg, policy) + } +} + func UpdateNetworkPolicy(cfg types.ConfigDB, policy types.KnoxNetworkPolicy) { if cfg.DBDriver == "mysql" { if err := UpdateNetworkPolicyToMySQL(cfg, policy); err != nil { @@ -349,13 +355,30 @@ func GetPodNames(cfg types.ConfigDB, filter types.ObsPodDetail) ([]string, error // =============== // // == Policy DB == // // =============== // +func GetPolicyYamls(cfg types.ConfigDB, policyType string) ([]types.PolicyYaml, error) { + var err error + var results []types.PolicyYaml + + if cfg.DBDriver == "mysql" { + results, err = GetPolicyYamlsMySQL(cfg, policyType) + if err != nil { + return nil, err + } + } else if cfg.DBDriver == "sqlite3" { + results, err = GetPolicyYamlsSQLite(cfg, policyType) + if err != nil { + return nil, err + } + } + return results, nil +} -func UpdateOrInsertPolicies(cfg types.ConfigDB, policies []types.Policy) error { +func UpdateOrInsertPolicyYamls(cfg types.ConfigDB, policies []types.PolicyYaml) error { var err = errors.New("unknown db driver") if cfg.DBDriver == "mysql" { - err = UpdateOrInsertPoliciesMySQL(cfg, policies) + err = UpdateOrInsertPolicyYamlsMySQL(cfg, policies) } else if cfg.DBDriver == "sqlite3" { - err = UpdateOrInsertPoliciesSQLite(cfg, policies) + err = UpdateOrInsertPolicyYamlsSQLite(cfg, policies) } return err } diff --git a/src/libs/mysqlHandler.go b/src/libs/mysqlHandler.go index 24f290fd..3cef4585 100644 --- a/src/libs/mysqlHandler.go +++ b/src/libs/mysqlHandler.go @@ -18,7 +18,7 @@ const TableNetworkPolicy_TableName = "network_policy" const TableSystemPolicy_TableName = "system_policy" const TableSystemLogs_TableName = "system_logs" const TableNetworkLogs_TableName = "network_logs" -const PolicyTable_TableName = "policy_yaml" +const PolicyYaml_TableName = "policy_yaml" // ================ // // == Connection == // @@ -711,7 +711,7 @@ func CreatePolicyTableMySQL(cfg types.ConfigDB) error { db := connectMySQL(cfg) defer db.Close() - tableName := PolicyTable_TableName + tableName := PolicyYaml_TableName query := "CREATE TABLE IF NOT EXISTS `" + tableName + "` (" + @@ -1625,12 +1625,54 @@ func GetPodNamesMySQL(cfg types.ConfigDB, filter types.ObsPodDetail) ([]string, // == Policy DB == // // =============== // -func UpdateOrInsertPoliciesMySQL(cfg types.ConfigDB, policies []types.Policy) error { +func GetPolicyYamlsMySQL(cfg types.ConfigDB, policyType string) ([]types.PolicyYaml, error) { + db := connectMySQL(cfg) + defer db.Close() + + policies := []types.PolicyYaml{} + + var results *sql.Rows + var err error + + query := "SELECT type,kind,cluster_name,namespace,labels,policy_name,policy_yaml FROM " + PolicyYaml_TableName + query = query + "WHERE type = ?" + + results, err = db.Query(query, policyType) + if err != nil { + log.Error().Msg(err.Error()) + return nil, err + } + defer results.Close() + + for results.Next() { + var labels string + policy := types.PolicyYaml{} + + if err := results.Scan( + &policy.Type, + &policy.Kind, + &policy.Cluster, + &policy.Namespace, + &labels, + &policy.Name, + &policy.Yaml, + ); err != nil { + return nil, err + } + + policy.Labels = LabelMapFromString(labels) + policies = append(policies, policy) + } + + return policies, nil +} + +func UpdateOrInsertPolicyYamlsMySQL(cfg types.ConfigDB, policies []types.PolicyYaml) error { db := connectMySQL(cfg) defer db.Close() for _, pol := range policies { - if err := updateOrInsertPolicyMySQL(pol, db); err != nil { + if err := updateOrInsertPolicyYamlMySQL(pol, db); err != nil { log.Error().Msg(err.Error()) } } @@ -1638,12 +1680,11 @@ func UpdateOrInsertPoliciesMySQL(cfg types.ConfigDB, policies []types.Policy) er return nil } -func updateOrInsertPolicyMySQL(policy types.Policy, db *sql.DB) error { - +func updateOrInsertPolicyYamlMySQL(policy types.PolicyYaml, db *sql.DB) error { var err error queryString := ` policy_name = ? ` - query := "UPDATE " + PolicyTable_TableName + " SET policy_yaml=?, updated_time=? WHERE " + queryString + " " + query := "UPDATE " + PolicyYaml_TableName + " SET policy_yaml=?, updated_time=? WHERE " + queryString + " " updateStmt, err := db.Prepare(query) if err != nil { @@ -1652,9 +1693,9 @@ func updateOrInsertPolicyMySQL(policy types.Policy, db *sql.DB) error { defer updateStmt.Close() result, err := updateStmt.Exec( - policy.PolicyYaml, + policy.Yaml, ConvertStrToUnixTime("now"), - policy.PolicyName, + policy.Name, ) if err != nil { log.Error().Msg(err.Error()) @@ -1665,7 +1706,7 @@ func updateOrInsertPolicyMySQL(policy types.Policy, db *sql.DB) error { if err == nil && rowsAffected == 0 { - insertStmt, err := db.Prepare("INSERT INTO " + PolicyTable_TableName + + insertStmt, err := db.Prepare("INSERT INTO " + PolicyYaml_TableName + "(type,kind,cluster_name,namespace,labels,policy_name,policy_yaml,updated_time) values(?,?,?,?,?,?,?,?)") if err != nil { return err @@ -1675,11 +1716,11 @@ func updateOrInsertPolicyMySQL(policy types.Policy, db *sql.DB) error { _, err = insertStmt.Exec( policy.Type, policy.Kind, - policy.ClusterName, + policy.Cluster, policy.Namespace, - policy.Labels, - policy.PolicyName, - policy.PolicyYaml, + LabelMapToString(policy.Labels), + policy.Name, + policy.Yaml, ConvertStrToUnixTime("now"), ) if err != nil { diff --git a/src/libs/sqliteHandler.go b/src/libs/sqliteHandler.go index 6aa4dfd4..f23eb3bc 100644 --- a/src/libs/sqliteHandler.go +++ b/src/libs/sqliteHandler.go @@ -19,7 +19,7 @@ const TableNetworkPolicySQLite_TableName = "network_policy" const TableSystemPolicySQLite_TableName = "system_policy" const TableSystemLogsSQLite_TableName = "system_logs" const TableNetworkLogsSQLite_TableName = "network_logs" -const PolicyTableSQLite_TableName = "policy_yaml" +const PolicyYamlSQLite_TableName = "policy_yaml" // ================ // // == Connection == // @@ -681,7 +681,7 @@ func CreatePolicyTableSQLite(cfg types.ConfigDB) error { db := connectSQLite(cfg, cfg.SQLiteDBPath) defer db.Close() - tableName := PolicyTableSQLite_TableName + tableName := PolicyYamlSQLite_TableName query := "CREATE TABLE IF NOT EXISTS `" + tableName + "` (" + @@ -1594,12 +1594,54 @@ func GetPodNamesSQLite(cfg types.ConfigDB, filter types.ObsPodDetail) ([]string, // == Policy DB == // // =============== // -func UpdateOrInsertPoliciesSQLite(cfg types.ConfigDB, policies []types.Policy) error { +func GetPolicyYamlsSQLite(cfg types.ConfigDB, policyType string) ([]types.PolicyYaml, error) { + db := connectSQLite(cfg, cfg.SQLiteDBPath) + defer db.Close() + + policies := []types.PolicyYaml{} + + var results *sql.Rows + var err error + + query := "SELECT type,kind,cluster_name,namespace,labels,policy_name,policy_yaml FROM " + PolicyYaml_TableName + query = query + " WHERE type = ?" + + results, err = db.Query(query, policyType) + if err != nil { + log.Error().Msg(err.Error()) + return nil, err + } + defer results.Close() + + for results.Next() { + var labels string + policy := types.PolicyYaml{} + + if err := results.Scan( + &policy.Type, + &policy.Kind, + &policy.Cluster, + &policy.Namespace, + &labels, + &policy.Name, + &policy.Yaml, + ); err != nil { + return nil, err + } + + policy.Labels = LabelMapFromString(labels) + policies = append(policies, policy) + } + + return policies, nil +} + +func UpdateOrInsertPolicyYamlsSQLite(cfg types.ConfigDB, policies []types.PolicyYaml) error { db := connectSQLite(cfg, cfg.SQLiteDBPath) defer db.Close() for _, pol := range policies { - if err := updateOrInsertPolicySQLite(db, pol); err != nil { + if err := updateOrInsertPolicyYamlSQLite(db, pol); err != nil { log.Error().Msg(err.Error()) } } @@ -1607,10 +1649,10 @@ func UpdateOrInsertPoliciesSQLite(cfg types.ConfigDB, policies []types.Policy) e return nil } -func updateOrInsertPolicySQLite(db *sql.DB, policy types.Policy) error { +func updateOrInsertPolicyYamlSQLite(db *sql.DB, policy types.PolicyYaml) error { var err error - query := "UPDATE " + PolicyTableSQLite_TableName + " SET policy_yaml = ?, updated_time = ? WHERE policy_name = ?" + query := "UPDATE " + PolicyYamlSQLite_TableName + " SET policy_yaml = ?, updated_time = ? WHERE policy_name = ?" updateStmt, err := db.Prepare(query) if err != nil { return err @@ -1618,9 +1660,9 @@ func updateOrInsertPolicySQLite(db *sql.DB, policy types.Policy) error { defer updateStmt.Close() result, err := updateStmt.Exec( - policy.PolicyYaml, + policy.Yaml, ConvertStrToUnixTime("now"), - policy.PolicyName, + policy.Name, ) if err != nil { log.Error().Msg(err.Error()) @@ -1630,7 +1672,7 @@ func updateOrInsertPolicySQLite(db *sql.DB, policy types.Policy) error { rowsAffected, err := result.RowsAffected() if err == nil && rowsAffected == 0 { - insertStmt, err := db.Prepare("INSERT INTO " + PolicyTableSQLite_TableName + + insertStmt, err := db.Prepare("INSERT INTO " + PolicyYamlSQLite_TableName + " (type,kind,cluster_name,namespace,labels,policy_name,policy_yaml,updated_time) values(?,?,?,?,?,?,?,?)") if err != nil { return err @@ -1640,11 +1682,11 @@ func updateOrInsertPolicySQLite(db *sql.DB, policy types.Policy) error { _, err = insertStmt.Exec( policy.Type, policy.Kind, - policy.ClusterName, + policy.Cluster, policy.Namespace, - policy.Labels, - policy.PolicyName, - policy.PolicyYaml, + LabelMapToString(policy.Labels), + policy.Name, + policy.Yaml, ConvertStrToUnixTime("now"), ) if err != nil { diff --git a/src/networkpolicy/consumer.go b/src/networkpolicy/consumer.go new file mode 100644 index 00000000..48a298ff --- /dev/null +++ b/src/networkpolicy/consumer.go @@ -0,0 +1,26 @@ +package networkpolicy + +import ( + "sync" + + "github.com/accuknox/auto-policy-discovery/src/libs" + types "github.com/accuknox/auto-policy-discovery/src/types" +) + +var PolicyStore libs.PolicyStore + +func init() { + PolicyStore = libs.PolicyStore{ + Consumers: make(map[*libs.PolicyConsumer]struct{}), + Mutex: sync.Mutex{}, + } +} + +func GetPolicyYamlFromDB(consumer *libs.PolicyConsumer) []types.PolicyYaml { + policyYamls, err := libs.GetPolicyYamls(CfgDB, types.PolicyTypeNetwork) + if err != nil { + log.Error().Msgf("fetching policy yaml from DB failed err=%v", err.Error()) + return nil + } + return libs.FilterPolicyYamls(policyYamls, consumer) +} diff --git a/src/networkpolicy/deduplicator.go b/src/networkpolicy/deduplicator.go index 4601f8f6..b45ebbb9 100644 --- a/src/networkpolicy/deduplicator.go +++ b/src/networkpolicy/deduplicator.go @@ -917,8 +917,9 @@ func IsExistingPolicySpec(existingPolicies []types.KnoxNetworkPolicy, newPolicy // == Update Duplicated Network Policy == // // ====================================== // -func UpdateDuplicatedPolicy(existingPolicies []types.KnoxNetworkPolicy, discoveredPolicies []types.KnoxNetworkPolicy, dnsToIPs map[string][]string, clusterName string) []types.KnoxNetworkPolicy { +func UpdateDuplicatedPolicy(existingPolicies []types.KnoxNetworkPolicy, discoveredPolicies []types.KnoxNetworkPolicy, dnsToIPs map[string][]string, clusterName string) ([]types.KnoxNetworkPolicy, []types.KnoxNetworkPolicy) { newPolicies := []types.KnoxNetworkPolicy{} + updatedPolicies := []types.KnoxNetworkPolicy{} existIngressPolicies := map[Selector]types.KnoxNetworkPolicy{} existEgressPolicies := map[Selector]types.KnoxNetworkPolicy{} @@ -939,14 +940,15 @@ func UpdateDuplicatedPolicy(existingPolicies []types.KnoxNetworkPolicy, discover for _, newPolicy := range discoveredPolicies { lblArr := getLabelArrayFromMap(newPolicy.Spec.Selector.MatchLabels) selector := Selector{newPolicy.Kind, strings.Join(lblArr, ",")} + if newPolicy.Metadata["type"] == PolicyTypeIngress { existPolicy, ok := existIngressPolicies[selector] if ok { // Ingress policy for this endpoint exists already mergedPolicy, updated := mergeIngressPolicies(existPolicy, []types.KnoxNetworkPolicy{newPolicy}) if updated { + mergedPolicy.Metadata["status"] = "updated" existIngressPolicies[selector] = mergedPolicy - libs.UpdateNetworkPolicy(CfgDB, mergedPolicy) } } else { // Ingress policy for this endpoint does not exists previously @@ -959,8 +961,8 @@ func UpdateDuplicatedPolicy(existingPolicies []types.KnoxNetworkPolicy, discover // Egress policy for this endpoint exists already mergedPolicy, updated := mergeEgressPolicies(existPolicy, []types.KnoxNetworkPolicy{newPolicy}) if updated { + mergedPolicy.Metadata["status"] = "updated" existEgressPolicies[selector] = mergedPolicy - libs.UpdateNetworkPolicy(CfgDB, mergedPolicy) } } else { // Egress policy for this endpoint does not exists previously @@ -970,5 +972,18 @@ func UpdateDuplicatedPolicy(existingPolicies []types.KnoxNetworkPolicy, discover } } - return newPolicies + for _, policy := range existIngressPolicies { + if policy.Metadata["status"] == "updated" { + delete(policy.Metadata, "status") + updatedPolicies = append(updatedPolicies, policy) + } + } + for _, policy := range existEgressPolicies { + if policy.Metadata["status"] == "updated" { + delete(policy.Metadata, "status") + updatedPolicies = append(updatedPolicies, policy) + } + } + + return newPolicies, updatedPolicies } diff --git a/src/networkpolicy/networkPolicy.go b/src/networkpolicy/networkPolicy.go index 9277da88..ecadfba7 100644 --- a/src/networkpolicy/networkPolicy.go +++ b/src/networkpolicy/networkPolicy.go @@ -14,6 +14,7 @@ import ( logger "github.com/accuknox/auto-policy-discovery/src/logging" "github.com/accuknox/auto-policy-discovery/src/plugin" "github.com/accuknox/auto-policy-discovery/src/types" + cu "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/utils" "github.com/clarketm/json" "github.com/google/go-cmp/cmp" "sigs.k8s.io/yaml" @@ -2111,23 +2112,17 @@ func PopulateNetworkPoliciesFromNetworkLogs(networkLogs []types.KnoxNetworkLog) log.Info().Msgf("UpdateDuplicatedPolicy for cluster [%s] namespace [%s]", clusterName, namespace) // update duplicated policy - newNetPolicies := UpdateDuplicatedPolicy(existingNetPolicies, discoveredPolicies, DomainToIPs, clusterName) + newPolicies, updatedPolicies := UpdateDuplicatedPolicy(existingNetPolicies, discoveredPolicies, DomainToIPs, clusterName) - writeNetworkPoliciesYamlToDB(newNetPolicies) - - if len(newNetPolicies) > 0 { - // insert discovered policies to db - if strings.Contains(NetworkPolicyTo, "db") { - libs.InsertNetworkPolicies(CfgDB, newNetPolicies) - } - - // write discovered policies to file - if strings.Contains(NetworkPolicyTo, "file") { - WriteNetworkPoliciesToFile(clusterName, namespace) - } - - log.Info().Msgf("-> Network policy discovery done for namespace: [%s], [%d] policies discovered", namespace, len(newNetPolicies)) + if len(updatedPolicies) > 0 { + libs.UpdateNetworkPolicies(CfgDB, updatedPolicies) + writeNetworkPoliciesYamlToDB(updatedPolicies) + } + if len(newPolicies) > 0 { + libs.InsertNetworkPolicies(CfgDB, newPolicies) + writeNetworkPoliciesYamlToDB(newPolicies) } + log.Info().Msgf("-> Network policy discovery done for namespace: [%s], [%d] policies updated, [%d] policies newly discovered", namespace, len(updatedPolicies), len(newPolicies)) } // update cluster global variables @@ -2138,20 +2133,18 @@ func PopulateNetworkPoliciesFromNetworkLogs(networkLogs []types.KnoxNetworkLog) } func writeNetworkPoliciesYamlToDB(policies []types.KnoxNetworkPolicy) { - clusternames := []string{} + clusters := []string{} for _, pol := range policies { - clusternames = append(clusternames, pol.Metadata["cluster_name"]) + clusters = append(clusters, pol.Metadata["cluster_name"]) } // convert knoxPolicy to CiliumPolicy ciliumPolicies := plugin.ConvertKnoxPoliciesToCiliumPolicies(policies) - res := []types.Policy{} - - for index, ciliumPolicy := range ciliumPolicies { - var label string + res := []types.PolicyYaml{} + for i, ciliumPolicy := range ciliumPolicies { jsonBytes, err := json.Marshal(ciliumPolicy) if err != nil { log.Error().Msg(err.Error()) @@ -2162,30 +2155,29 @@ func writeNetworkPoliciesYamlToDB(policies []types.KnoxNetworkPolicy) { log.Error().Msg(err.Error()) continue } - policyYaml := string(yamlBytes) - if ciliumPolicy.Spec.NodeSelector.MatchLabels != nil { - for k, v := range ciliumPolicy.Spec.NodeSelector.MatchLabels { - label = k + "=" + v - } + var labels types.LabelMap + if ciliumPolicy.Kind == cu.ResourceTypeCiliumNetworkPolicy { + labels = ciliumPolicy.Spec.EndpointSelector.MatchLabels } else { - for k, v := range ciliumPolicy.Spec.EndpointSelector.MatchLabels { - label = k + "=" + v - } + labels = ciliumPolicy.Spec.NodeSelector.MatchLabels + } + + policyYaml := types.PolicyYaml{ + Type: types.PolicyTypeNetwork, + Kind: ciliumPolicy.Kind, + Name: ciliumPolicy.Metadata["name"], + Namespace: ciliumPolicy.Metadata["namespace"], + Cluster: clusters[i], + Labels: labels, + Yaml: yamlBytes, } + res = append(res, policyYaml) - res = append(res, types.Policy{ - Type: "network", - Kind: ciliumPolicy.Kind, - PolicyName: ciliumPolicy.Metadata["name"], - Namespace: ciliumPolicy.Metadata["namespace"], - ClusterName: clusternames[index], - Labels: label, - PolicyYaml: policyYaml, - }) + PolicyStore.Publish(&policyYaml) } - if err := libs.UpdateOrInsertPolicies(CfgDB, res); err != nil { + if err := libs.UpdateOrInsertPolicyYamls(CfgDB, res); err != nil { log.Error().Msgf(err.Error()) } } diff --git a/src/plugin/kubearmor.go b/src/plugin/kubearmor.go index c1004c7a..b5fe932e 100644 --- a/src/plugin/kubearmor.go +++ b/src/plugin/kubearmor.go @@ -49,8 +49,6 @@ func ConvertKnoxSystemPolicyToKubeArmorPolicy(knoxPolicies []types.KnoxSystemPol } kubePolicy.Metadata["namespace"] = policy.Metadata["namespace"] - kubePolicy.Metadata["clusterName"] = policy.Metadata["clusterName"] - kubePolicy.Metadata["containername"] = policy.Metadata["containername"] kubePolicy.Metadata["name"] = policy.Metadata["name"] if policy.Metadata["namespace"] == types.PolicyDiscoveryVMNamespace { diff --git a/src/protobuf/Makefile b/src/protobuf/Makefile index e067706c..56b0e312 100644 --- a/src/protobuf/Makefile +++ b/src/protobuf/Makefile @@ -8,4 +8,4 @@ build: protoc -I=. --go_out . --go_opt paths=source_relative --go-grpc_out . --go-grpc_opt paths=source_relative v1/analyzer/analyzer.proto protoc -I=. --go_out . --go_opt paths=source_relative --go-grpc_out . --go-grpc_opt paths=source_relative v1/insight/insight.proto protoc -I=. --go_out . --go_opt paths=source_relative --go-grpc_out . --go-grpc_opt paths=source_relative v1/observability/observability.proto - + protoc -I=. --go_out . --go_opt paths=source_relative --go-grpc_out . --go-grpc_opt paths=source_relative v1/discovery/discovery.proto diff --git a/src/protobuf/install_protoc.sh b/src/protobuf/install_protoc.sh index 193fc26e..8131f62e 100755 --- a/src/protobuf/install_protoc.sh +++ b/src/protobuf/install_protoc.sh @@ -10,4 +10,5 @@ export PATH="$PATH:$HOME/.local/bin" rm protoc-3.15.8-linux-x86_64.zip +go install google.golang.org/protobuf/cmd/protoc-gen-go@latest go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest diff --git a/src/protobuf/v1/discovery/discovery.pb.go b/src/protobuf/v1/discovery/discovery.pb.go new file mode 100644 index 00000000..b24b5128 --- /dev/null +++ b/src/protobuf/v1/discovery/discovery.pb.go @@ -0,0 +1,303 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.0 +// protoc v3.15.8 +// source: v1/discovery/discovery.proto + +package discovery + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type GetPolicyRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Follow bool `protobuf:"varint,1,opt,name=follow,proto3" json:"follow,omitempty"` + Kind []string `protobuf:"bytes,2,rep,name=kind,proto3" json:"kind,omitempty"` + Cluster string `protobuf:"bytes,3,opt,name=cluster,proto3" json:"cluster,omitempty"` + Namespace string `protobuf:"bytes,4,opt,name=namespace,proto3" json:"namespace,omitempty"` + Label []string `protobuf:"bytes,5,rep,name=label,proto3" json:"label,omitempty"` +} + +func (x *GetPolicyRequest) Reset() { + *x = GetPolicyRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_v1_discovery_discovery_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetPolicyRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetPolicyRequest) ProtoMessage() {} + +func (x *GetPolicyRequest) ProtoReflect() protoreflect.Message { + mi := &file_v1_discovery_discovery_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetPolicyRequest.ProtoReflect.Descriptor instead. +func (*GetPolicyRequest) Descriptor() ([]byte, []int) { + return file_v1_discovery_discovery_proto_rawDescGZIP(), []int{0} +} + +func (x *GetPolicyRequest) GetFollow() bool { + if x != nil { + return x.Follow + } + return false +} + +func (x *GetPolicyRequest) GetKind() []string { + if x != nil { + return x.Kind + } + return nil +} + +func (x *GetPolicyRequest) GetCluster() string { + if x != nil { + return x.Cluster + } + return "" +} + +func (x *GetPolicyRequest) GetNamespace() string { + if x != nil { + return x.Namespace + } + return "" +} + +func (x *GetPolicyRequest) GetLabel() []string { + if x != nil { + return x.Label + } + return nil +} + +type GetPolicyResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Kind string `protobuf:"bytes,1,opt,name=kind,proto3" json:"kind,omitempty"` + Cluster string `protobuf:"bytes,2,opt,name=cluster,proto3" json:"cluster,omitempty"` + Namespace string `protobuf:"bytes,3,opt,name=namespace,proto3" json:"namespace,omitempty"` + Label []string `protobuf:"bytes,4,rep,name=label,proto3" json:"label,omitempty"` + Name string `protobuf:"bytes,5,opt,name=name,proto3" json:"name,omitempty"` + Yaml []byte `protobuf:"bytes,6,opt,name=yaml,proto3" json:"yaml,omitempty"` +} + +func (x *GetPolicyResponse) Reset() { + *x = GetPolicyResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_v1_discovery_discovery_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetPolicyResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetPolicyResponse) ProtoMessage() {} + +func (x *GetPolicyResponse) ProtoReflect() protoreflect.Message { + mi := &file_v1_discovery_discovery_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetPolicyResponse.ProtoReflect.Descriptor instead. +func (*GetPolicyResponse) Descriptor() ([]byte, []int) { + return file_v1_discovery_discovery_proto_rawDescGZIP(), []int{1} +} + +func (x *GetPolicyResponse) GetKind() string { + if x != nil { + return x.Kind + } + return "" +} + +func (x *GetPolicyResponse) GetCluster() string { + if x != nil { + return x.Cluster + } + return "" +} + +func (x *GetPolicyResponse) GetNamespace() string { + if x != nil { + return x.Namespace + } + return "" +} + +func (x *GetPolicyResponse) GetLabel() []string { + if x != nil { + return x.Label + } + return nil +} + +func (x *GetPolicyResponse) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *GetPolicyResponse) GetYaml() []byte { + if x != nil { + return x.Yaml + } + return nil +} + +var File_v1_discovery_discovery_proto protoreflect.FileDescriptor + +var file_v1_discovery_discovery_proto_rawDesc = []byte{ + 0x0a, 0x1c, 0x76, 0x31, 0x2f, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2f, 0x64, + 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0c, + 0x76, 0x31, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x22, 0x8c, 0x01, 0x0a, + 0x10, 0x47, 0x65, 0x74, 0x50, 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x16, 0x0a, 0x06, 0x66, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x08, 0x52, 0x06, 0x66, 0x6f, 0x6c, 0x6c, 0x6f, 0x77, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x69, 0x6e, + 0x64, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x12, 0x18, 0x0a, + 0x07, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, + 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, + 0x70, 0x61, 0x63, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, + 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x18, 0x05, + 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x22, 0x9d, 0x01, 0x0a, 0x11, + 0x47, 0x65, 0x74, 0x50, 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x04, 0x6b, 0x69, 0x6e, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x12, + 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, 0x14, 0x0a, + 0x05, 0x6c, 0x61, 0x62, 0x65, 0x6c, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x05, 0x6c, 0x61, + 0x62, 0x65, 0x6c, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x79, 0x61, 0x6d, 0x6c, 0x18, + 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x79, 0x61, 0x6d, 0x6c, 0x32, 0x5d, 0x0a, 0x09, 0x44, + 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x12, 0x50, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x50, + 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x12, 0x1e, 0x2e, 0x76, 0x31, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, + 0x76, 0x65, 0x72, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x76, 0x31, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, + 0x76, 0x65, 0x72, 0x79, 0x2e, 0x47, 0x65, 0x74, 0x50, 0x6f, 0x6c, 0x69, 0x63, 0x79, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x41, 0x5a, 0x3f, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6b, 0x75, 0x62, 0x65, 0x61, 0x72, 0x6d, + 0x6f, 0x72, 0x2f, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2d, 0x65, 0x6e, 0x67, + 0x69, 0x6e, 0x65, 0x2f, 0x73, 0x72, 0x63, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2f, 0x76, 0x31, 0x2f, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_v1_discovery_discovery_proto_rawDescOnce sync.Once + file_v1_discovery_discovery_proto_rawDescData = file_v1_discovery_discovery_proto_rawDesc +) + +func file_v1_discovery_discovery_proto_rawDescGZIP() []byte { + file_v1_discovery_discovery_proto_rawDescOnce.Do(func() { + file_v1_discovery_discovery_proto_rawDescData = protoimpl.X.CompressGZIP(file_v1_discovery_discovery_proto_rawDescData) + }) + return file_v1_discovery_discovery_proto_rawDescData +} + +var file_v1_discovery_discovery_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_v1_discovery_discovery_proto_goTypes = []interface{}{ + (*GetPolicyRequest)(nil), // 0: v1.discovery.GetPolicyRequest + (*GetPolicyResponse)(nil), // 1: v1.discovery.GetPolicyResponse +} +var file_v1_discovery_discovery_proto_depIdxs = []int32{ + 0, // 0: v1.discovery.Discovery.GetPolicy:input_type -> v1.discovery.GetPolicyRequest + 1, // 1: v1.discovery.Discovery.GetPolicy:output_type -> v1.discovery.GetPolicyResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_v1_discovery_discovery_proto_init() } +func file_v1_discovery_discovery_proto_init() { + if File_v1_discovery_discovery_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_v1_discovery_discovery_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetPolicyRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_v1_discovery_discovery_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetPolicyResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_v1_discovery_discovery_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_v1_discovery_discovery_proto_goTypes, + DependencyIndexes: file_v1_discovery_discovery_proto_depIdxs, + MessageInfos: file_v1_discovery_discovery_proto_msgTypes, + }.Build() + File_v1_discovery_discovery_proto = out.File + file_v1_discovery_discovery_proto_rawDesc = nil + file_v1_discovery_discovery_proto_goTypes = nil + file_v1_discovery_discovery_proto_depIdxs = nil +} diff --git a/src/protobuf/v1/discovery/discovery.proto b/src/protobuf/v1/discovery/discovery.proto new file mode 100644 index 00000000..db38d8db --- /dev/null +++ b/src/protobuf/v1/discovery/discovery.proto @@ -0,0 +1,26 @@ +syntax = "proto3"; + +package v1.discovery; + +option go_package = "github.com/kubearmor/discovery-engine/src/protobuf/v1/discovery"; + +service Discovery { + rpc GetPolicy(GetPolicyRequest) returns (stream GetPolicyResponse) {} +} + +message GetPolicyRequest { + bool follow = 1; + repeated string kind = 2; + string cluster = 3; + string namespace = 4; + repeated string label = 5; +} + +message GetPolicyResponse { + string kind = 1; + string cluster = 2; + string namespace = 3; + repeated string label = 4; + string name = 5; + bytes yaml = 6; +} diff --git a/src/protobuf/v1/discovery/discovery_grpc.pb.go b/src/protobuf/v1/discovery/discovery_grpc.pb.go new file mode 100644 index 00000000..bb63dfab --- /dev/null +++ b/src/protobuf/v1/discovery/discovery_grpc.pb.go @@ -0,0 +1,132 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.15.8 +// source: v1/discovery/discovery.proto + +package discovery + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// DiscoveryClient is the client API for Discovery service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type DiscoveryClient interface { + GetPolicy(ctx context.Context, in *GetPolicyRequest, opts ...grpc.CallOption) (Discovery_GetPolicyClient, error) +} + +type discoveryClient struct { + cc grpc.ClientConnInterface +} + +func NewDiscoveryClient(cc grpc.ClientConnInterface) DiscoveryClient { + return &discoveryClient{cc} +} + +func (c *discoveryClient) GetPolicy(ctx context.Context, in *GetPolicyRequest, opts ...grpc.CallOption) (Discovery_GetPolicyClient, error) { + stream, err := c.cc.NewStream(ctx, &Discovery_ServiceDesc.Streams[0], "/v1.discovery.Discovery/GetPolicy", opts...) + if err != nil { + return nil, err + } + x := &discoveryGetPolicyClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Discovery_GetPolicyClient interface { + Recv() (*GetPolicyResponse, error) + grpc.ClientStream +} + +type discoveryGetPolicyClient struct { + grpc.ClientStream +} + +func (x *discoveryGetPolicyClient) Recv() (*GetPolicyResponse, error) { + m := new(GetPolicyResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// DiscoveryServer is the server API for Discovery service. +// All implementations must embed UnimplementedDiscoveryServer +// for forward compatibility +type DiscoveryServer interface { + GetPolicy(*GetPolicyRequest, Discovery_GetPolicyServer) error + mustEmbedUnimplementedDiscoveryServer() +} + +// UnimplementedDiscoveryServer must be embedded to have forward compatible implementations. +type UnimplementedDiscoveryServer struct { +} + +func (UnimplementedDiscoveryServer) GetPolicy(*GetPolicyRequest, Discovery_GetPolicyServer) error { + return status.Errorf(codes.Unimplemented, "method GetPolicy not implemented") +} +func (UnimplementedDiscoveryServer) mustEmbedUnimplementedDiscoveryServer() {} + +// UnsafeDiscoveryServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to DiscoveryServer will +// result in compilation errors. +type UnsafeDiscoveryServer interface { + mustEmbedUnimplementedDiscoveryServer() +} + +func RegisterDiscoveryServer(s grpc.ServiceRegistrar, srv DiscoveryServer) { + s.RegisterService(&Discovery_ServiceDesc, srv) +} + +func _Discovery_GetPolicy_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(GetPolicyRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(DiscoveryServer).GetPolicy(m, &discoveryGetPolicyServer{stream}) +} + +type Discovery_GetPolicyServer interface { + Send(*GetPolicyResponse) error + grpc.ServerStream +} + +type discoveryGetPolicyServer struct { + grpc.ServerStream +} + +func (x *discoveryGetPolicyServer) Send(m *GetPolicyResponse) error { + return x.ServerStream.SendMsg(m) +} + +// Discovery_ServiceDesc is the grpc.ServiceDesc for Discovery service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Discovery_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "v1.discovery.Discovery", + HandlerType: (*DiscoveryServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "GetPolicy", + Handler: _Discovery_GetPolicy_Handler, + ServerStreams: true, + }, + }, + Metadata: "v1/discovery/discovery.proto", +} diff --git a/src/server/grpcServer.go b/src/server/grpcServer.go index 17b79f74..91f41781 100644 --- a/src/server/grpcServer.go +++ b/src/server/grpcServer.go @@ -2,6 +2,7 @@ package server import ( "context" + "errors" "github.com/rs/zerolog" @@ -10,14 +11,15 @@ import ( core "github.com/accuknox/auto-policy-discovery/src/config" fc "github.com/accuknox/auto-policy-discovery/src/feedconsumer" logger "github.com/accuknox/auto-policy-discovery/src/logging" - networker "github.com/accuknox/auto-policy-discovery/src/networkpolicy" + network "github.com/accuknox/auto-policy-discovery/src/networkpolicy" obs "github.com/accuknox/auto-policy-discovery/src/observability" - sysworker "github.com/accuknox/auto-policy-discovery/src/systempolicy" + system "github.com/accuknox/auto-policy-discovery/src/systempolicy" "github.com/accuknox/auto-policy-discovery/src/insight" "github.com/accuknox/auto-policy-discovery/src/libs" apb "github.com/accuknox/auto-policy-discovery/src/protobuf/v1/analyzer" fpb "github.com/accuknox/auto-policy-discovery/src/protobuf/v1/consumer" + dpb "github.com/accuknox/auto-policy-discovery/src/protobuf/v1/discovery" ipb "github.com/accuknox/auto-policy-discovery/src/protobuf/v1/insight" opb "github.com/accuknox/auto-policy-discovery/src/protobuf/v1/observability" wpb "github.com/accuknox/auto-policy-discovery/src/protobuf/v1/worker" @@ -62,9 +64,9 @@ func (s *workerServer) Start(ctx context.Context, in *wpb.WorkerRequest) (*wpb.W if in.GetPolicytype() != "" { if in.GetPolicytype() == "network" { - networker.StartNetworkWorker() + network.StartNetworkWorker() } else if in.GetPolicytype() == "system" { - sysworker.StartSystemWorker() + system.StartSystemWorker() } response += "Starting " + in.GetPolicytype() + " policy discovery" } @@ -76,9 +78,9 @@ func (s *workerServer) Stop(ctx context.Context, in *wpb.WorkerRequest) (*wpb.Wo log.Info().Msg("Stop worker called") if in.GetPolicytype() == "network" { - networker.StopNetworkWorker() + network.StopNetworkWorker() } else if in.GetPolicytype() == "system" { - sysworker.StopSystemWorker() + system.StopSystemWorker() } else { return &wpb.WorkerResponse{Res: "No policy type, choose 'network' or 'system', not [" + in.GetPolicytype() + "]"}, nil } @@ -92,9 +94,9 @@ func (s *workerServer) GetWorkerStatus(ctx context.Context, in *wpb.WorkerReques status := "" if in.GetPolicytype() == "network" { - status = networker.NetworkWorkerStatus + status = network.NetworkWorkerStatus } else if in.GetPolicytype() == "system" { - status = sysworker.SystemWorkerStatus + status = system.SystemWorkerStatus } else { return &wpb.WorkerResponse{Res: "No policy type, choose 'network' or 'system', not [" + in.GetPolicytype() + "]"}, nil } @@ -106,14 +108,14 @@ func (s *workerServer) Convert(ctx context.Context, in *wpb.WorkerRequest) (*wpb if in.GetPolicytype() == "network" { log.Info().Msg("Convert network policy called") - networker.InitNetPolicyDiscoveryConfiguration() - networker.WriteNetworkPoliciesToFile(in.GetClustername(), in.GetNamespace()) - return networker.GetNetPolicy(in.Clustername, in.Namespace), nil + network.InitNetPolicyDiscoveryConfiguration() + network.WriteNetworkPoliciesToFile(in.GetClustername(), in.GetNamespace()) + return network.GetNetPolicy(in.Clustername, in.Namespace), nil } else if in.GetPolicytype() == "system" { log.Info().Msg("Convert system policy called") - sysworker.InitSysPolicyDiscoveryConfiguration() - sysworker.WriteSystemPoliciesToFile(in.GetNamespace(), in.GetClustername(), in.GetLabels(), in.GetFromsource()) - return sysworker.GetSysPolicy(in.Namespace, in.Clustername, in.Labels, in.Fromsource), nil + system.InitSysPolicyDiscoveryConfiguration() + system.WriteSystemPoliciesToFile(in.GetNamespace(), in.GetClustername(), in.GetLabels(), in.GetFromsource()) + return system.GetSysPolicy(in.Namespace, in.Clustername, in.Labels, in.Fromsource), nil } else { log.Info().Msg("Convert policy called, but no policy type") } @@ -121,6 +123,55 @@ func (s *workerServer) Convert(ctx context.Context, in *wpb.WorkerRequest) (*wpb return &wpb.WorkerResponse{Res: "ok"}, nil } +// ====================== // +// == Discovery Service == // +// ====================== // +type discoveryServer struct { + dpb.UnimplementedDiscoveryServer +} + +func (ds *discoveryServer) GetPolicy(req *dpb.GetPolicyRequest, srv dpb.Discovery_GetPolicyServer) error { + consumer := libs.NewPolicyConsumer(req) + + if !consumer.IsTypeSystem() && !consumer.IsTypeNetwork() { + return errors.New("Invalid Request") + } + + var yamlFromDB []types.PolicyYaml + if consumer.IsTypeSystem() { + yamlFromDB = append(yamlFromDB, system.GetPolicyYamlFromDB(consumer)...) + } + if consumer.IsTypeNetwork() { + yamlFromDB = append(yamlFromDB, network.GetPolicyYamlFromDB(consumer)...) + } + + for i := range yamlFromDB { + err := libs.SendPolicyYamlInGrpcStream(srv, &yamlFromDB[i]) + if err != nil { + return err + } + } + + if !req.GetFollow() { + // client only needs the discovered policy in DB. + // Not policy update events. + return nil + } + + if consumer.IsTypeSystem() { + system.PolicyStore.AddConsumer(consumer) + defer system.PolicyStore.RemoveConsumer(consumer) + } + + if consumer.IsTypeNetwork() { + network.PolicyStore.AddConsumer(consumer) + defer network.PolicyStore.RemoveConsumer(consumer) + } + + // consume policy update events + return libs.RelayPolicyEventToGrpcStream(srv, consumer) +} + // ====================== // // == Consumer Service == // // ====================== // @@ -228,6 +279,7 @@ func GetNewServer() *grpc.Server { analyzerServer := &analyzerServer{} insightServer := &insightServer{} observabilityServer := &observabilityServer{} + discoveryServer := &discoveryServer{} // register gRPC servers wpb.RegisterWorkerServer(s, workerServer) @@ -235,6 +287,7 @@ func GetNewServer() *grpc.Server { apb.RegisterAnalyzerServer(s, analyzerServer) ipb.RegisterInsightServer(s, insightServer) opb.RegisterObservabilityServer(s, observabilityServer) + dpb.RegisterDiscoveryServer(s, discoveryServer) if cfg.GetCurrentCfg().ConfigClusterMgmt.ClusterInfoFrom != "k8sclient" { // start consumer automatically @@ -244,10 +297,10 @@ func GetNewServer() *grpc.Server { } // start net worker automatically - networker.StartNetworkWorker() + network.StartNetworkWorker() // start sys worker automatically - sysworker.StartSystemWorker() + system.StartSystemWorker() // start observability obs.InitObservability() diff --git a/src/systempolicy/consumer.go b/src/systempolicy/consumer.go new file mode 100644 index 00000000..4575635b --- /dev/null +++ b/src/systempolicy/consumer.go @@ -0,0 +1,26 @@ +package systempolicy + +import ( + "sync" + + "github.com/accuknox/auto-policy-discovery/src/libs" + types "github.com/accuknox/auto-policy-discovery/src/types" +) + +var PolicyStore libs.PolicyStore + +func init() { + PolicyStore = libs.PolicyStore{ + Consumers: make(map[*libs.PolicyConsumer]struct{}), + Mutex: sync.Mutex{}, + } +} + +func GetPolicyYamlFromDB(consumer *libs.PolicyConsumer) []types.PolicyYaml { + policyYamls, err := libs.GetPolicyYamls(CfgDB, types.PolicyTypeSystem) + if err != nil { + log.Error().Msgf("fetching policy yaml from DB failed err=%v", err.Error()) + return nil + } + return libs.FilterPolicyYamls(policyYamls, consumer) +} diff --git a/src/systempolicy/systemPolicy.go b/src/systempolicy/systemPolicy.go index bafa27f8..74c04898 100644 --- a/src/systempolicy/systemPolicy.go +++ b/src/systempolicy/systemPolicy.go @@ -272,8 +272,6 @@ func WriteSystemPoliciesToFile_Ext(namespace, clustername, labels, fromsource st kubearmorK8SPolicies := extractK8SSystemPolicies(namespace, clustername, labels, fromsource) for _, pol := range kubearmorK8SPolicies { fname := "kubearmor_policies_" + pol.Metadata["clusterName"] + "_" + pol.Metadata["namespace"] + "_" + pol.Metadata["containername"] + "_" + pol.Metadata["name"] - delete(pol.Metadata, "clusterName") - delete(pol.Metadata, "containername") libs.WriteKubeArmorPolicyToYamlFile(fname, []types.KubeArmorPolicy{pol}) } @@ -281,8 +279,6 @@ func WriteSystemPoliciesToFile_Ext(namespace, clustername, labels, fromsource st for index, pol := range kubearmorVMPolicies { locSrc := strings.ReplaceAll(sources[index], "/", "-") fname := "kubearmor_policies_" + pol.Metadata["namespace"] + "_" + pol.Metadata["containername"] + locSrc - delete(pol.Metadata, "clusterName") - delete(pol.Metadata, "containername") libs.WriteKubeArmorPolicyToYamlFile(fname, []types.KubeArmorPolicy{pol}) } } @@ -307,9 +303,6 @@ func GetSysPolicy(namespace, clustername, labels, fromsource string) *wpb.Worker for i := range kubearmorK8SPolicies { kubearmorpolicy := wpb.KubeArmorPolicy{} - delete(kubearmorK8SPolicies[i].Metadata, "clusterName") - delete(kubearmorK8SPolicies[i].Metadata, "containername") - val, err := json.Marshal(&kubearmorK8SPolicies[i]) if err != nil { log.Error().Msgf("kubearmorK8SPolicy json marshal failed err=%v", err.Error()) @@ -323,9 +316,6 @@ func GetSysPolicy(namespace, clustername, labels, fromsource string) *wpb.Worker for i := range kubearmorVMPolicies { kubearmorpolicy := wpb.KubeArmorPolicy{} - delete(kubearmorVMPolicies[i].Metadata, "clusterName") - delete(kubearmorVMPolicies[i].Metadata, "containername") - val, err := json.Marshal(&kubearmorVMPolicies[i]) if err != nil { log.Error().Msgf("kubearmorVMPolicy json marshal failed err=%v", err.Error()) @@ -1322,13 +1312,15 @@ func GenFileSetForAllPodsInCluster(clusterName string, pods []types.Pod, settype } func insertSysPoliciesYamlToDB(policies []types.KnoxSystemPolicy) { - kubeArmorPolicies := plugin.ConvertKnoxSystemPolicyToKubeArmorPolicy(policies) - - res := []types.Policy{} + clusters := []string{} + for _, policy := range policies { + clusters = append(clusters, policy.Metadata["cluster_name"]) + } - for _, kubearmorPolicy := range kubeArmorPolicies { - var label string + kubeArmorPolicies := plugin.ConvertKnoxSystemPolicyToKubeArmorPolicy(policies) + res := []types.PolicyYaml{} + for i, kubearmorPolicy := range kubeArmorPolicies { jsonBytes, err := json.Marshal(kubearmorPolicy) if err != nil { log.Error().Msg(err.Error()) @@ -1339,24 +1331,22 @@ func insertSysPoliciesYamlToDB(policies []types.KnoxSystemPolicy) { log.Error().Msg(err.Error()) continue } - policyYaml := string(yamlBytes) - for k, v := range kubearmorPolicy.Spec.Selector.MatchLabels { - label = k + "=" + v + policyYaml := types.PolicyYaml{ + Type: types.PolicyTypeSystem, + Kind: kubearmorPolicy.Kind, + Name: kubearmorPolicy.Metadata["name"], + Namespace: kubearmorPolicy.Metadata["namespace"], + Cluster: clusters[i], + Labels: kubearmorPolicy.Spec.Selector.MatchLabels, + Yaml: yamlBytes, } + res = append(res, policyYaml) - res = append(res, types.Policy{ - Type: "system", - Kind: kubearmorPolicy.Kind, - PolicyName: kubearmorPolicy.Metadata["name"], - Namespace: kubearmorPolicy.Metadata["namespace"], - ClusterName: kubearmorPolicy.Metadata["clusterName"], - Labels: label, - PolicyYaml: policyYaml, - }) + PolicyStore.Publish(&policyYaml) } - if err := libs.UpdateOrInsertPolicies(CfgDB, res); err != nil { + if err := libs.UpdateOrInsertPolicyYamls(CfgDB, res); err != nil { log.Error().Msgf(err.Error()) } } diff --git a/src/types/constants.go b/src/types/constants.go index 720ea200..6927a99c 100644 --- a/src/types/constants.go +++ b/src/types/constants.go @@ -1,5 +1,7 @@ package types +import cu "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/utils" + const ( // KubeArmor VM PolicyDiscoveryVMNamespace = "accuknox-vm-namespace" @@ -15,10 +17,24 @@ const ( // RecordSeparator - DB separator flag RecordSeparator = "^^" - // Cilium constants + // Network Policy KindKnoxNetworkPolicy = "KnoxNetworkPolicy" KindKnoxHostNetworkPolicy = "KnoxHostNetworkPolicy" + // Cilium Policy + KindCiliumNetworkPolicy = cu.ResourceTypeCiliumNetworkPolicy + KindCiliumClusterwideNetworkPolicy = cu.ResourceTypeCiliumClusterwideNetworkPolicy + + // Kubernetes Policy + KindK8sNetworkPolicy = "NetworkPolicy" + + // KubeArmor Policy + KindKubeArmorPolicy = "KubeArmorPolicy" + KindKubeArmorHostPolicy = "KubeArmorHostPolicy" + + PolicyTypeSystem = "system" + PolicyTypeNetwork = "network" + // Binary Name Filters FilterBinaryKnoxAutoPolicy = "knoxAutoPolicy" ) diff --git a/src/types/policyData.go b/src/types/policyData.go index 50f20844..5f44b9a8 100644 --- a/src/types/policyData.go +++ b/src/types/policyData.go @@ -1,5 +1,8 @@ package types +// LabelMap stores the label of an endpoint +type LabelMap = map[string]string + // ========================= // // == Knox Network Policy == // // ========================= // @@ -310,15 +313,20 @@ type KubeArmorPolicy struct { Spec KnoxSystemSpec `json:"spec,omitempty" yaml:"spec,omitempty"` } -// =============== // -// == Policy DB == // -// =============== // -type Policy struct { - Type string `json:"type,omitempty"` - Kind string `json:"kind,omitempty"` - Namespace string `json:"namespace,omitempty"` - ClusterName string `json:"cluster_name,omitempty"` - Labels string `json:"labels,omitempty"` - PolicyName string `json:"policy_name,omitempty"` - PolicyYaml string `json:"policy_yaml,omitempty"` +// PolicyFilter is used for GetFlow RPC in Discovery Service. +type PolicyFilter struct { + Cluster string + Namespace string + Labels LabelMap +} + +// PolicyYaml stores a policy in YAML format along with its metadata +type PolicyYaml struct { + Type string `json:"type,omitempty"` + Kind string `json:"kind,omitempty"` + Name string `json:"name,omitempty"` + Namespace string `json:"namespace,omitempty"` + Cluster string `json:"cluster,omitempty"` + Labels LabelMap `json:"labels,omitempty"` + Yaml []byte `json:"yaml,omitempty"` }