Skip to content

Commit

Permalink
(WIP) Make it possible to sideload encyclopedia data.
Browse files Browse the repository at this point in the history
This is mostly a pilot track for sideloading other large websites.

Also change the converter to produce a more compact output (Java serialization instead of JSON).
  • Loading branch information
vlofgren committed Jul 28, 2023
1 parent 9288d31 commit f11103d
Show file tree
Hide file tree
Showing 22 changed files with 614 additions and 139 deletions.
@@ -0,0 +1,6 @@
package nu.marginalia.mqapi.converting;

/**
 * The kind of work requested of the converter process via its message queue inbox.
 */
public enum ConvertAction {
// Regular flow: convert previously crawled data from a crawl-data file storage.
ConvertCrawlData,
// Sideload an encyclopedia dump; the dump's path travels in ConvertRequest.inputSource.
SideloadEncyclopedia
}
Expand Up @@ -5,6 +5,8 @@

// Message-queue payload telling the converter process what to do.
// Lombok generates the all-args constructor (action, inputSource, crawlStorage, processedDataStorage).
@AllArgsConstructor
public class ConvertRequest {
// Selects which conversion flow the converter runs.
public final ConvertAction action;
// Path to sideload input for SideloadEncyclopedia; presumably null for ConvertCrawlData — TODO confirm
public final String inputSource;
// Storage holding crawl data; used by ConvertCrawlData, presumably unused when sideloading — TODO confirm
public final FileStorageId crawlStorage;
// Storage that receives the processed (converted) output; used by both actions.
public final FileStorageId processedDataStorage;
}
1 change: 1 addition & 0 deletions code/processes/converting-process/build.gradle
Expand Up @@ -79,6 +79,7 @@ dependencies {
implementation libs.crawlercommons

implementation libs.commons.lang3
implementation libs.sqlite

testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
Expand Down
Expand Up @@ -5,24 +5,26 @@
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.converting.sideload.SideloadSource;
import nu.marginalia.converting.sideload.SideloadSourceFactory;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.mqapi.converting.ConvertAction;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.module.DatabaseModule;
import plan.CrawlPlan;
import nu.marginalia.converting.compiler.InstructionsCompiler;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.processor.DomainProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
Expand All @@ -34,18 +36,16 @@
import static nu.marginalia.mqapi.ProcessInboxNames.CONVERTER_INBOX;

public class ConverterMain {

private static final Logger logger = LoggerFactory.getLogger(ConverterMain.class);
private final DomainProcessor processor;
private final InstructionsCompiler compiler;
private final Gson gson;
private final ProcessHeartbeat heartbeat;
private final MessageQueueFactory messageQueueFactory;
private final FileStorageService fileStorageService;
private final SideloadSourceFactory sideloadSourceFactory;

public static void main(String... args) throws Exception {


Injector injector = Guice.createInjector(
new ConverterModule(),
new DatabaseModule()
Expand All @@ -55,15 +55,9 @@ public static void main(String... args) throws Exception {

logger.info("Starting pipe");

var request = converter.fetchInstructions();
try {
converter.convert(request);
request.ok();
}
catch (Exception ex) {
logger.error("Conversion failed", ex);
request.err();
}
converter
.fetchInstructions()
.execute(converter);

logger.info("Finished");

Expand All @@ -77,21 +71,42 @@ public ConverterMain(
Gson gson,
ProcessHeartbeat heartbeat,
MessageQueueFactory messageQueueFactory,
FileStorageService fileStorageService
) {
FileStorageService fileStorageService,
SideloadSourceFactory sideloadSourceFactory
)
{
this.processor = processor;
this.compiler = compiler;
this.gson = gson;
this.heartbeat = heartbeat;
this.messageQueueFactory = messageQueueFactory;
this.fileStorageService = fileStorageService;
this.sideloadSourceFactory = sideloadSourceFactory;

heartbeat.start();
}

public void convert(ConvertRequest request) throws Exception {
/**
 * Converts a single sideloaded source (e.g. an encyclopedia dump) by streaming its
 * documents through the instructions compiler and writing the compiled instructions
 * to {@code writeDir}, recording the result in the work log.
 *
 * @param sideloadSource the source to compile; its id names the output file
 * @param writeDir directory that receives the instructions file, conversion log,
 *                 and processor.log work log
 * @throws Exception if writing the logs or instructions fails
 */
public void convert(SideloadSource sideloadSource, Path writeDir) throws Exception {
    // NOTE: removed an unused local (`int maxPoolSize = 16;`) left over from the
    // thread-pooled crawl-data flow; this method processes a single source serially.
    try (WorkLog workLog = new WorkLog(writeDir.resolve("processor.log"));
         ConversionLog conversionLog = new ConversionLog(writeDir)) {
        var instructionWriter = new InstructionWriterFactory(conversionLog, writeDir, gson);

        final String where;
        final int size;

        // The writer must be closed before the job is marked finished, so the
        // file name and size are captured inside the inner try-with-resources.
        try (var writer = instructionWriter.createInstructionsForDomainWriter(sideloadSource.getId())) {
            compiler.compileStreaming(sideloadSource, writer::accept);
            where = writer.getFileName();
            size = writer.getSize();
        }

        workLog.setJobToFinished(sideloadSource.getId(), where, size);
    }
}

var plan = request.getPlan();
public void convert(CrawlPlan plan) throws Exception {

final int maxPoolSize = 16;

Expand Down Expand Up @@ -146,39 +161,75 @@ public void convert(ConvertRequest request) throws Exception {
do {
System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining");
} while (!pool.awaitTermination(60, TimeUnit.SECONDS));

request.ok();
}
catch (Exception e) {
request.err();
throw e;
}
}

private static class ConvertRequest {
private final CrawlPlan plan;
private abstract static class ConvertRequest {
private final MqMessage message;
private final MqSingleShotInbox inbox;

ConvertRequest(CrawlPlan plan, MqMessage message, MqSingleShotInbox inbox) {
this.plan = plan;
private ConvertRequest(MqMessage message, MqSingleShotInbox inbox) {
this.message = message;
this.inbox = inbox;
}

public CrawlPlan getPlan() {
return plan;
}
public abstract void execute(ConverterMain converterMain) throws Exception;

public void ok() {
inbox.sendResponse(message, MqInboxResponse.ok());
}
public void err() {
inbox.sendResponse(message, MqInboxResponse.err());
}
}

/**
 * Request variant that sideloads externally prepared data (e.g. an encyclopedia
 * dump) into the converter, writing output to the given work directory.
 */
private static class SideloadAction extends ConvertRequest {
    private final SideloadSource source;
    private final Path destDir;

    SideloadAction(SideloadSource source,
                   Path destDir,
                   MqMessage message,
                   MqSingleShotInbox inbox) {
        super(message, inbox);
        this.source = source;
        this.destDir = destDir;
    }

    /** Runs the sideload conversion, acknowledging success or failure on the inbox. */
    @Override
    public void execute(ConverterMain converterMain) throws Exception {
        try {
            converterMain.convert(source, destDir);
        }
        catch (Exception ex) {
            logger.error("Error sideloading", ex);
            err();
            return;
        }
        ok();
    }
}

private static class ConvertCrawlDataAction extends ConvertRequest {
private final CrawlPlan plan;

private ConvertCrawlDataAction(CrawlPlan plan, MqMessage message, MqSingleShotInbox inbox) {
super(message, inbox);
this.plan = plan;
}

@Override
public void execute(ConverterMain converterMain) throws Exception {
try {
converterMain.convert(plan);
ok();
}
catch (Exception ex) {
err();
}
}
}


private ConvertRequest fetchInstructions() throws Exception {

var inbox = messageQueueFactory.createSingleShotInbox(CONVERTER_INBOX, UUID.randomUUID());
Expand All @@ -188,14 +239,30 @@ private ConvertRequest fetchInstructions() throws Exception {

var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.converting.ConvertRequest.class);

var crawlData = fileStorageService.getStorage(request.crawlStorage);
var processData = fileStorageService.getStorage(request.processedDataStorage);
if (request.action == ConvertAction.ConvertCrawlData) {

var plan = new CrawlPlan(null,
new CrawlPlan.WorkDir(crawlData.path(), "crawler.log"),
new CrawlPlan.WorkDir(processData.path(), "processor.log"));
var crawlData = fileStorageService.getStorage(request.crawlStorage);
var processData = fileStorageService.getStorage(request.processedDataStorage);

var plan = new CrawlPlan(null,
new CrawlPlan.WorkDir(crawlData.path(), "crawler.log"),
new CrawlPlan.WorkDir(processData.path(), "processor.log"));

return new ConvertCrawlDataAction(plan, msg, inbox);
}

return new ConvertRequest(plan, msg, inbox);
if (request.action == ConvertAction.SideloadEncyclopedia) {
var processData = fileStorageService.getStorage(request.processedDataStorage);
var filePath = Path.of(request.inputSource);

return new SideloadAction(sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(filePath),
processData.asPath(),
msg, inbox);
}

else {
throw new RuntimeException("Unknown action: " + request.action);
}
}

private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {
Expand Down
Expand Up @@ -42,7 +42,7 @@ public InstructionWriter createInstructionsForDomainWriter(String id) throws IOE
}

public class InstructionWriter implements AutoCloseable {
private final OutputStreamWriter outputStream;
private final ObjectOutputStream outputStream;
private final String where;
private final SummarizingInterpreter summary = new SummarizingInterpreter();

Expand All @@ -52,7 +52,7 @@ public class InstructionWriter implements AutoCloseable {
InstructionWriter(Path filename) throws IOException {
where = filename.getFileName().toString();
Files.deleteIfExists(filename);
outputStream = new OutputStreamWriter(new ZstdOutputStream(new BufferedOutputStream(new FileOutputStream(filename.toFile()))));
outputStream = new ObjectOutputStream(new ZstdOutputStream(new FileOutputStream(filename.toFile())));
}

public void accept(Instruction instruction) {
Expand All @@ -64,10 +64,12 @@ public void accept(Instruction instruction) {
size++;

try {
outputStream.append(instruction.tag().name());
outputStream.append(' ');
gson.toJson(instruction, outputStream);
outputStream.append('\n');
outputStream.writeObject(instruction);

// Reset the stream to avoid keeping references to the objects
// (as this will cause the memory usage to grow indefinitely when
// writing huge amounts of data)
outputStream.reset();
}
catch (IOException ex) {
logger.warn("IO exception writing instruction", ex);
Expand Down
Expand Up @@ -23,15 +23,15 @@ public void compile(Consumer<Instruction> instructionConsumer, List<ProcessedDoc

}

private void compileDocumentDetails(Consumer<Instruction> instructionConsumer, ProcessedDocument doc) {
/**
 * Emits a LoadProcessedDocument instruction for the document's details,
 * doing nothing when the document has no details.
 */
public void compileDocumentDetails(Consumer<Instruction> instructionConsumer, ProcessedDocument doc) {
    final var details = doc.details;
    if (details == null) {
        return;
    }

    instructionConsumer.accept(new LoadProcessedDocument(doc.url, doc.state, details.title, details.description, HtmlFeature.encode(details.features), details.standard.name(), details.length, details.hashCode, details.quality, details.pubYear));
}

private void compileWords(Consumer<Instruction> instructionConsumer, ProcessedDocument doc) {
public void compileWords(Consumer<Instruction> instructionConsumer, ProcessedDocument doc) {
var words = doc.words;

if (words != null) {
Expand Down
Expand Up @@ -40,4 +40,8 @@ public void compile(Consumer<Instruction> instructionConsumer, EdgeDomain domain
instructionConsumer.accept(new LoadDomainMetadata(domain, knownUrls.size(), goodUrls, visitedUrls));
}

/**
 * Emits domain metadata from pre-computed counts instead of a document list,
 * for streaming/sideload flows; the visited-URL count is taken to equal countAll.
 */
public void compileFake(Consumer<Instruction> instructionConsumer, EdgeDomain domain, int countAll, int countGood) {
    var metadata = new LoadDomainMetadata(domain, countAll, countGood, countAll);
    instructionConsumer.accept(metadata);
}

}
Expand Up @@ -3,11 +3,15 @@
import com.google.inject.Inject;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDomain;
import nu.marginalia.converting.model.ProcessedDocument;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.converting.sideload.SideloadSource;
import nu.marginalia.model.EdgeUrl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Iterator;
import java.util.function.Consumer;

import static java.util.Objects.requireNonNullElse;
Expand All @@ -20,6 +24,8 @@ public class InstructionsCompiler {
private final LinksCompiler linksCompiler;
private final RedirectCompiler redirectCompiler;

private final Logger logger = LoggerFactory.getLogger(InstructionsCompiler.class);

@Inject
public InstructionsCompiler(UrlsCompiler urlsCompiler,
DocumentsCompiler documentsCompiler,
Expand Down Expand Up @@ -53,4 +59,35 @@ public void compile(ProcessedDomain domain, Consumer<Instruction> instructionCon

domainMetadataCompiler.compile(instructionConsumer, domain.domain, requireNonNullElse(domain.documents, Collections.emptyList()));
}

/**
 * Compiles instructions for a sideloaded source by streaming its URLs and
 * documents, rather than materializing the whole domain in memory.
 */
public void compileStreaming(SideloadSource sideloadSource,
                             Consumer<Instruction> instructionConsumer) {
    ProcessedDomain domain = sideloadSource.getDomain();
    var urls = sideloadSource.getUrlsIterator();
    var docs = sideloadSource.getDocumentsStream();

    // Guaranteed to always be first
    instructionConsumer.accept(new LoadProcessedDomain(domain.domain, domain.state, domain.ip));

    logger.info("Writing domains");
    urlsCompiler.compileJustDomain(instructionConsumer, domain.domain);
    logger.info("Writing urls");
    urlsCompiler.compileJustUrls(instructionConsumer, urls);

    logger.info("Writing docs");

    int totalCount = 0;
    int goodCount = 0;
    while (docs.hasNext()) {
        var doc = docs.next();

        totalCount++;
        if (doc.isOk()) {
            goodCount++;
        }

        documentsCompiler.compileDocumentDetails(instructionConsumer, doc);
        documentsCompiler.compileWords(instructionConsumer, doc);
    }

    // Streaming flow can't enumerate known URLs, so metadata is built from counts.
    domainMetadataCompiler.compileFake(instructionConsumer, domain.domain, totalCount, goodCount);
}
}

0 comments on commit f11103d

Please sign in to comment.