Skip to content

Commit

Permalink
chore: extract async task lib
Browse files — browse the repository at this point in the history
  • Loading branch information
ClemDoum committed May 13, 2024
1 parent 2d90dae commit d66f5ee
Show file tree
Hide file tree
Showing 113 changed files with 1,882 additions and 1,317 deletions.
29 changes: 8 additions & 21 deletions datashare-app/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@
<version>${datashare-cli.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.icij.datashare</groupId>
<artifactId>datashare-tasks</artifactId>
<version>${datashare-tasks.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
Expand All @@ -95,26 +101,6 @@
<artifactId>fluent-swagger-apigen</artifactId>
<version>0.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-core</artifactId>
<version>9.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-plugins-amqp-0-8-protocol</artifactId>
<version>9.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-broker-plugins-memory-store</artifactId>
<version>9.0.0</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.18.0</version>
</dependency>

<!-- Logging -->
<dependency>
Expand Down Expand Up @@ -145,8 +131,9 @@
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.20.1</version>
<scope>test</scope>
</dependency>

<!--test dependencies-->
<dependency>
<groupId>com.github.voodoodyne</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package org.icij.datashare;

import org.icij.datashare.asynctasks.TaskFactory;
import org.icij.datashare.asynctasks.TaskRunnerLoop;
import org.icij.datashare.asynctasks.TaskSupplier;
import org.icij.datashare.mode.CommonMode;
import org.icij.datashare.tasks.TaskRunnerLoop;
import org.icij.datashare.tasks.TaskFactory;
import org.icij.datashare.text.indexing.Indexer;
import org.redisson.api.RedissonClient;

Expand All @@ -12,7 +13,7 @@
public class BatchDownloadApp {
public static void start(Properties properties) throws Exception {
CommonMode commonMode = CommonMode.create(properties);
TaskRunnerLoop taskRunnerLoop = commonMode.get(TaskFactory.class).createTaskRunnerLoop();
TaskRunnerLoop taskRunnerLoop = new TaskRunnerLoop(commonMode.get(TaskFactory.class), commonMode.get(TaskSupplier.class));
taskRunnerLoop.call();
commonMode.get(Indexer.class).close();
commonMode.get(RedissonClient.class).shutdown();
Expand Down
14 changes: 7 additions & 7 deletions datashare-app/src/main/java/org/icij/datashare/CliApp.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.icij.datashare;

import com.google.inject.ConfigurationException;
import org.icij.datashare.asynctasks.TaskView;
import org.icij.datashare.cli.CliExtensionService;
import org.icij.datashare.cli.spi.CliExtension;
import org.icij.datashare.mode.CommonMode;
Expand All @@ -12,7 +13,6 @@
import org.icij.datashare.tasks.ScanTask;
import org.icij.datashare.tasks.TaskFactory;
import org.icij.datashare.tasks.TaskManagerMemory;
import org.icij.datashare.tasks.TaskView;
import org.icij.datashare.text.indexing.Indexer;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
Expand Down Expand Up @@ -110,39 +110,39 @@ private static void runTaskRunner(CommonMode mode, Properties properties) throws
if (pipeline.has(Stage.DEDUPLICATE)) {
Long result = taskFactory.createDeduplicateTask(
new TaskView<>(DeduplicateTask.class.getName(), nullUser(), propertiesToMap(properties)),
(s, percentage) -> {logger.info("percentage: {}% done", percentage);return null;}).call();
(percentage) -> {logger.info("percentage: {}% done", percentage);return null;}).call();
logger.info("removed {} duplicates", result);
}

if (pipeline.has(Stage.SCANIDX)) {
Long result = taskFactory.createScanIndexTask(
new TaskView<>(ScanIndexTask.class.getName(), nullUser(), propertiesToMap(properties)),
(s, percentage) -> {logger.info("percentage: {}% done", percentage);return null;}).call();
(percentage) -> {logger.info("percentage: {}% done", percentage);return null;}).call();
logger.info("scanned {}", result);
}

if (pipeline.has(Stage.SCAN)) {
taskFactory.createScanTask(
new TaskView<>(ScanTask.class.getName(), nullUser(), propertiesToMap(properties)),
(s, percentage) -> {logger.info("percentage: {}% done", percentage); return null;}).call();
(percentage) -> {logger.info("percentage: {}% done", percentage); return null;}).call();
}

if (pipeline.has(Stage.INDEX)) {
taskFactory.createIndexTask(
new TaskView<>(IndexTask.class.getName(), nullUser(), propertiesToMap(properties)),
(s, percentage) -> {logger.info("percentage: {}% done", percentage); return null;}).call();
(percentage) -> {logger.info("percentage: {}% done", percentage); return null;}).call();
}

if (pipeline.has(Stage.ENQUEUEIDX)) {
taskFactory.createEnqueueFromIndexTask(
new TaskView<>(EnqueueFromIndexTask.class.getName(), nullUser(), propertiesToMap(properties)),
(s, percentage) -> {logger.info("percentage: {}% done", percentage); return null;}).call();
(percentage) -> {logger.info("percentage: {}% done", percentage); return null;}).call();
}

if (pipeline.has(Stage.NLP)) {
taskFactory.createExtractNlpTask(
new TaskView<>(ExtractNlpTask.class.getName(), nullUser(), propertiesToMap(properties)),
(s, percentage) -> {logger.info("percentage: {}% done", percentage); return null;}).call();
(percentage) -> {logger.info("percentage: {}% done", percentage); return null;}).call();
}
taskManager.shutdownAndAwaitTermination(Integer.MAX_VALUE, SECONDS);
indexer.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package org.icij.datashare;

import org.icij.datashare.asynctasks.TaskFactory;
import org.icij.datashare.asynctasks.TaskRunnerLoop;
import org.icij.datashare.asynctasks.TaskSupplier;
import org.icij.datashare.mode.CommonMode;
import org.icij.datashare.tasks.TaskFactory;
import org.icij.datashare.tasks.TaskRunnerLoop;
import org.icij.datashare.text.indexing.Indexer;
import org.redisson.api.RedissonClient;

Expand All @@ -12,7 +13,7 @@
public class TaskRunnerApp {
public static void start(Properties properties) throws Exception {
CommonMode mode = CommonMode.create(properties);
TaskRunnerLoop batchSearchLoop = mode.get(TaskFactory.class).createTaskRunnerLoop();
TaskRunnerLoop batchSearchLoop = new TaskRunnerLoop(mode.get(TaskFactory.class), mode.get(TaskSupplier.class));
batchSearchLoop.call();
batchSearchLoop.close();
mode.get(Indexer.class).close();// to avoid being blocked
Expand Down
8 changes: 2 additions & 6 deletions datashare-app/src/main/java/org/icij/datashare/WebApp.java
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
package org.icij.datashare;

import net.codestory.http.WebServer;
import org.icij.datashare.asynctasks.TaskManager;
import org.icij.datashare.asynctasks.bus.amqp.QpidAmqpServer;
import org.icij.datashare.batch.BatchSearch;
import org.icij.datashare.batch.BatchSearchRepository;
import org.icij.datashare.cli.DatashareCli;
import org.icij.datashare.cli.Mode;
import org.icij.datashare.cli.QueueType;
import org.icij.datashare.com.bus.amqp.QpidAmqpServer;
import org.icij.datashare.mode.CommonMode;
import org.icij.datashare.tasks.BatchSearchRunner;
import org.icij.datashare.tasks.TaskFactory;
import org.icij.datashare.tasks.TaskManager;

import java.awt.*;
import java.io.IOException;
import java.net.Socket;
import java.net.URI;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static java.lang.Boolean.parseBoolean;
import static java.lang.Integer.parseInt;
import static org.icij.datashare.cli.DatashareCliOptions.BATCH_QUEUE_TYPE_OPT;
import static org.icij.datashare.cli.DatashareCliOptions.BROWSER_OPEN_LINK_OPT;

public class WebApp {
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.icij.datashare.extract;

import com.google.common.collect.Streams;
import com.google.inject.Singleton;
import org.icij.datashare.PropertiesProvider;
import org.icij.extract.queue.DocumentQueue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,9 @@
import org.icij.datashare.PropertiesProvider;
import org.icij.datashare.user.User;
import org.icij.extract.redis.RedisReportMap;
import org.icij.task.Options;
import org.redisson.api.RedissonClient;

import java.nio.charset.Charset;
import java.util.HashMap;

import static org.icij.datashare.PropertiesProvider.MAP_NAME_OPTION;

public class RedisUserReportMap extends RedisReportMap {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,26 +20,28 @@
import org.icij.datashare.PropertiesProvider;
import org.icij.datashare.Repository;
import org.icij.datashare.TesseractOCRParserWrapper;
import org.icij.datashare.asynctasks.TaskManager;
import org.icij.datashare.asynctasks.TaskModifier;
import org.icij.datashare.asynctasks.TaskSupplier;
import org.icij.datashare.asynctasks.TaskView;
import org.icij.datashare.asynctasks.bus.amqp.AmqpInterlocutor;
import org.icij.datashare.asynctasks.bus.memory.MemoryBlockingQueue;
import org.icij.datashare.asynctasks.bus.redis.RedisBlockingQueue;
import org.icij.datashare.batch.BatchSearchRepository;
import org.icij.datashare.cli.Mode;
import org.icij.datashare.cli.QueueType;
import org.icij.datashare.com.bus.amqp.AmqpInterlocutor;
import org.icij.datashare.db.RepositoryFactoryImpl;
import org.icij.datashare.extension.ExtensionLoader;
import org.icij.datashare.extension.PipelineRegistry;
import org.icij.datashare.extract.*;
import org.icij.datashare.nlp.EmailPipeline;
import org.icij.datashare.nlp.OptimaizeLanguageGuesser;
import org.icij.datashare.tasks.TaskFactory;
import org.icij.datashare.tasks.TaskManager;
import org.icij.datashare.tasks.TaskManagerAmqp;
import org.icij.datashare.tasks.TaskManagerMemory;
import org.icij.datashare.tasks.TaskManagerRedis;
import org.icij.datashare.tasks.TaskModifier;
import org.icij.datashare.tasks.TaskSupplier;
import org.icij.datashare.tasks.TaskSupplierAmqp;
import org.icij.datashare.tasks.TaskSupplierRedis;
import org.icij.datashare.tasks.TaskView;
import org.icij.datashare.text.indexing.Indexer;
import org.icij.datashare.text.indexing.LanguageGuesser;
import org.icij.datashare.text.indexing.elasticsearch.ElasticsearchIndexer;
Expand Down Expand Up @@ -161,7 +163,7 @@ protected void configure() {
bind(TaskModifier.class).to(TaskSupplierAmqp.class);
break;
default:
configureBatchQueuesMemory(propertiesProvider);
configureBatchQueuesMemory();
bind(TaskManager.class).to(TaskManagerMemory.class);
bind(TaskModifier.class).to(TaskManagerMemory.class);
bind(TaskSupplier.class).to(TaskManagerMemory.class);
Expand All @@ -188,9 +190,9 @@ private void configureIndexingQueues(final PropertiesProvider propertiesProvider
}
}

private void configureBatchQueuesMemory(PropertiesProvider propertiesProvider) {
bind(new TypeLiteral<BlockingQueue<String>>(){}).toInstance(new MemoryBlockingQueue<>(propertiesProvider, DS_BATCHSEARCH_QUEUE_NAME));
bind(new TypeLiteral<BlockingQueue<TaskView<?>>>(){}).toInstance(new MemoryBlockingQueue<>(propertiesProvider, DS_BATCHDOWNLOAD_QUEUE_NAME));
private void configureBatchQueuesMemory() {
bind(new TypeLiteral<BlockingQueue<String>>(){}).toInstance(new MemoryBlockingQueue<>(DS_BATCHSEARCH_QUEUE_NAME));
bind(new TypeLiteral<BlockingQueue<TaskView<?>>>(){}).toInstance(new MemoryBlockingQueue<>(DS_BATCHDOWNLOAD_QUEUE_NAME));
}

private void configureBatchQueuesRedis(RedissonClient redissonClient) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.icij.datashare.Entity;
import org.icij.datashare.HumanReadableSize;
import org.icij.datashare.PropertiesProvider;
import org.icij.datashare.asynctasks.TaskView;
import org.icij.datashare.batch.BatchDownload;
import org.icij.datashare.com.mail.Mail;
import org.icij.datashare.com.mail.MailException;
Expand All @@ -37,7 +38,6 @@
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.zip.ZipException;

Expand All @@ -57,15 +57,15 @@ public class BatchDownloadRunner implements Callable<UriResult>, Monitorable, Us
private final AtomicInteger numberOfResults = new AtomicInteger(0);
private final Indexer indexer;
private final PropertiesProvider propertiesProvider;
private final BiFunction<String, Double, Void> progressCallback;
private final Function<Double, Void> progressCallback;
private final Function<URI, MailSender> mailSenderSupplier;

@Inject
public BatchDownloadRunner(Indexer indexer, PropertiesProvider propertiesProvider, @Assisted TaskView<?> task, @Assisted BiFunction<String, Double, Void> progressCallback) {
public BatchDownloadRunner(Indexer indexer, PropertiesProvider propertiesProvider, @Assisted TaskView<?> task, @Assisted Function<Double, Void> progressCallback) {
this(indexer, propertiesProvider, progressCallback, task, MailSender::new);
}

BatchDownloadRunner(Indexer indexer, PropertiesProvider provider, BiFunction<String, Double, Void> progressCallback, TaskView<?> task, Function<URI, MailSender> mailSenderSupplier) {
BatchDownloadRunner(Indexer indexer, PropertiesProvider provider, Function<Double, Void> progressCallback, TaskView<?> task, Function<URI, MailSender> mailSenderSupplier) {
assert task.properties.get("batchDownload") != null : "'batchDownload' property in task shouldn't be null";
this.task = (TaskView<File>) task;
this.indexer = indexer;
Expand Down Expand Up @@ -115,7 +115,7 @@ public UriResult call() throws Exception {
if (addedBytes > 0) {
zippedFilesSize += addedBytes;
numberOfResults.incrementAndGet();
progressCallback.apply(task.id, getProgressRate());
progressCallback.apply(getProgressRate());
}
}
docsToProcess = searcher.scroll(scrollDuration).collect(toList());
Expand Down

0 comments on commit d66f5ee

Please sign in to comment.