Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
version: 2
updates:
- package-ecosystem: maven
directory: "/"
schedule:
interval: weekly
groups:
maven-plugins:
patterns:
- "*maven*plugin*"
- "org.apache.maven*:*"
exclude-patterns:
- "io.quarkus*"
quarkus:
patterns:
- "io.quarkus:*"
- "io.quarkus.*:*"
- "io.quarkiverse:*"
- "io.quarkiverse.*:*"
ignore:
# Releases too often, it's annoying
- dependency-name: "org.assertj:*"
update-types: ["version-update:semver-patch"]
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<quarkus.platform.artifact-id>quarkus-bom</quarkus.platform.artifact-id>
<quarkus.platform.group-id>io.quarkus.platform</quarkus.platform.group-id>
<quarkus.platform.version>3.14.3</quarkus.platform.version>
<version.commons-csv>1.11.0</version.commons-csv>
<quarkus.platform.version>3.15.1</quarkus.platform.version>
<version.commons-csv>1.12.0</version.commons-csv>
<version.assertj>3.26.0</version.assertj>
<skipITs>true</skipITs>
<surefire-plugin.version>3.3.1</surefire-plugin.version>
<surefire-plugin.version>3.5.1</surefire-plugin.version>
<spotless-maven-plugin.version>2.43.0</spotless-maven-plugin.version>
<spotless.goal>apply</spotless.goal>
<quarkus-helm.version>1.2.4</quarkus-helm.version>
<quarkus-helm.version>1.2.5</quarkus-helm.version>
</properties>

<dependencyManagement>
Expand Down
35 changes: 27 additions & 8 deletions src/main/java/org/hibernate/infra/replicate/jira/JiraConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,24 @@ interface JiraProjectGroup {
}

interface EventProcessing {

/**
* Define how many events can be acknowledged and put on the pending queue
* before acknowledging an event results in blocking the response and waiting
* for the queue to free some space.
*/
@WithDefault("10000")
int queueSize();

/**
* Define the number of threads to use when processing queued events.
* <p>
* Note, having a lot of processing threads might not bring much benefit as
* processing may also be limited by {@link JiraConfig.EventProcessing}
*/
@WithDefault("2")
int threads();

/**
* Defines how many events can be processed within the
* {@link #timeframeInSeconds() timeframe}
Expand Down Expand Up @@ -127,20 +145,20 @@ interface Scheduled {

interface JiraProject {
/**
* Downstream project id (not a project key!).
* Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get the info.
* Downstream project id (not a project key!). Use
* {@code rest/api/2/project/YOUR-PROJECT-ID} to get the info.
*/
String projectId();

/**
* Downstream project key.
* Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get the info.
* Downstream project key. Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get
* the info.
*/
String projectKey();

/**
* Upstream project key.
* Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get the info.
* Upstream project key. Use {@code rest/api/2/project/YOUR-PROJECT-ID} to get
* the info.
*/
String originalProjectKey();

Expand All @@ -154,8 +172,9 @@ interface WebHookSecurity {
/**
* Whether to enable signature verification.
* <p>
* Jira web hooks can send a {@code x-hub-signature} header with a signature of a request body.
* This signature can be then verified using the secret used to configure the web hook.
* Jira web hooks can send a {@code x-hub-signature} header with a signature of
* a request body. This signature can be then verified using the secret used to
* configure the web hook.
*/
@WithDefault("false")
boolean enabled();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ public void createNextPlaceholderBatch(Long upToKeyNumber) {
JiraIssueBulkResponse response = destinationJiraClient.create(bulk);
response.issues.stream().mapToLong(i -> JiraIssue.keyToLong(i.key)).max()
.ifPresent(currentIssueKeyNumber::set);
Log.infof(
"Created more sync placeholders for %s; Current latest Jira key number is %s while required key is %s",
projectName, currentIssueKeyNumber.get(), upToKeyNumber);
} while (currentIssueKeyNumber.get() < upToKeyNumber);
} finally {
lock.unlock();
Expand Down Expand Up @@ -183,4 +186,12 @@ public String toString() {
public void close() {
projectGroupContext.close();
}

public int pendingEventsInCurrentContext() {
return projectGroupContext.pendingEventsInCurrentContext();
}

public void submitTask(Runnable runnable) {
projectGroupContext.submitTask(runnable);
}
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,42 @@
package org.hibernate.infra.replicate.jira.service.jira;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import org.hibernate.infra.replicate.jira.JiraConfig;

import io.quarkus.logging.Log;

public final class HandlerProjectGroupContext implements AutoCloseable {

private final ExecutorService eventHandlingExecutor;
private final Supplier<Integer> workQueueSize;
private final ScheduledExecutorService rateLimiterExecutor = Executors.newScheduledThreadPool(1);
private final Semaphore rateLimiter;
private final JiraConfig.JiraProjectGroup projectGroup;

public HandlerProjectGroupContext(JiraConfig.JiraProjectGroup projectGroup) {
this.projectGroup = projectGroup;

final int permits = projectGroup.processing().eventsPerTimeframe();
JiraConfig.EventProcessing processing = projectGroup.processing();

final int permits = processing.eventsPerTimeframe();
this.rateLimiter = new Semaphore(permits);
rateLimiterExecutor.scheduleAtFixedRate(() -> {
rateLimiter.drainPermits();
rateLimiter.release(permits);
}, projectGroup.processing().timeframeInSeconds(), projectGroup.processing().timeframeInSeconds(),
TimeUnit.SECONDS);
}, processing.timeframeInSeconds(), processing.timeframeInSeconds(), TimeUnit.SECONDS);

LinkedBlockingDeque<Runnable> workQueue = new LinkedBlockingDeque<>(processing.queueSize());
workQueueSize = workQueue::size;
eventHandlingExecutor = new ThreadPoolExecutor(processing.threads(), processing.threads(), 0L,
TimeUnit.MILLISECONDS, workQueue);
}

public void startProcessingEvent() throws InterruptedException {
Expand All @@ -33,12 +47,30 @@ public JiraConfig.JiraProjectGroup projectGroup() {
return projectGroup;
}

public int pendingEventsInCurrentContext() {
return workQueueSize.get();
}

public void submitTask(Runnable task) {
eventHandlingExecutor.submit(task);
}

@Override
public void close() {
// when requesting to close the context we aren't expecting to process any other
// events hence there's no point in continuing "releasing" more "permits":
if (!rateLimiterExecutor.isShutdown()) {
rateLimiterExecutor.shutdownNow();
}
if (!eventHandlingExecutor.isShutdown()) {
try {
eventHandlingExecutor.shutdown();
if (!eventHandlingExecutor.awaitTermination(2, TimeUnit.MINUTES)) {
Log.warnf("Not all events were processed before the shutdown");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,11 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.function.BooleanSupplier;

import org.hibernate.infra.replicate.jira.JiraConfig;
import org.hibernate.infra.replicate.jira.ProcessingConfig;
import org.hibernate.infra.replicate.jira.service.jira.client.JiraRestClient;
import org.hibernate.infra.replicate.jira.service.jira.client.JiraRestClientBuilder;
import org.hibernate.infra.replicate.jira.service.jira.model.hook.JiraWebHookEvent;
Expand Down Expand Up @@ -47,19 +43,12 @@
public class JiraService {

private final ReportingConfig reportingConfig;
private final ExecutorService executor;
private final Supplier<Integer> workQueueSize;
private final Map<String, HandlerProjectContext> contextPerProject;
private final JiraConfig jiraConfig;
private final Scheduler scheduler;

@Inject
public JiraService(ProcessingConfig processingConfig, JiraConfig jiraConfig, ReportingConfig reportingConfig,
Scheduler scheduler) {
LinkedBlockingDeque<Runnable> workQueue = new LinkedBlockingDeque<>(processingConfig.queueSize());
workQueueSize = workQueue::size;
executor = new ThreadPoolExecutor(processingConfig.threads(), processingConfig.threads(), 0L,
TimeUnit.MILLISECONDS, workQueue);
public JiraService(JiraConfig jiraConfig, ReportingConfig reportingConfig, Scheduler scheduler) {

Map<String, HandlerProjectContext> contextMap = new HashMap<>();
for (var entry : jiraConfig.projectGroup().entrySet()) {
Expand Down Expand Up @@ -122,6 +111,10 @@ public void registerManagementRoutes(@Observes ManagementInterface mi) {
// TODO: we can remove this one once we figure out why POST management does not
// work correctly...
String project = rc.pathParam("project");
List<String> maxToSyncList = rc.queryParam("maxToSync");
AtomicInteger maxToSync = maxToSyncList.isEmpty()
? null
: new AtomicInteger(Integer.parseInt(maxToSyncList.get(0)) + 1);

HandlerProjectContext context = contextPerProject.get(project);

Expand All @@ -130,14 +123,14 @@ public void registerManagementRoutes(@Observes ManagementInterface mi) {
}

AtomicLong largestSyncedJiraIssueKeyNumber = new AtomicLong(context.getLargestSyncedJiraIssueKeyNumber());

BooleanSupplier continueSyncing = maxToSync == null ? () -> true : () -> maxToSync.decrementAndGet() > 0;
String identity = "Init Sync for project %s".formatted(project);
scheduler.newJob(identity).setConcurrentExecution(Scheduled.ConcurrentExecution.SKIP)
// every 10 seconds:
.setCron("0/10 * * * * ?").setTask(executionContext -> {
Optional<JiraIssue> issueToSync = context
.getNextIssueToSync(largestSyncedJiraIssueKeyNumber.get());
if (issueToSync.isEmpty()) {
if (issueToSync.isEmpty() || !continueSyncing.getAsBoolean()) {
scheduler.unscheduleJob(identity);
} else {
triggerSyncEvent(issueToSync.get(), context);
Expand All @@ -158,7 +151,7 @@ public void registerManagementRoutes(@Observes ManagementInterface mi) {
throw new IllegalArgumentException("Unknown project '%s'".formatted(project));
}

executor.submit(() -> {
context.submitTask(() -> {
for (String issueKey : issueKeys) {
triggerSyncEvent(context.sourceJiraClient().getIssue(issueKey), context);
}
Expand All @@ -176,7 +169,7 @@ public void registerManagementRoutes(@Observes ManagementInterface mi) {
throw new IllegalArgumentException("Unknown project '%s'".formatted(project));
}

executor.submit(() -> syncByQuery(query, context));
context.submitTask(() -> syncByQuery(query, context));
rc.end();
});
mi.router().post("/sync/comments/list").consumes(MediaType.APPLICATION_JSON).blockingHandler(rc -> {
Expand Down Expand Up @@ -223,7 +216,7 @@ public void acknowledge(String project, JiraWebHookEvent event) {
}

for (Runnable handler : eventType.handlers(reportingConfig, event, context)) {
executor.submit(handler);
context.submitTask(handler);
}
}, () -> Log.infof("Event type %s is not supported and cannot be handled.", event.webhookEvent));
}
Expand Down Expand Up @@ -254,20 +247,12 @@ public void syncLastUpdated(String projectGroup) {

@PreDestroy
public void finishProcessingAndShutdown() {
try {
executor.shutdown();
if (!executor.awaitTermination(2, TimeUnit.MINUTES)) {
Log.infof("Not all events were processed before the shutdown");
for (HandlerProjectContext context : contextPerProject.values()) {
try {
context.close();
} catch (Exception e) {
Log.errorf(e, "Error closing context %s: %s", context, e.getMessage());
}
for (HandlerProjectContext context : contextPerProject.values()) {
try {
context.close();
} catch (Exception e) {
Log.errorf(e, "Error closing context %s: %s", context, e.getMessage());
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

Expand All @@ -285,7 +270,8 @@ private void syncByQuery(String query, HandlerProjectContext context) {

private void triggerSyncEvent(JiraIssue jiraIssue, HandlerProjectContext context) {
Log.infof("Adding sync events for a jira issue: %s; Already queued events: %s", jiraIssue.key,
workQueueSize.get());
context.pendingEventsInCurrentContext());

JiraWebHookIssue issue = new JiraWebHookIssue();
issue.id = jiraIssue.id;
issue.key = jiraIssue.key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.hibernate.infra.replicate.jira.service.reporting.FailureCollector;
import org.hibernate.infra.replicate.jira.service.reporting.ReportingConfig;

import io.quarkus.logging.Log;
import jakarta.ws.rs.core.UriBuilder;

public abstract class JiraEventHandler implements Runnable {
Expand Down Expand Up @@ -158,6 +159,8 @@ public final void run() {
Thread.currentThread().interrupt();
} finally {
failureCollector.close();
Log.infof("Pending events in %s to process: %s", context.projectGroupName(),
context.pendingEventsInCurrentContext());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public void warning(String details) {

@Override
public void warning(String details, Exception exception) {
Log.warn(details, exception);
Log.warnf(exception, details);
}

@Override
Expand All @@ -26,7 +26,7 @@ public void critical(String details) {

@Override
public void critical(String details, Exception exception) {
Log.error(details, exception);
Log.errorf(exception, details);
}

@Override
Expand Down
Loading
Loading