Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ jobs:
distribution: 'temurin'
cache: maven
- name: Build with Maven
run: mvn -B package --file pom.xml
run: |
mvn -B package --file pom.xml
mvn failsafe:integration-test

- name: Upload coverage reports to Codecov with GitHub Action
uses: codecov/codecov-action@v2
uses: codecov/codecov-action@v2
19 changes: 19 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
<version.jackson>2.14.0</version.jackson>
<version.commons>3.11</version.commons>
<version.maven-artifact>3.8.5</version.maven-artifact>
<version.socketio>2.1.0</version.socketio>
<groovy.version>3.0.9</groovy.version>
</properties>

Expand All @@ -72,6 +73,11 @@
<artifactId>okhttp</artifactId>
<version>${version.okhttp}</version>
</dependency>
<dependency>
<groupId>io.socket</groupId>
<artifactId>socket.io-client</artifactId>
<version>${version.socketio}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
Expand Down Expand Up @@ -207,6 +213,19 @@
</statelessTestsetReporter>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>3.0.0-M4</version>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/com/featureprobe/sdk/server/FPConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public final class FPConfig {

URL eventUrl;

URI realtimeUri;

final String location;

final SynchronizerFactory synchronizerFactory;
Expand All @@ -49,6 +51,7 @@ protected FPConfig(Builder builder) {
builder.httpConfiguration;
this.synchronizerUrl = builder.synchronizerUrl;
this.eventUrl = builder.eventUrl;
this.realtimeUri = builder.realtimeUri;
this.startWait = builder.startWait == null ? DEFAULT_START_WAIT : builder.startWait;
}

Expand All @@ -74,6 +77,8 @@ public static class Builder {

private URL eventUrl;

private URI realtimeUri;

private Long startWait;

public Builder() {
Expand Down Expand Up @@ -127,6 +132,16 @@ public Builder eventUrl(URL eventUrl) {
return this;
}

public Builder realtimeUri(URI realtimeUri) {
this.realtimeUri = realtimeUri;
return this;
}

public Builder realtimeUri(String realtimeUri) {
this.realtimeUri = URI.create(realtimeUri);
return this;
}

public Builder startWait(Long startWaitTime, TimeUnit unit) {
this.startWait = unit.toNanos(startWaitTime);
return this;
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/com/featureprobe/sdk/server/FPContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.time.Duration;
import java.util.Objects;
Expand All @@ -26,10 +27,14 @@ final class FPContext {

private static final String POST_EVENTS_DATA_API = "/api/events";

private static final String REALTIME_TOGGLE_UPDATE_API = "/realtime";

private URL synchronizerUrl;

private URL eventUrl;

private URI realtimeUri;

private final String serverSdkKey;

private final Duration refreshInterval;
Expand All @@ -52,6 +57,11 @@ final class FPContext {
} else {
this.eventUrl = config.eventUrl;
}
if (Objects.isNull(config.realtimeUri)) {
this.realtimeUri = URI.create(config.remoteUri.toString() + REALTIME_TOGGLE_UPDATE_API);
} else {
this.realtimeUri = config.realtimeUri;
}
} catch (MalformedURLException e) {
logger.error("construction context error", e);
}
Expand All @@ -74,6 +84,10 @@ public URL getEventUrl() {
return eventUrl;
}

public URI getRealtimeUri() {
return realtimeUri;
}

public String getServerSdkKey() {
return serverSdkKey;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public final class FeatureProbe {
@VisibleForTesting
final DataRepository dataRepository;

@VisibleForTesting
Synchronizer synchronizer;

@VisibleForTesting
Expand Down
58 changes: 49 additions & 9 deletions src/main/java/com/featureprobe/sdk/server/PollingSynchronizer.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.featureprobe.sdk.server.exceptions.HttpErrorException;
import com.featureprobe.sdk.server.model.Repository;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.socket.client.IO;
import io.socket.client.Socket;
import io.socket.engineio.client.transports.WebSocket;
import okhttp3.Headers;
import okhttp3.OkHttpClient;
import okhttp3.Request;
Expand All @@ -14,6 +18,8 @@
import java.io.IOException;
import java.net.URL;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -24,12 +30,15 @@
final class PollingSynchronizer implements Synchronizer {

private static final Logger logger = Loggers.SYNCHRONIZER;
private static final String GET_SDK_KEY_HEADER = "Authorization";
final ObjectMapper mapper = new ObjectMapper();
DataRepository dataRepository;
private final ObjectMapper mapper = new ObjectMapper();
private final DataRepository dataRepository;
private final Duration refreshInterval;
private final URL apiUrl;
private volatile ScheduledFuture<?> worker;

@VisibleForTesting
Socket socket;

private final OkHttpClient httpClient;
private final Headers headers;

Expand All @@ -47,14 +56,15 @@ final class PollingSynchronizer implements Synchronizer {
this.apiUrl = context.getSynchronizerUrl();
this.dataRepository = dataRepository;
this.initFuture = new CompletableFuture<>();
OkHttpClient.Builder builder = new OkHttpClient.Builder()
this.headers = context.getHeaders();
this.httpClient = new OkHttpClient.Builder()
.connectionPool(context.getHttpConfiguration().connectionPool)
.connectTimeout(context.getHttpConfiguration().connectTimeout)
.readTimeout(context.getHttpConfiguration().readTimeout)
.writeTimeout(context.getHttpConfiguration().writeTimeout)
.retryOnConnectionFailure(false);
headers = context.getHeaders();
httpClient = builder.build();
.retryOnConnectionFailure(false)
.build();
this.socket = connectSocket(context);
}

@Override
Expand All @@ -77,6 +87,10 @@ public void close() throws IOException {
worker.cancel(true);
worker = null;
}
if (socket != null) {
socket.close();
socket = null;
}
}
}

Expand All @@ -89,8 +103,8 @@ private void poll() {
try (Response response = httpClient.newCall(request).execute()) {
String body = response.body().string();
if (!response.isSuccessful()) {
throw new HttpErrorException(String.format("Http request error: code: {}, body: {}:" + response.code(),
response.body()));
throw new HttpErrorException(String.format("Http request error: code: %d, body: %s",
response.code(), response.body().toString()));
}
logger.debug("Http response: {}", response);
logger.debug("Http response body: {}", body);
Expand All @@ -102,4 +116,30 @@ private void poll() {
logger.error("Unexpected error from polling processor", e);
}
}

private Socket connectSocket(FPContext context) {
IO.Options sioOptions = IO.Options.builder()
.setTransports(new String[]{WebSocket.NAME})
.setPath(context.getRealtimeUri().getPath())
.build();
Socket sio = IO.socket(context.getRealtimeUri(), sioOptions);

sio.on("connect", objects -> {
logger.info("connect socketio success");
Map<String, String> credentials = new HashMap<>(1);
credentials.put("key", context.getServerSdkKey());
sio.emit("register", credentials);
});

sio.on("update", objects -> {
logger.info("socketio recv update event");
poll();
});

sio.on("disconnect", objects -> logger.info("socketio disconnected"));

sio.on("connect_error", objects -> logger.error("socketio error: {}", objects));

return sio.connect();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.featureprobe.sdk.server

import ch.qos.logback.classic.Level
import ch.qos.logback.classic.Logger
import org.slf4j.LoggerFactory
import spock.lang.Specification

class PollingSynchronizerIT extends Specification {

def "Socketio realtime toggle update"() {

(LoggerFactory.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME) as Logger).setLevel(Level.DEBUG)

given:
def config = FPConfig.builder()
.pollingMode()
.remoteUri("https://featureprobe.io/server")
.realtimeUri("https://featureprobe.io/server/realtime")
.useMemoryRepository()
.build()
def featureProbe = new FeatureProbe("server-61db54ecea79824cae3ac38d73f1961d698d0477", config)
def repository = featureProbe.dataRepository
def socket = (featureProbe.synchronizer as PollingSynchronizer).socket
def updateCnt = 0
socket.on("update", objects -> updateCnt++)

sleep(5000)

featureProbe.close()

sleep(5000)

expect:
repository.initialized()
!socket.connected()
updateCnt > 0
}
}