Merge #2675
2675: chore(logstreams): remove producer id from LoggedEvent r=saig0 a=saig0

* also remove processor id and processor name since there is only one stream processor
* flatten state directory structure since there is only one state database


Co-authored-by: Philipp Ossler <philipp.ossler@gmail.com>
zeebe-bors[bot] and saig0 committed Jun 21, 2019
2 parents 7de0c59 + 2322e40 commit 1fd03da
Showing 40 changed files with 75 additions and 416 deletions.
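At its core, the change drops every parameter that identified a particular stream processor, since each partition now runs exactly one stream processor against one state database. Pieced together from the hunks below, the affected call sites change roughly as in this sketch (illustrative only, not compilable on its own; logStream, scheduler, clusterEventService, configuration and the old PROCESSOR_NAME constant are assumed to be in scope):

// Before: builder, state storage and snapshot replication were keyed by a processor id/name.
StreamProcessor.builder(partitionId, PROCESSOR_NAME).logStream(logStream).actorScheduler(scheduler);
final StateStorage oldStorage =
    new StateStorageFactory(configuration.getStatesDirectory()).create(partitionId, PROCESSOR_NAME);
final SnapshotReplication oldReplication =
    new StateReplication(clusterEventService, partitionId, PROCESSOR_NAME);

// After: the partition alone identifies the processor, its state and its replication.
StreamProcessor.builder().logStream(logStream).actorScheduler(scheduler);
final StateStorage newStorage = new StateStorageFactory(configuration.getStatesDirectory()).create();
final SnapshotReplication newReplication = new StateReplication(clusterEventService, partitionId);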
@@ -19,7 +19,6 @@
 
 import io.atomix.cluster.messaging.ClusterEventService;
 import io.zeebe.broker.Loggers;
-import io.zeebe.broker.engine.EngineService;
 import io.zeebe.broker.engine.EngineServiceNames;
 import io.zeebe.broker.engine.impl.StateReplication;
 import io.zeebe.broker.exporter.ExporterServiceNames;
@@ -150,15 +149,14 @@ public StorageConfiguration getConfiguration() {
   }
 
   private StateSnapshotController createSnapshotController() {
-    final String streamProcessorName = EngineService.PROCESSOR_NAME;
 
     final StateStorageFactory storageFactory =
         new StateStorageFactory(configuration.getStatesDirectory());
-    final StateStorage stateStorage = storageFactory.create(partitionId, streamProcessorName);
+    final StateStorage stateStorage = storageFactory.create();
 
     stateReplication =
         shouldReplicateSnapshots()
-            ? new StateReplication(clusterEventService, partitionId, streamProcessorName)
+            ? new StateReplication(clusterEventService, partitionId)
            : new NoneSnapshotReplication();
 
     return new StateSnapshotController(
@@ -27,21 +27,12 @@
 import io.zeebe.broker.system.configuration.ClusterCfg;
 import io.zeebe.broker.system.configuration.DataCfg;
 import io.zeebe.broker.transport.commandapi.CommandResponseWriterImpl;
-import io.zeebe.engine.processor.AsyncSnapshotingDirectorService;
-import io.zeebe.engine.processor.ProcessingContext;
-import io.zeebe.engine.processor.StreamProcessor;
-import io.zeebe.engine.processor.StreamProcessorServiceNames;
-import io.zeebe.engine.processor.TypedRecordProcessors;
+import io.zeebe.engine.processor.*;
 import io.zeebe.engine.processor.workflow.EngineProcessors;
 import io.zeebe.engine.processor.workflow.message.command.SubscriptionCommandSender;
 import io.zeebe.engine.state.ZeebeState;
 import io.zeebe.logstreams.log.LogStream;
-import io.zeebe.servicecontainer.Injector;
-import io.zeebe.servicecontainer.Service;
-import io.zeebe.servicecontainer.ServiceContainer;
-import io.zeebe.servicecontainer.ServiceGroupReference;
-import io.zeebe.servicecontainer.ServiceName;
-import io.zeebe.servicecontainer.ServiceStartContext;
+import io.zeebe.servicecontainer.*;
 import io.zeebe.transport.ServerTransport;
 import io.zeebe.util.DurationUtil;
 import io.zeebe.util.sched.ActorControl;
@@ -84,10 +75,9 @@ public void start(final ServiceStartContext serviceContext) {
 
   public void startEngineForPartition(
       final ServiceName<Partition> partitionServiceName, final Partition partition) {
-    final int partitionId = partition.getPartitionId();
 
     final LogStream logStream = partition.getLogStream();
-    StreamProcessor.builder(partitionId, PROCESSOR_NAME)
+    StreamProcessor.builder()
         .logStream(logStream)
         .actorScheduler(serviceContext.getScheduler())
         .additionalDependencies(partitionServiceName)
@@ -116,9 +106,9 @@ private void createAsyncSnapshotDirectorService(final Partition partition) {
             snapshotPeriod);
 
     final ServiceName<AsyncSnapshotingDirectorService> snapshotDirectorServiceName =
-        StreamProcessorServiceNames.asyncSnapshotingDirectorService(logName, PROCESSOR_NAME);
+        StreamProcessorServiceNames.asyncSnapshotingDirectorService(logName);
     final ServiceName<StreamProcessor> streamProcessorControllerServiceName =
-        StreamProcessorServiceNames.streamProcessorService(logName, EngineService.PROCESSOR_NAME);
+        StreamProcessorServiceNames.streamProcessorService(logName);
 
     serviceContext
         .createService(snapshotDirectorServiceName, snapshotDirectorService)
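With only one stream processor per partition, the service container no longer needs a processor-name suffix to address it; the log name alone identifies both the stream processor service and its snapshot director. A minimal sketch of the lookups after this change (the log name "partition-1" is a made-up example value):

// Service names are now derived from the log name only.
final ServiceName<StreamProcessor> processorServiceName =
    StreamProcessorServiceNames.streamProcessorService("partition-1");
final ServiceName<AsyncSnapshotingDirectorService> snapshotDirectorServiceName =
    StreamProcessorServiceNames.asyncSnapshotingDirectorService("partition-1");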
@@ -31,7 +31,7 @@
 
 public class StateReplication implements SnapshotReplication {
 
-  public static final String REPLICATION_TOPIC_FORMAT = "replication-%d-%s";
+  public static final String REPLICATION_TOPIC_FORMAT = "replication-%d";
   private static final Logger LOG = Loggers.STREAM_PROCESSING;
 
   private final String replicationTopic;
@@ -42,9 +42,9 @@ public class StateReplication implements SnapshotReplication {
   private ExecutorService executorService;
   private Subscription subscription;
 
-  public StateReplication(ClusterEventService eventService, int partitionId, String name) {
+  public StateReplication(ClusterEventService eventService, int partitionId) {
     this.eventService = eventService;
-    this.replicationTopic = String.format(REPLICATION_TOPIC_FORMAT, partitionId, name);
+    this.replicationTopic = String.format(REPLICATION_TOPIC_FORMAT, partitionId);
   }
 
   @Override
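The same simplification applies to snapshot replication: the event-service topic is now derived from the partition id alone. A small illustration of the naming scheme (the partition id 1 is a made-up value):

// New scheme: one replication topic per partition, e.g. "replication-1".
final String topic = String.format(StateReplication.REPLICATION_TOPIC_FORMAT, 1);
// Before this change the format was "replication-%d-%s", which also appended the processor name.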
@@ -128,7 +128,7 @@ public ExporterDirector get() {
   @Override
   protected void onActorStarting() {
     final MetricsManager metricsManager = actorScheduler.getMetricsManager();
-    metrics = new ExporterMetrics(metricsManager, getName(), Integer.toString(partitionId));
+    metrics = new ExporterMetrics(metricsManager, Integer.toString(partitionId));
 
     this.logStreamReader.wrap(logStream);
   }
@@ -17,22 +17,18 @@
  */
 package io.zeebe.broker.exporter.stream;
 
-import io.zeebe.engine.processor.SnapshotMetrics;
 import io.zeebe.util.metrics.Metric;
 import io.zeebe.util.metrics.MetricsManager;
 
 public class ExporterMetrics {
   private final Metric eventsExportedCountMetric;
   private final Metric eventsSkippedCountMetric;
-  private final SnapshotMetrics snapshotMetrics;
 
-  public ExporterMetrics(
-      final MetricsManager metricsManager, final String processorName, final String partitionId) {
+  public ExporterMetrics(final MetricsManager metricsManager, final String partitionId) {
     eventsExportedCountMetric =
         metricsManager
             .newMetric("exporter_events_count")
             .type("counter")
-            .label("processor", processorName)
             .label("action", "exported")
             .label("partition", partitionId)
             .create();
@@ -41,18 +37,14 @@ public ExporterMetrics(
         metricsManager
             .newMetric("exporter_events_count")
             .type("counter")
-            .label("processor", processorName)
             .label("action", "skipped")
             .label("partition", partitionId)
             .create();
-
-    snapshotMetrics = new SnapshotMetrics(metricsManager, processorName, partitionId);
   }
 
   public void close() {
     eventsExportedCountMetric.close();
     eventsSkippedCountMetric.close();
-    snapshotMetrics.close();
   }
 
   public void incrementEventsExportedCount() {
@@ -62,8 +54,4 @@ public void incrementEventsExportedCount() {
   public void incrementEventsSkippedCount() {
     eventsSkippedCountMetric.incrementOrdered();
   }
-
-  public SnapshotMetrics getSnapshotMetrics() {
-    return snapshotMetrics;
-  }
 }
@@ -22,7 +22,6 @@
 import io.atomix.primitive.partition.PartitionGroup;
 import io.atomix.primitive.partition.PartitionId;
 import io.atomix.primitive.partition.PartitionService;
-import io.zeebe.broker.engine.EngineService;
 import io.zeebe.broker.logstreams.state.StatePositionSupplier;
 import io.zeebe.distributedlog.StorageConfiguration;
 import io.zeebe.distributedlog.impl.LogstreamConfig;
@@ -69,8 +68,7 @@ public SnapshotRestoreContext createSnapshotRestoreContext(int partitionId, Logg
     final StorageConfiguration configuration =
         LogstreamConfig.getConfig(localMemberId, partitionId).join();
     final StateStorage restoreStateStorage =
-        new StateStorageFactory(configuration.getStatesDirectory())
-            .create(partitionId, EngineService.PROCESSOR_NAME, "-restore-log");
+        new StateStorageFactory(configuration.getStatesDirectory()).createTemporary("-restore-log");
 
     final SnapshotController stateSnapshotController =
         new StateSnapshotController(DefaultZeebeDbFactory.DEFAULT_DB_FACTORY, restoreStateStorage);
@@ -95,8 +95,7 @@ public Statement apply(Statement base, Description description) {
   public void startExporterDirector(List<ExporterDescriptor> exporterDescriptors) {
     final LogStream stream = streams.getLogStream(STREAM_NAME);
 
-    final StateStorage stateStorage =
-        streams.getStateStorageFactory(stream).create(EXPORTER_PROCESSOR_ID, PROCESSOR_NAME);
+    final StateStorage stateStorage = streams.getStateStorageFactory(stream).create();
     final StateSnapshotController snapshotController =
         spy(new StateSnapshotController(zeebeDbFactory, stateStorage));
     capturedZeebeDb = spy(snapshotController.openDb());
@@ -50,9 +50,7 @@ public void start(final ServiceStartContext startContext) {
     final StreamProcessor streamProcessor = streamProcessorInjector.getValue();
     final SnapshotMetrics snapshotMetrics =
         new SnapshotMetrics(
-            startContext.getScheduler().getMetricsManager(),
-            streamProcessor.getName(),
-            String.valueOf(partitionId));
+            startContext.getScheduler().getMetricsManager(), String.valueOf(partitionId));
 
     asyncSnapshotDirector =
         new AsyncSnapshotDirector(
@@ -32,8 +32,6 @@ public class ProcessingContext implements ReadonlyProcessingContext {
 
   private ActorControl actor;
   private MetricsManager metricsManager;
-  private int producerId;
-  private String streamProcessorName;
   private EventFilter eventFilter;
   private LogStream logStream;
   private LogStreamReader logStreamReader;
@@ -92,16 +90,6 @@ public ProcessingContext abortCondition(BooleanSupplier abortCondition) {
     return this;
   }
 
-  public ProcessingContext producerId(int producerId) {
-    this.producerId = producerId;
-    return this;
-  }
-
-  public ProcessingContext streamProcessorName(String streamProcessorName) {
-    this.streamProcessorName = streamProcessorName;
-    return this;
-  }
-
   public ProcessingContext logStreamWriter(TypedStreamWriter logStreamWriter) {
     this.logStreamWriter = logStreamWriter;
     return this;
@@ -121,14 +109,6 @@ public ActorControl getActor() {
     return actor;
   }
 
-  public int getProducerId() {
-    return producerId;
-  }
-
-  public String getStreamProcessorName() {
-    return streamProcessorName;
-  }
-
   public EventFilter getEventFilter() {
     return eventFilter;
   }
@@ -92,13 +92,13 @@ public final class ProcessingStateMachine {
   private static final String ERROR_MESSAGE_EXECUTE_SIDE_EFFECT_ABORTED =
       "Expected to execute side effects for event '{}' successfully, but exception was thrown.";
   private static final String ERROR_MESSAGE_UPDATE_STATE_FAILED =
-      "Expected to successfully update state for event '{}' with processor '{}', but caught an exception. Retry.";
+      "Expected to successfully update state for event '{}', but caught an exception. Retry.";
   private static final String ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT =
-      "Expected to find event processor for event '{}' with processor '{}', but caught an exception. Skip this event.";
+      "Expected to find event processor for event '{}', but caught an exception. Skip this event.";
   private static final String ERROR_MESSAGE_PROCESSING_FAILED_SKIP_EVENT =
-      "Expected to successfully process event '{}' with processor '{}', but caught an exception. Skip this event.";
+      "Expected to successfully process event '{}' with processor, but caught an exception. Skip this event.";
   private static final String ERROR_MESSAGE_PROCESSING_FAILED_RETRY_PROCESSING =
-      "Expected to process event '{}' successfully on stream processor '{}', but caught recoverable exception. Retry processing.";
+      "Expected to process event '{}' successfully on stream processor, but caught recoverable exception. Retry processing.";
   private static final String PROCESSING_ERROR_MESSAGE =
       "Expected to process event '%s' without errors, but exception occurred with message '%s' .";
 
@@ -110,8 +110,6 @@ public final class ProcessingStateMachine {
   private static final Duration PROCESSING_RETRY_DELAY = Duration.ofMillis(250);
 
   private final ActorControl actor;
-  private final int producerId;
-  private final String streamProcessorName;
   private final StreamProcessorMetrics metrics;
   private final EventFilter eventFilter;
   private final LogStream logStream;
@@ -143,8 +141,6 @@ public ProcessingStateMachine(
       BooleanSupplier shouldProcessNext) {
 
     this.actor = context.getActor();
-    this.producerId = context.getProducerId();
-    this.streamProcessorName = context.getStreamProcessorName();
     this.eventFilter = context.getEventFilter();
     this.recordProcessorMap = context.getRecordProcessorMap();
     this.eventCache = context.getEventCache();
@@ -225,14 +221,10 @@ private void processEvent(final LoggedEvent event) {
       writeEvent();
     } catch (final RecoverableException recoverableException) {
       // recoverable
-      LOG.error(
-          ERROR_MESSAGE_PROCESSING_FAILED_RETRY_PROCESSING,
-          event,
-          streamProcessorName,
-          recoverableException);
+      LOG.error(ERROR_MESSAGE_PROCESSING_FAILED_RETRY_PROCESSING, event, recoverableException);
       actor.runDelayed(PROCESSING_RETRY_DELAY, () -> processEvent(currentEvent));
     } catch (final Exception e) {
-      LOG.error(ERROR_MESSAGE_PROCESSING_FAILED_SKIP_EVENT, event, streamProcessorName, e);
+      LOG.error(ERROR_MESSAGE_PROCESSING_FAILED_SKIP_EVENT, event, e);
       onError(e, this::writeEvent);
     }
   }
@@ -245,7 +237,7 @@ private TypedRecordProcessor<?> chooseNextProcessor(LoggedEvent event) {
           recordProcessorMap.get(
               metadata.getRecordType(), metadata.getValueType(), metadata.getIntent().value());
     } catch (final Exception e) {
-      LOG.error(ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT, event, streamProcessorName, e);
+      LOG.error(ERROR_MESSAGE_ON_EVENT_FAILED_SKIP_EVENT, event, e);
     }
 
     return typedRecordProcessor;
@@ -277,7 +269,7 @@ private void processInTransaction(final TypedEventImpl typedRecord) throws Excep
   private void resetOutput(long sourceRecordPosition) {
     responseWriter.reset();
     logStreamWriter.reset();
-    logStreamWriter.configureSourceContext(producerId, sourceRecordPosition);
+    logStreamWriter.configureSourceContext(sourceRecordPosition);
   }
 
   public void setSideEffectProducer(final SideEffectProducer sideEffectProducer) {
@@ -384,8 +376,7 @@ private void updateState() {
         retryFuture,
         (bool, throwable) -> {
           if (throwable != null) {
-            LOG.error(
-                ERROR_MESSAGE_UPDATE_STATE_FAILED, currentEvent, streamProcessorName, throwable);
+            LOG.error(ERROR_MESSAGE_UPDATE_STATE_FAILED, currentEvent, throwable);
             onError(throwable, this::updateState);
           } else {
             executeSideEffects();
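Matching the commit title, the producer id also disappears from what the processor writes back to the log: when the state machine resets its output, the writer is configured with the source record position only. A short sketch of that call as it now looks (writer stands in for the TypedStreamWriter from the hunk above; sourceRecordPosition is the position of the record being processed):

// Follow-up records reference their source record by position alone;
// there is no producer id left to stamp onto the written LoggedEvent.
writer.reset();
writer.configureSourceContext(sourceRecordPosition);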
Diffs for the remaining changed files are not shown here.