diff --git a/src/main/java/org/hibernate/infra/replicate/jira/JiraConfig.java b/src/main/java/org/hibernate/infra/replicate/jira/JiraConfig.java
index bb0a1ff..de0bda3 100644
--- a/src/main/java/org/hibernate/infra/replicate/jira/JiraConfig.java
+++ b/src/main/java/org/hibernate/infra/replicate/jira/JiraConfig.java
@@ -94,6 +94,24 @@ interface JiraProjectGroup {
}
interface EventProcessing {
+
+ /**
+ * Define how many events can be acknowledged and put on the pending queue
+ * before acknowledging an event results in blocking the response and waiting
+ * for the queue to free some space.
+ */
+ @WithDefault("10000")
+ int queueSize();
+
+ /**
+ * Define the number of threads to use when processing queued events.
+ *
+ * Note, having a lot of processing threads might not bring much benefit as
+ * processing may also be limited by {@link JiraConfig.EventProcessing}
+ */
+ @WithDefault("2")
+ int threads();
+
/**
* Defines how many events can be processed within the
* {@link #timeframeInSeconds() timeframe}
@@ -127,20 +145,20 @@ interface Scheduled {
interface JiraProject {
/**
- * Downstream project id (not a project key!).
- * Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get the info.
+ * Downstream project id (not a project key!). Use
+ * {@code rest/api/2/project/YOUR-PROJECT-ID} to get the info.
*/
String projectId();
/**
- * Downstream project key.
- * Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get the info.
+ * Downstream project key. Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get
+ * the info.
*/
String projectKey();
/**
- * Upstream project key.
- * Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get the info.
+ * Upstream project key. Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get
+ * the info.
*/
String originalProjectKey();
@@ -154,8 +172,9 @@ interface WebHookSecurity {
/**
* Whether to enable signature verification.
*
- * Jira web hooks can send a {@code x-hub-signature} header with a signature of a request body.
- * This signature can be then verified using the secret used to configure the web hook.
+ * Jira web hooks can send a {@code x-hub-signature} header with a signature of
+ * a request body. This signature can be then verified using the secret used to
+ * configure the web hook.
*/
@WithDefault("false")
boolean enabled();
diff --git a/src/main/java/org/hibernate/infra/replicate/jira/ProcessingConfig.java b/src/main/java/org/hibernate/infra/replicate/jira/ProcessingConfig.java
deleted file mode 100644
index f29b7f5..0000000
--- a/src/main/java/org/hibernate/infra/replicate/jira/ProcessingConfig.java
+++ /dev/null
@@ -1,23 +0,0 @@
-package org.hibernate.infra.replicate.jira;
-
-import io.smallrye.config.ConfigMapping;
-import io.smallrye.config.WithDefault;
-
-@ConfigMapping(prefix = "processing.events")
-public interface ProcessingConfig {
- /**
- * Define how many events can be acknowledged and put on the pending queue before
- * acknowledging an event results in blocking the response and waiting for the queue to free some space.
- */
- @WithDefault("10000")
- int queueSize();
-
- /**
- * Define the number of threads to use when processing queued events.
- *
- * Note, having a lot of processing threads might not bring much benefit as processing
- * may also be limited by {@link JiraConfig.EventProcessing}
- */
- @WithDefault("2")
- int threads();
-}
diff --git a/src/main/java/org/hibernate/infra/replicate/jira/service/jira/HandlerProjectContext.java b/src/main/java/org/hibernate/infra/replicate/jira/service/jira/HandlerProjectContext.java
index bb603a1..8e4020e 100644
--- a/src/main/java/org/hibernate/infra/replicate/jira/service/jira/HandlerProjectContext.java
+++ b/src/main/java/org/hibernate/infra/replicate/jira/service/jira/HandlerProjectContext.java
@@ -138,6 +138,9 @@ public void createNextPlaceholderBatch(Long upToKeyNumber) {
JiraIssueBulkResponse response = destinationJiraClient.create(bulk);
response.issues.stream().mapToLong(i -> JiraIssue.keyToLong(i.key)).max()
.ifPresent(currentIssueKeyNumber::set);
+ Log.infof(
+ "Created more sync placeholders for %s; Current latest Jira key number is %s while required key is %s",
+ projectName, currentIssueKeyNumber.get(), upToKeyNumber);
} while (currentIssueKeyNumber.get() < upToKeyNumber);
} finally {
lock.unlock();
@@ -183,4 +186,12 @@ public String toString() {
public void close() {
projectGroupContext.close();
}
+
+ public int pendingEventsInCurrentContext() {
+ return projectGroupContext.pendingEventsInCurrentContext();
+ }
+
+ public void submitTask(Runnable runnable) {
+ projectGroupContext.submitTask(runnable);
+ }
}
diff --git a/src/main/java/org/hibernate/infra/replicate/jira/service/jira/HandlerProjectGroupContext.java b/src/main/java/org/hibernate/infra/replicate/jira/service/jira/HandlerProjectGroupContext.java
index 3f78495..5c85030 100644
--- a/src/main/java/org/hibernate/infra/replicate/jira/service/jira/HandlerProjectGroupContext.java
+++ b/src/main/java/org/hibernate/infra/replicate/jira/service/jira/HandlerProjectGroupContext.java
@@ -1,14 +1,22 @@
package org.hibernate.infra.replicate.jira.service.jira;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import org.hibernate.infra.replicate.jira.JiraConfig;
+import io.quarkus.logging.Log;
+
public final class HandlerProjectGroupContext implements AutoCloseable {
+ private final ExecutorService eventHandlingExecutor;
+ private final Supplier workQueueSize;
private final ScheduledExecutorService rateLimiterExecutor = Executors.newScheduledThreadPool(1);
private final Semaphore rateLimiter;
private final JiraConfig.JiraProjectGroup projectGroup;
@@ -16,13 +24,19 @@ public final class HandlerProjectGroupContext implements AutoCloseable {
public HandlerProjectGroupContext(JiraConfig.JiraProjectGroup projectGroup) {
this.projectGroup = projectGroup;
- final int permits = projectGroup.processing().eventsPerTimeframe();
+ JiraConfig.EventProcessing processing = projectGroup.processing();
+
+ final int permits = processing.eventsPerTimeframe();
this.rateLimiter = new Semaphore(permits);
rateLimiterExecutor.scheduleAtFixedRate(() -> {
rateLimiter.drainPermits();
rateLimiter.release(permits);
- }, projectGroup.processing().timeframeInSeconds(), projectGroup.processing().timeframeInSeconds(),
- TimeUnit.SECONDS);
+ }, processing.timeframeInSeconds(), processing.timeframeInSeconds(), TimeUnit.SECONDS);
+
+ LinkedBlockingDeque workQueue = new LinkedBlockingDeque<>(processing.queueSize());
+ workQueueSize = workQueue::size;
+ eventHandlingExecutor = new ThreadPoolExecutor(processing.threads(), processing.threads(), 0L,
+ TimeUnit.MILLISECONDS, workQueue);
}
public void startProcessingEvent() throws InterruptedException {
@@ -33,6 +47,14 @@ public JiraConfig.JiraProjectGroup projectGroup() {
return projectGroup;
}
+ public int pendingEventsInCurrentContext() {
+ return workQueueSize.get();
+ }
+
+ public void submitTask(Runnable task) {
+ eventHandlingExecutor.submit(task);
+ }
+
@Override
public void close() {
// when requesting to close the context we aren't expecting to process any other
@@ -40,5 +62,15 @@ public void close() {
if (!rateLimiterExecutor.isShutdown()) {
rateLimiterExecutor.shutdownNow();
}
+ if (!eventHandlingExecutor.isShutdown()) {
+ try {
+ eventHandlingExecutor.shutdown();
+ if (!eventHandlingExecutor.awaitTermination(2, TimeUnit.MINUTES)) {
+ Log.warnf("Not all events were processed before the shutdown");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
}
}
diff --git a/src/main/java/org/hibernate/infra/replicate/jira/service/jira/JiraService.java b/src/main/java/org/hibernate/infra/replicate/jira/service/jira/JiraService.java
index 0c8db5b..11cd9d0 100644
--- a/src/main/java/org/hibernate/infra/replicate/jira/service/jira/JiraService.java
+++ b/src/main/java/org/hibernate/infra/replicate/jira/service/jira/JiraService.java
@@ -7,15 +7,11 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Supplier;
+import java.util.function.BooleanSupplier;
import org.hibernate.infra.replicate.jira.JiraConfig;
-import org.hibernate.infra.replicate.jira.ProcessingConfig;
import org.hibernate.infra.replicate.jira.service.jira.client.JiraRestClient;
import org.hibernate.infra.replicate.jira.service.jira.client.JiraRestClientBuilder;
import org.hibernate.infra.replicate.jira.service.jira.model.hook.JiraWebHookEvent;
@@ -47,19 +43,12 @@
public class JiraService {
private final ReportingConfig reportingConfig;
- private final ExecutorService executor;
- private final Supplier workQueueSize;
private final Map contextPerProject;
private final JiraConfig jiraConfig;
private final Scheduler scheduler;
@Inject
- public JiraService(ProcessingConfig processingConfig, JiraConfig jiraConfig, ReportingConfig reportingConfig,
- Scheduler scheduler) {
- LinkedBlockingDeque workQueue = new LinkedBlockingDeque<>(processingConfig.queueSize());
- workQueueSize = workQueue::size;
- executor = new ThreadPoolExecutor(processingConfig.threads(), processingConfig.threads(), 0L,
- TimeUnit.MILLISECONDS, workQueue);
+ public JiraService(JiraConfig jiraConfig, ReportingConfig reportingConfig, Scheduler scheduler) {
Map contextMap = new HashMap<>();
for (var entry : jiraConfig.projectGroup().entrySet()) {
@@ -122,6 +111,10 @@ public void registerManagementRoutes(@Observes ManagementInterface mi) {
// TODO: we can remove this one once we figure out why POST management does not
// work correctly...
String project = rc.pathParam("project");
+ List maxToSyncList = rc.queryParam("maxToSync");
+ AtomicInteger maxToSync = maxToSyncList.isEmpty()
+ ? null
+ : new AtomicInteger(Integer.parseInt(maxToSyncList.get(0)) + 1);
HandlerProjectContext context = contextPerProject.get(project);
@@ -130,14 +123,14 @@ public void registerManagementRoutes(@Observes ManagementInterface mi) {
}
AtomicLong largestSyncedJiraIssueKeyNumber = new AtomicLong(context.getLargestSyncedJiraIssueKeyNumber());
-
+ BooleanSupplier continueSyncing = maxToSync == null ? () -> true : () -> maxToSync.decrementAndGet() > 0;
String identity = "Init Sync for project %s".formatted(project);
scheduler.newJob(identity).setConcurrentExecution(Scheduled.ConcurrentExecution.SKIP)
// every 10 seconds:
.setCron("0/10 * * * * ?").setTask(executionContext -> {
Optional issueToSync = context
.getNextIssueToSync(largestSyncedJiraIssueKeyNumber.get());
- if (issueToSync.isEmpty()) {
+ if (issueToSync.isEmpty() || !continueSyncing.getAsBoolean()) {
scheduler.unscheduleJob(identity);
} else {
triggerSyncEvent(issueToSync.get(), context);
@@ -158,7 +151,7 @@ public void registerManagementRoutes(@Observes ManagementInterface mi) {
throw new IllegalArgumentException("Unknown project '%s'".formatted(project));
}
- executor.submit(() -> {
+ context.submitTask(() -> {
for (String issueKey : issueKeys) {
triggerSyncEvent(context.sourceJiraClient().getIssue(issueKey), context);
}
@@ -176,7 +169,7 @@ public void registerManagementRoutes(@Observes ManagementInterface mi) {
throw new IllegalArgumentException("Unknown project '%s'".formatted(project));
}
- executor.submit(() -> syncByQuery(query, context));
+ context.submitTask(() -> syncByQuery(query, context));
rc.end();
});
mi.router().post("/sync/comments/list").consumes(MediaType.APPLICATION_JSON).blockingHandler(rc -> {
@@ -223,7 +216,7 @@ public void acknowledge(String project, JiraWebHookEvent event) {
}
for (Runnable handler : eventType.handlers(reportingConfig, event, context)) {
- executor.submit(handler);
+ context.submitTask(handler);
}
}, () -> Log.infof("Event type %s is not supported and cannot be handled.", event.webhookEvent));
}
@@ -254,20 +247,12 @@ public void syncLastUpdated(String projectGroup) {
@PreDestroy
public void finishProcessingAndShutdown() {
- try {
- executor.shutdown();
- if (!executor.awaitTermination(2, TimeUnit.MINUTES)) {
- Log.infof("Not all events were processed before the shutdown");
+ for (HandlerProjectContext context : contextPerProject.values()) {
+ try {
+ context.close();
+ } catch (Exception e) {
+ Log.errorf(e, "Error closing context %s: %s", context, e.getMessage());
}
- for (HandlerProjectContext context : contextPerProject.values()) {
- try {
- context.close();
- } catch (Exception e) {
- Log.errorf(e, "Error closing context %s: %s", context, e.getMessage());
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
}
}
@@ -285,7 +270,8 @@ private void syncByQuery(String query, HandlerProjectContext context) {
private void triggerSyncEvent(JiraIssue jiraIssue, HandlerProjectContext context) {
Log.infof("Adding sync events for a jira issue: %s; Already queued events: %s", jiraIssue.key,
- workQueueSize.get());
+ context.pendingEventsInCurrentContext());
+
JiraWebHookIssue issue = new JiraWebHookIssue();
issue.id = jiraIssue.id;
issue.key = jiraIssue.key;
diff --git a/src/main/java/org/hibernate/infra/replicate/jira/service/jira/handler/JiraEventHandler.java b/src/main/java/org/hibernate/infra/replicate/jira/service/jira/handler/JiraEventHandler.java
index fcbe713..f651ea9 100644
--- a/src/main/java/org/hibernate/infra/replicate/jira/service/jira/handler/JiraEventHandler.java
+++ b/src/main/java/org/hibernate/infra/replicate/jira/service/jira/handler/JiraEventHandler.java
@@ -16,6 +16,7 @@
import org.hibernate.infra.replicate.jira.service.reporting.FailureCollector;
import org.hibernate.infra.replicate.jira.service.reporting.ReportingConfig;
+import io.quarkus.logging.Log;
import jakarta.ws.rs.core.UriBuilder;
public abstract class JiraEventHandler implements Runnable {
@@ -158,6 +159,8 @@ public final void run() {
Thread.currentThread().interrupt();
} finally {
failureCollector.close();
+ Log.infof("Pending events in %s to process: %s", context.projectGroupName(),
+ context.pendingEventsInCurrentContext());
}
}
diff --git a/src/main/java/org/hibernate/infra/replicate/jira/service/reporting/LoggingFailureCollector.java b/src/main/java/org/hibernate/infra/replicate/jira/service/reporting/LoggingFailureCollector.java
index 21ab8c2..028cd9d 100644
--- a/src/main/java/org/hibernate/infra/replicate/jira/service/reporting/LoggingFailureCollector.java
+++ b/src/main/java/org/hibernate/infra/replicate/jira/service/reporting/LoggingFailureCollector.java
@@ -16,7 +16,7 @@ public void warning(String details) {
@Override
public void warning(String details, Exception exception) {
- Log.warn(details, exception);
+ Log.warnf(exception, details);
}
@Override
@@ -26,7 +26,7 @@ public void critical(String details) {
@Override
public void critical(String details, Exception exception) {
- Log.error(details, exception);
+ Log.errorf(exception, details);
}
@Override
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index 10e05f9..a87eeb0 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -15,6 +15,8 @@ jira.project-group."hibernate".destination.login-kind=BEARER_TOKEN
jira.project-group."hibernate".destination.api-user.email=${JIRA_API_USER_REDHAT}
jira.project-group."hibernate".destination.api-user.token=${JIRA_API_TOKEN_REDHAT}
jira.project-group."hibernate".can-set-reporter=true
+# Processing queue configuration:
+jira.project-group."hibernate".processing.queue-size=${PROCESSING_QUEUE_SIZE:10000}
#
# Management endpoints:
quarkus.management.enabled=true
@@ -29,9 +31,6 @@ quarkus.security.users.embedded.enabled=true
quarkus.security.users.embedded.plain-text=true
quarkus.security.users.embedded.users."management-user"=${MANAGEMENT_USER_PASSWORD}
#
-# Processing queue configuration:
-processing.events.queue-size=${PROCESSING_QUEUE_SIZE:1000}
-#
# Scheduler:
# >> By default, the scheduler is not started unless a @Scheduled business method is found.
# >> You may need to force the start of the scheduler for "pure" programmatic scheduling via quarkus.scheduler.start-mode=forced
@@ -65,12 +64,12 @@ quarkus.scheduler.start-mode=forced
%prod,test.jira.project-group."hibernate".users.mapping."557058\:71e31052-f0d7-46e3-a9d7-8b9acd6998d8"=guillaume.smet
## Gunnar
#%prod,test.jira.project-group."hibernate".users.mapping."557058\:6a9959ae-3b15-4370-ad41-e78c978f4f7b"=gunnar.morling
-# Sanne
-# or JIRAUSER247447
-%prod,test.jira.project-group."hibernate".users.mapping."557058\:99e61e65-956b-4a21-b29c-06057642e9ea"=sgrinove@redhat.com
-%prod,test.jira.project-group."hibernate".users.mapping."557058\:690dd548-c602-4a58-9c7e-0923346f4e97"=sgrinove@redhat.com
-%prod,test.jira.project-group."hibernate".users.mapping."70121\:05895251-ccc4-42d9-acf3-7800fcf17a4c"=sgrinove@redhat.com
-%prod,test.jira.project-group."hibernate".users.mapping."557058\:f781de72-2c68-4904-9be1-937e6dcca29f"=sgrinove@redhat.com
+## Sanne
+## or JIRAUSER247447
+#%prod,test.jira.project-group."hibernate".users.mapping."557058\:99e61e65-956b-4a21-b29c-06057642e9ea"=sgrinove@redhat.com
+#%prod,test.jira.project-group."hibernate".users.mapping."557058\:690dd548-c602-4a58-9c7e-0923346f4e97"=sgrinove@redhat.com
+#%prod,test.jira.project-group."hibernate".users.mapping."70121\:05895251-ccc4-42d9-acf3-7800fcf17a4c"=sgrinove@redhat.com
+#%prod,test.jira.project-group."hibernate".users.mapping."557058\:f781de72-2c68-4904-9be1-937e6dcca29f"=sgrinove@redhat.com
## Emmanuel
#%prod,test.jira.project-group."hibernate".users.mapping."557058\:7146ccce-8d92-4967-a2e1-f09ff06cc122"=rhn-engineering-ebernard
#%prod,test.jira.project-group."hibernate".users.mapping."557058\:60723d35-67e0-400b-9749-abf4e657a6bd"=rhn-engineering-ebernard
diff --git a/src/test/java/org/hibernate/infra/replicate/jira/export/ExportProjectTest.java b/src/test/java/org/hibernate/infra/replicate/jira/export/ExportProjectTest.java
index d7fd633..556946f 100644
--- a/src/test/java/org/hibernate/infra/replicate/jira/export/ExportProjectTest.java
+++ b/src/test/java/org/hibernate/infra/replicate/jira/export/ExportProjectTest.java
@@ -154,7 +154,7 @@ private List formatLabels(JiraIssue issue) {
if (issue.fields.fixVersions != null) {
for (JiraSimpleObject fixVersion : issue.fields.fixVersions) {
- labels.add("Fix version: %s".formatted(fixVersion.name));
+ labels.add("Fix version: %s".formatted(fixVersion.name).replace(' ', '_'));
}
}