Skip to content

Commit

Permalink
[ISSUE #25805] parsing error output even when InputStream is not null…
Browse files Browse the repository at this point in the history
… (#6398)
  • Loading branch information
maxi297 committed May 11, 2023
1 parent 6b3ec32 commit 9c5b717
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.airbyte.connector_builder.ApmTraceConstants;
import io.airbyte.connector_builder.exceptions.AirbyteCdkInvalidInputException;
import io.airbyte.connector_builder.exceptions.CdkProcessException;
import io.airbyte.connector_builder.exceptions.CdkUnknownException;
import io.airbyte.connector_builder.requester.AirbyteCdkRequesterImpl;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
Expand All @@ -23,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -45,12 +47,11 @@ AirbyteRecordMessage parse(
try {
messagesByType = WorkerUtils.getMessagesByType(process, streamFactory, timeOut);
} catch (final NullPointerException exc) {
checkProcessError(process, cdkCommand);
throwCdkException(process, cdkCommand);
}

if (messagesByType == null) {
throw new AirbyteCdkInvalidInputException(
String.format("No records found for %s request.", cdkCommand));
if (messagesByType == null || messagesByType.isEmpty()) {
throwCdkException(process, cdkCommand);
}

final Optional<AirbyteRecordMessage> record = messagesByType
Expand All @@ -73,27 +74,32 @@ AirbyteRecordMessage parse(
"Error response from CDK: {}\n{}",
traceMessage.getError().getMessage(),
traceMessage.getError().getStackTrace());
throw new AirbyteCdkInvalidInputException("AirbyteTraceMessage response from CDK.", trace.get());
} else {
throw new AirbyteCdkInvalidInputException(
String.format("No records found for %s request.", cdkCommand));
throw new AirbyteCdkInvalidInputException("AirbyteTraceMessage response from CDK.", traceMessage);
}
throw generateError(process, cdkCommand);
}

void checkProcessError(final Process process, final String cdkCommand)
throws CdkProcessException {
private void throwCdkException(final Process process, final String cdkCommand) {
throw generateError(process, cdkCommand);
}

private RuntimeException generateError(final Process process, final String cdkCommand)
throws CdkProcessException, CdkUnknownException {
final int exitCode = process.exitValue();
if (exitCode == 0) {
return;
final String errorMessage = String.format(
"The CDK command `%s` completed properly but no records nor trace were found. Logs were: %s.", cdkCommand, process.exitValue());
LOGGER.error(errorMessage);
return new CdkUnknownException(errorMessage);
}

final InputStream errStream = process.getErrorStream();
final BufferedReader stderr = IOs.newBufferedReader(errStream);
final String error = stderr.toString();
final String error = stderr.lines().collect(Collectors.joining());

throw new CdkProcessException(
String.format("CDK subprocess for %s finished with exit code %d. error=%s",
cdkCommand, exitCode, error));
final String errorMessage = String.format("CDK subprocess for %s finished with exit code %d. error=%s", cdkCommand, exitCode, error);
LOGGER.error(errorMessage);
return new CdkProcessException(errorMessage);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
@Produces
@Singleton
@Requires(classes = AirbyteCdkInvalidInputException.class)
@Requires(classes = CdkProcessException.class)
public class CdkProcessExceptionHandler implements ExceptionHandler<CdkProcessException, HttpResponse> {

final ExceptionHelper helper = new ExceptionHelper();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.connector_builder.exceptions;

import io.airbyte.commons.server.errors.KnownException;
import io.micronaut.http.HttpStatus;

/**
* Thrown when the CDK encountered an error when processing the request.
*/
public class CdkUnknownException extends KnownException {

public CdkUnknownException(final String message) {
super(message);
}

@Override
public int getHttpCode() {
return HttpStatus.INTERNAL_SERVER_ERROR.getCode();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.connector_builder.exceptions;

import io.micronaut.context.annotation.Requires;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.HttpResponse;
import io.micronaut.http.annotation.Produces;
import io.micronaut.http.server.exceptions.ExceptionHandler;
import jakarta.inject.Singleton;

/**
* Custom Micronaut exception handler for the {@link CdkUnknownException}.
*/
@Produces
@Singleton
@Requires(classes = CdkUnknownException.class)
public class CdkUnknownExceptionHandler implements ExceptionHandler<CdkUnknownException, HttpResponse> {

final ExceptionHelper helper = new ExceptionHelper();

@Override
public HttpResponse handle(final HttpRequest request, final CdkUnknownException exception) {
return helper.handle(request, exception);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.airbyte.connector_builder.command_runner.SynchronousCdkCommandRunner;
import io.airbyte.connector_builder.exceptions.AirbyteCdkInvalidInputException;
import io.airbyte.connector_builder.exceptions.CdkProcessException;
import io.airbyte.connector_builder.exceptions.CdkUnknownException;
import io.airbyte.connector_builder.exceptions.ConnectorBuilderException;
import io.airbyte.connector_builder.file_writer.MockAirbyteFileWriterImpl;
import io.airbyte.connector_builder.handlers.HealthHandler;
Expand Down Expand Up @@ -158,8 +159,8 @@ void testResolveManifestNoRecordsReturnsError() {
resolveManifestRequestBody.setManifest(validManifest);

final Exception exception =
Assertions.assertThrows(AirbyteCdkInvalidInputException.class, () -> controller.resolveManifest(resolveManifestRequestBody));
assertTrue(exception.getMessage().contains("No records found"));
Assertions.assertThrows(CdkUnknownException.class, () -> controller.resolveManifest(resolveManifestRequestBody));
assertTrue(exception.getMessage().contains("no records nor trace were found"));
assertNotNull(exception.getStackTrace());
}

Expand All @@ -182,11 +183,35 @@ void testResolveManifestTraceResponseReturnsError() {

@Test
void testResolveManifestNonzeroExitCodeReturnsError() {
final InputStream stream = new ByteArrayInputStream(
final InputStream errorStream = new ByteArrayInputStream(
StandardCharsets.UTF_8.encode(cdkException).array());

final ConnectorBuilderController controller = createControllerWithSynchronousRunner(
false, 1, null, errorStream, null);

final ResolveManifestRequestBody resolveManifestRequestBody = new ResolveManifestRequestBody();
resolveManifestRequestBody.setManifest(validManifest);

final Exception exception = Assertions.assertThrows(CdkProcessException.class, () -> controller.resolveManifest(resolveManifestRequestBody));
assertTrue(exception.getMessage().contains("CDK subprocess for resolve_manifest finished with exit code 1."));
assertNotNull(exception.getStackTrace());
}

@Test
void testResolveManifestNonzeroExitCodeAndInputStreamReturnsError() {
final InputStream emptyInputStream = new InputStream() {

@Override
public int read() {
return -1;
}

};
final InputStream errorStream = new ByteArrayInputStream(
StandardCharsets.UTF_8.encode(cdkException).array());

final ConnectorBuilderController controller = createControllerWithSynchronousRunner(
false, 1, null, stream, null);
false, 1, emptyInputStream, errorStream, null);

final ResolveManifestRequestBody resolveManifestRequestBody = new ResolveManifestRequestBody();
resolveManifestRequestBody.setManifest(validManifest);
Expand Down

0 comments on commit 9c5b717

Please sign in to comment.