Skip to content

Commit

Permalink
fix: Avoid Class cast errors in TimestampRouter
Browse files Browse the repository at this point in the history
- Avoids String to Long Class cast errors in TimestampRouter utility
- Add test for timestamp-router-action Kamelet
  • Loading branch information
christophd committed Jun 13, 2024
1 parent 2048783 commit 791443d
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,6 @@
*/
package org.apache.camel.kamelets.utils.transform;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeProperty;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.util.ObjectHelper;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
Expand All @@ -36,6 +28,14 @@
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangeProperty;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.util.ObjectHelper;

public class MessageTimestampRouter {

public void process(@ExchangeProperty("topicFormat") String topicFormat, @ExchangeProperty("timestampFormat") String timestampFormat, @ExchangeProperty("timestampKeys") String timestampKeys, @ExchangeProperty("timestampKeyFormat") String timestampKeyFormat, Exchange ex) throws ParseException {
Expand Down Expand Up @@ -63,13 +63,13 @@ public void process(@ExchangeProperty("topicFormat") String topicFormat, @Exchan
break;
}
}
long timestamp;
Long timestamp = null;
if (ObjectHelper.isNotEmpty(timestampKeyFormat) && ObjectHelper.isNotEmpty(rawTimestamp) && !timestampKeyFormat.equalsIgnoreCase("timestamp")) {
final SimpleDateFormat timestampKeyFmt = new SimpleDateFormat(timestampKeyFormat);
timestampKeyFmt.setTimeZone(TimeZone.getTimeZone("UTC"));
timestamp = timestampKeyFmt.parse((String) rawTimestamp).getTime();
} else {
timestamp = Long.valueOf((String) rawTimestamp);
} else if (ObjectHelper.isNotEmpty(rawTimestamp)) {
timestamp = Long.parseLong(rawTimestamp.toString());
}
if (ObjectHelper.isNotEmpty(timestamp)) {
final String formattedTimestamp = fmt.format(new Date(timestamp));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@
*/
package org.apache.camel.kamelets.utils.transform;

import org.apache.camel.Exchange;
import org.apache.camel.ExchangeProperty;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.util.ObjectHelper;

import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.camel.Exchange;
import org.apache.camel.ExchangeProperty;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.util.ObjectHelper;

public class TimestampRouter {

public void process(@ExchangeProperty("topicFormat") String topicFormat, @ExchangeProperty("timestampFormat") String timestampFormat, @ExchangeProperty("timestampHeaderName") String timestampHeaderName, Exchange ex) {
Expand All @@ -38,15 +38,15 @@ public void process(@ExchangeProperty("topicFormat") String topicFormat, @Exchan
final SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
fmt.setTimeZone(TimeZone.getTimeZone("UTC"));

long timestamp;
Long timestamp = null;
String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, String.class);
Object rawTimestamp = ex.getMessage().getHeader(timestampHeaderName);
if (rawTimestamp instanceof Long) {
timestamp = (Long) rawTimestamp;
} else if (rawTimestamp instanceof Instant) {
timestamp = ((Instant) rawTimestamp).toEpochMilli();
} else {
timestamp = (Long) rawTimestamp;
} else if (ObjectHelper.isNotEmpty(rawTimestamp)) {
timestamp = Long.parseLong(rawTimestamp.toString());
}
if (ObjectHelper.isNotEmpty(timestamp)) {
final String formattedTimestamp = fmt.format(new Date(timestamp));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# ---------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ---------------------------------------------------------------------------

apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
name: timestamp-router-pipe
spec:
source:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: webhook-source
properties:
subpath: messages
steps:
- ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: timestamp-router-action
properties:
topicFormat: $[topic]_$[timestamp]
timestampFormat: YYYY-MM-dd
- ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: set-body-action
properties:
value: $simple{header[message]}
- ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: log-action
properties:
showHeaders: true
sink:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: kafka-sink
properties:
bootstrapServers: ${YAKS_TESTCONTAINERS_REDPANDA_LOCAL_BOOTSTRAP_SERVERS}
user: ${user}
password: ${password}
topic: dummy
securityProtocol: ${securityProtocol}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# ---------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ---------------------------------------------------------------------------

Feature: Kafka Timestamp Router

Background:
Given variable user is ""
Given variable password is ""
Given variables
| securityProtocol | PLAINTEXT |
| topicName | my-topic |
| timestamp | yaks:unixTimestamp()000 |
| topic | ${topicName}_yaks:currentDate('YYYY-MM-dd') |
| message | Camel K rocks! |
Given Kafka topic: ${topic}
Given Kafka topic partition: 0

Scenario: Create infrastructure
Given start Redpanda container

Scenario: Create Pipe
When load Pipe timestamp-router-pipe.yaml
Then Camel K integration timestamp-router-pipe should be running
Then Camel K integration timestamp-router-pipe should print Routes startup

Scenario: Receive message on Kafka topic and verify sink output
Given new Kafka connection
| url | ${YAKS_TESTCONTAINERS_REDPANDA_LOCAL_BOOTSTRAP_SERVERS} |
| consumerGroup | consumer-1 |
Given URL: yaks:resolveURL('timestamp-router-pipe',8080)
Given HTTP request query parameter kafka.TOPIC="${topicName}"
Given HTTP request query parameter kafka.TIMESTAMP="${timestamp}"
Given HTTP request query parameter message="yaks:urlEncode(${message})"
Given HTTP request fork mode is enabled
When send GET /messages
Then receive Kafka message with body: ${message}
And receive HTTP 200 OK

Scenario: Remove resources
Given delete Pipe timestamp-router-pipe
And stop Redpanda container
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ config:
resources:
- kafka-source-pipe.yaml
- kafka-sink-pipe.yaml
- timestamp-router-pipe.yaml
dump:
enabled: true
failedOnly: true
Expand Down

0 comments on commit 791443d

Please sign in to comment.