From 22a05776d4a83cd034c88da4d5837261456f7f9a Mon Sep 17 00:00:00 2001 From: hezean Date: Mon, 5 Dec 2022 01:50:12 +0800 Subject: [PATCH 1/4] feat: support realtime toggle update via socket.io --- pom.xml | 6 +++ .../com/featureprobe/sdk/server/FPConfig.java | 10 ++++ .../featureprobe/sdk/server/FPContext.java | 13 +++++ .../sdk/server/PollingSynchronizer.java | 48 +++++++++++++++++++ 4 files changed, 77 insertions(+) diff --git a/pom.xml b/pom.xml index 1a66d3b..4e3e734 100644 --- a/pom.xml +++ b/pom.xml @@ -46,6 +46,7 @@ 2.14.0 3.11 3.8.5 + 2.1.0 3.0.9 @@ -72,6 +73,11 @@ okhttp ${version.okhttp} + + io.socket + socket.io-client + ${version.socketio} + org.apache.commons commons-lang3 diff --git a/src/main/java/com/featureprobe/sdk/server/FPConfig.java b/src/main/java/com/featureprobe/sdk/server/FPConfig.java index e63d3b9..8b7c57c 100644 --- a/src/main/java/com/featureprobe/sdk/server/FPConfig.java +++ b/src/main/java/com/featureprobe/sdk/server/FPConfig.java @@ -26,6 +26,8 @@ public final class FPConfig { URL eventUrl; + URL realtimeUrl; + final String location; final SynchronizerFactory synchronizerFactory; @@ -49,6 +51,7 @@ protected FPConfig(Builder builder) { builder.httpConfiguration; this.synchronizerUrl = builder.synchronizerUrl; this.eventUrl = builder.eventUrl; + this.realtimeUrl = builder.realtimeUrl; this.startWait = builder.startWait == null ? DEFAULT_START_WAIT : builder.startWait; } @@ -74,6 +77,8 @@ public static class Builder { private URL eventUrl; + private URL realtimeUrl; + private Long startWait; public Builder() { @@ -127,6 +132,11 @@ public Builder eventUrl(URL eventUrl) { return this; } + public Builder realtimeUrl(URL realtimeUrl) { + this.realtimeUrl = realtimeUrl; + return this; + } + public Builder startWait(Long startWaitTime, TimeUnit unit) { this.startWait = unit.toNanos(startWaitTime); return this; diff --git a/src/main/java/com/featureprobe/sdk/server/FPContext.java b/src/main/java/com/featureprobe/sdk/server/FPContext.java index aedd3b3..d1eb88c 100644 --- a/src/main/java/com/featureprobe/sdk/server/FPContext.java +++ b/src/main/java/com/featureprobe/sdk/server/FPContext.java @@ -26,10 +26,14 @@ final class FPContext { private static final String POST_EVENTS_DATA_API = "/api/events"; + private static final String REALTIME_TOGGLE_UPDATE_API = "/realtime"; + private URL synchronizerUrl; private URL eventUrl; + private URL realtimeUrl; + private final String serverSdkKey; private final Duration refreshInterval; @@ -52,6 +56,11 @@ final class FPContext { } else { this.eventUrl = config.eventUrl; } + if (Objects.isNull(config.realtimeUrl)) { + this.realtimeUrl = new URL(config.remoteUri.toString() + REALTIME_TOGGLE_UPDATE_API); + } else { + this.realtimeUrl = config.realtimeUrl; + } } catch (MalformedURLException e) { logger.error("construction context error", e); } @@ -74,6 +83,10 @@ public URL getEventUrl() { return eventUrl; } + public URL getRealtimeUrl() { + return realtimeUrl; + } + public String getServerSdkKey() { return serverSdkKey; } diff --git a/src/main/java/com/featureprobe/sdk/server/PollingSynchronizer.java b/src/main/java/com/featureprobe/sdk/server/PollingSynchronizer.java index e0d13d6..49de275 100644 --- a/src/main/java/com/featureprobe/sdk/server/PollingSynchronizer.java +++ b/src/main/java/com/featureprobe/sdk/server/PollingSynchronizer.java @@ -5,6 +5,13 @@ import com.featureprobe.sdk.server.exceptions.HttpErrorException; import com.featureprobe.sdk.server.model.Repository; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.socket.client.IO; +import io.socket.client.Socket; +import io.socket.engineio.client.transports.WebSocket; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; import okhttp3.Headers; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -30,6 +37,7 @@ final class PollingSynchronizer implements Synchronizer { private final Duration refreshInterval; private final URL apiUrl; private volatile ScheduledFuture worker; + private Socket socket; private final OkHttpClient httpClient; private final Headers headers; @@ -55,6 +63,7 @@ final class PollingSynchronizer implements Synchronizer { .retryOnConnectionFailure(false); headers = context.getHeaders(); httpClient = builder.build(); + connectSocket(context); } @Override @@ -77,6 +86,10 @@ public void close() throws IOException { worker.cancel(true); worker = null; } + if (socket != null) { + socket.close(); + socket = null; + } } } @@ -102,4 +115,39 @@ private void poll() { logger.error("Unexpected error from polling processor", e); } } + + private void connectSocket(FPContext context) { + URI realtimeUri; + try { + realtimeUri = context.getRealtimeUrl().toURI(); + } catch (URISyntaxException e) { + logger.error("invalid remote uri: {}, realtime toggle update is disabled", + context.getRealtimeUrl(), e); + return; + } + + IO.Options sioOptions = IO.Options.builder() + .setTransports(new String[] {WebSocket.NAME}) + .setPath(realtimeUri.getPath()) + .build(); + Socket sio = IO.socket(realtimeUri, sioOptions); + + sio.on("connect", objects -> { + logger.info("connect socketio success"); + Map credential = new HashMap<>(1); + credential.put("key", context.getServerSdkKey()); + sio.emit("register", credential); + }); + + sio.on("update", objects -> { + logger.info("socketio recv update event"); + poll(); + }); + + sio.on("disconnect", objects -> logger.info("socketio disconnected")); + + sio.on("connect_error", objects -> logger.error("socketio error: {}", objects)); + + this.socket = sio.connect(); + } } From ee77471a13780a33d8fff2588ee003adbc3e4f91 Mon Sep 17 00:00:00 2001 From: HeZean Date: Tue, 27 Dec 2022 14:04:39 +0800 Subject: [PATCH 2/4] cleanup code --- .../com/featureprobe/sdk/server/FPConfig.java | 15 +++++++---- .../featureprobe/sdk/server/FPContext.java | 13 +++++----- .../sdk/server/PollingSynchronizer.java | 25 ++++++------------- 3 files changed, 24 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/featureprobe/sdk/server/FPConfig.java b/src/main/java/com/featureprobe/sdk/server/FPConfig.java index 8b7c57c..5b830f9 100644 --- a/src/main/java/com/featureprobe/sdk/server/FPConfig.java +++ b/src/main/java/com/featureprobe/sdk/server/FPConfig.java @@ -26,7 +26,7 @@ public final class FPConfig { URL eventUrl; - URL realtimeUrl; + URI realtimeUri; final String location; @@ -51,7 +51,7 @@ protected FPConfig(Builder builder) { builder.httpConfiguration; this.synchronizerUrl = builder.synchronizerUrl; this.eventUrl = builder.eventUrl; - this.realtimeUrl = builder.realtimeUrl; + this.realtimeUri = builder.realtimeUri; this.startWait = builder.startWait == null ? DEFAULT_START_WAIT : builder.startWait; } @@ -77,7 +77,7 @@ public static class Builder { private URL eventUrl; - private URL realtimeUrl; + private URI realtimeUri; private Long startWait; @@ -132,8 +132,13 @@ public Builder eventUrl(URL eventUrl) { return this; } - public Builder realtimeUrl(URL realtimeUrl) { - this.realtimeUrl = realtimeUrl; + public Builder realtimeUri(URI realtimeUri) { + this.realtimeUri = realtimeUri; + return this; + } + + public Builder realtimeUri(String realtimeUri) { + this.realtimeUri = URI.create(realtimeUri); return this; } diff --git a/src/main/java/com/featureprobe/sdk/server/FPContext.java b/src/main/java/com/featureprobe/sdk/server/FPContext.java index d1eb88c..1c19b00 100644 --- a/src/main/java/com/featureprobe/sdk/server/FPContext.java +++ b/src/main/java/com/featureprobe/sdk/server/FPContext.java @@ -5,6 +5,7 @@ import java.io.InputStream; import java.net.MalformedURLException; +import java.net.URI; import java.net.URL; import java.time.Duration; import java.util.Objects; @@ -32,7 +33,7 @@ final class FPContext { private URL eventUrl; - private URL realtimeUrl; + private URI realtimeUri; private final String serverSdkKey; @@ -56,10 +57,10 @@ final class FPContext { } else { this.eventUrl = config.eventUrl; } - if (Objects.isNull(config.realtimeUrl)) { - this.realtimeUrl = new URL(config.remoteUri.toString() + REALTIME_TOGGLE_UPDATE_API); + if (Objects.isNull(config.realtimeUri)) { + this.realtimeUri = URI.create(config.remoteUri.toString() + REALTIME_TOGGLE_UPDATE_API); } else { - this.realtimeUrl = config.realtimeUrl; + this.realtimeUri = config.realtimeUri; } } catch (MalformedURLException e) { logger.error("construction context error", e); @@ -83,8 +84,8 @@ public URL getEventUrl() { return eventUrl; } - public URL getRealtimeUrl() { - return realtimeUrl; + public URI getRealtimeUri() { + return realtimeUri; } public String getServerSdkKey() { diff --git a/src/main/java/com/featureprobe/sdk/server/PollingSynchronizer.java b/src/main/java/com/featureprobe/sdk/server/PollingSynchronizer.java index 49de275..1996f09 100644 --- a/src/main/java/com/featureprobe/sdk/server/PollingSynchronizer.java +++ b/src/main/java/com/featureprobe/sdk/server/PollingSynchronizer.java @@ -8,8 +8,6 @@ import io.socket.client.IO; import io.socket.client.Socket; import io.socket.engineio.client.transports.WebSocket; -import java.net.URI; -import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; import okhttp3.Headers; @@ -102,8 +100,8 @@ private void poll() { try (Response response = httpClient.newCall(request).execute()) { String body = response.body().string(); if (!response.isSuccessful()) { - throw new HttpErrorException(String.format("Http request error: code: {}, body: {}:" + response.code(), - response.body())); + throw new HttpErrorException(String.format("Http request error: code: %d, body: %s", + response.code(), response.body().toString())); } logger.debug("Http response: {}", response); logger.debug("Http response body: {}", body); @@ -117,26 +115,17 @@ private void poll() { } private void connectSocket(FPContext context) { - URI realtimeUri; - try { - realtimeUri = context.getRealtimeUrl().toURI(); - } catch (URISyntaxException e) { - logger.error("invalid remote uri: {}, realtime toggle update is disabled", - context.getRealtimeUrl(), e); - return; - } - IO.Options sioOptions = IO.Options.builder() .setTransports(new String[] {WebSocket.NAME}) - .setPath(realtimeUri.getPath()) + .setPath(context.getRealtimeUri().getPath()) .build(); - Socket sio = IO.socket(realtimeUri, sioOptions); + Socket sio = IO.socket(context.getRealtimeUri(), sioOptions); sio.on("connect", objects -> { logger.info("connect socketio success"); - Map credential = new HashMap<>(1); - credential.put("key", context.getServerSdkKey()); - sio.emit("register", credential); + Map credentials = new HashMap<>(1); + credentials.put("key", context.getServerSdkKey()); + sio.emit("register", credentials); }); sio.on("update", objects -> { From d6a552945ed015aeea92d7a2d88a5177a02cab96 Mon Sep 17 00:00:00 2001 From: HeZean Date: Wed, 28 Dec 2022 23:02:44 +0800 Subject: [PATCH 3/4] test: add it for socketio --- .github/workflows/build.yml | 6 ++- pom.xml | 13 +++++++ .../featureprobe/sdk/server/FeatureProbe.java | 1 + .../sdk/server/PollingSynchronizer.java | 35 +++++++++-------- .../sdk/server/PollingSynchronizerIT.groovy | 39 +++++++++++++++++++ 5 files changed, 76 insertions(+), 18 deletions(-) create mode 100644 src/test/groovy/com/featureprobe/sdk/server/PollingSynchronizerIT.groovy diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 145daef..cf940e1 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -20,7 +20,9 @@ jobs: distribution: 'temurin' cache: maven - name: Build with Maven - run: mvn -B package --file pom.xml + run: | + mvn -B package --file pom.xml + mvn failsafe:integration-test - name: Upload coverage reports to Codecov with GitHub Action - uses: codecov/codecov-action@v2 \ No newline at end of file + uses: codecov/codecov-action@v2 diff --git a/pom.xml b/pom.xml index 4e3e734..59a06b7 100644 --- a/pom.xml +++ b/pom.xml @@ -213,6 +213,19 @@ + + org.apache.maven.plugins + maven-failsafe-plugin + 3.0.0-M4 + + + + integration-test + verify + + + + org.jacoco jacoco-maven-plugin diff --git a/src/main/java/com/featureprobe/sdk/server/FeatureProbe.java b/src/main/java/com/featureprobe/sdk/server/FeatureProbe.java index bcfeea2..803ce6c 100644 --- a/src/main/java/com/featureprobe/sdk/server/FeatureProbe.java +++ b/src/main/java/com/featureprobe/sdk/server/FeatureProbe.java @@ -34,6 +34,7 @@ public final class FeatureProbe { @VisibleForTesting final DataRepository dataRepository; + @VisibleForTesting Synchronizer synchronizer; @VisibleForTesting diff --git a/src/main/java/com/featureprobe/sdk/server/PollingSynchronizer.java b/src/main/java/com/featureprobe/sdk/server/PollingSynchronizer.java index 1996f09..350f90d 100644 --- a/src/main/java/com/featureprobe/sdk/server/PollingSynchronizer.java +++ b/src/main/java/com/featureprobe/sdk/server/PollingSynchronizer.java @@ -4,12 +4,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.featureprobe.sdk.server.exceptions.HttpErrorException; import com.featureprobe.sdk.server.model.Repository; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.socket.client.IO; import io.socket.client.Socket; import io.socket.engineio.client.transports.WebSocket; -import java.util.HashMap; -import java.util.Map; import okhttp3.Headers; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -19,6 +18,8 @@ import java.io.IOException; import java.net.URL; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -29,13 +30,15 @@ final class PollingSynchronizer implements Synchronizer { private static final Logger logger = Loggers.SYNCHRONIZER; - private static final String GET_SDK_KEY_HEADER = "Authorization"; - final ObjectMapper mapper = new ObjectMapper(); - DataRepository dataRepository; + private final ObjectMapper mapper = new ObjectMapper(); + private final DataRepository dataRepository; private final Duration refreshInterval; private final URL apiUrl; private volatile ScheduledFuture worker; - private Socket socket; + + @VisibleForTesting + Socket socket; + private final OkHttpClient httpClient; private final Headers headers; @@ -53,15 +56,15 @@ final class PollingSynchronizer implements Synchronizer { this.apiUrl = context.getSynchronizerUrl(); this.dataRepository = dataRepository; this.initFuture = new CompletableFuture<>(); - OkHttpClient.Builder builder = new OkHttpClient.Builder() + this.headers = context.getHeaders(); + this.httpClient = new OkHttpClient.Builder() .connectionPool(context.getHttpConfiguration().connectionPool) .connectTimeout(context.getHttpConfiguration().connectTimeout) .readTimeout(context.getHttpConfiguration().readTimeout) .writeTimeout(context.getHttpConfiguration().writeTimeout) - .retryOnConnectionFailure(false); - headers = context.getHeaders(); - httpClient = builder.build(); - connectSocket(context); + .retryOnConnectionFailure(false) + .build(); + this.socket = connectSocket(context); } @Override @@ -114,11 +117,11 @@ private void poll() { } } - private void connectSocket(FPContext context) { + private Socket connectSocket(FPContext context) { IO.Options sioOptions = IO.Options.builder() - .setTransports(new String[] {WebSocket.NAME}) - .setPath(context.getRealtimeUri().getPath()) - .build(); + .setTransports(new String[]{WebSocket.NAME}) + .setPath(context.getRealtimeUri().getPath()) + .build(); Socket sio = IO.socket(context.getRealtimeUri(), sioOptions); sio.on("connect", objects -> { @@ -137,6 +140,6 @@ private void connectSocket(FPContext context) { sio.on("connect_error", objects -> logger.error("socketio error: {}", objects)); - this.socket = sio.connect(); + return sio.connect(); } } diff --git a/src/test/groovy/com/featureprobe/sdk/server/PollingSynchronizerIT.groovy b/src/test/groovy/com/featureprobe/sdk/server/PollingSynchronizerIT.groovy new file mode 100644 index 0000000..cf1a389 --- /dev/null +++ b/src/test/groovy/com/featureprobe/sdk/server/PollingSynchronizerIT.groovy @@ -0,0 +1,39 @@ +package com.featureprobe.sdk.server + +import ch.qos.logback.classic.Level +import ch.qos.logback.classic.Logger +import org.slf4j.LoggerFactory +import spock.lang.Specification + +class PollingSynchronizerIT extends Specification { + + def "Socketio realtime toggle update"() { + + (LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME) as Logger).setLevel(Level.DEBUG) + + given: + def config = FPConfig.builder() + .pollingMode() + .remoteUri("https://featureprobe.io/server") + .realtimeUri("https://featureprobe.io/server/realtime") + .useMemoryRepository() + .build() + def featureProbe = new FeatureProbe("server-61db54ecea79824cae3ac38d73f1961d698d0477", config) + def repository = featureProbe.dataRepository + def socket = (featureProbe.synchronizer as PollingSynchronizer).socket + def updateCnt = 0 + socket.on("update", objects -> updateCnt++) + + sleep(5000) + + featureProbe.close() + + sleep(5000) + + expect: + repository.initialized() + !socket.connected() + updateCnt > 0 + } +} + From 284fde51bebccf9c33aefe491c805b8f997836e5 Mon Sep 17 00:00:00 2001 From: Chris Date: Wed, 28 Dec 2022 23:30:51 +0800 Subject: [PATCH 4/4] Update build.yml --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index cf940e1..63e0715 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -20,7 +20,7 @@ jobs: distribution: 'temurin' cache: maven - name: Build with Maven - run: | + run: | mvn -B package --file pom.xml mvn failsafe:integration-test