Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: retry interceptor now closes previous response #332

Merged
merged 2 commits into from
Mar 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions aws-android-sdk-appsync/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ dependencies {
testImplementation 'org.mockito:mockito-core:3.2.4'
testImplementation "com.amazonaws:aws-android-sdk-cognitoidentityprovider:$aws_version"
testImplementation project(':aws-android-sdk-appsync-runtime')
testImplementation("com.squareup.okhttp3:mockwebserver:4.3.1")
implementation ("com.amazonaws:aws-android-sdk-mobile-client:$aws_version@aar") { transitive = true }
implementation ("com.amazonaws:aws-android-sdk-auth-userpools:$aws_version@aar") { transitive = true }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public Response intercept(Chain chain) throws IOException {
if (retryAfterHeaderValue != null) {
try {
waitMillis = Integer.parseInt(retryAfterHeaderValue) * 1000;
response.close();
continue;
} catch (NumberFormatException e) {
Log.w(TAG, "Could not parse Retry-After header: " + retryAfterHeaderValue);
Expand All @@ -69,6 +70,7 @@ public Response intercept(Chain chain) throws IOException {
if ((response.code() >= 500 && response.code() < 600)
|| response.code() == 429 ) {
waitMillis = calculateBackoff(retryCount);
response.close();
continue;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/**
* Copyright 2021 Amazon.com,,
* Inc. or its affiliates. All Rights Reserved.
* <p>
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazonaws.mobileconnectors.appsync.retry;

import android.support.annotation.NonNull;

import com.amazonaws.mobileconnectors.appsync.util.Await;

import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.robolectric.RobolectricTestRunner;
import org.robolectric.annotation.Config;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;

import static org.junit.Assert.assertTrue;

/**
* Tests for retry interceptor.
*/
@RunWith(RobolectricTestRunner.class)
@Config(manifest = "AndroidManifest.xml")
public class RetryInterceptorTest {
public static final int TEST_TIMEOUT = 10;
private MockWebServer mockWebServer;
private OkHttpClient okHttpClient;

@Before
public void beforeEachTest() throws IOException {
mockWebServer = new MockWebServer();
mockWebServer.start(8888);

okHttpClient = new OkHttpClient.Builder()
.addInterceptor(new RetryInterceptor())
.build();
}

@After
public void afterEachTest() throws IOException {
mockWebServer.shutdown();
}

/**
* Verify that everything works when the first attempt succeeds.
* @throws IOException Not expected
* @throws InterruptedException Not expected
*/
@Test
public void successfulRequestWithoutFailuresTest() throws Throwable {
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody("{\"result\":\"all good\""));

final Request request = new Request.Builder()
.url("http://localhost:8888")
.method("POST", RequestBody.create("{}", MediaType.get("application/json")))
.build();

Response response = Await.result(new Await.ResultErrorEmitter<Response, Throwable>() {
@Override
public void emitTo(@NonNull Consumer<Response> onResult, @NonNull Consumer<Throwable> onError) {
okHttpClient.newCall(request).enqueue(new OkHttpCallback(onResult, onError));
}
});
assertTrue(response.body().string().contains("all good"));
}

/**
* Verify that retries happen successfully without leaving the previous response open.
* This test was created as a result of a Github issue
* https://github.com/awslabs/aws-mobile-appsync-sdk-android/issues/305.
* @throws IOException Not expected
* @throws InterruptedException Not expected
*/
@Test
public void successfulRequestWithFailuresTest() throws Throwable {
mockWebServer.enqueue(new MockResponse().setResponseCode(500).setBody("{\"error\":\"some exception\""));
mockWebServer.enqueue(new MockResponse().setResponseCode(501).setBody("{\"error\":\"another exception\"").setHeader("Retry-After", "1"));
mockWebServer.enqueue(new MockResponse().setResponseCode(500).setBody("{\"error\":\"some exception\""));
mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody("{\"result\":\"all good\""));

final Request request = new Request.Builder()
.url("http://localhost:8888")
.method("POST", RequestBody.create("{}", MediaType.get("application/json")))
.build();

mockWebServer.enqueue(new MockResponse().setResponseCode(200).setBody("{\"result\":\"all good\""));

Response response = Await.result(new Await.ResultErrorEmitter<Response, Throwable>() {
@Override
public void emitTo(@NonNull Consumer<Response> onResult, @NonNull Consumer<Throwable> onError) {
okHttpClient.newCall(request).enqueue(new OkHttpCallback(onResult, onError));
}
});
assertTrue(response.body().string().contains("all good"));
}

/**
* Wrapper class that takes the two Consumer callbacks from the Await.result function
* and uses them to emit result or error.
*/
static final class OkHttpCallback implements Callback {
private final AtomicBoolean success = new AtomicBoolean(false);
private final Consumer<Response> onResponse;
private final Consumer<Throwable> onError;

public OkHttpCallback(Consumer<Response> onResponse, Consumer<Throwable> onError) {
this.onResponse = onResponse;
this.onError = onError;
}

@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {
onError.accept(e);
}

@Override
public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException {
onResponse.accept(response);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
/*
* Copyright 2021 Amazon.com,
* Inc. or its affiliates. All Rights Reserved.
* <p>
* SPDX-License-Identifier: Apache-2.0
*/

package com.amazonaws.mobileconnectors.appsync.util;

import android.support.annotation.NonNull;

import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/**
* A utility to await a value from an async function, in a synchronous way.
*/
@SuppressWarnings({"WeakerAccess", "SameParameterValue", "unused", "UnusedReturnValue"})
public final class Await {
private static final long DEFAULT_WAIT_TIME_MS = TimeUnit.SECONDS.toMillis(10);

private Await() {}

/**
* Await a latch to count down.
* @param latch Latch for which count down is awaited
* @param waitTimeMs Time in milliseconds to wait for count down before timing out with exception
* @throws RuntimeException If the latch doesn't count down in the specified amount of time
*/
public static void latch(CountDownLatch latch, long waitTimeMs) {
try {
latch.await(waitTimeMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException interruptedException) {
throw new RuntimeException(interruptedException);
}
if (latch.getCount() != 0) {
throw new RuntimeException("Latch did not count down.");
}
}

/**
* Await a latch to count down, for a "reasonable" amount of time.
* Note: we choose what "reasonable" means. If you want to choose, use
* {@link #latch(CountDownLatch, long)}, instead.
* @param latch A count down latch for which countdown is awaited
* @throws RuntimeException If the latch doesn't count down in a reasonable amount of time
*/
public static void latch(@NonNull CountDownLatch latch) {
latch(latch, DEFAULT_WAIT_TIME_MS);
}

/**
* Awaits emission of either a result of an error.
* Blocks the thread of execution until either the value is
* available, of a default timeout has elapsed.
* @param resultErrorEmitter A function which emits result or error
* @param <R> Type of result
* @param <E> type of error
* @return The result
* @throws E if error is emitted
* @throws RuntimeException In all other situations where there is not a non-null result
*/
@NonNull
public static <R, E extends Throwable> R result(
@NonNull ResultErrorEmitter<R, E> resultErrorEmitter) throws E {
return result(DEFAULT_WAIT_TIME_MS, resultErrorEmitter);
}

/**
* Await emission of either a result or an error.
* Blocks the thread of execution until either the value is available,
* or the timeout is reached.
* @param timeMs Amount of time to wait
* @param resultErrorEmitter A function which emits result or error
* @param <R> Type of result
* @param <E> Type of error
* @return The result
* @throws E if error is emitted
* @throws RuntimeException In all other situations where there is not a non-null result
*/
@NonNull
public static <R, E extends Throwable> R result(
long timeMs, @NonNull ResultErrorEmitter<R, E> resultErrorEmitter) throws E {

Objects.requireNonNull(resultErrorEmitter);

AtomicReference<R> resultContainer = new AtomicReference<>();
AtomicReference<E> errorContainer = new AtomicReference<>();

await(timeMs, resultErrorEmitter, resultContainer, errorContainer);

R result = resultContainer.get();
E error = errorContainer.get();
if (error != null) {
throw error;
} else if (result != null) {
return result;
}

throw new IllegalStateException("Latch counted down, but where's the value?");
}

/**
* Awaits receipt of an error or a callback.
* Blocks the thread of execution until it arrives, or until the wait times out.
* @param resultErrorEmitter An emitter of result of error
* @param <R> Type of result
* @param <E> Type of error
* @return The error that was emitted by the emitter
* @throws RuntimeException If no error was emitted by emitter
*/
@NonNull
public static <R, E extends Throwable> E error(@NonNull ResultErrorEmitter<R, E> resultErrorEmitter) {
return error(DEFAULT_WAIT_TIME_MS, resultErrorEmitter);
}

/**
* Awaits receipt of an error on an error callback.
* Blocks the calling thread until it shows up, or until timeout elapses.
* @param timeMs Amount of time to wait
* @param resultErrorEmitter A function which emits result or error
* @param <R> Type of result
* @param <E> Type of error
* @return Error, if attained
* @throws RuntimeException If no error is emitted by the emitter
*/
@NonNull
public static <R, E extends Throwable> E error(
long timeMs, @NonNull ResultErrorEmitter<R, E> resultErrorEmitter) {

Objects.requireNonNull(resultErrorEmitter);

AtomicReference<R> resultContainer = new AtomicReference<>();
AtomicReference<E> errorContainer = new AtomicReference<>();

await(timeMs, resultErrorEmitter, resultContainer, errorContainer);

R result = resultContainer.get();
E error = errorContainer.get();
if (result != null) {
throw new RuntimeException("Expected error, but had result = " + result);
} else if (error != null) {
return error;
}

throw new RuntimeException("Neither error nor result consumers accepted a value.");
}

private static <R, E extends Throwable> void await(
long timeMs,
@NonNull final ResultErrorEmitter<R, E> resultErrorEmitter,
@NonNull final AtomicReference<R> resultContainer,
@NonNull final AtomicReference<E> errorContainer) {

final CountDownLatch latch = new CountDownLatch(1);
resultErrorEmitter.emitTo(
new Consumer<R>() {
@Override
public void accept(R result) {
resultContainer.set(result);
latch.countDown();
}
}, new Consumer<E>() {
@Override
public void accept(E error) {
errorContainer.set(error);
latch.countDown();
}
}
);

try {
latch.await(timeMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException interruptedException) {
// Will check the latch count regardless, and branch appropriately.
}
if (latch.getCount() != 0) {
throw new RuntimeException(
"Neither result nor error consumers accepted a value within " + timeMs + "ms."
);
}
}

/**
* A function which, upon completion, either emits a single result,
* or emits an error.
* @param <R> Type of result
* @param <E> Type of error
*/
public interface ResultErrorEmitter<R, E extends Throwable> {
/**
* A function that emits a value upon completion, either as a
* result or as an error.
* @param onResult Callback invoked upon emission of result
* @param onError Callback invoked upon emission of error
*/
void emitTo(@NonNull Consumer<R> onResult, @NonNull Consumer<E> onError);
}
}