Skip to content

Commit

Permalink
Merge #5528
Browse files Browse the repository at this point in the history
5528: [Backport 0.23] Don't update exporter position to smaller value r=Zelldon a=Zelldon

## Description

Backports #5462
<!-- Please explain the changes you made here. -->

## Related issues

<!-- Which issues are closed by this PR or are related -->

closes #5294 

## Definition of Done

_Not all items need to be done depending on the issue and the pull request._

Code changes:
* [ ] The changes are backwards compatibility with previous versions
* [ ] If it fixes a bug then PRs are created to [backport](https://github.com/zeebe-io/zeebe/compare/stable/0.24...develop?expand=1&template=backport_template.md&title=[Backport%200.24]) the fix to the last two minor versions

Testing:
* [ ] There are unit/integration tests that verify all acceptance criterias of the issue
* [ ] New tests are written to ensure backwards compatibility with further versions
* [ ] The behavior is tested manually
* [ ] The impact of the changes is verified by a benchmark 

Documentation: 
* [ ] The documentation is updated (e.g. BPMN reference, configuration, examples, get-started guides, etc.)
* [ ] New content is added to the [release announcement](https://drive.google.com/drive/u/0/folders/1DTIeswnEEq-NggJ25rm2BsDjcCQpDape)


Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
  • Loading branch information
zeebe-bors[bot] and Zelldon committed Oct 12, 2020
2 parents 561d4a1 + 98b912b commit 8efd828
Show file tree
Hide file tree
Showing 6 changed files with 599 additions and 167 deletions.
@@ -0,0 +1,156 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.0. You may not use this file
* except in compliance with the Zeebe Community License 1.0.
*/
package io.zeebe.broker.exporter.stream;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.exporter.context.ExporterContext;
import io.zeebe.broker.exporter.repo.ExporterDescriptor;
import io.zeebe.engine.processor.TypedRecord;
import io.zeebe.exporter.api.Exporter;
import io.zeebe.exporter.api.context.Context;
import io.zeebe.exporter.api.context.Controller;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.record.Record;
import io.zeebe.util.sched.ActorControl;
import java.time.Duration;
import org.slf4j.Logger;

final class ExporterContainer implements Controller {

private static final Logger LOG = Loggers.EXPORTER_LOGGER;

private static final String SKIP_POSITION_UPDATE_ERROR_MESSAGE =
"Failed to update exporter position when skipping filtered record, can be skipped, but may indicate an issue if it occurs often";

private final ExporterContext context;
private final Exporter exporter;
private long position;
private long lastUnacknowledgedPosition;
private ExportersState exportersState;
private ActorControl actor;

ExporterContainer(final ExporterDescriptor descriptor) {
context =
new ExporterContext(
Loggers.getExporterLogger(descriptor.getId()), descriptor.getConfiguration());

exporter = descriptor.newInstance();
}

void initContainer(final ActorControl actor, final ExportersState state) {
this.actor = actor;
exportersState = state;
}

void initPosition() {
position = exportersState.getPosition(getId());
lastUnacknowledgedPosition = position;
if (position == ExportersState.VALUE_NOT_FOUND) {
exportersState.setPosition(getId(), -1L);
}
}

void openExporter() {
LOG.debug("Open exporter with id '{}'", getId());
exporter.open(this);
}

public ExporterContext getContext() {
return context;
}

public Exporter getExporter() {
return exporter;
}

public long getPosition() {
return position;
}

long getLastUnacknowledgedPosition() {
return lastUnacknowledgedPosition;
}

/**
* Updates the exporter's position if it is up to date - that is, if it's last acknowledged
* position is greater than or equal to its last unacknowledged position. This is safe to do when
* skipping records as it means we passed no record to this exporter between both.
*
* @param eventPosition the new, up to date position
*/
void updatePositionOnSkipIfUpToDate(final long eventPosition) {
if (position >= lastUnacknowledgedPosition && position < eventPosition) {
try {
updateExporterLastExportedRecordPosition(eventPosition);
} catch (final Exception e) {
LOG.warn(SKIP_POSITION_UPDATE_ERROR_MESSAGE, e);
}
}
}

private void updateExporterLastExportedRecordPosition(final long eventPosition) {
if (position < eventPosition) {
exportersState.setPosition(getId(), eventPosition);
position = eventPosition;
}
}

@Override
public void updateLastExportedRecordPosition(final long position) {
actor.run(() -> updateExporterLastExportedRecordPosition(position));
}

@Override
public void scheduleTask(final Duration delay, final Runnable task) {
actor.runDelayed(delay, task);
}

public String getId() {
return context.getConfiguration().getId();
}

private boolean acceptRecord(final RecordMetadata metadata) {
final Context.RecordFilter filter = context.getFilter();
return filter.acceptType(metadata.getRecordType())
&& filter.acceptValue(metadata.getValueType());
}

void configureExporter() throws Exception {
LOG.debug("Configure exporter with id '{}'", getId());
exporter.configure(context);
}

boolean exportRecord(final RecordMetadata rawMetadata, final TypedRecord typedEvent) {
try {
if (position < typedEvent.getPosition()) {
if (acceptRecord(rawMetadata)) {
export(typedEvent);
} else {
updatePositionOnSkipIfUpToDate(typedEvent.getPosition());
}
}
return true;
} catch (final Exception ex) {
context.getLogger().error("Error on exporting record with key {}", typedEvent.getKey(), ex);
return false;
}
}

private void export(final Record<?> record) {
exporter.export(record);
lastUnacknowledgedPosition = record.getPosition();
}

public void close() {
try {
exporter.close();
} catch (final Exception e) {
context.getLogger().error("Error on close", e);
}
}
}
Expand Up @@ -8,21 +8,16 @@
package io.zeebe.broker.exporter.stream;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.exporter.context.ExporterContext;
import io.zeebe.broker.exporter.repo.ExporterDescriptor;
import io.zeebe.db.ZeebeDb;
import io.zeebe.engine.processor.EventFilter;
import io.zeebe.engine.processor.RecordValues;
import io.zeebe.engine.processor.TypedEventImpl;
import io.zeebe.exporter.api.Exporter;
import io.zeebe.exporter.api.context.Context;
import io.zeebe.exporter.api.context.Controller;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamReader;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.zeebe.protocol.record.Record;
import io.zeebe.protocol.record.RecordType;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.util.LangUtil;
Expand Down Expand Up @@ -52,8 +47,6 @@ public final class ExporterDirector extends Actor {
"Expected to find event with the snapshot position %s in log stream, but nothing was found. Failed to recover '%s'.";

private static final Logger LOG = Loggers.EXPORTER_LOGGER;
private static final String SKIP_POSITION_UPDATE_ERROR_MESSAGE =
"Failed to update exporter position when skipping filtered record, can be skipped, but may indicate an issue if it occurs often";
private final AtomicBoolean isOpened = new AtomicBoolean(false);
private final List<ExporterContainer> containers;
private final LogStream logStream;
Expand All @@ -71,19 +64,19 @@ public final class ExporterDirector extends Actor {
private boolean inExportingPhase;

public ExporterDirector(final ExporterDirectorContext context) {
this.name = context.getName();
this.containers =
name = context.getName();
containers =
context.getDescriptors().stream().map(ExporterContainer::new).collect(Collectors.toList());

this.logStream = Objects.requireNonNull(context.getLogStream());
logStream = Objects.requireNonNull(context.getLogStream());
final int partitionId = logStream.getPartitionId();
this.recordExporter = new RecordExporter(containers, partitionId);
this.exportingRetryStrategy = new BackOffRetryStrategy(actor, Duration.ofSeconds(10));
this.recordWrapStrategy = new EndlessRetryStrategy(actor);
recordExporter = new RecordExporter(containers, partitionId);
exportingRetryStrategy = new BackOffRetryStrategy(actor, Duration.ofSeconds(10));
recordWrapStrategy = new EndlessRetryStrategy(actor);

this.zeebeDb = context.getZeebeDb();
zeebeDb = context.getZeebeDb();

this.metrics = new ExporterMetrics(partitionId);
metrics = new ExporterMetrics(partitionId);
}

public ActorFuture<Void> startAsync(final ActorScheduler actorScheduler) {
Expand Down Expand Up @@ -127,8 +120,8 @@ protected void onActorStarted() {
recoverFromSnapshot();

for (final ExporterContainer container : containers) {
LOG.debug("Configure exporter with id '{}'", container.getId());
container.exporter.configure(container.context);
container.initContainer(actor, state);
container.configureExporter();
}

eventFilter = createEventFilter(containers);
Expand Down Expand Up @@ -160,17 +153,11 @@ protected void onActorClosed() {
@Override
protected void onActorCloseRequested() {
isOpened.set(false);
for (final ExporterContainer container : containers) {
try {
container.exporter.close();
} catch (final Exception e) {
container.context.getLogger().error("Error on close", e);
}
}
containers.forEach(ExporterContainer::close);
}

private void recoverFromSnapshot() {
this.state = new ExportersState(zeebeDb, zeebeDb.createContext());
state = new ExportersState(zeebeDb, zeebeDb.createContext());

final long snapshotPosition = state.getLowestPosition();
final boolean failedToRecoverReader = !logStreamReader.seekToNextEvent(snapshotPosition);
Expand All @@ -188,7 +175,7 @@ private void recoverFromSnapshot() {
private ExporterEventFilter createEventFilter(final List<ExporterContainer> containers) {

final List<Context.RecordFilter> recordFilters =
containers.stream().map(c -> c.context.getFilter()).collect(Collectors.toList());
containers.stream().map(c -> c.getContext().getFilter()).collect(Collectors.toList());

final Map<RecordType, Boolean> acceptRecordTypes =
Arrays.stream(RecordType.values())
Expand Down Expand Up @@ -220,13 +207,8 @@ private void onSnapshotRecovered() {

// start reading
for (final ExporterContainer container : containers) {
container.position = state.getPosition(container.getId());
container.lastUnacknowledgedPosition = container.position;
if (container.position == ExportersState.VALUE_NOT_FOUND) {
state.setPosition(container.getId(), -1L);
}
LOG.debug("Open exporter with id '{}'", container.getId());
container.exporter.open(container);
container.initPosition();
container.openExporter();
}

clearExporterState();
Expand Down Expand Up @@ -361,21 +343,9 @@ public boolean export() {
while (exporterIndex < exportersCount) {
final ExporterContainer container = containers.get(exporterIndex);

try {
if (container.position < typedEvent.getPosition()) {
if (container.acceptRecord(rawMetadata)) {
container.export(typedEvent);
} else {
container.updatePositionOnSkipIfUpToDate(typedEvent.getPosition());
}
}

if (container.exportRecord(rawMetadata, typedEvent)) {
exporterIndex++;
} catch (final Exception ex) {
container
.context
.getLogger()
.error("Error on exporting record with key {}", typedEvent.getKey(), ex);
} else {
return false;
}
}
Expand Down Expand Up @@ -421,66 +391,4 @@ public String toString() {
+ '}';
}
}

private class ExporterContainer implements Controller {
private final ExporterContext context;
private final Exporter exporter;
private long position;
private long lastUnacknowledgedPosition;

ExporterContainer(final ExporterDescriptor descriptor) {
context =
new ExporterContext(
Loggers.getExporterLogger(descriptor.getId()), descriptor.getConfiguration());

exporter = descriptor.newInstance();
}

private void export(final Record<?> record) {
exporter.export(record);
lastUnacknowledgedPosition = record.getPosition();
}

/**
* Updates the exporter's position if it is up to date - that is, if it's last acknowledged
* position is greater than or equal to its last unacknowledged position. This is safe to do
* when skipping records as it means we passed no record to this exporter between both.
*
* @param eventPosition the new, up to date position
*/
private void updatePositionOnSkipIfUpToDate(final long eventPosition) {
if (position >= lastUnacknowledgedPosition && position < eventPosition) {
try {
updateExporterLastExportedRecordPosition(eventPosition);
} catch (final Exception e) {
LOG.warn(SKIP_POSITION_UPDATE_ERROR_MESSAGE, e);
}
}
}

private void updateExporterLastExportedRecordPosition(final long eventPosition) {
state.setPosition(getId(), eventPosition);
position = eventPosition;
}

@Override
public void updateLastExportedRecordPosition(final long position) {
actor.run(() -> updateExporterLastExportedRecordPosition(position));
}

@Override
public void scheduleTask(final Duration delay, final Runnable task) {
actor.runDelayed(delay, task);
}

private String getId() {
return context.getConfiguration().getId();
}

private boolean acceptRecord(final RecordMetadata metadata) {
final Context.RecordFilter filter = context.getFilter();
return filter.acceptType(metadata.getRecordType())
&& filter.acceptValue(metadata.getValueType());
}
}
}
Expand Up @@ -34,23 +34,6 @@ public void setPosition(final String exporterId, final long position) {
setPosition(position);
}

public void setPositionIfGreater(final String exporterId, final long position) {
this.exporterId.wrapString(exporterId);

setPositionIfGreater(position);
}

private void setPositionIfGreater(final long position) {
// not that performant then rocksdb merge but
// was currently simpler and easier to implement
// if necessary change it again to merge

final long oldPosition = getPosition();
if (oldPosition < position) {
setPosition(position);
}
}

public long getPosition(final String exporterId) {
this.exporterId.wrapString(exporterId);
return getPosition();
Expand Down

0 comments on commit 8efd828

Please sign in to comment.