Skip to content

Commit

Permalink
merge: #9326
Browse files Browse the repository at this point in the history
9326: Add exporter API test implementations r=npepinpe a=npepinpe

## Description

This PR adds test implementation for the exporter API, allowing easier testing of exporters.

Configuration/context is kept to a minimum: users can set the configuration that will be instantiated beforehand, but no instantiation is done for them from the provided args. This is a limitation at the moment, to avoid coupling the broker's implementation and this one here. It's sufficient for our own use cases for now.

The controller can schedule tasks and process them deterministically. This is done manually, by having users "tick" the controller manually - it will then run all scheduled tasks which have "expired" in order, synchronously. Tasks are kept in memory, even after being executed, so we can still assert properties of previous scheduled, canceled, or executed tasks.

## Related issues

closes #9319 



Co-authored-by: Nicolas Pepin-Perreault <nicolas.pepin-perreault@camunda.com>
  • Loading branch information
zeebe-bors-camunda[bot] and npepinpe committed May 10, 2022
2 parents 6a38ef8 + 041c522 commit 9e82fff
Show file tree
Hide file tree
Showing 10 changed files with 590 additions and 0 deletions.
56 changes: 56 additions & 0 deletions exporter-test/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.camunda</groupId>
<artifactId>zeebe-parent</artifactId>
<version>8.1.0-SNAPSHOT</version>
<relativePath>../parent/pom.xml</relativePath>
</parent>

<artifactId>zeebe-exporter-test</artifactId>
<packaging>jar</packaging>

<name>Zeebe Exporter Test Harness</name>

<dependencies>
<dependency>
<groupId>io.camunda</groupId>
<artifactId>zeebe-exporter-api</artifactId>
</dependency>

<!-- utilities -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<!-- SpotBugs annotations -->
<dependency>
<groupId>net.jcip</groupId>
<artifactId>jcip-annotations</artifactId>
<scope>provided</scope>
</dependency>

<!-- Test dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.agrona</groupId>
<artifactId>agrona</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.exporter.test;

import io.camunda.zeebe.exporter.api.context.Configuration;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import net.jcip.annotations.Immutable;

/**
* An immutable implementation of {@link Configuration}. Accepts configuration suppliers, passing
* the arguments map to the supplier. This allows for flexible injection of configuration when
* testing the exporter.
*
* @param <T> the actual configuration type
*/
@Immutable
public final class ExporterTestConfiguration<T> implements Configuration {
private final String id;
private final Map<String, Object> arguments;
private final Function<Map<String, Object>, T> configurationSupplier;

public ExporterTestConfiguration(final String id, final T configuration) {
this(id, ignored -> configuration);
}

public ExporterTestConfiguration(
final String id, final Function<Map<String, Object>, T> configurationSupplier) {
this(id, Collections.emptyMap(), configurationSupplier);
}

public ExporterTestConfiguration(
final String id,
final Map<String, Object> arguments,
final Function<Map<String, Object>, T> configurationSupplier) {
this.id = Objects.requireNonNull(id, "must specify an ID");
this.arguments = Objects.requireNonNull(arguments, "must specify arguments");
this.configurationSupplier =
Objects.requireNonNull(configurationSupplier, "must specific a configurationSupplier");
}

@Override
public String getId() {
return id;
}

@Override
public Map<String, Object> getArguments() {
return arguments;
}

@Override
public <R> R instantiate(final Class<R> configClass) {
Objects.requireNonNull(configClass, "must pass a non null configClass");

final var configuration = configurationSupplier.apply(arguments);
return configClass.cast(configuration);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.exporter.test;

import io.camunda.zeebe.exporter.api.context.Configuration;
import io.camunda.zeebe.exporter.api.context.Context;
import java.util.Objects;
import net.jcip.annotations.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A mutable implementation of {@link Context} for testing. The context is passed only during the
* configuration phase, and any modifications afterwards isn't really used, so there is no real need
* to make this thread-safe at the moment.
*/
@NotThreadSafe
public final class ExporterTestContext implements Context {
private static final Logger DEFAULT_LOGGER = LoggerFactory.getLogger(ExporterTestContext.class);

private Configuration configuration;
private RecordFilter recordFilter;

@Override
public Logger getLogger() {
return DEFAULT_LOGGER;
}

@Override
public Configuration getConfiguration() {
return configuration;
}

public ExporterTestContext setConfiguration(final Configuration configuration) {
this.configuration = Objects.requireNonNull(configuration, "must specify a configuration");
return this;
}

@Override
public void setFilter(final RecordFilter filter) {
recordFilter = filter;
}

public RecordFilter getRecordFilter() {
return recordFilter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.exporter.test;

import io.camunda.zeebe.exporter.api.context.Controller;
import io.camunda.zeebe.exporter.api.context.ScheduledTask;
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import net.jcip.annotations.ThreadSafe;

/**
* A thread safe implementation of {@link Controller}. Tasks are scheduled and executed
* synchronously. To trigger execution of scheduled tasks, a manual call to {@link
* #runScheduledTasks(Duration)} is required. Time is always relative to the last call to this
* method.
*
* <p>NOTE: if a task is scheduled with a {@link Duration#ZERO}, it is <em>not</em> ran immediately,
* but instead will run the next time {@link #runScheduledTasks(Duration)} is called.
*
* <p>NOTE: tasks are not removed from the in-memory lists when they are executed. This is so you
* can still assert/verify their properties even after they were executed. You can clear the task
* lists if it grows to large via {@link #resetScheduledTasks()}
*/
@ThreadSafe
public final class ExporterTestController implements Controller {
private static final long UNKNOWN_POSITION = -1;

private final AtomicLong position = new AtomicLong(UNKNOWN_POSITION);
private final List<ExporterTestScheduledTask> scheduledTasks = new CopyOnWriteArrayList<>();
private volatile long lastRanAtMs = 0;

@Override
public void updateLastExportedRecordPosition(final long position) {
this.position.getAndAccumulate(position, Math::max);
}

@Override
public synchronized ScheduledTask scheduleCancellableTask(
final Duration delay, final Runnable task) {
final var scheduledTask =
new ExporterTestScheduledTask(
Objects.requireNonNull(delay, "must specify a task delay"),
Objects.requireNonNull(task, "must specify a task"));

scheduledTasks.add(scheduledTask);
return scheduledTask;
}

/**
* Clears the list of scheduled tasks and resets the time of the scheduler to 0. NOTE: this call
* does not cancel scheduled tasks.
*/
public synchronized void resetScheduledTasks() {
lastRanAtMs = 0;
scheduledTasks.clear();
}

/**
* Returns the last updated position, as set via {@link #updateLastExportedRecordPosition(long)}.
*/
public long getPosition() {
return position.get();
}

/**
* Returns all scheduled tasks since the last call to {@link #resetScheduledTasks()}, including
* tasks that were already canceled or executed.
*/
public List<ExporterTestScheduledTask> getScheduledTasks() {
return scheduledTasks;
}

/** Returns the last time the scheduler ran. Primarily for debugging purposes. */
public Instant getLastRanAt() {
return Instant.ofEpochMilli(lastRanAtMs);
}

/**
* Will run all tasks scheduled since the last time this was executed + the given duration.
*
* @param elapsed upper bound of tasks delay
*/
public synchronized void runScheduledTasks(final Duration elapsed) {
Objects.requireNonNull(elapsed, "must specify a tick duration");
final Duration upperBound = elapsed.plusMillis(lastRanAtMs);

scheduledTasks.stream()
.filter(t -> t.getDelay().compareTo(upperBound) <= 0)
.filter(t -> !t.isCanceled())
.sorted(Comparator.comparing(ExporterTestScheduledTask::getDelay))
.forEach(ExporterTestScheduledTask::run);

lastRanAtMs = upperBound.toMillis();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under
* one or more contributor license agreements. See the NOTICE file distributed
* with this work for additional information regarding copyright ownership.
* Licensed under the Zeebe Community License 1.1. You may not use this file
* except in compliance with the Zeebe Community License 1.1.
*/
package io.camunda.zeebe.exporter.test;

import io.camunda.zeebe.exporter.api.context.ScheduledTask;
import java.time.Duration;
import java.util.Objects;
import net.jcip.annotations.ThreadSafe;

/**
* A controllable, thread-safe implementation of {@link ScheduledTask}. Thread-safety is important
* as exporters may cancel the task from a different thread, and it's not that difficult to
* guarantee.
*
* <p>This implementation is meant to be used with {@link ExporterTestController}.
*/
@ThreadSafe
public final class ExporterTestScheduledTask implements ScheduledTask, Runnable {
private final Duration delay;
private final Runnable task;

private volatile boolean isExecuted;
private volatile boolean isCanceled;

public ExporterTestScheduledTask(final Duration delay, final Runnable task) {
this.delay = Objects.requireNonNull(delay, "must specify a task delay");
this.task = Objects.requireNonNull(task, "must specify a task");
}

public Duration getDelay() {
return delay;
}

public Runnable getTask() {
return task;
}

public boolean isCanceled() {
return isCanceled;
}

public boolean wasExecuted() {
return isExecuted;
}

@Override
public synchronized void run() {
if (isCanceled || isExecuted) {
return;
}

task.run();
isExecuted = true;
}

@Override
public synchronized void cancel() {
if (isCanceled || isExecuted) {
return;
}

isCanceled = true;
}
}
Loading

0 comments on commit 9e82fff

Please sign in to comment.