Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error handling 1st part #653

Merged
merged 2 commits into from Oct 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -35,6 +35,18 @@ public abstract class CamelConnectorConfig extends AbstractConfig {
public static final String CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF = "camel.aggregation.timeout";
public static final String CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC = "The timeout of the aggregation, to be used in combination with camel.beans.aggregate";

public static final String CAMEL_CONNECTOR_ERROR_HANDLER_DEFAULT = "default";
public static final String CAMEL_CONNECTOR_ERROR_HANDLER_CONF = "camel.error.handler";
public static final String CAMEL_CONNECTOR_ERROR_HANDLER_DOC = "The error handler to use: possible value are 'no' or 'default'";

public static final int CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT = 0;
public static final String CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF = "camel.error.handler.max.redeliveries";
public static final String CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC = "The maximum redeliveries to be use in case of Default Error Handler";

public static final Long CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT = 1000L;
public static final String CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF = "camel.error.handler.redelivery.delay";
public static final String CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC = "The initial redelivery delay in milliseconds in case of Default Error Handler";

protected CamelConnectorConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
super(definition, originals, configProviderProps, doLog);
}
Expand Down
Expand Up @@ -54,7 +54,10 @@ public class CamelSinkConnectorConfig extends CamelConnectorConfig {
.define(CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SINK_CONTENT_LOG_LEVEL_DOC)
.define(CAMEL_CONNECTOR_AGGREGATE_CONF, Type.STRING, CAMEL_CONNECTOR_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_DOC)
.define(CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_SIZE_DOC)
.define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC);
.define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC)
.define(CAMEL_CONNECTOR_ERROR_HANDLER_CONF, Type.STRING, CAMEL_CONNECTOR_ERROR_HANDLER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_ERROR_HANDLER_DOC)
.define(CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF, Type.INT, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC)
.define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC);

public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
Expand Down
Expand Up @@ -82,7 +82,9 @@ public void start(Map<String, String> props) {
final String unmarshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_UNMARSHAL_CONF);
final int size = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF);
final long timeout = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF);

final int maxRedeliveries = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF);
final long redeliveryDelay = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF);
final String errorHandler = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF);
CamelContext camelContext = new DefaultCamelContext();
if (remoteUrl == null) {
remoteUrl = TaskHelper.buildUrl(camelContext,
Expand All @@ -98,6 +100,9 @@ public void start(Map<String, String> props) {
.withMarshallDataFormat(marshaller)
.withAggregationSize(size)
.withAggregationTimeout(timeout)
.withErrorHandler(errorHandler)
.withMaxRedeliveries(maxRedeliveries)
.withRedeliveryDelay(redeliveryDelay)
.build(camelContext);


Expand Down
Expand Up @@ -90,7 +90,10 @@ public class CamelSourceConnectorConfig extends CamelConnectorConfig {
.define(CAMEL_SOURCE_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SOURCE_CONTENT_LOG_LEVEL_DOC)
.define(CAMEL_CONNECTOR_AGGREGATE_CONF, Type.STRING, CAMEL_CONNECTOR_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_DOC)
.define(CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF, Type.INT, CAMEL_CONNECTOR_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_SIZE_DOC)
.define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC);
.define(CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_DOC)
.define(CAMEL_CONNECTOR_ERROR_HANDLER_CONF, Type.STRING, CAMEL_CONNECTOR_ERROR_HANDLER_DEFAULT, Importance.LOW, CAMEL_CONNECTOR_ERROR_HANDLER_DOC)
.define(CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF, Type.INT, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_DOC)
.define(CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF, Type.LONG, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DEFAULT, Importance.MEDIUM, CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_DOC);

public CamelSourceConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
Expand Down
Expand Up @@ -88,7 +88,10 @@ public void start(Map<String, String> props) {
final String marshaller = config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_MARSHAL_CONF);
final int size = config.getInt(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_SIZE_CONF);
final long timeout = config.getLong(CamelSourceConnectorConfig.CAMEL_CONNECTOR_AGGREGATE_TIMEOUT_CONF);

final int maxRedeliveries = config.getInt(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_MAXIMUM_REDELIVERIES_CONF);
final long redeliveryDelay = config.getLong(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_REDELIVERY_DELAY_CONF);
final String errorHandler = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF);

topics = config.getString(CamelSourceConnectorConfig.TOPIC_CONF).split(",");

String localUrl = getLocalUrlWithPollingOptions(config);
Expand All @@ -107,6 +110,9 @@ public void start(Map<String, String> props) {
.withMarshallDataFormat(marshaller)
.withAggregationSize(size)
.withAggregationTimeout(timeout)
.withErrorHandler(errorHandler)
.withMaxRedeliveries(maxRedeliveries)
.withRedeliveryDelay(redeliveryDelay)
.build(camelContext);

consumer = cms.getCamelContext().getEndpoint(localUrl).createPollingConsumer();
Expand Down
Expand Up @@ -91,6 +91,9 @@ public static final class Builder {
private String unmarshallDataFormat;
private int aggregationSize;
private long aggregationTimeout;
private String errorHandler;
private int maxRedeliveries;
private long redeliveryDelay;

public Builder(String from, String to) {
this.from = from;
Expand Down Expand Up @@ -121,6 +124,21 @@ public Builder withAggregationTimeout(long aggregationTimeout) {
this.aggregationTimeout = aggregationTimeout;
return this;
}

public Builder withErrorHandler(String errorHandler) {
this.errorHandler = errorHandler;
return this;
}

public Builder withMaxRedeliveries(int maxRedeliveries) {
this.maxRedeliveries = maxRedeliveries;
return this;
}

public Builder withRedeliveryDelay(long redeliveryDelay) {
this.redeliveryDelay = redeliveryDelay;
return this;
}

public CamelKafkaConnectMain build(CamelContext camelContext) {
CamelKafkaConnectMain camelMain = new CamelKafkaConnectMain(camelContext);
Expand All @@ -138,6 +156,20 @@ public void configure() {
//from
RouteDefinition rd = from(from);
LOG.info("Creating Camel route from({})", from);

if (!ObjectHelper.isEmpty(props.get(CamelConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF))) {
String errorHandler = props.get(CamelConnectorConfig.CAMEL_CONNECTOR_ERROR_HANDLER_CONF);
switch (errorHandler) {
case "no":
rd.errorHandler(noErrorHandler());
break;
case "default":
rd.errorHandler(defaultErrorHandler().maximumRedeliveries(maxRedeliveries).redeliveryDelay(redeliveryDelay));
break;
default:
break;
}
}

//dataformats
if (!ObjectHelper.isEmpty(marshallDataFormat)) {
Expand Down