Skip to content

Commit

Permalink
added "clientCount" configuration to Connection
Browse files Browse the repository at this point in the history
* broadcast some commands to all clients in cluster
* wait for responses + aggregate responses in AggregatedConnectivityCommandResponse
* simplified clients by making the Props serializable
* added some missing javadoc

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch-si.com>
  • Loading branch information
thjaeckle committed Mar 26, 2018
1 parent 780cc86 commit 0438a63
Show file tree
Hide file tree
Showing 66 changed files with 899 additions and 500 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,16 @@ public interface Connection extends Jsonifiable.WithFieldSelectorAndPredicate<Js
*/
Set<Target> getTargets();

/**
* Returns how many clients on different cluster nodes should establish the {@code Connection}.
* <p>
* If greater than 1, the connection is created in a HA mode, running on at least 2 cluster nodes.
* </p>
*
* @return the client count.
*/
int getClientCount();

/**
* Returns whether or not failover is enabled for this {@code Connection}.
*
Expand Down Expand Up @@ -213,6 +223,13 @@ final class JsonFields {
JsonFactory.newJsonArrayFieldDefinition("targets", FieldType.REGULAR, JsonSchemaVersion.V_1,
JsonSchemaVersion.V_2);

/**
* JSON field containing the {@code Connection} client count.
*/
public static final JsonFieldDefinition<Integer> CLIENT_COUNT =
JsonFactory.newIntFieldDefinition("clientCount", FieldType.REGULAR, JsonSchemaVersion.V_1,
JsonSchemaVersion.V_2);

/**
* JSON field containing the {@code Connection} failover enabled.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,17 @@ public interface ConnectionBuilder {
*/
ConnectionBuilder targets(Set<Target> targets);

/**
* Sets how many clients on different cluster nodes should establish the {@link Connection}.
* <p>
* If greater than 1, the connection is created in a HA mode, running on at least 2 cluster nodes.
* </p>
*
* @param clientCount the client count to set
* @return this builder to allow method chaining.
*/
ConnectionBuilder clientCount(int clientCount);

/**
* Builds a new {@link Connection}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,38 +27,35 @@
import org.eclipse.ditto.model.base.json.Jsonifiable;

/**
* TODO TJ doc
* Connection Metrics represent the current (and not the persisted/desired) connection status and information of this
* connection like amount of consumed/published messages, etc.
*/
@Immutable
public interface ConnectionMetrics extends Jsonifiable.WithFieldSelectorAndPredicate<JsonField> {

/**
*
* @return
* @return the current ConnectionStatus of the related {@link Connection}.
*/
ConnectionStatus getConnectionStatus();

/**
*
* @return
* @return the optional details of the ConnectionStatus of the related {@link Connection}.
*/
Optional<String> getConnectionStatusDetails();

/**
*
* @return
* @return in which state the client handling the {@link Connection} currently is.
*/
String getClientState();

/**
*
* @return
* @return the metrics of all Connection {@link Source}s.
*/
List<SourceMetrics> getSourcesMetrics();

/**
*
* @return
* @return the metrics of all Connection {@link Target}s.
*/
List<TargetMetrics> getTargetsMetrics();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,18 @@ public static SourceMetrics newSourceMetrics(final Map<String, AddressMetric> ad
return ImmutableSourceMetrics.of(addressMetrics, consumedMessages);
}

/**
* Creates a new {@code SourceMetrics} object from the specified JSON object.
*
* @param jsonObject a JSON object which provides the data for the Connection to be created.
* @return a new SourceMetrics which is initialised with the extracted data from {@code jsonObject}.
* @throws NullPointerException if {@code jsonObject} is {@code null}.
* @throws org.eclipse.ditto.json.JsonParseException if {@code jsonObject} is not an appropriate JSON object.
*/
public static SourceMetrics sourceMetricsFromJson(final JsonObject jsonObject) {
return ImmutableSourceMetrics.fromJson(jsonObject);
}

/**
* Returns a new {@code TargetMetrics}.
*
Expand All @@ -115,6 +127,18 @@ public static TargetMetrics newTargetMetrics(final Map<String, AddressMetric> ad
return ImmutableTargetMetrics.of(addressMetrics, consumedMessages);
}

/**
* Creates a new {@code TargetMetrics} object from the specified JSON object.
*
* @param jsonObject a JSON object which provides the data for the Connection to be created.
* @return a new TargetMetrics which is initialised with the extracted data from {@code jsonObject}.
* @throws NullPointerException if {@code jsonObject} is {@code null}.
* @throws org.eclipse.ditto.json.JsonParseException if {@code jsonObject} is not an appropriate JSON object.
*/
public static TargetMetrics targetMetricsFromJson(final JsonObject jsonObject) {
return ImmutableTargetMetrics.fromJson(jsonObject);
}

/**
* Returns a new {@code AddressMetric}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ final class ImmutableConnection implements Connection {

private final Set<Source> sources;
private final Set<Target> targets;
private final int clientCount;
private final boolean failoverEnabled;
private final boolean validateCertificate;
private final int throttle;
Expand All @@ -73,6 +74,7 @@ final class ImmutableConnection implements Connection {
checkSourceAndTargetAreValid(builder);
this.sources = Collections.unmodifiableSet(new HashSet<>(builder.sources));
this.targets = Collections.unmodifiableSet(new HashSet<>(builder.targets));
this.clientCount = builder.clientCount;
this.failoverEnabled = builder.failoverEnabled;
this.validateCertificate = builder.validateCertificate;
this.throttle = builder.throttle;
Expand Down Expand Up @@ -148,6 +150,7 @@ public static Connection fromJson(final JsonObject jsonObject) {
.collect(Collectors.toSet()))
.orElse(Collections.emptySet());

final Optional<Integer> readClientCount = jsonObject.getValue(JsonFields.CLIENT_COUNT);
final Optional<Boolean> readFailoverEnabled = jsonObject.getValue(JsonFields.FAILOVER_ENABLED);
final Optional<Boolean> readValidateCertificates = jsonObject.getValue(JsonFields.VALIDATE_CERTIFICATES);
final Optional<Integer> readThrottle = jsonObject.getValue(JsonFields.THROTTLE);
Expand All @@ -158,6 +161,7 @@ public static Connection fromJson(final JsonObject jsonObject) {

builder.sources(readSources);
builder.targets(readTargets);
readClientCount.ifPresent(builder::clientCount);
readThrottle.ifPresent(builder::throttle);
readFailoverEnabled.ifPresent(builder::failoverEnabled);
readValidateCertificates.ifPresent(builder::validateCertificate);
Expand Down Expand Up @@ -190,6 +194,11 @@ public Set<Target> getTargets() {
return targets;
}

@Override
public int getClientCount() {
return clientCount;
}

@Override
public boolean isFailoverEnabled() {
return failoverEnabled;
Expand Down Expand Up @@ -264,6 +273,7 @@ public JsonObject toJson(final JsonSchemaVersion schemaVersion, final Predicate<
jsonObjectBuilder.set(JsonFields.TARGETS, targets.stream()
.map(source -> source.toJson(schemaVersion, thePredicate))
.collect(JsonCollectors.valuesToArray()), predicate.and(Objects::nonNull));
jsonObjectBuilder.set(JsonFields.CLIENT_COUNT, clientCount, predicate);
jsonObjectBuilder.set(JsonFields.FAILOVER_ENABLED, failoverEnabled, predicate);
jsonObjectBuilder.set(JsonFields.VALIDATE_CERTIFICATES, validateCertificate, predicate);
jsonObjectBuilder.set(JsonFields.THROTTLE, throttle, predicate);
Expand All @@ -284,6 +294,7 @@ public boolean equals(@Nullable final Object o) {
Objects.equals(authorizationContext, that.authorizationContext) &&
Objects.equals(sources, that.sources) &&
Objects.equals(targets, that.targets) &&
Objects.equals(clientCount, that.clientCount) &&
Objects.equals(uri, that.uri) &&
Objects.equals(protocol, that.protocol) &&
Objects.equals(username, that.username) &&
Expand All @@ -297,7 +308,7 @@ public boolean equals(@Nullable final Object o) {

@Override
public int hashCode() {
return Objects.hash(id, connectionType, authorizationContext, sources, targets,
return Objects.hash(id, connectionType, authorizationContext, sources, targets, clientCount,
failoverEnabled, uri, protocol, username, password, hostname, path, port, validateCertificate, throttle,
processorPoolSize);
}
Expand All @@ -318,6 +329,7 @@ public String toString() {
", path=" + path +
", sources=" + sources +
", targets=" + targets +
", clientCount=" + clientCount +
", validateCertificate=" + validateCertificate +
", throttle=" + throttle +
", processorPoolSize=" + processorPoolSize +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class ImmutableConnectionBuilder implements ConnectionBuilder {
boolean validateCertificate = true;
final Set<Source> sources = new HashSet<>();
final Set<Target> targets = new HashSet<>();
int clientCount = 1;
int throttle = -1;
int processorPoolSize = 5;

Expand Down Expand Up @@ -95,6 +96,12 @@ public ConnectionBuilder targets(final Set<Target> targets) {
return this;
}

@Override
public ConnectionBuilder clientCount(final int clientCount) {
this.clientCount = checkArgument(clientCount, ps -> ps > 0, () -> "clientCount must > 0");
return this;
}

@Override
public Connection build() {
return new ImmutableConnection(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;

/**
* TODO TJ doc
* Immutable implementation of {@link ConnectionMetrics}.
*/
@Immutable
final class ImmutableConnectionMetrics implements ConnectionMetrics {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public final class ImmutableConnectionTest {
.collect(JsonCollectors.valuesToArray()))
.set(Connection.JsonFields.SOURCES, KNOWN_SOURCES_JSON)
.set(Connection.JsonFields.TARGETS, KNOWN_TARGETS_JSON)
.set(Connection.JsonFields.CLIENT_COUNT, 2)
.set(Connection.JsonFields.FAILOVER_ENABLED, true)
.set(Connection.JsonFields.VALIDATE_CERTIFICATES, true)
.set(Connection.JsonFields.THROTTLE, -1)
Expand Down Expand Up @@ -170,6 +171,7 @@ public void fromJsonReturnsExpected() {
ConnectivityModelFactory.newConnectionBuilder(ID, TYPE, URI, AUTHORIZATION_CONTEXT)
.sources(SOURCES)
.targets(TARGETS)
.clientCount(2)
.build();

final Connection actual = ImmutableConnection.fromJson(KNOWN_JSON);
Expand All @@ -196,6 +198,7 @@ public void toJsonReturnsExpected() {
ConnectivityModelFactory.newConnectionBuilder(ID, TYPE, URI, AUTHORIZATION_CONTEXT)
.sources(SOURCES)
.targets(TARGETS)
.clientCount(2)
.build()
.toJson();

Expand Down
Loading

0 comments on commit 0438a63

Please sign in to comment.