Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
"exchangePattern": { "index": 24, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
"partitioning": { "index": 25, "kind": "parameter", "displayName": "Partitioning", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.iggy.message.Partitioning", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "balanced", "configurationClass": "org.apache.camel.component.iggy.IggyConfiguration", "configurationField": "configuration", "description": "Partitioning strategy for message distribution" },
"lazyStartProducer": { "index": 26, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing." },
"username": { "index": 27, "kind": "parameter", "displayName": "Username", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.iggy.IggyConfiguration", "configurationField": "configuration", "description": "Iggy username" }
"headerFilterStrategy": { "index": 27, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter header to and from Camel message." },
"username": { "index": 28, "kind": "parameter", "displayName": "Username", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.iggy.IggyConfiguration", "configurationField": "configuration", "description": "Iggy username" }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@
"sendToAll": { "index": 15, "kind": "parameter", "displayName": "Send To All", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "configurationClass": "org.apache.camel.component.vertx.websocket.VertxWebsocketConfiguration", "configurationField": "configuration", "description": "To send to all websocket subscribers. Can be used to configure at the endpoint level, instead of providing the VertxWebsocketConstants.SEND_TO_ALL header on the message. Note that when using this option, the host name specified for the vertx-websocket producer URI must match one used for an existing vertx-websocket consumer. Note that this option only applies when producing messages to endpoints hosted by the vertx-websocket consumer and not to an externally hosted WebSocket." },
"clientOptions": { "index": 16, "kind": "parameter", "displayName": "Client Options", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "object", "javaType": "io.vertx.core.http.HttpClientOptions", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.websocket.VertxWebsocketConfiguration", "configurationField": "configuration", "description": "Sets customized options for configuring the WebSocket client used in the producer" },
"lazyStartProducer": { "index": 17, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing." },
"allowOriginHeader": { "index": 18, "kind": "parameter", "displayName": "Allow Origin Header", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.vertx.websocket.VertxWebsocketConfiguration", "configurationField": "configuration", "description": "Whether the WebSocket client should add the Origin header to the WebSocket handshake request." },
"handshakeHeaders": { "index": 19, "kind": "parameter", "displayName": "Handshake Headers", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "prefix": "handshake.", "multiValue": true, "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.websocket.VertxWebsocketConfiguration", "configurationField": "configuration", "description": "Headers to send in the HTTP handshake request. When the endpoint is a consumer, it only works when it consumes a remote host as a client (i.e. consumeAsClient is true). This is a multi-value option with prefix: handshake." },
"originHeaderUrl": { "index": 20, "kind": "parameter", "displayName": "Origin Header Url", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.websocket.VertxWebsocketConfiguration", "configurationField": "configuration", "description": "The value of the Origin header that the WebSocket client should use on the WebSocket handshake request. When not specified, the WebSocket client will automatically determine the value for the Origin from the request URL." },
"sslContextParameters": { "index": 21, "kind": "parameter", "displayName": "Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.websocket.VertxWebsocketConfiguration", "configurationField": "configuration", "description": "To configure security using SSLContextParameters" }
"headerFilterStrategy": { "index": 18, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter header to and from Camel message." },
"allowOriginHeader": { "index": 19, "kind": "parameter", "displayName": "Allow Origin Header", "group": "security", "label": "security", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": true, "configurationClass": "org.apache.camel.component.vertx.websocket.VertxWebsocketConfiguration", "configurationField": "configuration", "description": "Whether the WebSocket client should add the Origin header to the WebSocket handshake request." },
"handshakeHeaders": { "index": 20, "kind": "parameter", "displayName": "Handshake Headers", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "java.util.Map<java.lang.String, java.lang.Object>", "prefix": "handshake.", "multiValue": true, "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.websocket.VertxWebsocketConfiguration", "configurationField": "configuration", "description": "Headers to send in the HTTP handshake request. When the endpoint is a consumer, it only works when it consumes a remote host as a client (i.e. consumeAsClient is true). This is a multi-value option with prefix: handshake." },
"originHeaderUrl": { "index": 21, "kind": "parameter", "displayName": "Origin Header Url", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.websocket.VertxWebsocketConfiguration", "configurationField": "configuration", "description": "The value of the Origin header that the WebSocket client should use on the WebSocket handshake request. When not specified, the WebSocket client will automatically determine the value for the Origin from the request URL." },
"sslContextParameters": { "index": 22, "kind": "parameter", "displayName": "Ssl Context Parameters", "group": "security", "label": "security", "required": false, "type": "object", "javaType": "org.apache.camel.support.jsse.SSLContextParameters", "deprecated": false, "autowired": false, "secret": false, "configurationClass": "org.apache.camel.component.vertx.websocket.VertxWebsocketConfiguration", "configurationField": "configuration", "description": "To configure security using SSLContextParameters" }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.servlet.ServletConsumer;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.atmosphere.cpr.ApplicationConfig;
import org.atmosphere.cpr.AtmosphereFramework;
import org.atmosphere.cpr.AtmosphereFrameworkInitializer;
Expand Down Expand Up @@ -103,8 +104,12 @@ public void sendEventNotification(String connectionKey, int eventType) {
exchange.getIn().setHeader(WebsocketConstants.CONNECTION_KEY, connectionKey);
exchange.getIn().setHeader(WebsocketConstants.EVENT_TYPE, eventType);

HeaderFilterStrategy headerFilterStrategy = getEndpoint().getHeaderFilterStrategy();
for (Map.Entry<String, String> param : queryMap.entrySet()) {
exchange.getIn().setHeader(param.getKey(), param.getValue());
if (headerFilterStrategy == null
|| !headerFilterStrategy.applyFilterToExternalHeaders(param.getKey(), param.getValue(), exchange)) {
exchange.getIn().setHeader(param.getKey(), param.getValue());
}
}

// use default consumer callback
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public boolean configure(CamelContext camelContext, Object obj, String name, Obj
case "exceptionHandler": target.setExceptionHandler(property(camelContext, org.apache.camel.spi.ExceptionHandler.class, value)); return true;
case "exchangepattern":
case "exchangePattern": target.setExchangePattern(property(camelContext, org.apache.camel.ExchangePattern.class, value)); return true;
case "headerfilterstrategy":
case "headerFilterStrategy": target.setHeaderFilterStrategy(property(camelContext, org.apache.camel.spi.HeaderFilterStrategy.class, value)); return true;
case "host": target.getConfiguration().setHost(property(camelContext, java.lang.String.class, value)); return true;
case "lazystartproducer":
case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true;
Expand Down Expand Up @@ -99,6 +101,8 @@ public Class<?> getOptionType(String name, boolean ignoreCase) {
case "exceptionHandler": return org.apache.camel.spi.ExceptionHandler.class;
case "exchangepattern":
case "exchangePattern": return org.apache.camel.ExchangePattern.class;
case "headerfilterstrategy":
case "headerFilterStrategy": return org.apache.camel.spi.HeaderFilterStrategy.class;
case "host": return java.lang.String.class;
case "lazystartproducer":
case "lazyStartProducer": return boolean.class;
Expand Down Expand Up @@ -156,6 +160,8 @@ public Object getOptionValue(Object obj, String name, boolean ignoreCase) {
case "exceptionHandler": return target.getExceptionHandler();
case "exchangepattern":
case "exchangePattern": return target.getExchangePattern();
case "headerfilterstrategy":
case "headerFilterStrategy": return target.getHeaderFilterStrategy();
case "host": return target.getConfiguration().getHost();
case "lazystartproducer":
case "lazyStartProducer": return target.isLazyStartProducer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class IggyEndpointUriFactory extends org.apache.camel.support.component.E
private static final Set<String> SECRET_PROPERTY_NAMES;
private static final Map<String, String> MULTI_VALUE_PREFIXES;
static {
Set<String> props = new HashSet<>(28);
Set<String> props = new HashSet<>(29);
props.add("autoCommit");
props.add("autoCreateStream");
props.add("autoCreateTopic");
Expand All @@ -34,6 +34,7 @@ public class IggyEndpointUriFactory extends org.apache.camel.support.component.E
props.add("consumersCount");
props.add("exceptionHandler");
props.add("exchangePattern");
props.add("headerFilterStrategy");
props.add("host");
props.add("lazyStartProducer");
props.add("maxTopicSize");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
"exchangePattern": { "index": 24, "kind": "parameter", "displayName": "Exchange Pattern", "group": "consumer (advanced)", "label": "consumer,advanced", "required": false, "type": "enum", "javaType": "org.apache.camel.ExchangePattern", "enum": [ "InOnly", "InOut" ], "deprecated": false, "autowired": false, "secret": false, "description": "Sets the exchange pattern when the consumer creates an exchange." },
"partitioning": { "index": 25, "kind": "parameter", "displayName": "Partitioning", "group": "producer", "label": "producer", "required": false, "type": "object", "javaType": "org.apache.iggy.message.Partitioning", "deprecated": false, "autowired": false, "secret": false, "defaultValue": "balanced", "configurationClass": "org.apache.camel.component.iggy.IggyConfiguration", "configurationField": "configuration", "description": "Partitioning strategy for message distribution" },
"lazyStartProducer": { "index": 26, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel's routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing." },
"username": { "index": 27, "kind": "parameter", "displayName": "Username", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.iggy.IggyConfiguration", "configurationField": "configuration", "description": "Iggy username" }
"headerFilterStrategy": { "index": 27, "kind": "parameter", "displayName": "Header Filter Strategy", "group": "advanced", "label": "advanced", "required": false, "type": "object", "javaType": "org.apache.camel.spi.HeaderFilterStrategy", "deprecated": false, "autowired": false, "secret": false, "description": "To use a custom HeaderFilterStrategy to filter header to and from Camel message." },
"username": { "index": 28, "kind": "parameter", "displayName": "Username", "group": "security", "label": "security", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": true, "configurationClass": "org.apache.camel.component.iggy.IggyConfiguration", "configurationField": "configuration", "description": "Iggy username" }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.HeaderFilterStrategyAware;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
Expand All @@ -45,12 +47,15 @@
*/
@UriEndpoint(firstVersion = "4.17.0", scheme = "iggy", title = "Iggy", syntax = "iggy:topicName",
category = { Category.MESSAGING }, headersClass = IggyConstants.class)
public class IggyEndpoint extends DefaultEndpoint {
public class IggyEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware {

private static final Logger LOG = LoggerFactory.getLogger(IggyEndpoint.class);

@UriParam
private IggyConfiguration configuration;
@UriParam(label = "advanced",
description = "To use a custom HeaderFilterStrategy to filter header to and from Camel message.")
private HeaderFilterStrategy headerFilterStrategy;
@UriPath(description = "Name of the topic")
@Metadata(required = true)
private String topicName;
Expand Down Expand Up @@ -162,6 +167,22 @@ public ExecutorService createExecutor() {
"IggyConsumer[" + getTopicName() + "]", configuration.getConsumersCount());
}

@Override
public HeaderFilterStrategy getHeaderFilterStrategy() {
if (headerFilterStrategy == null) {
headerFilterStrategy = new IggyHeaderFilterStrategy();
}
return headerFilterStrategy;
}

/**
* To use a custom {@link org.apache.camel.spi.HeaderFilterStrategy} to filter header to and from Camel message.
*/
@Override
public void setHeaderFilterStrategy(HeaderFilterStrategy headerFilterStrategy) {
this.headerFilterStrategy = headerFilterStrategy;
}

public IggyConfiguration getConfiguration() {
return configuration;
}
Expand Down
Loading
Loading