Skip to content

Commit

Permalink
Improve naming of threads used in batch source
Browse files Browse the repository at this point in the history
  • Loading branch information
Jerry Peng committed Nov 18, 2020
1 parent 72d6d28 commit e667582
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,19 @@ public class BatchSourceExecutor<T> implements Source<T> {
private String intermediateTopicName;
private volatile Exception currentError = null;
private volatile boolean isRunning = false;
private ExecutorService discoveryThread = Executors.newSingleThreadExecutor(new DefaultThreadFactory("batch-source-discovery"));
private ExecutorService discoveryThread;

@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
this.config = config;
this.sourceContext = sourceContext;
this.intermediateTopicName = SourceConfigUtils.computeBatchSourceIntermediateTopicName(sourceContext.getTenant(),
sourceContext.getNamespace(), sourceContext.getSourceName()).toString();
this.discoveryThread = Executors.newSingleThreadExecutor(
new DefaultThreadFactory(
String.format("%s-batch-source-discovery",
FunctionCommon.getFullyQualifiedName(
sourceContext.getTenant(), sourceContext.getNamespace(), sourceContext.getSourceName()))));
this.getBatchSourceConfigs(config);
this.initializeBatchSource();
this.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,16 @@ public void init(Map<String, Object> config, SourceContext sourceContext) {
} else {
throw new IllegalArgumentException("Cron Trigger is not provided with Cron String");
}
scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix(String.format("%s/%s/%s-cron-triggerer-",
sourceContext.getTenant(), sourceContext.getNamespace(), sourceContext.getSourceName()));

log.info("Initialized CronTrigger with expression: {}", cronExpression);
}

@Override
public void start(Consumer<String> trigger) {
scheduler = new ThreadPoolTaskScheduler();

scheduler.initialize();
scheduler.schedule(() -> trigger.accept("CRON"), new CronTrigger(cronExpression));
}
Expand Down

0 comments on commit e667582

Please sign in to comment.