diff --git a/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/command_runner/ProcessOutputParser.java b/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/command_runner/ProcessOutputParser.java index 20f76eba470..54ecd7ac64e 100644 --- a/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/command_runner/ProcessOutputParser.java +++ b/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/command_runner/ProcessOutputParser.java @@ -9,6 +9,7 @@ import io.airbyte.connector_builder.ApmTraceConstants; import io.airbyte.connector_builder.exceptions.AirbyteCdkInvalidInputException; import io.airbyte.connector_builder.exceptions.CdkProcessException; +import io.airbyte.connector_builder.exceptions.CdkUnknownException; import io.airbyte.connector_builder.requester.AirbyteCdkRequesterImpl; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; @@ -23,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,12 +47,11 @@ AirbyteRecordMessage parse( try { messagesByType = WorkerUtils.getMessagesByType(process, streamFactory, timeOut); } catch (final NullPointerException exc) { - checkProcessError(process, cdkCommand); + throwCdkException(process, cdkCommand); } - if (messagesByType == null) { - throw new AirbyteCdkInvalidInputException( - String.format("No records found for %s request.", cdkCommand)); + if (messagesByType == null || messagesByType.isEmpty()) { + throwCdkException(process, cdkCommand); } final Optional record = messagesByType @@ -73,27 +74,32 @@ AirbyteRecordMessage parse( "Error response from CDK: {}\n{}", traceMessage.getError().getMessage(), traceMessage.getError().getStackTrace()); - throw new AirbyteCdkInvalidInputException("AirbyteTraceMessage response from CDK.", trace.get()); - } else { - throw new AirbyteCdkInvalidInputException( - String.format("No records found for %s request.", cdkCommand)); + throw new AirbyteCdkInvalidInputException("AirbyteTraceMessage response from CDK.", traceMessage); } + throw generateError(process, cdkCommand); } - void checkProcessError(final Process process, final String cdkCommand) - throws CdkProcessException { + private void throwCdkException(final Process process, final String cdkCommand) { + throw generateError(process, cdkCommand); + } + + private RuntimeException generateError(final Process process, final String cdkCommand) + throws CdkProcessException, CdkUnknownException { final int exitCode = process.exitValue(); if (exitCode == 0) { - return; + final String errorMessage = String.format( + "The CDK command `%s` completed properly but no records nor trace were found. Logs were: %s.", cdkCommand, process.exitValue()); + LOGGER.error(errorMessage); + return new CdkUnknownException(errorMessage); } final InputStream errStream = process.getErrorStream(); final BufferedReader stderr = IOs.newBufferedReader(errStream); - final String error = stderr.toString(); + final String error = stderr.lines().collect(Collectors.joining()); - throw new CdkProcessException( - String.format("CDK subprocess for %s finished with exit code %d. error=%s", - cdkCommand, exitCode, error)); + final String errorMessage = String.format("CDK subprocess for %s finished with exit code %d. error=%s", cdkCommand, exitCode, error); + LOGGER.error(errorMessage); + return new CdkProcessException(errorMessage); } } diff --git a/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/exceptions/CdkProcessExceptionHandler.java b/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/exceptions/CdkProcessExceptionHandler.java index fe30a4fd6be..47a970b7eb3 100644 --- a/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/exceptions/CdkProcessExceptionHandler.java +++ b/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/exceptions/CdkProcessExceptionHandler.java @@ -16,7 +16,7 @@ */ @Produces @Singleton -@Requires(classes = AirbyteCdkInvalidInputException.class) +@Requires(classes = CdkProcessException.class) public class CdkProcessExceptionHandler implements ExceptionHandler { final ExceptionHelper helper = new ExceptionHelper(); diff --git a/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/exceptions/CdkUnknownException.java b/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/exceptions/CdkUnknownException.java new file mode 100644 index 00000000000..d1fce33496f --- /dev/null +++ b/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/exceptions/CdkUnknownException.java @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.connector_builder.exceptions; + +import io.airbyte.commons.server.errors.KnownException; +import io.micronaut.http.HttpStatus; + +/** + * Thrown when the CDK encountered an error when processing the request. + */ +public class CdkUnknownException extends KnownException { + + public CdkUnknownException(final String message) { + super(message); + } + + @Override + public int getHttpCode() { + return HttpStatus.INTERNAL_SERVER_ERROR.getCode(); + } + +} diff --git a/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/exceptions/CdkUnknownExceptionHandler.java b/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/exceptions/CdkUnknownExceptionHandler.java new file mode 100644 index 00000000000..02b51b1f3dd --- /dev/null +++ b/airbyte-connector-atelier-server/src/main/java/io/airbyte/connector_builder/exceptions/CdkUnknownExceptionHandler.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2023 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.connector_builder.exceptions; + +import io.micronaut.context.annotation.Requires; +import io.micronaut.http.HttpRequest; +import io.micronaut.http.HttpResponse; +import io.micronaut.http.annotation.Produces; +import io.micronaut.http.server.exceptions.ExceptionHandler; +import jakarta.inject.Singleton; + +/** + * Custom Micronaut exception handler for the {@link CdkUnknownException}. + */ +@Produces +@Singleton +@Requires(classes = CdkUnknownException.class) +public class CdkUnknownExceptionHandler implements ExceptionHandler { + + final ExceptionHelper helper = new ExceptionHelper(); + + @Override + public HttpResponse handle(final HttpRequest request, final CdkUnknownException exception) { + return helper.handle(request, exception); + } + +} diff --git a/airbyte-connector-atelier-server/src/test/java/io/airbyte/connector_builder/controllers/ConnectorBuilderControllerIntegrationTest.java b/airbyte-connector-atelier-server/src/test/java/io/airbyte/connector_builder/controllers/ConnectorBuilderControllerIntegrationTest.java index 830d2dfe358..003616e216b 100644 --- a/airbyte-connector-atelier-server/src/test/java/io/airbyte/connector_builder/controllers/ConnectorBuilderControllerIntegrationTest.java +++ b/airbyte-connector-atelier-server/src/test/java/io/airbyte/connector_builder/controllers/ConnectorBuilderControllerIntegrationTest.java @@ -22,6 +22,7 @@ import io.airbyte.connector_builder.command_runner.SynchronousCdkCommandRunner; import io.airbyte.connector_builder.exceptions.AirbyteCdkInvalidInputException; import io.airbyte.connector_builder.exceptions.CdkProcessException; +import io.airbyte.connector_builder.exceptions.CdkUnknownException; import io.airbyte.connector_builder.exceptions.ConnectorBuilderException; import io.airbyte.connector_builder.file_writer.MockAirbyteFileWriterImpl; import io.airbyte.connector_builder.handlers.HealthHandler; @@ -158,8 +159,8 @@ void testResolveManifestNoRecordsReturnsError() { resolveManifestRequestBody.setManifest(validManifest); final Exception exception = - Assertions.assertThrows(AirbyteCdkInvalidInputException.class, () -> controller.resolveManifest(resolveManifestRequestBody)); - assertTrue(exception.getMessage().contains("No records found")); + Assertions.assertThrows(CdkUnknownException.class, () -> controller.resolveManifest(resolveManifestRequestBody)); + assertTrue(exception.getMessage().contains("no records nor trace were found")); assertNotNull(exception.getStackTrace()); } @@ -182,11 +183,35 @@ void testResolveManifestTraceResponseReturnsError() { @Test void testResolveManifestNonzeroExitCodeReturnsError() { - final InputStream stream = new ByteArrayInputStream( + final InputStream errorStream = new ByteArrayInputStream( + StandardCharsets.UTF_8.encode(cdkException).array()); + + final ConnectorBuilderController controller = createControllerWithSynchronousRunner( + false, 1, null, errorStream, null); + + final ResolveManifestRequestBody resolveManifestRequestBody = new ResolveManifestRequestBody(); + resolveManifestRequestBody.setManifest(validManifest); + + final Exception exception = Assertions.assertThrows(CdkProcessException.class, () -> controller.resolveManifest(resolveManifestRequestBody)); + assertTrue(exception.getMessage().contains("CDK subprocess for resolve_manifest finished with exit code 1.")); + assertNotNull(exception.getStackTrace()); + } + + @Test + void testResolveManifestNonzeroExitCodeAndInputStreamReturnsError() { + final InputStream emptyInputStream = new InputStream() { + + @Override + public int read() { + return -1; + } + + }; + final InputStream errorStream = new ByteArrayInputStream( StandardCharsets.UTF_8.encode(cdkException).array()); final ConnectorBuilderController controller = createControllerWithSynchronousRunner( - false, 1, null, stream, null); + false, 1, emptyInputStream, errorStream, null); final ResolveManifestRequestBody resolveManifestRequestBody = new ResolveManifestRequestBody(); resolveManifestRequestBody.setManifest(validManifest);