diff --git a/code/common/process/src/main/java/nu/marginalia/util/ParallelPipe.java b/code/common/process/src/main/java/nu/marginalia/util/ParallelPipe.java deleted file mode 100644 index fc95debeb..000000000 --- a/code/common/process/src/main/java/nu/marginalia/util/ParallelPipe.java +++ /dev/null @@ -1,112 +0,0 @@ -package nu.marginalia.util; - -import lombok.SneakyThrows; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -/** Generalization of the workflow
- * -- single provider thread reading sequentially from disk
- * -> multiple independent CPU-bound processing tasks
- * -> single consumer thread writing to network/disk
- *

- */ -public abstract class ParallelPipe { - private final LinkedBlockingQueue inputs; - private final LinkedBlockingQueue intermediates; - - private final Logger logger = LoggerFactory.getLogger(getClass()); - - private final List processThreads = new ArrayList<>(); - private final Thread receiverThread; - - private volatile boolean expectingInput = true; - private volatile boolean expectingOutput = true; - - public ParallelPipe(String name, int numberOfThreads, int inputQueueSize, int intermediateQueueSize) { - inputs = new LinkedBlockingQueue<>(inputQueueSize); - intermediates = new LinkedBlockingQueue<>(intermediateQueueSize); - - for (int i = 0; i < numberOfThreads; i++) { - processThreads.add(new Thread(this::runProcessThread, name + "-process["+i+"]")); - } - receiverThread = new Thread(this::runReceiverThread, name + "-receiver"); - - processThreads.forEach(Thread::start); - receiverThread.start(); - } - - public void clearQueues() { - inputs.clear(); - intermediates.clear(); - } - - @SneakyThrows - private void runProcessThread() { - while (expectingInput || !inputs.isEmpty()) { - var in = inputs.poll(10, TimeUnit.SECONDS); - - if (in != null) { - try { - var ret = onProcess(in); - if (ret != null) { - intermediates.put(ret); - } - } - catch (InterruptedException ex) { - throw ex; - } - catch (Exception ex) { - logger.error("Exception", ex); - } - - } - } - - logger.info("Terminating {}", Thread.currentThread().getName()); - } - - @SneakyThrows - private void runReceiverThread() { - while (expectingOutput || !inputs.isEmpty() || !intermediates.isEmpty()) { - var intermediate = intermediates.poll(997, TimeUnit.MILLISECONDS); - if (intermediate != null) { - try { - onReceive(intermediate); - } - catch (Exception ex) { - logger.error("Exception", ex); - } - } - } - - logger.info("Terminating {}", Thread.currentThread().getName()); - } - - /** Begin processing an item */ - @SneakyThrows - public void accept(INPUT input) { - inputs.put(input); - } - - /** The meat of the processor thread runtime */ - protected abstract INTERMEDIATE onProcess(INPUT input) throws Exception; - - /** The meat of the consumer thread runtime */ - protected abstract void onReceive(INTERMEDIATE intermediate) throws Exception; - - public void join() throws InterruptedException { - expectingInput = false; - - for (var thread : processThreads) { - thread.join(); - } - - expectingOutput = false; - receiverThread.join(); - } -} diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java index f35740ce6..9c8373e10 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java @@ -4,7 +4,7 @@ import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; -import nu.marginalia.crawling.io.SerializableCrawlDataStream; +import nu.marginalia.converting.model.ProcessedDomain; import nu.marginalia.db.storage.FileStorageService; import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MqMessage; @@ -17,7 +17,6 @@ import nu.marginalia.converting.compiler.InstructionsCompiler; import nu.marginalia.converting.instruction.Instruction; import nu.marginalia.converting.processor.DomainProcessor; -import nu.marginalia.util.ParallelPipe; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,6 +25,9 @@ import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -55,7 +57,7 @@ public static void main(String... args) throws Exception { var request = converter.fetchInstructions(); try { - converter.load(request); + converter.convert(request); request.ok(); } catch (Exception ex) { @@ -87,58 +89,64 @@ public ConverterMain( heartbeat.start(); } - - - public void load(ConvertRequest request) throws Exception { + public void convert(ConvertRequest request) throws Exception { var plan = request.getPlan(); + final int maxPoolSize = 16; + try (WorkLog processLog = plan.createProcessWorkLog(); ConversionLog log = new ConversionLog(plan.process.getDir())) { - var instructionWriter = new InstructionWriter(log, plan.process.getDir(), gson); + var instructionWriter = new InstructionWriterFactory(log, plan.process.getDir(), gson); + + Semaphore semaphore = new Semaphore(maxPoolSize); + var pool = new ThreadPoolExecutor( + maxPoolSize/4, + maxPoolSize, + 5, TimeUnit.MINUTES, + new LinkedBlockingQueue<>(8) + ); int totalDomains = plan.countCrawledDomains(); AtomicInteger processedDomains = new AtomicInteger(0); - var pipe = new ParallelPipe("Converter", 16, 4, 2) { - - @Override - protected ProcessingInstructions onProcess(SerializableCrawlDataStream dataStream) { - var processed = processor.process(dataStream); - var compiled = compiler.compile(processed); - - return new ProcessingInstructions(processed.id, compiled); - } + // Advance the progress bar to the current position if this is a resumption + processedDomains.set(processLog.countFinishedJobs()); + heartbeat.setProgress(processedDomains.get() / (double) totalDomains); - @Override - protected void onReceive(ProcessingInstructions processedInstructions) throws IOException { - Thread.currentThread().setName("Converter:Receiver["+processedInstructions.id+"]"); + for (var domain : plan.crawlDataIterable(id -> !processLog.isJobFinished(id))) + { + semaphore.acquire(); + pool.execute(() -> { try { - var instructions = processedInstructions.instructions; - instructions.removeIf(Instruction::isNoOp); + ProcessedDomain processed = processor.process(domain); + + final String where; + final int size; - String where = instructionWriter.accept(processedInstructions.id, instructions); - processLog.setJobToFinished(processedInstructions.id, where, instructions.size()); + try (var writer = instructionWriter.createInstructionsForDomainWriter(processed.id)) { + compiler.compile(processed, writer::accept); + where = writer.getFileName(); + size = writer.getSize(); + } + processLog.setJobToFinished(processed.id, where, size); heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains); } + catch (IOException ex) { + logger.warn("IO exception in converter", ex); + } finally { - Thread.currentThread().setName("Converter:Receiver[IDLE]"); + semaphore.release(); } - } - - }; - - // Advance the progress bar to the current position if this is a resumption - processedDomains.set(processLog.countFinishedJobs()); - heartbeat.setProgress(processedDomains.get() / (double) totalDomains); - - for (var domain : plan.crawlDataIterable(id -> !processLog.isJobFinished(id))) - { - pipe.accept(domain); + }); } - pipe.join(); + pool.shutdown(); + do { + System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining"); + } while (!pool.awaitTermination(60, TimeUnit.SECONDS)); + request.ok(); } catch (Exception e) { @@ -205,7 +213,4 @@ private Optional getMessage(MqSingleShotInbox inbox, String expectedF } } - - record ProcessingInstructions(String id, List instructions) {} - } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/InstructionWriter.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/InstructionWriterFactory.java similarity index 65% rename from code/processes/converting-process/src/main/java/nu/marginalia/converting/InstructionWriter.java rename to code/processes/converting-process/src/main/java/nu/marginalia/converting/InstructionWriterFactory.java index 826c41cd3..e6009d0e8 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/InstructionWriter.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/InstructionWriterFactory.java @@ -15,22 +15,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedOutputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; +import java.io.*; import java.nio.file.Files; import java.nio.file.Path; -import java.util.List; -public class InstructionWriter { +public class InstructionWriterFactory { - private ConversionLog log; + private final ConversionLog log; private final Path outputDir; private final Gson gson; - private static final Logger logger = LoggerFactory.getLogger(InstructionWriter.class); + private static final Logger logger = LoggerFactory.getLogger(InstructionWriterFactory.class); - public InstructionWriter(ConversionLog log, Path outputDir, Gson gson) { + public InstructionWriterFactory(ConversionLog log, Path outputDir, Gson gson) { this.log = log; this.outputDir = outputDir; this.gson = gson; @@ -40,29 +36,57 @@ public InstructionWriter(ConversionLog log, Path outputDir, Gson gson) { } } - public String accept(String id, List instructionList) throws IOException { + public InstructionWriter createInstructionsForDomainWriter(String id) throws IOException { Path outputFile = getOutputFile(id); + return new InstructionWriter(outputFile); + } + + public class InstructionWriter implements AutoCloseable { + private final OutputStreamWriter outputStream; + private final String where; + private final SummarizingInterpreter summary = new SummarizingInterpreter(); + + private int size = 0; + - if (Files.exists(outputFile)) { - Files.delete(outputFile); + InstructionWriter(Path filename) throws IOException { + where = filename.getFileName().toString(); + Files.deleteIfExists(filename); + outputStream = new OutputStreamWriter(new ZstdOutputStream(new BufferedOutputStream(new FileOutputStream(filename.toFile())))); } - try (var outputStream = new OutputStreamWriter(new ZstdOutputStream(new BufferedOutputStream(new FileOutputStream(outputFile.toFile()))))) { + public void accept(Instruction instruction) { + if (instruction.isNoOp()) return; - SummarizingInterpreter summary = new SummarizingInterpreter(instructionList); - logger.info("Writing {} - {} - {}", id, instructionList.size(), summary); + instruction.apply(summary); + instruction.apply(log); - for (var instr : instructionList) { - instr.apply(log); + size++; - outputStream.append(instr.tag().name()); + try { + outputStream.append(instruction.tag().name()); outputStream.append(' '); - gson.toJson(instr, outputStream); + gson.toJson(instruction, outputStream); outputStream.append('\n'); } + catch (IOException ex) { + logger.warn("IO exception writing instruction", ex); + } + } + + @Override + public void close() throws IOException { + logger.info("Wrote {} - {} - {}", where, size, summary); + outputStream.close(); + } + + public String getFileName() { + return where; } - return outputFile.getFileName().toString(); + public int getSize() { + return size; + } } private Path getOutputFile(String id) throws IOException { @@ -79,12 +103,6 @@ private Path getOutputFile(String id) throws IOException { private static class SummarizingInterpreter implements Interpreter { - private SummarizingInterpreter(List instructions) { - for (var i : instructions) { - i.apply(this); - } - } - private String domainName; private int ok = 0; private int error = 0; diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DocumentsCompiler.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DocumentsCompiler.java index 3849f015b..881a1a33d 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DocumentsCompiler.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DocumentsCompiler.java @@ -7,34 +7,35 @@ import nu.marginalia.model.crawl.HtmlFeature; import java.util.List; +import java.util.function.Consumer; public class DocumentsCompiler { - public void compile(List ret, List documents) { + public void compile(Consumer instructionConsumer, List documents) { for (var doc : documents) { - compileDocumentDetails(ret, doc); + compileDocumentDetails(instructionConsumer, doc); } for (var doc : documents) { - compileWords(ret, doc); + compileWords(instructionConsumer, doc); } } - private void compileDocumentDetails(List ret, ProcessedDocument doc) { + private void compileDocumentDetails(Consumer instructionConsumer, ProcessedDocument doc) { var details = doc.details; if (details != null) { - ret.add(new LoadProcessedDocument(doc.url, doc.state, details.title, details.description, HtmlFeature.encode(details.features), details.standard.name(), details.length, details.hashCode, details.quality, details.pubYear)); + instructionConsumer.accept(new LoadProcessedDocument(doc.url, doc.state, details.title, details.description, HtmlFeature.encode(details.features), details.standard.name(), details.length, details.hashCode, details.quality, details.pubYear)); } } - private void compileWords(List ret, ProcessedDocument doc) { + private void compileWords(Consumer instructionConsumer, ProcessedDocument doc) { var words = doc.words; if (words != null) { - ret.add(new LoadKeywords(doc.url, doc.details.metadata, words.build())); + instructionConsumer.accept(new LoadKeywords(doc.url, doc.details.metadata, words.build())); } } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DomainMetadataCompiler.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DomainMetadataCompiler.java index e80f42eb6..74ae58169 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DomainMetadataCompiler.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DomainMetadataCompiler.java @@ -11,11 +11,12 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.function.Consumer; public class DomainMetadataCompiler { - public void compile(List ret, EdgeDomain domain, @NotNull List documents) { + public void compile(Consumer instructionConsumer, EdgeDomain domain, @NotNull List documents) { int visitedUrls = 0; int goodUrls = 0; @@ -36,7 +37,7 @@ public void compile(List ret, EdgeDomain domain, @NotNull List ret, List documents) { + public void compile(Consumer instructionConsumer, List documents) { EdgeUrl[] feeds = documents.stream().map(doc -> doc.details) .filter(Objects::nonNull) @@ -18,6 +19,6 @@ public void compile(List ret, List documents) { .distinct() .toArray(EdgeUrl[]::new); - ret.add(new LoadRssFeed(feeds)); + instructionConsumer.accept(new LoadRssFeed(feeds)); } } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/InstructionsCompiler.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/InstructionsCompiler.java index 71bf77857..9b32ed8df 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/InstructionsCompiler.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/InstructionsCompiler.java @@ -8,6 +8,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.function.Consumer; import static java.util.Objects.requireNonNullElse; @@ -35,25 +36,21 @@ public InstructionsCompiler(UrlsCompiler urlsCompiler, this.redirectCompiler = redirectCompiler; } - public List compile(ProcessedDomain domain) { - List ret = new ArrayList<>(domain.size()*4); - + public void compile(ProcessedDomain domain, Consumer instructionConsumer) { // Guaranteed to always be first - ret.add(new LoadProcessedDomain(domain.domain, domain.state, domain.ip)); + instructionConsumer.accept(new LoadProcessedDomain(domain.domain, domain.state, domain.ip)); if (domain.documents != null) { - urlsCompiler.compile(ret, domain.documents); - documentsCompiler.compile(ret, domain.documents); + urlsCompiler.compile(instructionConsumer, domain.documents); + documentsCompiler.compile(instructionConsumer, domain.documents); - feedsCompiler.compile(ret, domain.documents); - linksCompiler.compile(ret, domain.domain, domain.documents); + feedsCompiler.compile(instructionConsumer, domain.documents); + linksCompiler.compile(instructionConsumer, domain.domain, domain.documents); } if (domain.redirect != null) { - redirectCompiler.compile(ret, domain.domain, domain.redirect); + redirectCompiler.compile(instructionConsumer, domain.domain, domain.redirect); } - domainMetadataCompiler.compile(ret, domain.domain, requireNonNullElse(domain.documents, Collections.emptyList())); - - return ret; + domainMetadataCompiler.compile(instructionConsumer, domain.domain, requireNonNullElse(domain.documents, Collections.emptyList())); } } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/LinksCompiler.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/LinksCompiler.java index a578602db..e100cb860 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/LinksCompiler.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/LinksCompiler.java @@ -8,10 +8,11 @@ import java.util.List; import java.util.Objects; +import java.util.function.Consumer; public class LinksCompiler { - public void compile(List ret, EdgeDomain from, List documents) { + public void compile(Consumer instructionConsumer, EdgeDomain from, List documents) { DomainLink[] links = documents.stream().map(doc -> doc.details) .filter(Objects::nonNull) @@ -21,6 +22,6 @@ public void compile(List ret, EdgeDomain from, List new DomainLink(from, domain)) .toArray(DomainLink[]::new); - ret.add(new LoadDomainLink(links)); + instructionConsumer.accept(new LoadDomainLink(links)); } } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/RedirectCompiler.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/RedirectCompiler.java index b14dedca0..dcd0201fd 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/RedirectCompiler.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/RedirectCompiler.java @@ -8,12 +8,13 @@ import nu.marginalia.model.EdgeDomain; import java.util.List; +import java.util.function.Consumer; public class RedirectCompiler { - public void compile(List ret, EdgeDomain from, EdgeDomain to) { - ret.add(new LoadDomain(to)); - ret.add(new LoadDomainLink(new DomainLink(from, to))); - ret.add(new LoadDomainRedirect(new DomainLink(from, to))); + public void compile(Consumer instructionConsumer, EdgeDomain from, EdgeDomain to) { + instructionConsumer.accept(new LoadDomain(to)); + instructionConsumer.accept(new LoadDomainLink(new DomainLink(from, to))); + instructionConsumer.accept(new LoadDomainRedirect(new DomainLink(from, to))); } } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/UrlsCompiler.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/UrlsCompiler.java index d5184cfc3..ba3470580 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/UrlsCompiler.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/UrlsCompiler.java @@ -13,13 +13,14 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.function.Consumer; public class UrlsCompiler { private static final int MAX_INTERNAL_LINKS = 25; private final Logger logger = LoggerFactory.getLogger(getClass()); - public void compile(List ret, List documents) { + public void compile(Consumer instructionConsumer, List documents) { Set seenUrls = new HashSet<>(documents.size()*4); Set seenDomains = new HashSet<>(documents.size()); @@ -53,8 +54,8 @@ public void compile(List ret, List documents) { } } - ret.add(new LoadDomain(seenDomains.toArray(EdgeDomain[]::new))); - ret.add(new LoadUrl(seenUrls.toArray(EdgeUrl[]::new))); + instructionConsumer.accept(new LoadDomain(seenDomains.toArray(EdgeDomain[]::new))); + instructionConsumer.accept(new LoadUrl(seenUrls.toArray(EdgeUrl[]::new))); } } diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessService.java index 124a2a494..032f2c234 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessService.java @@ -119,6 +119,7 @@ private String[] env() { } opts.put("WMSA_HOME", WMSA_HOME); opts.put("JAVA_HOME", System.getenv("JAVA_HOME")); + opts.put("JAVA_OPTS", ""); opts.put("CONVERTER_OPTS", System.getenv("CONVERTER_OPTS")); opts.put("LOADER_OPTS", System.getenv("LOADER_OPTS")); opts.put("CRAWLER_OPTS", System.getenv("CRAWLER_OPTS"));