Skip to content

Commit

Permalink
(converter) Refactor converter to not keep instructions list in RAM.
Browse files Browse the repository at this point in the history
(converter) Refactor converter to not keep instructions list in RAM.

(converter) Refactor converter to not keep instructions list in RAM.
  • Loading branch information
vlofgren committed Jul 25, 2023
1 parent fd44e09 commit 507f26a
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 209 deletions.
112 changes: 0 additions & 112 deletions code/common/process/src/main/java/nu/marginalia/util/ParallelPipe.java

This file was deleted.

Expand Up @@ -4,7 +4,7 @@
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.crawling.io.SerializableCrawlDataStream;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
Expand All @@ -17,7 +17,6 @@
import nu.marginalia.converting.compiler.InstructionsCompiler;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.processor.DomainProcessor;
import nu.marginalia.util.ParallelPipe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -26,6 +25,9 @@
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -55,7 +57,7 @@ public static void main(String... args) throws Exception {

var request = converter.fetchInstructions();
try {
converter.load(request);
converter.convert(request);
request.ok();
}
catch (Exception ex) {
Expand Down Expand Up @@ -87,58 +89,64 @@ public ConverterMain(
heartbeat.start();
}



public void load(ConvertRequest request) throws Exception {
public void convert(ConvertRequest request) throws Exception {

var plan = request.getPlan();

final int maxPoolSize = 16;

try (WorkLog processLog = plan.createProcessWorkLog();
ConversionLog log = new ConversionLog(plan.process.getDir())) {
var instructionWriter = new InstructionWriter(log, plan.process.getDir(), gson);
var instructionWriter = new InstructionWriterFactory(log, plan.process.getDir(), gson);

Semaphore semaphore = new Semaphore(maxPoolSize);
var pool = new ThreadPoolExecutor(
maxPoolSize/4,
maxPoolSize,
5, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(8)
);

int totalDomains = plan.countCrawledDomains();
AtomicInteger processedDomains = new AtomicInteger(0);

var pipe = new ParallelPipe<SerializableCrawlDataStream, ProcessingInstructions>("Converter", 16, 4, 2) {

@Override
protected ProcessingInstructions onProcess(SerializableCrawlDataStream dataStream) {
var processed = processor.process(dataStream);
var compiled = compiler.compile(processed);

return new ProcessingInstructions(processed.id, compiled);
}
// Advance the progress bar to the current position if this is a resumption
processedDomains.set(processLog.countFinishedJobs());
heartbeat.setProgress(processedDomains.get() / (double) totalDomains);

@Override
protected void onReceive(ProcessingInstructions processedInstructions) throws IOException {
Thread.currentThread().setName("Converter:Receiver["+processedInstructions.id+"]");
for (var domain : plan.crawlDataIterable(id -> !processLog.isJobFinished(id)))
{
semaphore.acquire();
pool.execute(() -> {
try {
var instructions = processedInstructions.instructions;
instructions.removeIf(Instruction::isNoOp);
ProcessedDomain processed = processor.process(domain);

final String where;
final int size;

String where = instructionWriter.accept(processedInstructions.id, instructions);
processLog.setJobToFinished(processedInstructions.id, where, instructions.size());
try (var writer = instructionWriter.createInstructionsForDomainWriter(processed.id)) {
compiler.compile(processed, writer::accept);
where = writer.getFileName();
size = writer.getSize();
}

processLog.setJobToFinished(processed.id, where, size);
heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
}
catch (IOException ex) {
logger.warn("IO exception in converter", ex);
}
finally {
Thread.currentThread().setName("Converter:Receiver[IDLE]");
semaphore.release();
}
}

};

// Advance the progress bar to the current position if this is a resumption
processedDomains.set(processLog.countFinishedJobs());
heartbeat.setProgress(processedDomains.get() / (double) totalDomains);

for (var domain : plan.crawlDataIterable(id -> !processLog.isJobFinished(id)))
{
pipe.accept(domain);
});
}

pipe.join();
pool.shutdown();
do {
System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining");
} while (!pool.awaitTermination(60, TimeUnit.SECONDS));

request.ok();
}
catch (Exception e) {
Expand Down Expand Up @@ -205,7 +213,4 @@ private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedF
}
}


record ProcessingInstructions(String id, List<Instruction> instructions) {}

}
Expand Up @@ -15,22 +15,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class InstructionWriter {
public class InstructionWriterFactory {

private ConversionLog log;
private final ConversionLog log;
private final Path outputDir;
private final Gson gson;
private static final Logger logger = LoggerFactory.getLogger(InstructionWriter.class);
private static final Logger logger = LoggerFactory.getLogger(InstructionWriterFactory.class);

public InstructionWriter(ConversionLog log, Path outputDir, Gson gson) {
public InstructionWriterFactory(ConversionLog log, Path outputDir, Gson gson) {
this.log = log;
this.outputDir = outputDir;
this.gson = gson;
Expand All @@ -40,29 +36,57 @@ public InstructionWriter(ConversionLog log, Path outputDir, Gson gson) {
}
}

public String accept(String id, List<Instruction> instructionList) throws IOException {
public InstructionWriter createInstructionsForDomainWriter(String id) throws IOException {
Path outputFile = getOutputFile(id);
return new InstructionWriter(outputFile);
}

public class InstructionWriter implements AutoCloseable {
private final OutputStreamWriter outputStream;
private final String where;
private final SummarizingInterpreter summary = new SummarizingInterpreter();

private int size = 0;


if (Files.exists(outputFile)) {
Files.delete(outputFile);
InstructionWriter(Path filename) throws IOException {
where = filename.getFileName().toString();
Files.deleteIfExists(filename);
outputStream = new OutputStreamWriter(new ZstdOutputStream(new BufferedOutputStream(new FileOutputStream(filename.toFile()))));
}

try (var outputStream = new OutputStreamWriter(new ZstdOutputStream(new BufferedOutputStream(new FileOutputStream(outputFile.toFile()))))) {
public void accept(Instruction instruction) {
if (instruction.isNoOp()) return;

SummarizingInterpreter summary = new SummarizingInterpreter(instructionList);
logger.info("Writing {} - {} - {}", id, instructionList.size(), summary);
instruction.apply(summary);
instruction.apply(log);

for (var instr : instructionList) {
instr.apply(log);
size++;

outputStream.append(instr.tag().name());
try {
outputStream.append(instruction.tag().name());
outputStream.append(' ');
gson.toJson(instr, outputStream);
gson.toJson(instruction, outputStream);
outputStream.append('\n');
}
catch (IOException ex) {
logger.warn("IO exception writing instruction", ex);
}
}

@Override
public void close() throws IOException {
logger.info("Wrote {} - {} - {}", where, size, summary);
outputStream.close();
}

public String getFileName() {
return where;
}

return outputFile.getFileName().toString();
public int getSize() {
return size;
}
}

private Path getOutputFile(String id) throws IOException {
Expand All @@ -79,12 +103,6 @@ private Path getOutputFile(String id) throws IOException {

private static class SummarizingInterpreter implements Interpreter {

private SummarizingInterpreter(List<Instruction> instructions) {
for (var i : instructions) {
i.apply(this);
}
}

private String domainName;
private int ok = 0;
private int error = 0;
Expand Down

0 comments on commit 507f26a

Please sign in to comment.