Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-7474] [ Streaming Connectors] AzureEventhubs-connector, support read from and write to Azure eventhubs #4535

Open
wants to merge 134 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
134 commits
Select commit Hold shift + click to select a range
fc868aa
Add Azure eventhubs connector
zhuganghuaonnet Jun 15, 2017
cc1df1e
[FLINK]Eventhub connector support timestamp and watermark
zhuganghuaonnet Jun 16, 2017
9ffdd8b
Add eventhub producer, and to examples for eventhub read and write
zhuganghuaonnet Jun 22, 2017
8966698
Fix retrive NM cores as -1 when NM core value is not set in yarn-site…
zhuganghuaonnet Jun 26, 2017
b7c1dfa
[FLINK-7055][blob] refactor getURL() to the more generic getFile()
Jun 21, 2017
0a19c45
[FLINK-7056][blob] add API to allow job-related BLOBs to be stored
Jun 21, 2017
4c0fa6e
[FLINK-7434][doc] scafolding with "sbt new"
sv3ndk Aug 12, 2017
3c84f8b
[FLINK-6180] [rpc] Remove TestingSerialRpcService
tillrohrmann Aug 10, 2017
98c06ad
[FLINK-7441] [table] Double quote string literals is not supported in…
wuchong Aug 14, 2017
e30eb13
[FLINK-7399] [checkstyle] Forbid imports from org.codehaus.jackson
zentol Aug 9, 2017
8dfb9d0
[FLINK-6982] [guava] Integrate flink-shaded-guava-18
zentol Aug 1, 2017
4a4db0a
[hotfix] [docs][metrics] Small typo fix in the scope section
AjayTripathy Aug 14, 2017
61ed131
[FLINK-7445] [GitHub] Remove FLINK-1234 reference from PR template
zentol Aug 14, 2017
9eeb5b2
[FLINK-7443] [metrics] MetricFetcher store and deserializer fields no…
zentol Aug 14, 2017
90c2185
Fix bug when user set offset to latest, it cannot work
zhuganghuaonnet Aug 15, 2017
5f97ac9
[FLINK-7415] [cassandra] Add example instructions for creating keyspace
yew1eb Aug 10, 2017
ff27e20
[hotfix][misc] Fix logging of local port in NetworkFailuresProxy
pnowojski Aug 9, 2017
be196f7
[hotfix][misc] More verbose logging in NetworkFailuresProxy
pnowojski Aug 10, 2017
3b0321a
[FLINK-7405] [metrics] Reduce excessive warning logging from DatadogH…
bowenli86 Aug 9, 2017
b71154a
[FLINK-7213] Introduce state management by OperatorID in TaskManager
StefanRRichter Jun 26, 2017
91a4b27
[FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)…
StefanRRichter Jul 25, 2017
d29bed3
[FLINK-7268] Add delaying executor in *EventTimeWindowCheckpointingIT…
aljoscha Jul 28, 2017
843f0cb
[FLINK-7362] [checkpoints] Savepoint property is lost after de/serial…
StefanRRichter Aug 7, 2017
6c6d900
[FLINK-7462] [docs] Add very obvious warning about outdated docs
uce Aug 16, 2017
d0a1506
[FLINK-7056][tests][hotfix] make sure the client and a created InputS…
Aug 17, 2017
9c80d40
[hotfix] increase Scala checkstyle maxParameters to 20
Jul 4, 2017
7b23624
[FLINK-7057][blob] move ref-counting from the LibraryCacheManager to …
Jun 27, 2017
76f1022
[FLINK-7347] Keep IDs for checkpoint in a set in MessageQueue Source
Aug 2, 2017
bbac4a6
[FLINK-6630] [FLINK-6631] Implement FLIP-6 Mesos cluster entrypoints …
Aug 16, 2017
40656c5
[FLINK-7077] [mesos] Implement task release to support dynamic scaling
Aug 18, 2017
bd70a00
[FLINK-7269] Refactor passing of dynamic properties
FangYongs Jul 28, 2017
5b8acb7
Fix bug that can not set offset to lastest
zhuganghuaonnet Aug 21, 2017
76d80f8
Merge branch 'eventhubs' of https://github.com/zhuganghuaonnet/flink …
zhuganghuaonnet Aug 21, 2017
39d94b5
[FLINK-7474] [ Streaming Connectors] AzureEventhubs-connector, suppor…
zhuganghuaonnet Jun 15, 2017
fe13d65
Merge branch 'eventhubs' of https://github.com/zhuganghuaonnet/flink …
zhuganghuaonnet Aug 21, 2017
0a0f6ed
[FLINK-7477] Use "hadoop classpath" to augment classpath when available
aljoscha Aug 18, 2017
a3143bc
[FLINK-7480] Set HADOOP_CONF_DIR to sane default if not set
aljoscha Aug 18, 2017
9077e51
[FLINK-7300] Also ignore AskTimeoutException in end-to-end test logs
aljoscha Aug 21, 2017
40cec17
[FLINK-5851] [streaming API] Rename AsyncCollector into ResultFuture
Jul 2, 2017
9995588
[FLINK-7123] [cep] Support timesOrMore in CEP
dianfu Aug 11, 2017
6ed5815
[FLINK-6244] [cep] Emit timeouted Patterns as Side Output
dawidwys Aug 11, 2017
93d0ae4
[FLINK-7337] [table] Refactor internal handling of time indicator att…
fhueske Aug 4, 2017
47944b1
[FLINK-7337] [table] Efficient handling of rowtime timestamps
twalthr Aug 12, 2017
3e706b1
[hotifx][streaming] Simplify state of TwoPhaseCommitSinkFunction
pnowojski Aug 14, 2017
b3ffd91
[hotfix] [cep] Spelling corrections
dawidwys Aug 24, 2017
d21d5d6
[FLINK-7147] [cep] Support greedy quantifier in CEP
dianfu Jul 11, 2017
87e5b8b
[hotfix][tests] Implement AutoCloseable in TestHarness
pnowojski Aug 14, 2017
3f4de57
[hotfix][streaming] Refactor TwoPhaseCommitSinkFunctionTest
pnowojski Aug 17, 2017
ca87bec
[FLINK-7460] [state backends] Close all ColumnFamilyHandles when rest…
StefanRRichter Aug 14, 2017
5456cf9
[FLINK-7505] Use lambdas in suppressed exception idiom
StefanRRichter Aug 24, 2017
6642768
[FLINK-7461] Remove Backwards compatibility with <= Flink 1.1
StefanRRichter Aug 24, 2017
04add8d
[hotfix] Remove old Kinesis snapshots for Flink 1.1
aljoscha Aug 25, 2017
c384e52
[FLINK-7429] [kinesis] Add IT tests for migration from 1.3
tzulitai Aug 18, 2017
c019787
[FLINK-7040] [rest] Add basics for REST communication
zentol Aug 16, 2017
bafddd7
[FLINK-7040] [rest] Introduce executor, shutdown timeouts and future …
tillrohrmann Aug 25, 2017
dcce0b7
[FLINK-7544] [REST] Make all path parameters mandatory
zentol Aug 28, 2017
a6905df
[FLINK-7543] [REST] Simplify handler access to path/query parameters
zentol Aug 28, 2017
257a5a5
[FLINK-7454] [docs] Uupdate 'Monitoring Current Event Time' section
bowenli86 Aug 16, 2017
ac72360
[FLINK-7498][streaming] Bind together state fields of TwoPhaseCommitS…
pnowojski Aug 21, 2017
959d54f
[FLINK-7497][streaming] Introduce user context in TwoPhaseCommitSinkF…
pnowojski Aug 21, 2017
9d9cdcb
[hotfix][streaming] Allow to override methods from TwoPhaseCommitSink…
pnowojski Aug 14, 2017
1e2a638
[hotfix][streaming] Fix logging in TwoPhaseCommitSinkFunction
pnowojski Aug 22, 2017
00fc641
[hotfix] [docs] Add section in docs about writing unit and integratio…
pnowojski Aug 1, 2017
5ce0ded
[hotfix] [docs] Fix typos and improve testing docs
twalthr Aug 29, 2017
88848e7
[hotfix] [docs] Remove duplicate docs page
twalthr Aug 29, 2017
1fc0b64
[FLINK-7206] [table] Add DataView to support direct state access in A…
Aug 23, 2017
68fdaa5
[FLINK-7245] [table] Add operators to hold back watermarks with stati…
xccui Aug 9, 2017
29e849b
[FLINK-7309] [table] Fix NullPointerException when selecting null fie…
yestinchen Aug 4, 2017
df7452d
[FLINK-7398] [table] Add Logging trait to prevent serialization of Lo…
Aug 23, 2017
b915757
FLINK-7366 Upgrade kinesis-producer-library in flink-connector-kinesi…
bowenli86 Aug 10, 2017
1b7f8bd
[FLINK-7559] [quickstart] fix Typo in flink-quickstart pom
yew1eb Aug 30, 2017
e753db8
[FLINK-7556] Allow Integer.MIN_VALUE for fetch size in JDBCInputFormat
nycholas Aug 29, 2017
e6fddbc
[FLINK-7558][table]Improve SQL ValidationException message.
sunjincheng121 Aug 30, 2017
ddf62b1
[FLINK-6787] Fix Job-/StoppableException extend FlinkException
FangYongs Aug 9, 2017
d8d061b
[hotfix][table] Fix bug of testAggregateFunctionOperandTypeCheck
sunjincheng121 Aug 31, 2017
391e39b
[FLINK-7299][AVRO] Write GenericRecord using AvroOutputFormat
soniclavier Jul 31, 2017
c872751
[FLINK-7422] Upgrade Kinesis Client Library (KCL) and AWS SDK in flin…
bowenli86 Aug 10, 2017
a008303
[FLINK-7169][CEP] Support AFTER MATCH SKIP function in CEP
yestinchen Jul 14, 2017
fb3bd1f
[FLINK-7457] Make Dispatcher highly available
tillrohrmann Aug 16, 2017
1fc4a60
[FLINK-7489] Remove startJobExecution and suspendExecution from JobMa…
tillrohrmann Aug 22, 2017
0f3de89
[FLINK-7501] Generalize RegisteredRpcConnection to support generic le…
tillrohrmann Aug 24, 2017
0cf7f76
[FLINK-7522] Add termination future to ClusterEntrypoint
tillrohrmann Aug 14, 2017
d7cea58
[FLINK-7519] Add HttpResponseStatus to RestClientException
tillrohrmann Aug 25, 2017
1804aa3
[FLINK-7078] [rpc] Introduce FencedRpcEndpoint
tillrohrmann Aug 23, 2017
ab1fbfd
[FLINK-7409] [web] Make WebRuntimeMonitor reactive
tillrohrmann Aug 10, 2017
a3df5a2
[FLINK-7444] [rpc] Make external calls non-blocking
tillrohrmann Aug 14, 2017
a157871
[FLINK-7500] Set JobMaster leader session id in main thread
tillrohrmann Aug 24, 2017
64e8de9
[FLINK-7526] [TaskExecutor] Filter out duplicate JobManager gained le…
tillrohrmann Aug 26, 2017
84c2a92
[FLINK-7507] [dispatcher] Fence Dispatcher
tillrohrmann Aug 24, 2017
e70de0e
[FLINK-7504] Fence the ResourceManager
tillrohrmann Aug 24, 2017
ff16606
[FLINK-7523] Add proper resource shutdown to ResourceManager/JobManag…
tillrohrmann Aug 26, 2017
ba03b78
[FLINK-7506] Fence JobMaster
tillrohrmann Aug 24, 2017
09344aa
[FLINK-6751] [docs] Add documentation for user-defined AggregateFunct…
shaoxuan-wang Aug 15, 2017
b7b0d40
[FLINK-7564] [table] Fix watermark semantics in rowtime unbounded OVE…
xccui Sep 1, 2017
0eef8e8
[FLINK-7572] [table] Improve TableSchema and FlinkTable validation ex…
sunjincheng121 Sep 5, 2017
4cf737c
[FLINK-7404] [table] Generate code for non-equi join conditions only.
fhueske Aug 9, 2017
7c11bd7
[FLINK-7227] [table] Fix push-down of disjunctive predicates with mor…
Aug 28, 2017
9fe8f21
[FLINK-7568] Change role of ProcessWindowFunction and WindowFunction …
aljoscha Sep 1, 2017
006572f
[FLINK-7568] Update ProcessFunction.Context in window documentation
aljoscha Sep 1, 2017
f35f2d6
[FLINK-7568] Add note about 'key' parameter to window doc
aljoscha Sep 1, 2017
7d9e3bf
[FLINK-7568] Add note about start/end timestamps in window doc
aljoscha Sep 1, 2017
6d2124e
[FLINK-7568] Add section about consecutive windows to window doc
aljoscha Sep 1, 2017
769ce2a
[FLINK-7576] [futures] Add FutureUtils.retryWithDelay
tillrohrmann Sep 4, 2017
8119baa
[FLINK-7430] Set StreamTask.isRunning to false after closing StreamOp…
tillrohrmann Sep 6, 2017
d0636c8
[hotfix][kafka][docs] Add warning regarding data losses when writing …
pnowojski Aug 31, 2017
93369e7
[FLINK-7407] [kafka] Adapt AbstractPartitionDiscoverer to handle non-…
tzulitai Aug 11, 2017
36412c6
[hotfix] [kafka] Remove unused shouldAssignToThisSubtask method in Ab…
tzulitai Aug 11, 2017
eaafb61
[FLINK-7440] [kinesis] Eagerly check serializability of deserializati…
tzulitai Aug 14, 2017
98737f9
[FLINK-7440] [kinesis] Eagerly check that provided schema and partiti…
tzulitai Aug 14, 2017
9ed5d9a
[FLINK-7367] [kinesis] Generalize configuration for FlinkKinesisProdu…
bowenli86 Aug 4, 2017
59eab45
[FLINK-7363] [kinesis] Clean up deprecation of ProducerConfigConstants
tzulitai Aug 15, 2017
fb06277
Fix merge error
zhuganghuaonnet Sep 12, 2017
530368c
fix merge error in pom.xml
zhuganghuaonnet Sep 14, 2017
1c626e3
Merge branch 'eventhubs' of https://github.com/zhuganghuaonnet/flink …
zhuganghuaonnet Sep 14, 2017
7219503
[FLINK-7474] [ Streaming Connectors] AzureEventhubs-connector, suppor…
zhuganghuaonnet Jun 15, 2017
5d96956
Add Azure eventhubs connector
zhuganghuaonnet Jun 15, 2017
c1c8446
[FLINK]Eventhub connector support timestamp and watermark
zhuganghuaonnet Jun 16, 2017
9102257
Add eventhub producer, and to examples for eventhub read and write
zhuganghuaonnet Jun 22, 2017
d00e14b
Fix retrive NM cores as -1 when NM core value is not set in yarn-site…
zhuganghuaonnet Jun 26, 2017
7751e79
Fix bug that can not set offset to lastest
zhuganghuaonnet Aug 21, 2017
43fa260
Fix merge error
zhuganghuaonnet Sep 12, 2017
ea9154a
remove <flink-fast-tests-pattern>never-match-me</flink-fast-tests-pat…
zhuganghuaonnet Sep 14, 2017
3c0f8e4
Merge branch 'eventhubs' of https://github.com/zhuganghuaonnet/flink …
zhuganghuaonnet Sep 14, 2017
9a899a5
Add ASF
zhuganghuaonnet Sep 14, 2017
ca8744d
Modify D:\Tools\jdk1.8.0_101 back to $JAVA_HOME, since it is not runn…
zhuganghuaonnet Sep 14, 2017
0a76d47
Modify %JAVA_HOME% to $JAVA_HOME, since it is not running on windows …
zhuganghuaonnet Sep 14, 2017
ecd8015
[FLINK-7474]Fix checkstyle error
zhuganghuaonnet Sep 14, 2017
7f5ffd0
Merge branch 'eventhubs' of https://github.com/zhuganghuaonnet/flink …
zhuganghuaonnet Sep 14, 2017
3d3b8fb
[FLINK-7474]Fix check build error, deprecated CheckpointedRestoring
zhuganghuaonnet Sep 15, 2017
5e42a75
[FLINK-7474]Fix check build error, deprecated CheckpointedRestoring
zhuganghuaonnet Sep 15, 2017
6dc4c80
[FLINK-7474]Fix bug, maxEventRate is not set right if user set it ove…
zhuganghuaonnet Sep 15, 2017
ca11bb6
[FLINK-7474]Fix checkstyle error
zhuganghuaonnet Sep 15, 2017
2e3657c
[FLINK-7474] Fix bug that flinkeventhubproducer will be in abnormal s…
zhuganghuaonnet Sep 25, 2017
683abaf
Change Preconditions reference from hadoop back to flink bits
zhuganghuaonnet Nov 6, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

- Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue.

- Name the pull request in the form "[FLINK-1234] [component] Title of the pull request", where *FLINK-1234* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
- Name the pull request in the form "[FLINK-XXXX] [component] Title of the pull request", where *FLINK-XXXX* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component.
Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.

- Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
Expand Down
7 changes: 4 additions & 3 deletions docs/_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@ download_url: "http://flink.apache.org/downloads.html"
# please use a protocol relative URL here
baseurl: //ci.apache.org/projects/flink/flink-docs-release-1.4

# Flag whether this is the latest stable version or not. If not, a warning
# will be printed pointing to the docs of the latest stable version.
is_latest: true
# Flag whether this is a stable version or not. Used for the quickstart page.
is_stable: false

# Flag to indicate whether an outdated warning should be shown.
show_outdated_warning: false

previous_docs:
1.3: http://ci.apache.org/projects/flink/flink-docs-release-1.3
1.2: http://ci.apache.org/projects/flink/flink-docs-release-1.2
Expand Down
6 changes: 2 additions & 4 deletions docs/_layouts/base.html
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,10 @@
<![endif]-->
</head>
<body>
{% if site.is_stable %}
{% unless site.is_latest %}
<div style="position:fixed; bottom:0; left:0; z-index:99999; width:100%; text-align:center; padding:15px; border-top:1px dashed #CE4B65; background:#f6f0e3; font-weight:bold">
{% if site.show_outdated_warning %}
<div style="position:fixed; bottom:0; left:0; z-index:99999; width:100%; text-align:center; padding:15px; border-top:5px solid #ECCCD1; background:#F2DEDE; color:#AD433F; font-weight:bold">
This documentation is for an out-of-date version of Apache Flink. We recommend you use <a href="https://flink.apache.org/q/stable-docs.html">the latest stable version</a>.
</div>
{% endunless %}
{% endif %}

<!-- Main content. -->
Expand Down
5 changes: 5 additions & 0 deletions docs/_layouts/plain.html
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,10 @@
</ol>

<h1>{{ page.title }}{% if page.is_beta %} <span class="beta">Beta</span>{% endif %}</h1>
{% if site.show_outdated_warning %}
<div class="alert alert-danger" role="alert">
<strong>This documentation is for an out-of-date version of Apache Flink. We recommend you use <a href="https://flink.apache.org/q/stable-docs.html">the latest stable version</a>.</strong>
</div>
{% endif %}

{{ content }}
23 changes: 21 additions & 2 deletions docs/dev/connectors/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,14 @@ are other constructor variants that allow providing the following:

### Kafka Producers and Fault Tolerance

With Flink's checkpointing enabled, the Flink Kafka Producer can provide
at-least-once delivery guarantees.
#### Kafka 0.8

Before 0.9 Kafka did not provide any mechanisms to guarantee at-least-once or exactly-once semantics.

#### Kafka 0.9 and 0.10

With Flink's checkpointing enabled, the `FlinkKafkaProducer09` and `FlinkKafkaProducer010`
can provide at-least-once delivery guarantees.

Besides enabling Flink's checkpointing, you should also configure the setter
methods `setLogFailuresOnly(boolean)` and `setFlushOnCheckpoint(boolean)` appropriately,
Expand All @@ -499,6 +505,19 @@ we recommend setting the number of retries to a higher value.
**Note**: There is currently no transactional producer for Kafka, so Flink can not guarantee exactly-once delivery
into a Kafka topic.

<div class="alert alert-warning">
<strong>Attention:</strong> Depending on your Kafka configuration, even after Kafka acknowledges
writes you can still experience data loss. In particular keep in mind the following Kafka settings:
<ul>
<li><tt>acks</tt></li>
<li><tt>log.flush.interval.messages</tt></li>
<li><tt>log.flush.interval.ms</tt></li>
<li><tt>log.flush.*</tt></li>
</ul>
Default values for the above options can easily lead to data loss. Please refer to Kafka documentation
for more explanation.
</div>

## Using Kafka timestamps and Flink event time in Kafka 0.10

Since Apache Kafka 0.10+, Kafka's messages can carry [timestamps](https://cwiki.apache.org/confluence/display/KAFKA/KIP-32+-+Add+timestamps+to+Kafka+message), indicating
Expand Down
61 changes: 36 additions & 25 deletions docs/dev/connectors/kinesis.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,23 +256,29 @@ consumer when calling this API can also be modified by using the other keys pref

## Kinesis Producer

The `FlinkKinesisProducer` is used for putting data from a Flink stream into a Kinesis stream. Note that the producer is not participating in
Flink's checkpointing and doesn't provide exactly-once processing guarantees.
Also, the Kinesis producer does not guarantee that records are written in order to the shards (See [here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and [here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax) for more details).
The `FlinkKinesisProducer` uses [Kinesis Producer Library (KPL)](http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) to put data from a Flink stream into a Kinesis stream.

Note that the producer is not participating in Flink's checkpointing and doesn't provide exactly-once processing guarantees. Also, the Kinesis producer does not guarantee that records are written in order to the shards (See [here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and [here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax) for more details).

In case of a failure or a resharding, data will be written again to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics.

To put data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the AWS dashboard.

For the monitoring to work, the user accessing the stream needs access to the Cloud watch service.
For the monitoring to work, the user accessing the stream needs access to the CloudWatch service.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
// Required configs
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
// Optional configs
producerConfig.put("AggregationMaxCount", "4294967295");
producerConfig.put("CollectionMaxCount", "1000");
producerConfig.put("RecordTtl", "30000");
producerConfig.put("RequestTimeout", "6000");

FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setFailOnError(true);
Expand All @@ -286,9 +292,15 @@ simpleStringStream.addSink(kinesis);
<div data-lang="scala" markdown="1">
{% highlight scala %}
val producerConfig = new Properties();
producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
// Required configs
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
// Optional KPL configs
producerConfig.put("AggregationMaxCount", "4294967295");
producerConfig.put("CollectionMaxCount", "1000");
producerConfig.put("RecordTtl", "30000");
producerConfig.put("RequestTimeout", "6000");

val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig);
kinesis.setFailOnError(true);
Expand All @@ -301,45 +313,44 @@ simpleStringStream.addSink(kinesis);
</div>
</div>

The above is a simple example of using the producer. Configuration for the producer with the mandatory configuration values is supplied with a `java.util.Properties`
instance as described above for the consumer. The example demonstrates producing a single Kinesis stream in the AWS region "us-east-1".
The above is a simple example of using the producer. To initialize `FlinkKinesisProducer`, users are required to pass in `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` via a `java.util.Properties` instance. Users can also pass in KPL's configurations as optional parameters to customize the KPL underlying `FlinkKinesisProducer`. The full list of KPL configs and explanations can be found [here](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties). The example demonstrates producing a single Kinesis stream in the AWS region "us-east-1".

If users don't specify any KPL configs and values, `FlinkKinesisProducer` will use default config values of KPL, except `RateLimit`. `RateLimit` limits the maximum allowed put rate for a shard, as a percentage of the backend limits. KPL's default value is 150 but it makes KPL throw `RateLimitExceededException` too frequently and breaks Flink sink as a result. Thus `FlinkKinesisProducer` overrides KPL's default value to 100.

Instead of a `SerializationSchema`, it also supports a `KinesisSerializationSchema`. The `KinesisSerializationSchema` allows to send the data to multiple streams. This is
done using the `KinesisSerializationSchema.getTargetStream(T element)` method. Returning `null` there will instruct the producer to write the element to the default stream.
Otherwise, the returned stream name is used.

Other optional configuration keys for the producer can be found in `ProducerConfigConstants`.


## Using Non-AWS Kinesis Endpoints for Testing

It is sometimes desirable to have Flink operate as a consumer or producer against a non-AWS Kinesis endpoint such as
[Kinesalite](https://github.com/mhart/kinesalite); this is especially useful when performing functional testing of a Flink
application. The AWS endpoint that would normally be inferred by the AWS region set in the Flink configuration must be overridden via a configuration property.

To override the AWS endpoint, taking the producer for example, set the `ProducerConfigConstants.AWS_ENDPOINT` property in the
Flink configuration, in addition to the `ProducerConfigConstants.AWS_REGION` required by Flink. Although the region is
To override the AWS endpoint, taking the producer for example, set the `AWSConfigConstants.AWS_ENDPOINT` property in the
Flink configuration, in addition to the `AWSConfigConstants.AWS_REGION` required by Flink. Although the region is
required, it will not be used to determine the AWS endpoint URL.

The following example shows how one might supply the `ProducerConfigConstants.AWS_ENDPOINT` configuration property:
The following example shows how one might supply the `AWSConfigConstants.AWS_ENDPOINT` configuration property:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(ProducerConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val producerConfig = new Properties();
producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(ProducerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(ProducerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(ProducerConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
{% endhighlight %}
</div>
</div>