Skip to content

Commit

Permalink
remove TagsConfig and all dependent implementations and configs becau…
Browse files Browse the repository at this point in the history
…se streamingCacheSize it is no longer used;

Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Jul 13, 2021
1 parent 8d08360 commit d279161
Show file tree
Hide file tree
Showing 19 changed files with 69 additions and 364 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
import javax.jms.JMSRuntimeException;
import javax.naming.NamingException;

import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.service.actors.DittoRootActor;
import org.eclipse.ditto.concierge.api.actors.ConciergeEnforcerClusterRouterFactory;
import org.eclipse.ditto.concierge.api.actors.ConciergeForwarderActor;
import org.eclipse.ditto.connectivity.api.ConnectivityMessagingConstants;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommandInterceptor;
import org.eclipse.ditto.connectivity.service.config.ConnectionIdsRetrievalConfig;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.messaging.ClientActorPropsFactory;
Expand All @@ -29,9 +34,6 @@
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionPriorityProviderFactory;
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionSupervisorActor;
import org.eclipse.ditto.connectivity.service.messaging.persistence.UsageBasedPriorityProvider;
import org.eclipse.ditto.concierge.api.actors.ConciergeEnforcerClusterRouterFactory;
import org.eclipse.ditto.concierge.api.actors.ConciergeForwarderActor;
import org.eclipse.ditto.connectivity.api.ConnectivityMessagingConstants;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.ClusterUtil;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
Expand All @@ -45,8 +47,6 @@
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.internal.utils.persistentactors.PersistencePingActor;
import org.eclipse.ditto.internal.utils.pubsub.DittoProtocolSub;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.connectivity.model.signals.commands.ConnectivityCommandInterceptor;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
Expand Down Expand Up @@ -96,7 +96,7 @@ private ConnectivityRootActor(final ConnectivityConfig connectivityConfig,
// Create persistence streaming actor (with no cache) and make it known to pubSubMediator.
final ActorRef persistenceStreamingActor =
startChildActor(ConnectionPersistenceStreamingActorCreator.ACTOR_NAME,
ConnectionPersistenceStreamingActorCreator.props(0));
ConnectionPersistenceStreamingActorCreator.props());
pubSubMediator.tell(DistPubSubAccess.put(persistenceStreamingActor), getSelf());

// start DittoProtocolSub extension, even if not passed to connections via reference
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
*/
package org.eclipse.ditto.connectivity.service.messaging.persistence;

import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.api.ConnectionTag;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.internal.models.streaming.EntityIdWithRevision;
import org.eclipse.ditto.internal.utils.persistence.mongo.DefaultPersistenceStreamingActor;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.PidWithSeqNr;
Expand All @@ -38,10 +38,9 @@ private ConnectionPersistenceStreamingActorCreator() {
/**
* Creates Akka configuration object Props for this PersistenceQueriesActor.
*
* @param streamingCacheSize the size of the streaming cache.
* @return the Akka configuration Props object.
*/
public static Props props(final int streamingCacheSize) {
public static Props props() {
return DefaultPersistenceStreamingActor.props(ConnectionTag.class,
ConnectionPersistenceStreamingActorCreator::createElement,
ConnectionPersistenceStreamingActorCreator::createPidWithSeqNr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.PidWithSeqNr;
import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault;

import com.typesafe.config.Config;

import akka.NotUsed;
import akka.stream.javadsl.Source;

Expand All @@ -53,16 +51,16 @@ public abstract class AbstractPersistenceStreamingActor<T extends EntityIdWithRe
/**
* Constructor.
*
* @param entityMapper the mapper used to map {@link org.eclipse.ditto.internal.utils.persistence.mongo.streaming.PidWithSeqNr} to {@code T}. The resulting entity will be
* streamed to the recipient actor.
* @param entityMapper the mapper used to map {@link org.eclipse.ditto.internal.utils.persistence.mongo.streaming.PidWithSeqNr}
* to {@code T}. The resulting entity will be streamed to the recipient actor.
* @param entityUnmapper the mapper used to map elements back to PidWithSeqNr for stream resumption.
*/
protected AbstractPersistenceStreamingActor(final Function<PidWithSeqNr, T> entityMapper,
final Function<EntityIdWithRevision<?>, PidWithSeqNr> entityUnmapper) {
this.entityMapper = requireNonNull(entityMapper);
this.entityUnmapper = entityUnmapper;

final Config config = getContext().getSystem().settings().config();
final var config = getContext().getSystem().settings().config();
final MongoDbConfig mongoDbConfig =
DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(config));
mongoClient = MongoClientWrapper.newInstance(mongoDbConfig);
Expand All @@ -72,8 +70,8 @@ protected AbstractPersistenceStreamingActor(final Function<PidWithSeqNr, T> enti
/**
* Constructor for tests.
*
* @param entityMapper the mapper used to map {@link org.eclipse.ditto.internal.utils.persistence.mongo.streaming.PidWithSeqNr} to {@code T}. The resulting entity will be
* streamed to the recipient actor.
* @param entityMapper the mapper used to map {@link org.eclipse.ditto.internal.utils.persistence.mongo.streaming.PidWithSeqNr}
* to {@code T}. The resulting entity will be streamed to the recipient actor.
* @param entityUnmapper the mapper used to map elements back to PidWithSeqNr for stream resumption.
* @param readJournal the ReadJournal to use instead of creating one in the non-test constructor.
*/
Expand All @@ -83,7 +81,7 @@ protected AbstractPersistenceStreamingActor(final Function<PidWithSeqNr, T> enti
this.entityMapper = requireNonNull(entityMapper);
this.entityUnmapper = entityUnmapper;

final Config config = getContext().getSystem().settings().config();
final var config = getContext().getSystem().settings().config();
final MongoDbConfig mongoDbConfig =
DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(config));
mongoClient = MongoClientWrapper.newInstance(mongoDbConfig);
Expand Down Expand Up @@ -131,12 +129,12 @@ protected Object batchMessages(final List<T> elements) {
@Override
protected final Source<T, NotUsed> createSource(final SudoStreamPids command) {
log.info("Starting stream for <{}>", command);
final Duration maxIdleTime = Duration.ofMillis(command.getTimeoutMillis());
final var maxIdleTime = Duration.ofMillis(command.getTimeoutMillis());
final int batchSize = command.getBurst() * 5;
final Source<String, NotUsed> pidSource;
if (command.hasNonEmptyLowerBound()) {
// resume from lower bound
final PidWithSeqNr pidWithSeqNr = entityUnmapper.apply(command.getLowerBound());
final var pidWithSeqNr = entityUnmapper.apply(command.getLowerBound());
pidSource =
readJournal.getJournalPidsAbove(pidWithSeqNr.getPersistenceId(), batchSize,
materializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,19 @@
import java.util.function.Function;

import org.bson.Document;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultMongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.MongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.internal.models.streaming.StreamedSnapshot;
import org.eclipse.ditto.internal.models.streaming.SudoStreamSnapshots;
import org.eclipse.ditto.internal.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.DefaultMongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.config.MongoDbConfig;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault;

import com.typesafe.config.Config;

import akka.NotUsed;
import akka.actor.AbstractActor;
import akka.actor.Props;
Expand Down Expand Up @@ -71,7 +69,7 @@ private SnapshotStreamingActor(final Function<String, EntityId> pid2EntityId,
this.pid2EntityId = pid2EntityId;
this.entityId2Pid = entityId2Pid;

final Config config = getContext().getSystem().settings().config();
final var config = getContext().getSystem().settings().config();
final MongoDbConfig mongoDbConfig =
DefaultMongoDbConfig.of(DefaultScopedConfig.dittoScoped(config));
mongoClient = MongoClientWrapper.newInstance(mongoDbConfig);
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit d279161

Please sign in to comment.