Skip to content

Commit

Permalink
Add background sync actor with config and status reporting.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Feb 7, 2020
1 parent ad7bc1a commit 7b7548d
Show file tree
Hide file tree
Showing 36 changed files with 1,915 additions and 87 deletions.
Expand Up @@ -103,18 +103,17 @@ private static Source<Object, NotUsed> checkForErrors(
if (triple.t2() != null) {
return Source.single(triple.t2());
} else {
final String message = String.format("Error on sending <%s>: %s", Objects.toString(triple.t1()),
Objects.toString(triple.t3()));
final String message = String.format("Error on sending <%s>: %s", triple.t1(), triple.t3());
return Source.failed(new IllegalStateException(message));
}
}

private static DistributedPubSubMediator.Send requestStreamCommand(final PersistenceIdsConfig config,
final String path, final EntityIdWithRevision seed) {
return DistPubSubAccess.send(path, sudoStreamSnapshotRevisions(config, seed), false);
return DistPubSubAccess.send(path, sudoStreamPids(config, seed), false);
}

private static SudoStreamPids sudoStreamSnapshotRevisions(final PersistenceIdsConfig config,
private static SudoStreamPids sudoStreamPids(final PersistenceIdsConfig config,
final EntityIdWithRevision seed) {
return SudoStreamPids.of(config.getBurst(), config.getStreamIdleTimeout().toMillis(), DittoHeaders.empty())
.withLowerBound(seed);
Expand Down
Expand Up @@ -22,7 +22,7 @@
import org.eclipse.ditto.services.base.actors.DittoRootActor;
import org.eclipse.ditto.services.base.config.http.HttpConfig;
import org.eclipse.ditto.services.concierge.actors.ShardRegions;
import org.eclipse.ditto.services.concierge.actors.cleanup.CleanupStatusReporter;
import org.eclipse.ditto.services.utils.health.SingletonStatusReporter;
import org.eclipse.ditto.services.concierge.actors.cleanup.EventSnapshotCleanupCoordinator;
import org.eclipse.ditto.services.concierge.common.ConciergeConfig;
import org.eclipse.ditto.services.concierge.starter.proxy.EnforcerActorFactory;
Expand Down Expand Up @@ -141,7 +141,9 @@ private ActorRef startHealthCheckingActor(final ConciergeConfig conciergeConfig,
return startChildActor(DefaultHealthCheckingActorFactory.ACTOR_NAME,
DefaultHealthCheckingActorFactory.props(healthCheckingActorOptions,
MongoHealthChecker.props(),
CleanupStatusReporter.props(cleanupCoordinatorProxy)));
SingletonStatusReporter.props(ConciergeMessagingConstants.CLUSTER_ROLE,
cleanupCoordinatorProxy))
);
}

private static Route createRoute(final ActorSystem actorSystem, final ActorRef healthCheckingActor) {
Expand Down
@@ -0,0 +1,165 @@
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.models.policies.commands.sudo;

import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;

import java.util.Objects;
import java.util.function.Predicate;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.json.JsonParsableCommand;
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
import org.eclipse.ditto.model.policies.PolicyId;
import org.eclipse.ditto.signals.commands.base.AbstractCommand;
import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault;

/**
* Command which retrieves one policy revision based on the the passed in Policy ID w/o
* authorization context.
*/
@Immutable
@AllValuesAreNonnullByDefault
@JsonParsableCommand(typePrefix = SudoRetrievePolicyRevision.TYPE_PREFIX, name = SudoRetrievePolicyRevision.NAME)
public final class SudoRetrievePolicyRevision extends AbstractCommand<SudoRetrievePolicyRevision>
implements SudoCommand<SudoRetrievePolicyRevision> {

/**
* Name of the "Sudo Retrieve Policy" command.
*/
public static final String NAME = "sudoRetrievePolicyRevision";

/**
* Type of this command.
*/
public static final String TYPE = SudoCommand.TYPE_PREFIX + NAME;

private final PolicyId policyId;

private SudoRetrievePolicyRevision(final PolicyId policyId, final DittoHeaders dittoHeaders) {
super(TYPE, dittoHeaders);

this.policyId = checkNotNull(policyId, "Policy identifier");
}

/**
* Returns a Command for retrieving the Policy with the given ID.
*
* @param policyId the ID of a single Policy to be retrieved by this command.
* @param dittoHeaders the optional command headers of the request.
* @return a Command for retrieving the Policy with the {@code policyId} as its ID.
* @throws NullPointerException if any argument is {@code null}.
*/
public static SudoRetrievePolicyRevision of(final PolicyId policyId, final DittoHeaders dittoHeaders) {
return new SudoRetrievePolicyRevision(policyId, dittoHeaders);
}

/**
* Creates a new {@code SudoRetrievePolicy} from a JSON string.
*
* @param jsonString the JSON string of which a new SudoRetrievePolicy instance is to be created.
* @param dittoHeaders the optional command headers of the request.
* @return the {@code SudoRetrievePolicy} which was created from the given JSON string.
* @throws NullPointerException if {@code jsonString} is {@code null}.
* @throws IllegalArgumentException if {@code jsonString} is empty.
* @throws org.eclipse.ditto.json.JsonParseException if the passed in {@code jsonString} was not in the expected
* format.
*/
public static SudoRetrievePolicyRevision fromJson(final String jsonString, final DittoHeaders dittoHeaders) {
final JsonObject jsonObject = JsonFactory.newObject(jsonString);

return fromJson(jsonObject, dittoHeaders);
}

/**
* Creates a new {@code SudoRetrievePolicy} from a JSON object.
*
* @param jsonObject the JSON object of which a new SudoRetrievePolicy instance is to be created.
* @param dittoHeaders the optional command headers of the request.
* @return the {@code SudoRetrievePolicy} which was created from the given JSON object.
* @throws NullPointerException if {@code jsonObject} is {@code null}.
* @throws org.eclipse.ditto.json.JsonParseException if the passed in {@code jsonObject} was not in the expected
* format.
*/
public static SudoRetrievePolicyRevision fromJson(final JsonObject jsonObject, final DittoHeaders dittoHeaders) {
final String extractedPolicyId = jsonObject.getValueOrThrow(SudoCommand.JsonFields.JSON_POLICY_ID);
final PolicyId policyId = PolicyId.of(extractedPolicyId);

return SudoRetrievePolicyRevision.of(policyId, dittoHeaders);
}

/**
* Returns the identifier of the {@code Policy} to retrieve.
*
* @return the identifier of the Policy to retrieve.
*/
@Override
public PolicyId getEntityId() {
return policyId;
}

@Override
protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final JsonSchemaVersion schemaVersion,
final Predicate<JsonField> thePredicate) {

final Predicate<JsonField> predicate = schemaVersion.and(thePredicate);
jsonObjectBuilder.set(SudoCommand.JsonFields.JSON_POLICY_ID, String.valueOf(policyId), predicate);
}

@Override
public Category getCategory() {
return Category.QUERY;
}

@Override
public SudoRetrievePolicyRevision setDittoHeaders(final DittoHeaders dittoHeaders) {
return of(policyId, dittoHeaders);
}

@SuppressWarnings({"squid:MethodCyclomaticComplexity", "squid:S1067"})
@Override
public boolean equals(@Nullable final Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
final SudoRetrievePolicyRevision that = (SudoRetrievePolicyRevision) obj;
return that.canEqual(this) && Objects.equals(policyId, that.policyId) && super.equals(that);
}

@Override
protected boolean canEqual(@Nullable final Object other) {
return other instanceof SudoRetrievePolicyRevision;
}

@SuppressWarnings("squid:S109")
@Override
public int hashCode() {
return Objects.hash(super.hashCode(), policyId);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" + super.toString() + ", policyId=" + policyId + "]";
}

}
@@ -0,0 +1,189 @@
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.models.policies.commands.sudo;

import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;

import java.util.Objects;
import java.util.function.Predicate;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.base.json.FieldType;
import org.eclipse.ditto.model.base.json.JsonParsableCommandResponse;
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
import org.eclipse.ditto.model.policies.PolicyId;
import org.eclipse.ditto.signals.commands.base.AbstractCommandResponse;
import org.eclipse.ditto.signals.commands.base.CommandResponseJsonDeserializer;

/**
* Response to a {@link org.eclipse.ditto.services.models.policies.commands.sudo.SudoRetrievePolicyRevisionResponse} command.
*/
@Immutable
@JsonParsableCommandResponse(type = SudoRetrievePolicyRevisionResponse.TYPE)
public final class SudoRetrievePolicyRevisionResponse
extends AbstractCommandResponse<SudoRetrievePolicyRevisionResponse>
implements SudoCommandResponse<SudoRetrievePolicyRevisionResponse> {

/**
* Type of this response.
*/
public static final String TYPE = TYPE_PREFIX + SudoRetrievePolicyRevision.NAME;

static final JsonFieldDefinition<Long> JSON_REVISION =
JsonFactory.newLongFieldDefinition("payload/revision", FieldType.REGULAR, JsonSchemaVersion.V_2);

private final PolicyId policyId;
private final long revision;

private SudoRetrievePolicyRevisionResponse(final PolicyId policyId,
final long revision,
final DittoHeaders dittoHeaders) {

super(TYPE, HttpStatusCode.OK, dittoHeaders);
this.policyId = checkNotNull(policyId, "Policy ID");
this.revision = revision;
}

/**
* Creates a response to a {@code SudoRetrievePolicyRevision} command.
*
* @param policyId the policy ID.
* @param revision the policy revision.
* @param dittoHeaders the headers of the preceding command.
* @return the response.
*/
public static SudoRetrievePolicyRevisionResponse of(final PolicyId policyId,
final long revision,
final DittoHeaders dittoHeaders) {
return new SudoRetrievePolicyRevisionResponse(policyId, revision, dittoHeaders);
}

/**
* Creates a response to a {@code SudoRetrievePolicyResponse} command from a JSON string.
*
* @param jsonString the JSON string of which the response is to be created.
* @param dittoHeaders the headers of the preceding command.
* @return the response.
* @throws NullPointerException if {@code jsonString} is {@code null}.
* @throws IllegalArgumentException if {@code jsonString} is empty.
* @throws org.eclipse.ditto.json.JsonParseException if the passed in {@code jsonString} was not in the expected
* format.
*/
public static SudoRetrievePolicyRevisionResponse fromJson(final String jsonString,
final DittoHeaders dittoHeaders) {
return fromJson(JsonFactory.newObject(jsonString), dittoHeaders);
}

/**
* Creates a response to a {@code SudoRetrievePolicyResponse} command from a JSON object.
*
* @param jsonObject the JSON object of which the response is to be created.
* @param dittoHeaders the headers of the preceding command.
* @return the response.
* @throws NullPointerException if {@code jsonObject} is {@code null}.
* @throws org.eclipse.ditto.json.JsonParseException if the passed in {@code jsonObject} was not in the expected
* format.
*/
public static SudoRetrievePolicyRevisionResponse fromJson(final JsonObject jsonObject,
final DittoHeaders dittoHeaders) {
return new CommandResponseJsonDeserializer<SudoRetrievePolicyRevisionResponse>(TYPE, jsonObject)
.deserialize(statusCode -> {
final String extractedPolicyId =
jsonObject.getValueOrThrow(SudoCommandResponse.JsonFields.JSON_POLICY_ID);
final PolicyId policyId = PolicyId.of(extractedPolicyId);
final long revision = jsonObject.getValueOrThrow(JSON_REVISION);

return of(policyId, revision, dittoHeaders);
});
}

@Override
public PolicyId getEntityId() {
return policyId;
}

/**
* Returns the retrieved Policy.
*
* @return the retrieved Policy.
*/
public long getRevision() {
return revision;
}

@Override
public JsonValue getEntity(final JsonSchemaVersion schemaVersion) {
return JsonValue.of(revision);
}

@Override
public SudoRetrievePolicyRevisionResponse setEntity(final JsonValue entity) {
checkNotNull(entity, "entity");
return of(policyId, entity.asLong(), getDittoHeaders());
}

@Override
public SudoRetrievePolicyRevisionResponse setDittoHeaders(final DittoHeaders dittoHeaders) {
return of(policyId, revision, dittoHeaders);
}

@Override
protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final JsonSchemaVersion schemaVersion,
final Predicate<JsonField> thePredicate) {

final Predicate<JsonField> predicate = schemaVersion.and(thePredicate);
jsonObjectBuilder.set(SudoCommandResponse.JsonFields.JSON_POLICY_ID, String.valueOf(policyId), predicate);
jsonObjectBuilder.set(JSON_REVISION, revision, predicate);
}

@Override
protected boolean canEqual(@Nullable final Object other) {
return other instanceof SudoRetrievePolicyRevisionResponse;
}

@Override
public boolean equals(@Nullable final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final SudoRetrievePolicyRevisionResponse that = (SudoRetrievePolicyRevisionResponse) o;
return that.canEqual(this) &&
Objects.equals(policyId, that.policyId) &&
revision == that.revision &&
super.equals(o);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), policyId, revision);
}

@Override
public String toString() {
return super.toString() + "policyId=" + policyId + "revision=" + revision + "]";
}

}

0 comments on commit 7b7548d

Please sign in to comment.