Skip to content

Commit

Permalink
SWATCH-2356: Send request to enabled-orgs-for-tasks for prune and sync
Browse files Browse the repository at this point in the history
- The method "pruneUnlistedSubscriptions" from SWATCH-2277 should produce a single message to "platform.rhsm-subscriptions.enabled-orgs-for-tasks" with a message containing the topic name "platform.rhsm-subscriptions.subscription-prune-task".
- The method "syncAllSubscriptions" from SWATCH-2277 should produce a single message to "platform.rhsm-subscriptions.enabled-orgs-for-tasks" with a message containing the topic name "platform.rhsm-subscriptions.subscription-sync-task".
- Create the consumer of "platform.rhsm-subscriptions.subscription-prune-task" that receives a message that includes the orgId. Leave the implementation empty.
- Create the consumer of "platform.rhsm-subscriptions.subscription-sync-task" that receives a message that includes the orgId. Leave the implementation empty.
  • Loading branch information
Sgitario committed Apr 22, 2024
1 parent 6441529 commit a6eb075
Show file tree
Hide file tree
Showing 14 changed files with 606 additions and 11 deletions.
19 changes: 19 additions & 0 deletions swatch-contracts/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ plugins {
id 'swatch.quarkus-conventions'
id 'swatch.liquibase-conventions'
id 'org.openapi.generator'
id 'jsonschema2pojo'
}

repositories {
Expand Down Expand Up @@ -32,10 +33,12 @@ dependencies {
implementation("io.quarkus:quarkus-jdbc-postgresql")
implementation("io.quarkus:quarkus-hibernate-orm-panache")
implementation 'io.quarkus:quarkus-smallrye-reactive-messaging-amqp'
implementation 'io.quarkus:quarkus-smallrye-reactive-messaging-kafka'
implementation 'io.smallrye.reactive:smallrye-reactive-messaging-in-memory'
implementation project(':clients:swatch-internal-subscription-client')
implementation project(':clients:quarkus:subscription-client')
implementation project(':swatch-common-config-workaround')
implementation project(':swatch-common-kafka')
implementation project(':swatch-common-resteasy')
implementation project(':clients:quarkus:rbac-client')
implementation project(':clients:quarkus:product-client')
Expand All @@ -54,6 +57,7 @@ dependencies {
testImplementation 'io.quarkus:quarkus-test-security'
testImplementation libraries["junit-jupiter"]
testImplementation libraries["wiremock"]
testImplementation libraries["awaitility"]
testImplementation("io.quarkus:quarkus-jdbc-h2")
testAnnotationProcessor libraries["mapstruct-processor"]
// if you are using mapstruct in test code
Expand Down Expand Up @@ -93,4 +97,19 @@ openApiGenerate {
]
}

jsonSchema2Pojo {
source = files("${projectDir}/../swatch-core/schemas/enabled_orgs_request.yaml", "${projectDir}/../swatch-core/schemas/enabled_orgs_response.yaml")
targetPackage = "com.redhat.swatch.contract.model"
targetDirectory = file("${buildDir}/generated/src/gen/java")
includeAdditionalProperties = false
includeJsr303Annotations = true
initializeCollections = false
dateTimeType = 'java.time.OffsetDateTime'
sourceType = 'yamlschema'
generateBuilders = true
includeGetters = true
includeSetters = true
useJakartaValidation = true
}

sourceSets.main.java.srcDirs += ["${buildDir}/generated/src/gen/java"]
27 changes: 27 additions & 0 deletions swatch-contracts/deploy/clowdapp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,20 @@ parameters:
value: /pinhead/keystore.jks
- name: PRODUCT_URL
value: https://product.stage.api.redhat.com/svcrest/product/v3
- name: SUBSCRIPTION_SYNC_ENABLED
value: 'true'
- name: KAFKA_ENABLED_ORGS_REPLICAS
value: '3'
- name: KAFKA_ENABLED_ORGS_PARTITIONS
value: '3'
- name: KAFKA_SUBSCRIPTION_PRUNE_TASK_REPLICAS
value: '3'
- name: KAFKA_SUBSCRIPTION_PRUNE_TASK_PARTITIONS
value: '3'
- name: KAFKA_SUBSCRIPTION_SYNC_TASK_REPLICAS
value: '3'
- name: KAFKA_SUBSCRIPTION_SYNC_TASK_PARTITIONS
value: '3'

objects:
- apiVersion: cloud.redhat.com/v1alpha1
Expand All @@ -77,6 +91,17 @@ objects:
- swatch-subscription-sync
- rbac

kafkaTopics:
- replicas: ${{KAFKA_ENABLED_ORGS_REPLICAS}}
partitions: ${{KAFKA_ENABLED_ORGS_PARTITIONS}}
topicName: platform.rhsm-subscriptions.enabled-orgs-for-tasks
- replicas: ${{KAFKA_SUBSCRIPTION_PRUNE_TASK_REPLICAS}}
partitions: ${{KAFKA_SUBSCRIPTION_PRUNE_TASK_PARTITIONS}}
topicName: platform.rhsm-subscriptions.subscription-prune-task
- replicas: ${{KAFKA_SUBSCRIPTION_SYNC_TASK_REPLICAS}}
partitions: ${{KAFKA_SUBSCRIPTION_SYNC_TASK_PARTITIONS}}
topicName: platform.rhsm-subscriptions.subscription-sync-task

# Creates a database if local mode, or uses RDS in production
# database:
# name: ${DB_POD}
Expand Down Expand Up @@ -219,6 +244,8 @@ objects:
fieldPath: metadata.namespace
- name: PRODUCT_URL
value: ${PRODUCT_URL}
- name: SUBSCRIPTION_SYNC_ENABLED
value: ${SUBSCRIPTION_SYNC_ENABLED}
volumeMounts:
- name: logs
mountPath: /logs
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright Red Hat, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
* Red Hat trademarks are not licensed under GPLv3. No permission is
* granted to use or replicate Red Hat trademarks that are incorporated
* in this software or its documentation.
*/
package com.redhat.swatch.contract.config;

import jakarta.enterprise.context.ApplicationScoped;
import lombok.Getter;
import org.eclipse.microprofile.config.inject.ConfigProperty;

@ApplicationScoped
@Getter
public class ApplicationConfiguration {
@ConfigProperty(name = "rhsm-subscriptions.subscription-sync-enabled")
boolean subscriptionSyncEnabled;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
package com.redhat.swatch.contract.resource;

import com.redhat.swatch.contract.config.ApplicationConfiguration;
import com.redhat.swatch.contract.openapi.model.AwsUsageContext;
import com.redhat.swatch.contract.openapi.model.AzureUsageContext;
import com.redhat.swatch.contract.openapi.model.Contract;
Expand All @@ -38,21 +39,27 @@
import com.redhat.swatch.contract.openapi.resource.DefaultApi;
import com.redhat.swatch.contract.repository.ContractEntity;
import com.redhat.swatch.contract.service.ContractService;
import com.redhat.swatch.contract.service.EnabledOrgsProducer;
import jakarta.annotation.security.RolesAllowed;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.transaction.Transactional;
import jakarta.ws.rs.ProcessingException;
import java.time.OffsetDateTime;
import java.util.List;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jboss.resteasy.reactive.common.NotImplementedYet;

@Slf4j
@ApplicationScoped
@AllArgsConstructor
public class ContractsTestingResource implements DefaultApi {

@Inject ContractService service;
public static final String FEATURE_NOT_ENABLED_MESSAGE = "This feature is not currently enabled.";

private final ContractService service;
private final EnabledOrgsProducer enabledOrgsProducer;
private final ApplicationConfiguration applicationConfiguration;

/**
* Create contract record in database from provided contract dto payload
Expand Down Expand Up @@ -198,7 +205,8 @@ public OfferingProductTags getSkuProductTags(String sku) throws ProcessingExcept
@Override
@RolesAllowed({"test", "support", "service"})
public RpcResponse pruneUnlistedSubscriptions() throws ProcessingException {
throw new NotImplementedYet();
enabledOrgsProducer.sendTaskForSubscriptionsPrune();
return new RpcResponse();
}

@Override
Expand All @@ -217,7 +225,15 @@ public OfferingResponse syncAllOfferings() throws ProcessingException {
@Override
@RolesAllowed({"test", "support", "service"})
public RpcResponse syncAllSubscriptions(Boolean forceSync) throws ProcessingException {
throw new NotImplementedYet();
var response = new RpcResponse();
if (Boolean.FALSE.equals(forceSync) && !applicationConfiguration.isSubscriptionSyncEnabled()) {
log.info(
"Will not sync subscriptions for all opted-in orgs even though job was scheduled because subscriptionSyncEnabled=false.");
response.setResult(FEATURE_NOT_ENABLED_MESSAGE);
return response;
}
enabledOrgsProducer.sendTaskForSubscriptionsSync();
return response;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright Red Hat, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
* Red Hat trademarks are not licensed under GPLv3. No permission is
* granted to use or replicate Red Hat trademarks that are incorporated
* in this software or its documentation.
*/
package com.redhat.swatch.contract.service;

import com.redhat.swatch.contract.model.EnabledOrgsRequest;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@ApplicationScoped
public class EnabledOrgsProducer {

public static final String SUBSCRIPTION_SYNC_TASK_TOPIC =
"mp.messaging.incoming.subscription-sync-task.topic";
public static final String SUBSCRIPTION_PRUNE_TASK_TOPIC =
"mp.messaging.incoming.subscription-prune-task.topic";

private final Emitter<EnabledOrgsRequest> emitter;
private final String subscriptionSyncTaskTopic;
private final String subscriptionPruneTaskTopic;

public EnabledOrgsProducer(
@Channel("enabled-orgs") Emitter<EnabledOrgsRequest> emitter,
@ConfigProperty(name = SUBSCRIPTION_SYNC_TASK_TOPIC) String subscriptionSyncTaskTopic,
@ConfigProperty(name = SUBSCRIPTION_PRUNE_TASK_TOPIC) String subscriptionPruneTaskTopic) {
this.emitter = emitter;
this.subscriptionSyncTaskTopic = subscriptionSyncTaskTopic;
this.subscriptionPruneTaskTopic = subscriptionPruneTaskTopic;
}

public void sendTaskForSubscriptionsPrune() {
sendTask(subscriptionPruneTaskTopic);
}

public void sendTaskForSubscriptionsSync() {
sendTask(subscriptionSyncTaskTopic);
}

private void sendTask(String topic) {
emitter.send(new EnabledOrgsRequest().withTargetTopic(topic));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright Red Hat, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
* Red Hat trademarks are not licensed under GPLv3. No permission is
* granted to use or replicate Red Hat trademarks that are incorporated
* in this software or its documentation.
*/
package com.redhat.swatch.contract.service;

import com.redhat.swatch.contract.model.EnabledOrgsResponse;
import jakarta.enterprise.context.ApplicationScoped;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.microprofile.reactive.messaging.Incoming;

@Slf4j
@ApplicationScoped
public class SubscriptionPruneTaskConsumer {
@Incoming("subscription-prune-task")
public void consume(EnabledOrgsResponse message) {
log.info("Received task for subscription prune with org ID: {}", message.getOrgId());
// Implementation will be done as part of SWATCH-2281
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright Red Hat, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
* Red Hat trademarks are not licensed under GPLv3. No permission is
* granted to use or replicate Red Hat trademarks that are incorporated
* in this software or its documentation.
*/
package com.redhat.swatch.contract.service;

import com.redhat.swatch.contract.model.EnabledOrgsResponse;
import jakarta.enterprise.context.ApplicationScoped;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.microprofile.reactive.messaging.Incoming;

@Slf4j
@ApplicationScoped
public class SubscriptionSyncTaskConsumer {
@Incoming("subscription-sync-task")
public void consume(EnabledOrgsResponse message) {
log.info("Received task for subscription sync with org ID: {}", message.getOrgId());
// Implementation will be done as part of SWATCH-2281
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Red Hat, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
* Red Hat trademarks are not licensed under GPLv3. No permission is
* granted to use or replicate Red Hat trademarks that are incorporated
* in this software or its documentation.
*/
package com.redhat.swatch.contract.service.json;

import com.redhat.swatch.contract.model.EnabledOrgsResponse;
import io.quarkus.kafka.client.serialization.ObjectMapperDeserializer;

/** Provides quarkus a hint that we want to use Jackson to serialize EnabledOrgsResponse objects. */
public class EnabledOrgsResponseDeserializer extends ObjectMapperDeserializer<EnabledOrgsResponse> {
public EnabledOrgsResponseDeserializer() {
super(EnabledOrgsResponse.class);
}
}
Loading

0 comments on commit a6eb075

Please sign in to comment.