Skip to content

Commit

Permalink
fix: calculate SSA diffs with smd.merge.Updater (#467)
Browse files Browse the repository at this point in the history
* fix: refactor ssa diff logic

Signed-off-by: Leonardo Luz Almeida <leonardo_almeida@intuit.com>

* fix: calculate ssa diff with smd.merge.Updater

Signed-off-by: Leonardo Luz Almeida <leonardo_almeida@intuit.com>

* chore: Add golangci config file

Signed-off-by: Leonardo Luz Almeida <leonardo_almeida@intuit.com>

* fix: remove wrong param passed to golanci-ghaction

Signed-off-by: Leonardo Luz Almeida <leonardo_almeida@intuit.com>

* doc: Add doc to the wrapper file

Signed-off-by: Leonardo Luz Almeida <leonardo_almeida@intuit.com>

* doc: Add instructions about how to extract the openapiv2 document from
k8s

Signed-off-by: Leonardo Luz Almeida <leonardo_almeida@intuit.com>

* better wording

Signed-off-by: Leonardo Luz Almeida <leonardo_almeida@intuit.com>

* better code comments

Signed-off-by: Leonardo Luz Almeida <leonardo_almeida@intuit.com>

Signed-off-by: Leonardo Luz Almeida <leonardo_almeida@intuit.com>
  • Loading branch information
leoluz committed Oct 4, 2022
1 parent 3951079 commit 98ccd3d
Show file tree
Hide file tree
Showing 17 changed files with 73,856 additions and 73 deletions.
1 change: 0 additions & 1 deletion .github/workflows/ci.yaml
Expand Up @@ -31,7 +31,6 @@ jobs:
with:
version: v1.38.0
args: --timeout 5m
skip-go-installation: true
- uses: codecov/codecov-action@v3.1.0
with:
token: ${{ secrets.CODECOV_TOKEN }} #required
Expand Down
3 changes: 3 additions & 0 deletions .golangci.yaml
@@ -0,0 +1,3 @@
run:
skip-files:
- "pkg/diff/internal/fieldmanager/borrowed_.+\\.go$"
125 changes: 85 additions & 40 deletions pkg/diff/diff.go
Expand Up @@ -13,16 +13,19 @@ import (

jsonpatch "github.com/evanphx/json-patch"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/jsonmergepatch"
"k8s.io/apimachinery/pkg/util/managedfields"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/structured-merge-diff/v4/fieldpath"
"sigs.k8s.io/structured-merge-diff/v4/merge"
"sigs.k8s.io/structured-merge-diff/v4/typed"

"github.com/argoproj/gitops-engine/internal/kubernetes_vendor/pkg/api/v1/endpoints"
"github.com/argoproj/gitops-engine/pkg/diff/internal/fieldmanager"
"github.com/argoproj/gitops-engine/pkg/sync/resource"
jsonutil "github.com/argoproj/gitops-engine/pkg/utils/json"
gescheme "github.com/argoproj/gitops-engine/pkg/utils/kube/scheme"
Expand Down Expand Up @@ -121,67 +124,66 @@ func Diff(config, live *unstructured.Unstructured, opts ...Option) (*DiffResult,
// k8s library (https://github.com/kubernetes-sigs/structured-merge-diff).
func StructuredMergeDiff(config, live *unstructured.Unstructured, gvkParser *managedfields.GvkParser, manager string) (*DiffResult, error) {
if live != nil && config != nil {
gvk := config.GetObjectKind().GroupVersionKind()
pt := gescheme.ResolveParseableType(gvk, gvkParser)
return structuredMergeDiff(config, live, pt, manager)
params := &SMDParams{
config: config,
live: live,
gvkParser: gvkParser,
manager: manager,
}
return structuredMergeDiff(params)
}
return handleResourceCreateOrDeleteDiff(config, live)
}

func structuredMergeDiff(config, live *unstructured.Unstructured, pt *typed.ParseableType, manager string) (*DiffResult, error) {
// 1) Build typed value from live and config unstructures
tvLive, err := pt.FromUnstructured(live.Object)
// SMDParams defines the parameters required by the structuredMergeDiff
// function
type SMDParams struct {
config *unstructured.Unstructured
live *unstructured.Unstructured
gvkParser *managedfields.GvkParser
manager string
}

func structuredMergeDiff(p *SMDParams) (*DiffResult, error) {

gvk := p.config.GetObjectKind().GroupVersionKind()
pt := gescheme.ResolveParseableType(gvk, p.gvkParser)

// Build typed value from live and config unstructures
tvLive, err := pt.FromUnstructured(p.live.Object)
if err != nil {
return nil, fmt.Errorf("error building typed value from live resource: %w", err)
}
tvConfig, err := pt.FromUnstructured(config.Object)
tvConfig, err := pt.FromUnstructured(p.config.Object)
if err != nil {
return nil, fmt.Errorf("error building typed value from config resource: %w", err)
}

previousFieldSet := &fieldpath.Set{}
managerFound := false
// 2) Search for manager to find all fields managed by it
// so it can be removed from live state before merging desired
// state (config).
if manager != "" {
for _, m := range live.GetManagedFields() {
if m.Manager == manager {
err := previousFieldSet.FromJSON(bytes.NewReader(m.FieldsV1.Raw))
if err != nil {
return nil, fmt.Errorf("error parsing manager fields from JSON: %w", err)
}
managerFound = true
}
}
// Invoke the apply function to calculate the diff using
// the structured-merge-diff library
mergedLive, err := apply(tvConfig, tvLive, p)
if err != nil {
return nil, fmt.Errorf("error calculating diff: %w", err)
}

// 3) When manager is not found, it means that the resource
// wasn't being synced with the given manager up to this point.
// In this case config fields will be used to clean live state.
if !managerFound {
previousFieldSet, err = tvConfig.ToFieldSet()
// When mergedLive is nil it means that there is no change
if mergedLive == nil {
liveBytes, err := json.Marshal(p.live)
if err != nil {
return nil, fmt.Errorf("error converting config to fieldset: %w", err)
return nil, fmt.Errorf("error marshaling live resource: %w", err)
}
// In this case diff result will have live state for both,
// predicted and live.
return buildDiffResult(liveBytes, liveBytes), nil
}

// 4) Remove previous fields from live
cleanLive := tvLive.RemoveItems(previousFieldSet)

// 5) Merge desired state in clean live
mergedCleanLive, err := cleanLive.Merge(tvConfig)
if err != nil {
return nil, fmt.Errorf("error merging config into clean live: %w", err)
}

// 6) Apply default values in predicted live
predictedLive, err := normalizeTypedValue(mergedCleanLive)
// Normalize merged live
predictedLive, err := normalizeTypedValue(mergedLive)
if err != nil {
return nil, fmt.Errorf("error applying default values in predicted live: %w", err)
}

// 7) Apply default values in live
// Normalize live
taintedLive, err := normalizeTypedValue(tvLive)
if err != nil {
return nil, fmt.Errorf("error applying default values in live: %w", err)
Expand All @@ -190,6 +192,49 @@ func structuredMergeDiff(config, live *unstructured.Unstructured, pt *typed.Pars
return buildDiffResult(predictedLive, taintedLive), nil
}

// apply will build all the dependency required to invoke the smd.merge.updater.Apply
// to correctly calculate the diff with the same logic used in k8s with server-side
// apply.
func apply(tvConfig, tvLive *typed.TypedValue, p *SMDParams) (*typed.TypedValue, error) {

// Build the structured-merge-diff Updater
updater := merge.Updater{
Converter: fieldmanager.NewVersionConverter(p.gvkParser, scheme.Scheme, p.config.GroupVersionKind().GroupVersion()),
}

// Build a list of managers and which API version they own
managed, err := fieldmanager.DecodeManagedFields(p.live.GetManagedFields())
if err != nil {
return nil, fmt.Errorf("error decoding managed fields: %w", err)
}

// Use the desired manifest to extract the target resource version
version := fieldpath.APIVersion(p.config.GetAPIVersion())

// The manager string needs to be converted to the internal manager
// key used inside structured-merge-diff apply logic
managerKey, err := buildManagerInfoForApply(p.manager)
if err != nil {
return nil, fmt.Errorf("error building manager info: %w", err)
}

// Finally invoke Apply to execute the same function used in k8s
// server-side applies
mergedLive, _, err := updater.Apply(tvLive, tvConfig, version, managed.Fields(), managerKey, true)
if err != nil {
return nil, fmt.Errorf("error while running updater.Apply: %w", err)
}
return mergedLive, err
}

func buildManagerInfoForApply(manager string) (string, error) {
managerInfo := metav1.ManagedFieldsEntry{
Manager: manager,
Operation: metav1.ManagedFieldsOperationApply,
}
return fieldmanager.BuildManagerIdentifier(&managerInfo)
}

// normalizeTypedValue will prepare the given tv so it can be used in diffs by:
// - removing last-applied-configuration annotation
// - applying default values
Expand Down
99 changes: 88 additions & 11 deletions pkg/diff/diff_test.go
Expand Up @@ -10,17 +10,21 @@ import (
"testing"

"github.com/argoproj/gitops-engine/pkg/diff/testdata"
"github.com/argoproj/gitops-engine/pkg/utils/kube/scheme"
openapi_v2 "github.com/google/gnostic/openapiv2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/managedfields"
"k8s.io/klog/v2/klogr"
openapiproto "k8s.io/kube-openapi/pkg/util/proto"
"sigs.k8s.io/yaml"
)

Expand Down Expand Up @@ -747,39 +751,68 @@ func TestUnsortedEndpoints(t *testing.T) {
}
}

func buildGVKParser(t *testing.T) *managedfields.GvkParser {
document := &openapi_v2.Document{}
err := proto.Unmarshal(testdata.OpenAPIV2Doc, document)
if err != nil {
t.Fatalf("error unmarshaling openapi doc: %s", err)
}
models, err := openapiproto.NewOpenAPIData(document)
if err != nil {
t.Fatalf("error building openapi data: %s", err)
}

gvkParser, err := managedfields.NewGVKParser(models, false)
if err != nil {
t.Fatalf("error building gvkParser: %s", err)
}
return gvkParser
}

func TestStructuredMergeDiff(t *testing.T) {
parser := scheme.StaticParser()
svcParseType := parser.Type("io.k8s.api.core.v1.Service")
manager := "argocd-controller"
buildParams := func(live, config *unstructured.Unstructured) *SMDParams {
gvkParser := buildGVKParser(t)
manager := "argocd-controller"
return &SMDParams{
config: config,
live: live,
gvkParser: gvkParser,
manager: manager,
}
}

t.Run("will apply default values", func(t *testing.T) {
// given
t.Parallel()
liveState := StrToUnstructured(testdata.ServiceLiveYAML)
desiredState := StrToUnstructured(testdata.ServiceConfigYAML)
params := buildParams(liveState, desiredState)

// when
result, err := structuredMergeDiff(desiredState, liveState, &svcParseType, manager)
result, err := structuredMergeDiff(params)

// then
require.NoError(t, err)
assert.NotNil(t, result)
assert.False(t, result.Modified)
assert.True(t, result.Modified)
predictedSVC := YamlToSvc(t, result.PredictedLive)
liveSVC := YamlToSvc(t, result.NormalizedLive)
assert.NotNil(t, predictedSVC.Spec.InternalTrafficPolicy)
assert.NotNil(t, liveSVC.Spec.InternalTrafficPolicy)
require.NotNil(t, predictedSVC.Spec.InternalTrafficPolicy)
require.NotNil(t, liveSVC.Spec.InternalTrafficPolicy)
assert.Equal(t, "Cluster", string(*predictedSVC.Spec.InternalTrafficPolicy))
assert.Equal(t, "Cluster", string(*liveSVC.Spec.InternalTrafficPolicy))
assert.Empty(t, predictedSVC.Annotations[AnnotationLastAppliedConfig])
assert.Empty(t, liveSVC.Annotations[AnnotationLastAppliedConfig])
})
t.Run("will remove entries in list", func(t *testing.T) {
// given
t.Parallel()
liveState := StrToUnstructured(testdata.ServiceLiveYAML)
desiredState := StrToUnstructured(testdata.ServiceConfigWith2Ports)
params := buildParams(liveState, desiredState)

// when
result, err := structuredMergeDiff(desiredState, liveState, &svcParseType, manager)
result, err := structuredMergeDiff(params)

// then
require.NoError(t, err)
Expand All @@ -790,11 +823,13 @@ func TestStructuredMergeDiff(t *testing.T) {
})
t.Run("will remove previously added fields not present in desired state", func(t *testing.T) {
// given
t.Parallel()
liveState := StrToUnstructured(testdata.LiveServiceWithTypeYAML)
desiredState := StrToUnstructured(testdata.ServiceConfigYAML)
params := buildParams(liveState, desiredState)

// when
result, err := structuredMergeDiff(desiredState, liveState, &svcParseType, manager)
result, err := structuredMergeDiff(params)

// then
require.NoError(t, err)
Expand All @@ -805,11 +840,13 @@ func TestStructuredMergeDiff(t *testing.T) {
})
t.Run("will apply service with multiple ports", func(t *testing.T) {
// given
t.Parallel()
liveState := StrToUnstructured(testdata.ServiceLiveYAML)
desiredState := StrToUnstructured(testdata.ServiceConfigWithSamePortsYAML)
params := buildParams(liveState, desiredState)

// when
result, err := structuredMergeDiff(desiredState, liveState, &svcParseType, manager)
result, err := structuredMergeDiff(params)

// then
require.NoError(t, err)
Expand All @@ -818,6 +855,36 @@ func TestStructuredMergeDiff(t *testing.T) {
svc := YamlToSvc(t, result.PredictedLive)
assert.Len(t, svc.Spec.Ports, 5)
})
t.Run("will apply deployment defaults correctly", func(t *testing.T) {
// given
t.Parallel()
liveState := StrToUnstructured(testdata.DeploymentLiveYAML)
desiredState := StrToUnstructured(testdata.DeploymentConfigYAML)
params := buildParams(liveState, desiredState)

// when
result, err := structuredMergeDiff(params)

// then
require.NoError(t, err)
assert.NotNil(t, result)
assert.False(t, result.Modified)
deploy := YamlToDeploy(t, result.PredictedLive)
assert.Len(t, deploy.Spec.Template.Spec.Containers, 1)
assert.Equal(t, "0", deploy.Spec.Template.Spec.Containers[0].Resources.Requests.Cpu().String())
assert.Equal(t, "0", deploy.Spec.Template.Spec.Containers[0].Resources.Requests.Memory().String())
assert.Equal(t, "0", deploy.Spec.Template.Spec.Containers[0].Resources.Requests.Storage().String())
assert.Equal(t, "0", deploy.Spec.Template.Spec.Containers[0].Resources.Limits.Cpu().String())
assert.Equal(t, "0", deploy.Spec.Template.Spec.Containers[0].Resources.Limits.Memory().String())
assert.Equal(t, "0", deploy.Spec.Template.Spec.Containers[0].Resources.Limits.Storage().String())
require.NotNil(t, deploy.Spec.Strategy.RollingUpdate)
expectedMaxSurge := &intstr.IntOrString{
Type: intstr.String,
StrVal: "25%",
}
assert.Equal(t, expectedMaxSurge, deploy.Spec.Strategy.RollingUpdate.MaxSurge)
assert.Equal(t, "ClusterFirst", string(deploy.Spec.Template.Spec.DNSPolicy))
})
}

func createSecret(data map[string]string) *unstructured.Unstructured {
Expand Down Expand Up @@ -1078,6 +1145,16 @@ func YamlToSvc(t *testing.T, y []byte) *corev1.Service {
return &svc
}

func YamlToDeploy(t *testing.T, y []byte) *appsv1.Deployment {
t.Helper()
deploy := appsv1.Deployment{}
err := yaml.Unmarshal(y, &deploy)
if err != nil {
t.Fatalf("error unmarshaling deployment bytes: %s", err)
}
return &deploy
}

func StrToUnstructured(yamlStr string) *unstructured.Unstructured {
obj := make(map[string]interface{})
err := yaml.Unmarshal([]byte(yamlStr), &obj)
Expand Down
2 changes: 2 additions & 0 deletions pkg/diff/internal/fieldmanager/README
@@ -0,0 +1,2 @@
Please check the doc.go file for more details about
how to use and maintain the code in this package.

0 comments on commit 98ccd3d

Please sign in to comment.