PYIC-6584: POC for async SQS messaging #2020

Closed
wants to merge 1 commit into from
1 change: 1 addition & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ awsSdkKms = { module = "software.amazon.awssdk:kms" }
 awsSdkLambda = { module = "software.amazon.awssdk:lambda" }
 awsSdkSqs = { module = "software.amazon.awssdk:sqs" }
 awsSdkUrlConnectionClient = { module = "software.amazon.awssdk:url-connection-client" }
+awsSdkCrtClient = { module = "software.amazon.awssdk:aws-crt-client" }
 commonsCodec = "commons-codec:commons-codec:1.17.0"
 hamcrest = "org.hamcrest:hamcrest:2.2"
 jacksonDatabind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson" }
Original file line number Diff line number Diff line change
@@ -197,6 +197,8 @@ public Map<String, Object> handleRequest(JourneyRequest event, Context context)
             return new JourneyErrorResponse(
                             JOURNEY_ERROR_PATH, e.getResponseCode(), e.getErrorResponse())
                     .toObjectMap();
+        } finally {
+            auditService.awaitAuditEvents();
Contributor Author

The scary thing about this approach is that if we don't call this, then the lambda runtime will suspend and (potentially) interrupt an in-progress request.

I can't think of a great solution to this beyond 'make sure it's included in all the handlers' and 'never construct a new audit service outside of a handler'.

Contributor

Do you think we'd catch this in the pipeline before any mistakes hit prod?

Contributor Author

Probably not, which is the scary part :/
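
One way to reduce the risk raised in this thread is to make the await step structural rather than a convention. The sketch below is a hypothetical pattern, not code from this PR: `AuditScope`, `track` and `AuditScopeDemo` are invented names, and plain `CompletableFuture`s stand in for the futures returned by `SqsAsyncClient.sendMessage`. A handler opens the scope in a try-with-resources block, so `close()` waits for every pending send even if the handler body throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical helper (not part of this PR): tracks in-flight sends and
// waits for all of them when the scope is closed.
final class AuditScope implements AutoCloseable {
    private final List<CompletableFuture<?>> pending = new ArrayList<>();

    // Registers an in-flight send; stands in for the future returned by sendMessage.
    void track(CompletableFuture<?> send) {
        pending.add(send);
    }

    // Blocks until every tracked send has finished; surfaces a failure if any send failed.
    @Override
    public void close() {
        CompletableFuture<?>[] toAwait = pending.toArray(new CompletableFuture<?>[0]);
        pending.clear();
        try {
            CompletableFuture.allOf(toAwait).join();
        } catch (CompletionException e) {
            throw new IllegalStateException("Failed to send audit event(s)", e.getCause());
        }
    }
}

final class AuditScopeDemo {
    // Simulated handler body: starts a slow asynchronous "send" and returns immediately.
    static String handle(AuditScope scope, List<String> delivered) {
        scope.track(
                CompletableFuture.runAsync(
                        () -> {
                            pause(50);
                            delivered.add("event-1");
                        }));
        return "ok";
    }

    // try-with-resources guarantees close() runs before the response leaves this method.
    static String runHandler(List<String> delivered) {
        try (AuditScope scope = new AuditScope()) {
            return handle(scope, delivered);
        }
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        List<String> delivered = new CopyOnWriteArrayList<>();
        System.out.println(runHandler(delivered) + " " + delivered);
    }
}
```

This does not remove the second concern (an audit service constructed outside a handler), but it does mean a forgotten `finally` block shows up as a missing scope rather than as a silently dropped event.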

         }
     }
Original file line number Diff line number Diff line change
@@ -305,6 +305,8 @@ public APIGatewayProxyResponseEvent handleRequest(
                     LogHelper.buildErrorMessage("Failed to check if stronger vot vc present.", e));
             return ApiGatewayResponseGenerator.proxyJsonResponse(
                     HttpStatus.SC_BAD_REQUEST, ErrorResponse.FAILED_TO_PARSE_ISSUED_CREDENTIALS);
+        } finally {
+            auditService.awaitAuditEvents();
         }
     }
Original file line number Diff line number Diff line change
@@ -169,6 +169,8 @@ public Map<String, Object> handleRequest(JourneyRequest journeyRequest, Context
         } catch (SqsException e) {
             return StepFunctionHelpers.generateErrorOutputMap(
                     HttpStatus.SC_INTERNAL_SERVER_ERROR, ErrorResponse.FAILED_TO_SEND_AUDIT_EVENT);
+        } finally {
+            auditService.awaitAuditEvents();
         }
     }
1 change: 1 addition & 0 deletions libs/audit-service/build.gradle
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ repositories {
 dependencies {
     implementation platform(libs.awsSdkBom),
             libs.awsSdkUrlConnectionClient,
+            libs.awsSdkCrtClient,
             libs.powertoolsLogging,
             libs.powertoolsParameters,
             project(":libs:common-services"),
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
+package uk.gov.di.ipv.core.library.service;
+
+public class AuditException extends RuntimeException {
+    public AuditException(String message, Exception e) {
+        super(message, e);
+    }
+}
Original file line number Diff line number Diff line change
@@ -2,48 +2,76 @@

 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
-import software.amazon.awssdk.services.sqs.SqsClient;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import software.amazon.awssdk.http.crt.AwsCrtAsyncHttpClient;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
 import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
 import uk.gov.di.ipv.core.library.auditing.AuditEvent;
 import uk.gov.di.ipv.core.library.exceptions.SqsException;
+import uk.gov.di.ipv.core.library.helpers.LogHelper;

+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
 import static software.amazon.awssdk.regions.Region.EU_WEST_2;
 import static uk.gov.di.ipv.core.library.config.EnvironmentVariable.SQS_AUDIT_EVENT_QUEUE_URL;

 public class AuditService {
-    private final SqsClient sqs;
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private final SqsAsyncClient sqs;
     private final String queueUrl;
     private final ObjectMapper objectMapper;

-    public AuditService(SqsClient sqs, ConfigService configService) {
+    private List<CompletableFuture<SendMessageResponse>> events = new ArrayList<>();
+
+    public AuditService(SqsAsyncClient sqs, ConfigService configService) {
         this.sqs = sqs;
         this.queueUrl = configService.getEnvironmentVariable(SQS_AUDIT_EVENT_QUEUE_URL);
         this.objectMapper = new ObjectMapper();
     }

-    public AuditService(SqsClient sqs, ConfigService configService, ObjectMapper objectMapper) {
+    public AuditService(
+            SqsAsyncClient sqs, ConfigService configService, ObjectMapper objectMapper) {
         this.sqs = sqs;
         this.queueUrl = configService.getEnvironmentVariable(SQS_AUDIT_EVENT_QUEUE_URL);
         this.objectMapper = objectMapper;
     }

-    public static SqsClient getSqsClient() {
-        return SqsClient.builder()
+    public static SqsAsyncClient getSqsClient() {
+        return SqsAsyncClient.builder()
                 .region(EU_WEST_2)
-                .httpClientBuilder(UrlConnectionHttpClient.builder())
+                .httpClientBuilder(AwsCrtAsyncHttpClient.builder())
Contributor Author

The AWS docs seem to imply that we should use the AWS CRT client for the sync case as well (https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/http-configuration.html#http-clients-recommend) - unless it's a SnapStart thing to prefer the UrlConnectionHttpClient?

Contributor

Suggested change
-                .httpClientBuilder(AwsCrtAsyncHttpClient.builder())
+                .httpClient(AwsCrtAsyncHttpClient.create())

Contributor Author

What's the reason for preferring this? In any case, I probably won't change for this PR, but whoever picks up https://govukverify.atlassian.net/browse/PYIC-6883 can update if they wish

Contributor

I think functionally there's no difference, but if I were looking at it for the first time I'd wonder why we have to pass in a builder. Is something weird happening under the hood? (It's not.) The way I've suggested seems more... expected?

Contributor Author (Joe-Edwards-GDS, Jun 26, 2024)

The docs show using builder unless you want to provide your own shared HTTP Client, but I suspect in our case it'll make no difference. (https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/http-configuration-crt.html)

I wonder if it changes the laziness at all (e.g. will it defer creating the client until it needs it?)

                 .build();
     }

     public void sendAuditEvent(AuditEvent auditEvent) throws SqsException {
         try {
-            sqs.sendMessage(
-                    SendMessageRequest.builder()
-                            .queueUrl(queueUrl)
-                            .messageBody(objectMapper.writeValueAsString(auditEvent))
-                            .build());
+            var event =
+                    sqs.sendMessage(
+                            SendMessageRequest.builder()
+                                    .queueUrl(queueUrl)
+                                    .messageBody(objectMapper.writeValueAsString(auditEvent))
+                                    .build());
+            events.add(event);
         } catch (JsonProcessingException e) {
             throw new SqsException(e);
         }
     }
+
+    // Await audit events MUST be called before the end of each handler
+    public void awaitAuditEvents() {
+        try {
+            var eventsToAwait = events;
+            events = new ArrayList<>();
+            CompletableFuture.allOf(eventsToAwait.toArray(new CompletableFuture[0])).get();
+        } catch (InterruptedException | ExecutionException e) {
Contributor

If we get an InterruptedException (which I can't imagine we will), do we need to call Thread.interrupted()?
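
For reference on this question: `Thread.interrupted()` reads and clears the current thread's interrupt flag, so calling it in the catch block would not preserve the interruption. The usual idiom is `Thread.currentThread().interrupt()`, which sets the flag again so that callers can still observe it. Below is a minimal sketch using plain `CompletableFuture`s; `AwaitSketch`, `awaitAll` and the use of `IllegalStateException` are illustrative stand-ins, not the PR's code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class AwaitSketch {
    // Waits for all futures; on interruption, restores the flag before rethrowing.
    static void awaitAll(List<? extends CompletableFuture<?>> futures) {
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
        } catch (InterruptedException e) {
            // get() cleared the flag when it threw, so set it again for callers.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while awaiting audit events", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Failed to send audit event(s)", e.getCause());
        }
    }

    public static void main(String[] args) {
        // Simulate an interrupt that arrives before the wait begins.
        Thread.currentThread().interrupt();
        try {
            awaitAll(List.of(new CompletableFuture<String>())); // this future never completes
        } catch (IllegalStateException e) {
            System.out.println("flag restored: " + Thread.currentThread().isInterrupted());
        }
        Thread.interrupted(); // clear the flag again before exiting
    }
}
```

Splitting the multi-catch also lets the two cases carry different messages, which the combined `InterruptedException | ExecutionException` handler cannot do.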

+            LOGGER.error(LogHelper.buildErrorMessage("Failed to send audit event(s)", e));
+            throw new AuditException("Failed to send audit event(s)", e);
Contributor Author

This is a RuntimeException to ease its use - this means an audit failure will fail the lambda, but this is probably a good thing!

I'm in discussion about whether any of the audit events can be marked as non-critical (i.e. we can ignore if they fail), but for now let's assume that they all are, because TICF may need them.
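
If some events are later classed as non-critical, one possible shape is to absorb their failures at the point the future is recorded, so that only critical failures surface from the await. This is purely a hypothetical sketch: `CriticalityAwareAwaiter`, `record` and the `critical` flag are invented for illustration, and plain `CompletableFuture`s stand in for the SQS responses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: only failures of critical sends propagate from awaitAll().
final class CriticalityAwareAwaiter {
    private List<CompletableFuture<Void>> pending = new ArrayList<>();
    private final AtomicInteger ignoredFailures = new AtomicInteger();

    // Records an in-flight send; non-critical failures are counted and then swallowed.
    void record(CompletableFuture<?> send, boolean critical) {
        CompletableFuture<Void> outcome = send.thenAccept(response -> {});
        if (!critical) {
            outcome =
                    outcome.exceptionally(
                            failure -> {
                                ignoredFailures.incrementAndGet();
                                return null;
                            });
        }
        pending.add(outcome);
    }

    // Swaps out the pending list, then blocks until every recorded send has finished.
    void awaitAll() {
        List<CompletableFuture<Void>> toAwait = pending;
        pending = new ArrayList<>();
        try {
            CompletableFuture.allOf(toAwait.toArray(new CompletableFuture<?>[0])).join();
        } catch (CompletionException e) {
            throw new IllegalStateException(
                    "Failed to send critical audit event(s)", e.getCause());
        }
    }

    int ignoredFailureCount() {
        return ignoredFailures.get();
    }

    public static void main(String[] args) {
        CriticalityAwareAwaiter awaiter = new CriticalityAwareAwaiter();
        awaiter.record(CompletableFuture.failedFuture(new RuntimeException("boom")), false);
        awaiter.record(CompletableFuture.completedFuture("sent"), true);
        awaiter.awaitAll(); // does not throw: the only failure was non-critical
        System.out.println("ignored failures: " + awaiter.ignoredFailureCount());
    }
}
```

Until that discussion concludes, treating every event as critical (as the PR does) is the conservative default.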

+        }
+    }
 }
Original file line number Diff line number Diff line change
@@ -10,7 +10,7 @@
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
-import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.SqsAsyncClient;
 import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
 import uk.gov.di.ipv.core.library.auditing.AuditEvent;
 import uk.gov.di.ipv.core.library.auditing.AuditEventTypes;
@@ -46,7 +46,7 @@ class AuditServiceTest {

     private AuditService auditService;

-    @Mock private SqsClient mockSqs;
+    @Mock private SqsAsyncClient mockSqs;
     @Mock private ConfigService mockConfigService;

     @BeforeEach