Skip to content
Permalink
Branch: master
Find file Copy path
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
734 lines (630 sloc) 31.3 KB
kinesis {
akka {
# The dispatcher that will be used by default in the reference configuration
# Note that this can be overridden in the producer
# and consumer configurations via the `use-dispatcher` key.
default-dispatcher {
# Akka dispatcher definition. The producer and consumer defaults in this file reference it
# by its full config path, "kinesis.akka.default-dispatcher".
type = "Dispatcher"
# Use a fork-join pool, sized by the fork-join-executor block below.
executor = "fork-join-executor"
fork-join-executor {
# Per the Akka dispatcher documentation, the pool size is
# ceil(available processors * parallelism-factor), bounded by parallelism-min and parallelism-max.
parallelism-min = 8
parallelism-factor = 3.0
parallelism-max = 64
}
# Per the Akka dispatcher documentation: the maximum number of messages processed per actor
# before the thread moves on to the next actor.
throughput = 50
}
}
# The application name is used as the name of the dynamoDB table used for checkpointing
# You SHOULD override this value
application-name = "KinesisProducer"
# The default producer configuration, used for all producers.
default-producer {
# The name of the producer stream, MUST be specified per producer
# stream-name = "kinesis-stream"
akka {
# Fully qualified config path for the producer's dispatcher.
# Can be overridden per producer/consumer.
# Overriding this value with null or an empty string will result in no specific dispatcher being specified.
dispatcher = "kinesis.akka.default-dispatcher"
# The maximum number of concurrent message send requests before throttling.
# As messages are sent in batches, a Future is created per message to track the successful (or failed)
# put of the message onto Kinesis.
# This value is optional but may help control memory usage.
# Defaults to no throttling.
#max-outstanding-requests = 50000
# Optionally defined in combination with the above.
# Controls the time after throttling before a retry, in milliseconds.
# Default: 100
#throttling-retry-millis = 100
}
# We convert these to a properties file to be fed directly into the Kinesis KPL library
# See the link below for full details
# http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-config.html
# This section can be overridden on a per producer basis
# Config values within this kpl block are optional (see defaults)
kpl {
# Enable aggregation. With aggregation, multiple user records are packed into
# a single KinesisRecord. If disabled, each user record is sent in its own
# KinesisRecord.
#
# If your records are small, enabling aggregation will allow you to put many
# more records than you would otherwise be able to for a shard before getting
# throttled.
#
# Default: true
AggregationEnabled = true
# Maximum number of items to pack into an aggregated record.
#
# There should normally be no need to adjust this. If you want to limit the
# time records spend buffering, look into RecordMaxBufferedTime instead.
#
# Default: 4294967295
# Minimum: 1
# Maximum (inclusive): 9223372036854775807
AggregationMaxCount = 4294967295
# Maximum number of bytes to pack into an aggregated Kinesis record.
#
# There should normally be no need to adjust this. If you want to limit the
# time records spend buffering, look into RecordMaxBufferedTime instead.
#
# If a record has more data by itself than this limit, it will bypass the
# aggregator. Note the backend enforces a limit of 50KB on record size. If
# you set this beyond 50KB, oversize records will be rejected at the backend.
#
# Default: 51200
# Minimum: 64
# Maximum (inclusive): 1048576
AggregationMaxSize = 51200
# Maximum number of items to pack into a PutRecords request.
#
# There should normally be no need to adjust this. If you want to limit the
# time records spend buffering, look into RecordMaxBufferedTime instead.
#
# Default: 500
# Minimum: 1
# Maximum (inclusive): 500
CollectionMaxCount = 500
# Maximum amount of data to send with a PutRecords request.
#
# There should normally be no need to adjust this. If you want to limit the
# time records spend buffering, look into RecordMaxBufferedTime instead.
#
# Records larger than the limit will still be sent, but will not be grouped
# with others.
#
# Default: 5242880
# Minimum: 52224
# Maximum (inclusive): 9223372036854775807
CollectionMaxSize = 5242880
# Timeout (milliseconds) for establishing TLS connections.
#
# Default: 6000
# Minimum: 100
# Maximum (inclusive): 300000
ConnectTimeout = 6000
# How often to refresh credentials (in milliseconds).
# During a refresh, credentials are retrieved from any SDK credentials providers attached to
# the wrapper and pushed to the core.
#
# Default: 5000
# Minimum: 1
# Maximum (inclusive): 300000
# CredentialsRefreshDelay =
# Use a custom CloudWatch endpoint.
# Note this does not accept protocols or paths, only host names or ip addresses. There is no
# way to disable TLS. The KPL always connects with TLS.
# Expected pattern: ^([A-Za-z0-9-\\.]+)?$
# CloudwatchEndpoint =
# Server port to connect to for CloudWatch.
# Default: 443
# Minimum: 1
# Maximum (inclusive): 65535
# CloudwatchPort =
# This has no effect on Windows.
# If set to true, the KPL native process will attempt to raise its own core file size soft
# limit to 128MB, or the hard limit, whichever is lower. If the soft limit is already at or
# above the target amount, it is not changed.
# Note that even if the limit is successfully raised (or already sufficient), it does not
# guarantee that core files will be written on a crash, since that depends on operating
# system settings that are beyond the control of individual processes.
# Default: false
#EnableCoreDumps = false
# Use a custom Kinesis endpoint.
#
# Mostly for testing use. Note this does not accept protocols or paths, only
# host names or ip addresses. There is no way to disable TLS. The KPL always
# connects with TLS.
#
# Expected pattern: ^([A-Za-z0-9-\\.]+)?$
# KinesisEndpoint =
# Server port to connect to. Only useful with KinesisEndpoint.
#
# Default: 443
# Minimum: 1
# Maximum (inclusive): 65535
# KinesisPort = 443
# If true, throttled puts are not retried. The records that got throttled
# will be failed immediately upon receiving the throttling error. This is
# useful if you want to react immediately to any throttling without waiting
# for the KPL to retry. For example, you can use a different hash key to send
# the throttled record to a backup shard.
#
# If false, the KPL will automatically retry throttled puts. The KPL performs
# backoff for shards that it has received throttling errors from, and will
# avoid flooding them with retries. Note that records may fail from
# expiration (see RecordTtl) if they get delayed for too long because of
# throttling.
#
# Default: false
FailIfThrottled = false
# Minimum level of logs. Messages below the specified level will not be
# logged. Logs for the native KPL daemon show up on stderr.
#
# Default: info
# Expected pattern: info|warning|error
LogLevel = info
# Maximum number of connections to open to the backend. HTTP requests are
# sent in parallel over multiple connections.
#
# Setting this too high may impact latency and consume additional resources
# without increasing throughput.
#
# Default: 24
# Minimum: 1
# Maximum (inclusive): 256
MaxConnections = 24
# Controls the granularity of metrics that are uploaded to CloudWatch.
# Greater granularity produces more metrics.
#
# When "shard" is selected, metrics are emitted with the stream name and
# shard id as dimensions. On top of this, the same metric is also emitted
# with only the stream name dimension, and lastly, without the stream name.
# This means for a particular metric, 2 streams with 2 shards (each) will
# produce 7 CloudWatch metrics, one for each shard, one for each stream, and
# one overall, all describing the same statistics, but at different levels of
# granularity.
#
# When "stream" is selected, per shard metrics are not uploaded; when
# "global" is selected, only the total aggregate for all streams and all
# shards are uploaded.
#
# Consider reducing the granularity if you're not interested in shard-level
# metrics, or if you have a large number of shards.
#
# If you only have 1 stream, select "global"; the global data will be
# equivalent to that for the stream.
#
# Refer to the metrics documentation for details about each metric.
#
# Default: shard
# Expected pattern: global|stream|shard
MetricsGranularity = shard
# Controls the number of metrics that are uploaded to CloudWatch.
#
# "none" disables all metrics.
#
# "summary" enables the following metrics: UserRecordsPut, KinesisRecordsPut,
# ErrorsByCode, AllErrors, BufferingTime.
#
# "detailed" enables all remaining metrics.
#
# Refer to the metrics documentation for details about each metric.
#
# Default: detailed
# Expected pattern: none|summary|detailed
MetricsLevel = detailed
# The namespace to upload metrics under.
#
# If you have multiple applications running the KPL under the same AWS
# account, you should use a different namespace for each application.
#
# If you are also using the KCL, you may wish to use the application name you
# have configured for the KCL as the namespace here. This way both your
# KPL and KCL metrics show up under the same namespace.
#
# Default: KinesisProducerLibrary
# Expected pattern: (?!AWS/).{1,255}
MetricsNamespace = KinesisProducerLibrary
# Delay (in milliseconds) between each metrics upload.
#
# For testing only. There is no benefit in setting this lower or higher in
# production.
#
# Default: 60000
# Minimum: 1
# Maximum (inclusive): 60000
MetricsUploadDelay = 60000
# Minimum number of connections to keep open to the backend.
#
# There should be no need to increase this in general.
#
# Default: 1
# Minimum: 1
# Maximum (inclusive): 16
MinConnections = 1
#Path to the native KPL binary. Only use this setting if you want to use a custom build of
#the native code.
#
#NativeExecutable=
# Limits the maximum allowed put rate for a shard, as a percentage of the
# backend limits.
#
# The rate limit prevents the producer from sending data too fast to a shard.
# Such a limit is useful for reducing bandwidth and CPU cycle wastage from
# sending requests that we know are going to fail from throttling.
#
# Kinesis enforces limits on both the number of records and number of bytes
# per second. This setting applies to both.
#
# The default value of 150% is chosen to allow a single producer instance to
# completely saturate the allowance for a shard. This is an aggressive
# setting. If you prefer to reduce throttling errors rather than completely
# saturate the shard, consider reducing this setting.
#
# Default: 150
# Minimum: 1
# Maximum (inclusive): 9223372036854775807
RateLimit = 150
# Maximum amount of time (milliseconds) a record may spend being buffered
# before it gets sent. Records may be sent sooner than this depending on the
# other buffering limits.
#
# This setting provides coarse ordering among records - any two records will
# be reordered by no more than twice this amount (assuming no failures and
# retries and equal network latency).
#
# The library makes a best effort to enforce this time, but cannot guarantee
# that it will be precisely met. In general, if the CPU is not overloaded,
# the library will meet this deadline to within 10ms.
#
# Failures and retries can additionally increase the amount of time records
# spend in the KPL. If your application cannot tolerate late records, use the
# RecordTtl setting to drop records that do not get transmitted in time.
#
# Setting this too low can negatively impact throughput.
#
# Default: 100
# Maximum (inclusive): 9223372036854775807
RecordMaxBufferedTime = 100
# Set a time-to-live on records (milliseconds). Records that do not get
# successfully put within the limit are failed.
#
# This setting is useful if your application cannot or does not wish to
# tolerate late records. Records will still incur network latency after they
# leave the KPL, so take that into consideration when choosing a value for
# this setting.
#
# If you do not wish to lose records and prefer to retry indefinitely, set
# RecordTtl to a large value like INT_MAX. This has the potential to cause
# head-of-line blocking if network issues or throttling occur. You can
# respond to such situations by using the metrics reporting functions of the
# KPL. You may also set FailIfThrottled to true to prevent automatic
# retries in case of throttling.
#
# Default: 30000
# Minimum: 100
# Maximum (inclusive): 9223372036854775807
RecordTtl = 30000
# Which region to send records to.
#
# If you do not specify the region and are running in EC2, the library will
# use the region the instance is in.
#
# The region is also used to sign requests.
#
# Expected pattern: ^([a-z]+-[a-z]+-[0-9])?$
#Region = "us-east-1"
# The maximum total time (milliseconds) elapsed between when we begin a HTTP
# request and receiving all of the response. If it goes over, the request
# will be timed-out.
#
# Note that a timed-out request may actually succeed at the backend. Retrying
# then leads to duplicates. Setting the timeout too low will therefore
# increase the probability of duplicates.
#
# Default: 6000
# Minimum: 100
# Maximum (inclusive): 600000
RequestTimeout = 6000
# Temp directory into which to extract the native binaries. The KPL requires write
# permissions in this directory.
#
# If not specified, defaults to /tmp in Unix. (Windows TBD)
# TempDirectory =
# Verify the endpoint's certificate. Do not disable unless using
# a custom KinesisEndpoint for testing. Never disable this in production.
#
# Default: true
VerifyCertificate = true
# Sets the threading model that the native process will use.
# Enum:
# PER_REQUEST: Tells the native process to create a thread for each request.
# Under heavy load this can create a very large number of threads, which may cause resource exhaustion.
# POOLED: Tells the native process to use a thread pool. The size of the pool can be controlled by ThreadPoolSize
# This uses a queue, and thread pool to execute requests to Kinesis.
# This limits the number of threads that the native process may use.
# Under extremely heavy load this can increase latency significantly more than the per request model.
#
# Default = PER_REQUEST
# ThreadingModel =
# Sets the maximum number of threads that the native process' thread pool will be configured with.
#
# Default: 0
# ThreadPoolSize =
}
}
# The default consumer configuration, used for all consumers.
default-consumer {
# The name of the consumer stream, MUST be specified per consumer
# stream-name = "kinesis-stream"
akka {
# Fully qualified config path for the consumer's dispatcher.
# Can be overridden per producer/consumer.
# Overriding this value with null or an empty string will result in no specific dispatcher being specified.
dispatcher = "kinesis.akka.default-dispatcher"
}
worker {
# This is the total timeout for processing a single batch.
# All messages must have been processed and checkpointed for the batch to be considered complete.
# Should be tuned in combination with `maxRecords` (see the kcl block), which controls the size of the batch.
# Note that this timeout applies per retry.
batchTimeoutSeconds = 10 # Given 10,000 records this requires at least 1000/sec
# The number of times to retry failed batch messages.
# A failed message is one where we received no response from the eventProcessor within the above timeout.
# A value of `1` means we retry each failed message once (thus we attempt processing failed messages twice).
failedMessageRetries = 1
# The failure tolerance percentage for each batch (rounded down).
# So for a batch of 500 messages, the default (0.25) allows 1 unprocessed message
# (0.25% of 500 = 1.25, rounded down to 1).
#
# If after retrying unprocessed messages we are within this tolerance then we will checkpoint the latest and continue.
# If we are above this tolerance then we will shut down processing for the specific shard on this node.
# The eventProcessor will be notified of any shutdown by sending a KinesisProcessorFailure message.
failureTolerancePercentage = 0.25
# Automatic graceful shutdown - this uses a JVM shutdown hook to automatically stop the consumer, forcing a checkpoint.
# The shutdown will block for up to the period defined in shutdownTimeoutSeconds.
gracefulShutdownHook = true
# When gracefully shutting down, this is the timeout allowed for checkpointing etc.
# It is strongly recommended that this is larger than the batch timeout * retries.
# This will help prevent message duplication due to unfinished batches on shutdown.
shutdownTimeoutSeconds = 25
}
checkpointer {
# Timing settings for consumer checkpointing. All values are in milliseconds.
#
# The amount of time to wait after failing to checkpoint, usually due to an exception or DynamoDB throttling.
backoffMillis = 3000
# The standard delay between (successful) checkpoints.
intervalMillis = 2000
# The delay between notification messages sent to the parent to indicate we're ready to checkpoint.
# This is for subsequent messages after the initial notification,
# for cases where the parent had no messages to checkpoint.
notificationDelayMillis = 1000
}
# We convert these to a properties file to be fed directly into the Kinesis KCL library
# See the link below for full details
#
# This section can be overridden on a per consumer basis
# Most config values within this kcl block are optional (see defaults)
kcl {
# See here for credentials providers:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html
# http://docs.aws.amazon.com/java-sdk/latest/developer-guide/credentials.html
# Providing a list of these providers creates a `AWSCredentialsProviderChain` where each one is tried until one succeeds
#
# Mandatory
AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
# If workerId is not provided in the properties, a workerId is automatically generated.
#workerId = 123
# The location in the shard from which the KinesisClientLibrary will start fetching records from
# when the application starts for the first time and there is no checkpoint for the shard.
#
# Once checkpointing has begun, the application will always continue from the last checkpoint (in DynamoDB)
# See:
# http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#Kinesis-GetShardIterator-request-ShardIteratorType
#
# Default: LATEST (this reference configuration sets TRIM_HORIZON below)
initialPositionInStream = TRIM_HORIZON
# Max records to fetch from Kinesis in a single GetRecords call.
#
# Default = 10000
#maxRecords = 10000
# Idle time between record reads in milliseconds
#
# Default = 1000
#idleTimeBetweenReadsInMillis = 1000
# Fail over time in milliseconds. A worker which does not renew its lease within this time interval
# will be regarded as having problems and its shards will be assigned to other workers.
# For applications that have a large number of shards, this may be set to a higher number to reduce
# the number of DynamoDB IOPS required for tracking leases.
#
# Default: 10000
#failoverTimeMillis = 10000
# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
#
# Default: 60000
#shardSyncIntervalMillis = 60000
# Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
# to delete the ones we don't need any longer.
#
# Default: true
#cleanupLeasesUponShardCompletion = true
# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
# to {@link RecordProcessorCheckpointer#checkpoint(String)} by default.
#
# Default: true
#validateSequenceNumberBeforeCheckpointing = true
# An alternative to kinesisEndpoint, sets the
# regional endpoint for this client's service calls. Use this
# method to control which AWS region you want to work with.
#
# Default: null
#regionName = us-east-1
# Overrides the default endpoint for this client. e.g. ("https://kinesis.us-east-1.amazonaws.com").
#
# This can be used to control which AWS region they want to work with and
# will override the regionName property
#
# Callers can pass in just the endpoint (e.g. "kinesis.us-east-1.amazonaws.com")
# or a full URL, including the protocol (e.g. "https://kinesis.us-east-1.amazonaws.com").
# If the protocol is not specified here, the default protocol will be used, which is HTTPS.
#
# For more information on using AWS regions with the AWS SDK for Java, and
# a complete list of all available endpoints for all AWS services see:
# http://developer.amazonwebservices.com/connect/entry.jspa?externalID=3912
#
# Default: null
#kinesisEndpoint = "https://kinesis"
# DynamoDB endpoint
# Default: null
#dynamoDBEndpoint = "https://dynamo"
# Whether to call processRecords() on the record processor even for empty record lists.
# Enabling this lets applications flush/checkpoint (if they have some data "in progress" but don't get new data for a while).
#
# Default: false
#callProcessRecordsEvenForEmptyRecordList = false
# Interval in milliseconds between polling to check for parent shard completion.
# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
# completion of parent shards).
#
# Default: 10000
#parentShardPollIntervalMillis = 10000
# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
#
# Default: 500
#taskBackoffTimeMillis = 500
# Buffer metrics for at most this long before publishing to CloudWatch.
#
# Default: 10000
#metricsBufferTimeMillis = 10000
# Buffer at most this many metrics before publishing to CloudWatch.
#
# Default: 10000
#metricsMaxQueueSize = 10000
# Metrics level for which to enable CloudWatch metrics.
# Can be: NONE, SUMMARY, DETAILED
#
# Default: DETAILED
#metricsLevel = DETAILED
# Allowed dimensions for CloudWatch metrics.
#
# Operation dimension will be enabled regardless of the config provided.
#
# See: http://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html#metric-levels
#
# Comma separated list of values from:
# Operation, ShardId, WorkerIdentifier, ALL
#
# Default: Operation, ShardId
#metricsEnabledDimensions = Operation, ShardId
# Sets the max size of the thread pool that will be used to renew leases.
# Setting this too low may starve the lease renewal process, and cause the worker to lose leases at a higher rate.
#
# Min: 2
# Default: 20
#maxLeaseRenewalThreads=20
# The max number of leases (shards) this worker should process.
# This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints
# or during deployment.
# NOTE: Setting this to a low value can cause data loss if workers are not able to pick up all shards in the
# stream due to the max limit.
#
# Default: 2147483647 (Integer.MAX_VALUE)
#maxLeasesForWorker = 2147483647
# Max leases to steal from another worker at one time (for load balancing).
# Setting this to a higher number can allow for faster load convergence (e.g. during deployments, cold starts),
# but can cause higher churn in the system.
#
# Default: 1
#maxLeasesToStealAtOneTime = 1
# The Amazon DynamoDB table used for tracking leases will be provisioned with this read capacity.
#
# Default: 10
#initialLeaseTableReadCapacity = 10
# The Amazon DynamoDB table used for tracking leases will be provisioned with this write capacity.
#
# Default: 10
#initialLeaseTableWriteCapacity = 10
# If set to true, the Worker will not sync shards and leases during initialization if there are one or more leases
# in the lease table. This assumes that the shards and leases are in-sync.
# This enables customers to choose faster startup times (e.g. during incremental deployments of an application).
#
# skipShardSyncAtStartupIfLeasesExist: whether the Worker should skip syncing shards and leases at startup (Worker initialization).
# Default: false
#skipShardSyncAtStartupIfLeasesExist=
# Override the default user agent (application name)
#
# Default: <applicationName>
#userAgent =
# tableName: name of the lease table in DynamoDB
# Default = <applicationName>
#tableName =
# A timeout when dispatching records to the client MultiLang record processor.
# If the record processor doesn't respond within the timeout the parent Java process will be terminated.
# This is a temporary fix to handle cases where the KCL becomes blocked while waiting for a client record processor.
# Setting this can cause the KCL to exit suddenly,
# before using this ensure that you have an automated restart for your application
#
# Default: no timeout
#timeoutInSeconds =
# The amount of milliseconds to wait before graceful shutdown forcefully terminates the process.
# Default: 5000
#shutdownGraceMillis = 5000
# Internally support timeouts and retries for GetRecords calls.
#
# If retryGetRecordsInSeconds is set
# And maxGetRecordsThreadPool is set
# Then getRecords will asynchronously retry internally using a CompletionService of
# max size "maxGetRecordsThreadPool" with "retryGetRecordsInSeconds" between each retry.
#
# NOTE: this enables the AsynchronousGetRecordsRetrievalStrategy for getRecords
#
# Time in seconds to wait before the worker retries to get a record
# Default: Optional value, default not set
#retryGetRecordsInSeconds = 1
#
# max number of threads in the getRecords thread pool (per shard)
# Default: Optional value, default not set
#maxGetRecordsThreadPool = 2
#######################
# Pre-fetching config #
#######################
# Pre-fetching will retrieve and queue additional records from Kinesis while the
# application is processing existing records.
# Pre-fetching can be enabled by setting dataFetchingStrategy to PREFETCH_CACHED. Once
# enabled an additional fetching thread will be started to retrieve records from Kinesis.
# Retrieved records will be held in a queue until the application is ready to process them.
# Which data fetching strategy to use (DEFAULT, PREFETCH_CACHED)
# Default: DEFAULT (this reference configuration sets PREFETCH_CACHED below)
dataFetchingStrategy = PREFETCH_CACHED
#
# Pre-fetching supports the following configuration values:
#
# The maximum number of process records input that can be queued
# Default: 3
#maxPendingProcessRecordsInput = 3
# The maximum number of bytes that can be queued
# Default: 8388608 (8 * 1024 * 1024 bytes = 8 MB)
#maxCacheByteSize = 8388608
# The maximum number of records that can be queued
# Default: 30000
#maxRecordsCount = 30000
# The amount of time to wait between calls to Kinesis
# Default: 1500
#idleMillisBetweenCalls = 1500
##############################
# End of Pre-fetching config #
##############################
# Milliseconds after which the logger will log a warning message for the long running task
# Default: not set
#logWarningForTaskAfterMillis = 100
# True if we should ignore child shards which have open parents
# Default: not set
#ignoreUnexpectedChildShards = false
# The sleep time between two listShards calls from the proxy when throttled.
# Default: 1500
#listShardsBackoffTimeInMillis = 1500
# The number of times the Proxy will retry listShards call when throttled.
# Default: 50
#maxListShardsRetryAttempts = 50
}
}
}
You can’t perform that action at this time.