From eacf8d786c28d624d9383a3b3a0050a665cf2728 Mon Sep 17 00:00:00 2001 From: mans2singh Date: Sun, 9 Oct 2016 17:42:25 -0700 Subject: [PATCH 1/4] Nifi-1540 - AWS Kinesis put processor and refactoring --- .../kinesis/AbstractBaseKinesisProcessor.java | 96 ++++ .../AbstractKinesisFirehoseProcessor.java | 6 +- .../kinesis/firehose/PutKinesisFirehose.java | 31 +- .../stream/AbstractKinesisProcessor.java | 64 +++ .../aws/kinesis/stream/PutKinesis.java | 174 +++++++ .../processors/aws/kinesis/ITPutKinesis.java | 475 ++++++++++++++++++ .../aws/kinesis/TestPutKinesis.java | 82 +++ 7 files changed, 896 insertions(+), 32 deletions(-) create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisProcessor.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesis.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/ITPutKinesis.java create mode 100644 nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/TestPutKinesis.java diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java new file mode 100644 index 000000000000..0f2cf412624e --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; + +import com.amazonaws.AmazonWebServiceClient; + +/** + * This class provides processor the base class for kinesis client + */ +public abstract class AbstractBaseKinesisProcessor + extends AbstractAWSCredentialsProviderProcessor { + + /** + * Kinesis put record response error message + */ + public static final String AWS_KINESIS_ERROR_MESSAGE = "aws.kinesis.error.message"; + + public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() + .name("Batch Size") + .description("Batch size for messages (1-500).") + .defaultValue("250") + .required(false) + .addValidator(StandardValidators.createLongValidator(1, 500, true)) + .sensitive(false) + .build(); + + public static final PropertyDescriptor MAX_MESSAGE_BUFFER_SIZE_MB = new PropertyDescriptor.Builder() + .name("Max message buffer size") + .description("Max message buffer") + 
.defaultValue("1 MB") + .required(false) + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .sensitive(false) + .build(); + + /** + * Max buffer size 1 MB + */ + public static final int MAX_MESSAGE_SIZE = 1000 * 1024; + + protected FlowFile handleFlowFileTooBig(final ProcessSession session, FlowFile flowFileCandidate, + final String streamName, String message) { + flowFileCandidate = session.putAttribute(flowFileCandidate, message, + "record too big " + flowFileCandidate.getSize() + " max allowed " + MAX_MESSAGE_SIZE ); + session.transfer(flowFileCandidate, REL_FAILURE); + getLogger().error("Failed to publish to kinesis {} records {} because the size was greater than {} bytes", + new Object[]{streamName, flowFileCandidate, MAX_MESSAGE_SIZE}); + return flowFileCandidate; + } + + protected List filterMessagesByMaxSize(final ProcessSession session, final int batchSize, final long maxBufferSizeBytes, final String streamName, String message) { + List flowFiles = new ArrayList(batchSize); + + long currentBufferSizeBytes = 0; + + for (int i = 0; (i < batchSize) && (currentBufferSizeBytes <= maxBufferSizeBytes); i++) { + + FlowFile flowFileCandidate = session.get(); + if ( flowFileCandidate == null ) + break; + + if (flowFileCandidate.getSize() > MAX_MESSAGE_SIZE) { + flowFileCandidate = handleFlowFileTooBig(session, flowFileCandidate, streamName, message); + continue; + } + + currentBufferSizeBytes += flowFileCandidate.getSize(); + + flowFiles.add(flowFileCandidate); + } + return flowFiles; + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java index ddc6e6c15ae0..ca15653c4f63 100644 --- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/AbstractKinesisFirehoseProcessor.java @@ -19,7 +19,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; +import org.apache.nifi.processors.aws.kinesis.AbstractBaseKinesisProcessor; import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentials; @@ -29,7 +29,7 @@ /** * This class provides processor the base class for kinesis firehose */ -public abstract class AbstractKinesisFirehoseProcessor extends AbstractAWSCredentialsProviderProcessor { +public abstract class AbstractKinesisFirehoseProcessor extends AbstractBaseKinesisProcessor { public static final PropertyDescriptor KINESIS_FIREHOSE_DELIVERY_STREAM_NAME = new PropertyDescriptor.Builder() .name("Amazon Kinesis Firehose Delivery Stream Name") @@ -68,7 +68,7 @@ protected AmazonKinesisFirehoseClient createClient(final ProcessContext context, } /** - * Create client using AWSCredentials + * Create client using AWSCredentails * * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead */ diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java index f5c3b9f19e7d..8abc9650dfed 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java +++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java @@ -91,25 +91,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session final long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue(); final String firehoseStreamName = context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).getValue(); - List flowFiles = new ArrayList(batchSize); - - long currentBufferSizeBytes = 0; - - for (int i = 0; (i < batchSize) && (currentBufferSizeBytes <= maxBufferSizeBytes); i++) { - - FlowFile flowFileCandidate = session.get(); - if ( flowFileCandidate == null ) - break; - - if (flowFileCandidate.getSize() > MAX_MESSAGE_SIZE) { - flowFileCandidate = handleFlowFileTooBig(session, flowFileCandidate, firehoseStreamName); - continue; - } - - currentBufferSizeBytes += flowFileCandidate.getSize(); - - flowFiles.add(flowFileCandidate); - } + List flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, firehoseStreamName, + AWS_KINESIS_FIREHOSE_ERROR_MESSAGE); final AmazonKinesisFirehoseClient client = getClient(); @@ -172,14 +155,4 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } } - protected FlowFile handleFlowFileTooBig(final ProcessSession session, FlowFile flowFileCandidate, - final String firehoseStreamName) { - flowFileCandidate = session.putAttribute(flowFileCandidate, AWS_KINESIS_FIREHOSE_ERROR_MESSAGE, - "record too big " + flowFileCandidate.getSize() + " max allowed " + MAX_MESSAGE_SIZE ); - session.transfer(flowFileCandidate, REL_FAILURE); - getLogger().error("Failed to publish to kinesis firehose {} records {} because the size was greater than {} bytes", - new Object[]{firehoseStreamName, flowFileCandidate, MAX_MESSAGE_SIZE}); - return flowFileCandidate; - } - } diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisProcessor.java new file mode 100644 index 000000000000..5cfffeb4ec72 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisProcessor.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.aws.kinesis.stream; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.kinesis.AbstractBaseKinesisProcessor; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.kinesis.AmazonKinesisClient; + +/** + * This class provides processor the base class for kinesis client + */ +public abstract class AbstractKinesisProcessor extends AbstractBaseKinesisProcessor { + + public static final PropertyDescriptor KINESIS_STREAM_NAME = new PropertyDescriptor.Builder() + .name("Amazon Kinesis Stream Name") + .description("The name of kinesis stream") + .expressionLanguageSupported(false) + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + /** + * Create client using aws credentials provider. 
This is the preferred way for creating clients + */ + @Override + protected AmazonKinesisClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) { + getLogger().info("Creating client using aws credentials provider"); + + return new AmazonKinesisClient(credentialsProvider, config); + } + + /** + * Create client using AWSCredentails + * + * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead + */ + @Override + protected AmazonKinesisClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { + getLogger().info("Creating client using aws credentials"); + + return new AmazonKinesisClient(credentials, config); + } + +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesis.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesis.java new file mode 100644 index 000000000000..710d7245db2c --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesis.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.aws.kinesis.stream; + +import java.io.ByteArrayOutputStream; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.util.StandardValidators; + +import com.amazonaws.services.kinesis.AmazonKinesisClient; +import com.amazonaws.services.kinesis.model.PutRecordsRequest; +import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry; +import com.amazonaws.services.kinesis.model.PutRecordsResult; +import com.amazonaws.services.kinesis.model.PutRecordsResultEntry; + +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"amazon", "aws", "kinesis", "put", "stream"}) 
+@CapabilityDescription("Sends the contents to a specified Amazon Kinesis. " + + "In order to send data to Kinesis, the stream name has to be specified.") +@WritesAttributes({ + @WritesAttribute(attribute = "aws.kinesis.error.message", description = "Error message on posting message to AWS Kinesis"), + @WritesAttribute(attribute = "aws.kinesis.error.code", description = "Error code for the message when posting to AWS Kinesis"), + @WritesAttribute(attribute = "aws.kinesis.sequence.number", description = "Sequence number for the message when posting to AWS Kinesis"), + @WritesAttribute(attribute = "aws.kinesis.shard.id", description = "Shard id of the message posted to AWS Kinesis")}) +public class PutKinesis extends AbstractKinesisProcessor { + + /** + * Kinesis put record response error code + */ + public static final String AWS_KINESIS_ERROR_CODE = "aws.kinesis.error.code"; + + public static final String AWS_KINESIS_SHARD_ID = "aws.kinesis.shard.id"; + + public static final String AWS_KINESIS_SEQUENCE_NUMBER = "aws.kinesis.sequence.number"; + + public static final PropertyDescriptor KINESIS_PARTITION_KEY = new PropertyDescriptor.Builder() + .displayName("Amazon Kinesis Stream Partition Key") + .name("amazon-kinesis-stream-partition-key") + .description("The partition key attribute. 
If it is not set, a random value is used") + .expressionLanguageSupported(true) + .defaultValue("${kinesis.partition.key}") + .required(false) + .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR).build(); + + public static final List properties = Collections.unmodifiableList( + Arrays.asList(KINESIS_STREAM_NAME, KINESIS_PARTITION_KEY, BATCH_SIZE, MAX_MESSAGE_BUFFER_SIZE_MB, REGION, ACCESS_KEY, SECRET_KEY, CREDENTIALS_FILE, + AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, PROXY_HOST,PROXY_HOST_PORT)); + + /** A random number generator for cases where partition key is not available */ + protected Random randomParitionKeyGenerator = new Random(); + + @Override + protected List getSupportedPropertyDescriptors() { + return properties; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + + final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); + final long maxBufferSizeBytes = context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue(); + final String streamName = context.getProperty(KINESIS_STREAM_NAME).getValue(); + + List flowFiles = filterMessagesByMaxSize(session, batchSize, maxBufferSizeBytes, streamName, + AWS_KINESIS_ERROR_MESSAGE); + + final AmazonKinesisClient client = getClient(); + + try { + List records = new ArrayList<>(); + + List failedFlowFiles = new ArrayList<>(); + List successfulFlowFiles = new ArrayList<>(); + + // Prepare batch of records + for (int i = 0; i < flowFiles.size(); i++) { + FlowFile flowFile = flowFiles.get(i); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + session.exportTo(flowFile, baos); + PutRecordsRequestEntry record = new PutRecordsRequestEntry().withData(ByteBuffer.wrap(baos.toByteArray())); + + String partitionKey = context.getProperty(PutKinesis.KINESIS_PARTITION_KEY) + .evaluateAttributeExpressions(flowFiles.get(i)).getValue(); + + if ( ! 
StringUtils.isBlank(partitionKey) ) { + record.setPartitionKey(partitionKey); + } else { + record.setPartitionKey(Integer.toString(randomParitionKeyGenerator.nextInt())); + } + + records.add(record); + } + + if ( records.size() > 0 ) { + + PutRecordsRequest putRecordRequest = new PutRecordsRequest(); + putRecordRequest.setStreamName(streamName); + putRecordRequest.setRecords(records); + PutRecordsResult results = client.putRecords(putRecordRequest); + + List responseEntries = results.getRecords(); + for (int i = 0; i < responseEntries.size(); i++ ) { + PutRecordsResultEntry entry = responseEntries.get(i); + FlowFile flowFile = flowFiles.get(i); + + Map attributes = new HashMap<>(); + attributes.put(AWS_KINESIS_SHARD_ID, entry.getShardId()); + attributes.put(AWS_KINESIS_SEQUENCE_NUMBER, entry.getSequenceNumber()); + + if ( ! StringUtils.isBlank(entry.getErrorCode()) ) { + attributes.put(AWS_KINESIS_ERROR_CODE, entry.getErrorCode()); + attributes.put(AWS_KINESIS_ERROR_MESSAGE, entry.getErrorMessage()); + flowFile = session.putAllAttributes(flowFile, attributes); + failedFlowFiles.add(flowFile); + } else { + flowFile = session.putAllAttributes(flowFile, attributes); + successfulFlowFiles.add(flowFile); + } + } + if ( failedFlowFiles.size() > 0 ) { + session.transfer(failedFlowFiles, REL_FAILURE); + getLogger().error("Failed to publish to kinesis {} records {}", new Object[]{streamName, failedFlowFiles}); + } + if ( successfulFlowFiles.size() > 0 ) { + session.transfer(successfulFlowFiles, REL_SUCCESS); + getLogger().info("Successfully published to kinesis {} records {}", new Object[]{streamName, successfulFlowFiles}); + } + records.clear(); + } + + } catch (final Exception exception) { + getLogger().error("Failed to publish to kinesis {} with exception {}", new Object[]{flowFiles, exception}); + session.transfer(flowFiles, REL_FAILURE); + context.yield(); + } + } + +} diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/ITPutKinesis.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/ITPutKinesis.java new file mode 100644 index 000000000000..8aee9da8bbd8 --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/ITPutKinesis.java @@ -0,0 +1,475 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.aws.kinesis; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.processors.aws.kinesis.stream.PutKinesis; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * This test contains both unit and integration test (integration tests are ignored by default) + */ +public class ITPutKinesis { + + private TestRunner runner; + protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + + @Before + public void setUp() throws Exception { + runner = TestRunners.newTestRunner(PutKinesis.class); + runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); + runner.assertValid(); + } + + @After + public void tearDown() throws Exception { + runner = null; + } + + /** + * Comment out ignore for integration tests (requires creds files) + */ + @Test + public void testIntegrationSuccess() throws Exception { + runner = TestRunners.newTestRunner(PutKinesis.class); + runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); + runner.assertValid(); + + runner.enqueue("test".getBytes()); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 1); + + final List ffs = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + final MockFlowFile out = ffs.iterator().next(); + + out.assertContentEquals("test".getBytes()); + } + + @Test + public void testIntegrationWithFixedPartitionSuccess() throws Exception { + runner = TestRunners.newTestRunner(PutKinesis.class); + runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, 
"kstream"); + runner.setProperty(PutKinesis.KINESIS_PARTITION_KEY, "pfixed"); + runner.assertValid(); + + runner.enqueue("test".getBytes()); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 1); + + final List ffs = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + final MockFlowFile out = ffs.iterator().next(); + + out.assertContentEquals("test".getBytes()); + } + + @Test + public void testIntegrationWithDynamicPartitionSuccess() throws Exception { + runner = TestRunners.newTestRunner(PutKinesis.class); + runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); + runner.setProperty(PutKinesis.KINESIS_PARTITION_KEY, "${parition}"); + runner.assertValid(); + Map properties = new HashMap<>(); + properties.put("partition", "px"); + runner.enqueue("test".getBytes(), properties); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 1); + + final List ffs = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + final MockFlowFile out = ffs.iterator().next(); + + out.assertContentEquals("test".getBytes()); + } + + /** + * Comment out ignore for integration tests (requires creds files) + */ + @Test + public void testIntegrationFailedBadStreamName() throws Exception { + runner = TestRunners.newTestRunner(PutKinesis.class); + runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "bad-kstream"); + runner.assertValid(); + + runner.enqueue("test".getBytes()); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesis.REL_FAILURE, 1); + + } + + @Test + public void testOneMessageWithMaxBufferSizeGreaterThan1MBOneSuccess() { + runner = TestRunners.newTestRunner(PutKinesis.class); + runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutKinesis.BATCH_SIZE, "2"); + 
runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); + runner.assertValid(); + byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE)]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue(bytes); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + assertEquals(1,flowFiles.size()); + } + + @Test + public void testOneMessageWithMaxBufferSizeGreaterThan1MBOneWithParameterPartitionSuccess() { + runner = TestRunners.newTestRunner(PutKinesis.class); + runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutKinesis.BATCH_SIZE, "2"); + runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); + runner.setProperty(PutKinesis.KINESIS_PARTITION_KEY, "${partitionKey}"); + runner.assertValid(); + byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE)]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + Map props = new HashMap<>(); + props.put("partitionKey", "p1"); + runner.enqueue(bytes,props); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + assertEquals(1,flowFiles.size()); + } + + @Test + public void testTwoMessageBatch5WithMaxBufferSize1MBRunOnceTwoMessageSent() { + runner = TestRunners.newTestRunner(PutKinesis.class); + runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutKinesis.BATCH_SIZE, "5"); + runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "2 MB"); + runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); + runner.assertValid(); + byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE)]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + 
runner.enqueue(bytes); + runner.enqueue(bytes.clone()); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 2); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + assertEquals(2,flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + } + } + + @Test + public void testThreeMessageWithBatch10MaxBufferSize1MBTRunOnceTwoMessageSent() { + runner = TestRunners.newTestRunner(PutKinesis.class); + runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutKinesis.BATCH_SIZE, "10"); + runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); + runner.assertValid(); + byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE)]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue(bytes); + runner.enqueue(bytes.clone()); + runner.enqueue(bytes.clone()); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 2); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + assertEquals(2,flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + } + } + + @Test + public void testTwoMessageWithBatch10MaxBufferSize1MBTRunOnceTwoMessageSent() { + runner = TestRunners.newTestRunner(PutKinesis.class); + runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutKinesis.BATCH_SIZE, "10"); + runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); + runner.assertValid(); + byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE)]; + for (int i = 0; i < 
bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue(bytes); + runner.enqueue(bytes.clone()); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 2); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + assertEquals(2,flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + } + } + + @Test + public void testThreeMessageWithBatch2MaxBufferSize1MBRunTwiceThreeMessageSent() { + runner = TestRunners.newTestRunner(PutKinesis.class); + runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutKinesis.BATCH_SIZE, "2"); + runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); + runner.assertValid(); + byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE)]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue(bytes); + runner.enqueue(bytes.clone()); + runner.enqueue(bytes.clone()); + runner.run(2, true, true); + + runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 3); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + assertEquals(3,flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + } + } + + @Test + public void testThreeMessageHello2MBThereWithBatch10MaxBufferSize1MBRunOnceTwoMessageSuccessOneFailed() { + runner = TestRunners.newTestRunner(PutKinesis.class); + runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutKinesis.BATCH_SIZE, "10"); + runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); + runner.assertValid(); + 
byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE * 2)]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue("hello".getBytes()); + runner.enqueue(bytes); + runner.enqueue("there".getBytes()); + runner.run(1, true, true); + + List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + assertEquals(2,flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + } + + List flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesis.REL_FAILURE); + assertEquals(1,flowFilesFailed.size()); + for (MockFlowFile flowFileFailed : flowFilesFailed) { + assertNotNull(flowFileFailed.getAttribute(PutKinesis.AWS_KINESIS_ERROR_MESSAGE)); + } + } + + @Test + public void testTwoMessageHello2MBWithBatch10MaxBufferSize1MBRunOnceOneSuccessOneFailed() throws Exception { + runner = TestRunners.newTestRunner(PutKinesis.class); + runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutKinesis.BATCH_SIZE, "10"); + runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); + runner.assertValid(); + byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE * 2)]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue("hello".getBytes()); + runner.enqueue(bytes); + runner.run(1, true, true); + + List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + assertEquals(1,flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + flowFile.assertContentEquals("hello".getBytes()); + } + + List flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesis.REL_FAILURE); + 
assertEquals(1,flowFilesFailed.size()); + for (MockFlowFile flowFileFailed : flowFilesFailed) { + assertNotNull(flowFileFailed.getAttribute(PutKinesis.AWS_KINESIS_ERROR_MESSAGE)); + } + } + + @Test + public void testTwoMessage2MBHelloWorldWithBatch10MaxBufferSize1MBRunOnceTwoMessageSent() throws Exception { + runner = TestRunners.newTestRunner(PutKinesis.class); + runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutKinesis.BATCH_SIZE, "10"); + runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); + runner.assertValid(); + byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE * 2)]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue(bytes); + runner.enqueue("HelloWorld".getBytes()); + runner.run(1, true, true); + + List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + assertEquals(1,flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + flowFile.assertContentEquals("HelloWorld".getBytes()); + } + + List flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesis.REL_FAILURE); + assertEquals(1,flowFilesFailed.size()); + for (MockFlowFile flowFileFailed : flowFilesFailed) { + assertNotNull(flowFileFailed.getAttribute(PutKinesis.AWS_KINESIS_ERROR_MESSAGE)); + } + } + + @Test + public void testTwoMessageHelloWorldWithBatch10MaxBufferSize1MBRunOnceTwoMessageSent() throws Exception { + runner = TestRunners.newTestRunner(PutKinesis.class); + runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutKinesis.BATCH_SIZE, "10"); + runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); + runner.assertValid(); + runner.enqueue("Hello".getBytes()); + 
runner.enqueue("World".getBytes()); + runner.run(1, true, true); + + List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + assertEquals(2,flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + } + flowFiles.get(0).assertContentEquals("Hello".getBytes()); + flowFiles.get(1).assertContentEquals("World".getBytes()); + + List flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesis.REL_FAILURE); + assertEquals(0,flowFilesFailed.size()); + } + + @Test + public void testThreeMessageWithBatch5MaxBufferSize1MBRunOnceTwoMessageSent() { + runner = TestRunners.newTestRunner(PutKinesis.class); + runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutKinesis.BATCH_SIZE, "5"); + runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); + runner.assertValid(); + byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE)]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue(bytes); + runner.enqueue(bytes.clone()); + runner.enqueue(bytes.clone()); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 2); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + assertEquals(2,flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + } + } + + @Test + public void test5MessageWithBatch10MaxBufferSize10MBRunOnce5MessageSent() { + runner = TestRunners.newTestRunner(PutKinesis.class); + runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutKinesis.BATCH_SIZE, "10"); + runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 
MB"); + runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); + runner.assertValid(); + byte [] bytes = new byte[10]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue(bytes); + runner.enqueue(bytes.clone()); + runner.enqueue(bytes.clone()); + runner.enqueue(bytes); + runner.enqueue(bytes.clone()); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 5); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + assertEquals(5,flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + } + } + + @Test + public void test5MessageWithBatch2MaxBufferSize10MBRunOnce2MessageSent() { + runner = TestRunners.newTestRunner(PutKinesis.class); + runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutKinesis.BATCH_SIZE, "2"); + runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); + runner.assertValid(); + byte [] bytes = new byte[10]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue(bytes); + runner.enqueue(bytes.clone()); + runner.enqueue(bytes.clone()); + runner.enqueue(bytes); + runner.enqueue(bytes.clone()); + runner.run(1, true, true); + + runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 2); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + assertEquals(2,flowFiles.size()); + for (MockFlowFile flowFile : flowFiles) { + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + } + } +} diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/TestPutKinesis.java 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/TestPutKinesis.java new file mode 100644 index 000000000000..19425f6a6d5d --- /dev/null +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/TestPutKinesis.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.aws.kinesis; + +import static org.junit.Assert.assertNotNull; + +import java.util.List; + +import org.apache.nifi.processors.aws.kinesis.stream.PutKinesis; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestPutKinesis { + private TestRunner runner; + protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; + + @Before + public void setUp() throws Exception { + runner = TestRunners.newTestRunner(PutKinesis.class); + runner.setProperty(PutKinesis.ACCESS_KEY, "abcd"); + runner.setProperty(PutKinesis.SECRET_KEY, "secret key"); + runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); + runner.assertValid(); + } + + @After + public void tearDown() throws Exception { + runner = null; + } + + @Test + public void testCustomValidateBatchSize1Valid() { + runner.setProperty(PutKinesis.BATCH_SIZE, "1"); + runner.assertValid(); + } + + @Test + public void testCustomValidateBatchSize500Valid() { + runner.setProperty(PutKinesis.BATCH_SIZE, "500"); + runner.assertValid(); + } + @Test + public void testCustomValidateBatchSize501InValid() { + runner.setProperty(PutKinesis.BATCH_SIZE, "501"); + runner.assertNotValid(); + } + + @Test + public void testWithSizeGreaterThan1MB() { + runner.setProperty(PutKinesis.BATCH_SIZE, "1"); + runner.assertValid(); + byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE + 1)]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = 'a'; + } + runner.enqueue(bytes); + runner.run(1); + + runner.assertAllFlowFilesTransferred(PutKinesis.REL_FAILURE, 1); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_FAILURE); + + assertNotNull(flowFiles.get(0).getAttribute(PutKinesis.AWS_KINESIS_ERROR_MESSAGE)); + } +} From bbbb0447e23f00a826034f7dccdb86ca032dc4ac Mon Sep 17 
00:00:00 2001 From: mans2singh Date: Sun, 9 Oct 2016 17:55:04 -0700 Subject: [PATCH 2/4] Nifi-1540 - Added Kinesis put processor to services --- .../META-INF/services/org.apache.nifi.processor.Processor | 1 + 1 file changed, 1 insertion(+) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index d8c18fc60d5c..c6a06fba65f7 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -25,3 +25,4 @@ org.apache.nifi.processors.aws.kinesis.firehose.PutKinesisFirehose org.apache.nifi.processors.aws.dynamodb.GetDynamoDB org.apache.nifi.processors.aws.dynamodb.PutDynamoDB org.apache.nifi.processors.aws.dynamodb.DeleteDynamoDB +org.apache.nifi.processors.aws.kinesis.stream.PutKinesis From 43ed646cf77456db739c051c7c53501d2f0982ec Mon Sep 17 00:00:00 2001 From: mans2singh Date: Tue, 11 Oct 2016 09:30:27 -0700 Subject: [PATCH 3/4] Nifi-1540 - Updated name, displayname, comments on provisioning for IT, moved tests to stream package, based on comments --- .../kinesis/AbstractBaseKinesisProcessor.java | 8 ++- .../stream/AbstractKinesisProcessor.java | 5 +- .../aws/kinesis/stream/PutKinesis.java | 4 +- .../kinesis/{ => stream}/ITPutKinesis.java | 57 ++----------------- .../kinesis/{ => stream}/TestPutKinesis.java | 2 +- 5 files changed, 17 insertions(+), 59 deletions(-) rename nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/{ => stream}/ITPutKinesis.java (83%) rename nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/{ => stream}/TestPutKinesis.java 
(98%) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java index 0f2cf412624e..e5598200fabe 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/AbstractBaseKinesisProcessor.java @@ -39,7 +39,8 @@ public abstract class AbstractBaseKinesisProcessor { public static final PropertyDescriptor KINESIS_STREAM_NAME = new PropertyDescriptor.Builder() - .name("Amazon Kinesis Stream Name") - .description("The name of kinesis stream") + .name("kinesis-stream-name") + .displayName("Amazon Kinesis Stream Name") + .description("The name of Kinesis Stream") .expressionLanguageSupported(false) .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesis.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesis.java index 710d7245db2c..6575a0cb2df9 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesis.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesis.java @@ -159,13 +159,13 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } if ( successfulFlowFiles.size() > 0 ) { session.transfer(successfulFlowFiles, REL_SUCCESS); - getLogger().info("Successfully published to kinesis {} records {}", new Object[]{streamName, 
successfulFlowFiles}); + getLogger().debug("Successfully published to kinesis {} records {}", new Object[]{streamName, successfulFlowFiles}); } records.clear(); } } catch (final Exception exception) { - getLogger().error("Failed to publish to kinesis {} with exception {}", new Object[]{flowFiles, exception}); + getLogger().error("Failed to publish to kinesis {} flowfiles {} with exception {}", new Object[]{streamName, flowFiles, exception}); session.transfer(flowFiles, REL_FAILURE); context.yield(); } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/ITPutKinesis.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesis.java similarity index 83% rename from nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/ITPutKinesis.java rename to nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesis.java index 8aee9da8bbd8..73f02a22425a 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/ITPutKinesis.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesis.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.processors.aws.kinesis; +package org.apache.nifi.processors.aws.kinesis.stream; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -32,7 +32,10 @@ import org.junit.Test; /** - * This test contains both unit and integration test (integration tests are ignored by default) + * This test contains both unit and integration test (integration tests are ignored by default). 
+ * Running integration tests may result in failures due to provisioned capacity of Kinesis stream based on number of shards. + * The following integration tests run successfully with 10 shards. If increasing shards is not a possiblity, please reduce the size and + * number of messages in the integration tests based AWS Kinesis provisioning pages calculations. */ public class ITPutKinesis { @@ -43,6 +46,7 @@ public class ITPutKinesis { public void setUp() throws Exception { runner = TestRunners.newTestRunner(PutKinesis.class); runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); + runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.assertValid(); } @@ -56,9 +60,7 @@ public void tearDown() throws Exception { */ @Test public void testIntegrationSuccess() throws Exception { - runner = TestRunners.newTestRunner(PutKinesis.class); runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); runner.assertValid(); runner.enqueue("test".getBytes()); @@ -74,9 +76,6 @@ public void testIntegrationSuccess() throws Exception { @Test public void testIntegrationWithFixedPartitionSuccess() throws Exception { - runner = TestRunners.newTestRunner(PutKinesis.class); - runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); runner.setProperty(PutKinesis.KINESIS_PARTITION_KEY, "pfixed"); runner.assertValid(); @@ -93,9 +92,6 @@ public void testIntegrationWithFixedPartitionSuccess() throws Exception { @Test public void testIntegrationWithDynamicPartitionSuccess() throws Exception { - runner = TestRunners.newTestRunner(PutKinesis.class); - runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); - runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); runner.setProperty(PutKinesis.KINESIS_PARTITION_KEY, "${parition}"); runner.assertValid(); Map properties = new HashMap<>(); @@ -116,8 +112,6 @@ 
public void testIntegrationWithDynamicPartitionSuccess() throws Exception { */ @Test public void testIntegrationFailedBadStreamName() throws Exception { - runner = TestRunners.newTestRunner(PutKinesis.class); - runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "bad-kstream"); runner.assertValid(); @@ -130,11 +124,8 @@ public void testIntegrationFailedBadStreamName() throws Exception { @Test public void testOneMessageWithMaxBufferSizeGreaterThan1MBOneSuccess() { - runner = TestRunners.newTestRunner(PutKinesis.class); - runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.setProperty(PutKinesis.BATCH_SIZE, "2"); runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); - runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); runner.assertValid(); byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE)]; for (int i = 0; i < bytes.length; i++) { @@ -150,11 +141,8 @@ public void testOneMessageWithMaxBufferSizeGreaterThan1MBOneSuccess() { @Test public void testOneMessageWithMaxBufferSizeGreaterThan1MBOneWithParameterPartitionSuccess() { - runner = TestRunners.newTestRunner(PutKinesis.class); - runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.setProperty(PutKinesis.BATCH_SIZE, "2"); runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); - runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); runner.setProperty(PutKinesis.KINESIS_PARTITION_KEY, "${partitionKey}"); runner.assertValid(); byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE)]; @@ -173,11 +161,8 @@ public void testOneMessageWithMaxBufferSizeGreaterThan1MBOneWithParameterPartiti @Test public void testTwoMessageBatch5WithMaxBufferSize1MBRunOnceTwoMessageSent() { - runner = TestRunners.newTestRunner(PutKinesis.class); - runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.setProperty(PutKinesis.BATCH_SIZE, "5"); 
runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "2 MB"); - runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); runner.assertValid(); byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE)]; for (int i = 0; i < bytes.length; i++) { @@ -198,11 +183,8 @@ public void testTwoMessageBatch5WithMaxBufferSize1MBRunOnceTwoMessageSent() { @Test public void testThreeMessageWithBatch10MaxBufferSize1MBTRunOnceTwoMessageSent() { - runner = TestRunners.newTestRunner(PutKinesis.class); - runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.setProperty(PutKinesis.BATCH_SIZE, "10"); runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); - runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); runner.assertValid(); byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE)]; for (int i = 0; i < bytes.length; i++) { @@ -224,11 +206,8 @@ public void testThreeMessageWithBatch10MaxBufferSize1MBTRunOnceTwoMessageSent() @Test public void testTwoMessageWithBatch10MaxBufferSize1MBTRunOnceTwoMessageSent() { - runner = TestRunners.newTestRunner(PutKinesis.class); - runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.setProperty(PutKinesis.BATCH_SIZE, "10"); runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); - runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); runner.assertValid(); byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE)]; for (int i = 0; i < bytes.length; i++) { @@ -249,11 +228,8 @@ public void testTwoMessageWithBatch10MaxBufferSize1MBTRunOnceTwoMessageSent() { @Test public void testThreeMessageWithBatch2MaxBufferSize1MBRunTwiceThreeMessageSent() { - runner = TestRunners.newTestRunner(PutKinesis.class); - runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.setProperty(PutKinesis.BATCH_SIZE, "2"); runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); - runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); runner.assertValid(); byte 
[] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE)]; for (int i = 0; i < bytes.length; i++) { @@ -275,11 +251,8 @@ public void testThreeMessageWithBatch2MaxBufferSize1MBRunTwiceThreeMessageSent() @Test public void testThreeMessageHello2MBThereWithBatch10MaxBufferSize1MBRunOnceTwoMessageSuccessOneFailed() { - runner = TestRunners.newTestRunner(PutKinesis.class); - runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.setProperty(PutKinesis.BATCH_SIZE, "10"); runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); - runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); runner.assertValid(); byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE * 2)]; for (int i = 0; i < bytes.length; i++) { @@ -306,11 +279,8 @@ public void testThreeMessageHello2MBThereWithBatch10MaxBufferSize1MBRunOnceTwoMe @Test public void testTwoMessageHello2MBWithBatch10MaxBufferSize1MBRunOnceOneSuccessOneFailed() throws Exception { - runner = TestRunners.newTestRunner(PutKinesis.class); - runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.setProperty(PutKinesis.BATCH_SIZE, "10"); runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); - runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); runner.assertValid(); byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE * 2)]; for (int i = 0; i < bytes.length; i++) { @@ -337,11 +307,8 @@ public void testTwoMessageHello2MBWithBatch10MaxBufferSize1MBRunOnceOneSuccessOn @Test public void testTwoMessage2MBHelloWorldWithBatch10MaxBufferSize1MBRunOnceTwoMessageSent() throws Exception { - runner = TestRunners.newTestRunner(PutKinesis.class); - runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.setProperty(PutKinesis.BATCH_SIZE, "10"); runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); - runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); runner.assertValid(); byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE * 2)]; for (int 
i = 0; i < bytes.length; i++) { @@ -368,11 +335,8 @@ public void testTwoMessage2MBHelloWorldWithBatch10MaxBufferSize1MBRunOnceTwoMess @Test public void testTwoMessageHelloWorldWithBatch10MaxBufferSize1MBRunOnceTwoMessageSent() throws Exception { - runner = TestRunners.newTestRunner(PutKinesis.class); - runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.setProperty(PutKinesis.BATCH_SIZE, "10"); runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); - runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); runner.assertValid(); runner.enqueue("Hello".getBytes()); runner.enqueue("World".getBytes()); @@ -393,11 +357,8 @@ public void testTwoMessageHelloWorldWithBatch10MaxBufferSize1MBRunOnceTwoMessage @Test public void testThreeMessageWithBatch5MaxBufferSize1MBRunOnceTwoMessageSent() { - runner = TestRunners.newTestRunner(PutKinesis.class); - runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.setProperty(PutKinesis.BATCH_SIZE, "5"); runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); - runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); runner.assertValid(); byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE)]; for (int i = 0; i < bytes.length; i++) { @@ -419,11 +380,8 @@ public void testThreeMessageWithBatch5MaxBufferSize1MBRunOnceTwoMessageSent() { @Test public void test5MessageWithBatch10MaxBufferSize10MBRunOnce5MessageSent() { - runner = TestRunners.newTestRunner(PutKinesis.class); - runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.setProperty(PutKinesis.BATCH_SIZE, "10"); runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); - runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); runner.assertValid(); byte [] bytes = new byte[10]; for (int i = 0; i < bytes.length; i++) { @@ -447,11 +405,8 @@ public void test5MessageWithBatch10MaxBufferSize10MBRunOnce5MessageSent() { @Test public void 
test5MessageWithBatch2MaxBufferSize10MBRunOnce2MessageSent() { - runner = TestRunners.newTestRunner(PutKinesis.class); - runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.setProperty(PutKinesis.BATCH_SIZE, "2"); runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); - runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); runner.assertValid(); byte [] bytes = new byte[10]; for (int i = 0; i < bytes.length; i++) { diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/TestPutKinesis.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestPutKinesis.java similarity index 98% rename from nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/TestPutKinesis.java rename to nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestPutKinesis.java index 19425f6a6d5d..2d6b2fd28601 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/TestPutKinesis.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestPutKinesis.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.nifi.processors.aws.kinesis; +package org.apache.nifi.processors.aws.kinesis.stream; import static org.junit.Assert.assertNotNull; From b5973e4db9e9a6ed72f428a2e92b24511c1cf6fc Mon Sep 17 00:00:00 2001 From: mans2singh Date: Tue, 11 Oct 2016 12:15:35 -0700 Subject: [PATCH 4/4] Nifi-1540 - Changed name to PutKinesisStream, changed logging message to use exception first --- ...va => AbstractKinesisStreamProcessor.java} | 2 +- ...{PutKinesis.java => PutKinesisStream.java} | 6 +- .../org.apache.nifi.processor.Processor | 2 +- ...utKinesis.java => ITPutKinesisStream.java} | 208 +++++++++--------- ...Kinesis.java => TestPutKinesisStream.java} | 28 +-- 5 files changed, 123 insertions(+), 123 deletions(-) rename nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/{AbstractKinesisProcessor.java => AbstractKinesisStreamProcessor.java} (95%) rename nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/{PutKinesis.java => PutKinesisStream.java} (97%) rename nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/{ITPutKinesis.java => ITPutKinesisStream.java} (61%) rename nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/{TestPutKinesis.java => TestPutKinesisStream.java} (68%) diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisProcessor.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisStreamProcessor.java similarity index 95% rename from nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisProcessor.java rename to 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisStreamProcessor.java index 16f7666111d9..b7513af328ff 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisProcessor.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/AbstractKinesisStreamProcessor.java @@ -29,7 +29,7 @@ /** * This class provides processor the base class for kinesis client */ -public abstract class AbstractKinesisProcessor extends AbstractBaseKinesisProcessor { +public abstract class AbstractKinesisStreamProcessor extends AbstractBaseKinesisProcessor { public static final PropertyDescriptor KINESIS_STREAM_NAME = new PropertyDescriptor.Builder() .name("kinesis-stream-name") diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesis.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java similarity index 97% rename from nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesis.java rename to nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java index 6575a0cb2df9..cafc82cfa24c 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesis.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java @@ -57,7 +57,7 @@ @WritesAttribute(attribute = "aws.kinesis.error.code", description = "Error code for the message when posting to AWS Kinesis"), @WritesAttribute(attribute = "aws.kinesis.sequence.number", 
description = "Sequence number for the message when posting to AWS Kinesis"), @WritesAttribute(attribute = "aws.kinesis.shard.id", description = "Shard id of the message posted to AWS Kinesis")}) -public class PutKinesis extends AbstractKinesisProcessor { +public class PutKinesisStream extends AbstractKinesisStreamProcessor { /** * Kinesis put record response error code @@ -115,7 +115,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session session.exportTo(flowFile, baos); PutRecordsRequestEntry record = new PutRecordsRequestEntry().withData(ByteBuffer.wrap(baos.toByteArray())); - String partitionKey = context.getProperty(PutKinesis.KINESIS_PARTITION_KEY) + String partitionKey = context.getProperty(PutKinesisStream.KINESIS_PARTITION_KEY) .evaluateAttributeExpressions(flowFiles.get(i)).getValue(); if ( ! StringUtils.isBlank(partitionKey) ) { @@ -165,7 +165,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session } } catch (final Exception exception) { - getLogger().error("Failed to publish to kinesis {} flowfiles {} with exception {}", new Object[]{streamName, flowFiles, exception}); + getLogger().error("Failed to publish due to exception {} to kinesis {} flowfiles {} ", new Object[]{exception, streamName, flowFiles}); session.transfer(flowFiles, REL_FAILURE); context.yield(); } diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index c6a06fba65f7..df265c3cf507 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -25,4 +25,4 @@ 
org.apache.nifi.processors.aws.kinesis.firehose.PutKinesisFirehose org.apache.nifi.processors.aws.dynamodb.GetDynamoDB org.apache.nifi.processors.aws.dynamodb.PutDynamoDB org.apache.nifi.processors.aws.dynamodb.DeleteDynamoDB -org.apache.nifi.processors.aws.kinesis.stream.PutKinesis +org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream diff --git a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesis.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesisStream.java similarity index 61% rename from nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesis.java rename to nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesisStream.java index 73f02a22425a..b19542348383 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesis.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/ITPutKinesisStream.java @@ -23,7 +23,7 @@ import java.util.List; import java.util.Map; -import org.apache.nifi.processors.aws.kinesis.stream.PutKinesis; +import org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -37,16 +37,16 @@ * The following integration tests run successfully with 10 shards. If increasing shards is not a possiblity, please reduce the size and * number of messages in the integration tests based AWS Kinesis provisioning pages calculations. 
*/ -public class ITPutKinesis { +public class ITPutKinesisStream { private TestRunner runner; protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; @Before public void setUp() throws Exception { - runner = TestRunners.newTestRunner(PutKinesis.class); - runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); - runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner = TestRunners.newTestRunner(PutKinesisStream.class); + runner.setProperty(PutKinesisStream.KINESIS_STREAM_NAME, "kstream"); + runner.setProperty(PutKinesisStream.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.assertValid(); } @@ -60,15 +60,15 @@ public void tearDown() throws Exception { */ @Test public void testIntegrationSuccess() throws Exception { - runner.setProperty(PutKinesis.CREDENTIALS_FILE, CREDENTIALS_FILE); + runner.setProperty(PutKinesisStream.CREDENTIALS_FILE, CREDENTIALS_FILE); runner.assertValid(); runner.enqueue("test".getBytes()); runner.run(1); - runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 1); + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 1); - final List ffs = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + final List ffs = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); final MockFlowFile out = ffs.iterator().next(); out.assertContentEquals("test".getBytes()); @@ -76,15 +76,15 @@ public void testIntegrationSuccess() throws Exception { @Test public void testIntegrationWithFixedPartitionSuccess() throws Exception { - runner.setProperty(PutKinesis.KINESIS_PARTITION_KEY, "pfixed"); + runner.setProperty(PutKinesisStream.KINESIS_PARTITION_KEY, "pfixed"); runner.assertValid(); runner.enqueue("test".getBytes()); runner.run(1); - runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 1); + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 1); - final List ffs = 
runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + final List ffs = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); final MockFlowFile out = ffs.iterator().next(); out.assertContentEquals("test".getBytes()); @@ -92,16 +92,16 @@ public void testIntegrationWithFixedPartitionSuccess() { @Test public void testIntegrationWithDynamicPartitionSuccess() throws Exception { - runner.setProperty(PutKinesis.KINESIS_PARTITION_KEY, "${parition}"); + runner.setProperty(PutKinesisStream.KINESIS_PARTITION_KEY, "${partition}"); runner.assertValid(); Map properties = new HashMap<>(); properties.put("partition", "px"); runner.enqueue("test".getBytes(), properties); runner.run(1); - runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 1); + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 1); - final List ffs = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + final List ffs = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); final MockFlowFile out = ffs.iterator().next(); out.assertContentEquals("test".getBytes()); @@ -112,40 +112,40 @@ public void testIntegrationWithDynamicPartitionSuccess() { */ @Test public void testIntegrationFailedBadStreamName() throws Exception { - runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "bad-kstream"); + runner.setProperty(PutKinesisStream.KINESIS_STREAM_NAME, "bad-kstream"); runner.assertValid(); runner.enqueue("test".getBytes()); runner.run(1); - runner.assertAllFlowFilesTransferred(PutKinesis.REL_FAILURE, 1); + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_FAILURE, 1); } @Test public void testOneMessageWithMaxBufferSizeGreaterThan1MBOneSuccess() { - runner.setProperty(PutKinesis.BATCH_SIZE, "2"); - runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesisStream.BATCH_SIZE, "2"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); runner.assertValid(); - byte
[] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE)]; + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)]; for (int i = 0; i < bytes.length; i++) { bytes[i] = 'a'; } runner.enqueue(bytes); runner.run(1); - runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); assertEquals(1,flowFiles.size()); } @Test public void testOneMessageWithMaxBufferSizeGreaterThan1MBOneWithParameterPartitionSuccess() { - runner.setProperty(PutKinesis.BATCH_SIZE, "2"); - runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); - runner.setProperty(PutKinesis.KINESIS_PARTITION_KEY, "${partitionKey}"); + runner.setProperty(PutKinesisStream.BATCH_SIZE, "2"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesisStream.KINESIS_PARTITION_KEY, "${partitionKey}"); runner.assertValid(); - byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE)]; + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)]; for (int i = 0; i < bytes.length; i++) { bytes[i] = 'a'; } @@ -154,17 +154,17 @@ public void testOneMessageWithMaxBufferSizeGreaterThan1MBOneWithParameterPartiti runner.enqueue(bytes,props); runner.run(1); - runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 1); - List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 1); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); assertEquals(1,flowFiles.size()); } @Test public void testTwoMessageBatch5WithMaxBufferSize1MBRunOnceTwoMessageSent() { - runner.setProperty(PutKinesis.BATCH_SIZE, "5"); - runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "2 MB"); + 
runner.setProperty(PutKinesisStream.BATCH_SIZE, "5"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "2 MB"); runner.assertValid(); - byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE)]; + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)]; for (int i = 0; i < bytes.length; i++) { bytes[i] = 'a'; } @@ -172,21 +172,21 @@ public void testTwoMessageBatch5WithMaxBufferSize1MBRunOnceTwoMessageSent() { runner.enqueue(bytes.clone()); runner.run(1); - runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 2); - List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 2); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); assertEquals(2,flowFiles.size()); for (MockFlowFile flowFile : flowFiles) { - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); } } @Test public void testThreeMessageWithBatch10MaxBufferSize1MBTRunOnceTwoMessageSent() { - runner.setProperty(PutKinesis.BATCH_SIZE, "10"); - runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesisStream.BATCH_SIZE, "10"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); runner.assertValid(); - byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE)]; + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)]; for (int i = 0; i < bytes.length; i++) { bytes[i] = 'a'; } @@ -195,21 +195,21 @@ public void testThreeMessageWithBatch10MaxBufferSize1MBTRunOnceTwoMessageSent() runner.enqueue(bytes.clone()); runner.run(1); - runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 2); - List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + 
runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 2); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); assertEquals(2,flowFiles.size()); for (MockFlowFile flowFile : flowFiles) { - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); } } @Test public void testTwoMessageWithBatch10MaxBufferSize1MBTRunOnceTwoMessageSent() { - runner.setProperty(PutKinesis.BATCH_SIZE, "10"); - runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesisStream.BATCH_SIZE, "10"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); runner.assertValid(); - byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE)]; + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)]; for (int i = 0; i < bytes.length; i++) { bytes[i] = 'a'; } @@ -217,21 +217,21 @@ public void testTwoMessageWithBatch10MaxBufferSize1MBTRunOnceTwoMessageSent() { runner.enqueue(bytes.clone()); runner.run(1); - runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 2); - List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 2); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); assertEquals(2,flowFiles.size()); for (MockFlowFile flowFile : flowFiles) { - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); } } @Test public void testThreeMessageWithBatch2MaxBufferSize1MBRunTwiceThreeMessageSent() { - 
runner.setProperty(PutKinesis.BATCH_SIZE, "2"); - runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesisStream.BATCH_SIZE, "2"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); runner.assertValid(); - byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE)]; + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)]; for (int i = 0; i < bytes.length; i++) { bytes[i] = 'a'; } @@ -240,21 +240,21 @@ public void testThreeMessageWithBatch2MaxBufferSize1MBRunTwiceThreeMessageSent() runner.enqueue(bytes.clone()); runner.run(2, true, true); - runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 3); - List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 3); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); assertEquals(3,flowFiles.size()); for (MockFlowFile flowFile : flowFiles) { - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); } } @Test public void testThreeMessageHello2MBThereWithBatch10MaxBufferSize1MBRunOnceTwoMessageSuccessOneFailed() { - runner.setProperty(PutKinesis.BATCH_SIZE, "10"); - runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesisStream.BATCH_SIZE, "10"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); runner.assertValid(); - byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE * 2)]; + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE * 2)]; for (int i = 0; i < bytes.length; i++) { bytes[i] = 'a'; } @@ -263,26 +263,26 @@ public void testThreeMessageHello2MBThereWithBatch10MaxBufferSize1MBRunOnceTwoMe 
runner.enqueue("there".getBytes()); runner.run(1, true, true); - List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); assertEquals(2,flowFiles.size()); for (MockFlowFile flowFile : flowFiles) { - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); } - List flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesis.REL_FAILURE); + List flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesisStream.REL_FAILURE); assertEquals(1,flowFilesFailed.size()); for (MockFlowFile flowFileFailed : flowFilesFailed) { - assertNotNull(flowFileFailed.getAttribute(PutKinesis.AWS_KINESIS_ERROR_MESSAGE)); + assertNotNull(flowFileFailed.getAttribute(PutKinesisStream.AWS_KINESIS_ERROR_MESSAGE)); } } @Test public void testTwoMessageHello2MBWithBatch10MaxBufferSize1MBRunOnceOneSuccessOneFailed() throws Exception { - runner.setProperty(PutKinesis.BATCH_SIZE, "10"); - runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesisStream.BATCH_SIZE, "10"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); runner.assertValid(); - byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE * 2)]; + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE * 2)]; for (int i = 0; i < bytes.length; i++) { bytes[i] = 'a'; } @@ -290,27 +290,27 @@ public void testTwoMessageHello2MBWithBatch10MaxBufferSize1MBRunOnceOneSuccessOn runner.enqueue(bytes); runner.run(1, true, true); - List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); assertEquals(1,flowFiles.size()); for (MockFlowFile 
flowFile : flowFiles) { - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); flowFile.assertContentEquals("hello".getBytes()); } - List flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesis.REL_FAILURE); + List flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesisStream.REL_FAILURE); assertEquals(1,flowFilesFailed.size()); for (MockFlowFile flowFileFailed : flowFilesFailed) { - assertNotNull(flowFileFailed.getAttribute(PutKinesis.AWS_KINESIS_ERROR_MESSAGE)); + assertNotNull(flowFileFailed.getAttribute(PutKinesisStream.AWS_KINESIS_ERROR_MESSAGE)); } } @Test public void testTwoMessage2MBHelloWorldWithBatch10MaxBufferSize1MBRunOnceTwoMessageSent() throws Exception { - runner.setProperty(PutKinesis.BATCH_SIZE, "10"); - runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesisStream.BATCH_SIZE, "10"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); runner.assertValid(); - byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE * 2)]; + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE * 2)]; for (int i = 0; i < bytes.length; i++) { bytes[i] = 'a'; } @@ -318,49 +318,49 @@ public void testTwoMessage2MBHelloWorldWithBatch10MaxBufferSize1MBRunOnceTwoMess runner.enqueue("HelloWorld".getBytes()); runner.run(1, true, true); - List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); assertEquals(1,flowFiles.size()); for (MockFlowFile flowFile : flowFiles) { - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + 
flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); flowFile.assertContentEquals("HelloWorld".getBytes()); } - List flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesis.REL_FAILURE); + List flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesisStream.REL_FAILURE); assertEquals(1,flowFilesFailed.size()); for (MockFlowFile flowFileFailed : flowFilesFailed) { - assertNotNull(flowFileFailed.getAttribute(PutKinesis.AWS_KINESIS_ERROR_MESSAGE)); + assertNotNull(flowFileFailed.getAttribute(PutKinesisStream.AWS_KINESIS_ERROR_MESSAGE)); } } @Test public void testTwoMessageHelloWorldWithBatch10MaxBufferSize1MBRunOnceTwoMessageSent() throws Exception { - runner.setProperty(PutKinesis.BATCH_SIZE, "10"); - runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesisStream.BATCH_SIZE, "10"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); runner.assertValid(); runner.enqueue("Hello".getBytes()); runner.enqueue("World".getBytes()); runner.run(1, true, true); - List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); assertEquals(2,flowFiles.size()); for (MockFlowFile flowFile : flowFiles) { - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); } flowFiles.get(0).assertContentEquals("Hello".getBytes()); flowFiles.get(1).assertContentEquals("World".getBytes()); - List flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesis.REL_FAILURE); + List flowFilesFailed = runner.getFlowFilesForRelationship(PutKinesisStream.REL_FAILURE); 
assertEquals(0,flowFilesFailed.size()); } @Test public void testThreeMessageWithBatch5MaxBufferSize1MBRunOnceTwoMessageSent() { - runner.setProperty(PutKinesis.BATCH_SIZE, "5"); - runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesisStream.BATCH_SIZE, "5"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); runner.assertValid(); - byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE)]; + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE)]; for (int i = 0; i < bytes.length; i++) { bytes[i] = 'a'; } @@ -369,19 +369,19 @@ public void testThreeMessageWithBatch5MaxBufferSize1MBRunOnceTwoMessageSent() { runner.enqueue(bytes.clone()); runner.run(1, true, true); - runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 2); - List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 2); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); assertEquals(2,flowFiles.size()); for (MockFlowFile flowFile : flowFiles) { - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); } } @Test public void test5MessageWithBatch10MaxBufferSize10MBRunOnce5MessageSent() { - runner.setProperty(PutKinesis.BATCH_SIZE, "10"); - runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesisStream.BATCH_SIZE, "10"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); runner.assertValid(); byte [] bytes = new byte[10]; for (int i = 0; i < bytes.length; i++) { @@ -394,19 +394,19 @@ public void test5MessageWithBatch10MaxBufferSize10MBRunOnce5MessageSent() { runner.enqueue(bytes.clone()); runner.run(1, true, 
true); - runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 5); - List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 5); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); assertEquals(5,flowFiles.size()); for (MockFlowFile flowFile : flowFiles) { - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); } } @Test public void test5MessageWithBatch2MaxBufferSize10MBRunOnce2MessageSent() { - runner.setProperty(PutKinesis.BATCH_SIZE, "2"); - runner.setProperty(PutKinesis.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); + runner.setProperty(PutKinesisStream.BATCH_SIZE, "2"); + runner.setProperty(PutKinesisStream.MAX_MESSAGE_BUFFER_SIZE_MB, "1 MB"); runner.assertValid(); byte [] bytes = new byte[10]; for (int i = 0; i < bytes.length; i++) { @@ -419,12 +419,12 @@ public void test5MessageWithBatch2MaxBufferSize10MBRunOnce2MessageSent() { runner.enqueue(bytes.clone()); runner.run(1, true, true); - runner.assertAllFlowFilesTransferred(PutKinesis.REL_SUCCESS, 2); - List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_SUCCESS); + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_SUCCESS, 2); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_SUCCESS); assertEquals(2,flowFiles.size()); for (MockFlowFile flowFile : flowFiles) { - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SEQUENCE_NUMBER); - flowFile.assertAttributeExists(PutKinesis.AWS_KINESIS_SHARD_ID); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SEQUENCE_NUMBER); + flowFile.assertAttributeExists(PutKinesisStream.AWS_KINESIS_SHARD_ID); } } } diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestPutKinesis.java b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestPutKinesisStream.java similarity index 68% rename from nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestPutKinesis.java rename to nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestPutKinesisStream.java index 2d6b2fd28601..29ca4f0ee8e2 100644 --- a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestPutKinesis.java +++ b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/kinesis/stream/TestPutKinesisStream.java @@ -20,7 +20,7 @@ import java.util.List; -import org.apache.nifi.processors.aws.kinesis.stream.PutKinesis; +import org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; @@ -28,16 +28,16 @@ import org.junit.Before; import org.junit.Test; -public class TestPutKinesis { +public class TestPutKinesisStream { private TestRunner runner; protected final static String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties"; @Before public void setUp() throws Exception { - runner = TestRunners.newTestRunner(PutKinesis.class); - runner.setProperty(PutKinesis.ACCESS_KEY, "abcd"); - runner.setProperty(PutKinesis.SECRET_KEY, "secret key"); - runner.setProperty(PutKinesis.KINESIS_STREAM_NAME, "kstream"); + runner = TestRunners.newTestRunner(PutKinesisStream.class); + runner.setProperty(PutKinesisStream.ACCESS_KEY, "abcd"); + runner.setProperty(PutKinesisStream.SECRET_KEY, "secret key"); + 
runner.setProperty(PutKinesisStream.KINESIS_STREAM_NAME, "kstream"); runner.assertValid(); } @@ -48,35 +48,35 @@ public void tearDown() throws Exception { @Test public void testCustomValidateBatchSize1Valid() { - runner.setProperty(PutKinesis.BATCH_SIZE, "1"); + runner.setProperty(PutKinesisStream.BATCH_SIZE, "1"); runner.assertValid(); } @Test public void testCustomValidateBatchSize500Valid() { - runner.setProperty(PutKinesis.BATCH_SIZE, "500"); + runner.setProperty(PutKinesisStream.BATCH_SIZE, "500"); runner.assertValid(); } @Test public void testCustomValidateBatchSize501InValid() { - runner.setProperty(PutKinesis.BATCH_SIZE, "501"); + runner.setProperty(PutKinesisStream.BATCH_SIZE, "501"); runner.assertNotValid(); } @Test public void testWithSizeGreaterThan1MB() { - runner.setProperty(PutKinesis.BATCH_SIZE, "1"); + runner.setProperty(PutKinesisStream.BATCH_SIZE, "1"); runner.assertValid(); - byte [] bytes = new byte[(PutKinesis.MAX_MESSAGE_SIZE + 1)]; + byte [] bytes = new byte[(PutKinesisStream.MAX_MESSAGE_SIZE + 1)]; for (int i = 0; i < bytes.length; i++) { bytes[i] = 'a'; } runner.enqueue(bytes); runner.run(1); - runner.assertAllFlowFilesTransferred(PutKinesis.REL_FAILURE, 1); - List flowFiles = runner.getFlowFilesForRelationship(PutKinesis.REL_FAILURE); + runner.assertAllFlowFilesTransferred(PutKinesisStream.REL_FAILURE, 1); + List flowFiles = runner.getFlowFilesForRelationship(PutKinesisStream.REL_FAILURE); - assertNotNull(flowFiles.get(0).getAttribute(PutKinesis.AWS_KINESIS_ERROR_MESSAGE)); + assertNotNull(flowFiles.get(0).getAttribute(PutKinesisStream.AWS_KINESIS_ERROR_MESSAGE)); } }