Skip to content

Commit

Permalink
CAMEL-12160 - Camel-AWS Kinesis Firehose: Use a configuration for the…
Browse files Browse the repository at this point in the history
… options like the other AWS components
  • Loading branch information
oscerd committed Jan 19, 2018
1 parent 80964be commit f73e533
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 30 deletions.
Expand Up @@ -34,8 +34,10 @@ public KinesisFirehoseComponent(CamelContext context) {


@Override @Override
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
KinesisFirehoseEndpoint endpoint = new KinesisFirehoseEndpoint(uri, remaining, this); KinesisFirehoseConfiguration configuration = new KinesisFirehoseConfiguration();
setProperties(endpoint, parameters); configuration.setStreamName(remaining);
setProperties(configuration, parameters);
KinesisFirehoseEndpoint endpoint = new KinesisFirehoseEndpoint(uri, configuration, this);
return endpoint; return endpoint;
} }
} }
@@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.aws.firehose;

import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehose;

import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams;
import org.apache.camel.spi.UriPath;

@UriParams
public class KinesisFirehoseConfiguration {

@UriPath(description = "Name of the stream")
@Metadata(required = "true")
private String streamName;
@UriParam(description = "Amazon Kinesis Firehose client to use for all requests for this endpoint")
@Metadata(required = "true")
private AmazonKinesisFirehose amazonKinesisFirehoseClient;

public void setAmazonKinesisFirehoseClient(AmazonKinesisFirehose client) {
this.amazonKinesisFirehoseClient = client;
}

public AmazonKinesisFirehose getAmazonKinesisFirehoseClient() {
return amazonKinesisFirehoseClient;
}

public void setStreamName(String streamName) {
this.streamName = streamName;
}

public String getStreamName() {
return streamName;
}
}
Expand Up @@ -34,16 +34,12 @@
producerOnly = true, label = "cloud,messaging") producerOnly = true, label = "cloud,messaging")
public class KinesisFirehoseEndpoint extends DefaultEndpoint { public class KinesisFirehoseEndpoint extends DefaultEndpoint {


@UriPath(description = "Name of the stream") @UriParam
@Metadata(required = "true") private KinesisFirehoseConfiguration configuration;
private String streamName;
@UriParam(description = "Amazon Kinesis Firehose client to use for all requests for this endpoint")
@Metadata(required = "true")
private AmazonKinesisFirehose amazonKinesisFirehoseClient;


public KinesisFirehoseEndpoint(String uri, String streamName, KinesisFirehoseComponent component) { public KinesisFirehoseEndpoint(String uri, KinesisFirehoseConfiguration configuration, KinesisFirehoseComponent component) {
super(uri, component); super(uri, component);
this.streamName = streamName; this.configuration = configuration;
} }


@Override @Override
Expand All @@ -61,15 +57,11 @@ public boolean isSingleton() {
return true; return true;
} }


public void setAmazonKinesisFirehoseClient(AmazonKinesisFirehose client) {
this.amazonKinesisFirehoseClient = client;
}

public AmazonKinesisFirehose getClient() { public AmazonKinesisFirehose getClient() {
return amazonKinesisFirehoseClient; return configuration.getAmazonKinesisFirehoseClient();
} }


public String getStreamName() { public KinesisFirehoseConfiguration getConfiguration() {
return streamName; return configuration;
} }
} }
Expand Up @@ -54,7 +54,7 @@ private PutRecordRequest createRequest(Exchange exchange) {
record.setData(body); record.setData(body);


PutRecordRequest putRecordRequest = new PutRecordRequest(); PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setDeliveryStreamName(getEndpoint().getStreamName()); putRecordRequest.setDeliveryStreamName(getEndpoint().getConfiguration().getStreamName());
putRecordRequest.setRecord(record); putRecordRequest.setRecord(record);
return putRecordRequest; return putRecordRequest;
} }
Expand Down
Expand Up @@ -16,14 +16,13 @@
*/ */
package org.apache.camel.component.aws.kinesis; package org.apache.camel.component.aws.kinesis;


import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.ShardIteratorType;

import org.apache.camel.spi.Metadata; import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams; import org.apache.camel.spi.UriParams;
import org.apache.camel.spi.UriPath; import org.apache.camel.spi.UriPath;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.ShardIteratorType;

@UriParams @UriParams
public class KinesisConfiguration { public class KinesisConfiguration {


Expand Down
Expand Up @@ -169,6 +169,7 @@ private Queue<Exchange> createExchanges(List<Record> records) {


private boolean hasSequenceNumber() { private boolean hasSequenceNumber() {
return !getEndpoint().getConfiguration().getSequenceNumber().isEmpty() return !getEndpoint().getConfiguration().getSequenceNumber().isEmpty()
&& (getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER)); && (getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
|| getEndpoint().getConfiguration().getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER));
} }
} }
Expand Up @@ -25,10 +25,8 @@
import org.apache.camel.Processor; import org.apache.camel.Processor;
import org.apache.camel.Producer; import org.apache.camel.Producer;
import org.apache.camel.impl.ScheduledPollEndpoint; import org.apache.camel.impl.ScheduledPollEndpoint;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriPath;


/** /**
* The aws-kinesis component is for consuming and producing records from Amazon * The aws-kinesis component is for consuming and producing records from Amazon
Expand All @@ -39,15 +37,16 @@ public class KinesisEndpoint extends ScheduledPollEndpoint {


@UriParam @UriParam
private KinesisConfiguration configuration; private KinesisConfiguration configuration;

public KinesisEndpoint(String uri, KinesisConfiguration configuration, KinesisComponent component) { public KinesisEndpoint(String uri, KinesisConfiguration configuration, KinesisComponent component) {
super(uri, component); super(uri, component);
this.configuration = configuration; this.configuration = configuration;
} }


@Override @Override
protected void doStart() throws Exception { protected void doStart() throws Exception {
if ((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER)) && configuration.getSequenceNumber().isEmpty()) { if ((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER) || configuration.getIteratorType().equals(ShardIteratorType.AT_SEQUENCE_NUMBER))
&& configuration.getSequenceNumber().isEmpty()) {
throw new IllegalArgumentException("Sequence Number must be specified with iterator Types AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER"); throw new IllegalArgumentException("Sequence Number must be specified with iterator Types AFTER_SEQUENCE_NUMBER or AT_SEQUENCE_NUMBER");
} }
super.doStart(); super.doStart();
Expand Down Expand Up @@ -83,7 +82,7 @@ public boolean isSingleton() {
public AmazonKinesis getClient() { public AmazonKinesis getClient() {
return configuration.getAmazonKinesisClient(); return configuration.getAmazonKinesisClient();
} }

public KinesisConfiguration getConfiguration() { public KinesisConfiguration getConfiguration() {
return configuration; return configuration;
} }
Expand Down
Expand Up @@ -51,6 +51,6 @@ public void allEndpointParams() throws Exception {
); );


assertThat(endpoint.getClient(), is(amazonKinesisFirehoseClient)); assertThat(endpoint.getClient(), is(amazonKinesisFirehoseClient));
assertThat(endpoint.getStreamName(), is("some_stream_name")); assertThat(endpoint.getConfiguration().getStreamName(), is("some_stream_name"));
} }
} }
Expand Up @@ -48,6 +48,8 @@ public class KinesisFirehoseProducerTest {
@Mock @Mock
private KinesisFirehoseEndpoint kinesisFirehoseEndpoint; private KinesisFirehoseEndpoint kinesisFirehoseEndpoint;
@Mock @Mock
private KinesisFirehoseConfiguration kinesisFirehoseConfiguration;
@Mock
private Message inMessage; private Message inMessage;
@Mock @Mock
private Message outMessage; private Message outMessage;
Expand All @@ -61,7 +63,8 @@ public class KinesisFirehoseProducerTest {
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
when(kinesisFirehoseEndpoint.getClient()).thenReturn(kinesisFirehoseClient); when(kinesisFirehoseEndpoint.getClient()).thenReturn(kinesisFirehoseClient);
when(kinesisFirehoseEndpoint.getStreamName()).thenReturn(STREAM_NAME); when(kinesisFirehoseEndpoint.getConfiguration()).thenReturn(kinesisFirehoseConfiguration);
when(kinesisFirehoseEndpoint.getConfiguration().getStreamName()).thenReturn(STREAM_NAME);
when(exchange.getOut()).thenReturn(outMessage); when(exchange.getOut()).thenReturn(outMessage);
when(exchange.getIn()).thenReturn(inMessage); when(exchange.getIn()).thenReturn(inMessage);
when(exchange.getPattern()).thenReturn(ExchangePattern.InOut); when(exchange.getPattern()).thenReturn(ExchangePattern.InOut);
Expand Down

0 comments on commit f73e533

Please sign in to comment.