Skip to content

Commit

Permalink
review: fixed some minor findings and adjusted code style;
Browse files Browse the repository at this point in the history
Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Oct 27, 2022
1 parent f2ce5e8 commit 330794d
Show file tree
Hide file tree
Showing 14 changed files with 65 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.eclipse.ditto.base.model.signals.commands.CommandJsonDeserializer;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.WithConnectionId;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
Expand Down Expand Up @@ -65,7 +64,6 @@ public static SudoRetrieveConnectionStatus of(final ConnectionId connectionId, f
return new SudoRetrieveConnectionStatus(connectionId, dittoHeaders);
}


/**
* Creates a new {@code SudoRetrieveConnectionStatus} from a JSON object.
*
Expand All @@ -89,7 +87,6 @@ public static SudoRetrieveConnectionStatus fromJson(final JsonObject jsonObject,
@Override
protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final JsonSchemaVersion schemaVersion,
final Predicate<JsonField> thePredicate) {

jsonObjectBuilder.set(ConnectivitySudoCommand.JsonFields.JSON_CONNECTION_ID, connectionId.toString(),
schemaVersion.and(thePredicate));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
/**
* Response to a {@link SudoRetrieveConnectionStatus} command.
*
* @since 3.0.0
* @since 3.1.0
*/
@Immutable
@JsonParsableCommandResponse(type = SudoRetrieveConnectionStatusResponse.TYPE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,5 @@ private static ActorRef startClientShardRegion(final ActorSystem actorSystem, fi
return ShardRegionCreator.start(actorSystem, ConnectivityMessagingConstants.CLIENT_SHARD_REGION, props,
numberOfShards, ConnectivityMessagingConstants.CLUSTER_ROLE);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2050,6 +2050,7 @@ private void startSubscriptionRefreshTimer() {

private Supplier<CompletionStage<Done>> askSelfServiceUnbind() {
final var shutdownTimeout = Duration.ofMinutes(2);

return () -> Patterns.ask(getSelf(), Control.SERVICE_UNBIND, shutdownTimeout).thenApply(answer -> Done.done());
}

Expand All @@ -2072,14 +2073,14 @@ private FSM.State<BaseClientState, BaseClientData> ackregatorStopped(final Contr

private FSM.State<BaseClientState, BaseClientData> serviceRequestsDone(final Control event,
final BaseClientData data) {

shuttingDown = true;
logger.info("{}: ackregatorCount={}", Control.STOP_SHARDED_ACTOR, ackregatorCount);
if (ackregatorCount == 0) {
return stop();
} else {
final var shutdownTimeout = connectivityConfig.getConnectionConfig().getShutdownTimeout();
startSingleTimer(Control.SHUTDOWN_TIMEOUT.name(), Control.SHUTDOWN_TIMEOUT, shutdownTimeout);

return stay();
}
}
Expand All @@ -2088,6 +2089,7 @@ private FSM.State<BaseClientState, BaseClientData> shutdownTimeout(final Control
final BaseClientData data) {
final var shutdownTimeout = connectivityConfig.getConnectionConfig().getShutdownTimeout();
logger.warning("Shutdown timeout <{}> reached; aborting <{}> ackregators", shutdownTimeout, ackregatorCount);

return stop();
}

Expand All @@ -2098,6 +2100,7 @@ private CompletionStage<?> sendDisconnectAnnouncementInCoordinatedShutdown() {
getSelf().tell(SEND_DISCONNECT_ANNOUNCEMENT, ActorRef.noSender());
final CompletableFuture<?> waitForAnnouncementFuture = new CompletableFuture<>();
waitForAnnouncementFuture.completeOnTimeout(null, disconnectAnnouncementTimeout, TimeUnit.MILLISECONDS);

return waitForAnnouncementFuture;
} else {
return CompletableFuture.completedStage(null);
Expand Down Expand Up @@ -2189,6 +2192,7 @@ public void reset() {
@Override
public Duration getNextTimeout() {
increaseTimeoutAfterRecovery();

return currentTimeout;
}

Expand All @@ -2197,6 +2201,7 @@ public Duration getNextBackoff() {
// no need to perform recovery here because timeout always happens after a backoff
final Duration result = nextBackoff;
nextBackoff = minDuration(maxBackoff, nextBackoff.multipliedBy(2L));

return result;
}

Expand Down Expand Up @@ -2235,6 +2240,7 @@ private static boolean isLonger(final Duration d1, final Duration d2) {
private static Predicate<Duration> isLowerThanOrEqual(final Duration otherDuration) {
return arg -> {
final Duration minus = arg.minus(otherDuration);

return minus.isNegative() || minus.isZero();
};
}
Expand Down Expand Up @@ -2266,7 +2272,6 @@ public String toString() {
"outboundSignal=" + outboundSignal +
"]";
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public static ClientActorId fromActorName(final ActorRef actorRef) {
}
final var connectionId = ConnectionId.of(actorName.substring(0, separatorIndex));
final int clientNumber = Integer.parseInt(actorName.substring(separatorIndex + 1));

return new ClientActorId(connectionId, clientNumber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import akka.serialization.Serializers;

/**
* Serializer of {@link org.eclipse.ditto.connectivity.service.messaging.ClientActorPropsArgsSerializer}.
* Serializer of {@link org.eclipse.ditto.connectivity.service.messaging.ClientActorPropsArgs}.
*/
public final class ClientActorPropsArgsSerializer
extends SerializerWithStringManifest implements ByteBufferSerializer {
Expand Down Expand Up @@ -75,10 +75,12 @@ public void toBinary(final Object o, final ByteBuffer buf) {
public byte[] toBinary(final Object o) {
final var args = (ClientActorPropsArgs) o;
final var connection = new Field(args.connection().toJsonString());
final var commandForwarder = toFieldWithManifest(args.commandForwarderActor());
final var commandForwarder =
toFieldWithManifest(args.commandForwarderActor());
final var connectionActor = toField(args.connectionActor());
final var dittoHeaders = new Field(args.dittoHeaders().toJsonString());
final var connectivityConfigOverwrites = toFieldWithManifest(args.connectivityConfigOverwrites());
final var connectivityConfigOverwrites =
toFieldWithManifest(args.connectivityConfigOverwrites());

final var buffer = ByteBuffer.allocate(
connection.length() +
Expand All @@ -94,6 +96,7 @@ public byte[] toBinary(final Object o) {
connectionActor.write(buffer);
dittoHeaders.write(buffer);
connectivityConfigOverwrites.write(buffer);

return buffer.array();
}

Expand All @@ -112,6 +115,7 @@ public Object fromBinary(final ByteBuffer buf, final String manifest) {
final var dittoHeaders = Field.read(buf);
final var connectivityConfigOverwrites = FieldWithManifest.read(buf);
buf.order(originalByteOrder);

return new ClientActorPropsArgs(
ConnectivityModelFactory.connectionFromJson(JsonObject.of(connection.asString())),
toActorRef(commandForwarder, commandForwarder.value()),
Expand All @@ -125,6 +129,7 @@ private Serialization getSerialization() {
if (serialization == null) {
serialization = SerializationExtension.get(actorSystem);
}

return serialization;
}

Expand All @@ -147,6 +152,7 @@ private FieldWithManifest toFieldWithManifest(final Object value) {
final int id = serializer.identifier();
final var manifest = new Field(Serializers.manifestFor(serializer, value));
final var valueField = new Field(serialization.serialize(value).get());

return new FieldWithManifest(id, manifest, valueField);
}

Expand All @@ -172,6 +178,7 @@ private int length() {
private static Field read(final ByteBuffer buffer) {
final var bytes = new byte[buffer.getInt()];
buffer.get(bytes);

return new Field(bytes);
}
}
Expand All @@ -188,11 +195,13 @@ private static FieldWithManifest read(final ByteBuffer buffer) {
final var id = buffer.getInt();
final var manifest = Field.read(buffer);
final var value = Field.read(buffer);

return new FieldWithManifest(id, manifest, value);
}

private int length() {
return 4 + manifest.length() + value.length();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
public final class ClientSupervisor extends AbstractActorWithTimers {

private final DittoDiagnosticLoggingAdapter logger = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
;

private final ClientActorId clientActorId = ClientActorId.fromActorName(getSelf());
private final SudoRetrieveConnectionStatus sudoRetrieveConnectionStatus =
SudoRetrieveConnectionStatus.of(clientActorId.connectionId(),
Expand All @@ -60,17 +60,18 @@ public final class ClientSupervisor extends AbstractActorWithTimers {
private Props props;
private ActorRef clientActor;

@SuppressWarnings({"unused"})
private ClientSupervisor(final int numberOfShards, final Duration statusCheckInterval) {
this.statusCheckInterval = statusCheckInterval;
final var actorSystem = getContext().getSystem();
final var clusterSharding = ClusterSharding.get(actorSystem);
final var extractor = ShardRegionExtractor.of(numberOfShards, actorSystem);
connectionShardRegion = clusterSharding.startProxy(ConnectivityMessagingConstants.SHARD_REGION,
Optional.of(ConnectivityMessagingConstants.CLUSTER_ROLE),
extractor);
Optional.of(ConnectivityMessagingConstants.CLUSTER_ROLE), extractor);
propsFactory = getClientActorPropsFactory(actorSystem);
}

@SuppressWarnings({"unused"})
// constructor for unit tests
private ClientSupervisor(final Duration statusCheckInterval, final ActorRef connectionShardRegion) {
this.statusCheckInterval = statusCheckInterval;
Expand Down Expand Up @@ -147,18 +148,19 @@ private void childTerminated(final Terminated terminated) {
}

private void startClientActor(final ClientActorPropsArgs propsArgs) {
final var props = propsFactory.getActorProps(propsArgs, getContext().getSystem());
if (props.equals(this.props)) {
logger.debug("Refreshing props");
final var actorProps = propsFactory.getActorProps(propsArgs, getContext().getSystem());
if (actorProps.equals(this.props)) {
logger.debug("Refreshing actorProps");
} else {
final var oldClientActor = clientActor;
if (oldClientActor != null) {
getContext().unwatch(oldClientActor);
getContext().stop(oldClientActor);
}
this.props = props;
clientActor = getContext().watch(getContext().actorOf(props));
logger.debug("New props received; stopped client actor <{}> and started <{}>", oldClientActor, clientActor);
this.props = actorProps;
clientActor = getContext().watch(getContext().actorOf(actorProps));
logger.debug("New actorProps received; stopped client actor <{}> and started <{}>", oldClientActor,
clientActor);
}
}

Expand Down Expand Up @@ -228,4 +230,5 @@ private static ClientActorPropsFactory getClientActorPropsFactory(final ActorSys
private enum Control {
STATUS_CHECK
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ private void interpretStagedCommand(final StagedCommand command) {
becomeDeletedHandler();
interpretStagedCommand(command.next());
}
case CHECK_LOGGING_ENABLED -> CHECK_LOGGING_ENABLED(command.next());
case CHECK_LOGGING_ENABLED -> checkLoggingEnabled(command.next());
case BROADCAST_TO_CLIENT_ACTORS_IF_STARTED -> {
broadcastToClientActors(command.getCommand(), getSelf());
interpretStagedCommand(command.next());
Expand Down Expand Up @@ -763,7 +763,7 @@ private void updatePriority(final UpdatePriority updatePriority) {
}
}

private void CHECK_LOGGING_ENABLED(final StagedCommand command) {
private void checkLoggingEnabled(final StagedCommand command) {
if (isDesiredStateOpen()) {
startEnabledLoggingChecker();
updateLoggingIfEnabled();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,13 @@ protected Result<ConnectivityEvent<?>> doApply(final Context<ConnectionState> co
final long nextRevision,
final SudoRetrieveConnectionStatus command,
@Nullable final Metadata metadata) {

final var connectionStatus = entity != null && !entity.hasLifecycle(ConnectionLifecycle.DELETED)
? entity.getConnectionStatus()
: ConnectivityStatus.CLOSED;
final var clientCount = entity != null ? entity.getClientCount() : 0;

return ResultFactory.newQueryResult(command,
SudoRetrieveConnectionStatusResponse.of(connectionStatus, clientCount, command.getDittoHeaders()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ protected Props getConsumerActorProps(final Sink<Object, NotUsed> inboundMapping
));
final HeaderMapping mappingWithSpecialKafkaHeaders = ConnectivityModelFactory.newHeaderMapping(map);

AtMostOnceKafkaConsumerSourceSupplier sourceSupplier = mock(AtMostOnceKafkaConsumerSourceSupplier.class);
final AtMostOnceKafkaConsumerSourceSupplier sourceSupplier = mock(AtMostOnceKafkaConsumerSourceSupplier.class);
when(sourceSupplier.get()).thenReturn(source);
final String address = "kafka";
final org.eclipse.ditto.connectivity.model.Source connectionSource = ConnectivityModelFactory.newSourceBuilder()
Expand Down
Loading

0 comments on commit 330794d

Please sign in to comment.