Skip to content

Commit

Permalink
chore(broker): extract ExporterContainer
Browse files Browse the repository at this point in the history
 * create separate class for the ExporterContainer and logic of setting exporter positions etc
 * make it possible to test how position are set
 * add new test class to test ExporterContainer
  • Loading branch information
Zelldon committed Oct 7, 2020
1 parent 29f6ff1 commit deaf0d1
Show file tree
Hide file tree
Showing 5 changed files with 568 additions and 159 deletions.
@@ -0,0 +1,156 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.0. You may not use this file
* except in compliance with the Zeebe Community License 1.0.
*/
package io.zeebe.broker.exporter.stream;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.exporter.context.ExporterContext;
import io.zeebe.broker.exporter.repo.ExporterDescriptor;
import io.zeebe.engine.processor.TypedRecord;
import io.zeebe.exporter.api.Exporter;
import io.zeebe.exporter.api.context.Context;
import io.zeebe.exporter.api.context.Controller;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.record.Record;
import io.zeebe.util.sched.ActorControl;
import java.time.Duration;
import org.slf4j.Logger;

final class ExporterContainer implements Controller {

private static final Logger LOG = Loggers.EXPORTER_LOGGER;

private static final String SKIP_POSITION_UPDATE_ERROR_MESSAGE =
"Failed to update exporter position when skipping filtered record, can be skipped, but may indicate an issue if it occurs often";

private final ExporterContext context;
private final Exporter exporter;
private long position;
private long lastUnacknowledgedPosition;
private ExportersState exportersState;
private ActorControl actor;

ExporterContainer(final ExporterDescriptor descriptor) {
context =
new ExporterContext(
Loggers.getExporterLogger(descriptor.getId()), descriptor.getConfiguration());

exporter = descriptor.newInstance();
}

void initContainer(final ActorControl actor, final ExportersState state) {
this.actor = actor;
exportersState = state;
}

void initPosition() {
position = exportersState.getPosition(getId());
lastUnacknowledgedPosition = position;
if (position == ExportersState.VALUE_NOT_FOUND) {
exportersState.setPosition(getId(), -1L);
}
}

void openExporter() {
LOG.debug("Open exporter with id '{}'", getId());
exporter.open(this);
}

public ExporterContext getContext() {
return context;
}

public Exporter getExporter() {
return exporter;
}

public long getPosition() {
return position;
}

long getLastUnacknowledgedPosition() {
return lastUnacknowledgedPosition;
}

/**
* Updates the exporter's position if it is up to date - that is, if it's last acknowledged
* position is greater than or equal to its last unacknowledged position. This is safe to do when
* skipping records as it means we passed no record to this exporter between both.
*
* @param eventPosition the new, up to date position
*/
void updatePositionOnSkipIfUpToDate(final long eventPosition) {
if (position >= lastUnacknowledgedPosition && position < eventPosition) {
try {
updateExporterLastExportedRecordPosition(eventPosition);
} catch (final Exception e) {
LOG.warn(SKIP_POSITION_UPDATE_ERROR_MESSAGE, e);
}
}
}

private void updateExporterLastExportedRecordPosition(final long eventPosition) {
if (position < eventPosition) {
exportersState.setPosition(getId(), eventPosition);
position = eventPosition;
}
}

@Override
public void updateLastExportedRecordPosition(final long position) {
actor.run(() -> updateExporterLastExportedRecordPosition(position));
}

@Override
public void scheduleTask(final Duration delay, final Runnable task) {
actor.runDelayed(delay, task);
}

public String getId() {
return context.getConfiguration().getId();
}

private boolean acceptRecord(final RecordMetadata metadata) {
final Context.RecordFilter filter = context.getFilter();
return filter.acceptType(metadata.getRecordType())
&& filter.acceptValue(metadata.getValueType());
}

void configureExporter() throws Exception {
LOG.debug("Configure exporter with id '{}'", getId());
exporter.configure(context);
}

boolean exportRecord(final RecordMetadata rawMetadata, final TypedRecord typedEvent) {
try {
if (position < typedEvent.getPosition()) {
if (acceptRecord(rawMetadata)) {
export(typedEvent);
} else {
updatePositionOnSkipIfUpToDate(typedEvent.getPosition());
}
}
return true;
} catch (final Exception ex) {
context.getLogger().error("Error on exporting record with key {}", typedEvent.getKey(), ex);
return false;
}
}

private void export(final Record<?> record) {
exporter.export(record);
lastUnacknowledgedPosition = record.getPosition();
}

public void close() {
try {
exporter.close();
} catch (final Exception e) {
context.getLogger().error("Error on close", e);
}
}
}
Expand Up @@ -8,21 +8,16 @@
package io.zeebe.broker.exporter.stream;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.exporter.context.ExporterContext;
import io.zeebe.broker.exporter.repo.ExporterDescriptor;
import io.zeebe.db.ZeebeDb;
import io.zeebe.engine.processor.EventFilter;
import io.zeebe.engine.processor.RecordValues;
import io.zeebe.engine.processor.TypedEventImpl;
import io.zeebe.exporter.api.Exporter;
import io.zeebe.exporter.api.context.Context;
import io.zeebe.exporter.api.context.Controller;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.logstreams.log.LogStreamReader;
import io.zeebe.logstreams.log.LoggedEvent;
import io.zeebe.protocol.impl.record.RecordMetadata;
import io.zeebe.protocol.impl.record.UnifiedRecordValue;
import io.zeebe.protocol.record.Record;
import io.zeebe.protocol.record.RecordType;
import io.zeebe.protocol.record.ValueType;
import io.zeebe.util.LangUtil;
Expand Down Expand Up @@ -52,8 +47,6 @@ public final class ExporterDirector extends Actor {
"Expected to find event with the snapshot position %s in log stream, but nothing was found. Failed to recover '%s'.";

private static final Logger LOG = Loggers.EXPORTER_LOGGER;
private static final String SKIP_POSITION_UPDATE_ERROR_MESSAGE =
"Failed to update exporter position when skipping filtered record, can be skipped, but may indicate an issue if it occurs often";
private final AtomicBoolean isOpened = new AtomicBoolean(false);
private final List<ExporterContainer> containers;
private final LogStream logStream;
Expand Down Expand Up @@ -127,8 +120,8 @@ protected void onActorStarted() {
recoverFromSnapshot();

for (final ExporterContainer container : containers) {
LOG.debug("Configure exporter with id '{}'", container.getId());
container.exporter.configure(container.context);
container.initContainer(actor, state);
container.configureExporter();
}

eventFilter = createEventFilter(containers);
Expand Down Expand Up @@ -160,13 +153,7 @@ protected void onActorClosed() {
@Override
protected void onActorCloseRequested() {
isOpened.set(false);
for (final ExporterContainer container : containers) {
try {
container.exporter.close();
} catch (final Exception e) {
container.context.getLogger().error("Error on close", e);
}
}
containers.forEach(ExporterContainer::close);
}

private void recoverFromSnapshot() {
Expand All @@ -188,7 +175,7 @@ private void recoverFromSnapshot() {
private ExporterEventFilter createEventFilter(final List<ExporterContainer> containers) {

final List<Context.RecordFilter> recordFilters =
containers.stream().map(c -> c.context.getFilter()).collect(Collectors.toList());
containers.stream().map(c -> c.getContext().getFilter()).collect(Collectors.toList());

final Map<RecordType, Boolean> acceptRecordTypes =
Arrays.stream(RecordType.values())
Expand Down Expand Up @@ -220,13 +207,8 @@ private void onSnapshotRecovered() {

// start reading
for (final ExporterContainer container : containers) {
container.position = state.getPosition(container.getId());
container.lastUnacknowledgedPosition = container.position;
if (container.position == ExportersState.VALUE_NOT_FOUND) {
state.setPosition(container.getId(), -1L);
}
LOG.debug("Open exporter with id '{}'", container.getId());
container.exporter.open(container);
container.initPosition();
container.openExporter();
}

clearExporterState();
Expand Down Expand Up @@ -361,21 +343,8 @@ public boolean export() {
while (exporterIndex < exportersCount) {
final ExporterContainer container = containers.get(exporterIndex);

try {
if (container.position < typedEvent.getPosition()) {
if (container.acceptRecord(rawMetadata)) {
container.export(typedEvent);
} else {
container.updatePositionOnSkipIfUpToDate(typedEvent.getPosition());
}
}

if (container.exportRecord(rawMetadata, typedEvent)) {
exporterIndex++;
} catch (final Exception ex) {
container
.context
.getLogger()
.error("Error on exporting record with key {}", typedEvent.getKey(), ex);
return false;
}
}
Expand Down Expand Up @@ -421,67 +390,4 @@ public String toString() {
+ '}';
}
}

private class ExporterContainer implements Controller {

private final ExporterContext context;
private final Exporter exporter;
private long position;
private long lastUnacknowledgedPosition;

ExporterContainer(final ExporterDescriptor descriptor) {
context =
new ExporterContext(
Loggers.getExporterLogger(descriptor.getId()), descriptor.getConfiguration());

exporter = descriptor.newInstance();
}

private void export(final Record<?> record) {
exporter.export(record);
lastUnacknowledgedPosition = record.getPosition();
}

/**
* Updates the exporter's position if it is up to date - that is, if it's last acknowledged
* position is greater than or equal to its last unacknowledged position. This is safe to do
* when skipping records as it means we passed no record to this exporter between both.
*
* @param eventPosition the new, up to date position
*/
private void updatePositionOnSkipIfUpToDate(final long eventPosition) {
if (position >= lastUnacknowledgedPosition && position < eventPosition) {
try {
updateExporterLastExportedRecordPosition(eventPosition);
} catch (final Exception e) {
LOG.warn(SKIP_POSITION_UPDATE_ERROR_MESSAGE, e);
}
}
}

private void updateExporterLastExportedRecordPosition(final long eventPosition) {
state.setPositionIfGreater(getId(), eventPosition);
position = eventPosition;
}

@Override
public void updateLastExportedRecordPosition(final long position) {
actor.run(() -> updateExporterLastExportedRecordPosition(position));
}

@Override
public void scheduleTask(final Duration delay, final Runnable task) {
actor.runDelayed(delay, task);
}

private String getId() {
return context.getConfiguration().getId();
}

private boolean acceptRecord(final RecordMetadata metadata) {
final Context.RecordFilter filter = context.getFilter();
return filter.acceptType(metadata.getRecordType())
&& filter.acceptValue(metadata.getValueType());
}
}
}
Expand Up @@ -34,23 +34,6 @@ public void setPosition(final String exporterId, final long position) {
setPosition(position);
}

public void setPositionIfGreater(final String exporterId, final long position) {
this.exporterId.wrapString(exporterId);

setPositionIfGreater(position);
}

private void setPositionIfGreater(final long position) {
// not that performant then rocksdb merge but
// was currently simpler and easier to implement
// if necessary change it again to merge

final long oldPosition = getPosition();
if (oldPosition < position) {
setPosition(position);
}
}

public long getPosition(final String exporterId) {
this.exporterId.wrapString(exporterId);
return getPosition();
Expand Down

0 comments on commit deaf0d1

Please sign in to comment.