Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version=9.4.5
version=10.0.0
SONATYPE_CONNECT_TIMEOUT_SECONDS=120
96 changes: 80 additions & 16 deletions src/main/java/com/configcat/ConfigCatClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.Proxy;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

Expand Down Expand Up @@ -40,11 +42,8 @@ private ConfigCatClient(String sdkKey, Options options) {
this.configCatHooks = options.configCatHooks;

if (this.overrideBehaviour != OverrideBehaviour.LOCAL_ONLY) {
ConfigFetcher fetcher = new ConfigFetcher(options.httpClient == null
? new OkHttpClient
.Builder()
.build()
: options.httpClient,
ConfigFetcher fetcher = new ConfigFetcher(
createHttpClient(options.httpOptions()),
this.logger,
sdkKey,
!options.isBaseURLCustom()
Expand All @@ -63,6 +62,22 @@ private ConfigCatClient(String sdkKey, Options options) {
this.defaultUser = options.defaultUser;
}

private static OkHttpClient createHttpClient(Options.HttpOptions httpOptions) {
OkHttpClient.Builder builder = new OkHttpClient.Builder();

if (httpOptions.getConnectTimeoutMillis() != null) {
builder.connectTimeout(httpOptions.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS);
}
if (httpOptions.getReadTimeoutMillis() != null) {
builder.readTimeout(httpOptions.getReadTimeoutMillis(), TimeUnit.MILLISECONDS);
}
if (httpOptions.getProxy() != null) {
builder.proxy(httpOptions.getProxy());
}

return builder.build();
}

@Override
public <T> T getValue(Class<T> classOfT, String key, T defaultValue) {
return this.getValue(classOfT, key, null, defaultValue);
Expand Down Expand Up @@ -696,7 +711,7 @@ private <T> EvaluationDetails<T> evaluate(Class<T> classOfT, Setting setting, St
* Options for configuring {@link ConfigCatClient} instance.
*/
public static class Options {
private OkHttpClient httpClient;
private final HttpOptions httpOptions = new HttpOptions();
private ConfigCache cache = new NullConfigCache();
private String baseUrl;
private PollingMode pollingMode = PollingModes.autoPoll();
Expand All @@ -709,16 +724,6 @@ public static class Options {
private final ConfigCatHooks configCatHooks = new ConfigCatHooks();
private LogFilterFunction logFilter;


/**
* Sets the underlying http client which will be used to fetch the latest configuration.
*
* @param httpClient the http client.
*/
public void httpClient(OkHttpClient httpClient) {
this.httpClient = httpClient;
}

/**
* Sets the internal cache implementation.
*
Expand Down Expand Up @@ -814,6 +819,14 @@ public ConfigCatHooks hooks() {
return configCatHooks;
}

/**
* HTTP related options for {@link ConfigCatClient}.
**/
public HttpOptions httpOptions() {
return this.httpOptions;
}


/**
* Set the client's log filter callback function. When logFilterFunction returns false, the ConfigCatLogger skips the log event.
*/
Expand All @@ -824,5 +837,56 @@ public void logFilter(LogFilterFunction logFilter) {
private boolean isBaseURLCustom() {
return this.baseUrl != null && !this.baseUrl.isEmpty();
}

/**
* HTTP configuration options for a {@link ConfigCatClient} instance.
*/
public static class HttpOptions {
private Integer connectTimeoutMillis;
private Integer readTimeoutMillis;
private Proxy proxy;

/**
* Sets HTTP connect timeout in milliseconds.
*
* @param connectTimeoutMillis the connect timeout in milliseconds.
*/
public HttpOptions connectTimeoutMillis(int connectTimeoutMillis) {
this.connectTimeoutMillis = connectTimeoutMillis;
return this;
}

/**
* Sets the HTTP read timeout in milliseconds.
*
* @param readTimeoutMillis the read timeout in milliseconds.
*/
public HttpOptions readTimeoutMillis(int readTimeoutMillis) {
this.readTimeoutMillis = readTimeoutMillis;
return this;
}

/**
* Sets the HTTP proxy.
*
* @param proxy the HTTP proxy.
*/
public HttpOptions proxy(Proxy proxy) {
this.proxy = proxy;
return this;
}

Integer getConnectTimeoutMillis() {
return connectTimeoutMillis;
}

Integer getReadTimeoutMillis() {
return readTimeoutMillis;
}

Proxy getProxy() {
return proxy;
}
}
}
}
80 changes: 62 additions & 18 deletions src/main/java/com/configcat/ConfigFetcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,18 @@
import java.util.concurrent.atomic.AtomicBoolean;

class ConfigFetcher implements Closeable {

private static final long RETRY_DELAY_MS = 50;

private static final long EVICT_ALL_THRESHOLD_NS = 30_000_000_000L; // 30 seconds in nanoseconds

private final AtomicBoolean isClosed = new AtomicBoolean(false);
private final ConfigCatLogger logger;
private final OkHttpClient httpClient;
private final String mode;

private long lastEvictAllTimestamp = Long.MIN_VALUE;

private final String sdkKey;
private final boolean urlIsCustom;

Expand Down Expand Up @@ -45,7 +52,7 @@ public CompletableFuture<FetchResponse> fetchAsync(String eTag) {
}

private CompletableFuture<FetchResponse> executeFetchAsync(int executionCount, String eTag) {
return this.getResponseAsync(eTag).thenComposeAsync(fetchResponse -> {
return this.fetchWithRetryAsync(eTag).thenComposeAsync(fetchResponse -> {
if (!fetchResponse.isFetched()) {
return CompletableFuture.completedFuture(fetchResponse);
}
Expand Down Expand Up @@ -97,64 +104,101 @@ private CompletableFuture<FetchResponse> getResponseAsync(final String eTag) {
this.httpClient.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {
int logEventId = 1103;
Object message = ConfigCatLogMessages.getFetchFailedDueToUnexpectedError(null);
if (!isClosed.get()) {
if (e instanceof SocketTimeoutException) {
logEventId = 1102;
message = ConfigCatLogMessages.getFetchFailedDueToRequestTimeout(httpClient.connectTimeoutMillis(), httpClient.readTimeoutMillis(), httpClient.writeTimeoutMillis(), null);
FetchResponse fetchResponse = null;
try{
int logEventId = 1103;
Object message = ConfigCatLogMessages.getFetchFailedDueToUnexpectedError(null);
if (!isClosed.get()) {
if (e instanceof SocketTimeoutException) {
logEventId = 1102;
message = ConfigCatLogMessages.getFetchFailedDueToRequestTimeout(httpClient.connectTimeoutMillis(), httpClient.readTimeoutMillis(), httpClient.writeTimeoutMillis(), null);
}
logger.error(logEventId, message, e);
}
fetchResponse = FetchResponse.failed(message, false, null, true);
} finally {
if(fetchResponse == null) {
FormattableLogMessage formattableLogMessage = ConfigCatLogMessages.getFetchFailedDueToUnexpectedError(null);
fetchResponse = FetchResponse.failed(formattableLogMessage,false, null, false);
}
logger.error(logEventId, message, e);
future.complete(fetchResponse);
}
future.complete(FetchResponse.failed(message, false, null));
}

@Override
public void onResponse(@NotNull Call call, @NotNull Response response) {
String cfRayId = null;
FetchResponse fetchResponse = null;
try (ResponseBody body = response.body()) {
cfRayId = response.header("CF-RAY");
if (response.code() == 200) {
String content = body != null ? body.string() : null;
String eTag = response.header("ETag");
Result<Config> result = deserializeConfig(content, cfRayId);
if (result.error() != null) {
future.complete(FetchResponse.failed(result.error(), false, cfRayId));
return;
fetchResponse = FetchResponse.failed(result.error(), false, cfRayId, false);
} else {
fetchResponse = FetchResponse.fetched(new Entry(result.value(), eTag, content, System.currentTimeMillis()), cfRayId);
logger.debug("Fetch was successful: new config fetched.");
}
logger.debug("Fetch was successful: new config fetched.");
future.complete(FetchResponse.fetched(new Entry(result.value(), eTag, content, System.currentTimeMillis()), cfRayId));
} else if (response.code() == 304) {
fetchResponse = FetchResponse.notModified(cfRayId);
if(cfRayId != null) {
logger.debug(String.format("Fetch was successful: config not modified. %s", ConfigCatLogMessages.getCFRayIdPostFix(cfRayId)));
} else {
logger.debug("Fetch was successful: config not modified.");
}
future.complete(FetchResponse.notModified(cfRayId));
} else if (response.code() == 403 || response.code() == 404) {
FormattableLogMessage message = ConfigCatLogMessages.getFetchFailedDueToInvalidSDKKey(cfRayId);
fetchResponse = FetchResponse.failed(message, true, cfRayId, false);
logger.error(1100, message);
future.complete(FetchResponse.failed(message, true, cfRayId));
} else {
FormattableLogMessage formattableLogMessage = ConfigCatLogMessages.getFetchFailedDueToUnexpectedHttpResponse(response.code(), response.message(), cfRayId);
fetchResponse = FetchResponse.failed(formattableLogMessage, false, cfRayId, true);
logger.error(1101, formattableLogMessage);
future.complete(FetchResponse.failed(formattableLogMessage, false, cfRayId));
}
} catch (SocketTimeoutException e) {
FormattableLogMessage formattableLogMessage = ConfigCatLogMessages.getFetchFailedDueToRequestTimeout(httpClient.connectTimeoutMillis(), httpClient.readTimeoutMillis(), httpClient.writeTimeoutMillis(), cfRayId);
fetchResponse = FetchResponse.failed(formattableLogMessage, false, cfRayId, true);
logger.error(1102, formattableLogMessage, e);
future.complete(FetchResponse.failed(formattableLogMessage, false, cfRayId));
} catch (Exception e) {
FormattableLogMessage formattableLogMessage = ConfigCatLogMessages.getFetchFailedDueToUnexpectedError(cfRayId);
fetchResponse = FetchResponse.failed(formattableLogMessage, false, cfRayId, true);
logger.error(1103, formattableLogMessage, e);
future.complete(FetchResponse.failed(formattableLogMessage, false, cfRayId));
} finally {
if(fetchResponse == null) {
FormattableLogMessage formattableLogMessage = ConfigCatLogMessages.getFetchFailedDueToUnexpectedError(cfRayId);
fetchResponse = FetchResponse.failed(formattableLogMessage,false, cfRayId, false);
}
future.complete(fetchResponse);
}
}
});

return future;
}

private CompletableFuture<FetchResponse> fetchWithRetryAsync(final String eTag) {
return this.getResponseAsync(eTag).thenComposeAsync(response -> {
if (response.shouldRetry()) {
try {
long now = System.nanoTime();
if (lastEvictAllTimestamp == Long.MIN_VALUE || (now - lastEvictAllTimestamp) >= EVICT_ALL_THRESHOLD_NS) {
this.httpClient.connectionPool().evictAll();
lastEvictAllTimestamp = now;
}
Thread.sleep(RETRY_DELAY_MS);
return this.getResponseAsync(eTag);
} catch (InterruptedException e) {
this.logger.error(0, "Thread interrupted.", e);
Thread.currentThread().interrupt();
return CompletableFuture.completedFuture(response);
}
}
return CompletableFuture.completedFuture(response);
});
}

@Override
public void close() throws IOException {
if (!this.isClosed.compareAndSet(false, true)) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/configcat/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ private Constants() { /* prevent from instantiation*/ }
static final long DISTANT_PAST = 0;
static final String CONFIG_JSON_NAME = "config_v6.json";
static final String SERIALIZATION_FORMAT_VERSION = "v2";
static final String VERSION = "9.4.5";
static final String VERSION = "10.0.0";

static final String SDK_KEY_PROXY_PREFIX = "configcat-proxy/";
static final String SDK_KEY_PREFIX = "configcat-sdk-1";
Expand Down
14 changes: 9 additions & 5 deletions src/main/java/com/configcat/FetchResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public enum Status {
private final Object error;
private final boolean fetchTimeUpdatable;
private final String cfRayId;
private final boolean shouldRetry;

public boolean isFetched() {
return this.status == Status.FETCHED;
Expand Down Expand Up @@ -39,23 +40,26 @@ public Object error() {

public String cfRayId() {return this.cfRayId;}

FetchResponse(Status status, Entry entry, Object error, boolean fetchTimeUpdatable, String cfRayId) {
public boolean shouldRetry() {return shouldRetry;}

FetchResponse(Status status, Entry entry, Object error, boolean fetchTimeUpdatable, String cfRayId, boolean shouldRetry) {
this.status = status;
this.entry = entry;
this.error = error;
this.fetchTimeUpdatable = fetchTimeUpdatable;
this.cfRayId = cfRayId;
this.shouldRetry = shouldRetry;
}

public static FetchResponse fetched(Entry entry, String cfRayId) {
return new FetchResponse(Status.FETCHED, entry == null ? Entry.EMPTY : entry, null, false, cfRayId);
return new FetchResponse(Status.FETCHED, entry == null ? Entry.EMPTY : entry, null, false, cfRayId, false);
}

public static FetchResponse notModified(String cfRayId) {
return new FetchResponse(Status.NOT_MODIFIED, Entry.EMPTY, null, true, cfRayId);
return new FetchResponse(Status.NOT_MODIFIED, Entry.EMPTY, null, true, cfRayId, false);
}

public static FetchResponse failed(Object error, boolean fetchTimeUpdatable, String cfRayId) {
return new FetchResponse(Status.FAILED, Entry.EMPTY, error, fetchTimeUpdatable, cfRayId);
public static FetchResponse failed(Object error, boolean fetchTimeUpdatable, String cfRayId, boolean shouldRetry) {
return new FetchResponse(Status.FAILED, Entry.EMPTY, error, fetchTimeUpdatable, cfRayId, shouldRetry);
}
}
2 changes: 2 additions & 0 deletions src/test/java/com/configcat/AutoPollingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public void get() throws InterruptedException, ExecutionException, IOException {
@Test
public void getFail() throws InterruptedException, ExecutionException, IOException {
this.server.enqueue(new MockResponse().setResponseCode(500).setBody(""));
this.server.enqueue(new MockResponse().setResponseCode(500).setBody(""));

ConfigCache cache = new NullConfigCache();
PollingMode pollingMode = PollingModes.autoPoll(2);
Expand Down Expand Up @@ -133,6 +134,7 @@ public void getMany() throws InterruptedException, ExecutionException, IOExcepti
public void getWithFailedRefresh() throws InterruptedException, ExecutionException, IOException {
this.server.enqueue(new MockResponse().setResponseCode(200).setBody(String.format(TEST_JSON, "test")));
this.server.enqueue(new MockResponse().setResponseCode(500));
this.server.enqueue(new MockResponse().setResponseCode(500));

ConfigCache cache = new NullConfigCache();
PollingMode pollingMode = PollingModes.autoPoll(2);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.configcat;

import okhttp3.OkHttpClient;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -33,7 +32,6 @@ void setUp() throws IOException {
this.server.start();

this.client = ConfigCatClient.get(Helpers.SDK_KEY, options -> {
options.httpClient(new OkHttpClient.Builder().build());
options.pollingMode(PollingModes.lazyLoad(2));
options.baseUrl(this.server.url("/").toString());
});
Expand Down Expand Up @@ -195,7 +193,7 @@ void invalidateCacheFail() {

@Test
void getConfigurationJsonStringWithDefaultConfigTimeout() {
ConfigCatClient cl = ConfigCatClient.get("configcat-sdk-1/TEST_KEY1-123456789012/1234567890123456789012", options -> options.httpClient(new OkHttpClient.Builder().readTimeout(2, TimeUnit.SECONDS).build()));
ConfigCatClient cl = ConfigCatClient.get("configcat-sdk-1/TEST_KEY1-123456789012/1234567890123456789012", options -> options.httpOptions().readTimeoutMillis(2000));

// makes a call to a real url which would fail, null expected
String config = cl.getValue(String.class, "test", null);
Expand Down
Loading