Skip to content

Commit

Permalink
implement retry functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
tinahollygb committed Jul 28, 2023
1 parent d65ed40 commit 3056df3
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 47 deletions.
37 changes: 0 additions & 37 deletions lib/src/main/java/growthbook/sdk/java/GBEventSourceListener.java

This file was deleted.

83 changes: 73 additions & 10 deletions lib/src/main/java/growthbook/sdk/java/GBFeaturesRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import lombok.Builder;
import lombok.Getter;
import okhttp3.*;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import okhttp3.sse.EventSources;
import org.jetbrains.annotations.NotNull;

Expand Down Expand Up @@ -52,6 +54,8 @@ public class GBFeaturesRepository implements IGBFeaturesRepository {
private Boolean initialized = false;

private Boolean sseAllowed = false;
@Nullable private Request sseRequest = null;
@Nullable private EventSource sseEventSource = null;

/**
* Allows you to get the features JSON from the provided {@link GBFeaturesRepository#getFeaturesEndpoint()}.
Expand Down Expand Up @@ -196,28 +200,50 @@ public void initialize() throws FeatureFetchException {

private void initializeSSE() throws FeatureFetchException {
if (!this.sseAllowed) {
// TODO: Fallback to SWR strategy
System.out.printf("\nNot initializing SSE because header 'X-Sse-Support: enabled' not present on resource returned at %s", this.featuresEndpoint);
throw new FeatureFetchException(FeatureFetchException.FeatureFetchErrorCode.SSE_CONNECTION_ERROR);
// return;
}

this.sseHttpClient = new OkHttpClient.Builder()
.addInterceptor(new GBFeaturesRepositoryRequestInterceptor())
.connectTimeout(0, TimeUnit.SECONDS)
.readTimeout(0, TimeUnit.MINUTES)
.writeTimeout(0, TimeUnit.MINUTES)
.build();
createEventSourceListenerAndStartListening();
}

/**
* Creates an SSE HTTP client if null.
* Creates and enqueues a new asynchronous request to the events endpoint.
* Assigns a close listener to recreate the connection.
*/
private void createEventSourceListenerAndStartListening() {
this.sseEventSource = null;
this.sseRequest = null;

if (this.sseHttpClient == null) {
this.sseHttpClient = new OkHttpClient.Builder()
.addInterceptor(new GBFeaturesRepositoryRequestInterceptor())
.retryOnConnectionFailure(true)
.connectTimeout(0, TimeUnit.SECONDS)
.readTimeout(0, TimeUnit.SECONDS)
.writeTimeout(0, TimeUnit.SECONDS)
.build();
}

Request sseRequest = new Request.Builder()
this.sseRequest = new Request.Builder()
.url(this.eventsEndpoint)
.header("Accept", "application/json; q=0.5")
.addHeader("Accept", "text/event-stream")
.build();

EventSources
this.sseEventSource = EventSources
.createFactory(this.sseHttpClient)
.newEventSource(sseRequest, new GBEventSourceListener());

.newEventSource(sseRequest, new GBEventSourceListener(new GBEventSourceCloseHandler() {
@Override
public void onClose(EventSource eventSource) {
eventSource.cancel();
System.out.printf("\n\nGBEventSourceCloseHandler#onClose %s \n\n", eventSource);
createEventSourceListenerAndStartListening();
}
}));
this.sseHttpClient.newCall(sseRequest).enqueue(new Callback() {
@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) {
Expand Down Expand Up @@ -288,6 +314,7 @@ private void fetchFeatures() throws FeatureFetchException {
* @param response Successful response
*/
private void onSuccess(Response response) throws FeatureFetchException {
// TODO: refactor to not be Response and instead the string
try {
ResponseBody responseBody = response.body();
if (responseBody == null) {
Expand Down Expand Up @@ -341,4 +368,40 @@ private void onSuccess(Response response) throws FeatureFetchException {
);
}
}

private interface GBEventSourceCloseHandler {
void onClose(EventSource eventSource);
}

private static class GBEventSourceListener extends EventSourceListener {
private final GBEventSourceCloseHandler closeHandler;

public GBEventSourceListener(GBEventSourceCloseHandler closeHandler) {
this.closeHandler = closeHandler;
}

@Override
public void onClosed(@NotNull EventSource eventSource) {
super.onClosed(eventSource);
closeHandler.onClose(eventSource);
}

@Override
public void onEvent(@NotNull EventSource eventSource, @Nullable String id, @Nullable String type, @NotNull String data) {
System.out.printf("\n\n eventsource = %s - id = %s - type = %s - data = %s \n\n", eventSource, id, type, data);
super.onEvent(eventSource, id, type, data);
}

@Override
public void onFailure(@NotNull EventSource eventSource, @Nullable Throwable t, @Nullable Response response) {
System.out.printf("\n\n eventsource = %s , error = %s , response = %s\n\n", eventSource, t, response);
super.onFailure(eventSource, t, response);
}

@Override
public void onOpen(@NotNull EventSource eventSource, @NotNull Response response) {
System.out.printf("\n\n eventsource = %s , response = %s\n\n", eventSource, response);
super.onOpen(eventSource, response);
}
}
}

0 comments on commit 3056df3

Please sign in to comment.