Skip to content

Commit

Permalink
fix(inbound): report health UP (#2413)
Browse files Browse the repository at this point in the history
* fix(webhook): report health

* fix(kafka): report initial health

* fix(sns): report health

* fix(sqs): report health

* fix(sqs): report health

* fix(sqs): report health

* fix(sqs): report health

(cherry picked from commit ac7aba5)
  • Loading branch information
chillleader committed May 3, 2024
1 parent 87748be commit d759986
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.amazonaws.services.sns.message.SnsSubscriptionConfirmation;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.connector.api.annotation.InboundConnector;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.webhook.MappedHttpRequest;
import io.camunda.connector.api.inbound.webhook.WebhookConnectorExecutable;
Expand Down Expand Up @@ -72,6 +73,7 @@ public class SnsWebhookExecutable implements WebhookConnectorExecutable {
private final ObjectMapper objectMapper;
private final SnsClientSupplier snsClientSupplier;

private InboundConnectorContext context;
private SnsWebhookConnectorProperties props;

public SnsWebhookExecutable() {
Expand Down Expand Up @@ -142,9 +144,11 @@ public void activate(InboundConnectorContext context) throws Exception {
if (context == null) {
throw new Exception("Inbound connector context cannot be null");
}
this.context = context;
props =
new SnsWebhookConnectorProperties(
context.bindProperties(SnsWebhookConnectorPropertiesWrapper.class));
context.reportHealth(Health.up());
}

// Topic ARN header has a format arn:aws:sns:region-xyz:000011112222:TopicName, and
Expand All @@ -157,4 +161,9 @@ private String extractRegionFromTopicArnHeader(final Map<String, String> headers
() -> new Exception("SNS request did not contain header: " + TOPIC_ARN_HEADER));
return topicArn.split(":")[3];
}

@Override
public void deactivate() throws Exception {
context.reportHealth(Health.down());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import com.amazonaws.services.sqs.AmazonSQS;
import io.camunda.connector.api.annotation.InboundConnector;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorExecutable;
import io.camunda.connector.aws.AwsUtils;
Expand Down Expand Up @@ -70,6 +71,7 @@ public class SqsExecutable implements InboundConnectorExecutable {
private final ExecutorService executorService;
private AmazonSQS amazonSQS;
private SqsQueueConsumer sqsQueueConsumer;
private InboundConnectorContext context;

public SqsExecutable() {
this.sqsClientSupplier = new DefaultAmazonSQSClientSupplier();
Expand All @@ -90,6 +92,8 @@ public void activate(final InboundConnectorContext context) {
SqsInboundProperties properties = context.bindProperties(SqsInboundProperties.class);
LOGGER.info("Subscription activation requested by the Connector runtime: {}", properties);

this.context = context;

var region =
AwsUtils.extractRegionOrDefault(
properties.getConfiguration(), properties.getQueue().region());
Expand All @@ -102,12 +106,15 @@ public void activate(final InboundConnectorContext context) {
}
executorService.execute(sqsQueueConsumer);
LOGGER.debug("SQS queue consumer started successfully");
context.reportHealth(Health.up());
}

@Override
public void deactivate() {

sqsQueueConsumer.setQueueConsumerActive(false);
LOGGER.debug("Deactivating subscription");
context.reportHealth(Health.down());
if (executorService != null) {
LOGGER.debug("Shutting down executor service");
executorService.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import io.camunda.connector.api.error.ConnectorInputException;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.inbound.model.SqsInboundProperties;
import java.util.List;
Expand Down Expand Up @@ -61,6 +62,7 @@ public void run() {
}
} while (queueConsumerActive.get());
LOGGER.info("Stopping SQS consumer for queue {}", properties.getQueue().url());
context.reportHealth(Health.down());
}

private ReceiveMessageRequest createReceiveMessageRequest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.InboundConnectorDefinition;
import io.camunda.connector.api.inbound.ProcessElement;
import io.camunda.connector.aws.ObjectMapperSupplier;
import io.camunda.connector.common.suppliers.AmazonSQSClientSupplier;
import io.camunda.connector.inbound.model.SqsInboundProperties;
import io.camunda.connector.test.inbound.InboundConnectorContextBuilder;
import io.camunda.connector.test.inbound.InboundConnectorContextBuilder.TestInboundConnectorContext;
import io.camunda.connector.test.inbound.InboundConnectorDefinitionBuilder;
import io.camunda.connector.validation.impl.DefaultValidationProvider;
import java.io.File;
Expand Down Expand Up @@ -108,13 +110,26 @@ public void activateTest(Map<String, Object> properties) throws InterruptedExcep
@Test
public void deactivateTest() {
// Given
consumer = new SqsQueueConsumer(sqsClient, null, null);
Map<String, Object> properties =
Map.of(
"authentication",
Map.of(
"secretKey", ACTUAL_SECRET_KEY,
"accessKey", ACTUAL_ACCESS_KEY),
"configuration",
Map.of("region", "us-east-1"),
"queue",
Map.of("url", ACTUAL_QUEUE_URL, "pollingWaitTime", "1"));
var context = createConnectorContext(properties, createDefinition());
consumer = new SqsQueueConsumer(sqsClient, new SqsInboundProperties(), context);
consumer.setQueueConsumerActive(true);
SqsExecutable sqsExecutable = new SqsExecutable(supplier, executorService, consumer);
// When
sqsExecutable.activate(context);
sqsExecutable.deactivate();
// Then
assertThat(consumer.isQueueConsumerActive()).isFalse();
assertThat(context.getHealth()).isEqualTo(Health.down());
assertThat(executorService.isShutdown()).isTrue();
}

Expand All @@ -123,7 +138,7 @@ private InboundConnectorDefinition createDefinition() {
return InboundConnectorDefinitionBuilder.create().elements(element).type("type").build();
}

private InboundConnectorContext createConnectorContext(
private TestInboundConnectorContext createConnectorContext(
Map<String, Object> properties, InboundConnectorDefinition definition) {
return InboundConnectorContextBuilder.create()
.secret(AWS_SECRET_KEY, ACTUAL_SECRET_KEY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.inbound.model.SqsInboundProperties;
import io.camunda.connector.inbound.model.SqsInboundQueueProperties;
Expand Down Expand Up @@ -136,6 +137,7 @@ void consumeRun_withNoResults() throws InterruptedException {
consumer.setQueueConsumerActive(false);
thread.join();
// then
verifyNoInteractions(context);
verify(context).reportHealth(Health.down());
verifyNoMoreInteractions(context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class KafkaConnectorConsumer {

KafkaConnectorProperties elementProps;

private Health consumerStatus = Health.up();
private Health consumerStatus = Health.unknown();

public static ObjectMapper objectMapper =
new ObjectMapper()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import io.camunda.connector.api.annotation.InboundConnector;
import io.camunda.connector.api.inbound.Activity;
import io.camunda.connector.api.inbound.Health;
import io.camunda.connector.api.inbound.InboundConnectorContext;
import io.camunda.connector.api.inbound.Severity;
import io.camunda.connector.api.inbound.webhook.MappedHttpRequest;
Expand Down Expand Up @@ -102,6 +103,7 @@ public void activate(InboundConnectorContext context) {
props = new WebhookConnectorProperties(wrappedProps);
authChecker = WebhookAuthorizationHandler.getHandlerForAuth(props.auth());
responseExpression = mapResponseExpression();
context.reportHealth(Health.up());
}

@Override
Expand Down Expand Up @@ -223,4 +225,10 @@ public WebhookHttpResponse verify(WebhookProcessingPayload payload) {
}
return result;
}

@Override
public void deactivate() {
LOGGER.debug("Deactivating webhook connector");
context.reportHealth(Health.down());
}
}

0 comments on commit d759986

Please sign in to comment.