Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
Expand Down Expand Up @@ -169,31 +170,27 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor<KinesisAsync
);

static final PropertyDescriptor KINESIS_STREAM_NAME = new PropertyDescriptor.Builder()
.name("kinesis-stream-name")
.displayName("Amazon Kinesis Stream Name")
.name("Amazon Kinesis Stream Name")
.description("The name of Kinesis Stream")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

public static final PropertyDescriptor APPLICATION_NAME = new PropertyDescriptor.Builder()
.displayName("Application Name")
.name("amazon-kinesis-stream-application-name")
.name("Application Name")
.description("The Kinesis stream reader application name.")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.required(true).build();

public static final PropertyDescriptor INITIAL_STREAM_POSITION = new PropertyDescriptor.Builder()
.displayName("Initial Stream Position")
.name("amazon-kinesis-stream-initial-position")
.name("Initial Stream Position")
.description("Initial position to read Kinesis streams.")
.allowableValues(LATEST, TRIM_HORIZON, AT_TIMESTAMP)
.defaultValue(LATEST.getValue())
.required(true).build();

public static final PropertyDescriptor STREAM_POSITION_TIMESTAMP = new PropertyDescriptor.Builder()
.displayName("Stream Position Timestamp")
.name("amazon-kinesis-stream-position-timestamp")
.name("Stream Position Timestamp")
.description("Timestamp position in stream from which to start reading Kinesis Records. " +
"Required if " + INITIAL_STREAM_POSITION.getDescription() + " is " + AT_TIMESTAMP.getDisplayName() + ". " +
"Uses the Timestamp Format to parse value into a Date.")
Expand All @@ -202,8 +199,7 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor<KinesisAsync
.required(false).build();

public static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
.displayName("Timestamp Format")
.name("amazon-kinesis-stream-timestamp-format")
.name("Timestamp Format")
.description("Format to use for parsing the " + STREAM_POSITION_TIMESTAMP.getDisplayName() + " into a Date " +
"and converting the Kinesis Record's Approximate Arrival Timestamp into a FlowFile attribute.")
.addValidator((subject, input, context) -> {
Expand All @@ -223,65 +219,57 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor<KinesisAsync
.required(true).build();

public static final PropertyDescriptor FAILOVER_TIMEOUT = new PropertyDescriptor.Builder()
.displayName("Failover Timeout")
.name("amazon-kinesis-stream-failover-timeout")
.name("Failover Timeout")
.description("Kinesis Client Library failover timeout")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("30 secs")
.required(true).build();

public static final PropertyDescriptor GRACEFUL_SHUTDOWN_TIMEOUT = new PropertyDescriptor.Builder()
.displayName("Graceful Shutdown Timeout")
.name("amazon-kinesis-stream-graceful-shutdown-timeout")
.name("Graceful Shutdown Timeout")
.description("Kinesis Client Library graceful shutdown timeout")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("20 secs")
.required(true).build();

public static final PropertyDescriptor CHECKPOINT_INTERVAL = new PropertyDescriptor.Builder()
.displayName("Checkpoint Interval")
.name("amazon-kinesis-stream-checkpoint-interval")
.name("Checkpoint Interval")
.description("Interval between Kinesis checkpoints")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("3 secs")
.required(true).build();

public static final PropertyDescriptor NUM_RETRIES = new PropertyDescriptor.Builder()
.displayName("Retry Count")
.name("amazon-kinesis-stream-retry-count")
.name("Retry Count")
.description("Number of times to retry a Kinesis operation (process record, checkpoint, shutdown)")
.addValidator(StandardValidators.INTEGER_VALIDATOR)
.defaultValue("10")
.required(true).build();

public static final PropertyDescriptor RETRY_WAIT = new PropertyDescriptor.Builder()
.displayName("Retry Wait")
.name("amazon-kinesis-stream-retry-wait")
.name("Retry Wait")
.description("Interval between Kinesis operation retries (process record, checkpoint, shutdown)")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.defaultValue("1 sec")
.required(true).build();

public static final PropertyDescriptor DYNAMODB_ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder()
.displayName("DynamoDB Override")
.name("amazon-kinesis-stream-dynamodb-override")
.name("DynamoDB Override")
.description("DynamoDB override to use non-AWS deployments")
.addValidator(StandardValidators.URL_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.required(false).build();

public static final PropertyDescriptor REPORT_CLOUDWATCH_METRICS = new PropertyDescriptor.Builder()
.displayName("Report Metrics to CloudWatch")
.name("amazon-kinesis-stream-cloudwatch-flag")
.name("Report Metrics to CloudWatch")
.description("Whether to report Kinesis usage metrics to CloudWatch.")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.allowableValues("true", "false")
.defaultValue("false")
.required(true).build();

public static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
.name("amazon-kinesis-stream-record-reader")
.displayName("Record Reader")
.name("Record Reader")
.description("The Record Reader to use for reading received messages." +
" The Kinesis Stream name can be referred to by Expression Language '${" +
AbstractKinesisRecordProcessor.KINESIS_RECORD_SCHEMA_KEY + "}' to access a schema." +
Expand All @@ -292,8 +280,7 @@ public class ConsumeKinesisStream extends AbstractAwsAsyncProcessor<KinesisAsync
.build();

public static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
.name("amazon-kinesis-stream-record-writer")
.displayName("Record Writer")
.name("Record Writer")
.description("The Record Writer to use for serializing Records to an output FlowFile." +
" The Kinesis Stream name can be referred to by Expression Language '${" +
AbstractKinesisRecordProcessor.KINESIS_RECORD_SCHEMA_KEY + "}' to access a schema." +
Expand Down Expand Up @@ -405,6 +392,24 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String
}
}

@Override
public void migrateProperties(final PropertyConfiguration config) {
config.renameProperty("kinesis-stream-name", "Amazon Kinesis Stream Name");
config.renameProperty("amazon-kinesis-stream-application-name", "Application Name");
config.renameProperty("amazon-kinesis-stream-initial-position", "Initial Stream Position");
config.renameProperty("amazon-kinesis-stream-position-timestamp", "Stream Position Timestamp");
config.renameProperty("amazon-kinesis-stream-timestamp-format", "Timestamp Format");
config.renameProperty("amazon-kinesis-stream-failover-timeout", "Failover Timeout");
config.renameProperty("amazon-kinesis-stream-graceful-shutdown-timeout", "Graceful Shutdown Timeout");
config.renameProperty("amazon-kinesis-stream-checkpoint-interval", "Checkpoint Interval");
config.renameProperty("amazon-kinesis-stream-retry-count", "Retry Count");
config.renameProperty("amazon-kinesis-stream-retry-wait", "Retry Wait");
config.renameProperty("amazon-kinesis-stream-dynamodb-override", "DynamoDB Override");
config.renameProperty("amazon-kinesis-stream-cloudwatch-flag", "Report Metrics to CloudWatch");
config.renameProperty("amazon-kinesis-stream-record-reader", "Record Reader");
config.renameProperty("amazon-kinesis-stream-record-writer", "Record Writer");
}

@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
final PropertyDescriptor.Builder builder = new PropertyDescriptor.Builder()
Expand Down