diff --git a/swatch-contracts/build.gradle b/swatch-contracts/build.gradle
index b8419f2b47..25138675a1 100644
--- a/swatch-contracts/build.gradle
+++ b/swatch-contracts/build.gradle
@@ -2,6 +2,7 @@ plugins {
id 'swatch.quarkus-conventions'
id 'swatch.liquibase-conventions'
id 'org.openapi.generator'
+ id 'jsonschema2pojo'
}
repositories {
@@ -32,10 +33,12 @@ dependencies {
implementation("io.quarkus:quarkus-jdbc-postgresql")
implementation("io.quarkus:quarkus-hibernate-orm-panache")
implementation 'io.quarkus:quarkus-smallrye-reactive-messaging-amqp'
+ implementation 'io.quarkus:quarkus-smallrye-reactive-messaging-kafka'
implementation 'io.smallrye.reactive:smallrye-reactive-messaging-in-memory'
implementation project(':clients:swatch-internal-subscription-client')
implementation project(':clients:quarkus:subscription-client')
implementation project(':swatch-common-config-workaround')
+ implementation project(':swatch-common-kafka')
implementation project(':swatch-common-resteasy')
implementation project(':clients:quarkus:rbac-client')
implementation project(':clients:quarkus:product-client')
@@ -54,6 +57,7 @@ dependencies {
testImplementation 'io.quarkus:quarkus-test-security'
testImplementation libraries["junit-jupiter"]
testImplementation libraries["wiremock"]
+ testImplementation libraries["awaitility"]
testImplementation("io.quarkus:quarkus-jdbc-h2")
testAnnotationProcessor libraries["mapstruct-processor"]
// if you are using mapstruct in test code
@@ -93,4 +97,19 @@ openApiGenerate {
]
}
+jsonSchema2Pojo {
+ source = files("${projectDir}/../swatch-core/schemas/enabled_orgs_request.yaml", "${projectDir}/../swatch-core/schemas/enabled_orgs_response.yaml")
+ targetPackage = "com.redhat.swatch.contract.model"
+ targetDirectory = file("${buildDir}/generated/src/gen/java")
+ includeAdditionalProperties = false
+ includeJsr303Annotations = true
+ initializeCollections = false
+ dateTimeType = 'java.time.OffsetDateTime'
+ sourceType = 'yamlschema'
+ generateBuilders = true
+ includeGetters = true
+ includeSetters = true
+ useJakartaValidation = true
+}
+
sourceSets.main.java.srcDirs += ["${buildDir}/generated/src/gen/java"]
diff --git a/swatch-contracts/deploy/clowdapp.yaml b/swatch-contracts/deploy/clowdapp.yaml
index 0171a6b20e..b970d9238c 100644
--- a/swatch-contracts/deploy/clowdapp.yaml
+++ b/swatch-contracts/deploy/clowdapp.yaml
@@ -56,6 +56,20 @@ parameters:
value: /pinhead/keystore.jks
- name: PRODUCT_URL
value: https://product.stage.api.redhat.com/svcrest/product/v3
+ - name: SUBSCRIPTION_SYNC_ENABLED
+ value: 'true'
+ - name: KAFKA_ENABLED_ORGS_REPLICAS
+ value: '3'
+ - name: KAFKA_ENABLED_ORGS_PARTITIONS
+ value: '3'
+ - name: KAFKA_SUBSCRIPTION_PRUNE_TASK_REPLICAS
+ value: '3'
+ - name: KAFKA_SUBSCRIPTION_PRUNE_TASK_PARTITIONS
+ value: '3'
+ - name: KAFKA_SUBSCRIPTION_SYNC_TASK_REPLICAS
+ value: '3'
+ - name: KAFKA_SUBSCRIPTION_SYNC_TASK_PARTITIONS
+ value: '3'
objects:
- apiVersion: cloud.redhat.com/v1alpha1
@@ -77,6 +91,17 @@ objects:
- swatch-subscription-sync
- rbac
+ kafkaTopics:
+ - replicas: ${{KAFKA_ENABLED_ORGS_REPLICAS}}
+ partitions: ${{KAFKA_ENABLED_ORGS_PARTITIONS}}
+ topicName: platform.rhsm-subscriptions.enabled-orgs-for-tasks
+ - replicas: ${{KAFKA_SUBSCRIPTION_PRUNE_TASK_REPLICAS}}
+ partitions: ${{KAFKA_SUBSCRIPTION_PRUNE_TASK_PARTITIONS}}
+ topicName: platform.rhsm-subscriptions.subscription-prune-task
+ - replicas: ${{KAFKA_SUBSCRIPTION_SYNC_TASK_REPLICAS}}
+ partitions: ${{KAFKA_SUBSCRIPTION_SYNC_TASK_PARTITIONS}}
+ topicName: platform.rhsm-subscriptions.subscription-sync-task
+
# Creates a database if local mode, or uses RDS in production
# database:
# name: ${DB_POD}
@@ -219,6 +244,8 @@ objects:
fieldPath: metadata.namespace
- name: PRODUCT_URL
value: ${PRODUCT_URL}
+ - name: SUBSCRIPTION_SYNC_ENABLED
+ value: ${SUBSCRIPTION_SYNC_ENABLED}
volumeMounts:
- name: logs
mountPath: /logs
diff --git a/swatch-contracts/src/main/java/com/redhat/swatch/contract/config/ApplicationConfiguration.java b/swatch-contracts/src/main/java/com/redhat/swatch/contract/config/ApplicationConfiguration.java
new file mode 100644
index 0000000000..bdf6895a20
--- /dev/null
+++ b/swatch-contracts/src/main/java/com/redhat/swatch/contract/config/ApplicationConfiguration.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright Red Hat, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ * Red Hat trademarks are not licensed under GPLv3. No permission is
+ * granted to use or replicate Red Hat trademarks that are incorporated
+ * in this software or its documentation.
+ */
+package com.redhat.swatch.contract.config;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import lombok.Getter;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+
+@ApplicationScoped
+@Getter
+public class ApplicationConfiguration {
+ @ConfigProperty(name = "rhsm-subscriptions.subscription-sync-enabled")
+ boolean subscriptionSyncEnabled;
+}
diff --git a/swatch-contracts/src/main/java/com/redhat/swatch/contract/resource/ContractsTestingResource.java b/swatch-contracts/src/main/java/com/redhat/swatch/contract/resource/ContractsTestingResource.java
index 105576358f..c30ef9206c 100644
--- a/swatch-contracts/src/main/java/com/redhat/swatch/contract/resource/ContractsTestingResource.java
+++ b/swatch-contracts/src/main/java/com/redhat/swatch/contract/resource/ContractsTestingResource.java
@@ -20,6 +20,7 @@
*/
package com.redhat.swatch.contract.resource;
+import com.redhat.swatch.contract.config.ApplicationConfiguration;
import com.redhat.swatch.contract.openapi.model.AwsUsageContext;
import com.redhat.swatch.contract.openapi.model.AzureUsageContext;
import com.redhat.swatch.contract.openapi.model.Contract;
@@ -38,21 +39,27 @@
import com.redhat.swatch.contract.openapi.resource.DefaultApi;
import com.redhat.swatch.contract.repository.ContractEntity;
import com.redhat.swatch.contract.service.ContractService;
+import com.redhat.swatch.contract.service.EnabledOrgsProducer;
import jakarta.annotation.security.RolesAllowed;
import jakarta.enterprise.context.ApplicationScoped;
-import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
import jakarta.ws.rs.ProcessingException;
import java.time.OffsetDateTime;
import java.util.List;
+import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jboss.resteasy.reactive.common.NotImplementedYet;
@Slf4j
@ApplicationScoped
+@AllArgsConstructor
public class ContractsTestingResource implements DefaultApi {
- @Inject ContractService service;
+ public static final String FEATURE_NOT_ENABLED_MESSAGE = "This feature is not currently enabled.";
+
+ private final ContractService service;
+ private final EnabledOrgsProducer enabledOrgsProducer;
+ private final ApplicationConfiguration applicationConfiguration;
/**
* Create contract record in database from provided contract dto payload
@@ -198,7 +205,8 @@ public OfferingProductTags getSkuProductTags(String sku) throws ProcessingExcept
@Override
@RolesAllowed({"test", "support", "service"})
public RpcResponse pruneUnlistedSubscriptions() throws ProcessingException {
- throw new NotImplementedYet();
+ enabledOrgsProducer.sendTaskForSubscriptionsPrune();
+ return new RpcResponse();
}
@Override
@@ -217,7 +225,15 @@ public OfferingResponse syncAllOfferings() throws ProcessingException {
@Override
@RolesAllowed({"test", "support", "service"})
public RpcResponse syncAllSubscriptions(Boolean forceSync) throws ProcessingException {
- throw new NotImplementedYet();
+ var response = new RpcResponse();
+ if (Boolean.FALSE.equals(forceSync) && !applicationConfiguration.isSubscriptionSyncEnabled()) {
+ log.info(
+ "Will not sync subscriptions for all opted-in orgs even though job was scheduled because subscriptionSyncEnabled=false.");
+ response.setResult(FEATURE_NOT_ENABLED_MESSAGE);
+ return response;
+ }
+ enabledOrgsProducer.sendTaskForSubscriptionsSync();
+ return response;
}
@Override
diff --git a/swatch-contracts/src/main/java/com/redhat/swatch/contract/service/EnabledOrgsProducer.java b/swatch-contracts/src/main/java/com/redhat/swatch/contract/service/EnabledOrgsProducer.java
new file mode 100644
index 0000000000..8878b5623a
--- /dev/null
+++ b/swatch-contracts/src/main/java/com/redhat/swatch/contract/service/EnabledOrgsProducer.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright Red Hat, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ * Red Hat trademarks are not licensed under GPLv3. No permission is
+ * granted to use or replicate Red Hat trademarks that are incorporated
+ * in this software or its documentation.
+ */
+package com.redhat.swatch.contract.service;
+
+import com.redhat.swatch.contract.model.EnabledOrgsRequest;
+import jakarta.enterprise.context.ApplicationScoped;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import org.eclipse.microprofile.reactive.messaging.Emitter;
+
+@ApplicationScoped
+public class EnabledOrgsProducer {
+
+ public static final String SUBSCRIPTION_SYNC_TASK_TOPIC =
+ "mp.messaging.incoming.subscription-sync-task.topic";
+ public static final String SUBSCRIPTION_PRUNE_TASK_TOPIC =
+ "mp.messaging.incoming.subscription-prune-task.topic";
+
+ private final Emitter emitter;
+ private final String subscriptionSyncTaskTopic;
+ private final String subscriptionPruneTaskTopic;
+
+ public EnabledOrgsProducer(
+ @Channel("enabled-orgs") Emitter emitter,
+ @ConfigProperty(name = SUBSCRIPTION_SYNC_TASK_TOPIC) String subscriptionSyncTaskTopic,
+ @ConfigProperty(name = SUBSCRIPTION_PRUNE_TASK_TOPIC) String subscriptionPruneTaskTopic) {
+ this.emitter = emitter;
+ this.subscriptionSyncTaskTopic = subscriptionSyncTaskTopic;
+ this.subscriptionPruneTaskTopic = subscriptionPruneTaskTopic;
+ }
+
+ public void sendTaskForSubscriptionsPrune() {
+ sendTask(subscriptionPruneTaskTopic);
+ }
+
+ public void sendTaskForSubscriptionsSync() {
+ sendTask(subscriptionSyncTaskTopic);
+ }
+
+ private void sendTask(String topic) {
+ emitter.send(new EnabledOrgsRequest().withTargetTopic(topic));
+ }
+}
diff --git a/swatch-contracts/src/main/java/com/redhat/swatch/contract/service/SubscriptionPruneTaskConsumer.java b/swatch-contracts/src/main/java/com/redhat/swatch/contract/service/SubscriptionPruneTaskConsumer.java
new file mode 100644
index 0000000000..c489170479
--- /dev/null
+++ b/swatch-contracts/src/main/java/com/redhat/swatch/contract/service/SubscriptionPruneTaskConsumer.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright Red Hat, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ * Red Hat trademarks are not licensed under GPLv3. No permission is
+ * granted to use or replicate Red Hat trademarks that are incorporated
+ * in this software or its documentation.
+ */
+package com.redhat.swatch.contract.service;
+
+import com.redhat.swatch.contract.model.EnabledOrgsResponse;
+import jakarta.enterprise.context.ApplicationScoped;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+
+@Slf4j
+@ApplicationScoped
+public class SubscriptionPruneTaskConsumer {
+ @Incoming("subscription-prune-task")
+ public void consume(EnabledOrgsResponse message) {
+ log.info("Received task for subscription prune with org ID: {}", message.getOrgId());
+ // Implementation will be done as part of SWATCH-2281
+ }
+}
diff --git a/swatch-contracts/src/main/java/com/redhat/swatch/contract/service/SubscriptionSyncTaskConsumer.java b/swatch-contracts/src/main/java/com/redhat/swatch/contract/service/SubscriptionSyncTaskConsumer.java
new file mode 100644
index 0000000000..a5aeed212f
--- /dev/null
+++ b/swatch-contracts/src/main/java/com/redhat/swatch/contract/service/SubscriptionSyncTaskConsumer.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright Red Hat, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ * Red Hat trademarks are not licensed under GPLv3. No permission is
+ * granted to use or replicate Red Hat trademarks that are incorporated
+ * in this software or its documentation.
+ */
+package com.redhat.swatch.contract.service;
+
+import com.redhat.swatch.contract.model.EnabledOrgsResponse;
+import jakarta.enterprise.context.ApplicationScoped;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+
+@Slf4j
+@ApplicationScoped
+public class SubscriptionSyncTaskConsumer {
+ @Incoming("subscription-sync-task")
+ public void consume(EnabledOrgsResponse message) {
+ log.info("Received task for subscription sync with org ID: {}", message.getOrgId());
+ // Implementation will be done as part of SWATCH-2281
+ }
+}
diff --git a/swatch-contracts/src/main/java/com/redhat/swatch/contract/service/json/EnabledOrgsResponseDeserializer.java b/swatch-contracts/src/main/java/com/redhat/swatch/contract/service/json/EnabledOrgsResponseDeserializer.java
new file mode 100644
index 0000000000..fca7ef6960
--- /dev/null
+++ b/swatch-contracts/src/main/java/com/redhat/swatch/contract/service/json/EnabledOrgsResponseDeserializer.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright Red Hat, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ * Red Hat trademarks are not licensed under GPLv3. No permission is
+ * granted to use or replicate Red Hat trademarks that are incorporated
+ * in this software or its documentation.
+ */
+package com.redhat.swatch.contract.service.json;
+
+import com.redhat.swatch.contract.model.EnabledOrgsResponse;
+import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;
+
+/** Provides quarkus a hint that we want to use Jackson to serialize EnabledOrgsResponse objects. */
+public class EnabledOrgsResponseDeserializer extends ObjectMapperDeserializer {
+ public EnabledOrgsResponseDeserializer() {
+ super(EnabledOrgsResponse.class);
+ }
+}
diff --git a/swatch-contracts/src/main/resources/application.properties b/swatch-contracts/src/main/resources/application.properties
index 6f7d93ff3e..f6929fbbaf 100644
--- a/swatch-contracts/src/main/resources/application.properties
+++ b/swatch-contracts/src/main/resources/application.properties
@@ -2,11 +2,11 @@ SERVER_PORT=${clowder.endpoints.swatch-contracts.port:8000}
SWATCH_INTERNAL_SUBSCRIPTION_ENDPOINT=${clowder.endpoints.swatch-subscription-sync-service.url}
LOGGING_LEVEL_COM_REDHAT_SWATCH=INFO
LOGGING_LEVEL_ROOT=INFO
-DATABASE_HOST: ${clowder.database.hostname:localhost}
-DATABASE_PORT: ${clowder.database.port:5432}
-DATABASE_DATABASE: ${clowder.database.name:rhsm-subscriptions}
-DATABASE_USERNAME: ${clowder.database.username:rhsm-subscriptions}
-DATABASE_PASSWORD: ${clowder.database.password:rhsm-subscriptions}
+DATABASE_HOST=${clowder.database.hostname:localhost}
+DATABASE_PORT=${clowder.database.port:5432}
+DATABASE_DATABASE=${clowder.database.name:rhsm-subscriptions}
+DATABASE_USERNAME=${clowder.database.username:rhsm-subscriptions}
+DATABASE_PASSWORD=${clowder.database.password:rhsm-subscriptions}
SWATCH_TEST_APIS_ENABLED=true
%ephemeral.SWATCH_TEST_APIS_ENABLED=true
@@ -78,8 +78,6 @@ quarkus.datasource.username=${DATABASE_USERNAME}
quarkus.datasource.password=${DATABASE_PASSWORD}
quarkus.datasource.jdbc.url=jdbc:postgresql://${DATABASE_HOST}:${DATABASE_PORT}/${DATABASE_DATABASE}
-
-
%test.quarkus.datasource.db-kind=h2
# NOTE: because some of the entities use columns named "value", it is necessary to configure h2 to *not* treat it as a keyword.
%test.quarkus.datasource.jdbc.url=jdbc:h2:mem:db;NON_KEYWORDS=VALUE
@@ -217,12 +215,27 @@ CONTRACT_UMB_QUEUE=umb-contract
amqp-host=${UMB_HOSTNAME}
amqp-port=${UMB_PORT}
+quarkus.kafka.devservices.enabled=false
+# Clowder quarkus config takes care of setting the common kafka settings
+kafka.bootstrap.servers=localhost:9092
+
mp.messaging.incoming.contracts.address=${CONTRACT_UMB_QUEUE}
mp.messaging.incoming.contracts.client-options-name=umb
mp.messaging.outgoing.contractstest.address=${CONTRACT_UMB_QUEUE}
mp.messaging.outgoing.contractstest.client-options-name=umb
+mp.messaging.incoming.subscription-sync-task.connector=smallrye-kafka
+mp.messaging.incoming.subscription-sync-task.topic=platform.rhsm-subscriptions.subscription-sync-task
+mp.messaging.incoming.subscription-sync-task.value.deserializer=com.redhat.swatch.contract.service.json.EnabledOrgsResponseDeserializer
+
+mp.messaging.incoming.subscription-prune-task.connector=smallrye-kafka
+mp.messaging.incoming.subscription-prune-task.topic=platform.rhsm-subscriptions.subscription-prune-task
+mp.messaging.incoming.subscription-prune-task.value.deserializer=com.redhat.swatch.contract.service.json.EnabledOrgsResponseDeserializer
+
+mp.messaging.outgoing.enabled-orgs.connector=smallrye-kafka
+mp.messaging.outgoing.enabled-orgs.topic=platform.rhsm-subscriptions.enabled-orgs-for-tasks
+mp.messaging.outgoing.enabled-orgs.value.serializer=io.quarkus.kafka.client.serialization.ObjectMapperSerializer
mp.messaging.incoming.contracts.connector=smallrye-in-memory
mp.messaging.outgoing.contractstest.connector=smallrye-in-memory
@@ -254,3 +267,5 @@ quarkus.smallrye-openapi.path=/api/${quarkus.application.name}/internal/openapi
quarkus.smallrye-openapi.management.enabled=false
quarkus.swagger-ui.always-include=true
quarkus.swagger-ui.path=/api/${quarkus.application.name}/internal/swagger-ui
+
+rhsm-subscriptions.subscription-sync-enabled=${SUBSCRIPTION_SYNC_ENABLED:true}
\ No newline at end of file
diff --git a/swatch-contracts/src/test/java/com/redhat/swatch/contract/resource/ContractsTestingResourceTest.java b/swatch-contracts/src/test/java/com/redhat/swatch/contract/resource/ContractsTestingResourceTest.java
new file mode 100644
index 0000000000..db554515e6
--- /dev/null
+++ b/swatch-contracts/src/test/java/com/redhat/swatch/contract/resource/ContractsTestingResourceTest.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright Red Hat, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ * Red Hat trademarks are not licensed under GPLv3. No permission is
+ * granted to use or replicate Red Hat trademarks that are incorporated
+ * in this software or its documentation.
+ */
+package com.redhat.swatch.contract.resource;
+
+import static com.redhat.swatch.contract.resource.ContractsTestingResource.FEATURE_NOT_ENABLED_MESSAGE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.redhat.swatch.contract.config.ApplicationConfiguration;
+import com.redhat.swatch.contract.service.EnabledOrgsProducer;
+import io.quarkus.test.InjectMock;
+import io.quarkus.test.junit.QuarkusTest;
+import io.quarkus.test.security.TestSecurity;
+import jakarta.inject.Inject;
+import org.junit.jupiter.api.Test;
+
+@QuarkusTest
+@TestSecurity(
+ user = "placeholder",
+ roles = {"service"})
+class ContractsTestingResourceTest {
+ @InjectMock ApplicationConfiguration applicationConfiguration;
+ @InjectMock EnabledOrgsProducer enabledOrgsProducer;
+ @Inject ContractsTestingResource resource;
+
+ @Test
+ void testSyncAllSubscriptionsWhenFeatureIsNotEnabled() {
+ when(applicationConfiguration.isSubscriptionSyncEnabled()).thenReturn(false);
+ var result = resource.syncAllSubscriptions(false);
+ assertEquals(FEATURE_NOT_ENABLED_MESSAGE, result.getResult());
+ verify(enabledOrgsProducer, times(0)).sendTaskForSubscriptionsSync();
+ }
+
+ @Test
+ void testSyncAllSubscriptionsWhenFeatureIsNotEnabledButForce() {
+ when(applicationConfiguration.isSubscriptionSyncEnabled()).thenReturn(false);
+ var result = resource.syncAllSubscriptions(true);
+ assertNull(result.getResult());
+ verify(enabledOrgsProducer).sendTaskForSubscriptionsSync();
+ }
+
+ @Test
+ void testSyncAllSubscriptionsWhenFeatureIsEnabled() {
+ when(applicationConfiguration.isSubscriptionSyncEnabled()).thenReturn(true);
+ var result = resource.syncAllSubscriptions(false);
+ assertNull(result.getResult());
+ verify(enabledOrgsProducer).sendTaskForSubscriptionsSync();
+ }
+
+ @Test
+ void testPruneUnlistedSubscriptions() {
+ var result = resource.pruneUnlistedSubscriptions();
+ assertNull(result.getResult());
+ verify(enabledOrgsProducer).sendTaskForSubscriptionsPrune();
+ }
+}
diff --git a/swatch-contracts/src/test/java/com/redhat/swatch/contract/service/EnabledOrgsProducerTest.java b/swatch-contracts/src/test/java/com/redhat/swatch/contract/service/EnabledOrgsProducerTest.java
new file mode 100644
index 0000000000..97bf5687e9
--- /dev/null
+++ b/swatch-contracts/src/test/java/com/redhat/swatch/contract/service/EnabledOrgsProducerTest.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright Red Hat, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ * Red Hat trademarks are not licensed under GPLv3. No permission is
+ * granted to use or replicate Red Hat trademarks that are incorporated
+ * in this software or its documentation.
+ */
+package com.redhat.swatch.contract.service;
+
+import static com.redhat.swatch.contract.service.EnabledOrgsProducer.SUBSCRIPTION_PRUNE_TASK_TOPIC;
+import static com.redhat.swatch.contract.service.EnabledOrgsProducer.SUBSCRIPTION_SYNC_TASK_TOPIC;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import com.redhat.swatch.contract.model.EnabledOrgsRequest;
+import com.redhat.swatch.contract.test.resources.InMemoryMessageBrokerKafkaResource;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+import io.smallrye.reactive.messaging.memory.InMemoryConnector;
+import io.smallrye.reactive.messaging.memory.InMemorySink;
+import jakarta.enterprise.inject.Any;
+import jakarta.inject.Inject;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+@QuarkusTest
+@QuarkusTestResource(
+ value = InMemoryMessageBrokerKafkaResource.class,
+ restrictToAnnotatedClass = true)
+class EnabledOrgsProducerTest {
+
+ private static final String ENABLED_ORGS_CHANNEL = "enabled-orgs";
+
+ @ConfigProperty(name = SUBSCRIPTION_SYNC_TASK_TOPIC)
+ String subscriptionSyncTaskTopic;
+
+ @ConfigProperty(name = SUBSCRIPTION_PRUNE_TASK_TOPIC)
+ String subscriptionPruneTaskTopic;
+
+ @Inject @Any InMemoryConnector connector;
+ @Inject EnabledOrgsProducer producer;
+
+ InMemorySink sink;
+
+ @BeforeEach
+ void setup() {
+ sink = connector.sink(ENABLED_ORGS_CHANNEL);
+ sink.clear();
+ }
+
+ @Test
+ void testSendTaskForSubscriptionsPrune() {
+ producer.sendTaskForSubscriptionsPrune();
+ verifyRequestContainsTargetTopic(subscriptionPruneTaskTopic);
+ }
+
+ @Test
+ void testSendTaskForSubscriptionsSync() {
+ producer.sendTaskForSubscriptionsSync();
+ verifyRequestContainsTargetTopic(subscriptionSyncTaskTopic);
+ }
+
+ private void verifyRequestContainsTargetTopic(String expectedTopic) {
+ var message = sink.received().get(0);
+ assertEquals(expectedTopic, message.getPayload().getTargetTopic());
+ }
+}
diff --git a/swatch-contracts/src/test/java/com/redhat/swatch/contract/service/SubscriptionPruneTaskConsumerTest.java b/swatch-contracts/src/test/java/com/redhat/swatch/contract/service/SubscriptionPruneTaskConsumerTest.java
new file mode 100644
index 0000000000..96e23425c7
--- /dev/null
+++ b/swatch-contracts/src/test/java/com/redhat/swatch/contract/service/SubscriptionPruneTaskConsumerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright Red Hat, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ * Red Hat trademarks are not licensed under GPLv3. No permission is
+ * granted to use or replicate Red Hat trademarks that are incorporated
+ * in this software or its documentation.
+ */
+package com.redhat.swatch.contract.service;
+
+import static org.mockito.Mockito.verify;
+
+import com.redhat.swatch.contract.model.EnabledOrgsResponse;
+import com.redhat.swatch.contract.test.resources.InMemoryMessageBrokerKafkaResource;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+import io.quarkus.test.junit.mockito.InjectSpy;
+import io.smallrye.reactive.messaging.memory.InMemoryConnector;
+import io.smallrye.reactive.messaging.memory.InMemorySource;
+import jakarta.enterprise.inject.Any;
+import jakarta.inject.Inject;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+@QuarkusTest
+@QuarkusTestResource(
+ value = InMemoryMessageBrokerKafkaResource.class,
+ restrictToAnnotatedClass = true)
+class SubscriptionPruneTaskConsumerTest {
+
+ private static final String ORG_ID = "org123";
+
+ @Inject @Any InMemoryConnector connector;
+ @InjectSpy SubscriptionPruneTaskConsumer consumer;
+ InMemorySource source;
+
+ @BeforeEach
+ void setup() {
+ source = connector.source("subscription-prune-task");
+ }
+
+ @Test
+ void whenConsumeMessage() {
+ var response = new EnabledOrgsResponse().withOrgId(ORG_ID);
+ source.send(response);
+ Awaitility.await().untilAsserted(() -> verify(consumer).consume(response));
+ }
+}
diff --git a/swatch-contracts/src/test/java/com/redhat/swatch/contract/service/SubscriptionSyncTaskConsumerTest.java b/swatch-contracts/src/test/java/com/redhat/swatch/contract/service/SubscriptionSyncTaskConsumerTest.java
new file mode 100644
index 0000000000..e324581d1f
--- /dev/null
+++ b/swatch-contracts/src/test/java/com/redhat/swatch/contract/service/SubscriptionSyncTaskConsumerTest.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright Red Hat, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ * Red Hat trademarks are not licensed under GPLv3. No permission is
+ * granted to use or replicate Red Hat trademarks that are incorporated
+ * in this software or its documentation.
+ */
+package com.redhat.swatch.contract.service;
+
+import static org.mockito.Mockito.verify;
+
+import com.redhat.swatch.contract.model.EnabledOrgsResponse;
+import com.redhat.swatch.contract.test.resources.InMemoryMessageBrokerKafkaResource;
+import io.quarkus.test.common.QuarkusTestResource;
+import io.quarkus.test.junit.QuarkusTest;
+import io.quarkus.test.junit.mockito.InjectSpy;
+import io.smallrye.reactive.messaging.memory.InMemoryConnector;
+import io.smallrye.reactive.messaging.memory.InMemorySource;
+import jakarta.enterprise.inject.Any;
+import jakarta.inject.Inject;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+@QuarkusTest
+@QuarkusTestResource(
+ value = InMemoryMessageBrokerKafkaResource.class,
+ restrictToAnnotatedClass = true)
+class SubscriptionSyncTaskConsumerTest {
+
+ private static final String ORG_ID = "org123";
+
+ @Inject @Any InMemoryConnector connector;
+ @InjectSpy SubscriptionSyncTaskConsumer consumer;
+ InMemorySource source;
+
+ @BeforeEach
+ void setup() {
+ source = connector.source("subscription-sync-task");
+ }
+
+ @Test
+ void whenConsumeMessage() {
+ var response = new EnabledOrgsResponse().withOrgId(ORG_ID);
+ source.send(response);
+ Awaitility.await().untilAsserted(() -> verify(consumer).consume(response));
+ }
+}
diff --git a/swatch-contracts/src/test/java/com/redhat/swatch/contract/test/resources/InMemoryMessageBrokerKafkaResource.java b/swatch-contracts/src/test/java/com/redhat/swatch/contract/test/resources/InMemoryMessageBrokerKafkaResource.java
new file mode 100644
index 0000000000..279a5bae69
--- /dev/null
+++ b/swatch-contracts/src/test/java/com/redhat/swatch/contract/test/resources/InMemoryMessageBrokerKafkaResource.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright Red Hat, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ *
+ * Red Hat trademarks are not licensed under GPLv3. No permission is
+ * granted to use or replicate Red Hat trademarks that are incorporated
+ * in this software or its documentation.
+ */
+package com.redhat.swatch.contract.test.resources;
+
+import io.quarkus.test.common.QuarkusTestResourceLifecycleManager;
+import io.smallrye.reactive.messaging.memory.InMemoryConnector;
+import java.util.HashMap;
+import java.util.Map;
+
+public class InMemoryMessageBrokerKafkaResource implements QuarkusTestResourceLifecycleManager {
+
+ @Override
+ public Map start() {
+ Map env = new HashMap<>();
+ env.putAll(InMemoryConnector.switchIncomingChannelsToInMemory("subscription-prune-task"));
+ env.putAll(InMemoryConnector.switchIncomingChannelsToInMemory("subscription-sync-task"));
+ env.putAll(InMemoryConnector.switchOutgoingChannelsToInMemory("enabled-orgs"));
+ return env;
+ }
+
+ @Override
+ public void stop() {
+ InMemoryConnector.clear();
+ }
+}