Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add design for automatic event streaming reconnects. #1236

Merged
merged 3 commits into from
May 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions docs/design/core/event-streaming/reconnect/CurrentState.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package software.amazon.awssdk.services.transcribestreaming;

import com.github.davidmoten.rx2.Bytes;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.junit.Test;
import org.reactivestreams.Publisher;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.transcribestreaming.model.AudioEvent;
import software.amazon.awssdk.services.transcribestreaming.model.AudioStream;
import software.amazon.awssdk.services.transcribestreaming.model.LanguageCode;
import software.amazon.awssdk.services.transcribestreaming.model.MediaEncoding;
import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionRequest;
import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionResponseHandler;
import software.amazon.awssdk.services.transcribestreaming.model.TranscriptEvent;
import software.amazon.awssdk.services.transcribestreaming.model.TranscriptResultStream;

/**
 * Demonstrates the reconnect logic a customer currently has to write by hand to resume a streaming transcription
 * after an error: capture the session ID from the response, then re-issue the request with that session ID and the
 * same audio publisher until the call succeeds or the resume budget runs out.
 */
public class CurrentState {
    private static final String RESUMES_EXCEEDED_MESSAGE =
        "Failed to resume audio, because the maximum resumes have been exceeded.";

    private final File audioFile =
        new File(getClass().getClassLoader().getResource("silence_16kHz_s16le.wav").getFile());

    /**
     * Streams the audio file to Transcribe, allowing up to three attempts in total, and blocks until the
     * transcription has completed or failed.
     *
     * @throws IOException if the audio file cannot be opened (a {@link FileNotFoundException}) or closed.
     */
    @Test
    public void demoCurrentState() throws IOException {
        // The input stream is opened here, so it is closed here as well once the transcription has finished.
        // Resources are closed in reverse order: the client first, then the audio input.
        try (FileInputStream audioInput = new FileInputStream(audioFile);
             TranscribeStreamingAsyncClient client = TranscribeStreamingAsyncClient.create()) {
            // Create the audio stream for transcription - we have to create a publisher that resumes where it left off.
            // If we don't, we'll replay the whole thing again on a reconnect.
            Publisher<AudioStream> audioStream =
                Bytes.from(audioInput)
                     .map(SdkBytes::fromByteArray)
                     .map(bytes -> AudioEvent.builder().audioChunk(bytes).build())
                     .cast(AudioStream.class);

            CompletableFuture<Void> result = printAudio(client, audioStream, null, 3);
            result.join();
        }
    }

    /**
     * Starts (or resumes) a transcription and prints every transcript result that is received.
     *
     * @param client the client used to call Transcribe.
     * @param audioStream the audio to send; the same publisher is reused for every attempt.
     * @param sessionId the session to resume, or null when starting a new session.
     * @param resumesRemaining how many attempts, including this one, may still be made.
     * @return a future that completes normally once the transcription finishes, or exceptionally when the call
     *         fails and cannot be resumed.
     */
    private CompletableFuture<Void> printAudio(TranscribeStreamingAsyncClient client,
                                               Publisher<AudioStream> audioStream,
                                               String sessionId,
                                               int resumesRemaining) {
        // "<= 0" rather than "== 0" so that a negative budget fails fast instead of recursing without bound.
        if (resumesRemaining <= 0) {
            return failedFuture(new IllegalStateException(RESUMES_EXCEEDED_MESSAGE));
        }

        // Create the request for transcribe that includes the audio metadata
        StartStreamTranscriptionRequest audioMetadata =
            StartStreamTranscriptionRequest.builder()
                                           .languageCode(LanguageCode.EN_US)
                                           .mediaEncoding(MediaEncoding.PCM)
                                           .mediaSampleRateHertz(16_000)
                                           .sessionId(sessionId)
                                           .build();

        // Create the transcription handler. The session ID starts out as the one we were given and is replaced
        // by the one in the response, so that a later failure knows which session to resume.
        AtomicReference<String> atomicSessionId = new AtomicReference<>(sessionId);
        Consumer<TranscriptResultStream> reader = event -> {
            if (event instanceof TranscriptEvent) {
                TranscriptEvent transcriptEvent = (TranscriptEvent) event;
                System.out.println(transcriptEvent.transcript().results());
            }
        };

        StartStreamTranscriptionResponseHandler responseHandler =
            StartStreamTranscriptionResponseHandler.builder()
                                                   .onResponse(r -> atomicSessionId.set(r.sessionId()))
                                                   .subscriber(reader)
                                                   .build();

        // Start talking with transcribe. handle(...) yields a future of a future, which thenCompose flattens.
        return client.startStreamTranscription(audioMetadata, audioStream, responseHandler)
                     .handle((x, error) -> resumePrintAudio(client, audioStream, atomicSessionId.get(),
                                                            resumesRemaining, error))
                     .thenCompose(flatten -> flatten);
    }

    /**
     * Decides what to do once an attempt has finished: nothing on success, otherwise resume the session if that
     * is still possible.
     *
     * @param client the client used to call Transcribe.
     * @param audioStream the audio publisher to hand to the next attempt.
     * @param sessionId the session ID known after the finished attempt, or null if none is known.
     * @param resumesRemaining the attempt budget the finished attempt was started with.
     * @param error the failure of the finished attempt, or null if it succeeded.
     * @return a completed future on success, the future of the next attempt on a resumable failure, or a failed
     *         future when there is no session to resume or the budget is used up.
     */
    private CompletableFuture<Void> resumePrintAudio(TranscribeStreamingAsyncClient client,
                                                     Publisher<AudioStream> audioStream,
                                                     String sessionId,
                                                     int resumesRemaining,
                                                     Throwable error) {
        if (error == null) {
            return CompletableFuture.completedFuture(null);
        }

        // Without a session ID there is nothing to resume, so surface the original failure unchanged.
        if (sessionId == null) {
            return failedFuture(error);
        }

        // Fail here, rather than in the next printAudio call, so the error that used up the budget is kept as
        // the cause instead of being dropped.
        int resumesLeft = resumesRemaining - 1;
        if (resumesLeft <= 0) {
            return failedFuture(new IllegalStateException(RESUMES_EXCEEDED_MESSAGE, error));
        }

        // Only announce a reconnect when one is actually going to happen.
        System.out.println("Error happened. Reconnecting and trying again...");
        error.printStackTrace();

        // If we failed, recursively call printAudio
        return printAudio(client, audioStream, sessionId, resumesLeft);
    }

    /**
     * Creates a future that is already completed exceptionally with the given error.
     */
    private static <T> CompletableFuture<T> failedFuture(Throwable error) {
        CompletableFuture<T> result = new CompletableFuture<>();
        result.completeExceptionally(error);
        return result;
    }
}
81 changes: 81 additions & 0 deletions docs/design/core/event-streaming/reconnect/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
# Event Stream Reconnects

Event streaming allows long-running bi-directional communication between
customers and AWS services over HTTP/2 connections.

Because a single request is intended to be long-running, services
usually provide a way for a client to "resume" an interrupted session on
a new TCP connection. In Kinesis's subscribe-to-shard API, each response
event includes a `continuationSequenceNumber` that can be specified in a
request message to pick up from where the disconnect occurred. In
Transcribe's streaming-transcription API, each response includes a
`sessionId` with similar semantics.

The current implementation requires the service to write a high-level
library for handling this logic (e.g. Kinesis's consumer library), or
for each customer to write this logic by hand.
[This hand-written code](CurrentState.java) is verbose and error-prone.

This mini-design outlines API options for having the SDK automatically
reconnect when a network error occurs.

## [API Option 1: New Method](prototype/Option1.java)

This option adds a new method to each streaming operation that the
customer can use to enable automatic reconnects. The customer chooses
whether to use reconnects by choosing which method to call.

```Java
try (TranscribeStreamingAsyncClient client = TranscribeStreamingAsyncClient.create()) {
// ...
// Current method (behavior is unchanged)
client.startStreamTranscription(audioMetadata,
audioStream,
responseHandler);

// New method that transparently reconnects on network errors (name to be bikeshed)
client.startStreamTranscriptionWithReconnects(audioMetadata,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not that it matters too much as name is TBD but the name you use here is different than in the Option 1 code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch.

audioStream,
responseHandler);
// ...
}
```

## [API Option 2: New Client Configuration](prototype/Option2.java)

This option adds a new setting on the client that the customer can use
to *disable* automatic reconnects. The customer gets automatic
reconnects by default, and would need to explicitly disable them if they
do not want them for their use-case.

```Java
// Current method is updated to transparently reconnect on network errors
try (TranscribeStreamingAsyncClient client =
TranscribeStreamingAsyncClient.create()) {
// ...
client.startStreamTranscription(audioMetadata,
audioStream,
responseHandler);
// ...
}

// New client configuration option can be used to configure reconnect behavior
try (TranscribeStreamingAsyncClient client =
TranscribeStreamingAsyncClient.builder()
.overrideConfiguration(c -> c.reconnectPolicy(ReconnectPolicy.none()))
.build()) {
// ...
client.startStreamTranscription(audioMetadata,
audioStream,
responseHandler);
// ...
}
```

## Comparison

| | Option 1 | Option 2 |
| --- | --- | --- |
| Discoverability | - | + |
| Configurability | - | + |
| Backwards Compatibility | + | - |
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.transcribestreaming;

import com.github.davidmoten.rx2.Bytes;
import io.reactivex.Flowable;
import java.io.File;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.junit.Test;
import org.reactivestreams.Publisher;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.transcribestreaming.model.AudioEvent;
import software.amazon.awssdk.services.transcribestreaming.model.AudioStream;
import software.amazon.awssdk.services.transcribestreaming.model.LanguageCode;
import software.amazon.awssdk.services.transcribestreaming.model.MediaEncoding;
import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionRequest;
import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionResponseHandler;
import software.amazon.awssdk.services.transcribestreaming.model.TranscriptEvent;
import software.amazon.awssdk.services.transcribestreaming.model.TranscriptResultStream;

/**
 * Option 1: Add a new method to hide: (1) the need for non-replayable publishers, (2) the reconnect boilerplate.
 *
 * The existing method keeps its current behavior; the customer opts in to reconnects by calling the new method.
 */
public class Option1 {
    private final File audioFile =
        new File(getClass().getClassLoader().getResource("silence_16kHz_s16le.wav").getFile());

    /**
     * Transcribes the audio file through the proposed auto-reconnecting method, printing each transcript result
     * and blocking until the transcription has finished.
     */
    @Test
    public void option1() {
        try (TranscribeStreamingAsyncClient client = TranscribeStreamingAsyncClient.create()) {
            // Create the request for transcribe that includes the audio metadata
            StartStreamTranscriptionRequest audioMetadata =
                StartStreamTranscriptionRequest.builder()
                                               .languageCode(LanguageCode.EN_US)
                                               .mediaEncoding(MediaEncoding.PCM)
                                               .mediaSampleRateHertz(16_000)
                                               .build();

            // Create the audio stream for transcription
            Publisher<AudioStream> audioStream =
                Bytes.from(audioFile)
                     .map(SdkBytes::fromByteArray)
                     .map(bytes -> AudioEvent.builder().audioChunk(bytes).build())
                     .cast(AudioStream.class);

            // Create the visitor that handles the transcriptions from transcribe
            Consumer<TranscriptResultStream> reader = event -> {
                if (event instanceof TranscriptEvent) {
                    TranscriptEvent transcriptEvent = (TranscriptEvent) event;
                    System.out.println(transcriptEvent.transcript().results());
                }
            };

            StartStreamTranscriptionResponseHandler responseHandler =
                StartStreamTranscriptionResponseHandler.builder()
                                                       .subscriber(reader)
                                                       .build();

            // Start talking with transcribe using a new auto-reconnect method (method name to be bikeshed).
            // The provisional name matches the one used in the design README so the two stay in agreement.
            CompletableFuture<Void> result = client.startStreamTranscriptionWithReconnects(audioMetadata,
                                                                                           audioStream,
                                                                                           responseHandler);
            result.join();
        }
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.transcribestreaming;

import com.github.davidmoten.rx2.Bytes;
import io.reactivex.Flowable;
import java.io.File;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.junit.Test;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.transcribestreaming.model.AudioEvent;
import software.amazon.awssdk.services.transcribestreaming.model.AudioStream;
import software.amazon.awssdk.services.transcribestreaming.model.LanguageCode;
import software.amazon.awssdk.services.transcribestreaming.model.MediaEncoding;
import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionRequest;
import software.amazon.awssdk.services.transcribestreaming.model.StartStreamTranscriptionResponseHandler;
import software.amazon.awssdk.services.transcribestreaming.model.TranscriptEvent;
import software.amazon.awssdk.services.transcribestreaming.model.TranscriptResultStream;

/**
* Option 2: Update current method to automatically reconnect and hide: (1) the need for non-replayable publishers,
* (2) the reconnect boilerplate.
*
* This behavior can be configured (e.g. disabled) via the "reconnect policy" on the client constructor.
*/
public class Option2Test {
private File audioFile = new File(getClass().getClassLoader().getResource("silence_16kHz_s16le.wav").getFile());

@Test
public void option2() {
try (TranscribeStreamingAsyncClient client = TranscribeStreamingAsyncClient.create()) {
// Create the request for transcribe that includes the audio metadata
StartStreamTranscriptionRequest audioMetadata =
StartStreamTranscriptionRequest.builder()
.languageCode(LanguageCode.EN_US)
.mediaEncoding(MediaEncoding.PCM)
.mediaSampleRateHertz(16_000)
.build();

// Create the audio stream for transcription
Flowable<AudioStream> audioStream =
Bytes.from(audioFile)
.map(SdkBytes::fromByteArray)
.map(bytes -> AudioEvent.builder().audioChunk(bytes).build())
.cast(AudioStream.class);

// Create the visitor that handles the transcriptions from transcribe
Consumer<TranscriptResultStream> reader = event -> {
if (event instanceof TranscriptEvent) {
TranscriptEvent transcriptEvent = (TranscriptEvent) event;
System.out.println(transcriptEvent.transcript().results());
}
};

StartStreamTranscriptionResponseHandler responseHandler = StartStreamTranscriptionResponseHandler.builder()
.subscriber(reader)
.build();

// Start talking with transcribe using the existing method
CompletableFuture<Void> result = client.startStreamTranscription(audioMetadata, audioStream, responseHandler);
result.join();
}
}

@Test
public void disableReconnectsDemo() {
// Turn off reconnects
try (TranscribeStreamingAsyncClient client =
TranscribeStreamingAsyncClient.builder()
.overrideConfiguration(c -> c.reconnectPolicy(ReconnectPolicy.none()))
.build()) {
// ....
}
}
}