Skip to content

Commit

Permalink
Merge #2675
Browse files Browse the repository at this point in the history
2675: chore(logstreams): remove producer id from LoggedEvent r=saig0 a=saig0

* also remove process id and process name since there is only one stream processor
* flatten state directory structure since there is only one state database


Co-authored-by: Philipp Ossler <philipp.ossler@gmail.com>
  • Loading branch information
zeebe-bors-heroku[bot] and saig0 committed Jun 21, 2019
2 parents 7de0c59 + 2322e40 commit 1fd03da
Show file tree
Hide file tree
Showing 40 changed files with 75 additions and 416 deletions.
Expand Up @@ -19,7 +19,6 @@

import io.atomix.cluster.messaging.ClusterEventService;
import io.zeebe.broker.Loggers;
import io.zeebe.broker.engine.EngineService;
import io.zeebe.broker.engine.EngineServiceNames;
import io.zeebe.broker.engine.impl.StateReplication;
import io.zeebe.broker.exporter.ExporterServiceNames;
Expand Down Expand Up @@ -150,15 +149,14 @@ public StorageConfiguration getConfiguration() {
}

private StateSnapshotController createSnapshotController() {
final String streamProcessorName = EngineService.PROCESSOR_NAME;

final StateStorageFactory storageFactory =
new StateStorageFactory(configuration.getStatesDirectory());
final StateStorage stateStorage = storageFactory.create(partitionId, streamProcessorName);
final StateStorage stateStorage = storageFactory.create();

stateReplication =
shouldReplicateSnapshots()
? new StateReplication(clusterEventService, partitionId, streamProcessorName)
? new StateReplication(clusterEventService, partitionId)
: new NoneSnapshotReplication();

return new StateSnapshotController(
Expand Down
Expand Up @@ -27,21 +27,12 @@
import io.zeebe.broker.system.configuration.ClusterCfg;
import io.zeebe.broker.system.configuration.DataCfg;
import io.zeebe.broker.transport.commandapi.CommandResponseWriterImpl;
import io.zeebe.engine.processor.AsyncSnapshotingDirectorService;
import io.zeebe.engine.processor.ProcessingContext;
import io.zeebe.engine.processor.StreamProcessor;
import io.zeebe.engine.processor.StreamProcessorServiceNames;
import io.zeebe.engine.processor.TypedRecordProcessors;
import io.zeebe.engine.processor.*;
import io.zeebe.engine.processor.workflow.EngineProcessors;
import io.zeebe.engine.processor.workflow.message.command.SubscriptionCommandSender;
import io.zeebe.engine.state.ZeebeState;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.servicecontainer.Injector;
import io.zeebe.servicecontainer.Service;
import io.zeebe.servicecontainer.ServiceContainer;
import io.zeebe.servicecontainer.ServiceGroupReference;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.servicecontainer.ServiceStartContext;
import io.zeebe.servicecontainer.*;
import io.zeebe.transport.ServerTransport;
import io.zeebe.util.DurationUtil;
import io.zeebe.util.sched.ActorControl;
Expand Down Expand Up @@ -84,10 +75,9 @@ public void start(final ServiceStartContext serviceContext) {

public void startEngineForPartition(
final ServiceName<Partition> partitionServiceName, final Partition partition) {
final int partitionId = partition.getPartitionId();

final LogStream logStream = partition.getLogStream();
StreamProcessor.builder(partitionId, PROCESSOR_NAME)
StreamProcessor.builder()
.logStream(logStream)
.actorScheduler(serviceContext.getScheduler())
.additionalDependencies(partitionServiceName)
Expand Down Expand Up @@ -116,9 +106,9 @@ private void createAsyncSnapshotDirectorService(final Partition partition) {
snapshotPeriod);

final ServiceName<AsyncSnapshotingDirectorService> snapshotDirectorServiceName =
StreamProcessorServiceNames.asyncSnapshotingDirectorService(logName, PROCESSOR_NAME);
StreamProcessorServiceNames.asyncSnapshotingDirectorService(logName);
final ServiceName<StreamProcessor> streamProcessorControllerServiceName =
StreamProcessorServiceNames.streamProcessorService(logName, EngineService.PROCESSOR_NAME);
StreamProcessorServiceNames.streamProcessorService(logName);

serviceContext
.createService(snapshotDirectorServiceName, snapshotDirectorService)
Expand Down
Expand Up @@ -31,7 +31,7 @@

public class StateReplication implements SnapshotReplication {

public static final String REPLICATION_TOPIC_FORMAT = "replication-%d-%s";
public static final String REPLICATION_TOPIC_FORMAT = "replication-%d";
private static final Logger LOG = Loggers.STREAM_PROCESSING;

private final String replicationTopic;
Expand All @@ -42,9 +42,9 @@ public class StateReplication implements SnapshotReplication {
private ExecutorService executorService;
private Subscription subscription;

public StateReplication(ClusterEventService eventService, int partitionId, String name) {
public StateReplication(ClusterEventService eventService, int partitionId) {
this.eventService = eventService;
this.replicationTopic = String.format(REPLICATION_TOPIC_FORMAT, partitionId, name);
this.replicationTopic = String.format(REPLICATION_TOPIC_FORMAT, partitionId);
}

@Override
Expand Down
Expand Up @@ -128,7 +128,7 @@ public ExporterDirector get() {
@Override
protected void onActorStarting() {
final MetricsManager metricsManager = actorScheduler.getMetricsManager();
metrics = new ExporterMetrics(metricsManager, getName(), Integer.toString(partitionId));
metrics = new ExporterMetrics(metricsManager, Integer.toString(partitionId));

this.logStreamReader.wrap(logStream);
}
Expand Down
Expand Up @@ -17,22 +17,18 @@
*/
package io.zeebe.broker.exporter.stream;

import io.zeebe.engine.processor.SnapshotMetrics;
import io.zeebe.util.metrics.Metric;
import io.zeebe.util.metrics.MetricsManager;

public class ExporterMetrics {
private final Metric eventsExportedCountMetric;
private final Metric eventsSkippedCountMetric;
private final SnapshotMetrics snapshotMetrics;

public ExporterMetrics(
final MetricsManager metricsManager, final String processorName, final String partitionId) {
public ExporterMetrics(final MetricsManager metricsManager, final String partitionId) {
eventsExportedCountMetric =
metricsManager
.newMetric("exporter_events_count")
.type("counter")
.label("processor", processorName)
.label("action", "exported")
.label("partition", partitionId)
.create();
Expand All @@ -41,18 +37,14 @@ public ExporterMetrics(
metricsManager
.newMetric("exporter_events_count")
.type("counter")
.label("processor", processorName)
.label("action", "skipped")
.label("partition", partitionId)
.create();

snapshotMetrics = new SnapshotMetrics(metricsManager, processorName, partitionId);
}

public void close() {
eventsExportedCountMetric.close();
eventsSkippedCountMetric.close();
snapshotMetrics.close();
}

public void incrementEventsExportedCount() {
Expand All @@ -62,8 +54,4 @@ public void incrementEventsExportedCount() {
public void incrementEventsSkippedCount() {
eventsSkippedCountMetric.incrementOrdered();
}

public SnapshotMetrics getSnapshotMetrics() {
return snapshotMetrics;
}
}
Expand Up @@ -22,7 +22,6 @@
import io.atomix.primitive.partition.PartitionGroup;
import io.atomix.primitive.partition.PartitionId;
import io.atomix.primitive.partition.PartitionService;
import io.zeebe.broker.engine.EngineService;
import io.zeebe.broker.logstreams.state.StatePositionSupplier;
import io.zeebe.distributedlog.StorageConfiguration;
import io.zeebe.distributedlog.impl.LogstreamConfig;
Expand Down Expand Up @@ -69,8 +68,7 @@ public SnapshotRestoreContext createSnapshotRestoreContext(int partitionId, Logg
final StorageConfiguration configuration =
LogstreamConfig.getConfig(localMemberId, partitionId).join();
final StateStorage restoreStateStorage =
new StateStorageFactory(configuration.getStatesDirectory())
.create(partitionId, EngineService.PROCESSOR_NAME, "-restore-log");
new StateStorageFactory(configuration.getStatesDirectory()).createTemporary("-restore-log");

final SnapshotController stateSnapshotController =
new StateSnapshotController(DefaultZeebeDbFactory.DEFAULT_DB_FACTORY, restoreStateStorage);
Expand Down
Expand Up @@ -95,8 +95,7 @@ public Statement apply(Statement base, Description description) {
public void startExporterDirector(List<ExporterDescriptor> exporterDescriptors) {
final LogStream stream = streams.getLogStream(STREAM_NAME);

final StateStorage stateStorage =
streams.getStateStorageFactory(stream).create(EXPORTER_PROCESSOR_ID, PROCESSOR_NAME);
final StateStorage stateStorage = streams.getStateStorageFactory(stream).create();
final StateSnapshotController snapshotController =
spy(new StateSnapshotController(zeebeDbFactory, stateStorage));
capturedZeebeDb = spy(snapshotController.openDb());
Expand Down
Expand Up @@ -50,9 +50,7 @@ public void start(final ServiceStartContext startContext) {
final StreamProcessor streamProcessor = streamProcessorInjector.getValue();
final SnapshotMetrics snapshotMetrics =
new SnapshotMetrics(
startContext.getScheduler().getMetricsManager(),
streamProcessor.getName(),
String.valueOf(partitionId));
startContext.getScheduler().getMetricsManager(), String.valueOf(partitionId));

asyncSnapshotDirector =
new AsyncSnapshotDirector(
Expand Down
Expand Up @@ -32,8 +32,6 @@ public class ProcessingContext implements ReadonlyProcessingContext {

private ActorControl actor;
private MetricsManager metricsManager;
private int producerId;
private String streamProcessorName;
private EventFilter eventFilter;
private LogStream logStream;
private LogStreamReader logStreamReader;
Expand Down Expand Up @@ -92,16 +90,6 @@ public ProcessingContext abortCondition(BooleanSupplier abortCondition) {
return this;
}

public ProcessingContext producerId(int producerId) {
this.producerId = producerId;
return this;
}

public ProcessingContext streamProcessorName(String streamProcessorName) {
this.streamProcessorName = streamProcessorName;
return this;
}

public ProcessingContext logStreamWriter(TypedStreamWriter logStreamWriter) {
this.logStreamWriter = logStreamWriter;
return this;
Expand All @@ -121,14 +109,6 @@ public ActorControl getActor() {
return actor;
}

public int getProducerId() {
return producerId;
}

public String getStreamProcessorName() {
return streamProcessorName;
}

public EventFilter getEventFilter() {
return eventFilter;
}
Expand Down
Expand Up @@ -92,13 +92,13 @@ public final class ProcessingStateMachine {
private static final String ERROR_MESSAGE_EXECUTE_SIDE_EFFECT_ABORTED =
"Expected to execute side effects for event '{}' successfully, but exception was thrown.";
private static final String ERROR_MESSAGE_UPDATE_STATE_FAILED =
"Expected to successfully update state for event '{}' with processor '{}', but caught an exception. Retry.";
"Expected to successfully update state for event '{}', but caught an exception. Retry.";
private static final String ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT =
"Expected to find event processor for event '{}' with processor '{}', but caught an exception. Skip this event.";
"Expected to find event processor for event '{}', but caught an exception. Skip this event.";
private static final String ERROR_MESSAGE_PROCESSING_FAILED_SKIP_EVENT =
"Expected to successfully process event '{}' with processor '{}', but caught an exception. Skip this event.";
"Expected to successfully process event '{}' with processor, but caught an exception. Skip this event.";
private static final String ERROR_MESSAGE_PROCESSING_FAILED_RETRY_PROCESSING =
"Expected to process event '{}' successfully on stream processor '{}', but caught recoverable exception. Retry processing.";
"Expected to process event '{}' successfully on stream processor, but caught recoverable exception. Retry processing.";
private static final String PROCESSING_ERROR_MESSAGE =
"Expected to process event '%s' without errors, but exception occurred with message '%s' .";

Expand All @@ -110,8 +110,6 @@ public final class ProcessingStateMachine {
private static final Duration PROCESSING_RETRY_DELAY = Duration.ofMillis(250);

private final ActorControl actor;
private final int producerId;
private final String streamProcessorName;
private final StreamProcessorMetrics metrics;
private final EventFilter eventFilter;
private final LogStream logStream;
Expand Down Expand Up @@ -143,8 +141,6 @@ public ProcessingStateMachine(
BooleanSupplier shouldProcessNext) {

this.actor = context.getActor();
this.producerId = context.getProducerId();
this.streamProcessorName = context.getStreamProcessorName();
this.eventFilter = context.getEventFilter();
this.recordProcessorMap = context.getRecordProcessorMap();
this.eventCache = context.getEventCache();
Expand Down Expand Up @@ -225,14 +221,10 @@ private void processEvent(final LoggedEvent event) {
writeEvent();
} catch (final RecoverableException recoverableException) {
// recoverable
LOG.error(
ERROR_MESSAGE_PROCESSING_FAILED_RETRY_PROCESSING,
event,
streamProcessorName,
recoverableException);
LOG.error(ERROR_MESSAGE_PROCESSING_FAILED_RETRY_PROCESSING, event, recoverableException);
actor.runDelayed(PROCESSING_RETRY_DELAY, () -> processEvent(currentEvent));
} catch (final Exception e) {
LOG.error(ERROR_MESSAGE_PROCESSING_FAILED_SKIP_EVENT, event, streamProcessorName, e);
LOG.error(ERROR_MESSAGE_PROCESSING_FAILED_SKIP_EVENT, event, e);
onError(e, this::writeEvent);
}
}
Expand All @@ -245,7 +237,7 @@ private TypedRecordProcessor<?> chooseNextProcessor(LoggedEvent event) {
recordProcessorMap.get(
metadata.getRecordType(), metadata.getValueType(), metadata.getIntent().value());
} catch (final Exception e) {
LOG.error(ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT, event, streamProcessorName, e);
LOG.error(ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT, event, e);
}

return typedRecordProcessor;
Expand Down Expand Up @@ -277,7 +269,7 @@ private void processInTransaction(final TypedEventImpl typedRecord) throws Excep
private void resetOutput(long sourceRecordPosition) {
responseWriter.reset();
logStreamWriter.reset();
logStreamWriter.configureSourceContext(producerId, sourceRecordPosition);
logStreamWriter.configureSourceContext(sourceRecordPosition);
}

public void setSideEffectProducer(final SideEffectProducer sideEffectProducer) {
Expand Down Expand Up @@ -384,8 +376,7 @@ private void updateState() {
retryFuture,
(bool, throwable) -> {
if (throwable != null) {
LOG.error(
ERROR_MESSAGE_UPDATE_STATE_FAILED, currentEvent, streamProcessorName, throwable);
LOG.error(ERROR_MESSAGE_UPDATE_STATE_FAILED, currentEvent, throwable);
onError(throwable, this::updateState);
} else {
executeSideEffects();
Expand Down

0 comments on commit 1fd03da

Please sign in to comment.