Skip to content

Commit

Permalink
[FLINK-12137][docs] Use AWSConfigConstants instead of ConsumerConfigC…
Browse files Browse the repository at this point in the history
…onstants in examples.

This closes #8128.
  • Loading branch information
yjk891 authored and fhueske committed Apr 16, 2019
1 parent a1b574c commit 7c8e980
Showing 1 changed file with 9 additions and 8 deletions.
17 changes: 9 additions & 8 deletions docs/dev/connectors/kinesis.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ Before consuming data from Kinesis streams, make sure that all streams are creat
<div data-lang="java" markdown="1">
{% highlight java %}
Properties consumerConfig = new Properties();
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getEnvironment();
Expand All @@ -96,9 +96,9 @@ DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
<div data-lang="scala" markdown="1">
{% highlight scala %}
val consumerConfig = new Properties()
consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
consumerConfig.put(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")

val env = StreamExecutionEnvironment.getEnvironment
Expand All @@ -110,10 +110,11 @@ val kinesis = env.addSource(new FlinkKinesisConsumer[String](
</div>

The above is a simple example of using the consumer. Configuration for the consumer is supplied with a `java.util.Properties`
instance, the configuration keys for which can be found in `ConsumerConfigConstants`. The example
instance, the configuration keys for which can be found in `AWSConfigConstants` (AWS-specific parameters) and
`ConsumerConfigConstants` (Kinesis consumer parameters). The example
demonstrates consuming a single Kinesis stream in the AWS region "us-east-1". The AWS credentials are supplied using the basic method in which
the AWS access key ID and secret access key are directly supplied in the configuration (other options are setting
`ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER` to `ENV_VAR`, `SYS_PROP`, `PROFILE`, `ASSUME_ROLE`, and `AUTO`). Also, data is being consumed
`AWSConfigConstants.AWS_CREDENTIALS_PROVIDER` to `ENV_VAR`, `SYS_PROP`, `PROFILE`, `ASSUME_ROLE`, and `AUTO`). Also, data is being consumed
from the newest position in the Kinesis stream (the other option will be setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION`
to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible).

Expand Down

0 comments on commit 7c8e980

Please sign in to comment.