diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 145daef..63e0715 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -20,7 +20,9 @@ jobs:
distribution: 'temurin'
cache: maven
- name: Build with Maven
- run: mvn -B package --file pom.xml
+ run: |
+ mvn -B package --file pom.xml
+ mvn failsafe:integration-test
- name: Upload coverage reports to Codecov with GitHub Action
- uses: codecov/codecov-action@v2
\ No newline at end of file
+ uses: codecov/codecov-action@v2
diff --git a/pom.xml b/pom.xml
index 1a66d3b..59a06b7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,7 @@
2.14.0
3.11
3.8.5
+ 2.1.0
3.0.9
@@ -72,6 +73,11 @@
okhttp
${version.okhttp}
+
+ io.socket
+ socket.io-client
+ ${version.socketio}
+
org.apache.commons
commons-lang3
@@ -207,6 +213,19 @@
+
+ org.apache.maven.plugins
+ maven-failsafe-plugin
+ 3.0.0-M4
+
+
+
+ integration-test
+ verify
+
+
+
+
org.jacoco
jacoco-maven-plugin
diff --git a/src/main/java/com/featureprobe/sdk/server/FPConfig.java b/src/main/java/com/featureprobe/sdk/server/FPConfig.java
index e63d3b9..5b830f9 100644
--- a/src/main/java/com/featureprobe/sdk/server/FPConfig.java
+++ b/src/main/java/com/featureprobe/sdk/server/FPConfig.java
@@ -26,6 +26,8 @@ public final class FPConfig {
URL eventUrl;
+ URI realtimeUri;
+
final String location;
final SynchronizerFactory synchronizerFactory;
@@ -49,6 +51,7 @@ protected FPConfig(Builder builder) {
builder.httpConfiguration;
this.synchronizerUrl = builder.synchronizerUrl;
this.eventUrl = builder.eventUrl;
+ this.realtimeUri = builder.realtimeUri;
this.startWait = builder.startWait == null ? DEFAULT_START_WAIT : builder.startWait;
}
@@ -74,6 +77,8 @@ public static class Builder {
private URL eventUrl;
+ private URI realtimeUri;
+
private Long startWait;
public Builder() {
@@ -127,6 +132,16 @@ public Builder eventUrl(URL eventUrl) {
return this;
}
+ public Builder realtimeUri(URI realtimeUri) {
+ this.realtimeUri = realtimeUri;
+ return this;
+ }
+
+ public Builder realtimeUri(String realtimeUri) {
+ this.realtimeUri = URI.create(realtimeUri);
+ return this;
+ }
+
public Builder startWait(Long startWaitTime, TimeUnit unit) {
this.startWait = unit.toNanos(startWaitTime);
return this;
diff --git a/src/main/java/com/featureprobe/sdk/server/FPContext.java b/src/main/java/com/featureprobe/sdk/server/FPContext.java
index aedd3b3..1c19b00 100644
--- a/src/main/java/com/featureprobe/sdk/server/FPContext.java
+++ b/src/main/java/com/featureprobe/sdk/server/FPContext.java
@@ -5,6 +5,7 @@
import java.io.InputStream;
import java.net.MalformedURLException;
+import java.net.URI;
import java.net.URL;
import java.time.Duration;
import java.util.Objects;
@@ -26,10 +27,14 @@ final class FPContext {
private static final String POST_EVENTS_DATA_API = "/api/events";
+ private static final String REALTIME_TOGGLE_UPDATE_API = "/realtime";
+
private URL synchronizerUrl;
private URL eventUrl;
+ private URI realtimeUri;
+
private final String serverSdkKey;
private final Duration refreshInterval;
@@ -52,6 +57,11 @@ final class FPContext {
} else {
this.eventUrl = config.eventUrl;
}
+ if (Objects.isNull(config.realtimeUri)) {
+ this.realtimeUri = URI.create(config.remoteUri.toString() + REALTIME_TOGGLE_UPDATE_API);
+ } else {
+ this.realtimeUri = config.realtimeUri;
+ }
} catch (MalformedURLException e) {
logger.error("construction context error", e);
}
@@ -74,6 +84,10 @@ public URL getEventUrl() {
return eventUrl;
}
+ public URI getRealtimeUri() {
+ return realtimeUri;
+ }
+
public String getServerSdkKey() {
return serverSdkKey;
}
diff --git a/src/main/java/com/featureprobe/sdk/server/FeatureProbe.java b/src/main/java/com/featureprobe/sdk/server/FeatureProbe.java
index bcfeea2..803ce6c 100644
--- a/src/main/java/com/featureprobe/sdk/server/FeatureProbe.java
+++ b/src/main/java/com/featureprobe/sdk/server/FeatureProbe.java
@@ -34,6 +34,7 @@ public final class FeatureProbe {
@VisibleForTesting
final DataRepository dataRepository;
+ @VisibleForTesting
Synchronizer synchronizer;
@VisibleForTesting
diff --git a/src/main/java/com/featureprobe/sdk/server/PollingSynchronizer.java b/src/main/java/com/featureprobe/sdk/server/PollingSynchronizer.java
index e0d13d6..350f90d 100644
--- a/src/main/java/com/featureprobe/sdk/server/PollingSynchronizer.java
+++ b/src/main/java/com/featureprobe/sdk/server/PollingSynchronizer.java
@@ -4,7 +4,11 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.featureprobe.sdk.server.exceptions.HttpErrorException;
import com.featureprobe.sdk.server.model.Repository;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.socket.client.IO;
+import io.socket.client.Socket;
+import io.socket.engineio.client.transports.WebSocket;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
@@ -14,6 +18,8 @@
import java.io.IOException;
import java.net.URL;
import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -24,12 +30,15 @@
final class PollingSynchronizer implements Synchronizer {
private static final Logger logger = Loggers.SYNCHRONIZER;
- private static final String GET_SDK_KEY_HEADER = "Authorization";
- final ObjectMapper mapper = new ObjectMapper();
- DataRepository dataRepository;
+ private final ObjectMapper mapper = new ObjectMapper();
+ private final DataRepository dataRepository;
private final Duration refreshInterval;
private final URL apiUrl;
private volatile ScheduledFuture> worker;
+
+ @VisibleForTesting
+ Socket socket;
+
private final OkHttpClient httpClient;
private final Headers headers;
@@ -47,14 +56,15 @@ final class PollingSynchronizer implements Synchronizer {
this.apiUrl = context.getSynchronizerUrl();
this.dataRepository = dataRepository;
this.initFuture = new CompletableFuture<>();
- OkHttpClient.Builder builder = new OkHttpClient.Builder()
+ this.headers = context.getHeaders();
+ this.httpClient = new OkHttpClient.Builder()
.connectionPool(context.getHttpConfiguration().connectionPool)
.connectTimeout(context.getHttpConfiguration().connectTimeout)
.readTimeout(context.getHttpConfiguration().readTimeout)
.writeTimeout(context.getHttpConfiguration().writeTimeout)
- .retryOnConnectionFailure(false);
- headers = context.getHeaders();
- httpClient = builder.build();
+ .retryOnConnectionFailure(false)
+ .build();
+ this.socket = connectSocket(context);
}
@Override
@@ -77,6 +87,10 @@ public void close() throws IOException {
worker.cancel(true);
worker = null;
}
+ if (socket != null) {
+ socket.close();
+ socket = null;
+ }
}
}
@@ -89,8 +103,8 @@ private void poll() {
try (Response response = httpClient.newCall(request).execute()) {
String body = response.body().string();
if (!response.isSuccessful()) {
- throw new HttpErrorException(String.format("Http request error: code: {}, body: {}:" + response.code(),
- response.body()));
+ throw new HttpErrorException(String.format("Http request error: code: %d, body: %s",
+ response.code(), response.body().toString()));
}
logger.debug("Http response: {}", response);
logger.debug("Http response body: {}", body);
@@ -102,4 +116,30 @@ private void poll() {
logger.error("Unexpected error from polling processor", e);
}
}
+
+ private Socket connectSocket(FPContext context) {
+ IO.Options sioOptions = IO.Options.builder()
+ .setTransports(new String[]{WebSocket.NAME})
+ .setPath(context.getRealtimeUri().getPath())
+ .build();
+ Socket sio = IO.socket(context.getRealtimeUri(), sioOptions);
+
+ sio.on("connect", objects -> {
+ logger.info("connect socketio success");
+ Map credentials = new HashMap<>(1);
+ credentials.put("key", context.getServerSdkKey());
+ sio.emit("register", credentials);
+ });
+
+ sio.on("update", objects -> {
+ logger.info("socketio recv update event");
+ poll();
+ });
+
+ sio.on("disconnect", objects -> logger.info("socketio disconnected"));
+
+ sio.on("connect_error", objects -> logger.error("socketio error: {}", objects));
+
+ return sio.connect();
+ }
}
diff --git a/src/test/groovy/com/featureprobe/sdk/server/PollingSynchronizerIT.groovy b/src/test/groovy/com/featureprobe/sdk/server/PollingSynchronizerIT.groovy
new file mode 100644
index 0000000..cf1a389
--- /dev/null
+++ b/src/test/groovy/com/featureprobe/sdk/server/PollingSynchronizerIT.groovy
@@ -0,0 +1,39 @@
+package com.featureprobe.sdk.server
+
+import ch.qos.logback.classic.Level
+import ch.qos.logback.classic.Logger
+import org.slf4j.LoggerFactory
+import spock.lang.Specification
+
+class PollingSynchronizerIT extends Specification {
+
+ def "Socketio realtime toggle update"() {
+
+ (LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME) as Logger).setLevel(Level.DEBUG)
+
+ given:
+ def config = FPConfig.builder()
+ .pollingMode()
+ .remoteUri("https://featureprobe.io/server")
+ .realtimeUri("https://featureprobe.io/server/realtime")
+ .useMemoryRepository()
+ .build()
+ def featureProbe = new FeatureProbe("server-61db54ecea79824cae3ac38d73f1961d698d0477", config)
+ def repository = featureProbe.dataRepository
+ def socket = (featureProbe.synchronizer as PollingSynchronizer).socket
+ def updateCnt = 0
+ socket.on("update", objects -> updateCnt++)
+
+ sleep(5000)
+
+ featureProbe.close()
+
+ sleep(5000)
+
+ expect:
+ repository.initialized()
+ !socket.connected()
+ updateCnt > 0
+ }
+}
+