Skip to content

Commit

Permalink
Add datadog traces for the connector builder server (#6254)
Browse files Browse the repository at this point in the history
  • Loading branch information
girarda committed May 1, 2023
1 parent 3d6aeaf commit 8ac46f2
Show file tree
Hide file tree
Showing 7 changed files with 56 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.connector_builder;

/**
* Collection of constants for APM tracing.
*/
public final class ApmTraceConstants {

/**
* Collection of constants for APM tracing.
*/
public static final String CONNECTOR_BUILDER_OPERATION_NAME = "connector_builder_server";

private ApmTraceConstants() {

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

package io.airbyte.connector_builder.command_runner;

import datadog.trace.api.Trace;
import io.airbyte.commons.io.IOs;
import io.airbyte.connector_builder.ApmTraceConstants;
import io.airbyte.connector_builder.exceptions.AirbyteCdkInvalidInputException;
import io.airbyte.connector_builder.exceptions.CdkProcessException;
import io.airbyte.connector_builder.requester.AirbyteCdkRequesterImpl;
Expand Down Expand Up @@ -33,6 +35,7 @@ public class ProcessOutputParser {
private static final int timeOut = 30;

@SuppressWarnings("PMD.AvoidCatchingNPE")
@Trace(operationName = ApmTraceConstants.CONNECTOR_BUILDER_OPERATION_NAME)
AirbyteRecordMessage parse(
final Process process,
final AirbyteStreamFactory streamFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import datadog.trace.api.Trace;
import io.airbyte.connector_builder.ApmTraceConstants;
import io.airbyte.connector_builder.file_writer.AirbyteArgument;
import io.airbyte.connector_builder.file_writer.AirbyteFileWriter;
import io.airbyte.protocol.models.AirbyteRecordMessage;
Expand Down Expand Up @@ -48,6 +50,7 @@ public SynchronousPythonCdkCommandRunner(
* returned by the CDK.
*/
@Override
@Trace(operationName = ApmTraceConstants.CONNECTOR_BUILDER_OPERATION_NAME)
public AirbyteRecordMessage runCommand(
final String cdkCommand,
final String configContents,
Expand All @@ -62,6 +65,7 @@ public AirbyteRecordMessage runCommand(
* Start the python process. NOTE: This method should be called within a try-with-resources
* statement, to ensure that the files are cleaned up after the process is done.
*/
@Trace(operationName = ApmTraceConstants.CONNECTOR_BUILDER_OPERATION_NAME)
AirbyteCdkProcess start(
final String cdkCommand,
final String configContents,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public StreamsHandler(final AirbyteCdkRequester requester) {
public StreamsListRead listStreams(final StreamsListRequestBody streamsListRequestBody)
throws AirbyteCdkInvalidInputException, ConnectorBuilderException {
try {
LOGGER.info("Handling list_streams request.");
LOGGER.debug("Handling list_streams request.");
return this.requester.listStreams(streamsListRequestBody.getManifest(), streamsListRequestBody.getConfig());
} catch (final IOException exc) {
LOGGER.error("Error handling list_streams request.", exc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import datadog.trace.api.Trace;
import io.airbyte.connector_builder.ApmTraceConstants;
import io.airbyte.connector_builder.api.model.generated.ResolveManifest;
import io.airbyte.connector_builder.api.model.generated.StreamRead;
import io.airbyte.connector_builder.api.model.generated.StreamReadSlicesInner;
Expand Down Expand Up @@ -67,34 +69,43 @@ public AirbyteCdkRequesterImpl(final SynchronousCdkCommandRunner commandRunner)
* Launch a CDK process responsible for handling resolve_manifest requests.
*/
@Override
@Trace(operationName = ApmTraceConstants.CONNECTOR_BUILDER_OPERATION_NAME)
public StreamRead readStream(final JsonNode manifest, final JsonNode config, final String stream, final Integer recordLimit)
throws IOException, AirbyteCdkInvalidInputException, CdkProcessException {
if (stream == null) {
throw new AirbyteCdkInvalidInputException("Missing required `stream` field.");
}
final AirbyteRecordMessage record = request(manifest, config, readStreamCommand, stream, recordLimit);
return recordToResponse(record);
}

@Trace(operationName = ApmTraceConstants.CONNECTOR_BUILDER_OPERATION_NAME)
private StreamRead recordToResponse(final AirbyteRecordMessage record) {
final StreamRead response = new StreamRead();
final JsonNode data = record.getData();
final List<Object> logList = convertToList(data.get("logs"), new TypeReference<List<Object>>() {});
final List<StreamReadSlicesInner> sliceList = convertToList(data.get("slices"), new TypeReference<List<StreamReadSlicesInner>>() {});
final List<Object> logList = convertToList(data.get("logs"), new TypeReference<>() {});
final List<StreamReadSlicesInner> sliceList = convertToList(data.get("slices"), new TypeReference<>() {});
response.setLogs(logList);
response.setSlices(sliceList);
response.setInferredSchema(data.get("inferred_schema"));
response.setTestReadLimitReached(data.get("test_read_limit_reached").asBoolean());
return response;

}

/**
* Launch a CDK process responsible for handling resolve_manifest requests.
*/
@Override
@Trace(operationName = ApmTraceConstants.CONNECTOR_BUILDER_OPERATION_NAME)
public ResolveManifest resolveManifest(final JsonNode manifest)
throws IOException, AirbyteCdkInvalidInputException, CdkProcessException {
final AirbyteRecordMessage record = request(manifest, CONFIG_NODE, resolveManifestCommand);
return new ResolveManifest().manifest(record.getData().get("manifest"));
}

@Override
@Trace(operationName = ApmTraceConstants.CONNECTOR_BUILDER_OPERATION_NAME)
public StreamsListRead listStreams(final JsonNode manifest, final JsonNode config)
throws IOException, AirbyteCdkInvalidInputException, CdkProcessException {
return new StreamsListRead().streams(
Expand Down
5 changes: 5 additions & 0 deletions docker-compose.datadog.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ services:
environment:
<<: *datadogged-environment
DD_SERVICE: airbyte-server
airbyte-connector-builder-server:
<<: *datadogged-volumes
environment:
<<: *datadogged-environment
DD_SERVICE: airbyte-connector-builder-server
airbyte-cron:
<<: *datadogged-volumes
environment:
Expand Down
10 changes: 9 additions & 1 deletion docker-compose.debug.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,16 @@ services:
server:
# You will need to create a remote JVM debugging Run Configuration
# If you're on a Mac you will need to obtain the IP address of the container
# The value of DEBUG_SERVER_JAVA_OPTIONS should be the same as DEBUG_CONTAINER_JAVA_OPTS above
# The value of JAVA_TOOL_OPTIONS should be the same as DEBUG_CONTAINER_JAVA_OPTS above
environment:
- JAVA_TOOL_OPTIONS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5007
ports:
- 5007:5007
airbyte-connector-builder-server:
# You will need to create a remote JVM debugging Run Configuration
# If you're on a Mac you will need to obtain the IP address of the container
# The value of JAVA_TOOL_OPTIONS should be the same as DEBUG_CONTAINER_JAVA_OPTS above
environment:
- JAVA_TOOL_OPTIONS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5008
ports:
- 5008:5008

0 comments on commit 8ac46f2

Please sign in to comment.