Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
aws-secrets
aws2-s3
azure-secrets
backoff
bean
bean-model
blocked
Expand All @@ -18,6 +17,7 @@ gcp-secrets
hashicorp-secrets
health
inflight
internal-tasks
java-security
jvm
kafka
Expand All @@ -30,7 +30,6 @@ memory
micrometer
platform-http
properties
protocol
quartz
receive
reload
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
"console": {
"kind": "console",
"group": "camel",
"name": "backoff",
"title": "BackOff",
"description": "Display information about BackOff tasks",
"name": "internal-tasks",
"title": "Internal Tasks",
"description": "Display information about internal tasks",
"deprecated": false,
"javaType": "org.apache.camel.impl.console.BackOffDevConsole",
"javaType": "org.apache.camel.impl.console.TaskRegistryDevConsole",
"groupId": "org.apache.camel",
"artifactId": "camel-console",
"version": "4.13.0-SNAPSHOT"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ protected void setupResources() throws Exception {
final ForegroundTask task = Tasks.foregroundTask()
.withBudget(budget).build();

final boolean cacheCreated = task.run(this::createCache);
final boolean cacheCreated = task.run(null, this::createCache);
Assumptions.assumeTrue(cacheCreated, "The container cache is not running healthily");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ private void waitForTableToBecomeAvailable(String tableName) {
.build())
.build();

if (!task.run(this::waitForTable, tableName)) {
if (!task.run(getCamelContext(), this::waitForTable, tableName)) {
throw new RuntimeCamelException("Table " + tableName + " never went active");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public boolean acquireExclusiveReadLock(
FilesExclusiveReadLockCheck exclusiveReadLockCheck
= new FilesExclusiveReadLockCheck(fastExistsCheck, minAge, minLength);

if (!task.run(() -> exclusiveReadLockCheck.tryAcquireExclusiveReadLock(operations, file))) {
if (!task.run(exchange.getContext(), () -> exclusiveReadLockCheck.tryAcquireExclusiveReadLock(operations, file))) {
CamelLogger.log(LOG, readLockLoggingLevel,
"Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ protected boolean doConnect(RemoteFileConfiguration configuration, Exchange exch

TaskPayload payload = new TaskPayload(configuration);

if (!task.run(this::tryConnect, payload)) {
if (!task.run(endpoint.getCamelContext(), this::tryConnect, payload)) {
if (exchange != null) {
exchange.getIn().setHeader(FtpConstants.FTP_REPLY_CODE, client.getReplyCode());
exchange.getIn().setHeader(FtpConstants.FTP_REPLY_STRING, client.getReplyString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public boolean connect(RemoteFileConfiguration configuration, Exchange exchange)

TaskPayload payload = new TaskPayload(configuration);

if (!task.run(this::tryConnect, payload)) {
if (!task.run(endpoint.getCamelContext(), this::tryConnect, payload)) {
throw new GenericFileOperationFailedException(
"Cannot connect to " + configuration.remoteServerInformation(),
payload.exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public boolean acquireExclusiveReadLock(

ExclusiveReadLockCheck exclusiveReadLockCheck = new ExclusiveReadLockCheck(fastExistsCheck, minAge, minLength);

if (!task.run(() -> exclusiveReadLockCheck.tryAcquireExclusiveReadLock(operations, file))) {
if (!task.run(exchange.getContext(), () -> exclusiveReadLockCheck.tryAcquireExclusiveReadLock(operations, file))) {
CamelLogger.log(LOG, readLockLoggingLevel,
"Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ protected void setupResources() throws Exception {
final ForegroundTask task = Tasks.foregroundTask()
.withBudget(budget).build();

final boolean cacheCreated = task.run(this::createCache);
final boolean cacheCreated = task.run(null, this::createCache);
Assumptions.assumeTrue(cacheCreated, "The container cache is not running healthily");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public Destination getReplyTo() {
.withInterval(Duration.ofMillis(interval))
.build())
.build();
boolean done = task.run(() -> {
boolean done = task.run(camelContext, () -> {
log.trace("Waiting for replyTo to be ready: {}", replyTo != null);
return replyTo != null;
});
Expand Down Expand Up @@ -249,7 +249,7 @@ protected ReplyHandler waitForProvisionCorrelationToBeUpdated(String correlation
.build())
.build();

return task.run(() -> getReplyHandler(correlationID), Objects::nonNull).orElse(null);
return task.run(camelContext, () -> getReplyHandler(correlationID), Objects::nonNull).orElse(null);
}

private ReplyHandler getReplyHandler(String correlationID) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public void run() {
.withInterval(Duration.ofMillis(currentBackoffInterval))
.build())
.build();
boolean success = task.run(this::createConsumerTask);
boolean success = task.run(kafkaConsumer.getEndpoint().getCamelContext(), this::createConsumerTask);
if (!success) {
int max = kafkaConsumer.getEndpoint().getComponent().getCreateConsumerBackoffMaxAttempts();
setupCreateConsumerException(task, max);
Expand All @@ -168,7 +168,7 @@ public void run() {
.withInterval(Duration.ofMillis(currentBackoffInterval))
.build())
.build();
success = task.run(this::initializeConsumerTask);
success = task.run(kafkaConsumer.getEndpoint().getCamelContext(), this::initializeConsumerTask);
if (!success) {
int max = kafkaConsumer.getEndpoint().getComponent().getSubscribeConsumerBackoffMaxAttempts();
setupInitializeErrorException(task, max);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private void onLeadershipTaken() throws Exception {
}

final BackgroundTask leaderTask = createTask();
leaderTask.run(() -> {
leaderTask.run(getEndpoint().getCamelContext(), () -> {
if (!isRunAllowed()) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ public class TcpServerBindThread extends Thread {
private final SSLContextParameters sslContextParameters;

public TcpServerBindThread(MllpTcpServerConsumer consumer, final SSLContextParameters sslParams) {

this.consumer = consumer;
this.sslContextParameters = sslParams;

Expand Down Expand Up @@ -107,7 +106,7 @@ private void doAccept(ServerSocket serverSocket, InetSocketAddress socketAddress
.withName("mllp-tcp-server-accept")
.build();

if (task.run(() -> doBind(serverSocket, socketAddress))) {
if (task.run(consumer.getEndpoint().getCamelContext(), () -> doBind(serverSocket, socketAddress))) {
consumer.startAcceptThread(serverSocket);
} else {
log.error("Failed to bind to address {} within timeout {}", socketAddress,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,9 @@ public void run() {
MongoCollection<Document> finalPtsCollection = ptsCollection;
Date finalFromDate = fromDate;
Document finalPersistentTimestamp = persistentTimestamp;
task.run(() -> processCollection(finalFromDate, usesTimestamp, persistsTimestamp, usesAttribute, finalPtsCollection,
finalPersistentTimestamp));
task.run(endpoint.getCamelContext(),
() -> processCollection(finalFromDate, usesTimestamp, persistsTimestamp, usesAttribute, finalPtsCollection,
finalPersistentTimestamp));
}

private boolean processCollection(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public class PgEventListener implements PGNotificationListener {
public void reconnect() {
// only submit the task if not already running
if (!reconnectTask.isRunning()) {
reconnectTask.run(() -> {
reconnectTask.run(endpoint.getCamelContext(), () -> {
if (isRunAllowed()) {
LOG.debug("Connecting attempt #{}", reconnectTask.iteration());
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ protected void scheduleConnectionRecovery() {
}
if (recoverTask == null) {
recoverTask = createTask();
recoverFuture = recoverTask.schedule(() -> recoverConnection(recoverTask));
recoverFuture = recoverTask.schedule(endpoint.getCamelContext(), () -> recoverConnection(recoverTask));
}
} finally {
connectionLock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public class SmbChangedExclusiveReadLockStrategy

@Override
public void prepareOnStartup(
GenericFileOperations<FileIdBothDirectoryInformation> tGenericFileOperations,
GenericFileEndpoint<FileIdBothDirectoryInformation> tGenericFileEndpoint) {
GenericFileOperations<FileIdBothDirectoryInformation> operations,
GenericFileEndpoint<FileIdBothDirectoryInformation> endpoint) {
// noop
}

Expand All @@ -66,7 +66,7 @@ public boolean acquireExclusiveReadLock(

SmbExclusiveReadLockCheck exclusiveReadLockCheck = new SmbExclusiveReadLockCheck(minAge, minLength);

if (!task.run(() -> exclusiveReadLockCheck.tryAcquireExclusiveReadLock(operations, file))) {
if (!task.run(exchange.getContext(), () -> exclusiveReadLockCheck.tryAcquireExclusiveReadLock(operations, file))) {
CamelLogger.log(LOG, readLockLoggingLevel,
"Cannot acquire read lock within " + timeout + " millis. Will skip the file: " + file);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ private void reconnect() {
configuration.getMaxReconnect());

try {
task.run(this::doReconnect);
task.run(getEndpoint().getCamelContext(), this::doReconnect);
} finally {
reconnectLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ private void reconnect() {
configuration.getReconnectDelay(), configuration.getMaxReconnect());

try {
task.run(this::doReconnect);
task.run(getEndpoint().getCamelContext(), this::doReconnect);
} finally {
connectLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ public void internalSessionStateListenerShouldCloseSessionAndReconnect(SessionSt
"");
when(session.connectAndBind("localhost", Integer.valueOf(2775), expectedBindParameter))
.thenReturn("1");
when(endpoint.getCamelContext()).thenReturn(null);
smppUtilsMock.when(() -> SmppUtils.newReconnectTask(any(), anyString(), anyLong(), anyLong(), anyInt()))
.thenReturn(new BackgroundTask.BackgroundTaskBuilder().withScheduledExecutor(reconnectService)
.withBudget(Budgets.timeBudget().build()).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public void internalSessionStateListenerShouldCloseSessionAndReconnect(SessionSt
when(session.connectAndBind("localhost", Integer.valueOf(2775), expectedBindParameters))
.thenReturn("1");
when(endpoint.isSingleton()).thenReturn(true);
when(endpoint.getCamelContext()).thenReturn(null);
smppUtilsMock.when(() -> SmppUtils.newReconnectTask(any(), anyString(), anyLong(), anyLong(), anyInt()))
.thenReturn(new BackgroundTask.BackgroundTaskBuilder().withScheduledExecutor(reconnectService)
.withBudget(Budgets.timeBudget().build()).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public class SplunkDataReader {
private static final String SPLUNK_TIME_FORMAT = "%m/%d/%y %H:%M:%S:%3N";

private transient Calendar lastSuccessfulReadTime;
private SplunkEndpoint endpoint;
private ConsumerType consumerType;
private final SplunkEndpoint endpoint;
private final ConsumerType consumerType;

public SplunkDataReader(SplunkEndpoint endpoint, ConsumerType consumerType) {
this.endpoint = endpoint;
Expand Down Expand Up @@ -241,7 +241,7 @@ private void waitForJob(long interval, BooleanSupplier supplier) {
.withMaxIterations(IterationBoundedBudget.UNLIMITED_ITERATIONS)
.withInterval(Duration.ofMillis(interval))
.build())
.build().run(supplier);
.build().run(endpoint.getCamelContext(), supplier);
}

private List<SplunkEvent> nonBlockingSearch(SplunkResultProcessor callback) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
.build())
.build();

task.run(() -> !isRunAllowed());
task.run(getCamelContext(), () -> !isRunAllowed());

fireLeadershipChangedEvent(getLeader().orElse(null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@
import org.apache.camel.support.jsse.SSLContextParameters;
import org.apache.camel.support.service.BaseService;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.support.task.TaskManagerRegistry;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StopWatch;
Expand Down Expand Up @@ -2244,6 +2245,9 @@ public void doBuild() throws Exception {
startupStepRecorder.endStep(step4);
}

// setup internal task registry
getCamelContextExtension().addContextPlugin(TaskManagerRegistry.class, createTaskManagerRegistry());

// setup dev-console registry as its needed this early phase for 3rd party to register custom consoles
DevConsoleRegistry dcr = getCamelContextExtension().getContextPlugin(DevConsoleRegistry.class);
if (dcr == null) {
Expand Down Expand Up @@ -4346,6 +4350,8 @@ protected abstract EndpointRegistry createEndpointRegistry(

protected abstract BackOffTimerFactory createBackOffTimerFactory();

protected abstract TaskManagerRegistry createTaskManagerRegistry();

protected RestConfiguration createRestConfiguration() {
// lookup a global which may have been on a container such spring-boot / CDI / etc.
RestConfiguration conf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@
import org.apache.camel.support.scan.DefaultPackageScanResourceResolver;
import org.apache.camel.support.scan.WebSpherePackageScanClassResolver;
import org.apache.camel.support.startup.DefaultStartupConditionStrategy;
import org.apache.camel.support.task.DefaultTaskManagerRegistry;
import org.apache.camel.support.task.TaskManagerRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -746,6 +748,11 @@ protected BackOffTimerFactory createBackOffTimerFactory() {
return new DefaultBackOffTimerFactory(this);
}

@Override
protected TaskManagerRegistry createTaskManagerRegistry() {
return new DefaultTaskManagerRegistry(this);
}

@Override
protected TransformerRegistry createTransformerRegistry() {
return new DefaultTransformerRegistry(getCamelContextReference());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
"console": {
"kind": "console",
"group": "camel",
"name": "protocol",
"title": "Protocols",
"description": "Protocols used for network communication with clients",
"name": "internal-tasks",
"title": "Internal Tasks",
"description": "Display information about internal tasks",
"deprecated": false,
"javaType": "org.apache.camel.impl.console.ProtocolDevConsole",
"javaType": "org.apache.camel.impl.console.TaskRegistryDevConsole",
"groupId": "org.apache.camel",
"artifactId": "camel-console",
"version": "4.13.0-SNAPSHOT"
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Generated by camel build tools - do NOT edit this file!
class=org.apache.camel.impl.console.TaskRegistryDevConsole
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Generated by camel build tools - do NOT edit this file!
dev-consoles=backoff bean blocked browse circuit-breaker consumer context debug endpoint event gc health inflight java-security jvm log memory properties receive reload rest route route-controller route-dump send service source startup-recorder system-properties thread top trace transformers type-converters variables
dev-consoles=bean blocked browse circuit-breaker consumer context debug endpoint event gc health inflight internal-tasks java-security jvm log memory properties receive reload rest route route-controller route-dump send service source startup-recorder system-properties thread top trace transformers type-converters variables
groupId=org.apache.camel
artifactId=camel-console
version=4.13.0-SNAPSHOT
Expand Down
Loading
Loading