From 0caef1b307120ee58a8a08a43f713b5a8a291808 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Fri, 12 Jan 2024 13:45:14 +0100 Subject: [PATCH] (warc) Toggle for saving WARC data Add a toggle for saving the WARC data generated by the search engine's crawler. Normally this is discarded, but for debugging or archival purposes, retaining it may be of interest. The warc files are concatenated into larger archives, up to about 1 GB each. An index is also created containing filenames, domain names, offsets and sizes to help navigate these larger archives. The warc data is saved in a directory warc/ under the crawl data storage. --- .../nodecfg/NodeConfigurationService.java | 18 ++- .../nodecfg/model/NodeConfiguration.java | 1 + .../nodecfg/NodeConfigurationServiceTest.java | 4 +- .../V24_01_0_001__node_config__keep_warc.sql | 1 + .../service/server/NodeStatusWatcher.java | 2 +- .../java/nu/marginalia/crawl/CrawlerMain.java | 13 +- .../crawl/warc/WarcArchiverFactory.java | 122 ++++++++++++++++++ .../marginalia/crawl/warc/WarcArchiverIf.java | 12 ++ .../control/node/svc/ControlNodeService.java | 1 + .../templates/control/node/node-config.hdb | 11 ++ 10 files changed, 174 insertions(+), 11 deletions(-) create mode 100644 code/common/db/src/main/resources/db/migration/V24_01_0_001__node_config__keep_warc.sql create mode 100644 code/processes/crawling-process/src/main/java/nu/marginalia/crawl/warc/WarcArchiverFactory.java create mode 100644 code/processes/crawling-process/src/main/java/nu/marginalia/crawl/warc/WarcArchiverIf.java diff --git a/code/common/config/src/main/java/nu/marginalia/nodecfg/NodeConfigurationService.java b/code/common/config/src/main/java/nu/marginalia/nodecfg/NodeConfigurationService.java index d1b2b9c91..d04033fbd 100644 --- a/code/common/config/src/main/java/nu/marginalia/nodecfg/NodeConfigurationService.java +++ b/code/common/config/src/main/java/nu/marginalia/nodecfg/NodeConfigurationService.java @@ -17,16 +17,17 @@ public 
NodeConfigurationService(HikariDataSource dataSource) { this.dataSource = dataSource; } - public NodeConfiguration create(int id, String description, boolean acceptQueries) throws SQLException { + public NodeConfiguration create(int id, String description, boolean acceptQueries, boolean keepWarcs) throws SQLException { try (var conn = dataSource.getConnection(); var is = conn.prepareStatement(""" - INSERT IGNORE INTO NODE_CONFIGURATION(ID, DESCRIPTION, ACCEPT_QUERIES) VALUES(?, ?, ?) + INSERT IGNORE INTO NODE_CONFIGURATION(ID, DESCRIPTION, ACCEPT_QUERIES, KEEP_WARCS) VALUES(?, ?, ?, ?) """) ) { is.setInt(1, id); is.setString(2, description); is.setBoolean(3, acceptQueries); + is.setBoolean(4, keepWarcs); if (is.executeUpdate() <= 0) { throw new IllegalStateException("Failed to insert configuration"); @@ -39,7 +40,7 @@ INSERT IGNORE INTO NODE_CONFIGURATION(ID, DESCRIPTION, ACCEPT_QUERIES) VALUES(?, public List getAll() throws SQLException { try (var conn = dataSource.getConnection(); var qs = conn.prepareStatement(""" - SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, DISABLED + SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, KEEP_WARCS, DISABLED FROM NODE_CONFIGURATION """)) { var rs = qs.executeQuery(); @@ -53,6 +54,7 @@ public List getAll() throws SQLException { rs.getBoolean("ACCEPT_QUERIES"), rs.getBoolean("AUTO_CLEAN"), rs.getBoolean("PRECESSION"), + rs.getBoolean("KEEP_WARCS"), rs.getBoolean("DISABLED") )); } @@ -63,7 +65,7 @@ public List getAll() throws SQLException { public NodeConfiguration get(int nodeId) throws SQLException { try (var conn = dataSource.getConnection(); var qs = conn.prepareStatement(""" - SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, DISABLED + SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, KEEP_WARCS, DISABLED FROM NODE_CONFIGURATION WHERE ID=?
""")) { @@ -76,6 +78,7 @@ public NodeConfiguration get(int nodeId) throws SQLException { rs.getBoolean("ACCEPT_QUERIES"), rs.getBoolean("AUTO_CLEAN"), rs.getBoolean("PRECESSION"), + rs.getBoolean("KEEP_WARCS"), rs.getBoolean("DISABLED") ); } @@ -88,7 +91,7 @@ public void save(NodeConfiguration config) throws SQLException { try (var conn = dataSource.getConnection(); var us = conn.prepareStatement(""" UPDATE NODE_CONFIGURATION - SET DESCRIPTION=?, ACCEPT_QUERIES=?, AUTO_CLEAN=?, PRECESSION=?, DISABLED=? + SET DESCRIPTION=?, ACCEPT_QUERIES=?, AUTO_CLEAN=?, PRECESSION=?, KEEP_WARCS=?, DISABLED=? WHERE ID=? """)) { @@ -96,8 +99,9 @@ public void save(NodeConfiguration config) throws SQLException { us.setBoolean(2, config.acceptQueries()); us.setBoolean(3, config.autoClean()); us.setBoolean(4, config.includeInPrecession()); - us.setBoolean(5, config.disabled()); - us.setInt(6, config.node()); + us.setBoolean(5, config.keepWarcs()); + us.setBoolean(6, config.disabled()); + us.setInt(7, config.node()); if (us.executeUpdate() <= 0) throw new IllegalStateException("Failed to update configuration"); diff --git a/code/common/config/src/main/java/nu/marginalia/nodecfg/model/NodeConfiguration.java b/code/common/config/src/main/java/nu/marginalia/nodecfg/model/NodeConfiguration.java index 3b04f5f78..10816f0fc 100644 --- a/code/common/config/src/main/java/nu/marginalia/nodecfg/model/NodeConfiguration.java +++ b/code/common/config/src/main/java/nu/marginalia/nodecfg/model/NodeConfiguration.java @@ -5,6 +5,7 @@ public record NodeConfiguration(int node, boolean acceptQueries, boolean autoClean, boolean includeInPrecession, + boolean keepWarcs, boolean disabled ) { diff --git a/code/common/config/src/test/java/nu/marginalia/nodecfg/NodeConfigurationServiceTest.java b/code/common/config/src/test/java/nu/marginalia/nodecfg/NodeConfigurationServiceTest.java index 4744dda82..2fdef80c7 100644 --- a/code/common/config/src/test/java/nu/marginalia/nodecfg/NodeConfigurationServiceTest.java +++ 
b/code/common/config/src/test/java/nu/marginalia/nodecfg/NodeConfigurationServiceTest.java @@ -50,8 +50,8 @@ public static void setup() { @Test public void test() throws SQLException { - var a = nodeConfigurationService.create(1, "Test", false); - var b = nodeConfigurationService.create(2, "Foo", true); + var a = nodeConfigurationService.create(1, "Test", false, false); + var b = nodeConfigurationService.create(2, "Foo", true, false); assertEquals(1, a.node()); assertEquals("Test", a.description()); diff --git a/code/common/db/src/main/resources/db/migration/V24_01_0_001__node_config__keep_warc.sql b/code/common/db/src/main/resources/db/migration/V24_01_0_001__node_config__keep_warc.sql new file mode 100644 index 000000000..781e38359 --- /dev/null +++ b/code/common/db/src/main/resources/db/migration/V24_01_0_001__node_config__keep_warc.sql @@ -0,0 +1 @@ +ALTER TABLE WMSA_prod.NODE_CONFIGURATION ADD COLUMN KEEP_WARCS BOOLEAN DEFAULT FALSE; \ No newline at end of file diff --git a/code/common/service/src/main/java/nu/marginalia/service/server/NodeStatusWatcher.java b/code/common/service/src/main/java/nu/marginalia/service/server/NodeStatusWatcher.java index 411b1b628..6bba6dfd7 100644 --- a/code/common/service/src/main/java/nu/marginalia/service/server/NodeStatusWatcher.java +++ b/code/common/service/src/main/java/nu/marginalia/service/server/NodeStatusWatcher.java @@ -57,7 +57,7 @@ public NodeStatusWatcher(NodeConfigurationService configurationService, private void setupNode() { try { - configurationService.create(nodeId, "Node " + nodeId, nodeId == 1); + configurationService.create(nodeId, "Node " + nodeId, nodeId == 1, false); fileStorageService.createStorageBase("Index Data", Path.of("/idx"), nodeId, FileStorageBaseType.CURRENT); fileStorageService.createStorageBase("Index Backups", Path.of("/backup"), nodeId, FileStorageBaseType.BACKUP); fileStorageService.createStorageBase("Crawl Data", Path.of("/storage"), nodeId, FileStorageBaseType.STORAGE); diff --git 
a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java index e3c2e1789..bb7e3ee8b 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java @@ -17,6 +17,8 @@ import nu.marginalia.crawl.spec.CrawlSpecProvider; import nu.marginalia.crawl.spec.DbCrawlSpecProvider; import nu.marginalia.crawl.spec.ParquetCrawlSpecProvider; +import nu.marginalia.crawl.warc.WarcArchiverFactory; +import nu.marginalia.crawl.warc.WarcArchiverIf; import nu.marginalia.crawling.io.CrawledDomainReader; import nu.marginalia.crawling.io.CrawlerOutputFile; import nu.marginalia.crawling.parquet.CrawledDocumentParquetRecordFileWriter; @@ -59,6 +61,7 @@ public class CrawlerMain { private final FileStorageService fileStorageService; private final DbCrawlSpecProvider dbCrawlSpecProvider; private final AnchorTagsSourceFactory anchorTagsSourceFactory; + private final WarcArchiverFactory warcArchiverFactory; private final Gson gson; private final int node; private final SimpleBlockingThreadPool pool; @@ -79,6 +82,7 @@ public CrawlerMain(UserAgent userAgent, ProcessConfiguration processConfiguration, DbCrawlSpecProvider dbCrawlSpecProvider, AnchorTagsSourceFactory anchorTagsSourceFactory, + WarcArchiverFactory warcArchiverFactory, Gson gson) { this.userAgent = userAgent; this.heartbeat = heartbeat; @@ -87,6 +91,7 @@ public CrawlerMain(UserAgent userAgent, this.fileStorageService = fileStorageService; this.dbCrawlSpecProvider = dbCrawlSpecProvider; this.anchorTagsSourceFactory = anchorTagsSourceFactory; + this.warcArchiverFactory = warcArchiverFactory; this.gson = gson; this.node = processConfiguration.node(); @@ -150,6 +155,7 @@ public void run(CrawlSpecProvider specProvider, Path outputDir) throws Interrupt heartbeat.start(); try (WorkLog workLog = new 
WorkLog(outputDir.resolve("crawler.log")); + WarcArchiverIf warcArchiver = warcArchiverFactory.get(outputDir); AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(specProvider.getDomains()) ) { @@ -165,7 +171,7 @@ public void run(CrawlSpecProvider specProvider, Path outputDir) throws Interrupt .takeWhile((e) -> abortMonitor.isAlive()) .filter(e -> !workLog.isJobFinished(e.domain)) .filter(e -> processingIds.put(e.domain, "") == null) - .map(e -> new CrawlTask(e, anchorTagsSource, outputDir, workLog)) + .map(e -> new CrawlTask(e, anchorTagsSource, outputDir, warcArchiver, workLog)) .forEach(pool::submitQuietly); } @@ -202,15 +208,18 @@ class CrawlTask implements SimpleBlockingThreadPool.Task { private final AnchorTagsSource anchorTagsSource; private final Path outputDir; + private final WarcArchiverIf warcArchiver; private final WorkLog workLog; CrawlTask(CrawlSpecRecord specification, AnchorTagsSource anchorTagsSource, Path outputDir, + WarcArchiverIf warcArchiver, WorkLog workLog) { this.specification = specification; this.anchorTagsSource = anchorTagsSource; this.outputDir = outputDir; + this.warcArchiver = warcArchiver; this.workLog = workLog; this.domain = specification.domain; @@ -253,6 +262,8 @@ public void run() throws Exception { CrawledDocumentParquetRecordFileWriter .convertWarc(domain, userAgent, newWarcFile, parquetFile); + warcArchiver.consumeWarc(newWarcFile, domain); + workLog.setJobToFinished(domain, parquetFile.toString(), size); heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks); diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/warc/WarcArchiverFactory.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/warc/WarcArchiverFactory.java new file mode 100644 index 000000000..c1a537188 --- /dev/null +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/warc/WarcArchiverFactory.java @@ -0,0 +1,122 @@ +package nu.marginalia.crawl.warc; + +import 
com.google.inject.Inject; +import nu.marginalia.ProcessConfiguration; +import nu.marginalia.nodecfg.NodeConfigurationService; +import org.apache.commons.io.IOUtils; + +import java.io.IOException; +import java.io.OutputStream; +import java.io.PrintWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +/** Factory for creating WarcArchiverIf instances. Depending on the node's configuration, + * either a shredder instance that just discards the Warc file, or a persisting instance + * that creates a series of concatenated warc.gz-files with an index + */ +public class WarcArchiverFactory { + private final boolean keepWarcs; + + @Inject + public WarcArchiverFactory(ProcessConfiguration processConfiguration, + NodeConfigurationService nodeConfigurationService) + throws Exception + { + keepWarcs = nodeConfigurationService.get(processConfiguration.node()).keepWarcs(); + } + + public WarcArchiverIf get(Path outputDir) throws IOException { + if (!keepWarcs) { + return new WarcArchiverShredder(); + } else { + return new WarcArchiver(outputDir); + } + } + +} + +/** Dummy archiver that just deletes the warc file. */ +class WarcArchiverShredder implements WarcArchiverIf { + @Override + public void consumeWarc(Path warcFile, String domain) throws IOException { + Files.deleteIfExists(warcFile); + } + + @Override + public void close() {} +} + +/** Archives warc files to disk. Concatenates all warc files into a single + * warc file, and creates an index file with the offsets and lengths of + * each domain segment. 
+ * */ +class WarcArchiver implements WarcArchiverIf { + // Specs say the recommended maximum size of a warc file is ~1GB, after which a new file should be created + private static final long MAX_COMBINED_WARC_FILE_SIZE = 1_000_000_000; + + + private PrintWriter indexWriter; + private OutputStream warcWriter; + private final Path warcDir; + + String warcFileName = null; + String ts = LocalDateTime.now() + .format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + .replace(':', '-'); + + long pos = 0; + int fileCounter = 0; + + public WarcArchiver(Path outputDir) throws IOException { + warcDir = outputDir.resolve("warc"); + + if (!Files.exists(warcDir)) { + Files.createDirectories(warcDir); + } + + switchFile(); + } + + private void switchFile() throws IOException { + if (warcWriter != null) warcWriter.close(); + + warcFileName = "marginalia-crawl-" + ts + "--" + String.format("%04d", fileCounter++) + ".warc.gz"; + + warcWriter = Files.newOutputStream(warcDir.resolve(warcFileName)); pos = 0; // reset offset for the new file, so index offsets are relative to it and rollover doesn't re-trigger + + if (indexWriter == null) { + Path indexFile = warcDir.resolve("marginalia-crawl-" + ts + ".idx"); + indexWriter = new PrintWriter(Files.newBufferedWriter(indexFile)); + } + } + + @Override + public void consumeWarc(Path warcFile, String domain) throws IOException { + try { + synchronized (this) { + // Specs say the recommended maximum size of a warc file is ~1GB + if (pos > MAX_COMBINED_WARC_FILE_SIZE) { + switchFile(); + } + + indexWriter.printf("%s %s %d %d\n", warcFileName, domain, pos, Files.size(warcFile)); + indexWriter.flush(); + try (var is = Files.newInputStream(warcFile)) { + pos += IOUtils.copy(is, warcWriter); + } + } + } + finally { + Files.deleteIfExists(warcFile); + } + } + + @Override + public void close() throws IOException { + if (warcWriter != null) warcWriter.close(); + if (indexWriter != null) indexWriter.close(); + } +} \ No newline at end of file diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/warc/WarcArchiverIf.java
b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/warc/WarcArchiverIf.java new file mode 100644 index 000000000..80e64d7a4 --- /dev/null +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/warc/WarcArchiverIf.java @@ -0,0 +1,12 @@ +package nu.marginalia.crawl.warc; + +import java.io.IOException; +import java.nio.file.Path; + +public interface WarcArchiverIf extends AutoCloseable { + /** Process the warc file. After processing, the warc file is deleted. + * Processing may be a no-op, depending on the implementation. + */ + void consumeWarc(Path warcFile, String domain) throws IOException; + void close() throws IOException; +} diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeService.java index 553704566..2fc0f2964 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeService.java @@ -300,6 +300,7 @@ private Object updateConfigModel(Request request, Response response) throws SQLE "on".equalsIgnoreCase(request.queryParams("acceptQueries")), "on".equalsIgnoreCase(request.queryParams("autoClean")), "on".equalsIgnoreCase(request.queryParams("includeInPrecession")), + "on".equalsIgnoreCase(request.queryParams("keepWarcs")), "on".equalsIgnoreCase(request.queryParams("disabled")) ); diff --git a/code/services-core/control-service/src/main/resources/templates/control/node/node-config.hdb b/code/services-core/control-service/src/main/resources/templates/control/node/node-config.hdb index f4a2cdd8a..23b8e7d59 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/node/node-config.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/node/node-config.hdb @@ -44,6 +44,17 
@@
If true, this node will be included in the crawling precession.
+
+ + + +
This toggle makes the crawler retain copies of the WebARChive data that is + normally an intermediate product of the crawling. This is useful for debugging + and testing, but the WARC files are large and take up a lot of space. Unless + there is a need for exporting these files, it is recommended to leave this off. +
+
+