Skip to content

Commit

Permalink
[test] add Kamelet errorHandler test
Browse files Browse the repository at this point in the history
  • Loading branch information
bouskaJ committed Sep 17, 2021
1 parent b035e30 commit 581e8bb
Show file tree
Hide file tree
Showing 3 changed files with 202 additions and 41 deletions.
138 changes: 138 additions & 0 deletions e2e/common/kamelet_binding_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// +build integration

// To enable compilation of this file in Goland, go to "Settings -> Go -> Vendoring & Build Tags -> Custom Tags" and add "integration"

/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package common

import (
"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
v1 "k8s.io/api/core/v1"
"testing"

. "github.com/apache/camel-k/e2e/support"
. "github.com/onsi/gomega"
)

func TestErrorHandler(t *testing.T) {
WithNewTestNamespace(t, func(ns string) {
Expect(Kamel("install", "-n", ns).Execute()).To(Succeed())

Expect(CreateErrorProducerKamelet(ns, "my-own-error-producer-source")()).To(Succeed())
Expect(CreateLogKamelet(ns, "my-own-log-sink")()).To(Succeed())
from := v1.ObjectReference{
Kind: "Kamelet",
Name: "my-own-error-producer-source",
APIVersion: v1alpha1.SchemeGroupVersion.String(),
}

to := v1.ObjectReference{
Kind: "Kamelet",
Name: "my-own-log-sink",
APIVersion: v1alpha1.SchemeGroupVersion.String(),
}

errorHandler := map[string]interface{}{
"dead-letter-channel": map[string]interface{}{
"endpoint": map[string]interface{}{
"ref": map[string]string{
"kind": "Kamelet",
"apiVersion": v1alpha1.SchemeGroupVersion.String(),
"name": "my-own-log-sink",
},
"properties": map[string]string{
"loggerName": "kameletErrorHandler",
},
}}}

t.Run("throw error test", func(t *testing.T) {

Expect(BindKameletToWithErrorHandler(ns, "throw-error-binding", from, to, map[string]string{"message": "throw Error"}, map[string]string{"loggerName": "integrationLogger"}, errorHandler)()).To(Succeed())

Eventually(IntegrationPodPhase(ns, "throw-error-binding"), TestTimeoutMedium).Should(Equal(v1.PodRunning))
Eventually(IntegrationLogs(ns, "throw-error-binding"), TestTimeoutShort).Should(ContainSubstring("kameletErrorHandler"))
Eventually(IntegrationLogs(ns, "throw-error-binding"), TestTimeoutShort).ShouldNot(ContainSubstring("integrationLogger"))

})

t.Run("don't throw error test", func(t *testing.T) {

Expect(BindKameletToWithErrorHandler(ns, "no-error-binding", from, to, map[string]string{"message": "true"}, map[string]string{"loggerName": "integrationLogger"}, errorHandler)()).To(Succeed())

Eventually(IntegrationPodPhase(ns, "no-error-binding"), TestTimeoutMedium).Should(Equal(v1.PodRunning))
Eventually(IntegrationLogs(ns, "no-error-binding"), TestTimeoutShort).ShouldNot(ContainSubstring("kameletErrorHandler"))
Eventually(IntegrationLogs(ns, "no-error-binding"), TestTimeoutShort).Should(ContainSubstring("integrationLogger"))

})

// Cleanup
Expect(Kamel("delete", "--all", "-n", ns).Execute()).Should(BeNil())
})
}

func CreateLogKamelet(ns string, name string) func() error {
flow := map[string]interface{}{
"from": map[string]interface{}{
"uri": "kamelet:source",
"steps": []map[string]interface{}{
{
"to": "log:{{loggerName}}",
},
},
},
}

props := map[string]v1alpha1.JSONSchemaProp{
"loggerName": {
Type: "string",
},
}
return CreateKamelet(ns, name, flow, props)
}

func CreateErrorProducerKamelet(ns string, name string) func() error {
props := map[string]v1alpha1.JSONSchemaProp{
"message": {
Type: "string",
},
}

flow := map[string]interface{}{
"from": map[string]interface{}{
"uri": "timer:tick",
"steps": []map[string]interface{}{
{
"set-body": map[string]interface{}{
"constant": "{{message}}",
},
},
{
"set-body": map[string]interface{}{
"simple": "${mandatoryBodyAs(Boolean)}",
},
},
{
"to": "kamelet:sink",
},
},
},
}

return CreateKamelet(ns, name, flow, props)
}
13 changes: 10 additions & 3 deletions e2e/knative/kamelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ limitations under the License.
package knative

import (
"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
"testing"

. "github.com/onsi/gomega"
Expand All @@ -41,17 +42,23 @@ func TestKameletChange(t *testing.T) {
Expect(CreateTimerKamelet(ns, "timer-source")()).To(Succeed())
Expect(CreateKnativeChannel(ns, "messages")()).To(Succeed())
Expect(Kamel("run", "-n", ns, "files/display.groovy", "-w").Execute()).To(Succeed())
ref := v1.ObjectReference{
from := v1.ObjectReference{
Kind: "Kamelet",
APIVersion: v1alpha1.SchemeGroupVersion.String(),
Name: "timer-source",
}

to := v1.ObjectReference{
Kind: "InMemoryChannel",
Name: "messages",
APIVersion: messaging.SchemeGroupVersion.String(),
}
Expect(BindKameletTo(ns, "timer-binding", "timer-source", ref, map[string]string{"message": "message is Hello"})()).To(Succeed())
Expect(BindKameletTo(ns, "timer-binding", from, to, map[string]string{"message": "message is Hello"}, map[string]string{})()).To(Succeed())
Eventually(IntegrationPodPhase(ns, "timer-binding"), TestTimeoutMedium).Should(Equal(v1.PodRunning))
Eventually(IntegrationCondition(ns, "timer-binding", camelv1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(v1.ConditionTrue))
Eventually(IntegrationLogs(ns, "display"), TestTimeoutShort).Should(ContainSubstring("message is Hello"))

Expect(BindKameletTo(ns, "timer-binding", "timer-source", ref, map[string]string{"message": "message is Hi"})()).To(Succeed())
Expect(BindKameletTo(ns, "timer-binding", from, to, map[string]string{"message": "message is Hi"}, map[string]string{})()).To(Succeed())
Eventually(IntegrationPodPhase(ns, "timer-binding"), TestTimeoutMedium).Should(Equal(v1.PodRunning))
Eventually(IntegrationCondition(ns, "timer-binding", camelv1.IntegrationConditionReady), TestTimeoutShort).Should(Equal(v1.ConditionTrue))
Eventually(IntegrationLogs(ns, "display"), TestTimeoutShort).Should(ContainSubstring("message is Hi"))
Expand Down
92 changes: 54 additions & 38 deletions e2e/support/test_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -1074,7 +1074,7 @@ func CreateKnativeChannel(ns string, name string) func() error {
Kamelets
*/

func CreateTimerKamelet(ns string, name string) func() error {
func CreateKamelet(ns string, name string, flow map[string]interface{}, properties map[string]v1alpha1.JSONSchemaProp) func() error {
return func() error {
kamelet := v1alpha1.Kamelet{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -1083,54 +1083,60 @@ func CreateTimerKamelet(ns string, name string) func() error {
},
Spec: v1alpha1.KameletSpec{
Definition: &v1alpha1.JSONSchemaProps{
Properties: map[string]v1alpha1.JSONSchemaProp{
"message": {
Type: "string",
},
},
Properties: properties,
},
Flow: asFlow(map[string]interface{}{
"from": map[string]interface{}{
"uri": "timer:tick",
"steps": []map[string]interface{}{
{
"set-body": map[string]interface{}{
"constant": "{{message}}",
},
},
{
"to": "kamelet:sink",
},
},
},
}),
Flow: asFlow(flow),
},
}
return TestClient().Create(TestContext, &kamelet)
}
}

func BindKameletTo(ns, name, from string, to corev1.ObjectReference, properties map[string]string) func() error {
return func() error {
kb := v1alpha1.KameletBinding{
ObjectMeta: metav1.ObjectMeta{
Namespace: ns,
Name: name,
},
Spec: v1alpha1.KameletBindingSpec{
Source: v1alpha1.Endpoint{
Ref: &corev1.ObjectReference{
Kind: "Kamelet",
APIVersion: v1alpha1.SchemeGroupVersion.String(),
Name: from,
func CreateTimerKamelet(ns string, name string) func() error {
props := map[string]v1alpha1.JSONSchemaProp{
"message": {
Type: "string",
},
}

flow := map[string]interface{}{
"from": map[string]interface{}{
"uri": "timer:tick",
"steps": []map[string]interface{}{
{
"set-body": map[string]interface{}{
"constant": "{{message}}",
},
Properties: asEndpointProperties(properties),
},
Sink: v1alpha1.Endpoint{
Ref: &to,
Properties: asEndpointProperties(map[string]string{}),
{
"to": "kamelet:sink",
},
},
},
}

return CreateKamelet(ns, name, flow, props)
}

func BindKameletTo(ns string, name string, from corev1.ObjectReference, to corev1.ObjectReference, sourceProperties map[string]string, sinkProperties map[string]string) func() error {
return BindKameletToWithErrorHandler(ns, name, from, to, sourceProperties, sinkProperties, nil)
}

func BindKameletToWithErrorHandler(ns string, name string, from corev1.ObjectReference, to corev1.ObjectReference, sourceProperties map[string]string, sinkProperties map[string]string, errorHandler map[string]interface{}) func() error {
return func() error {
kb := v1alpha1.NewKameletBinding(ns, name)
kb.Spec = v1alpha1.KameletBindingSpec{
Source: v1alpha1.Endpoint{
Ref: &from,
Properties: asEndpointProperties(sourceProperties),
},
Sink: v1alpha1.Endpoint{
Ref: &to,
Properties: asEndpointProperties(sinkProperties),
},
}
if errorHandler != nil {
kb.Spec.ErrorHandler = asErrorHandlerSpec(errorHandler)
}
return kubernetes.ReplaceResource(TestContext, TestClient(), &kb)
}
Expand All @@ -1146,6 +1152,16 @@ func asFlow(source map[string]interface{}) *v1.Flow {
}
}

func asErrorHandlerSpec(source map[string]interface{}) *v1alpha1.ErrorHandlerSpec {
bytes, err := json.Marshal(source)
if err != nil {
panic(err)
}
return &v1alpha1.ErrorHandlerSpec{
RawMessage: bytes,
}
}

func asEndpointProperties(props map[string]string) *v1alpha1.EndpointProperties {
bytes, err := json.Marshal(props)
if err != nil {
Expand Down

0 comments on commit 581e8bb

Please sign in to comment.