[SPARK-30901][DOCS] Fix doc example with deprecated codes #27652

Closed
wants to merge 1 commit
24 changes: 12 additions & 12 deletions docs/streaming-kinesis-integration.md
@@ -46,14 +46,14 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import org.apache.spark.streaming.kinesis.KinesisInitialPositions

val kinesisStream = KinesisInputDStream.builder
.streamingContext(streamingContext)
.endpointUrl([endpoint URL])
.regionName([region name])
.streamName([streamName])
-.initialPositionInStream([initial position])
+.initialPosition([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
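
For illustration only (not part of this patch), a filled-in version of the builder above might look like the sketch below. The endpoint, region, stream, and app names are assumed placeholders, and `streamingContext` is assumed to be an already-created `StreamingContext`.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

// Hypothetical values; substitute your own endpoint, region, stream, and app name.
val kinesisStream = KinesisInputDStream.builder
  .streamingContext(streamingContext)                      // existing StreamingContext (assumed)
  .endpointUrl("https://kinesis.us-east-1.amazonaws.com")  // assumed endpoint URL
  .regionName("us-east-1")                                 // assumed region
  .streamName("my-kinesis-stream")                         // assumed stream name
  .initialPosition(new KinesisInitialPositions.Latest())   // start from the latest records
  .checkpointAppName("my-kinesis-app")                     // assumed Kinesis app name
  .checkpointInterval(Seconds(10))                         // how often the KCL saves its position
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .build()
```
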
@@ -68,14 +68,14 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContext;
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import org.apache.spark.streaming.kinesis.KinesisInitialPositions;

KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
.streamingContext(streamingContext)
.endpointUrl([endpoint URL])
.regionName([region name])
.streamName([streamName])
-.initialPositionInStream([initial position])
+.initialPosition([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
@@ -110,7 +110,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
+import org.apache.spark.streaming.kinesis.KinesisInitialPositions
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel

@@ -119,7 +119,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
.endpointUrl([endpoint URL])
.regionName([region name])
.streamName([streamName])
-.initialPositionInStream([initial position])
+.initialPosition([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
@@ -133,7 +133,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
import org.apache.spark.streaming.kinesis.KinesisInputDStream;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContext;
-import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
+import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import scala.collection.JavaConverters;
@@ -143,7 +143,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m
.endpointUrl([endpoint URL])
.regionName([region name])
.streamName([streamName])
-.initialPositionInStream([initial position])
+.initialPosition([initial position])
.checkpointAppName([Kinesis app name])
.checkpointInterval([checkpoint interval])
.storageLevel(StorageLevel.MEMORY_AND_DISK_2)
@@ -170,7 +170,7 @@ A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or m

- `[checkpoint interval]`: The interval (e.g., Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream. For starters, set it to the same as the batch interval of the streaming application.

-- `[initial position]`: Can be either `InitialPositionInStream.TRIM_HORIZON` or `InitialPositionInStream.LATEST` (see [`Kinesis Checkpointing`](#kinesis-checkpointing) section and [`Amazon Kinesis API documentation`](http://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-sdk.html) for more details).
+- `[initial position]`: Can be either `KinesisInitialPositions.TrimHorizon` or `KinesisInitialPositions.Latest` or `KinesisInitialPositions.AtTimestamp` (see [`Kinesis Checkpointing`](#kinesis-checkpointing) section and [`Amazon Kinesis API documentation`](http://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-sdk.html) for more details).

- `[message handler]`: A function that takes a Kinesis `Record` and outputs generic `T`.
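
As a rough sketch of the message-handler variant (again, not part of this patch), the handler below decodes each record's payload as a UTF-8 string, so `T` is `String`; the stream details are assumed placeholders.

```scala
import java.nio.charset.StandardCharsets

import com.amazonaws.services.kinesis.model.Record
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kinesis.{KinesisInitialPositions, KinesisInputDStream}

// Hypothetical handler: decode the Kinesis record's data bytes as a UTF-8 string.
def recordToString(record: Record): String =
  StandardCharsets.UTF_8.decode(record.getData).toString

val stringStream = KinesisInputDStream.builder
  .streamingContext(streamingContext)                      // existing StreamingContext (assumed)
  .endpointUrl("https://kinesis.us-east-1.amazonaws.com")  // assumed endpoint URL
  .regionName("us-east-1")                                 // assumed region
  .streamName("my-kinesis-stream")                         // assumed stream name
  .initialPosition(new KinesisInitialPositions.TrimHorizon())
  .checkpointAppName("my-kinesis-app")                     // assumed Kinesis app name
  .checkpointInterval(Seconds(10))
  .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
  .buildWithMessageHandler(recordToString)                 // yields a DStream[String] instead of DStream[Array[Byte]]
```
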

@@ -272,9 +272,9 @@ de-aggregate records during consumption.

- Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling. The provided example handles this throttling with a random-backoff-retry strategy.

-- If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`InitialPositionInStream.TRIM_HORIZON`) or from the latest tip (`InitialPositionInStream.LATEST`). This is configurable.
-  - `InitialPositionInStream.LATEST` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
-  - `InitialPositionInStream.TRIM_HORIZON` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
+- If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`KinesisInitialPositions.TrimHorizon`), or from the latest tip (`KinesisInitialPositions.Latest`), or (except Python) from the position denoted by the provided UTC timestamp (`KinesisInitialPositions.AtTimestamp(Date timestamp)`). This is configurable.
+  - `KinesisInitialPositions.Latest` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
+  - `KinesisInitialPositions.TrimHorizon` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
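
To make these three options concrete, a minimal sketch of how each initial position could be constructed follows; the timestamp value is an arbitrary assumption.

```scala
import java.util.Date

import org.apache.spark.streaming.kinesis.KinesisInitialPositions

// Oldest record still retained by the stream (may reprocess data already seen).
val fromOldest = new KinesisInitialPositions.TrimHorizon()

// Only records written after the consumer starts (may miss data written while no consumer runs).
val fromLatest = new KinesisInitialPositions.Latest()

// Records at or after a given UTC timestamp (not exposed through the Python API); epoch millis assumed.
val fromTimestamp = new KinesisInitialPositions.AtTimestamp(new Date(1582243200000L))
```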

#### Kinesis retry configuration
- `spark.streaming.kinesis.retry.waitTime`: Wait time between Kinesis retries as a duration string. When reading from Amazon Kinesis, users may hit `ProvisionedThroughputExceededException`s when consuming faster than 5 transactions/second or exceeding the maximum read rate of 2 MiB/second. This configuration can be tweaked to increase the sleep between fetches when a fetch fails, in order to reduce these exceptions. Default is "100ms".
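
As a rough illustration (the values are assumptions, not recommendations), this setting can be supplied through `SparkConf` before the streaming context is created:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Assumed tuning: wait 500ms between Kinesis fetch retries instead of the 100ms default.
val conf = new SparkConf()
  .setAppName("kinesis-retry-example")                         // assumed app name
  .setMaster("local[*]")                                       // assumed master, for local illustration only
  .set("spark.streaming.kinesis.retry.waitTime", "500ms")

val streamingContext = new StreamingContext(conf, Seconds(10)) // assumed batch interval
```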