Skip to content

Commit

Permalink
kamelets: add condition to report when an integration is stuck waitin…
Browse files Browse the repository at this point in the history
…g for them #2331
  • Loading branch information
lburgazzoli committed May 27, 2021
1 parent a84667a commit d3f8319
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 34 deletions.
8 changes: 7 additions & 1 deletion pkg/apis/camel/v1/integration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,13 @@ const (
IntegrationConditionReplicaSetReadyReason string = "ReplicaSetReady"
// IntegrationConditionReplicaSetNotReadyReason --
IntegrationConditionReplicaSetNotReadyReason string = "ReplicaSetNotReady"

// IntegrationConditionKameletsAvailable --
IntegrationConditionKameletsAvailable IntegrationConditionType = "KameletsAvailable"
// IntegrationConditionKameletsAvailableReason --
IntegrationConditionKameletsAvailableReason string = "KameletsAvailable"
// IntegrationConditionKameletsNotAvailableReason --
IntegrationConditionKameletsNotAvailableReason string = "KameletsNotAvailable"
)

// IntegrationCondition describes the state of a resource at a certain point.
Expand Down Expand Up @@ -242,5 +249,4 @@ type PodSpec struct {
NodeSelector map[string]string `json:"nodeSelector,omitempty" protobuf:"bytes,7,rep,name=nodeSelector"`

TopologySpreadConstraints []corev1.TopologySpreadConstraint `json:"topologySpreadConstraints,omitempty" patchStrategy:"merge" patchMergeKey:"topologyKey" protobuf:"bytes,33,opt,name=topologySpreadConstraints"`

}
4 changes: 2 additions & 2 deletions pkg/resources/resources.go

Large diffs are not rendered by default.

94 changes: 63 additions & 31 deletions pkg/trait/kamelets.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package trait

import (
"errors"
"fmt"
"regexp"
"sort"
Expand Down Expand Up @@ -139,30 +140,74 @@ func (t *kameletsTrait) Apply(e *Environment) error {
return nil
}

func (t *kameletsTrait) addKamelets(e *Environment) error {
kameletKeys := t.getKameletKeys()
if len(kameletKeys) > 0 {
repo, err := repository.NewForPlatform(e.C, e.Client, e.Platform, e.Integration.Namespace, platform.GetOperatorNamespace())
func (t *kameletsTrait) collectKamelets(e *Environment) (map[string]*v1alpha1.Kamelet, error) {
repo, err := repository.NewForPlatform(e.C, e.Client, e.Platform, e.Integration.Namespace, platform.GetOperatorNamespace())
if err != nil {
return nil, err
}

kamelets := make(map[string]*v1alpha1.Kamelet)
missingKamelets := make([]string, 0)
availableKamelets := make([]string, 0)

for _, key := range t.getKameletKeys() {
kamelet, err := repo.Get(e.C, key)
if err != nil {
return err
return nil, err
}
for _, k := range t.getKameletKeys() {
kamelet, err := repo.Get(e.C, k)
if err != nil {
return err
}
if kamelet == nil {
return fmt.Errorf("kamelet %s not found in any of the defined repositories: %s", k, repo.String())
}

if kamelet == nil {
missingKamelets = append(missingKamelets, key)
} else {
availableKamelets = append(availableKamelets, key)

// Initialize remote kamelets
kamelet, err = kameletutils.Initialize(kamelet)
kamelets[key], err = kameletutils.Initialize(kamelet)
if err != nil {
return err
return nil, err
}
}
}

sort.Strings(availableKamelets)
sort.Strings(missingKamelets)

if len(missingKamelets) > 0 {
message := fmt.Sprintf("kamelets %s found, %s not found in repositories: %s",
strings.Join(availableKamelets, ","),
strings.Join(missingKamelets, ","),
repo.String())

e.Integration.Status.SetCondition(
v1.IntegrationConditionKameletsAvailable,
corev1.ConditionFalse,
v1.IntegrationConditionKameletsAvailableReason,
message,
)

return nil, errors.New(message)
}

e.Integration.Status.SetCondition(
v1.IntegrationConditionKameletsAvailable,
corev1.ConditionTrue,
v1.IntegrationConditionKameletsAvailableReason,
fmt.Sprintf("kamelets %s found in repositories: %s", strings.Join(availableKamelets, ","), repo.String()),
)

return kamelets, nil
}

func (t *kameletsTrait) addKamelets(e *Environment) error {
if len(t.getKameletKeys()) > 0 {
kamelets, err := t.collectKamelets(e)
if err != nil {
return err
}

for name, kamelet := range kamelets {
if kamelet.Status.Phase != v1alpha1.KameletPhaseReady {
return fmt.Errorf("kamelet %q is not %s: %s", k, v1alpha1.KameletPhaseReady, kamelet.Status.Phase)
return fmt.Errorf("kamelet %q is not %s: %s", name, v1alpha1.KameletPhaseReady, kamelet.Status.Phase)
}

if err := t.addKameletAsSource(e, kamelet); err != nil {
Expand All @@ -180,25 +225,12 @@ func (t *kameletsTrait) addKamelets(e *Environment) error {

func (t *kameletsTrait) configureApplicationProperties(e *Environment) error {
if len(t.getKameletKeys()) > 0 {
repo, err := repository.NewForPlatform(e.C, e.Client, e.Platform, e.Integration.Namespace, platform.GetOperatorNamespace())
kamelets, err := t.collectKamelets(e)
if err != nil {
return err
}
for _, k := range t.getKameletKeys() {
kamelet, err := repo.Get(e.C, k)
if err != nil {
return err
}
if kamelet == nil {
return fmt.Errorf("kamelet %s not found in any of the defined repositories: %s", k, repo.String())
}

// remote Kamelets may not be fully initialized
kamelet, err = kameletutils.Initialize(kamelet)
if err != nil {
return err
}

for _, kamelet := range kamelets {
// Configuring defaults from Kamelet
for _, prop := range kamelet.Status.Properties {
if prop.Default != "" {
Expand Down
91 changes: 91 additions & 0 deletions pkg/trait/kamelets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,97 @@ func TestKameletNamedConfigLookup(t *testing.T) {
assert.NotContains(t, environment.Integration.Status.Configuration, v1.ConfigurationSpec{Type: "secret", Value: "my-secret3"})
}

func TestKameletConditionFalse(t *testing.T) {
flow := `
- from:
uri: kamelet:timer
steps:
- to: kamelet:none
`
trait, environment := createKameletsTestEnvironment(
flow,
&v1alpha1.Kamelet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test",
Name: "timer",
},
Spec: v1alpha1.KameletSpec{
Flow: marshalOrFail(map[string]interface{}{
"from": map[string]interface{}{
"uri": "timer:tick",
},
}),
},
Status: v1alpha1.KameletStatus{Phase: v1alpha1.KameletPhaseReady},
})

enabled, err := trait.Configure(environment)
assert.NoError(t, err)
assert.True(t, enabled)

err = trait.Apply(environment)
assert.Error(t, err)
assert.Len(t, environment.Integration.Status.Conditions, 1)

cond := environment.Integration.Status.GetCondition(v1.IntegrationConditionKameletsAvailable)
assert.Equal(t, corev1.ConditionFalse, cond.Status)
assert.Equal(t, v1.IntegrationConditionKameletsAvailableReason, cond.Reason)
assert.Contains(t, cond.Message, "timer found")
assert.Contains(t, cond.Message, "none not found")
}

func TestKameletConditionTrue(t *testing.T) {
flow := `
- from:
uri: kamelet:timer
steps:
- to: kamelet:none
`
trait, environment := createKameletsTestEnvironment(
flow,
&v1alpha1.Kamelet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test",
Name: "timer",
},
Spec: v1alpha1.KameletSpec{
Flow: marshalOrFail(map[string]interface{}{
"from": map[string]interface{}{
"uri": "timer:tick",
},
}),
},
Status: v1alpha1.KameletStatus{Phase: v1alpha1.KameletPhaseReady},
},
&v1alpha1.Kamelet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "test",
Name: "none",
},
Spec: v1alpha1.KameletSpec{
Flow: marshalOrFail(map[string]interface{}{
"from": map[string]interface{}{
"uri": "timer:tick",
},
}),
},
Status: v1alpha1.KameletStatus{Phase: v1alpha1.KameletPhaseReady},
})

enabled, err := trait.Configure(environment)
assert.NoError(t, err)
assert.True(t, enabled)

err = trait.Apply(environment)
assert.NoError(t, err)
assert.Len(t, environment.Integration.Status.Conditions, 1)

cond := environment.Integration.Status.GetCondition(v1.IntegrationConditionKameletsAvailable)
assert.Equal(t, corev1.ConditionTrue, cond.Status)
assert.Equal(t, v1.IntegrationConditionKameletsAvailableReason, cond.Reason)
assert.Contains(t, cond.Message, "none,timer found")
}

func createKameletsTestEnvironment(flow string, objects ...runtime.Object) (*kameletsTrait, *Environment) {
catalog, _ := camel.DefaultCatalog()

Expand Down

0 comments on commit d3f8319

Please sign in to comment.