Skip to content

Commit

Permalink
ClientSupervisor: Retrieve connection status on startup to deal with …
Browse files Browse the repository at this point in the history
…shard rebalancing.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Dec 9, 2022
1 parent 68878f6 commit fdd4a61
Show file tree
Hide file tree
Showing 7 changed files with 298 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.api.commands.sudo;

import java.util.Objects;
import java.util.function.Predicate;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.FieldType;
import org.eclipse.ditto.base.model.json.JsonParsableCommand;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.commands.AbstractCommand;
import org.eclipse.ditto.base.model.signals.commands.CommandJsonDeserializer;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.WithConnectionId;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;

/**
* Command which requests the connection persistence actor to send arguments with which to construct client actor props.
*
* @since 3.1.0
*/
@Immutable
@JsonParsableCommand(typePrefix = ConnectivitySudoCommand.TYPE_PREFIX, name = SudoRetrieveClientActorProps.NAME)
public final class SudoRetrieveClientActorProps extends AbstractCommand<SudoRetrieveClientActorProps>
implements ConnectivitySudoCommand<SudoRetrieveClientActorProps>, WithConnectionId {

public static final String NAME = "sudoRetrieveClientActorProps";

public static final String TYPE = TYPE_PREFIX + NAME;

private static final JsonFieldDefinition<Integer> CLIENT_ACTOR_NUMBER =
JsonFactory.newIntFieldDefinition("clientActorNumber", FieldType.REGULAR, JsonSchemaVersion.V_2);

private final ConnectionId connectionId;
private final int clientActorNumber;

private SudoRetrieveClientActorProps(final ConnectionId connectionId, final int clientActorId,
final DittoHeaders dittoHeaders) {
super(TYPE, dittoHeaders);
this.connectionId = connectionId;
this.clientActorNumber = clientActorId;
}

/**
* Returns a new instance of {@code SudoRetrieveClientActorProps}.
*
* @param connectionId the Connection ID.
* @param clientActorNumber the client actor number.
* @param dittoHeaders the headers of the request.
* @return a new SudoRetrieveClientActorProps command.
* @throws NullPointerException if any argument is {@code null}.
*/
public static SudoRetrieveClientActorProps of(final ConnectionId connectionId,
final int clientActorNumber, final DittoHeaders dittoHeaders) {
return new SudoRetrieveClientActorProps(connectionId, clientActorNumber, dittoHeaders);
}

/**
* Creates a new {@code SudoRetrieveClientActorProps} from a JSON object.
*
* @param jsonObject the JSON object of which the command is to be created.
* @param dittoHeaders the headers of the command.
* @return the command.
* @throws NullPointerException if {@code jsonObject} is {@code null}.
* @throws org.eclipse.ditto.json.JsonParseException if the passed in {@code jsonObject} was not in the expected
* format.
*/
public static SudoRetrieveClientActorProps fromJson(final JsonObject jsonObject, final DittoHeaders dittoHeaders) {
return new CommandJsonDeserializer<SudoRetrieveClientActorProps>(TYPE, jsonObject).deserialize(
() -> {
final String readConnectionId =
jsonObject.getValueOrThrow(ConnectivitySudoCommand.JsonFields.JSON_CONNECTION_ID);
final ConnectionId connectionId = ConnectionId.of(readConnectionId);
final int clientNumber = jsonObject.getValueOrThrow(CLIENT_ACTOR_NUMBER);
return of(connectionId, clientNumber, dittoHeaders);
});
}

@Override
protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final JsonSchemaVersion schemaVersion,
final Predicate<JsonField> thePredicate) {
final Predicate<JsonField> predicate = schemaVersion.and(thePredicate);
jsonObjectBuilder.set(ConnectivitySudoCommand.JsonFields.JSON_CONNECTION_ID, connectionId.toString(),
predicate);
jsonObjectBuilder.set(CLIENT_ACTOR_NUMBER, clientActorNumber);
}

@Override
public Category getCategory() {
return Category.QUERY;
}

@Override
public SudoRetrieveClientActorProps setDittoHeaders(final DittoHeaders dittoHeaders) {
return of(connectionId, clientActorNumber, dittoHeaders);
}

@Override
public ConnectionId getEntityId() {
return connectionId;
}

/**
* Retrieve the client actor number.
*
* @return The client actor number.
*/
public int getClientActorNumber() {
return clientActorNumber;
}

@Override
protected boolean canEqual(@Nullable final Object other) {
return other instanceof SudoRetrieveClientActorProps;
}

@Override
public boolean equals(@Nullable final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
final SudoRetrieveClientActorProps that = (SudoRetrieveClientActorProps) o;
return Objects.equals(connectionId, that.connectionId) && clientActorNumber == that.clientActorNumber;
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), connectionId, clientActorNumber);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
super.toString() +
", connectionId=" + connectionId +
", clientActorNumber=" + clientActorNumber +
"]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.api.ConnectivityMessagingConstants;
import org.eclipse.ditto.connectivity.api.commands.sudo.SudoRetrieveClientActorProps;
import org.eclipse.ditto.connectivity.api.commands.sudo.SudoRetrieveConnectionStatus;
import org.eclipse.ditto.connectivity.api.commands.sudo.SudoRetrieveConnectionStatusResponse;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
Expand Down Expand Up @@ -104,7 +105,7 @@ public static Props propsForTest(final Duration statusCheckInterval, final Actor

@Override
public void preStart() {
scheduleNextStatusCheck();
startStatusCheck(Control.STATUS_CHECK);
}

@Override
Expand Down Expand Up @@ -190,7 +191,10 @@ private void checkConnectionStatus(final SudoRetrieveConnectionStatusResponse re
clientActorId.clientNumber() >= response.getClientCount()) {
connectionNotAccessible(response);
} else if (clientActor == null) {
logger.error("Client actor <{}> of open connection did not start", clientActorId);
logger.info("Client actor <{}> of open connection did not start. Requesting props.", clientActorId);
final var command = SudoRetrieveClientActorProps.of(clientActorId.connectionId(),
clientActorId.clientNumber(), DittoHeaders.empty());
connectionShardRegion.tell(command, getSelf());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,15 @@
import org.eclipse.ditto.base.model.signals.SignalWithEntityId;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.connectivity.api.BaseClientState;
import org.eclipse.ditto.connectivity.api.commands.sudo.ConnectivitySudoCommand;
import org.eclipse.ditto.connectivity.api.commands.sudo.SudoRetrieveClientActorProps;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectionLifecycle;
import org.eclipse.ditto.connectivity.model.ConnectionMetrics;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommand;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommandInterceptor;
import org.eclipse.ditto.connectivity.model.signals.commands.exceptions.ConnectionFailedException;
import org.eclipse.ditto.connectivity.model.signals.commands.exceptions.ConnectionNotAccessibleException;
Expand Down Expand Up @@ -673,6 +676,10 @@ protected Receive matchAnyAfterInitialization() {
.matchEquals(Control.TRIGGER_UPDATE_PRIORITY, this::triggerUpdatePriority)
.match(UpdatePriority.class, this::updatePriority)
.match(StopShardedActor.class, this::stopShardedActor)
.match(SudoRetrieveClientActorProps.class, this::retrieveClientActorProps)
// Respond with not-accessible-exception for unhandled connectivity commands
.match(ConnectivitySudoCommand.class, this::notAccessible)
.match(ConnectivityCommand.class, this::notAccessible)
.build()
.orElse(super.matchAnyAfterInitialization());
}
Expand Down Expand Up @@ -911,6 +918,20 @@ private void respondWithEmptyLogs(final WithDittoHeaders command, final ActorRef
origin.tell(logsResponse, getSelf());
}

private void retrieveClientActorProps(final SudoRetrieveClientActorProps command) {
if (entity != null) {
log.info("Sending client actor props for <{}>", getSender());
final var args = new ClientActorPropsArgs(entity, commandForwarderActor, getSelf(),
command.getDittoHeaders(), connectivityConfigOverwrites);
final var clientActorId = new ClientActorId(entityId, command.getClientActorNumber());
final var envelope = shardedBinaryEnvelope(args, clientActorId);
getSender().tell(envelope, getSelf());
} else {
log.info("Received request for client actor props of nonexistent connection from <{}>", getSender());
notAccessible(command);
}
}

private CompletionStage<Object> startAndAskClientActorForTest(final TestConnection cmd) {
final var args = new ClientActorPropsArgs(cmd.getConnection(), commandForwarderActor, getSelf(),
cmd.getDittoHeaders(), connectivityConfigOverwrites);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ private static ConnectionCreatedStrategies newCreatedStrategies() {
return strategies;
}

@Override
public boolean isDefined(final Command<?> command) {
return command instanceof ConnectivityCommand || command instanceof ConnectivitySudoCommand;
}

@Override
public Result<ConnectivityEvent<?>> unhandled(final Context<ConnectionState> context,
@Nullable final Connection entity,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging;

import static org.assertj.core.api.Assertions.assertThat;

import java.time.Duration;
import java.util.Map;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.service.UriEncoding;
import org.eclipse.ditto.connectivity.api.commands.sudo.SudoRetrieveClientActorProps;
import org.eclipse.ditto.connectivity.api.commands.sudo.SudoRetrieveConnectionStatus;
import org.eclipse.ditto.connectivity.api.commands.sudo.SudoRetrieveConnectionStatusResponse;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
import org.eclipse.ditto.internal.utils.akka.ActorSystemResource;
import org.junit.Rule;
import org.junit.Test;

import com.typesafe.config.ConfigFactory;

import akka.testkit.TestKit;
import scala.concurrent.duration.FiniteDuration;

/**
* Tests {@link ClientSupervisor}.
*/
public final class ClientSupervisorTest {

@Rule
public final ActorSystemResource ACTOR_SYSTEM_RESOURCE = ActorSystemResource.newInstance(
ConfigFactory.parseMap(Map.of(
"ditto.extensions.client-actor-props-factory.extension-class",
MockClientActorPropsFactory.class.getName()
)).withFallback(TestConstants.CONFIG));

@Test
public void checkConnectionStatusOnStartUp() {
new TestKit(ACTOR_SYSTEM_RESOURCE.getActorSystem()) {{
final var props = ClientSupervisor.propsForTest(Duration.ofDays(1), testActor());
final var clientActorId = new ClientActorId(TestConstants.createRandomConnectionId(), 0);

// WHEN: Client supervisor starts
// THEN: It retrieves the connection status
final var underTest = childActorOf(props, UriEncoding.encodePath(clientActorId.toString()));
watch(underTest);
expectMsgClass(SudoRetrieveConnectionStatus.class);
assertThat(lastSender()).isEqualTo(underTest);

// WHEN: Connection status is open and the client actor is not started
// THEN: Client supervisor retrieves the client actor props args
underTest.tell(SudoRetrieveConnectionStatusResponse.of(ConnectivityStatus.OPEN, 1, DittoHeaders.empty()),
testActor());
expectMsgClass(SudoRetrieveClientActorProps.class);

// WHEN: Connection status is closed
// THEN: Client supervisor terminates
underTest.tell(SudoRetrieveConnectionStatusResponse.of(ConnectivityStatus.CLOSED, 1, DittoHeaders.empty()),
testActor());
expectTerminated(underTest, FiniteDuration.apply(10, "s"));
}};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.api.BaseClientState;
import org.eclipse.ditto.connectivity.api.commands.sudo.SudoRetrieveClientActorProps;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionConfigurationInvalidException;
import org.eclipse.ditto.connectivity.model.ConnectionId;
Expand Down Expand Up @@ -68,6 +69,8 @@
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnectionStatus;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnectionStatusResponse;
import org.eclipse.ditto.connectivity.model.signals.events.ConnectionDeleted;
import org.eclipse.ditto.connectivity.service.messaging.ClientActorId;
import org.eclipse.ditto.connectivity.service.messaging.ClientActorPropsArgs;
import org.eclipse.ditto.connectivity.service.messaging.MockClientActorPropsFactory;
import org.eclipse.ditto.connectivity.service.messaging.MockCommandValidator;
import org.eclipse.ditto.connectivity.service.messaging.TestConstants;
Expand All @@ -76,6 +79,7 @@
import org.eclipse.ditto.internal.utils.akka.ActorSystemResource;
import org.eclipse.ditto.internal.utils.akka.PingCommand;
import org.eclipse.ditto.internal.utils.akka.controlflow.WithSender;
import org.eclipse.ditto.internal.utils.cluster.ShardedBinaryEnvelope;
import org.eclipse.ditto.internal.utils.persistentactors.AbstractPersistenceSupervisor;
import org.eclipse.ditto.internal.utils.test.Retry;
import org.eclipse.ditto.internal.utils.tracing.DittoTracingInitResource;
Expand Down Expand Up @@ -674,10 +678,8 @@ public void recoverOpenConnection() {

// create connection
underTest.tell(createConnection(), testProbe.ref());
testProbe.expectMsg(createConnectionResponse());

// wait for open connection of initial creation
simulateSuccessfulOpenConnectionInClientActor();
testProbe.expectMsg(createConnectionResponse());

// stop actor
actorSystemResource1.stopActor(underTest);
Expand Down Expand Up @@ -1212,6 +1214,29 @@ public void deleteConnectionCommandEmitsEvent() {
testProbe.expectTerminated(underTest, FiniteDuration.apply(3, TimeUnit.SECONDS));
}

@Test
public void retrieveClientActorProps() {
final var underTest = createSupervisor();
final var testProbe = actorSystemResource1.newTestProbe();
final var clientActorId = new ClientActorId(connectionId, 0);
final var command = SudoRetrieveClientActorProps.of(clientActorId.connectionId(),
clientActorId.clientNumber(), DittoHeaders.newBuilder().randomCorrelationId().build());

underTest.tell(command, testProbe.ref());
testProbe.expectMsgClass(ConnectionNotAccessibleException.class);

final var createConnection = createConnection();
underTest.tell(createConnection, testProbe.ref());
simulateSuccessfulOpenConnectionInClientActor();
testProbe.expectMsg(createConnectionResponse());

underTest.tell(command, testProbe.ref());
final var envelope = testProbe.expectMsgClass(ShardedBinaryEnvelope.class);
final var message = (ClientActorPropsArgs) envelope.message();
assertThat(message.connection()).isEqualTo(createConnection.getConnection());
assertThat(message.dittoHeaders()).isEqualTo(command.getDittoHeaders());
}

private ActorRef createSupervisor() {
return TestConstants.createConnectionSupervisorActor(connectionId,
actorSystemResource1.getActorSystem(),
Expand Down
Loading

0 comments on commit fdd4a61

Please sign in to comment.