Skip to content

Commit

Permalink
Add custom SBR provider which allows to be enabled/disabled during ru…
Browse files Browse the repository at this point in the history
…ntime

Signed-off-by: Yannic Klem <Yannic.Klem@bosch.io>
  • Loading branch information
Yannic92 committed Apr 21, 2022
1 parent 2967d5e commit d1498fd
Show file tree
Hide file tree
Showing 17 changed files with 418 additions and 69 deletions.
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.base.service.cluster;

import javax.annotation.Nullable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;

/**
* This is just a wrapper for the {@link akka.cluster.sbr.SplitBrainResolver akka sbr} with an addition, that this
* sbr can be turned off/on via {@link ModifySplitBrainResolver} sent as piggyback command to /system/cluster/core/daemon/downingProvider
*/
final class DittoSplitBrainResolver extends AbstractActor {

private static final Logger LOGGER = LoggerFactory.getLogger(DittoSplitBrainResolver.class);

private final Props splitBrainResolverProps;

@Nullable
private ActorRef splitBrainResolverActor;

private DittoSplitBrainResolver(final Props splitBrainResolverProps) {
this.splitBrainResolverProps = splitBrainResolverProps;
}

@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(ModifySplitBrainResolver.class, this::updateEnabled)
.matchAny(() -> splitBrainResolverActor != null, this::forward)
.matchAny(this::logDropped)
.build();
}

static Props props(@Nullable final Props splitBrainResolverProps) {
return Props.create(DittoSplitBrainResolver.class, splitBrainResolverProps);
}

private ActorRef startChildActor(final Props props) {
return getContext().actorOf(props);
}

private void updateEnabled(final ModifySplitBrainResolver modifySplitBrainResolver) {
if (modifySplitBrainResolver.isEnabled() && splitBrainResolverActor == null) {
LOGGER.info("Enabling akka split brain resolver");
splitBrainResolverActor = startChildActor(splitBrainResolverProps);
} else if (!modifySplitBrainResolver.isEnabled() && splitBrainResolverActor != null) {
LOGGER.info("Stopping akka split brain resolver");
getContext().stop(splitBrainResolverActor);
splitBrainResolverActor = null;
}
sender().tell(ModifySplitBrainResolverResponse.of(modifySplitBrainResolver), getSelf());
}

private void forward(final Object obj) {
if (splitBrainResolverActor != null) {
splitBrainResolverActor.forward(obj, getContext());
}
}

private void logDropped(final Object obj) {
LOGGER.info("Dropped message <{}> because split brain resolver is disabled", obj);
}

}
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.base.service.cluster;

import org.eclipse.ditto.internal.utils.config.DittoConfigError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.DowningProvider;
import akka.cluster.sbr.SplitBrainResolverProvider;
import scala.Option;
import scala.concurrent.duration.FiniteDuration;

/**
* This provider is providing the props of {@link DittoSplitBrainResolver}.
*/
@SuppressWarnings("unused")
final class DittoSplitBrainResolverProvider extends DowningProvider {

private static final Logger LOGGER = LoggerFactory.getLogger(DittoSplitBrainResolverProvider.class);

private final SplitBrainResolverProvider splitBrainResolverProvider;

/**
* This constructor is called by akka.
*
* @param actorSystem the actor system used to instantiate this provider.
*/
@SuppressWarnings("unused")
DittoSplitBrainResolverProvider(final ActorSystem actorSystem) {
splitBrainResolverProvider = new SplitBrainResolverProvider(actorSystem);

}

@Override
public FiniteDuration downRemovalMargin() {
return splitBrainResolverProvider.downRemovalMargin();
}

@Override
public Option<Props> downingActorProps() {
try {
final Props splitBrainResolverProps = splitBrainResolverProvider.downingActorProps().get();
return Option.apply(DittoSplitBrainResolver.props(splitBrainResolverProps));
} catch (final Exception e) {
final String msg = "Could not create ditto split brain resolver props.";
LOGGER.error(msg, e);
throw new DittoConfigError(msg, e);
}
}

}
@@ -0,0 +1,89 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.base.service.cluster;

import java.util.function.Predicate;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.FieldType;
import org.eclipse.ditto.base.model.json.JsonParsableCommand;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.commands.AbstractCommand;
import org.eclipse.ditto.base.model.signals.commands.CommandJsonDeserializer;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonPointer;

@JsonParsableCommand(typePrefix = ModifySplitBrainResolver.PREFIX, name = ModifySplitBrainResolver.NAME)
public final class ModifySplitBrainResolver extends AbstractCommand<ModifySplitBrainResolver> {

private static final JsonFieldDefinition<Boolean> ENABLED =
JsonFieldDefinition.ofBoolean("enabled", FieldType.REGULAR, JsonSchemaVersion.V_2);

static final String PREFIX = "ditto.sbr:";
static final String NAME = "modify";
public static final String TYPE = PREFIX + NAME;

private final boolean enabled;

private ModifySplitBrainResolver(final DittoHeaders dittoHeaders, final boolean enabled) {
super(TYPE, dittoHeaders);
this.enabled = enabled;
}

public boolean isEnabled() {
return enabled;
}

public static ModifySplitBrainResolver fromJson(final JsonObject jsonObject, final DittoHeaders dittoHeaders) {
return new CommandJsonDeserializer<ModifySplitBrainResolver>(TYPE, jsonObject).deserialize(() -> {
final boolean enabled = jsonObject.getValue(ENABLED).orElseThrow();

return new ModifySplitBrainResolver(dittoHeaders, enabled);
});
}

@Override
protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final JsonSchemaVersion schemaVersion,
final Predicate<JsonField> predicate) {
jsonObjectBuilder.set(ENABLED, enabled, schemaVersion.and(predicate));
}

@Override
public String getTypePrefix() {
return PREFIX;
}

@Override
public Category getCategory() {
return Category.MODIFY;
}

@Override
public ModifySplitBrainResolver setDittoHeaders(final DittoHeaders dittoHeaders) {
return new ModifySplitBrainResolver(dittoHeaders, enabled);
}

@Override
public JsonPointer getResourcePath() {
return JsonPointer.empty();
}

@Override
public String getResourceType() {
return "sbr";
}

}
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.base.service.cluster;

import java.util.function.Predicate;

import org.eclipse.ditto.base.model.common.HttpStatus;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.json.FieldType;
import org.eclipse.ditto.base.model.json.JsonParsableCommandResponse;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.base.model.signals.commands.AbstractCommandResponse;
import org.eclipse.ditto.base.model.signals.commands.CommandResponseJsonDeserializer;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonPointer;

@JsonParsableCommandResponse(type = ModifySplitBrainResolverResponse.TYPE)
public final class ModifySplitBrainResolverResponse
extends AbstractCommandResponse<ModifySplitBrainResolverResponse> {

private static final JsonFieldDefinition<Boolean> ENABLED =
JsonFieldDefinition.ofBoolean("enabled", FieldType.REGULAR, JsonSchemaVersion.V_2);

static final String PREFIX = "ditto.sbr:";
static final String NAME = "modifyResponse";
public static final String TYPE = PREFIX + NAME;

private static final CommandResponseJsonDeserializer<ModifySplitBrainResolverResponse> JSON_DESERIALIZER =
CommandResponseJsonDeserializer.newInstance(TYPE,
context -> {
final JsonObject jsonObject = context.getJsonObject();
final boolean enabled = jsonObject.getValue(ENABLED).orElseThrow();
return new ModifySplitBrainResolverResponse(context.getDittoHeaders(), enabled);
});

private final boolean enabled;

private ModifySplitBrainResolverResponse(final DittoHeaders dittoHeaders, final boolean enabled) {
super(TYPE, HttpStatus.OK, dittoHeaders);
this.enabled = enabled;
}

static ModifySplitBrainResolverResponse of(final ModifySplitBrainResolver modifySplitBrainResolver) {
return new ModifySplitBrainResolverResponse(modifySplitBrainResolver.getDittoHeaders(),
modifySplitBrainResolver.isEnabled());
}

public boolean isEnabled() {
return enabled;
}

public static ModifySplitBrainResolverResponse fromJson(final JsonObject jsonObject,
final DittoHeaders dittoHeaders) {
return JSON_DESERIALIZER.deserialize(jsonObject, dittoHeaders);
}

@Override
protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final JsonSchemaVersion schemaVersion,
final Predicate<JsonField> predicate) {
jsonObjectBuilder.set(ENABLED, enabled, schemaVersion.and(predicate));
}

@Override
public JsonPointer getResourcePath() {
return JsonPointer.empty();
}

@Override
public String getResourceType() {
return "sbr";
}

@Override
public ModifySplitBrainResolverResponse setDittoHeaders(final DittoHeaders dittoHeaders) {
return new ModifySplitBrainResolverResponse(dittoHeaders, enabled);
}

}
Expand Up @@ -12,26 +12,27 @@
*/
package org.eclipse.ditto.concierge.service.starter;

import org.eclipse.ditto.base.api.common.Shutdown;
import org.eclipse.ditto.base.api.common.purge.PurgeEntities;
import org.eclipse.ditto.base.api.devops.signals.commands.ExecutePiggybackCommand;
import org.eclipse.ditto.base.api.persistence.cleanup.CleanupPersistence;
import org.eclipse.ditto.policies.api.commands.sudo.SudoRetrievePolicy;
import org.eclipse.ditto.base.model.namespaces.signals.commands.PurgeNamespace;
import org.eclipse.ditto.base.service.cluster.ModifySplitBrainResolver;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.OpenConnection;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnection;
import org.eclipse.ditto.internal.models.streaming.SudoStreamPids;
import org.eclipse.ditto.things.api.commands.sudo.SudoRetrieveThing;
import org.eclipse.ditto.thingsearch.api.commands.sudo.SudoCountThings;
import org.eclipse.ditto.internal.utils.health.RetrieveHealth;
import org.eclipse.ditto.internal.utils.pubsub.api.PublishSignal;
import org.eclipse.ditto.internal.utils.test.GlobalCommandRegistryTestCases;
import org.eclipse.ditto.base.api.common.Shutdown;
import org.eclipse.ditto.base.api.common.purge.PurgeEntities;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.OpenConnection;
import org.eclipse.ditto.connectivity.model.signals.commands.query.RetrieveConnection;
import org.eclipse.ditto.base.api.devops.signals.commands.ExecutePiggybackCommand;
import org.eclipse.ditto.messages.model.signals.commands.SendClaimMessage;
import org.eclipse.ditto.base.model.namespaces.signals.commands.PurgeNamespace;
import org.eclipse.ditto.policies.api.commands.sudo.SudoRetrievePolicy;
import org.eclipse.ditto.policies.model.signals.commands.actions.ActivateTokenIntegration;
import org.eclipse.ditto.policies.model.signals.commands.modify.DeleteSubject;
import org.eclipse.ditto.policies.model.signals.commands.query.RetrieveResource;
import org.eclipse.ditto.things.api.commands.sudo.SudoRetrieveThing;
import org.eclipse.ditto.things.model.signals.commands.modify.ModifyFeatureProperty;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveFeature;
import org.eclipse.ditto.thingsearch.api.commands.sudo.SudoCountThings;
import org.eclipse.ditto.thingsearch.model.signals.commands.query.QueryThings;
import org.eclipse.ditto.thingsearch.model.signals.commands.subscription.CreateSubscription;

Expand All @@ -58,6 +59,7 @@ public ConciergeServiceGlobalCommandRegistryTest() {
CleanupPersistence.class,
RetrieveHealth.class,
PurgeEntities.class,
ModifySplitBrainResolver.class,
PublishSignal.class
);
}
Expand Down

0 comments on commit d1498fd

Please sign in to comment.