From f11103d31de7823c095223bdae0e538b1db515a4 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Fri, 28 Jul 2023 18:14:43 +0200 Subject: [PATCH] (WIP) Make it possible to sideload encyclopedia data. This is mostly a pilot track for sideloading other large websites. Also change coverter to produce a more compact output (java serialization instead of json). --- .../mqapi/converting/ConvertAction.java | 6 + .../mqapi/converting/ConvertRequest.java | 2 + .../processes/converting-process/build.gradle | 1 + .../marginalia/converting/ConverterMain.java | 141 +++++++--- .../converting/InstructionWriterFactory.java | 14 +- .../compiler/DocumentsCompiler.java | 4 +- .../compiler/DomainMetadataCompiler.java | 4 + .../compiler/InstructionsCompiler.java | 41 ++- .../converting/compiler/UrlsCompiler.java | 21 +- .../converting/processor/DomainProcessor.java | 3 +- .../EncyclopediaMarginaliaNuSideloader.java | 247 ++++++++++++++++++ .../converting/sideload/SideloadSource.java | 15 ++ .../sideload/SideloadSourceFactory.java | 23 ++ .../loading/ConvertedDomainReader.java | 59 +++-- .../nu/marginalia/loading/LoaderMain.java | 38 ++- .../nu/marginalia/loading/loader/Loader.java | 40 +-- .../loader/SqlLoadProcessedDocument.java | 2 +- .../loader/SqlLoadProcessedDomain.java | 9 +- .../loading/loader/SqlLoadUrls.java | 66 +++-- .../loader/SqlLoadProcessedDomainTest.java | 9 +- .../actor/task/ReconvertAndLoadActor.java | 6 +- settings.gradle | 2 + 22 files changed, 614 insertions(+), 139 deletions(-) create mode 100644 code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertAction.java create mode 100644 code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/EncyclopediaMarginaliaNuSideloader.java create mode 100644 code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/SideloadSource.java create mode 100644 code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/SideloadSourceFactory.java diff --git a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertAction.java b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertAction.java new file mode 100644 index 000000000..0c3f575a9 --- /dev/null +++ b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertAction.java @@ -0,0 +1,6 @@ +package nu.marginalia.mqapi.converting; + +public enum ConvertAction { + ConvertCrawlData, + SideloadEncyclopedia +} diff --git a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertRequest.java b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertRequest.java index 640911463..abacf8af7 100644 --- a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertRequest.java +++ b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertRequest.java @@ -5,6 +5,8 @@ @AllArgsConstructor public class ConvertRequest { + public final ConvertAction action; + public final String inputSource; public final FileStorageId crawlStorage; public final FileStorageId processedDataStorage; } diff --git a/code/processes/converting-process/build.gradle b/code/processes/converting-process/build.gradle index b85a829b9..a14ee5966 100644 --- a/code/processes/converting-process/build.gradle +++ b/code/processes/converting-process/build.gradle @@ -79,6 +79,7 @@ dependencies { implementation libs.crawlercommons implementation libs.commons.lang3 + implementation libs.sqlite testImplementation libs.bundles.slf4j.test testImplementation libs.bundles.junit diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java index 9c8373e10..c7584a6c9 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java @@ -5,24 +5,26 @@ import com.google.inject.Inject; import com.google.inject.Injector; import nu.marginalia.converting.model.ProcessedDomain; +import nu.marginalia.converting.sideload.SideloadSource; +import nu.marginalia.converting.sideload.SideloadSourceFactory; import nu.marginalia.db.storage.FileStorageService; import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.inbox.MqInboxResponse; import nu.marginalia.mq.inbox.MqSingleShotInbox; +import nu.marginalia.mqapi.converting.ConvertAction; import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.process.log.WorkLog; import nu.marginalia.service.module.DatabaseModule; import plan.CrawlPlan; import nu.marginalia.converting.compiler.InstructionsCompiler; -import nu.marginalia.converting.instruction.Instruction; import nu.marginalia.converting.processor.DomainProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.file.Path; import java.sql.SQLException; -import java.util.List; import java.util.Optional; import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; @@ -34,7 +36,6 @@ import static nu.marginalia.mqapi.ProcessInboxNames.CONVERTER_INBOX; public class ConverterMain { - private static final Logger logger = LoggerFactory.getLogger(ConverterMain.class); private final DomainProcessor processor; private final InstructionsCompiler compiler; @@ -42,10 +43,9 @@ public class ConverterMain { private final ProcessHeartbeat heartbeat; private final MessageQueueFactory messageQueueFactory; private final FileStorageService fileStorageService; + private final SideloadSourceFactory sideloadSourceFactory; public static void main(String... args) throws Exception { - - Injector injector = Guice.createInjector( new ConverterModule(), new DatabaseModule() @@ -55,15 +55,9 @@ public static void main(String... args) throws Exception { logger.info("Starting pipe"); - var request = converter.fetchInstructions(); - try { - converter.convert(request); - request.ok(); - } - catch (Exception ex) { - logger.error("Conversion failed", ex); - request.err(); - } + converter + .fetchInstructions() + .execute(converter); logger.info("Finished"); @@ -77,21 +71,42 @@ public ConverterMain( Gson gson, ProcessHeartbeat heartbeat, MessageQueueFactory messageQueueFactory, - FileStorageService fileStorageService - ) { + FileStorageService fileStorageService, + SideloadSourceFactory sideloadSourceFactory + ) + { this.processor = processor; this.compiler = compiler; this.gson = gson; this.heartbeat = heartbeat; this.messageQueueFactory = messageQueueFactory; this.fileStorageService = fileStorageService; + this.sideloadSourceFactory = sideloadSourceFactory; heartbeat.start(); } - public void convert(ConvertRequest request) throws Exception { + public void convert(SideloadSource sideloadSource, Path writeDir) throws Exception { + int maxPoolSize = 16; + + try (WorkLog workLog = new WorkLog(writeDir.resolve("processor.log")); + ConversionLog conversionLog = new ConversionLog(writeDir)) { + var instructionWriter = new InstructionWriterFactory(conversionLog, writeDir, gson); + + final String where; + final int size; + + try (var writer = instructionWriter.createInstructionsForDomainWriter(sideloadSource.getId())) { + compiler.compileStreaming(sideloadSource, writer::accept); + where = writer.getFileName(); + size = writer.getSize(); + } + + workLog.setJobToFinished(sideloadSource.getId(), where, size); + } + } - var plan = request.getPlan(); + public void convert(CrawlPlan plan) throws Exception { final int maxPoolSize = 16; @@ -146,29 +161,19 @@ public void convert(ConvertRequest request) throws Exception { do { System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining"); } while (!pool.awaitTermination(60, TimeUnit.SECONDS)); - - request.ok(); - } - catch (Exception e) { - request.err(); - throw e; } } - private static class ConvertRequest { - private final CrawlPlan plan; + private abstract static class ConvertRequest { private final MqMessage message; private final MqSingleShotInbox inbox; - ConvertRequest(CrawlPlan plan, MqMessage message, MqSingleShotInbox inbox) { - this.plan = plan; + private ConvertRequest(MqMessage message, MqSingleShotInbox inbox) { this.message = message; this.inbox = inbox; } - public CrawlPlan getPlan() { - return plan; - } + public abstract void execute(ConverterMain converterMain) throws Exception; public void ok() { inbox.sendResponse(message, MqInboxResponse.ok()); @@ -176,9 +181,55 @@ public void ok() { public void err() { inbox.sendResponse(message, MqInboxResponse.err()); } + } + + private static class SideloadAction extends ConvertRequest { + + private final SideloadSource sideloadSource; + private final Path workDir; + + SideloadAction(SideloadSource sideloadSource, + Path workDir, + MqMessage message, MqSingleShotInbox inbox) { + super(message, inbox); + this.sideloadSource = sideloadSource; + this.workDir = workDir; + } + @Override + public void execute(ConverterMain converterMain) throws Exception { + try { + converterMain.convert(sideloadSource, workDir); + ok(); + } + catch (Exception ex) { + logger.error("Error sideloading", ex); + err(); + } + } + } + + private static class ConvertCrawlDataAction extends ConvertRequest { + private final CrawlPlan plan; + + private ConvertCrawlDataAction(CrawlPlan plan, MqMessage message, MqSingleShotInbox inbox) { + super(message, inbox); + this.plan = plan; + } + + @Override + public void execute(ConverterMain converterMain) throws Exception { + try { + converterMain.convert(plan); + ok(); + } + catch (Exception ex) { + err(); + } + } } + private ConvertRequest fetchInstructions() throws Exception { var inbox = messageQueueFactory.createSingleShotInbox(CONVERTER_INBOX, UUID.randomUUID()); @@ -188,14 +239,30 @@ private ConvertRequest fetchInstructions() throws Exception { var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.converting.ConvertRequest.class); - var crawlData = fileStorageService.getStorage(request.crawlStorage); - var processData = fileStorageService.getStorage(request.processedDataStorage); + if (request.action == ConvertAction.ConvertCrawlData) { - var plan = new CrawlPlan(null, - new CrawlPlan.WorkDir(crawlData.path(), "crawler.log"), - new CrawlPlan.WorkDir(processData.path(), "processor.log")); + var crawlData = fileStorageService.getStorage(request.crawlStorage); + var processData = fileStorageService.getStorage(request.processedDataStorage); + + var plan = new CrawlPlan(null, + new CrawlPlan.WorkDir(crawlData.path(), "crawler.log"), + new CrawlPlan.WorkDir(processData.path(), "processor.log")); + + return new ConvertCrawlDataAction(plan, msg, inbox); + } - return new ConvertRequest(plan, msg, inbox); + if (request.action == ConvertAction.SideloadEncyclopedia) { + var processData = fileStorageService.getStorage(request.processedDataStorage); + var filePath = Path.of(request.inputSource); + + return new SideloadAction(sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(filePath), + processData.asPath(), + msg, inbox); + } + + else { + throw new RuntimeException("Unknown action: " + request.action); + } } private Optional getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException { diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/InstructionWriterFactory.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/InstructionWriterFactory.java index e6009d0e8..fee4fc191 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/InstructionWriterFactory.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/InstructionWriterFactory.java @@ -42,7 +42,7 @@ public InstructionWriter createInstructionsForDomainWriter(String id) throws IOE } public class InstructionWriter implements AutoCloseable { - private final OutputStreamWriter outputStream; + private final ObjectOutputStream outputStream; private final String where; private final SummarizingInterpreter summary = new SummarizingInterpreter(); @@ -52,7 +52,7 @@ public class InstructionWriter implements AutoCloseable { InstructionWriter(Path filename) throws IOException { where = filename.getFileName().toString(); Files.deleteIfExists(filename); - outputStream = new OutputStreamWriter(new ZstdOutputStream(new BufferedOutputStream(new FileOutputStream(filename.toFile())))); + outputStream = new ObjectOutputStream(new ZstdOutputStream(new FileOutputStream(filename.toFile()))); } public void accept(Instruction instruction) { @@ -64,10 +64,12 @@ public void accept(Instruction instruction) { size++; try { - outputStream.append(instruction.tag().name()); - outputStream.append(' '); - gson.toJson(instruction, outputStream); - outputStream.append('\n'); + outputStream.writeObject(instruction); + + // Reset the stream to avoid keeping references to the objects + // (as this will cause the memory usage to grow indefinitely when + // writing huge amounts of data) + outputStream.reset(); } catch (IOException ex) { logger.warn("IO exception writing instruction", ex); diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DocumentsCompiler.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DocumentsCompiler.java index 881a1a33d..9bc3f6b35 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DocumentsCompiler.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DocumentsCompiler.java @@ -23,7 +23,7 @@ public void compile(Consumer instructionConsumer, List instructionConsumer, ProcessedDocument doc) { + public void compileDocumentDetails(Consumer instructionConsumer, ProcessedDocument doc) { var details = doc.details; if (details != null) { @@ -31,7 +31,7 @@ private void compileDocumentDetails(Consumer instructionConsumer, P } } - private void compileWords(Consumer instructionConsumer, ProcessedDocument doc) { + public void compileWords(Consumer instructionConsumer, ProcessedDocument doc) { var words = doc.words; if (words != null) { diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DomainMetadataCompiler.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DomainMetadataCompiler.java index 74ae58169..3909edb15 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DomainMetadataCompiler.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/DomainMetadataCompiler.java @@ -40,4 +40,8 @@ public void compile(Consumer instructionConsumer, EdgeDomain domain instructionConsumer.accept(new LoadDomainMetadata(domain, knownUrls.size(), goodUrls, visitedUrls)); } + public void compileFake(Consumer instructionConsumer, EdgeDomain domain, int countAll, int countGood) { + instructionConsumer.accept(new LoadDomainMetadata(domain, countAll, countGood, countAll)); + } + } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/InstructionsCompiler.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/InstructionsCompiler.java index 9b32ed8df..87f28e3cc 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/InstructionsCompiler.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/InstructionsCompiler.java @@ -3,11 +3,15 @@ import com.google.inject.Inject; import nu.marginalia.converting.instruction.Instruction; import nu.marginalia.converting.instruction.instructions.LoadProcessedDomain; +import nu.marginalia.converting.model.ProcessedDocument; import nu.marginalia.converting.model.ProcessedDomain; +import nu.marginalia.converting.sideload.SideloadSource; +import nu.marginalia.model.EdgeUrl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.util.ArrayList; import java.util.Collections; -import java.util.List; +import java.util.Iterator; import java.util.function.Consumer; import static java.util.Objects.requireNonNullElse; @@ -20,6 +24,8 @@ public class InstructionsCompiler { private final LinksCompiler linksCompiler; private final RedirectCompiler redirectCompiler; + private final Logger logger = LoggerFactory.getLogger(InstructionsCompiler.class); + @Inject public InstructionsCompiler(UrlsCompiler urlsCompiler, DocumentsCompiler documentsCompiler, @@ -53,4 +59,35 @@ public void compile(ProcessedDomain domain, Consumer instructionCon domainMetadataCompiler.compile(instructionConsumer, domain.domain, requireNonNullElse(domain.documents, Collections.emptyList())); } + + public void compileStreaming(SideloadSource sideloadSource, + Consumer instructionConsumer) { + ProcessedDomain domain = sideloadSource.getDomain(); + Iterator urlsIterator = sideloadSource.getUrlsIterator(); + Iterator documentsIterator = sideloadSource.getDocumentsStream(); + + // Guaranteed to always be first + instructionConsumer.accept(new LoadProcessedDomain(domain.domain, domain.state, domain.ip)); + + int countAll = 0; + int countGood = 0; + + logger.info("Writing domains"); + urlsCompiler.compileJustDomain(instructionConsumer, domain.domain); + logger.info("Writing urls"); + urlsCompiler.compileJustUrls(instructionConsumer, urlsIterator); + + logger.info("Writing docs"); + + while (documentsIterator.hasNext()) { + var doc = documentsIterator.next(); + countAll++; + if (doc.isOk()) countGood++; + + documentsCompiler.compileDocumentDetails(instructionConsumer, doc); + documentsCompiler.compileWords(instructionConsumer, doc); + } + + domainMetadataCompiler.compileFake(instructionConsumer, domain.domain, countAll, countGood); + } } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/UrlsCompiler.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/UrlsCompiler.java index ba3470580..34e243b30 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/UrlsCompiler.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/compiler/UrlsCompiler.java @@ -9,10 +9,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; +import java.util.*; import java.util.function.Consumer; public class UrlsCompiler { @@ -58,4 +55,20 @@ public void compile(Consumer instructionConsumer, List instructionConsumer, Iterator urlsIterator) { + var urls = new ArrayList(1000); + + while (urlsIterator.hasNext()) { + if (urls.size() >= 1000) { + instructionConsumer.accept(new LoadUrl(urls.toArray(EdgeUrl[]::new))); + urls.clear(); + } + + urls.add(urlsIterator.next()); + } + } + + public void compileJustDomain(Consumer instructionConsumer, EdgeDomain domain) { + instructionConsumer.accept(new LoadDomain(domain)); + } } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java index 26ade3c67..e313bcdf5 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java @@ -28,7 +28,8 @@ public class DomainProcessor { @Inject public DomainProcessor(DocumentProcessor documentProcessor, SiteWords siteWords, - LshDocumentDeduplicator documentDeduplicator) { + LshDocumentDeduplicator documentDeduplicator) + { this.documentProcessor = documentProcessor; this.siteWords = siteWords; this.documentDeduplicator = documentDeduplicator; diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/EncyclopediaMarginaliaNuSideloader.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/EncyclopediaMarginaliaNuSideloader.java new file mode 100644 index 000000000..ef5a5874b --- /dev/null +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/EncyclopediaMarginaliaNuSideloader.java @@ -0,0 +1,247 @@ +package nu.marginalia.converting.sideload; + +import com.github.luben.zstd.ZstdInputStream; +import com.google.gson.Gson; +import lombok.SneakyThrows; +import nu.marginalia.converting.model.DisqualifiedException; +import nu.marginalia.converting.model.ProcessedDocument; +import nu.marginalia.converting.model.ProcessedDomain; +import nu.marginalia.converting.processor.plugin.HtmlDocumentProcessorPlugin; +import nu.marginalia.crawling.model.CrawledDocument; +import nu.marginalia.model.EdgeDomain; +import nu.marginalia.model.EdgeUrl; +import nu.marginalia.model.crawl.DomainIndexingState; +import nu.marginalia.model.crawl.UrlIndexingState; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URISyntaxException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.sql.*; +import java.time.LocalDateTime; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; + +/** This is an experimental sideloader for encyclopedia.marginalia.nu's database; + * (which serves as a way of loading wikipedia's zim files without binding to GPL2'd code) + * + * See https://github.com/MarginaliaSearch/encyclopedia.marginalia.nu for extracting the data + */ +public class EncyclopediaMarginaliaNuSideloader implements SideloadSource, AutoCloseable { + + private final Connection connection; + private final Gson gson; + private final HtmlDocumentProcessorPlugin htmlProcessorPlugin; + + public EncyclopediaMarginaliaNuSideloader(Path pathToDbFile, + Gson gson, + HtmlDocumentProcessorPlugin htmlProcessorPlugin) throws SQLException { + this.gson = gson; + this.htmlProcessorPlugin = htmlProcessorPlugin; + String sqliteDbString = "jdbc:sqlite:" + pathToDbFile.toString(); + + connection = DriverManager.getConnection(sqliteDbString); + + } + + @Override + public ProcessedDomain getDomain() { + var ret = new ProcessedDomain(); + + ret.domain = new EdgeDomain("encyclopedia.marginalia.nu"); + ret.id = "encyclopedia.marginalia.nu"; + ret.ip = "127.0.0.1"; + ret.state = DomainIndexingState.ACTIVE; + + return ret; + } + + @Override + @SneakyThrows + public Iterator getUrlsIterator() { + EdgeUrl base = new EdgeUrl("https://encyclopedia.marginalia.nu/"); + + return new SqlQueryIterator<>(connection.prepareStatement(""" + SELECT url, html FROM articles + """)) + { + @Override + public EdgeUrl convert(ResultSet rs) throws Exception { + var path = URLEncoder.encode(rs.getString("url"), StandardCharsets.UTF_8); + + return base.withPathAndParam("/article/"+path, null); + } + }; + } + + + @SneakyThrows + @Override + public Iterator getDocumentsStream() { + LinkedBlockingQueue docs = new LinkedBlockingQueue<>(32); + AtomicBoolean isFinished = new AtomicBoolean(false); + + ExecutorService executorService = Executors.newFixedThreadPool(16); + Semaphore sem = new Semaphore(16); + + executorService.submit(() -> { + try { + var stmt = connection.prepareStatement(""" + SELECT url,title,html FROM articles + """); + stmt.setFetchSize(100); + + var rs = stmt.executeQuery(); + while (rs.next()) { + var articleParts = fromCompressedJson(rs.getBytes("html"), ArticleParts.class); + String title = rs.getString("title"); + String url = rs.getString("url"); + + sem.acquire(); + + executorService.submit(() -> { + try { + docs.add(convertDocument(articleParts.parts, title, url)); + } catch (URISyntaxException | DisqualifiedException e) { + e.printStackTrace(); + } finally { + sem.release(); + } + }); + } + + stmt.close(); + } + catch (Exception e) { + e.printStackTrace(); + } + finally { + isFinished.set(true); + } + }); + + return new Iterator<>() { + @Override + public boolean hasNext() { + return !isFinished.get() || !docs.isEmpty() || sem.availablePermits() < 16; + } + + @SneakyThrows + @Override + public ProcessedDocument next() { + return docs.take(); + } + }; + } + + private ProcessedDocument convertDocument(List parts, String title, String url) throws URISyntaxException, DisqualifiedException { + String fullUrl = "https://encyclopedia.marginalia.nu/article/"+url; + + StringBuilder fullHtml = new StringBuilder(); + fullHtml.append("").append(title).append(""); + for (String part : parts) { + fullHtml.append("

"); + fullHtml.append(part); + fullHtml.append("

"); + } + fullHtml.append(""); + + var crawledDoc = new CrawledDocument( + "encyclopedia.marginalia.nu", + fullUrl, + "text/html", + LocalDateTime.now().toString(), + 200, + "OK", + "NP", + "", + fullHtml.toString(), + Integer.toHexString(fullHtml.hashCode()), + fullUrl, + "", + "SIDELOAD" + ); + + var ret = new ProcessedDocument(); + try { + var details = htmlProcessorPlugin.createDetails(crawledDoc); + + ret.words = details.words(); + ret.details = details.details(); + ret.url = new EdgeUrl(fullUrl); + ret.state = UrlIndexingState.OK; + ret.stateReason = "SIDELOAD"; + } + catch (Exception e) { + ret.url = new EdgeUrl(fullUrl); + ret.state = UrlIndexingState.DISQUALIFIED; + ret.stateReason = "SIDELOAD"; + } + + return ret; + + } + + private T fromCompressedJson(byte[] stream, Class type) throws IOException { + return gson.fromJson(new InputStreamReader(new ZstdInputStream(new ByteArrayInputStream(stream))), type); + } + + private record ArticleParts(List parts) {} + + @Override + public String getId() { + return "encyclopedia.marginalia.nu"; + } + + @Override + public void close() throws Exception { + connection.close(); + } + + private abstract static class SqlQueryIterator implements Iterator { + PreparedStatement stmt; + ResultSet rs; + T next = null; + + public SqlQueryIterator(PreparedStatement stmt) throws SQLException { + this.stmt = stmt; + stmt.setFetchSize(1000); + rs = stmt.executeQuery(); + } + + @SneakyThrows + @Override + public boolean hasNext() { + if (next != null) { + return true; + } + if (!rs.next()) { + stmt.close(); + return false; + } + + next = convert(rs); + + return true; + } + + public abstract T convert(ResultSet rs) throws Exception; + + @Override + public T next () { + if (!hasNext()) + throw new IllegalStateException("No next element"); + var ret = next; + next = null; + return ret; + } + } +} diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/SideloadSource.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/SideloadSource.java new file mode 100644 index 000000000..d23a81ae8 --- /dev/null +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/SideloadSource.java @@ -0,0 +1,15 @@ +package nu.marginalia.converting.sideload; + +import nu.marginalia.converting.model.ProcessedDocument; +import nu.marginalia.converting.model.ProcessedDomain; +import nu.marginalia.model.EdgeUrl; + +import java.util.Iterator; + +public interface SideloadSource { + ProcessedDomain getDomain(); + Iterator getUrlsIterator(); + Iterator getDocumentsStream(); + + String getId(); +} diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/SideloadSourceFactory.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/SideloadSourceFactory.java new file mode 100644 index 000000000..83c629d3e --- /dev/null +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/SideloadSourceFactory.java @@ -0,0 +1,23 @@ +package nu.marginalia.converting.sideload; + +import com.google.gson.Gson; +import com.google.inject.Inject; +import nu.marginalia.converting.processor.plugin.HtmlDocumentProcessorPlugin; + +import java.nio.file.Path; +import java.sql.SQLException; + +public class SideloadSourceFactory { + private final Gson gson; + private final HtmlDocumentProcessorPlugin htmlProcessorPlugin; + + @Inject + public SideloadSourceFactory(Gson gson, HtmlDocumentProcessorPlugin htmlProcessorPlugin) { + this.gson = gson; + this.htmlProcessorPlugin = htmlProcessorPlugin; + } + + public SideloadSource sideloadEncyclopediaMarginaliaNu(Path pathToDbFile) throws SQLException { + return new EncyclopediaMarginaliaNuSideloader(pathToDbFile, gson, htmlProcessorPlugin); + } +} diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/ConvertedDomainReader.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/ConvertedDomainReader.java index 6b9dfbbda..1c06510e2 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/ConvertedDomainReader.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/ConvertedDomainReader.java @@ -2,10 +2,8 @@ import com.github.luben.zstd.ZstdInputStream; import com.google.gson.Gson; -import com.google.gson.JsonParseException; +import lombok.SneakyThrows; import nu.marginalia.converting.instruction.Instruction; -import nu.marginalia.converting.instruction.InstructionTag; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -13,6 +11,7 @@ import java.io.*; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; public class ConvertedDomainReader { @@ -27,30 +26,48 @@ public ConvertedDomainReader(Gson gson) { public List read(Path path, int cntHint) throws IOException { List ret = new ArrayList<>(cntHint); - try (var br = new BufferedReader(new InputStreamReader(new ZstdInputStream(new BufferedInputStream(new FileInputStream(path.toFile())))))) { - String line; - for (;;) { - line = br.readLine(); + try (var or = new ObjectInputStream(new ZstdInputStream(new FileInputStream(path.toFile())))) { + var object = or.readObject(); + if (object instanceof Instruction is) { + ret.add(is); + } + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } - if (line == null) { - break; - } - if (line.isBlank()) { - continue; - } - var parts= line.split(" ", 2); - var type = InstructionTag.valueOf(parts[0]).clazz; + return ret; + } + + public Iterator createIterator(Path path) throws IOException { + var or = new ObjectInputStream(new ZstdInputStream(new BufferedInputStream(new FileInputStream(path.toFile())))); + + return new Iterator<>() { + Instruction next; + @SneakyThrows + @Override + public boolean hasNext() { + if (next != null) + return true; try { - ret.add(gson.fromJson(parts[1], type)); + next = (Instruction) or.readObject(); + return true; } - catch (NullPointerException|JsonParseException ex) { - logger.warn("Failed to deserialize {} {}", type.getSimpleName(), StringUtils.abbreviate(parts[1], 255)); - logger.warn("Json error", ex); + catch (java.io.EOFException ex) { + or.close(); + return false; } } - } - return ret; + @Override + public Instruction next() { + if (next != null || hasNext()) { + var ret = next; + next = null; + return ret; + } + throw new IllegalStateException(); + } + }; } } diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java index fc1694618..c84413307 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java @@ -23,7 +23,7 @@ import java.nio.file.Path; import java.sql.SQLException; -import java.util.List; +import java.util.Iterator; import java.util.Optional; import java.util.UUID; import java.util.concurrent.LinkedBlockingQueue; @@ -55,8 +55,13 @@ public static void main(String... args) throws Exception { ); var instance = injector.getInstance(LoaderMain.class); - var instructions = instance.fetchInstructions(); - instance.run(instructions); + try { + var instructions = instance.fetchInstructions(); + instance.run(instructions); + } + catch (Exception ex) { + logger.error("Error running loader", ex); + } } @Inject @@ -101,7 +106,19 @@ public void run(LoadRequest instructions) { for (var entry : WorkLog.iterable(logFile)) { heartbeat.setProgress(loaded++ / (double) loadTotal); - load(plan, entry.path(), entry.cnt()); + var loader = loaderFactory.create(entry.cnt()); + Path destDir = plan.getProcessedFilePath(entry.path()); + + var instructionsIter = instructionsReader.createIterator(destDir); + while (instructionsIter.hasNext()) { + var next = instructionsIter.next(); + try { + next.apply(loader); + } + catch (Exception ex) { + logger.error("Failed to load instruction {}", next); + } + } } running = false; @@ -110,6 +127,7 @@ public void run(LoadRequest instructions) { // This needs to be done in order to have a readable index journal indexLoadKeywords.close(); + logger.info("Loading finished"); } catch (Exception ex) { logger.error("Failed to load", ex); @@ -119,6 +137,7 @@ public void run(LoadRequest instructions) { finally { heartbeat.shutDown(); } + System.exit(0); } @@ -128,7 +147,7 @@ private void load(CrawlPlan plan, String path, int cnt) { Path destDir = plan.getProcessedFilePath(path); try { var loader = loaderFactory.create(cnt); - var instructions = instructionsReader.read(destDir, cnt); + var instructions = instructionsReader.createIterator(destDir); processQueue.put(new LoadJob(path, loader, instructions)); } catch (Exception e) { logger.error("Failed to load " + destDir, e); @@ -137,15 +156,16 @@ private void load(CrawlPlan plan, String path, int cnt) { static final TaskStats taskStats = new TaskStats(100); - private record LoadJob(String path, Loader loader, List instructionList) { + private record LoadJob(String path, Loader loader, Iterator instructionIterator) { public void run() { long startTime = System.currentTimeMillis(); - for (var i : instructionList) { + while (instructionIterator.hasNext()) { + var next = instructionIterator.next(); try { - i.apply(loader); + next.apply(loader); } catch (Exception ex) { - logger.error("Failed to load instruction {}", i); + logger.error("Failed to load instruction {}", next); } } diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/Loader.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/Loader.java index 21216b35c..96c5a21c0 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/Loader.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/Loader.java @@ -86,40 +86,24 @@ public void loadProcessedDomain(EdgeDomain domain, DomainIndexingState state, St @Override public void loadProcessedDocument(LoadProcessedDocument document) { - deferralCheck(document.url()); - processedDocumentList.add(document); + if (processedDocumentList.size() > 100) { + sqlLoadProcessedDocument.load(data, processedDocumentList); + processedDocumentList.clear(); + } } @Override public void loadProcessedDocumentWithError(LoadProcessedDocumentWithError document) { - deferralCheck(document.url()); - processedDocumentWithErrorList.add(document); - } - - private void deferralCheck(EdgeUrl url) { - if (data.getDomainId(url.domain) <= 0) - deferredDomains.add(url.domain); - - if (data.getUrlId(url) <= 0) - deferredUrls.add(url); + if (processedDocumentWithErrorList.size() > 100) { + sqlLoadProcessedDocument.loadWithError(data, processedDocumentWithErrorList); + processedDocumentWithErrorList.clear(); + } } @Override public void loadKeywords(EdgeUrl url, DocumentMetadata metadata, DocumentKeywords words) { - // This is a bit of a bandaid safeguard against a bug in - // in the converter, shouldn't be necessary in the future - if (!deferredDomains.isEmpty()) { - loadDomain(deferredDomains.toArray(EdgeDomain[]::new)); - deferredDomains.clear(); - } - - if (!deferredUrls.isEmpty()) { - loadUrl(deferredUrls.toArray(EdgeUrl[]::new)); - deferredUrls.clear(); - } - try { indexLoadKeywords.load(data, url, metadata, words); } catch (InterruptedException e) { @@ -140,8 +124,12 @@ public void loadDomainMetadata(EdgeDomain domain, int knownUrls, int goodUrls, i public void finish() { // Some work needs to be processed out of order for the database relations to work out - sqlLoadProcessedDocument.load(data, processedDocumentList); - sqlLoadProcessedDocument.loadWithError(data, processedDocumentWithErrorList); + if (processedDocumentList.size() > 0) { + sqlLoadProcessedDocument.load(data, processedDocumentList); + } + if (processedDocumentWithErrorList.size() > 0) { + sqlLoadProcessedDocument.loadWithError(data, processedDocumentWithErrorList); + } } } diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadProcessedDocument.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadProcessedDocument.java index 2aec488df..02c4202ca 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadProcessedDocument.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadProcessedDocument.java @@ -73,7 +73,7 @@ public void load(LoaderData data, List documents) { int urlId = data.getUrlId(doc.url()); if (urlId <= 0) { logger.warn("Failed to resolve ID for URL {}", doc.url()); - return; + continue; } stmt.setInt(1, urlId); diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadProcessedDomain.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadProcessedDomain.java index c06ff84c9..df598b147 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadProcessedDomain.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadProcessedDomain.java @@ -14,12 +14,14 @@ public class SqlLoadProcessedDomain { private final HikariDataSource dataSource; private final SqlLoadDomains loadDomains; + private final SqlLoadUrls loadUrls; private static final Logger logger = LoggerFactory.getLogger(SqlLoadProcessedDomain.class); @Inject - public SqlLoadProcessedDomain(HikariDataSource dataSource, SqlLoadDomains loadDomains) { + public SqlLoadProcessedDomain(HikariDataSource dataSource, SqlLoadDomains loadDomains, SqlLoadUrls loadUrls) { this.dataSource = dataSource; this.loadDomains = loadDomains; + this.loadUrls = loadUrls; try (var conn = dataSource.getConnection()) { @@ -34,7 +36,7 @@ IN IP VARCHAR(48)) BEGIN DELETE FROM DOMAIN_METADATA WHERE ID=DID; DELETE FROM EC_DOMAIN_LINK WHERE SOURCE_DOMAIN_ID=DID; - DELETE FROM EC_URL WHERE DOMAIN_ID = DID; + DELETE FROM EC_PAGE_DATA WHERE ID IN (SELECT ID FROM EC_URL WHERE DOMAIN_ID = DID); UPDATE EC_DOMAIN SET INDEX_DATE=NOW(), STATE=ST, DOMAIN_ALIAS=NULL, INDEXED=GREATEST(INDEXED,IDX), IP=IP WHERE ID=DID; DELETE FROM EC_DOMAIN_LINK WHERE SOURCE_DOMAIN_ID=DID; END @@ -47,6 +49,7 @@ IN IP VARCHAR(48)) } public void load(LoaderData data, EdgeDomain domain, DomainIndexingState state, String ip) { + data.setTargetDomain(domain); loadDomains.load(data, domain); @@ -63,6 +66,8 @@ public void load(LoaderData data, EdgeDomain domain, DomainIndexingState state, if (rc < 1) { logger.warn("load({},{}) -- bad rowcount {}", domain, state, rc); } + + loadUrls.loadUrlsForDomain(data, domain, 0); } catch (SQLException ex) { logger.warn("SQL error initializing domain", ex); diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadUrls.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadUrls.java index 18bd32b16..a0b0f8cbb 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadUrls.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/SqlLoadUrls.java @@ -30,23 +30,35 @@ public SqlLoadUrls(HikariDataSource dataSource) { public void load(LoaderData data, EdgeUrl[] urls) { Set affectedDomains = new HashSet<>(); + if (urls.length == 0) + return; + + int maxOldId = 0; try (var conn = dataSource.getConnection(); var insertCall = conn.prepareStatement("INSERT IGNORE INTO EC_URL (PROTO,DOMAIN_ID,PORT,PATH,PARAM,PATH_HASH) VALUES (?,?,?,?,?,?)"); - var queryCall = conn.prepareStatement("SELECT ID, PROTO, PATH, PARAM FROM EC_URL WHERE DOMAIN_ID=?") - ) + var queryMaxId = conn.prepareStatement("SELECT MAX(ID) FROM EC_URL")) { conn.setAutoCommit(false); + var rs = queryMaxId.executeQuery(); + if (rs.next()) { + maxOldId = rs.getInt(1); + } int cnt = 0; int batchOffset = 0; + for (var url : urls) { + if (data.getUrlId(url) != 0) + continue; if (url.path.length() >= 255) { - logger.debug("Skipping bad URL {}", url); + logger.info("Skipping bad URL {}", url); continue; } + var domainId = data.getDomainId(url.domain); + affectedDomains.add(url.domain); insertCall.setString(1, url.proto); - insertCall.setInt(2, data.getDomainId(url.domain)); + insertCall.setInt(2, domainId); if (url.port != null) { insertCall.setInt(3, url.port); } @@ -58,10 +70,8 @@ public void load(LoaderData data, EdgeUrl[] urls) { insertCall.setLong(6, hashPath(url.path, url.param)); insertCall.addBatch(); - if (cnt++ == 1000) { + if (++cnt == 1000) { var ret = insertCall.executeBatch(); - conn.commit(); - for (int rv = 0; rv < cnt; rv++) { if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) { logger.warn("load({}) -- bad row count {}", urls[batchOffset + rv], ret[rv]); @@ -72,10 +82,9 @@ public void load(LoaderData data, EdgeUrl[] urls) { cnt = 0; } } + if (cnt > 0) { var ret = insertCall.executeBatch(); - conn.commit(); - for (int rv = 0; rv < cnt; rv++) { if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) { logger.warn("load({}) -- bad row count {}", urls[batchOffset + rv], ret[rv]); @@ -83,24 +92,12 @@ public void load(LoaderData data, EdgeUrl[] urls) { } } + conn.commit(); conn.setAutoCommit(true); - for (var domain : affectedDomains) { - queryCall.setInt(1, data.getDomainId(domain)); - var rsp = queryCall.executeQuery(); - rsp.setFetchSize(1000); - - while (rsp.next()) { - int urlId = rsp.getInt(1); - String proto = rsp.getString(2); - String path = rsp.getString(3); - String param = rsp.getString(4); - - data.addUrl(new EdgeUrl(proto, domain, null, path, param), urlId); - } + loadUrlsForDomain(data, domain, maxOldId); } - } catch (SQLException ex) { logger.warn("SQL error inserting URLs", ex); @@ -121,4 +118,27 @@ private long hashPath(String path, String queryParam) { return pathHash + murmur3_128.hashString(queryParam, StandardCharsets.UTF_8).padToLong(); } } + + /** Loads urlIDs for the domain into `data` from the database, starting at URL ID minId. */ + public void loadUrlsForDomain(LoaderData data, EdgeDomain domain, int minId) throws SQLException { + try (var conn = dataSource.getConnection(); + var queryCall = conn.prepareStatement("SELECT ID, PROTO, PATH, PARAM FROM EC_URL WHERE DOMAIN_ID=? AND ID > ?")) { + + queryCall.setInt(1, data.getDomainId(domain)); + queryCall.setInt(2, minId); + + var rsp = queryCall.executeQuery(); + rsp.setFetchSize(1000); + + while (rsp.next()) { + int urlId = rsp.getInt(1); + String proto = rsp.getString(2); + String path = rsp.getString(3); + String param = rsp.getString(4); + + data.addUrl(new EdgeUrl(proto, domain, null, path, param), urlId); + } + } + + } } diff --git a/code/processes/loading-process/src/test/java/nu/marginalia/loader/SqlLoadProcessedDomainTest.java b/code/processes/loading-process/src/test/java/nu/marginalia/loader/SqlLoadProcessedDomainTest.java index b595c1fa8..75c747526 100644 --- a/code/processes/loading-process/src/test/java/nu/marginalia/loader/SqlLoadProcessedDomainTest.java +++ b/code/processes/loading-process/src/test/java/nu/marginalia/loader/SqlLoadProcessedDomainTest.java @@ -5,6 +5,7 @@ import nu.marginalia.loading.loader.SqlLoadDomains; import nu.marginalia.loading.loader.SqlLoadProcessedDomain; import nu.marginalia.converting.instruction.instructions.DomainLink; +import nu.marginalia.loading.loader.SqlLoadUrls; import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.crawl.DomainIndexingState; import org.junit.jupiter.api.AfterEach; @@ -50,18 +51,18 @@ public void tearDown() { @Test public void loadProcessedDomain() { - var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource)); + var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource), new SqlLoadUrls(dataSource)); loader.load(loaderData, new EdgeDomain("www.marginalia.nu"), DomainIndexingState.BLOCKED, "127.0.0.1"); } @Test public void loadProcessedDomainTwice() { - var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource)); + var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource), new SqlLoadUrls(dataSource)); loader.load(loaderData, new EdgeDomain("www.marginalia.nu"), DomainIndexingState.BLOCKED, "127.0.0.1"); } @Test public void loadProcessedDomaiWithExtremelyLongIP() { - var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource)); + var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource), new SqlLoadUrls(dataSource)); String ip = Stream.generate(() -> "127.").limit(1024).collect(Collectors.joining()); @@ -70,7 +71,7 @@ public void loadProcessedDomaiWithExtremelyLongIP() { @Test public void loadDomainAlias() { - var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource)); + var loader = new SqlLoadProcessedDomain(dataSource, new SqlLoadDomains(dataSource), new SqlLoadUrls(dataSource)); loader.loadAlias(loaderData, new DomainLink(new EdgeDomain("memex.marginalia.nu"), new EdgeDomain("www.marginalia.nu"))); } } \ No newline at end of file diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/task/ReconvertAndLoadActor.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/task/ReconvertAndLoadActor.java index 96730aa26..c6d020e94 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/task/ReconvertAndLoadActor.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/actor/task/ReconvertAndLoadActor.java @@ -10,6 +10,7 @@ import nu.marginalia.control.svc.ProcessService; import nu.marginalia.index.client.IndexClient; import nu.marginalia.index.client.IndexMqEndpoints; +import nu.marginalia.mqapi.converting.ConvertAction; import nu.marginalia.mqapi.converting.ConvertRequest; import nu.marginalia.mqapi.loading.LoadRequest; import nu.marginalia.db.storage.FileStorageService; @@ -121,7 +122,10 @@ public Message reconvert(Message message) throws Exception { storageService.relateFileStorages(toProcess.id(), processedArea.id()); // Pre-send convert request - var request = new ConvertRequest(message.crawlStorageId, processedArea.id()); + var request = new ConvertRequest(ConvertAction.ConvertCrawlData, + null, + message.crawlStorageId, + processedArea.id()); long id = mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request)); return message diff --git a/settings.gradle b/settings.gradle index 7e6d02a02..131b449e3 100644 --- a/settings.gradle +++ b/settings.gradle @@ -175,6 +175,8 @@ dependencyResolutionManagement { library('handlebars','com.github.jknack','handlebars').version('4.3.1') library('handlebars.markdown','com.github.jknack','handlebars-markdown').version('4.2.1') + library('sqlite','org.xerial','sqlite-jdbc').version('3.41.2.1') + bundle('slf4j', ['slf4j.api', 'log4j.api', 'log4j.core', 'log4j.slf4j']) bundle('slf4j.test', ['slf4j.jdk14']) bundle('prometheus', ['prometheus', 'prometheus-servlet', 'prometheus-server', 'prometheus-hotspot'])