diff --git a/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/ApmTraceConstants.java b/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/ApmTraceConstants.java new file mode 100644 index 00000000000..d7c21584e8d --- /dev/null +++ b/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/ApmTraceConstants.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.connector_builder; + +/** + * Collection of constants for APM tracing. + */ +public final class ApmTraceConstants { + + /** + * Collection of constants for APM tracing. + */ + public static final String CONNECTOR_BUILDER_OPERATION_NAME = "connector_builder_server"; + + private ApmTraceConstants() { + + } + +} diff --git a/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/command_runner/ProcessOutputParser.java b/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/command_runner/ProcessOutputParser.java index 552641b2917..20f76eba470 100644 --- a/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/command_runner/ProcessOutputParser.java +++ b/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/command_runner/ProcessOutputParser.java @@ -4,7 +4,9 @@ package io.airbyte.connector_builder.command_runner; +import datadog.trace.api.Trace; import io.airbyte.commons.io.IOs; +import io.airbyte.connector_builder.ApmTraceConstants; import io.airbyte.connector_builder.exceptions.AirbyteCdkInvalidInputException; import io.airbyte.connector_builder.exceptions.CdkProcessException; import io.airbyte.connector_builder.requester.AirbyteCdkRequesterImpl; @@ -33,6 +35,7 @@ public class ProcessOutputParser { private static final int timeOut = 30; @SuppressWarnings("PMD.AvoidCatchingNPE") + @Trace(operationName = ApmTraceConstants.CONNECTOR_BUILDER_OPERATION_NAME) AirbyteRecordMessage parse( final Process process, final AirbyteStreamFactory streamFactory, diff --git a/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/command_runner/SynchronousPythonCdkCommandRunner.java b/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/command_runner/SynchronousPythonCdkCommandRunner.java index 4e89ca3b929..45dc9163b3c 100644 --- a/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/command_runner/SynchronousPythonCdkCommandRunner.java +++ b/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/command_runner/SynchronousPythonCdkCommandRunner.java @@ -6,6 +6,8 @@ import com.google.common.base.Joiner; import com.google.common.collect.Lists; +import datadog.trace.api.Trace; +import io.airbyte.connector_builder.ApmTraceConstants; import io.airbyte.connector_builder.file_writer.AirbyteArgument; import io.airbyte.connector_builder.file_writer.AirbyteFileWriter; import io.airbyte.protocol.models.AirbyteRecordMessage; @@ -48,6 +50,7 @@ public SynchronousPythonCdkCommandRunner( * returned by the CDK. */ @Override + @Trace(operationName = ApmTraceConstants.CONNECTOR_BUILDER_OPERATION_NAME) public AirbyteRecordMessage runCommand( final String cdkCommand, final String configContents, @@ -62,6 +65,7 @@ public AirbyteRecordMessage runCommand( * Start the python process. NOTE: This method should be called within a try-with-resources * statement, to ensure that the files are cleaned up after the process is done. */ + @Trace(operationName = ApmTraceConstants.CONNECTOR_BUILDER_OPERATION_NAME) AirbyteCdkProcess start( final String cdkCommand, final String configContents, diff --git a/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/handlers/StreamsHandler.java b/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/handlers/StreamsHandler.java index dcdf1dff5d0..077a3b0857e 100644 --- a/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/handlers/StreamsHandler.java +++ b/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/handlers/StreamsHandler.java @@ -36,7 +36,7 @@ public StreamsHandler(final AirbyteCdkRequester requester) { public StreamsListRead listStreams(final StreamsListRequestBody streamsListRequestBody) throws AirbyteCdkInvalidInputException, ConnectorBuilderException { try { - LOGGER.info("Handling list_streams request."); + LOGGER.debug("Handling list_streams request."); return this.requester.listStreams(streamsListRequestBody.getManifest(), streamsListRequestBody.getConfig()); } catch (final IOException exc) { LOGGER.error("Error handling list_streams request.", exc); diff --git a/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/requester/AirbyteCdkRequesterImpl.java b/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/requester/AirbyteCdkRequesterImpl.java index 60a91e8691b..5bfd1f0dbe5 100644 --- a/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/requester/AirbyteCdkRequesterImpl.java +++ b/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/requester/AirbyteCdkRequesterImpl.java @@ -10,6 +10,8 @@ import com.fasterxml.jackson.databind.ObjectWriter; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import datadog.trace.api.Trace; +import io.airbyte.connector_builder.ApmTraceConstants; import io.airbyte.connector_builder.api.model.generated.ResolveManifest; import io.airbyte.connector_builder.api.model.generated.StreamRead; import io.airbyte.connector_builder.api.model.generated.StreamReadSlicesInner; @@ -67,27 +69,35 @@ public AirbyteCdkRequesterImpl(final SynchronousCdkCommandRunner commandRunner) * Launch a CDK process responsible for handling resolve_manifest requests. */ @Override + @Trace(operationName = ApmTraceConstants.CONNECTOR_BUILDER_OPERATION_NAME) public StreamRead readStream(final JsonNode manifest, final JsonNode config, final String stream, final Integer recordLimit) throws IOException, AirbyteCdkInvalidInputException, CdkProcessException { if (stream == null) { throw new AirbyteCdkInvalidInputException("Missing required `stream` field."); } final AirbyteRecordMessage record = request(manifest, config, readStreamCommand, stream, recordLimit); + return recordToResponse(record); + } + + @Trace(operationName = ApmTraceConstants.CONNECTOR_BUILDER_OPERATION_NAME) + private StreamRead recordToResponse(final AirbyteRecordMessage record) { final StreamRead response = new StreamRead(); final JsonNode data = record.getData(); - final List logList = convertToList(data.get("logs"), new TypeReference>() {}); - final List sliceList = convertToList(data.get("slices"), new TypeReference>() {}); + final List logList = convertToList(data.get("logs"), new TypeReference<>() {}); + final List sliceList = convertToList(data.get("slices"), new TypeReference<>() {}); response.setLogs(logList); response.setSlices(sliceList); response.setInferredSchema(data.get("inferred_schema")); response.setTestReadLimitReached(data.get("test_read_limit_reached").asBoolean()); return response; + } /** * Launch a CDK process responsible for handling resolve_manifest requests. */ @Override + @Trace(operationName = ApmTraceConstants.CONNECTOR_BUILDER_OPERATION_NAME) public ResolveManifest resolveManifest(final JsonNode manifest) throws IOException, AirbyteCdkInvalidInputException, CdkProcessException { final AirbyteRecordMessage record = request(manifest, CONFIG_NODE, resolveManifestCommand); @@ -95,6 +105,7 @@ public ResolveManifest resolveManifest(final JsonNode manifest) } @Override + @Trace(operationName = ApmTraceConstants.CONNECTOR_BUILDER_OPERATION_NAME) public StreamsListRead listStreams(final JsonNode manifest, final JsonNode config) throws IOException, AirbyteCdkInvalidInputException, CdkProcessException { return new StreamsListRead().streams( diff --git a/docker-compose.datadog.yaml b/docker-compose.datadog.yaml index d42491836e2..24824646e99 100644 --- a/docker-compose.datadog.yaml +++ b/docker-compose.datadog.yaml @@ -64,6 +64,11 @@ services: environment: <<: *datadogged-environment DD_SERVICE: airbyte-server + airbyte-connector-builder-server: + <<: *datadogged-volumes + environment: + <<: *datadogged-environment + DD_SERVICE: airbyte-connector-builder-server airbyte-cron: <<: *datadogged-volumes environment: diff --git a/docker-compose.debug.yaml b/docker-compose.debug.yaml index f53e5f22642..94f63e358d2 100644 --- a/docker-compose.debug.yaml +++ b/docker-compose.debug.yaml @@ -37,8 +37,16 @@ services: server: # You will need to create a remote JVM debugging Run Configuration # If you're on a Mac you will need to obtain the IP address of the container - # The value of DEBUG_SERVER_JAVA_OPTIONS should be the same as DEBUG_CONTAINER_JAVA_OPTS above + # The value of JAVA_TOOL_OPTIONS should be the same as DEBUG_CONTAINER_JAVA_OPTS above environment: - JAVA_TOOL_OPTIONS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5007 ports: - 5007:5007 + airbyte-connector-builder-server: + # You will need to create a remote JVM debugging Run Configuration + # If you're on a Mac you will need to obtain the IP address of the container + # The value of JAVA_TOOL_OPTIONS should be the same as DEBUG_CONTAINER_JAVA_OPTS above + environment: + - JAVA_TOOL_OPTIONS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5008 + ports: + - 5008:5008