diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties index 37744abd89c39..ca2eb91779dd9 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles.properties @@ -1,7 +1,6 @@ aws-secrets aws2-s3 azure-secrets -backoff bean bean-model blocked @@ -18,6 +17,7 @@ gcp-secrets hashicorp-secrets health inflight +internal-tasks java-security jvm kafka @@ -30,7 +30,6 @@ memory micrometer platform-http properties -protocol quartz receive reload diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/backoff.json b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/internal-tasks.json similarity index 50% rename from catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/backoff.json rename to catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/internal-tasks.json index 8cae12a4e9204..e90a9fd8d10af 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/backoff.json +++ b/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/internal-tasks.json @@ -2,11 +2,11 @@ "console": { "kind": "console", "group": "camel", - "name": "backoff", - "title": "BackOff", - "description": "Display information about BackOff tasks", + "name": "internal-tasks", + "title": "Internal Tasks", + "description": "Display information about internal tasks", "deprecated": false, - "javaType": "org.apache.camel.impl.console.BackOffDevConsole", + "javaType": "org.apache.camel.impl.console.TaskRegistryDevConsole", "groupId": "org.apache.camel", "artifactId": "camel-console", "version": "4.13.0-SNAPSHOT" diff --git a/components/camel-ai/camel-langchain4j-embeddings/src/test/java/org/apache/camel/component/langchain4j/embeddings/LangChain4jEmbeddingsComponentInfinispanTargetIT.java b/components/camel-ai/camel-langchain4j-embeddings/src/test/java/org/apache/camel/component/langchain4j/embeddings/LangChain4jEmbeddingsComponentInfinispanTargetIT.java index 03c3a7916596d..84c4a43b0905a 100644 --- a/components/camel-ai/camel-langchain4j-embeddings/src/test/java/org/apache/camel/component/langchain4j/embeddings/LangChain4jEmbeddingsComponentInfinispanTargetIT.java +++ b/components/camel-ai/camel-langchain4j-embeddings/src/test/java/org/apache/camel/component/langchain4j/embeddings/LangChain4jEmbeddingsComponentInfinispanTargetIT.java @@ -210,7 +210,7 @@ protected void setupResources() throws Exception { final ForegroundTask task = Tasks.foregroundTask() .withBudget(budget).build(); - final boolean cacheCreated = task.run(this::createCache); + final boolean cacheCreated = task.run(null, this::createCache); Assumptions.assumeTrue(cacheCreated, "The container cache is not running healthily"); } } diff --git a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Endpoint.java b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Endpoint.java index 5d802a3dc6815..d89f38d4ac92f 100644 --- a/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Endpoint.java +++ b/components/camel-aws/camel-aws2-ddb/src/main/java/org/apache/camel/component/aws2/ddb/Ddb2Endpoint.java @@ -157,7 +157,7 @@ private void waitForTableToBecomeAvailable(String tableName) { .build()) .build(); - if (!task.run(this::waitForTable, tableName)) { + if (!task.run(getCamelContext(), this::waitForTable, tableName)) { throw new RuntimeCamelException("Table " + tableName + " never went active"); } } diff --git a/components/camel-azure/camel-azure-files/src/main/java/org/apache/camel/component/file/azure/strategy/FilesChangedExclusiveReadLockStrategy.java b/components/camel-azure/camel-azure-files/src/main/java/org/apache/camel/component/file/azure/strategy/FilesChangedExclusiveReadLockStrategy.java index b67479900a635..df911f374b88c 100644 --- a/components/camel-azure/camel-azure-files/src/main/java/org/apache/camel/component/file/azure/strategy/FilesChangedExclusiveReadLockStrategy.java +++ b/components/camel-azure/camel-azure-files/src/main/java/org/apache/camel/component/file/azure/strategy/FilesChangedExclusiveReadLockStrategy.java @@ -66,7 +66,7 @@ public boolean acquireExclusiveReadLock( FilesExclusiveReadLockCheck exclusiveReadLockCheck = new FilesExclusiveReadLockCheck(fastExistsCheck, minAge, minLength); - if (!task.run(() -> exclusiveReadLockCheck.tryAcquireExclusiveReadLock(operations, file))) { + if (!task.run(exchange.getContext(), () -> exclusiveReadLockCheck.tryAcquireExclusiveReadLock(operations, file))) { CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file); diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java index 8c827aa5ef4a1..d1dc62fcb5c89 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpOperations.java @@ -139,7 +139,7 @@ protected boolean doConnect(RemoteFileConfiguration configuration, Exchange exch TaskPayload payload = new TaskPayload(configuration); - if (!task.run(this::tryConnect, payload)) { + if (!task.run(endpoint.getCamelContext(), this::tryConnect, payload)) { if (exchange != null) { exchange.getIn().setHeader(FtpConstants.FTP_REPLY_CODE, client.getReplyCode()); exchange.getIn().setHeader(FtpConstants.FTP_REPLY_STRING, client.getReplyString()); diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java index 24f8799b23cf8..76b40e95de385 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpOperations.java @@ -138,7 +138,7 @@ public boolean connect(RemoteFileConfiguration configuration, Exchange exchange) TaskPayload payload = new TaskPayload(configuration); - if (!task.run(this::tryConnect, payload)) { + if (!task.run(endpoint.getCamelContext(), this::tryConnect, payload)) { throw new GenericFileOperationFailedException( "Cannot connect to " + configuration.remoteServerInformation(), payload.exception); diff --git a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java index 54130f50b1291..b58469f1eb6b5 100644 --- a/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java +++ b/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/strategy/FtpChangedExclusiveReadLockStrategy.java @@ -64,7 +64,7 @@ public boolean acquireExclusiveReadLock( ExclusiveReadLockCheck exclusiveReadLockCheck = new ExclusiveReadLockCheck(fastExistsCheck, minAge, minLength); - if (!task.run(() -> exclusiveReadLockCheck.tryAcquireExclusiveReadLock(operations, file))) { + if (!task.run(exchange.getContext(), () -> exclusiveReadLockCheck.tryAcquireExclusiveReadLock(operations, file))) { CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file); diff --git a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteTestSupport.java b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteTestSupport.java index 0a7f7b3089fc1..85c436e163fbc 100644 --- a/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteTestSupport.java +++ b/components/camel-infinispan/camel-infinispan/src/test/java/org/apache/camel/component/infinispan/remote/InfinispanRemoteTestSupport.java @@ -59,7 +59,7 @@ protected void setupResources() throws Exception { final ForegroundTask task = Tasks.foregroundTask() .withBudget(budget).build(); - final boolean cacheCreated = task.run(this::createCache); + final boolean cacheCreated = task.run(null, this::createCache); Assumptions.assumeTrue(cacheCreated, "The container cache is not running healthily"); } } diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java index fffc609d9d229..39984178c11cb 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java @@ -108,7 +108,7 @@ public Destination getReplyTo() { .withInterval(Duration.ofMillis(interval)) .build()) .build(); - boolean done = task.run(() -> { + boolean done = task.run(camelContext, () -> { log.trace("Waiting for replyTo to be ready: {}", replyTo != null); return replyTo != null; }); @@ -249,7 +249,7 @@ protected ReplyHandler waitForProvisionCorrelationToBeUpdated(String correlation .build()) .build(); - return task.run(() -> getReplyHandler(correlationID), Objects::nonNull).orElse(null); + return task.run(camelContext, () -> getReplyHandler(correlationID), Objects::nonNull).orElse(null); } private ReplyHandler getReplyHandler(String correlationID) { diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java index 83058a95861aa..c98a48740b28a 100644 --- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java +++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java @@ -148,7 +148,7 @@ public void run() { .withInterval(Duration.ofMillis(currentBackoffInterval)) .build()) .build(); - boolean success = task.run(this::createConsumerTask); + boolean success = task.run(kafkaConsumer.getEndpoint().getCamelContext(), this::createConsumerTask); if (!success) { int max = kafkaConsumer.getEndpoint().getComponent().getCreateConsumerBackoffMaxAttempts(); setupCreateConsumerException(task, max); @@ -168,7 +168,7 @@ public void run() { .withInterval(Duration.ofMillis(currentBackoffInterval)) .build()) .build(); - success = task.run(this::initializeConsumerTask); + success = task.run(kafkaConsumer.getEndpoint().getCamelContext(), this::initializeConsumerTask); if (!success) { int max = kafkaConsumer.getEndpoint().getComponent().getSubscribeConsumerBackoffMaxAttempts(); setupInitializeErrorException(task, max); diff --git a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java index 603fa9d5073f5..7954c40e28dab 100644 --- a/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java +++ b/components/camel-master/src/main/java/org/apache/camel/component/master/MasterConsumer.java @@ -162,7 +162,7 @@ private void onLeadershipTaken() throws Exception { } final BackgroundTask leaderTask = createTask(); - leaderTask.run(() -> { + leaderTask.run(getEndpoint().getCamelContext(), () -> { if (!isRunAllowed()) { return false; } diff --git a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerBindThread.java b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerBindThread.java index 27a25e043cfe7..8c676ae6dcd9a 100644 --- a/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerBindThread.java +++ b/components/camel-mllp/src/main/java/org/apache/camel/component/mllp/internal/TcpServerBindThread.java @@ -47,7 +47,6 @@ public class TcpServerBindThread extends Thread { private final SSLContextParameters sslContextParameters; public TcpServerBindThread(MllpTcpServerConsumer consumer, final SSLContextParameters sslParams) { - this.consumer = consumer; this.sslContextParameters = sslParams; @@ -107,7 +106,7 @@ private void doAccept(ServerSocket serverSocket, InetSocketAddress socketAddress .withName("mllp-tcp-server-accept") .build(); - if (task.run(() -> doBind(serverSocket, socketAddress))) { + if (task.run(consumer.getEndpoint().getCamelContext(), () -> doBind(serverSocket, socketAddress))) { consumer.startAcceptThread(serverSocket); } else { log.error("Failed to bind to address {} within timeout {}", socketAddress, diff --git a/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java b/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java index 69941a752051c..3ca6c9fe2b52e 100644 --- a/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java +++ b/components/camel-mongodb-gridfs/src/main/java/org/apache/camel/component/mongodb/gridfs/GridFsConsumer.java @@ -118,8 +118,9 @@ public void run() { MongoCollection finalPtsCollection = ptsCollection; Date finalFromDate = fromDate; Document finalPersistentTimestamp = persistentTimestamp; - task.run(() -> processCollection(finalFromDate, usesTimestamp, persistsTimestamp, usesAttribute, finalPtsCollection, - finalPersistentTimestamp)); + task.run(endpoint.getCamelContext(), + () -> processCollection(finalFromDate, usesTimestamp, persistsTimestamp, usesAttribute, finalPtsCollection, + finalPersistentTimestamp)); } private boolean processCollection( diff --git a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java index 666b61f4a1c87..4710b3ee9a913 100644 --- a/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java +++ b/components/camel-pgevent/src/main/java/org/apache/camel/component/pgevent/PgEventConsumer.java @@ -102,7 +102,7 @@ public class PgEventListener implements PGNotificationListener { public void reconnect() { // only submit the task if not already running if (!reconnectTask.isRunning()) { - reconnectTask.run(() -> { + reconnectTask.run(endpoint.getCamelContext(), () -> { if (isRunAllowed()) { LOG.debug("Connecting attempt #{}", reconnectTask.iteration()); try { diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java index 23723daf9a46a..e3ad104f1e1b7 100644 --- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java +++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/SimpleMessageListenerContainer.java @@ -223,7 +223,7 @@ protected void scheduleConnectionRecovery() { } if (recoverTask == null) { recoverTask = createTask(); - recoverFuture = recoverTask.schedule(() -> recoverConnection(recoverTask)); + recoverFuture = recoverTask.schedule(endpoint.getCamelContext(), () -> recoverConnection(recoverTask)); } } finally { connectionLock.unlock(); diff --git a/components/camel-smb/src/main/java/org/apache/camel/component/smb/strategy/SmbChangedExclusiveReadLockStrategy.java b/components/camel-smb/src/main/java/org/apache/camel/component/smb/strategy/SmbChangedExclusiveReadLockStrategy.java index a1e5da9b4135e..a1666b263ef52 100644 --- a/components/camel-smb/src/main/java/org/apache/camel/component/smb/strategy/SmbChangedExclusiveReadLockStrategy.java +++ b/components/camel-smb/src/main/java/org/apache/camel/component/smb/strategy/SmbChangedExclusiveReadLockStrategy.java @@ -44,8 +44,8 @@ public class SmbChangedExclusiveReadLockStrategy @Override public void prepareOnStartup( - GenericFileOperations tGenericFileOperations, - GenericFileEndpoint tGenericFileEndpoint) { + GenericFileOperations operations, + GenericFileEndpoint endpoint) { // noop } @@ -66,7 +66,7 @@ public boolean acquireExclusiveReadLock( SmbExclusiveReadLockCheck exclusiveReadLockCheck = new SmbExclusiveReadLockCheck(minAge, minLength); - if (!task.run(() -> exclusiveReadLockCheck.tryAcquireExclusiveReadLock(operations, file))) { + if (!task.run(exchange.getContext(), () -> exclusiveReadLockCheck.tryAcquireExclusiveReadLock(operations, file))) { CamelLogger.log(LOG, readLockLoggingLevel, "Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file); diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java index 0184cba8b00d4..d60eaf990adba 100644 --- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java +++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppConsumer.java @@ -193,7 +193,7 @@ private void reconnect() { configuration.getMaxReconnect()); try { - task.run(this::doReconnect); + task.run(getEndpoint().getCamelContext(), this::doReconnect); } finally { reconnectLock.unlock(); } diff --git a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java index 4e130ce783b70..ecac4c9598bdb 100644 --- a/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java +++ b/components/camel-smpp/src/main/java/org/apache/camel/component/smpp/SmppProducer.java @@ -212,7 +212,7 @@ private void reconnect() { configuration.getReconnectDelay(), configuration.getMaxReconnect()); try { - task.run(this::doReconnect); + task.run(getEndpoint().getCamelContext(), this::doReconnect); } finally { connectLock.unlock(); } diff --git a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java index 9ff39852988ec..7dae8133ccb77 100644 --- a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java +++ b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppConsumerTest.java @@ -190,6 +190,7 @@ public void internalSessionStateListenerShouldCloseSessionAndReconnect(SessionSt ""); when(session.connectAndBind("localhost", Integer.valueOf(2775), expectedBindParameter)) .thenReturn("1"); + when(endpoint.getCamelContext()).thenReturn(null); smppUtilsMock.when(() -> SmppUtils.newReconnectTask(any(), anyString(), anyLong(), anyLong(), anyInt())) .thenReturn(new BackgroundTask.BackgroundTaskBuilder().withScheduledExecutor(reconnectService) .withBudget(Budgets.timeBudget().build()).build()); diff --git a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppProducerTest.java b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppProducerTest.java index d855a13696616..c82e81fdce167 100644 --- a/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppProducerTest.java +++ b/components/camel-smpp/src/test/java/org/apache/camel/component/smpp/SmppProducerTest.java @@ -163,6 +163,7 @@ public void internalSessionStateListenerShouldCloseSessionAndReconnect(SessionSt when(session.connectAndBind("localhost", Integer.valueOf(2775), expectedBindParameters)) .thenReturn("1"); when(endpoint.isSingleton()).thenReturn(true); + when(endpoint.getCamelContext()).thenReturn(null); smppUtilsMock.when(() -> SmppUtils.newReconnectTask(any(), anyString(), anyLong(), anyLong(), anyInt())) .thenReturn(new BackgroundTask.BackgroundTaskBuilder().withScheduledExecutor(reconnectService) .withBudget(Budgets.timeBudget().build()).build()); diff --git a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataReader.java b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataReader.java index 1e4c6795bbbaf..985bb4b6dfe1d 100644 --- a/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataReader.java +++ b/components/camel-splunk/src/main/java/org/apache/camel/component/splunk/support/SplunkDataReader.java @@ -56,8 +56,8 @@ public class SplunkDataReader { private static final String SPLUNK_TIME_FORMAT = "%m/%d/%y %H:%M:%S:%3N"; private transient Calendar lastSuccessfulReadTime; - private SplunkEndpoint endpoint; - private ConsumerType consumerType; + private final SplunkEndpoint endpoint; + private final ConsumerType consumerType; public SplunkDataReader(SplunkEndpoint endpoint, ConsumerType consumerType) { this.endpoint = endpoint; @@ -241,7 +241,7 @@ private void waitForJob(long interval, BooleanSupplier supplier) { .withMaxIterations(IterationBoundedBudget.UNLIMITED_ITERATIONS) .withInterval(Duration.ofMillis(interval)) .build()) - .build().run(supplier); + .build().run(endpoint.getCamelContext(), supplier); } private List nonBlockingSearch(SplunkResultProcessor callback) throws Exception { diff --git a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/cluster/ZooKeeperClusterView.java b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/cluster/ZooKeeperClusterView.java index 12b6114491a51..c276b152d26ae 100644 --- a/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/cluster/ZooKeeperClusterView.java +++ b/components/camel-zookeeper/src/main/java/org/apache/camel/component/zookeeper/cluster/ZooKeeperClusterView.java @@ -150,7 +150,7 @@ public void takeLeadership(CuratorFramework curatorFramework) throws Exception { .build()) .build(); - task.run(() -> !isRunAllowed()); + task.run(getCamelContext(), () -> !isRunAllowed()); fireLeadershipChangedEvent(getLeader().orElse(null)); } diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java index e1b7d7942513c..072d1775448c2 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/AbstractCamelContext.java @@ -190,6 +190,7 @@ import org.apache.camel.support.jsse.SSLContextParameters; import org.apache.camel.support.service.BaseService; import org.apache.camel.support.service.ServiceHelper; +import org.apache.camel.support.task.TaskManagerRegistry; import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.StopWatch; @@ -2244,6 +2245,9 @@ public void doBuild() throws Exception { startupStepRecorder.endStep(step4); } + // setup internal task registry + getCamelContextExtension().addContextPlugin(TaskManagerRegistry.class, createTaskManagerRegistry()); + // setup dev-console registry as its needed this early phase for 3rd party to register custom consoles DevConsoleRegistry dcr = getCamelContextExtension().getContextPlugin(DevConsoleRegistry.class); if (dcr == null) { @@ -4346,6 +4350,8 @@ protected abstract EndpointRegistry createEndpointRegistry( protected abstract BackOffTimerFactory createBackOffTimerFactory(); + protected abstract TaskManagerRegistry createTaskManagerRegistry(); + protected RestConfiguration createRestConfiguration() { // lookup a global which may have been on a container such spring-boot / CDI / etc. RestConfiguration conf diff --git a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java index 715557ebbc96f..28a49861e9967 100644 --- a/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java +++ b/core/camel-base-engine/src/main/java/org/apache/camel/impl/engine/SimpleCamelContext.java @@ -104,6 +104,8 @@ import org.apache.camel.support.scan.DefaultPackageScanResourceResolver; import org.apache.camel.support.scan.WebSpherePackageScanClassResolver; import org.apache.camel.support.startup.DefaultStartupConditionStrategy; +import org.apache.camel.support.task.DefaultTaskManagerRegistry; +import org.apache.camel.support.task.TaskManagerRegistry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -746,6 +748,11 @@ protected BackOffTimerFactory createBackOffTimerFactory() { return new DefaultBackOffTimerFactory(this); } + @Override + protected TaskManagerRegistry createTaskManagerRegistry() { + return new DefaultTaskManagerRegistry(this); + } + @Override protected TransformerRegistry createTransformerRegistry() { return new DefaultTransformerRegistry(getCamelContextReference()); diff --git a/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/backoff.json b/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/backoff.json deleted file mode 100644 index 8cae12a4e9204..0000000000000 --- a/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/backoff.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "console": { - "kind": "console", - "group": "camel", - "name": "backoff", - "title": "BackOff", - "description": "Display information about BackOff tasks", - "deprecated": false, - "javaType": "org.apache.camel.impl.console.BackOffDevConsole", - "groupId": "org.apache.camel", - "artifactId": "camel-console", - "version": "4.13.0-SNAPSHOT" - } -} - diff --git a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/protocol.json b/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/internal-tasks.json similarity index 50% rename from catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/protocol.json rename to core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/internal-tasks.json index cb3588457a9c1..e90a9fd8d10af 100644 --- a/catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/dev-consoles/protocol.json +++ b/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/internal-tasks.json @@ -2,11 +2,11 @@ "console": { "kind": "console", "group": "camel", - "name": "protocol", - "title": "Protocols", - "description": "Protocols used for network communication with clients", + "name": "internal-tasks", + "title": "Internal Tasks", + "description": "Display information about internal tasks", "deprecated": false, - "javaType": "org.apache.camel.impl.console.ProtocolDevConsole", + "javaType": "org.apache.camel.impl.console.TaskRegistryDevConsole", "groupId": "org.apache.camel", "artifactId": "camel-console", "version": "4.13.0-SNAPSHOT" diff --git a/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/protocol.json b/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/protocol.json deleted file mode 100644 index cb3588457a9c1..0000000000000 --- a/core/camel-console/src/generated/resources/META-INF/org/apache/camel/dev-console/protocol.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "console": { - "kind": "console", - "group": "camel", - "name": "protocol", - "title": "Protocols", - "description": "Protocols used for network communication with clients", - "deprecated": false, - "javaType": "org.apache.camel.impl.console.ProtocolDevConsole", - "groupId": "org.apache.camel", - "artifactId": "camel-console", - "version": "4.13.0-SNAPSHOT" - } -} - diff --git a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/backoff b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/backoff deleted file mode 100644 index 88b4c5ca27b71..0000000000000 --- a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/backoff +++ /dev/null @@ -1,2 +0,0 @@ -# Generated by camel build tools - do NOT edit this file! -class=org.apache.camel.impl.console.BackOffDevConsole diff --git a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/internal-tasks b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/internal-tasks new file mode 100644 index 0000000000000..2bb254b0dc43d --- /dev/null +++ b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-console/internal-tasks @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +class=org.apache.camel.impl.console.TaskRegistryDevConsole diff --git a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties index 1ff8974c70e99..728e2cb6dbff9 100644 --- a/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties +++ b/core/camel-console/src/generated/resources/META-INF/services/org/apache/camel/dev-consoles.properties @@ -1,5 +1,5 @@ # Generated by camel build tools - do NOT edit this file! -dev-consoles=backoff bean blocked browse circuit-breaker consumer context debug endpoint event gc health inflight java-security jvm log memory properties receive reload rest route route-controller route-dump send service source startup-recorder system-properties thread top trace transformers type-converters variables +dev-consoles=bean blocked browse circuit-breaker consumer context debug endpoint event gc health inflight internal-tasks java-security jvm log memory properties receive reload rest route route-controller route-dump send service source startup-recorder system-properties thread top trace transformers type-converters variables groupId=org.apache.camel artifactId=camel-console version=4.13.0-SNAPSHOT diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/BackOffDevConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/BackOffDevConsole.java deleted file mode 100644 index 4b8f03f3e26c3..0000000000000 --- a/core/camel-console/src/main/java/org/apache/camel/impl/console/BackOffDevConsole.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.impl.console; - -import java.util.Map; -import java.util.Set; - -import org.apache.camel.spi.annotations.DevConsole; -import org.apache.camel.support.console.AbstractDevConsole; -import org.apache.camel.util.backoff.BackOffTimer; -import org.apache.camel.util.json.JsonArray; -import org.apache.camel.util.json.JsonObject; - -@DevConsole(name = "backoff", displayName = "BackOff", description = "Display information about BackOff tasks") -public class BackOffDevConsole extends AbstractDevConsole { - - public BackOffDevConsole() { - super("camel", "backoff", "BackOff", "Display information about BackOff tasks"); - } - - @Override - protected String doCallText(Map options) { - StringBuilder sb = new StringBuilder(); - - Set timers = getCamelContext().hasServices(BackOffTimer.class); - for (BackOffTimer timer : timers) { - sb.append(String.format("\nTimer: %s", timer.getName())); - sb.append(String.format("\nTasks: %s", timer.size())); - int id = 0; - for (BackOffTimer.Task task : timer.getTasks()) { - String failure = task.getException() != null ? task.getException().getMessage() : ""; - sb.append(String.format( - "\n #%d (name=%s status=%s attempts=%d delay=%d elapsed=%d first=%d last=%d next=%d failure=%s", - id, task.getName(), task.getStatus().name(), task.getCurrentAttempts(), task.getCurrentDelay(), - task.getCurrentElapsedTime(), task.getFirstAttemptTime(), task.getLastAttemptTime(), - task.getNextAttemptTime(), failure)); - id++; - } - } - return sb.toString(); - } - - @Override - protected JsonObject doCallJson(Map options) { - JsonObject root = new JsonObject(); - JsonArray arr = new JsonArray(); - root.put("timers", arr); - - Set timers = getCamelContext().hasServices(BackOffTimer.class); - for (BackOffTimer timer : timers) { - JsonObject jo = new JsonObject(); - jo.put("name", timer.getName()); - jo.put("size", timer.size()); - arr.add(jo); - if (timer.size() > 0) { - JsonArray arr2 = new JsonArray(); - jo.put("tasks", arr2); - for (BackOffTimer.Task task : timer.getTasks()) { - String failure = task.getException() != null ? task.getException().getMessage() : ""; - JsonObject jo2 = new JsonObject(); - jo2.put("name", task.getName()); - jo2.put("status", task.getStatus().name()); - jo2.put("attempts", task.getCurrentAttempts()); - jo2.put("delay", task.getCurrentDelay()); - jo2.put("elapsed", task.getCurrentElapsedTime()); - jo2.put("firstTime", task.getFirstAttemptTime()); - jo2.put("lastTime", task.getLastAttemptTime()); - jo2.put("nextTime", task.getNextAttemptTime()); - if (failure != null) { - jo2.put("error", failure); - } - arr2.add(jo2); - } - } - } - return root; - } - -} diff --git a/core/camel-console/src/main/java/org/apache/camel/impl/console/TaskRegistryDevConsole.java b/core/camel-console/src/main/java/org/apache/camel/impl/console/TaskRegistryDevConsole.java new file mode 100644 index 0000000000000..f1ea140a4e8f9 --- /dev/null +++ b/core/camel-console/src/main/java/org/apache/camel/impl/console/TaskRegistryDevConsole.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.impl.console; + +import java.util.Map; + +import org.apache.camel.spi.annotations.DevConsole; +import org.apache.camel.support.PluginHelper; +import org.apache.camel.support.console.AbstractDevConsole; +import org.apache.camel.support.task.Task; +import org.apache.camel.support.task.TaskManagerRegistry; +import org.apache.camel.util.json.JsonArray; +import org.apache.camel.util.json.JsonObject; + +@DevConsole(name = "internal-tasks", displayName = "Internal Tasks", description = "Display information about internal tasks") +public class TaskRegistryDevConsole extends AbstractDevConsole { + + public TaskRegistryDevConsole() { + super("camel", "internal-tasks", "Internal Tasks", "Display information about internal tasks"); + } + + @Override + protected String doCallText(Map options) { + StringBuilder sb = new StringBuilder(); + + TaskManagerRegistry reg = PluginHelper.getTaskManagerRegistry(getCamelContext().getCamelContextExtension()); + sb.append(String.format("\nTasks: %s", reg.getSize())); + int id = 0; + for (Task task : reg.getTasks()) { + String failure = task.getException() != null ? task.getException().getMessage() : ""; + sb.append(String.format( + "\n #%d (name=%s status=%s attempts=%d delay=%d elapsed=%d first=%d last=%d next=%d failure=%s", + id, task.getName(), task.getStatus().name(), task.iteration(), task.getCurrentDelay(), + task.getCurrentElapsedTime(), task.getFirstAttemptTime(), task.getLastAttemptTime(), + task.getNextAttemptTime(), failure)); + id++; + } + return sb.toString(); + } + + @Override + protected JsonObject doCallJson(Map options) { + JsonObject root = new JsonObject(); + JsonArray arr = new JsonArray(); + root.put("tasks", arr); + + TaskManagerRegistry reg = PluginHelper.getTaskManagerRegistry(getCamelContext().getCamelContextExtension()); + for (Task task : reg.getTasks()) { + JsonObject jo = new JsonObject(); + jo.put("name", task.getName()); + jo.put("status", task.getStatus().name()); + jo.put("attempts", task.iteration()); + jo.put("delay", task.getCurrentDelay()); + jo.put("elapsed", task.getCurrentElapsedTime()); + jo.put("firstTime", task.getFirstAttemptTime()); + jo.put("lastTime", task.getLastAttemptTime()); + jo.put("nextTime", task.getNextAttemptTime()); + String failure = task.getException() != null ? task.getException().getMessage() : ""; + if (failure != null) { + jo.put("error", failure); + } + arr.add(jo); + } + return root; + } + +} diff --git a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java b/core/camel-core/src/test/java/org/apache/camel/support/task/task/BackgroundIterationTimeTaskTest.java similarity index 89% rename from core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java rename to core/camel-core/src/test/java/org/apache/camel/support/task/task/BackgroundIterationTimeTaskTest.java index d2f4b22886d37..9c3c5cab064b1 100644 --- a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundIterationTimeTaskTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/support/task/task/BackgroundIterationTimeTaskTest.java @@ -14,12 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package org.apache.camel.support.task; +package org.apache.camel.support.task.task; import java.time.Duration; import java.util.concurrent.Executors; +import org.apache.camel.support.task.BackgroundTask; +import org.apache.camel.support.task.Tasks; import org.apache.camel.support.task.budget.Budgets; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -47,7 +48,7 @@ void testRunNoMoreSupplier() { .build()) .build(); - boolean completed = task.run(this::booleanSupplier); + boolean completed = task.run(camelContext, this::booleanSupplier); assertEquals(3, taskCount.intValue()); assertFalse(completed, "The task did not complete, the return should be false"); } @@ -66,7 +67,7 @@ void testRunNoMoreSupplierWithDelay() { .build()) .build(); - boolean completed = task.run(this::booleanSupplier); + boolean completed = task.run(camelContext, this::booleanSupplier); assertTrue(maxIterations > taskCount.intValue(), "The task execution should not exceed the max iterations"); assertFalse(completed, "The task did not complete, the return should be false"); } @@ -84,7 +85,7 @@ void testRunNoMorePredicate() { .build()) .build(); - boolean completed = task.run(this::taskPredicate, new Object()); + boolean completed = task.run(camelContext, this::taskPredicate, new Object()); assertEquals(3, taskCount.intValue()); assertFalse(completed, "The task did not complete, the return should be false"); } @@ -104,7 +105,7 @@ void testRunNoMorePredicateDeterministic() { .build()) .build(); - boolean completed = task.run(this::taskPredicateWithDeterministicStop, Integer.valueOf(3)); + boolean completed = task.run(camelContext, this::taskPredicateWithDeterministicStop, Integer.valueOf(3)); assertEquals(3, taskCount.intValue()); assertTrue(completed, "The task did complete, the return should be true"); } @@ -123,7 +124,7 @@ void testRunNoMorePredicateDeterministicSlow() { .build()) .build(); - boolean completed = task.run(this::taskPredicateWithDeterministicStopSlow, Integer.valueOf(3)); + boolean completed = task.run(camelContext, this::taskPredicateWithDeterministicStopSlow, Integer.valueOf(3)); assertTrue(taskCount.intValue() < maxIterations); assertFalse(completed, "The task did not complete because of timeout, the return should be false"); } @@ -142,7 +143,7 @@ void testRunNoMorePredicateDeterministicSlowWithDelay() { .build()) .build(); - boolean completed = task.run(this::taskPredicateWithDeterministicStopSlow, Integer.valueOf(3)); + boolean completed = task.run(camelContext, this::taskPredicateWithDeterministicStopSlow, Integer.valueOf(3)); assertTrue(taskCount.intValue() < maxIterations); assertFalse(completed, "The task did not complete because of timeout, the return should be false"); } @@ -160,7 +161,7 @@ void testRunNoMoreBooleanSupplierWithForever() { .build()) .build(); - boolean completed = task.run(this::taskPredicateWithDeterministicStop, 4); + boolean completed = task.run(camelContext, this::taskPredicateWithDeterministicStop, 4); assertTrue(maxIterations > taskCount.intValue(), "The task execution should not exceed the max iterations"); assertTrue(completed, "The task did not complete, the return should be false"); } diff --git a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundTaskTest.java b/core/camel-core/src/test/java/org/apache/camel/support/task/task/BackgroundTaskTest.java similarity index 90% rename from core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundTaskTest.java rename to core/camel-core/src/test/java/org/apache/camel/support/task/task/BackgroundTaskTest.java index b7734efaa9c09..e61ff91c32f7c 100644 --- a/core/camel-support/src/test/java/org/apache/camel/support/task/BackgroundTaskTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/support/task/task/BackgroundTaskTest.java @@ -14,12 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package org.apache.camel.support.task; +package org.apache.camel.support.task.task; import java.time.Duration; import java.util.concurrent.Executors; +import org.apache.camel.support.task.BackgroundTask; +import org.apache.camel.support.task.Tasks; import org.apache.camel.support.task.budget.Budgets; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -57,7 +58,7 @@ void testRunNoMoreSupplier() { .build()) .build(); - boolean completed = task.run(this::booleanSupplier); + boolean completed = task.run(camelContext, this::booleanSupplier); assertTrue(taskCount.intValue() <= maxIterations); assertFalse(completed, "The task did not complete, the return should be false"); @@ -74,7 +75,7 @@ void testRunNoMoreSupplier() { @Timeout(10) void testRunNoMoreSupplierWithDelay() { /* - * It should run at most 4 times in 4 seconds because: + * It should run approx most 4 times in 4 seconds because: * 1) there is a delay. * 2) the interval is of 1 second */ @@ -87,8 +88,8 @@ void testRunNoMoreSupplierWithDelay() { .build()) .build(); - boolean completed = task.run(this::booleanSupplier); - assertTrue((maxIterations - 1) <= taskCount.intValue()); + boolean completed = task.run(camelContext, this::booleanSupplier); + assertTrue(taskCount.intValue() < maxIterations, "number of runs: " + taskCount.intValue()); assertFalse(completed, "The task did not complete, the return should be false"); Duration duration = task.elapsed(); @@ -117,7 +118,7 @@ void testRunNoMorePredicate() { .build()) .build(); - boolean completed = task.run(this::taskPredicate, new Object()); + boolean completed = task.run(camelContext, this::taskPredicate, new Object()); assertTrue(taskCount.intValue() <= maxIterations); assertFalse(completed, "The task did not complete, the return should be false"); @@ -146,7 +147,7 @@ void testRunNoMorePredicateWithSuccess() { .build()) .build(); - boolean completed = task.run(this::taskPredicateWithDeterministicStop, Integer.valueOf(3)); + boolean completed = task.run(camelContext, this::taskPredicateWithDeterministicStop, Integer.valueOf(3)); assertEquals(3, taskCount.intValue()); assertTrue(completed, "The task did complete, the return should be true"); } @@ -168,7 +169,7 @@ void testRunNoMorePredicateWithTimeout() { .build()) .build(); - boolean completed = task.run(this::taskPredicateWithDeterministicStopSlow, Integer.valueOf(3)); + boolean completed = task.run(camelContext, this::taskPredicateWithDeterministicStopSlow, Integer.valueOf(3)); assertTrue(taskCount.intValue() <= 2, "Slow task: it should not run more than 2 times in 4 seconds"); Duration duration = task.elapsed(); @@ -198,7 +199,7 @@ void testRunNoMorePredicateWithTimeoutAndDelay() { .build()) .build(); - boolean completed = task.run(this::taskPredicateWithDeterministicStopSlow, Integer.valueOf(3)); + boolean completed = task.run(camelContext, this::taskPredicateWithDeterministicStopSlow, Integer.valueOf(3)); Duration duration = task.elapsed(); assertNotNull(duration); assertFalse(duration.isNegative()); diff --git a/core/camel-support/src/test/java/org/apache/camel/support/task/ForegroundTaskTest.java b/core/camel-core/src/test/java/org/apache/camel/support/task/task/ForegroundTaskTest.java similarity index 89% rename from core/camel-support/src/test/java/org/apache/camel/support/task/ForegroundTaskTest.java rename to core/camel-core/src/test/java/org/apache/camel/support/task/task/ForegroundTaskTest.java index 7c31afb160b77..aa61b98c28751 100644 --- a/core/camel-support/src/test/java/org/apache/camel/support/task/ForegroundTaskTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/support/task/task/ForegroundTaskTest.java @@ -14,11 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package org.apache.camel.support.task; +package org.apache.camel.support.task.task; import java.time.Duration; +import org.apache.camel.support.task.ForegroundTask; +import org.apache.camel.support.task.Tasks; import org.apache.camel.support.task.budget.Budgets; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -33,7 +34,7 @@ public class ForegroundTaskTest extends TaskTestSupport { @DisplayName("Test that the task does not run for more than the max iterations when using a supplier") @Test - // @Timeout(10) + @Timeout(10) void testRunNoMoreSupplier() { // It should run 5 times in 4 seconds because there is no delay ForegroundTask task = Tasks.foregroundTask() @@ -44,7 +45,7 @@ void testRunNoMoreSupplier() { .build()) .build(); - task.run(this::booleanSupplier); + task.run(camelContext, this::booleanSupplier); assertEquals(maxIterations, taskCount.intValue()); Duration duration = task.elapsed(); @@ -66,7 +67,7 @@ void testRunNoMoreSupplierWithDelay() { .build()) .build(); - task.run(this::booleanSupplier); + task.run(camelContext, this::booleanSupplier); assertEquals(maxIterations, taskCount.intValue()); } @@ -83,7 +84,7 @@ void testRunNoMorePredicate() { .build()) .build(); - task.run(this::taskPredicate, new Object()); + task.run(camelContext, this::taskPredicate, new Object()); assertEquals(maxIterations, taskCount.intValue()); } @@ -100,7 +101,7 @@ void testRunNoMorePredicateDeterministic() { .build()) .build(); - task.run(this::taskPredicateWithDeterministicStop, Integer.valueOf(3)); + task.run(camelContext, this::taskPredicateWithDeterministicStop, Integer.valueOf(3)); assertEquals(3, taskCount.intValue()); } @@ -117,7 +118,7 @@ void testRunNoMorePredicateDeterministicSlow() { .build()) .build(); - task.run(this::taskPredicateWithDeterministicStopSlow, Integer.valueOf(3)); + task.run(camelContext, this::taskPredicateWithDeterministicStopSlow, Integer.valueOf(3)); assertTrue(taskCount.intValue() < maxIterations); } @@ -134,7 +135,7 @@ void testRunNoMorePredicateDeterministicSlowWithDelay() { .build()) .build(); - task.run(this::taskPredicateWithDeterministicStopSlow, Integer.valueOf(3)); + task.run(camelContext, this::taskPredicateWithDeterministicStopSlow, Integer.valueOf(3)); assertTrue(taskCount.intValue() < maxIterations); } } diff --git a/core/camel-support/src/test/java/org/apache/camel/support/task/ForegroundTimeTaskTest.java b/core/camel-core/src/test/java/org/apache/camel/support/task/task/ForegroundTimeTaskTest.java similarity index 91% rename from core/camel-support/src/test/java/org/apache/camel/support/task/ForegroundTimeTaskTest.java rename to core/camel-core/src/test/java/org/apache/camel/support/task/task/ForegroundTimeTaskTest.java index e6b9a2bd9fa6a..09561838cbab5 100644 --- a/core/camel-support/src/test/java/org/apache/camel/support/task/ForegroundTimeTaskTest.java +++ b/core/camel-core/src/test/java/org/apache/camel/support/task/task/ForegroundTimeTaskTest.java @@ -14,11 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package org.apache.camel.support.task; +package org.apache.camel.support.task.task; import java.time.Duration; +import org.apache.camel.support.task.ForegroundTask; +import org.apache.camel.support.task.Tasks; import org.apache.camel.support.task.budget.Budgets; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -45,7 +46,7 @@ void testRunNoMoreSupplier() { .build()) .build(); - task.run(this::booleanSupplier); + task.run(camelContext, this::booleanSupplier); assertEquals(maxIterations, taskCount.intValue()); Duration duration = task.elapsed(); @@ -69,7 +70,7 @@ void testRunNoMoreSupplierWithDelay() { .build()) .build(); - task.run(this::booleanSupplier); + task.run(camelContext, this::booleanSupplier); assertEquals(maxIterations, taskCount.intValue()); } @@ -87,7 +88,7 @@ void testRunNoMorePredicate() { .build()) .build(); - task.run(this::taskPredicate, new Object()); + task.run(camelContext, this::taskPredicate, new Object()); assertEquals(maxIterations, taskCount.intValue()); } @@ -105,7 +106,7 @@ void testRunNoMorePredicateMaxDuration() { .build()) .build(); - task.run(this::taskPredicate, new Object()); + task.run(camelContext, this::taskPredicate, new Object()); assertEquals(maxIterations, taskCount.intValue()); } @@ -123,7 +124,7 @@ void testRunNoMorePredicateDeterministic() { .build()) .build(); - task.run(this::taskPredicateWithDeterministicStop, 3); + task.run(camelContext, this::taskPredicateWithDeterministicStop, 3); assertEquals(3, taskCount.intValue()); } @@ -141,7 +142,7 @@ void testRunNoMorePredicateDeterministicSlow() { .build()) .build(); - task.run(this::taskPredicateWithDeterministicStopSlow, 3); + task.run(camelContext, this::taskPredicateWithDeterministicStopSlow, 3); assertTrue(taskCount.intValue() < maxIterations); } @@ -159,7 +160,7 @@ void testRunNoMorePredicateDeterministicSlowWithDelay() { .build()) .build(); - task.run(this::taskPredicateWithDeterministicStopSlow, 3); + task.run(camelContext, this::taskPredicateWithDeterministicStopSlow, 3); assertTrue(taskCount.intValue() < maxIterations); } } diff --git a/core/camel-support/src/test/java/org/apache/camel/support/task/TaskTestSupport.java b/core/camel-core/src/test/java/org/apache/camel/support/task/task/TaskTestSupport.java similarity index 91% rename from core/camel-support/src/test/java/org/apache/camel/support/task/TaskTestSupport.java rename to core/camel-core/src/test/java/org/apache/camel/support/task/task/TaskTestSupport.java index 3144ba231613b..4f53bbeccd711 100644 --- a/core/camel-support/src/test/java/org/apache/camel/support/task/TaskTestSupport.java +++ b/core/camel-core/src/test/java/org/apache/camel/support/task/task/TaskTestSupport.java @@ -14,11 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package org.apache.camel.support.task; +package org.apache.camel.support.task.task; import java.util.concurrent.atomic.LongAdder; +import org.apache.camel.CamelContext; +import org.apache.camel.impl.DefaultCamelContext; import org.junit.jupiter.api.BeforeEach; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -26,6 +27,7 @@ public class TaskTestSupport { protected final int maxIterations = 5; protected final LongAdder taskCount = new LongAdder(); + protected CamelContext camelContext = new DefaultCamelContext(); protected boolean booleanSupplier() { taskCount.increment(); diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java index bdc658ca33383..48e83e07b00aa 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/CamelOpenMBeanTypes.java @@ -327,7 +327,7 @@ public static TabularType camelVariablesTabularType() throws OpenDataException { return new TabularType("variables", "Variables", ct, new String[] { "id", "key" }); } - public static TabularType listBackoffTTaskTabularType() throws OpenDataException { + public static TabularType listBackoffTaskTabularType() throws OpenDataException { CompositeType ct = listBackoffTaskCompositeType(); return new TabularType( "listBackoff", "Lists all the backoff tasks", ct, @@ -338,12 +338,35 @@ public static CompositeType listBackoffTaskCompositeType() throws OpenDataExcept return new CompositeType( "tasks", "Tasks", new String[] { - "name", "status", "attempts", "delay", "elapsed", "firstTime", "lastTime", "nextTime", "failure" }, + "name", "kind", "status", "attempts", "delay", "elapsed", "firstTime", "lastTime", "nextTime", + "failure" }, new String[] { - "Name", "Status", "Attempts", "Delay", "Elapsed", "FirstTime", "LastTime", "NextTime", "Failure" }, + "Name", "Kind", "Status", "Attempts", "Delay", "Elapsed", "FirstTime", "LastTime", "NextTime", + "Failure" }, new OpenType[] { - SimpleType.STRING, SimpleType.STRING, SimpleType.LONG, SimpleType.LONG, SimpleType.LONG, - SimpleType.LONG, SimpleType.LONG, SimpleType.LONG, SimpleType.STRING }); + SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.LONG, SimpleType.LONG, + SimpleType.LONG, SimpleType.LONG, SimpleType.LONG, SimpleType.LONG, SimpleType.STRING }); + } + + public static TabularType listInternalTaskTabularType() throws OpenDataException { + CompositeType ct = listInternalTaskCompositeType(); + return new TabularType( + "listTask", "Lists all the internal tasks", ct, + new String[] { "name" }); + } + + public static CompositeType listInternalTaskCompositeType() throws OpenDataException { + return new CompositeType( + "tasks", "Tasks", + new String[] { + "name", "kind", "status", "attempts", "delay", "elapsed", "firstTime", "lastTime", "nextTime", + "failure" }, + new String[] { + "Name", "Kind", "Status", "Attempts", "Delay", "Elapsed", "FirstTime", "LastTime", "NextTime", + "Failure" }, + new OpenType[] { + SimpleType.STRING, SimpleType.STRING, SimpleType.STRING, SimpleType.LONG, SimpleType.LONG, + SimpleType.LONG, SimpleType.LONG, SimpleType.LONG, SimpleType.LONG, SimpleType.STRING }); } } diff --git a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBackoffTimerMBean.java b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedTaskManagerRegistryMBean.java similarity index 90% rename from core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBackoffTimerMBean.java rename to core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedTaskManagerRegistryMBean.java index 0d0812a08bad2..be378bd0ba4bd 100644 --- a/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedBackoffTimerMBean.java +++ b/core/camel-management-api/src/main/java/org/apache/camel/api/management/mbean/ManagedTaskManagerRegistryMBean.java @@ -21,10 +21,7 @@ import org.apache.camel.api.management.ManagedAttribute; import org.apache.camel.api.management.ManagedOperation; -public interface ManagedBackoffTimerMBean { - - @ManagedAttribute(description = "Name of the backoff timer") - String getName(); +public interface ManagedTaskManagerRegistryMBean { @ManagedAttribute(description = "Number of total tasks") Integer getSize(); diff --git a/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java b/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java index 9661bb1dfd615..e3fdb23c22166 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/JmxManagementLifecycleStrategy.java @@ -54,7 +54,6 @@ import org.apache.camel.management.mbean.ManagedAsyncProcessorAwaitManager; import org.apache.camel.management.mbean.ManagedBacklogDebugger; import org.apache.camel.management.mbean.ManagedBacklogTracer; -import org.apache.camel.management.mbean.ManagedBackoffTimer; import org.apache.camel.management.mbean.ManagedBeanIntrospection; import org.apache.camel.management.mbean.ManagedCamelContext; import org.apache.camel.management.mbean.ManagedConsumerCache; @@ -71,6 +70,7 @@ import org.apache.camel.management.mbean.ManagedService; import org.apache.camel.management.mbean.ManagedShutdownStrategy; import org.apache.camel.management.mbean.ManagedStreamCachingStrategy; +import org.apache.camel.management.mbean.ManagedTaskManagerRegistry; import org.apache.camel.management.mbean.ManagedThrottlingExceptionRoutePolicy; import org.apache.camel.management.mbean.ManagedThrottlingInflightRoutePolicy; import org.apache.camel.management.mbean.ManagedTracer; @@ -115,11 +115,11 @@ import org.apache.camel.spi.ValidatorRegistry; import org.apache.camel.support.TimerListenerManager; import org.apache.camel.support.service.ServiceSupport; +import org.apache.camel.support.task.TaskManagerRegistry; import org.apache.camel.throttling.ThrottlingExceptionRoutePolicy; import org.apache.camel.throttling.ThrottlingInflightRoutePolicy; import org.apache.camel.util.KeyValueHolder; import org.apache.camel.util.ObjectHelper; -import org.apache.camel.util.backoff.BackOffTimer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -579,8 +579,8 @@ else if (service instanceof TransformerRegistry transformerRegistry) { answer = new ManagedValidatorRegistry(context, validatorRegistry); } else if (service instanceof BrowsableVariableRepository variableRepository) { answer = new ManagedVariableRepository(context, variableRepository); - } else if (service instanceof BackOffTimer timer) { - answer = new ManagedBackoffTimer(camelContext, timer); + } else if (service instanceof TaskManagerRegistry registry) { + answer = new ManagedTaskManagerRegistry(camelContext, registry); } else if (service instanceof CamelClusterService camelClusterService) { answer = getManagementObjectStrategy().getManagedObjectForClusterService(context, camelClusterService); } else if (service != null) { diff --git a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBackoffTimer.java b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedTaskManagerRegistry.java similarity index 65% rename from core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBackoffTimer.java rename to core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedTaskManagerRegistry.java index 8e2f35753f93a..96d529312f041 100644 --- a/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedBackoffTimer.java +++ b/core/camel-management/src/main/java/org/apache/camel/management/mbean/ManagedTaskManagerRegistry.java @@ -16,8 +16,6 @@ */ package org.apache.camel.management.mbean; -import java.util.Set; - import javax.management.openmbean.CompositeData; import javax.management.openmbean.CompositeDataSupport; import javax.management.openmbean.CompositeType; @@ -26,54 +24,50 @@ import org.apache.camel.CamelContext; import org.apache.camel.RuntimeCamelException; -import org.apache.camel.Service; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.api.management.mbean.CamelOpenMBeanTypes; -import org.apache.camel.api.management.mbean.ManagedBackoffTimerMBean; -import org.apache.camel.util.backoff.BackOffTimer; - -@ManagedResource(description = "Managed BackoffTimer") -public class ManagedBackoffTimer extends ManagedService implements ManagedBackoffTimerMBean { +import org.apache.camel.api.management.mbean.ManagedTaskManagerRegistryMBean; +import org.apache.camel.support.task.BackgroundTask; +import org.apache.camel.support.task.Task; +import org.apache.camel.support.task.TaskManagerRegistry; - private final BackOffTimer timer; +@ManagedResource(description = "Managed TaskManagerRegistry") +public class ManagedTaskManagerRegistry extends ManagedService implements ManagedTaskManagerRegistryMBean { - public ManagedBackoffTimer(CamelContext context, BackOffTimer timer) { - super(context, (Service) timer); - this.timer = timer; - } + private final TaskManagerRegistry registry; - @Override - public String getName() { - return timer.getName(); + public ManagedTaskManagerRegistry(CamelContext context, TaskManagerRegistry registry) { + super(context, registry); + this.registry = registry; } @Override public Integer getSize() { - return timer.size(); + return registry.getSize(); } @Override public TabularData listTasks() { try { - TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.listBackoffTTaskTabularType()); - Set tasks = timer.getTasks(); - for (BackOffTimer.Task task : tasks) { + TabularData answer = new TabularDataSupport(CamelOpenMBeanTypes.listInternalTaskTabularType()); + for (Task task : registry.getTasks()) { String name = task.getName(); + String kind = task instanceof BackgroundTask ? "background" : "foreground"; String status = task.getStatus().name(); - long attempts = task.getCurrentAttempts(); + long attempts = task.iteration(); long delay = task.getCurrentDelay(); long elapsed = task.getCurrentElapsedTime(); long firstTime = task.getFirstAttemptTime(); long lastTime = task.getLastAttemptTime(); long nextTime = task.getNextAttemptTime(); String failure = task.getException() != null ? task.getException().getMessage() : null; - CompositeType ct = CamelOpenMBeanTypes.listBackoffTaskCompositeType(); + CompositeType ct = CamelOpenMBeanTypes.listInternalTaskCompositeType(); CompositeData data = new CompositeDataSupport( ct, new String[] { - "name", "status", "attempts", "delay", "elapsed", "firstTime", "lastTime", "nextTime", + "name", "kind", "status", "attempts", "delay", "elapsed", "firstTime", "lastTime", "nextTime", "failure" }, - new Object[] { name, status, attempts, delay, elapsed, firstTime, lastTime, nextTime, failure }); + new Object[] { name, kind, status, attempts, delay, elapsed, firstTime, lastTime, nextTime, failure }); answer.put(data); } return answer; diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedBackOffTimerTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedBackOffTimerTest.java deleted file mode 100644 index 0aae5ca2d9c6c..0000000000000 --- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedBackOffTimerTest.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.management; - -import java.util.ArrayList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; - -import javax.management.MBeanServer; -import javax.management.ObjectName; -import javax.management.openmbean.TabularData; - -import org.apache.camel.support.PluginHelper; -import org.apache.camel.support.service.ServiceHelper; -import org.apache.camel.util.backoff.BackOff; -import org.apache.camel.util.backoff.BackOffTimer; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.DisabledOnOs; -import org.junit.jupiter.api.condition.OS; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; - -@DisabledOnOs(OS.AIX) -public class ManagedBackOffTimerTest extends ManagementTestSupport { - - @Override - public boolean isUseRouteBuilder() { - return false; - } - - @Test - public void testManageBackOffTimer() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - final AtomicInteger counter = new AtomicInteger(); - - BackOffTimer timer = PluginHelper.getBackOffTimerFactory(context.getCamelContextExtension()) - .newBackOffTimer("Cheese"); - ServiceHelper.startService(timer); - - context.start(); - - // get the bean introspection for the route - MBeanServer mbeanServer = getMBeanServer(); - Set set = mbeanServer.queryNames(new ObjectName("*:type=services,*"), null); - List list = new ArrayList<>(set); - ObjectName on = null; - for (ObjectName name : list) { - if (name.getCanonicalName().contains("DefaultBackOffTimer")) { - on = name; - break; - } - } - - assertNotNull(on, "Should have found DefaultBackOffTimer"); - - String name = (String) mbeanServer.getAttribute(on, "Name"); - assertEquals("Cheese", name); - - final BackOff backOff = BackOff.builder().delay(100).removeOnComplete(false).build(); - final AtomicLong first = new AtomicLong(); - - BackOffTimer.Task task = timer.schedule( - backOff, - context -> { - assertEquals(counter.incrementAndGet(), context.getCurrentAttempts()); - assertEquals(100, context.getCurrentDelay()); - assertEquals(100L * counter.get(), context.getCurrentElapsedTime()); - if (first.get() == 0) { - first.set(context.getFirstAttemptTime()); - } else { - assertEquals(first.get(), context.getFirstAttemptTime()); - } - - return counter.get() < 5; - }); - - task.whenComplete( - (context, throwable) -> { - assertEquals(5, counter.get()); - latch.countDown(); - }); - - latch.await(5, TimeUnit.SECONDS); - - Integer size = (Integer) mbeanServer.getAttribute(on, "Size"); - assertEquals(1, size); - - TabularData data = (TabularData) mbeanServer.invoke(on, "listTasks", null, null); - assertEquals(1, data.size()); - - ServiceHelper.stopService(timer); - } - -} diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java index 1f7e4e722cb71..c29db8af285d2 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedNonManagedServiceTest.java @@ -34,7 +34,7 @@ @DisabledOnOs(OS.AIX) public class ManagedNonManagedServiceTest extends ManagementTestSupport { - private static final int SERVICES = 16; + private static final int SERVICES = 17; @Test public void testService() throws Exception { diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java index d2509bdaa891a..0ff9794d08f0d 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedProducerRouteAddRemoveRegisterAlwaysTest.java @@ -36,7 +36,7 @@ @DisabledOnOs(OS.AIX) public class ManagedProducerRouteAddRemoveRegisterAlwaysTest extends ManagementTestSupport { - private static final int SERVICES = 16; + private static final int SERVICES = 17; @Override protected CamelContext createCamelContext() throws Exception { diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java index 0dc8926c0e754..1256143c2d604 100644 --- a/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedRouteAddRemoveTest.java @@ -40,7 +40,7 @@ @DisabledOnOs(OS.AIX) public class ManagedRouteAddRemoveTest extends ManagementTestSupport { - private static final int SERVICES = 16; + private static final int SERVICES = 17; @Override protected RouteBuilder createRouteBuilder() { diff --git a/core/camel-management/src/test/java/org/apache/camel/management/ManagedTaskRegistryTest.java b/core/camel-management/src/test/java/org/apache/camel/management/ManagedTaskRegistryTest.java new file mode 100644 index 0000000000000..b5a3e72b940f3 --- /dev/null +++ b/core/camel-management/src/test/java/org/apache/camel/management/ManagedTaskRegistryTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.openmbean.TabularData; + +import org.apache.camel.support.task.BackgroundTask; +import org.apache.camel.support.task.Tasks; +import org.apache.camel.support.task.budget.Budgets; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +@DisabledOnOs(OS.AIX) +public class ManagedTaskRegistryTest extends ManagementTestSupport { + + @Test + public void testManageTaskRegistry() throws Exception { + MBeanServer mbeanServer = getMBeanServer(); + Set set = mbeanServer.queryNames(new ObjectName("*:type=services,*"), null); + List list = new ArrayList<>(set); + ObjectName on = null; + for (ObjectName name : list) { + if (name.getCanonicalName().contains("DefaultTaskManagerRegistry")) { + on = name; + break; + } + } + assertNotNull(on, "Should have found DefaultTaskManagerRegistry"); + + Integer size = (Integer) mbeanServer.getAttribute(on, "Size"); + assertEquals(0, size); + + BackgroundTask task = Tasks.backgroundTask() + .withScheduledExecutor(Executors.newSingleThreadScheduledExecutor()) + .withBudget(Budgets.timeBudget() + .withInterval(Duration.ofMillis(100)) + .withMaxDuration(Duration.ofSeconds(5)) + .build()) + .build(); + + task.schedule(context, () -> false); + + final ObjectName oon = on; + Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).untilAsserted(() -> { + Integer size2 = (Integer) mbeanServer.getAttribute(oon, "Size"); + assertEquals(1, size2); + + TabularData data = (TabularData) mbeanServer.invoke(oon, "listTasks", null, null); + assertEquals(1, data.size()); + }); + } + +} diff --git a/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java index 2c2c776cc78b3..aae3c7c8536b7 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/PluginHelper.java @@ -55,6 +55,7 @@ import org.apache.camel.spi.RoutesLoader; import org.apache.camel.spi.UnitOfWorkFactory; import org.apache.camel.spi.UriFactoryResolver; +import org.apache.camel.support.task.TaskManagerRegistry; /** * Convenient helper to get easy access to various extensions from {@link ExtendedCamelContext}. @@ -584,4 +585,12 @@ public static BackOffTimerFactory getBackOffTimerFactory( ExtendedCamelContext extendedCamelContext) { return extendedCamelContext.getContextPlugin(BackOffTimerFactory.class); } + + /** + * Gets the {@link TaskManagerRegistry} to use. + */ + public static TaskManagerRegistry getTaskManagerRegistry(ExtendedCamelContext extendedCamelContext) { + return extendedCamelContext.getContextPlugin(TaskManagerRegistry.class); + } + } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/AbstractTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/AbstractTask.java new file mode 100644 index 0000000000000..f613ccc9ed622 --- /dev/null +++ b/core/camel-support/src/main/java/org/apache/camel/support/task/AbstractTask.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.support.task; + +abstract class AbstractTask implements Task { + + static final long NEVER = -1L; + + final String name; + Status status; + long firstAttemptTime; + long lastAttemptTime; + long nextAttemptTime; + Throwable cause; + + public AbstractTask(String name) { + this.name = name; + this.status = Status.Active; + this.firstAttemptTime = NEVER; + this.lastAttemptTime = NEVER; + this.nextAttemptTime = NEVER; + } + + @Override + public String getName() { + return name; + } + + @Override + public Status getStatus() { + return status; + } + + @Override + public long getFirstAttemptTime() { + return firstAttemptTime; + } + + @Override + public long getLastAttemptTime() { + return lastAttemptTime; + } + + @Override + public long getNextAttemptTime() { + return nextAttemptTime; + } + + @Override + public long getCurrentElapsedTime() { + if (firstAttemptTime > 0) { + return System.currentTimeMillis() - firstAttemptTime; + } + return NEVER; + } + + @Override + public Throwable getException() { + return cause; + } +} diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java index ff829d31ec1ca..bbbb6eb677075 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/task/BackgroundTask.java @@ -26,16 +26,19 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BooleanSupplier; +import org.apache.camel.CamelContext; +import org.apache.camel.support.PluginHelper; import org.apache.camel.support.task.budget.TimeBoundedBudget; import org.apache.camel.support.task.budget.TimeBudget; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A sleepless blocking task that runs in a Thread the background. The execution is blocked until the task budget is - * exhausted. All background tasks are constrained by a time budget. + * A sleepless blocking task that runs in a thread in the background (using a scheduled thread pool). The execution is + * processed until the task budget is either completed, or exhausted. All background tasks are constrained by a time + * budget. */ -public class BackgroundTask implements BlockingTask { +public class BackgroundTask extends AbstractTask implements BlockingTask { /** * A builder helper for building new background tasks @@ -74,61 +77,85 @@ public BackgroundTask build() { private final TimeBudget budget; private final ScheduledExecutorService service; - private final String name; private final CountDownLatch latch = new CountDownLatch(1); private Duration elapsed = Duration.ZERO; private final AtomicBoolean running = new AtomicBoolean(); private final AtomicBoolean completed = new AtomicBoolean(); BackgroundTask(TimeBudget budget, ScheduledExecutorService service, String name) { + super(name); this.budget = budget; this.service = Objects.requireNonNull(service); - this.name = name; } - private void runTaskWrapper(BooleanSupplier supplier) { + private void runTaskWrapper(CamelContext camelContext, BooleanSupplier supplier) { LOG.trace("Current latch value: {}", latch.getCount()); if (latch.getCount() == 0) { return; } + TaskManagerRegistry registry = null; + if (camelContext != null) { + registry = PluginHelper.getTaskManagerRegistry(camelContext.getCamelContextExtension()); + registry.addTask(this); + } if (!budget.next()) { - LOG.warn("The task {} does not have more budget to continue running", name); + LOG.warn("The task {} does not have more budget to continue running", getName()); + status = Status.Exhausted; completed.set(false); + if (registry != null) { + registry.removeTask(this); + } latch.countDown(); return; } - if (supplier.getAsBoolean()) { - completed.set(true); - latch.countDown(); - LOG.trace("Task {} succeeded and the current task is unscheduled: {}", name, latch.getCount()); + lastAttemptTime = System.currentTimeMillis(); + if (firstAttemptTime < 0) { + firstAttemptTime = lastAttemptTime; + } + try { + if (supplier.getAsBoolean()) { + status = Status.Completed; + completed.set(true); + if (registry != null) { + registry.removeTask(this); + } + latch.countDown(); + LOG.trace("Task {} succeeded and the current task is unscheduled: {}", getName(), latch.getCount()); + } + } catch (Exception e) { + status = Status.Failed; + cause = e; + throw e; } + nextAttemptTime = lastAttemptTime + budget.interval(); } /** * Schedules the task to be run * - * @param supplier the task as a boolean supplier. The result is used to check if the task has completed or not. - * The supplier must return true if the execution has completed or false otherwise. - * @return a future for the task + * @param camelContext the camel context + * @param supplier the task as a boolean supplier. The result is used to check if the task has completed or + * not. The supplier must return true if the execution has completed or false otherwise. + * @return a future for the task */ - public Future schedule(BooleanSupplier supplier) { + public Future schedule(CamelContext camelContext, BooleanSupplier supplier) { running.set(true); - return service.scheduleAtFixedRate(() -> runTaskWrapper(supplier), budget.initialDelay(), + return service.scheduleWithFixedDelay(() -> runTaskWrapper(camelContext, supplier), budget.initialDelay(), budget.interval(), TimeUnit.MILLISECONDS); } @Override - public boolean run(BooleanSupplier supplier) { + public boolean run(CamelContext camelContext, BooleanSupplier supplier) { running.set(true); - Future task = service.scheduleAtFixedRate(() -> runTaskWrapper(supplier), budget.initialDelay(), + Future task = service.scheduleWithFixedDelay(() -> runTaskWrapper(camelContext, supplier), budget.initialDelay(), budget.interval(), TimeUnit.MILLISECONDS); - waitForTaskCompletion(task); + waitForTaskCompletion(camelContext, task); return completed.get(); } - private void waitForTaskCompletion(Future task) { + private void waitForTaskCompletion(CamelContext camelContext, Future task) { try { // We need it to be cancellable/non-runnable after reaching a certain point, and it needs to be deterministic. // This is why we ignore the ScheduledFuture returned and implement the go/no-go using a latch. @@ -142,6 +169,14 @@ private void waitForTaskCompletion(Future task) { } } + TaskManagerRegistry registry = null; + if (camelContext != null) { + registry = PluginHelper.getTaskManagerRegistry(camelContext.getCamelContextExtension()); + } + if (registry != null) { + registry.removeTask(this); + } + task.cancel(true); } catch (InterruptedException e) { LOG.warn("Interrupted while waiting for the repeatable task to execute: {}", e.getMessage(), e); @@ -166,4 +201,10 @@ public Duration elapsed() { public int iteration() { return budget.iteration(); } + + @Override + public long getCurrentDelay() { + return budget.interval(); + } + } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java index b07f05d5eb844..8f19a2b003b44 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/task/BlockingTask.java @@ -19,6 +19,8 @@ import java.util.function.BooleanSupplier; import java.util.function.Predicate; +import org.apache.camel.CamelContext; + /** * Defines a task that blocks the code execution when ran. The task under execution must be thread-safe. */ @@ -27,28 +29,30 @@ public interface BlockingTask extends Task { /** * Run the task * - * @param predicate the task as a predicate. The result of the predicate is used to check if the task has completed - * or not. The predicate must return true if the execution has completed or false otherwise. - * Failures on the task should be handled on the predicate using the payload as wrapper for In/Out - * if necessary - * @param payload a payload to be passed to the task - * @param The type of the payload passed to the predicate when testing the task - * @return true if the task has completed successfully or false if: 1) the budget is exhausted or 2) the - * task was interrupted. + * @param camelContext the camel context + * @param predicate the task as a predicate. The result of the predicate is used to check if the task has + * completed or not. The predicate must return true if the execution has completed or false + * otherwise. Failures on the task should be handled on the predicate using the payload as + * wrapper for In/Out if necessary + * @param payload a payload to be passed to the task + * @param The type of the payload passed to the predicate when testing the task + * @return true if the task has completed successfully or false if: 1) the budget is exhausted or 2) + * the task was interrupted. */ - default boolean run(Predicate predicate, T payload) { - return this.run(() -> predicate.test(payload)); + default boolean run(CamelContext camelContext, Predicate predicate, T payload) { + return this.run(camelContext, () -> predicate.test(payload)); } /** * Run the task * - * @param supplier the task as a boolean supplier. The result is used to check if the task has completed or not. - * The supplier must return true if the execution has completed or false otherwise. - * @return true if the task has completed successfully or false if: 1) the budget is exhausted or 2) the - * task was interrupted. + * @param camelContext the camel context + * @param supplier the task as a boolean supplier. The result is used to check if the task has completed or + * not. The supplier must return true if the execution has completed or false otherwise. + * @return true if the task has completed successfully or false if: 1) the budget is exhausted or 2) + * the task was interrupted. */ - boolean run(BooleanSupplier supplier); + boolean run(CamelContext camelContext, BooleanSupplier supplier); /** * Whether the task has been submitted for running (the state of the task can be waiting for next run etc). diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/DefaultTaskManagerRegistry.java b/core/camel-support/src/main/java/org/apache/camel/support/task/DefaultTaskManagerRegistry.java new file mode 100644 index 0000000000000..31636891e0fc0 --- /dev/null +++ b/core/camel-support/src/main/java/org/apache/camel/support/task/DefaultTaskManagerRegistry.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.support.task; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; + +import org.apache.camel.CamelContext; +import org.apache.camel.support.service.ServiceSupport; + +/** + * Default {@link TaskManagerRegistry}. + */ +public class DefaultTaskManagerRegistry extends ServiceSupport implements TaskManagerRegistry { + + private final CamelContext camelContext; + private final Set tasks = new CopyOnWriteArraySet<>(); + + public DefaultTaskManagerRegistry(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + tasks.clear(); + } + + @Override + public void addTask(Task task) { + tasks.add(task); + } + + @Override + public void removeTask(Task task) { + tasks.remove(task); + } + + @Override + public int getSize() { + return tasks.size(); + } + + @Override + public Set getTasks() { + return Collections.unmodifiableSet(tasks); + } +} diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/ForegroundTask.java b/core/camel-support/src/main/java/org/apache/camel/support/task/ForegroundTask.java index 24a0d08aa29b7..e7df83511c1bf 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/task/ForegroundTask.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/task/ForegroundTask.java @@ -23,6 +23,8 @@ import java.util.function.Predicate; import java.util.function.Supplier; +import org.apache.camel.CamelContext; +import org.apache.camel.support.PluginHelper; import org.apache.camel.support.task.budget.IterationBudget; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,7 +32,7 @@ /** * Runs a task in the foreground, executing for a given number of iteration and sleeping between each of them. */ -public class ForegroundTask implements BlockingTask { +public class ForegroundTask extends AbstractTask implements BlockingTask { /** * A builder helper for building new foreground tasks @@ -71,44 +73,65 @@ public ForegroundTask build() { private static final Logger LOG = LoggerFactory.getLogger(ForegroundTask.class); - private final String name; private final IterationBudget budget; private Duration elapsed = Duration.ZERO; private final AtomicBoolean running = new AtomicBoolean(); ForegroundTask(IterationBudget budget, String name) { + super(name); this.budget = budget; - this.name = name; } @Override - public boolean run(BooleanSupplier supplier) { + public boolean run(CamelContext camelContext, BooleanSupplier supplier) { running.set(true); boolean completed = false; + TaskManagerRegistry registry = null; + if (camelContext != null) { + registry = PluginHelper.getTaskManagerRegistry(camelContext.getCamelContextExtension()); + } + if (registry != null) { + registry.addTask(this); + } try { if (budget.initialDelay() > 0) { Thread.sleep(budget.initialDelay()); } while (budget.next()) { + lastAttemptTime = System.currentTimeMillis(); + if (firstAttemptTime < 0) { + firstAttemptTime = lastAttemptTime; + } + nextAttemptTime = lastAttemptTime + budget.interval(); if (supplier.getAsBoolean()) { LOG.debug("Task {} is complete after {} iterations and it is ready to continue", - name, budget.iteration()); + getName(), budget.iteration()); + status = Status.Completed; completed = true; break; } if (budget.canContinue()) { Thread.sleep(budget.interval()); + } else { + status = Status.Exhausted; } } } catch (InterruptedException e) { - LOG.warn("Interrupted {} while waiting for the repeatable task to finish", name); + LOG.warn("Interrupted {} while waiting for the repeatable task to finish", getName()); Thread.currentThread().interrupt(); + } catch (Exception e) { + status = Status.Failed; + cause = e; + throw e; } finally { elapsed = budget.elapsed(); running.set(false); + if (registry != null) { + registry.removeTask(this); + } } return completed; @@ -117,36 +140,58 @@ public boolean run(BooleanSupplier supplier) { /** * Run a task until it produces a result * - * @param supplier the supplier of the result - * @param predicate a predicate to test if the result is acceptable - * @param the type for the result - * @return An optional with the result + * @param camelContext the camel context + * @param supplier the supplier of the result + * @param predicate a predicate to test if the result is acceptable + * @param the type for the result + * @return An optional with the result */ - public Optional run(Supplier supplier, Predicate predicate) { + public Optional run(CamelContext camelContext, Supplier supplier, Predicate predicate) { running.set(true); + TaskManagerRegistry registry = null; + if (camelContext != null) { + registry = PluginHelper.getTaskManagerRegistry(camelContext.getCamelContextExtension()); + } + if (registry != null) { + registry.addTask(this); + } try { if (budget.initialDelay() > 0) { Thread.sleep(budget.initialDelay()); } while (budget.next()) { + lastAttemptTime = System.currentTimeMillis(); + if (firstAttemptTime < 0) { + firstAttemptTime = lastAttemptTime; + } T ret = supplier.get(); if (predicate.test(ret)) { LOG.debug("Task {} is complete after {} iterations and it is ready to continue", - name, budget.iteration()); + getName(), budget.iteration()); + status = Status.Completed; return Optional.ofNullable(ret); } + nextAttemptTime = lastAttemptTime + budget.interval(); if (budget.canContinue()) { Thread.sleep(budget.interval()); + } else { + status = Status.Exhausted; } } } catch (InterruptedException e) { - LOG.warn("Interrupted {} while waiting for the repeatable task to finish", name); + LOG.warn("Interrupted {} while waiting for the repeatable task to finish", getName()); Thread.currentThread().interrupt(); + } catch (Exception e) { + status = Status.Failed; + cause = e; } finally { elapsed = budget.elapsed(); running.set(false); + if (registry != null) { + registry.removeTask(this); + } } return Optional.empty(); @@ -166,4 +211,10 @@ public Duration elapsed() { public int iteration() { return budget.iteration(); } + + @Override + public long getCurrentDelay() { + return budget.interval(); + } + } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/Task.java b/core/camel-support/src/main/java/org/apache/camel/support/task/Task.java index e980ac321f930..5f74bf5b0f4d9 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/task/Task.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/task/Task.java @@ -24,8 +24,26 @@ */ public interface Task { + enum Status { + Active, + Inactive, + Exhausted, + Completed, + Failed + } + + /** + * Optional name of the task + */ + String getName(); + + /** + * Gets the task status. + */ + Status getStatus(); + /** - * How long it took to run the task + * How long it took to run the task when the task was completed * * @return The duration to execute the task */ @@ -38,4 +56,34 @@ public interface Task { */ int iteration(); + /** + * The current computed delay. + */ + long getCurrentDelay(); + + /** + * The current elapsed time. + */ + long getCurrentElapsedTime(); + + /** + * The time the first attempt was performed. + */ + long getFirstAttemptTime(); + + /** + * The time the last attempt has been performed. + */ + long getLastAttemptTime(); + + /** + * An indication about the time the next attempt will be made. + */ + long getNextAttemptTime(); + + /** + * The task failed for some un-expected exception + */ + Throwable getException(); + } diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/TaskManagerRegistry.java b/core/camel-support/src/main/java/org/apache/camel/support/task/TaskManagerRegistry.java new file mode 100644 index 0000000000000..9a6124046266f --- /dev/null +++ b/core/camel-support/src/main/java/org/apache/camel/support/task/TaskManagerRegistry.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.support.task; + +import java.util.Set; + +import org.apache.camel.StaticService; + +/** + * A registry for current tasks (foreground and background) that has been submitted for running. + */ +public interface TaskManagerRegistry extends StaticService { + + /** + * Adds the task + */ + void addTask(Task task); + + /** + * Removes the task + */ + void removeTask(Task task); + + /** + * Number of tasks + */ + int getSize(); + + /** + * Gets the current list of tasks + */ + Set getTasks(); + +} diff --git a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/Budget.java b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/Budget.java index 1b38048617b09..3154704bda6eb 100644 --- a/core/camel-support/src/main/java/org/apache/camel/support/task/budget/Budget.java +++ b/core/camel-support/src/main/java/org/apache/camel/support/task/budget/Budget.java @@ -31,7 +31,8 @@ public interface Budget { long initialDelay(); /** - * The interval between each task execution + * The interval between each task execution (delay between the termination of one execution and the commencement of + * the next). * * @return the interval, in milliseconds, for each task execution */ diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_13.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_13.adoc index 3f6532ee1f303..de6cc78f5e669 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_13.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_13.adoc @@ -26,6 +26,10 @@ The `BackOffTimer` is mostly used internally in Camel components to conduct task be repeated until completed, such as recovery tasks. And as such this refactor is not expected to impact Camel end users. +Added `CamelContext` as first parameter to the `run` method(s) in `org.apache.camel.support.task.BlockingTask`. +The `org.apache.camel.support.task.BackgroundTask` is changed to use a fixed delay between runs, which +is similar to `BackOffTimer`. + === camel-file / camel-ftp / camel-smb / camel-azure-files When using `poll` or `pollEnrich` with the file based components, then the `eagerLimitMaxMessagesPerPoll` option diff --git a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java index 7ee908eabb6d2..625d0f27d8cf9 100644 --- a/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java +++ b/dsl/camel-cli-connector/src/main/java/org/apache/camel/cli/connector/LocalCliConnector.java @@ -1058,11 +1058,11 @@ protected void statusTask() { root.put("receive", json); } } - DevConsole dc24 = dcr.resolveById("backoff"); + DevConsole dc24 = dcr.resolveById("internal-tasks"); if (dc24 != null) { JsonObject json = (JsonObject) dc24.call(DevConsole.MediaType.JSON); if (json != null && !json.isEmpty()) { - root.put("backoff", json); + root.put("internal-tasks", json); } } } diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java index 3d628dfa5f4bd..b702a14ca30f3 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/CamelJBangMain.java @@ -108,7 +108,7 @@ public static void run(CamelJBangMain main, String... args) { .addSubcommand("event", new CommandLine(new ListEvent(main))) .addSubcommand("inflight", new CommandLine(new ListInflight(main))) .addSubcommand("blocked", new CommandLine(new ListBlocked(main))) - .addSubcommand("backoff", new CommandLine(new ListBackOff(main))) + .addSubcommand("internal-task", new CommandLine(new ListInternalTask(main))) .addSubcommand("bean", new CommandLine(new CamelBeanDump(main))) .addSubcommand("route-controller", new CommandLine(new RouteControllerAction(main))) .addSubcommand("transformer", new CommandLine(new ListTransformer(main))) diff --git a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListBackOff.java b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListInternalTask.java similarity index 80% rename from dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListBackOff.java rename to dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListInternalTask.java index 3e17c8a378e43..67a8164b6ca4e 100644 --- a/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListBackOff.java +++ b/dsl/camel-jbang/camel-jbang-core/src/main/java/org/apache/camel/dsl/jbang/core/commands/process/ListInternalTask.java @@ -33,9 +33,9 @@ import picocli.CommandLine; import picocli.CommandLine.Command; -@Command(name = "backoff", - description = "Get back-off tasks of Camel integrations", sortOptions = false, showDefaultValues = true) -public class ListBackOff extends ProcessWatchCommand { +@Command(name = "internal-tasks", + description = "List internal tasks of Camel integrations", sortOptions = false, showDefaultValues = true) +public class ListInternalTask extends ProcessWatchCommand { @CommandLine.Parameters(description = "Name or pid of running Camel integration", arity = "0..1") String name = "*"; @@ -44,7 +44,7 @@ public class ListBackOff extends ProcessWatchCommand { description = "Sort by pid, name or age", defaultValue = "pid") String sort; - public ListBackOff(CamelJBangMain main) { + public ListInternalTask(CamelJBangMain main) { super(main); } @@ -72,28 +72,22 @@ public Integer doProcessWatchCall() throws Exception { row.uptime = extractSince(ph); row.age = TimeUtils.printSince(row.uptime); - JsonObject jo = (JsonObject) root.get("backoff"); + JsonObject jo = (JsonObject) root.get("internal-tasks"); if (jo != null) { - JsonArray arr = (JsonArray) jo.get("timers"); - if (arr != null) { - for (int i = 0; i < arr.size(); i++) { - jo = (JsonObject) arr.get(i); - JsonArray arr2 = (JsonArray) jo.get("tasks"); - for (int j = 0; j < arr2.size(); j++) { - jo = (JsonObject) arr2.get(j); - row = row.copy(); - row.task = jo.getString("name"); - row.status = jo.getString("status"); - row.attempts = jo.getLong("attempts"); - row.delay = jo.getLong("delay"); - row.elapsed = jo.getLong("elapsed"); - row.firstTime = jo.getLong("firstTime"); - row.lastTime = jo.getLong("lastTime"); - row.nextTime = jo.getLong("nextTime"); - row.error = jo.getString("error"); - rows.add(row); - } - } + JsonArray arr = (JsonArray) jo.get("tasks"); + for (int i = 0; i < arr.size(); i++) { + jo = (JsonObject) arr.get(i); + row = row.copy(); + row.task = jo.getString("name"); + row.status = jo.getString("status"); + row.attempts = jo.getLong("attempts"); + row.delay = jo.getLong("delay"); + row.elapsed = jo.getLong("elapsed"); + row.firstTime = jo.getLong("firstTime"); + row.lastTime = jo.getLong("lastTime"); + row.nextTime = jo.getLong("nextTime"); + row.error = jo.getString("error"); + rows.add(row); } } }