Skip to content

Commit

Permalink
Adjusted some code formatting.
Browse files Browse the repository at this point in the history
Signed-off-by: Juergen Fickel <juergen.fickel@bosch-si.com>
  • Loading branch information
Juergen Fickel committed Oct 19, 2018
1 parent 84384c8 commit 299a378
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 27 deletions.
Expand Up @@ -10,6 +10,8 @@
*/
package org.eclipse.ditto.services.utils.cluster;

import javax.annotation.concurrent.Immutable;

import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
Expand All @@ -22,6 +24,7 @@
/**
* Convenience methods to operate an Akka cluster.
*/
@Immutable
public final class ClusterUtil {

/**
Expand Down Expand Up @@ -63,4 +66,5 @@ public static ActorRef startSingleton(final ActorSystem system,
final Props singletonManagerProps = ClusterSingletonManager.props(props, PoisonPill.getInstance(), settings);
return actorRefFactory.actorOf(singletonManagerProps, actorName);
}

}
Expand Up @@ -27,9 +27,9 @@

/**
* Superclass of actors operating on the persistence at the level of namespaces.
* It subscribes to the commands {@link PurgeNamespace} from the pub-sub mediator
* and carries them out. Instances of this actor should start as cluster singletons in order not to perform
* identical operations multiple times on the database.
* It subscribes to the commands {@link PurgeNamespace} from the pub-sub mediator and carries them out.
* Instances of this actor should start as cluster singletons in order to not perform identical operations multiple
* times on the database.
*
* @param <S> type of namespace selection for the underlying persistence.
*/
Expand Down Expand Up @@ -57,7 +57,7 @@ protected AbstractNamespaceOpsActor(final ActorRef pubSubMediator, final Namespa
}

/**
* Create a new instance of this actor using the pub-sub mediator of the actor system in which it is created.
* Creates a new instance of this actor using the pub-sub mediator of the actor system in which it is created.
*
* @param namespaceOps namespace operations on the persistence.
*/
Expand All @@ -67,11 +67,6 @@ protected AbstractNamespaceOpsActor(final NamespaceOps<S> namespaceOps) {
materializer = ActorMaterializer.create(getContext());
}

/**
* @return Resource type this actor operates on.
*/
protected abstract String resourceType();

/**
* Get all documents in a given namespace among all collections in the database.
*
Expand All @@ -85,6 +80,11 @@ public void preStart() {
subscribeForNamespaceCommands();
}

private void subscribeForNamespaceCommands() {
final ActorRef self = getSelf();
pubSubMediator.tell(new DistributedPubSubMediator.Subscribe(PurgeNamespace.TYPE, self), self);
}

@Override
public Receive createReceive() {
return ReceiveBuilder.create()
Expand All @@ -94,36 +94,26 @@ public Receive createReceive() {
.build();
}

private void subscribeForNamespaceCommands() {
final ActorRef self = getSelf();
pubSubMediator.tell(new DistributedPubSubMediator.Subscribe(PurgeNamespace.TYPE, self), self);
}

private void ignoreSubscribeAck(final DistributedPubSubMediator.SubscribeAck subscribeAck) {
// do nothing
}

private void purgeNamespace(final PurgeNamespace purgeNamespace) {
final ActorRef sender = getSender();
LogUtil.enhanceLogWithCorrelationId(log, purgeNamespace);
final Collection<S> namespaceSelections = selectNamespace(purgeNamespace.getNamespace());
log.info("Running <{}>. Affected collections: <{}>", purgeNamespace, namespaceSelections);
namespaceOps.purgeAll(selectNamespace(purgeNamespace.getNamespace()))
namespaceOps.purgeAll(namespaceSelections)
.runWith(Sink.head(), materializer)
.thenAccept(errors -> {
// send response to speed up purge workflow
final PurgeNamespaceResponse response;
if (errors.isEmpty()) {
response = PurgeNamespaceResponse.successful(purgeNamespace.getNamespace(), resourceType(),
response = PurgeNamespaceResponse.successful(purgeNamespace.getNamespace(), getResourceType(),
purgeNamespace.getDittoHeaders());
} else {
LogUtil.enhanceLogWithCorrelationId(log, purgeNamespace);
final String namespace = purgeNamespace.getNamespace();
errors.forEach(error -> log.error(error, "Error purging namespace <{}>", namespace));
response = PurgeNamespaceResponse.failed(namespace, resourceType(),
response = PurgeNamespaceResponse.failed(namespace, getResourceType(),
purgeNamespace.getDittoHeaders());
}
sender.tell(response, getSelf());
getSender().tell(response, getSelf());
})
.exceptionally(error -> {
LogUtil.enhanceLogWithCorrelationId(log, purgeNamespace);
Expand All @@ -133,4 +123,15 @@ private void purgeNamespace(final PurgeNamespace purgeNamespace) {
});
}

/**
* Returns the resource type this actor operates on.
*
* @return the resource type.
*/
protected abstract String getResourceType();

private void ignoreSubscribeAck(final DistributedPubSubMediator.SubscribeAck subscribeAck) {
// do nothing
}

}
Expand Up @@ -10,17 +10,13 @@
*/
package org.eclipse.ditto.services.utils.persistence.mongo.namespace;

import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;

import org.bson.Document;
import org.bson.conversions.Bson;

import com.mongodb.client.model.BulkWriteOptions;
import com.mongodb.client.model.CountOptions;
import com.mongodb.client.model.DeleteManyModel;
import com.mongodb.client.model.WriteModel;
import com.mongodb.reactivestreams.client.MongoCollection;
Expand Down Expand Up @@ -68,4 +64,5 @@ public Source<Optional<Throwable>, NotUsed> purge(final MongoNamespaceSelection
.recover(Match.<Throwable, Optional<Throwable>>matchAny(Optional::of).build());
}
}

}

0 comments on commit 299a378

Please sign in to comment.