From 1ac16009ee9e5fa747624842fbb53b8bd5cfa52b Mon Sep 17 00:00:00 2001 From: gautric Date: Tue, 15 Dec 2015 12:18:16 +0100 Subject: [PATCH 1/3] add Constant header --- .../java/org/apache/camel/component/paho/PahoConstants.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java index 453e6bcecf8bd..7108a12dfed25 100644 --- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java @@ -30,6 +30,10 @@ public final class PahoConstants { @Deprecated public static final String HEADER_ORIGINAL_MESSAGE = "PahoOriginalMessage"; + public static final String CAMEL_PAHO = "CamelPaho"; + public static final String CAMEL_PAHO_MSG_QOS = CAMEL_PAHO + ".qos"; + public static final String CAMEL_PAHO_MSG_RETAINED = CAMEL_PAHO + ".retained"; + private PahoConstants() { } From 2ddbce57113c58af54d27e6c71d95fa1ee6206d1 Mon Sep 17 00:00:00 2001 From: gautric Date: Tue, 15 Dec 2015 12:23:37 +0100 Subject: [PATCH 2/3] CAMEL-9420 camel-paho : provide dynamic qos and rentained option --- .../camel/component/paho/PahoProducer.java | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java index 23690156c29fe..99265d235d5d1 100644 --- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java @@ -27,12 +27,30 @@ public PahoProducer(PahoEndpoint endpoint) { super(endpoint); } + private int retrieveQos(Exchange exchange) { + if (exchange.getIn().getHeaders().containsKey(PahoConstants.CAMEL_PAHO_MSG_QOS)) { + return exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_QOS, Integer.class); + } else { + return getEndpoint().getQos(); + } + } + + private boolean retrieveRetained(Exchange exchange) { + if (exchange.getIn().getHeaders().containsKey(PahoConstants.CAMEL_PAHO_MSG_RETAINED)) { + return exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_RETAINED, Boolean.class); + } else { + return getEndpoint().isRetained(); + } + } + @Override public void process(Exchange exchange) throws Exception { MqttClient client = getEndpoint().getClient(); String topic = getEndpoint().getTopic(); - int qos = getEndpoint().getQos(); - boolean retained = getEndpoint().isRetained(); + + int qos = retrieveQos(exchange); + boolean retained = retrieveRetained(exchange); + byte[] payload = exchange.getIn().getBody(byte[].class); MqttMessage message = new MqttMessage(payload); @@ -41,9 +59,11 @@ public void process(Exchange exchange) throws Exception { client.publish(topic, message); } + + @Override public PahoEndpoint getEndpoint() { - return (PahoEndpoint) super.getEndpoint(); + return (PahoEndpoint)super.getEndpoint(); } -} \ No newline at end of file +} From 5f46c717116499258708923ac1405b18dd1fd9e6 Mon Sep 17 00:00:00 2001 From: gautric Date: Wed, 16 Dec 2015 13:02:13 +0100 Subject: [PATCH 3/3] Corrections dot and single line --- .../camel/component/paho/PahoConstants.java | 4 ++-- .../camel/component/paho/PahoProducer.java | 20 ++----------------- 2 files changed, 4 insertions(+), 20 deletions(-) diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java index 7108a12dfed25..e7ecf902769c8 100644 --- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoConstants.java @@ -31,8 +31,8 @@ public final class PahoConstants { @Deprecated public static final String HEADER_ORIGINAL_MESSAGE = "PahoOriginalMessage"; public static final String CAMEL_PAHO = "CamelPaho"; - public static final String CAMEL_PAHO_MSG_QOS = CAMEL_PAHO + ".qos"; - public static final String CAMEL_PAHO_MSG_RETAINED = CAMEL_PAHO + ".retained"; + public static final String CAMEL_PAHO_MSG_QOS = CAMEL_PAHO + "Qos"; + public static final String CAMEL_PAHO_MSG_RETAINED = CAMEL_PAHO + "Retained"; private PahoConstants() { diff --git a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java index 99265d235d5d1..cdd360a3dd787 100644 --- a/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java +++ b/components/camel-paho/src/main/java/org/apache/camel/component/paho/PahoProducer.java @@ -27,29 +27,13 @@ public PahoProducer(PahoEndpoint endpoint) { super(endpoint); } - private int retrieveQos(Exchange exchange) { - if (exchange.getIn().getHeaders().containsKey(PahoConstants.CAMEL_PAHO_MSG_QOS)) { - return exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_QOS, Integer.class); - } else { - return getEndpoint().getQos(); - } - } - - private boolean retrieveRetained(Exchange exchange) { - if (exchange.getIn().getHeaders().containsKey(PahoConstants.CAMEL_PAHO_MSG_RETAINED)) { - return exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_RETAINED, Boolean.class); - } else { - return getEndpoint().isRetained(); - } - } - @Override public void process(Exchange exchange) throws Exception { MqttClient client = getEndpoint().getClient(); String topic = getEndpoint().getTopic(); - int qos = retrieveQos(exchange); - boolean retained = retrieveRetained(exchange); + int qos = exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_QOS, getEndpoint().getQos(), Integer.class); + boolean retained = exchange.getIn().getHeader(PahoConstants.CAMEL_PAHO_MSG_RETAINED, getEndpoint().isRetained(), Boolean.class); byte[] payload = exchange.getIn().getBody(byte[].class);