Skip to content

Commit

Permalink
Performance Harness: Support for pseudo-parallel streams (#26219)
Browse files Browse the repository at this point in the history
* Adds support for pseudo-parallel datasets

* Ran ./gradlew :spotlessJavaApply

* Automated Change

* Fixes issue with parallel datasets credentials

* Fixes filter for parallel credentials

* Adds a new configurable property to build a pseudo-parallel catalog

* Fixes GitHub Actions variable to be processed properly with the K8s harness yaml

* Adds unit test for random streams and generating streams within the same configured catalog

* Ran ./gradlew :spotlessJavaApply

* Added additional description for GitHub Actions

* Update connector-performance-command.yml

Moved text up to connect with other argument discussion

* Fixes spotBugs issue

* Automated Commit - Formatting Changes

* Update GitHub Action description

---------

Co-authored-by: ryankfu <ryankfu@users.noreply.github.com>
Co-authored-by: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com>
  • Loading branch information
3 people committed May 22, 2023
1 parent 988ce24 commit 3689ff3
Show file tree
Hide file tree
Showing 12 changed files with 183 additions and 34 deletions.
11 changes: 9 additions & 2 deletions .github/workflows/connector-performance-command.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ on:
description: "Name of dataset to use for performance measurement. Currently supports 1m, 10m, 20m, bottleneck_stream1."
required: false
default: "1m"
stream-number:
description: "Number of streams to use for destination performance measurement."
required: false
default: "1"
jobs:
uuid:
name: "Custom UUID of workflow run"
Expand Down Expand Up @@ -68,8 +72,9 @@ jobs:
with:
comment-id: ${{ github.event.inputs.comment-id }}
body: |
#### Note: The following `dataset=` values are supported: `1m`<sub>(default)</sub>, `10m`, `20m`, `bottleneck_stream1`
> :runner: ${{github.event.inputs.connector}} https://github.com/${{github.repository}}/actions/runs/${{github.run_id}}
#### Note: The following `dataset=` values are supported: `1m`<sub>(default)</sub>, `10m`, `20m`, `bottleneck_stream1`.
For destination performance only: you can also use `stream-number=N` to simulate N number of parallel streams.
> :runner: ${{github.event.inputs.connector}} https://github.com/${{github.repository}}/actions/runs/${{github.run_id}}.
- name: Search for valid connector name format
id: regex
uses: AsasInnab/regex-action@v1
Expand Down Expand Up @@ -140,6 +145,7 @@ jobs:
env:
CONN: ${{ github.event.inputs.connector }}
DS: ${{ github.event.inputs.dataset }}
STREAM_NUMBER: ${{ github.event.inputs.stream-number }}
PREFIX: '{"type":"LOG","log":{"level":"INFO","message":"INFO i.a.i.p.PerformanceTest(runTest):165'
SUFFIX: '"}}'
HARNESS_TYPE: ${{ steps.which-harness.outputs.harness_type }}
Expand All @@ -151,6 +157,7 @@ jobs:
# envsubst requires variables to be exported
export CONNECTOR_IMAGE_NAME=${CONN/connectors/airbyte}:dev
export DATASET=$DS
export STREAM_NUMBER=$STREAM_NUMBER
export HARNESS=$HARNESS_TYPE
envsubst < ./tools/bin/run-harness-process.yaml | kubectl create -f -
echo "harness is ${{ steps.which-harness.outputs.harness_type }}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,8 @@ Performance harness for destination connectors.

This component is used by the `/connector-performance` GitHub action and is used in order to test throughput of
destination connectors on a number of datasets.

Associated files are:
<li>Main.java - the main entrypoint for the harness
<li>PerformanceTest.java - sets up the destination connector, sends records to it, and measures throughput
<li>run-harness-process.yaml - kubernetes file that processes dynamic arguments and runs the harness
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}

repositories {
Expand All @@ -22,4 +23,9 @@ dependencies {
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
implementation 'io.airbyte:airbyte-commons-worker:0.42.0'
implementation 'io.airbyte.airbyte-config:config-models:0.42.0'
implementation 'junit:junit:4.13.1'
implementation 'junit:junit:4.13.1'
implementation 'org.testng:testng:7.1.0'
implementation 'org.junit.jupiter:junit-jupiter:5.8.1'

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import java.io.BufferedReader;
Expand All @@ -26,16 +29,23 @@ public class Main {
private static final String CREDENTIALS_PATH = "secrets/%s_%s_credentials.json";

public static void main(final String[] args) {
// If updating args for Github Actions, also update the run-performance-test.yml file
log.info("args: {}", Arrays.toString(args));
String image = null;
String dataset = "1m";
int numOfParallelStreams = 1;

switch (args.length) {
case 1 -> image = args[0];
case 2 -> {
image = args[0];
dataset = args[1];
}
case 3 -> {
image = args[0];
dataset = args[1];
numOfParallelStreams = Integer.parseInt(args[2]);
}
default -> {
log.info("unexpected arguments");
System.exit(1);
Expand All @@ -55,6 +65,7 @@ public static void main(final String[] args) {
final JsonNode catalog;
try {
catalog = getCatalog(dataset, connector);
duplicateStreams(catalog, numOfParallelStreams);
} catch (final IOException ex) {
throw new IllegalStateException("Failed to read catalog", ex);
}
Expand All @@ -72,7 +83,7 @@ public static void main(final String[] args) {

log.info("Starting performance harness for {} ({})", image, dataset);
try {
final PerformanceTest test = new PerformanceTest(
final PerformanceHarness test = new PerformanceHarness(
image,
config.toString(),
catalog.toString(),
Expand All @@ -86,13 +97,38 @@ public static void main(final String[] args) {
System.exit(0);
}

static JsonNode getCatalog(final String dataset, final String connector) throws IOException {
final ObjectMapper objectMapper = new ObjectMapper();
final String catalogFilename = "catalogs/%s/%s_catalog.json".formatted(connector, dataset);
final InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(catalogFilename);
return objectMapper.readTree(is);
/**
* Duplicate the streams in the catalog to emulate parallel streams
*
* @param root the catalog
* @param duplicateFactor the number of times to duplicate each stream
*/
@VisibleForTesting
/**
 * Duplicates the first stream of the catalog in place so the harness can emulate
 * {@code duplicateFactor} parallel streams. Each copy is named by appending the copy
 * index to the original stream name (e.g. "users" -> "users1", "users2", ...).
 *
 * @param root the configured catalog JSON; expected to contain a non-empty "streams" array
 *          whose first element holds a "stream" object with a "name" field
 * @param duplicateFactor total number of streams desired; values <= 1 leave the catalog unchanged
 */
@VisibleForTesting
static void duplicateStreams(final JsonNode root, final int duplicateFactor) {
  // Validate the catalog shape up front instead of relying on a blanket catch-all:
  // a malformed catalog is logged and skipped, matching the previous best-effort behavior.
  final JsonNode streams = root.path("streams");
  if (!(streams instanceof ArrayNode) || streams.isEmpty()) {
    log.error("Failed to duplicate streams: catalog has no non-empty \"streams\" array");
    return;
  }
  final JsonNode template = streams.get(0);
  final JsonNode templateStream = template.get("stream");
  if (!(template instanceof ObjectNode) || !(templateStream instanceof ObjectNode)) {
    log.error("Failed to duplicate streams: first catalog entry has no \"stream\" object");
    return;
  }
  final String baseName = templateStream.path("name").asText();
  // The template already counts as one stream, so only duplicateFactor - 1 copies are added.
  for (int i = 1; i < duplicateFactor; i++) {
    final ObjectNode copy = ((ObjectNode) template).deepCopy();
    ((ObjectNode) copy.get("stream")).put("name", baseName + i);
    ((ArrayNode) streams).add(copy);
  }
}

/**
* Read the datasource file for the given dataset and connector.
* <p>
* Example: catalogs/destination_snowflake/1m_datasource.txt
*
* @param dataset the dataset to read
* @param connector the connector to read
* @return the datasource
* @throws IOException if the datasource file cannot be read
*/
static String getDatasource(final String dataset, final String connector) throws IOException {
final String datasourceFilename = "catalogs/%s/%s_datasource.txt".formatted(connector, dataset);
log.info("datasourceFilename {}", datasourceFilename);
Expand All @@ -103,4 +139,21 @@ static String getDatasource(final String dataset, final String connector) throws
}
}

/**
 * Read the catalog file for the given dataset and connector from the classpath.
 * <p>
 * Example: catalogs/destination_snowflake/1m_catalog.json
 *
 * @param dataset the dataset to read (e.g. "1m")
 * @param connector the connector to read (e.g. "destination_snowflake")
 * @return the parsed catalog JSON tree
 * @throws IOException if the catalog resource is missing or cannot be parsed
 */
static JsonNode getCatalog(final String dataset, final String connector) throws IOException {
  final String catalogFilename = "catalogs/%s/%s_catalog.json".formatted(connector, dataset);
  // try-with-resources: the previous version leaked the classpath stream and passed a
  // possibly-null stream straight to Jackson, producing an opaque IllegalArgumentException.
  try (final InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(catalogFilename)) {
    if (is == null) {
      throw new IOException("Catalog resource not found on classpath: " + catalogFilename);
    }
    return new ObjectMapper().readTree(is);
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.AllowedHosts;
Expand Down Expand Up @@ -44,6 +45,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Pattern;
Expand All @@ -56,7 +58,7 @@
* sending AirbyteRecordMessages the same way platform pipes data into the destination
*/
@Slf4j
public class PerformanceTest {
public class PerformanceHarness {

public static final int PORT1 = 9877;
public static final int PORT2 = 9878;
Expand All @@ -73,7 +75,7 @@ public class PerformanceTest {

private DefaultAirbyteDestination destination;

PerformanceTest(final String imageName, final String config, final String catalog, final String datasource) throws JsonProcessingException {
PerformanceHarness(final String imageName, final String config, final String catalog, final String datasource) throws JsonProcessingException {
final ObjectMapper mapper = new ObjectMapper();
this.imageName = imageName;
this.config = mapper.readTree(config);
Expand All @@ -95,6 +97,8 @@ public class PerformanceTest {
* @throws Exception
*/
void runTest() throws Exception {
final List<String> streamNames = catalog.getStreams().stream().map(stream -> stream.getStream().getName()).toList();
final Random random = new Random();
final AirbyteIntegrationLauncher dstIntegtationLauncher = getAirbyteIntegrationLauncher();
final WorkerDestinationConfig dstConfig = new WorkerDestinationConfig()
.withDestinationConnectionConfiguration(this.config)
Expand Down Expand Up @@ -149,7 +153,7 @@ void runTest() throws Exception {
final AirbyteMessage airbyteMessage = new AirbyteMessage()
.withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withStream(catalog.getStreams().get(0).getStream().getName())
.withStream(getStreamName(streamNames, random))
.withNamespace(catalog.getStreams().get(0).getStream().getNamespace())
.withData(Jsons.deserialize(recordString)));
airbyteMessage.getRecord().setEmittedAt(start);
Expand All @@ -172,6 +176,13 @@ void runTest() throws Exception {
logListener.cancel(true);
log.info("Test ended successfully");
computeThroughput(totalBytes, counter, start);
// TODO: (ryankfu) when incremental syncs are supported, add a tearDown method to clear table
}

// TODO: (ryankfu) get less hacky way to generate multiple streams
/**
 * Selects one stream name uniformly at random from the candidates.
 *
 * @param listOfStreamNames candidate stream names; must be non-empty
 * @param random source of randomness, injectable so tests can control it
 * @return an element of {@code listOfStreamNames}
 */
@VisibleForTesting
static String getStreamName(final List<String> listOfStreamNames, final Random random) {
  final int pick = random.nextInt(listOfStreamNames.size());
  return listOfStreamNames.get(pick);
}

private void computeThroughput(final double totalBytes, final long counter, final long start) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
"sync_mode": "full_refresh",
"primary_key": [["id"]],
"cursor_field": ["updated_at"],
"destination_sync_mode": "append"
"destination_sync_mode": "overwrite"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
"sync_mode": "full_refresh",
"primary_key": [["id"]],
"cursor_field": ["updated_at"],
"destination_sync_mode": "append"
"destination_sync_mode": "overwrite"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination_performance;

import static org.junit.jupiter.api.Assertions.*;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;

/**
 * Unit tests for the catalog helpers in {@link Main}.
 */
class MainTest {

  @Test
  void testDuplicateStreams() throws JsonProcessingException {
    // Catalog containing a single "users" stream that duplicateStreams uses as its template.
    final String singleStreamCatalog = """
        {
          "streams": [
            {
              "stream": {
                "name": "users",
                "namespace": "PERF_TEST_HARNESS",
                "json_schema": {
                  "type": "object",
                  "properties": {
                    "id": {
                      "type": "number",
                      "airbyte_type": "integer"
                    },
                    "academic_degree": {
                      "type": "string"
                    }
                  }
                },
                "default_cursor_field": [],
                "supported_sync_modes": ["full_refresh", "incremental"],
                "source_defined_primary_key": [["id"]]
              },
              "sync_mode": "full_refresh",
              "primary_key": [["id"]],
              "cursor_field": ["updated_at"],
              "destination_sync_mode": "overwrite"
            }
          ]
        }
        """;
    final JsonNode catalog = new ObjectMapper().readTree(singleStreamCatalog);
    final int targetStreamCount = 10;

    Main.duplicateStreams(catalog, targetStreamCount);

    // One original stream plus nine copies; copies are suffixed with their index, so the
    // last entry is "users9".
    assertEquals(targetStreamCount, catalog.get("streams").size());
    assertEquals("users9", catalog.path("streams").get(9).path("stream").path("name").asText());
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination_performance;

import static org.junit.Assert.assertNotEquals;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.junit.jupiter.api.Test;

/**
 * Unit tests for {@link PerformanceHarness}.
 */
class PerformanceHarnessTest {

  /**
   * Verifies that {@link PerformanceHarness#getStreamName(List, Random)} only returns names
   * from the candidate list and actually varies its picks.
   * <p>
   * The previous version asserted that two independent uniform draws from 1000 names differ,
   * which spuriously fails about once in 1000 runs. Drawing many samples instead makes the
   * failure probability of the distinctness check negligible ((1/1000)^999).
   */
  @Test
  public void testRandomStreamName() {
    final int streamCount = 1000;
    final List<String> streamNames = new ArrayList<>();
    for (int i = 1; i <= streamCount; i++) {
      streamNames.add("stream" + i);
    }
    final Random random = new Random();
    final List<String> distinctNames = new ArrayList<>();
    for (int i = 0; i < streamCount; i++) {
      final String name = PerformanceHarness.getStreamName(streamNames, random);
      // Every pick must be one of the supplied candidates.
      assertNotEquals(-1, streamNames.indexOf(name));
      if (!distinctNames.contains(name)) {
        distinctNames.add(name);
      }
    }
    // A uniform picker over 1000 names must yield more than one distinct value in 1000 draws.
    assertNotEquals(1, distinctNames.size());
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,24 +65,6 @@ public static void main(final String[] args) {
image,
config.toString(),
catalog.toString());

// final ExecutorService executors = Executors.newFixedThreadPool(2);
// final CompletableFuture<Void> readSrcAndWriteDstThread = CompletableFuture.runAsync(() -> {
// try {
// test.runTest();
// } catch (final Exception e) {
// throw new RuntimeException(e);
// }
// }, executors);

// Uncomment to add destination
/*
* final CompletableFuture<Void> readFromDstThread = CompletableFuture.runAsync(() -> { try {
* Thread.sleep(20_000); test.readFromDst(); } catch (final InterruptedException e) { throw new
* RuntimeException(e); } }, executors);
*/

// CompletableFuture.anyOf(readSrcAndWriteDstThread/* , readFromDstThread */).get();
test.runTest();
} catch (final Exception e) {
log.error("Test failed", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ public class PerformanceTest {
private final JsonNode config;
private final ConfiguredAirbyteCatalog catalog;

// private DefaultAirbyteDestination destination;

PerformanceTest(final String imageName, final String config, final String catalog) throws JsonProcessingException {
final ObjectMapper mapper = new ObjectMapper();
this.imageName = imageName;
Expand Down
2 changes: 1 addition & 1 deletion tools/bin/run-harness-process.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ spec:
containers:
- name: main
image: airbyte/$HARNESS:dev
args: ["$CONNECTOR_IMAGE_NAME", "$DATASET"]
args: ["$CONNECTOR_IMAGE_NAME", "$DATASET", "$STREAM_NUMBER"]
volumeMounts:
- name: secrets-volume
mountPath: /airbyte/secrets
Expand Down

0 comments on commit 3689ff3

Please sign in to comment.