(warc) Toggle for saving WARC data
Add a toggle for saving the WARC data generated by the search engine's crawler. Normally this data is discarded, but for debugging or archival purposes, retaining it may be of interest.

The WARC files are concatenated into larger archives of up to about 1 GB each.
An index is also created, containing filenames, domain names, offsets and sizes,
to help navigate these larger archives.

The WARC data is saved in a warc/ directory under the crawl data storage.
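Since each per-domain WARC file is appended to the combined archive as a complete warc.gz stream, and concatenated gzip members form a valid gzip file, the byte range recorded in the index is itself a standalone warc.gz. The sketch below is illustrative only (no such reader ships with this commit); it assumes the index line format written by WarcArchiver further down: whitespace-separated file name, domain, offset and size.

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

/** Illustrative only, not part of this commit: extracts one domain's
 *  warc.gz segment from a combined archive using the index file. */
class WarcIndexExtractExample {
    public static void extractDomain(Path warcDir, String indexFileName,
                                     String domain, Path dest) throws IOException {
        for (String line : Files.readAllLines(warcDir.resolve(indexFileName))) {
            // Index line format: <warc file name> <domain> <offset> <size>
            String[] parts = line.split(" ");
            if (!parts[1].equals(domain))
                continue;

            long offset = Long.parseLong(parts[2]);
            long size = Long.parseLong(parts[3]);

            try (InputStream is = Files.newInputStream(warcDir.resolve(parts[0]));
                 OutputStream os = Files.newOutputStream(dest))
            {
                is.skipNBytes(offset);

                // Copy exactly 'size' bytes; the segment is a complete
                // gzip stream, so the output is a valid warc.gz file
                byte[] buf = new byte[8192];
                for (long rem = size; rem > 0; ) {
                    int n = is.read(buf, 0, (int) Math.min(buf.length, rem));
                    if (n < 0) break;
                    os.write(buf, 0, n);
                    rem -= n;
                }
            }
            return;
        }
    }
}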
vlofgren committed Jan 12, 2024
1 parent 264e2db commit 0caef1b
Showing 10 changed files with 174 additions and 11 deletions.
@@ -17,16 +17,17 @@ public NodeConfigurationService(HikariDataSource dataSource) {
this.dataSource = dataSource;
}

public NodeConfiguration create(int id, String description, boolean acceptQueries) throws SQLException {
public NodeConfiguration create(int id, String description, boolean acceptQueries, boolean keepWarcs) throws SQLException {
try (var conn = dataSource.getConnection();
var is = conn.prepareStatement("""
INSERT IGNORE INTO NODE_CONFIGURATION(ID, DESCRIPTION, ACCEPT_QUERIES) VALUES(?, ?, ?)
INSERT IGNORE INTO NODE_CONFIGURATION(ID, DESCRIPTION, ACCEPT_QUERIES, KEEP_WARCS) VALUES(?, ?, ?, ?)
""")
)
{
is.setInt(1, id);
is.setString(2, description);
is.setBoolean(3, acceptQueries);
is.setBoolean(4, keepWarcs);

if (is.executeUpdate() <= 0) {
throw new IllegalStateException("Failed to insert configuration");
@@ -39,7 +40,7 @@ INSERT IGNORE INTO NODE_CONFIGURATION(ID, DESCRIPTION, ACCEPT_QUERIES, KEEP_WARCS) VALUES(?,
public List<NodeConfiguration> getAll() throws SQLException {
try (var conn = dataSource.getConnection();
var qs = conn.prepareStatement("""
SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, DISABLED
SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, KEEP_WARCS, DISABLED
FROM NODE_CONFIGURATION
""")) {
var rs = qs.executeQuery();
@@ -53,6 +54,7 @@ public List<NodeConfiguration> getAll() throws SQLException {
rs.getBoolean("ACCEPT_QUERIES"),
rs.getBoolean("AUTO_CLEAN"),
rs.getBoolean("PRECESSION"),
rs.getBoolean("KEEP_WARCS"),
rs.getBoolean("DISABLED")
));
}
@@ -63,7 +65,7 @@ public List<NodeConfiguration> getAll() throws SQLException {
public NodeConfiguration get(int nodeId) throws SQLException {
try (var conn = dataSource.getConnection();
var qs = conn.prepareStatement("""
SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, DISABLED
SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, KEEP_WARCS, DISABLED
FROM NODE_CONFIGURATION
WHERE ID=?
""")) {
@@ -76,6 +78,7 @@ public NodeConfiguration get(int nodeId) throws SQLException {
rs.getBoolean("ACCEPT_QUERIES"),
rs.getBoolean("AUTO_CLEAN"),
rs.getBoolean("PRECESSION"),
rs.getBoolean("KEEP_WARCS"),
rs.getBoolean("DISABLED")
);
}
@@ -88,16 +91,17 @@ public void save(NodeConfiguration config) throws SQLException {
try (var conn = dataSource.getConnection();
var us = conn.prepareStatement("""
UPDATE NODE_CONFIGURATION
SET DESCRIPTION=?, ACCEPT_QUERIES=?, AUTO_CLEAN=?, PRECESSION=?, DISABLED=?
SET DESCRIPTION=?, ACCEPT_QUERIES=?, AUTO_CLEAN=?, PRECESSION=?, KEEP_WARCS=?, DISABLED=?
WHERE ID=?
"""))
{
us.setString(1, config.description());
us.setBoolean(2, config.acceptQueries());
us.setBoolean(3, config.autoClean());
us.setBoolean(4, config.includeInPrecession());
us.setBoolean(5, config.disabled());
us.setInt(6, config.node());
us.setBoolean(5, config.keepWarcs());
us.setBoolean(6, config.disabled());
us.setInt(7, config.node());

if (us.executeUpdate() <= 0)
throw new IllegalStateException("Failed to update configuration");
@@ -5,6 +5,7 @@ public record NodeConfiguration(int node,
boolean acceptQueries,
boolean autoClean,
boolean includeInPrecession,
boolean keepWarcs,
boolean disabled
)
{
@@ -50,8 +50,8 @@ public static void setup() {

@Test
public void test() throws SQLException {
var a = nodeConfigurationService.create(1, "Test", false);
var b = nodeConfigurationService.create(2, "Foo", true);
var a = nodeConfigurationService.create(1, "Test", false, false);
var b = nodeConfigurationService.create(2, "Foo", true, false);

assertEquals(1, a.node());
assertEquals("Test", a.description());
@@ -0,0 +1 @@
ALTER TABLE WMSA_prod.NODE_CONFIGURATION ADD COLUMN KEEP_WARCS BOOLEAN DEFAULT FALSE;
@@ -57,7 +57,7 @@ public NodeStatusWatcher(NodeConfigurationService configurationService,

private void setupNode() {
try {
configurationService.create(nodeId, "Node " + nodeId, nodeId == 1);
configurationService.create(nodeId, "Node " + nodeId, nodeId == 1, false);
fileStorageService.createStorageBase("Index Data", Path.of("/idx"), nodeId, FileStorageBaseType.CURRENT);
fileStorageService.createStorageBase("Index Backups", Path.of("/backup"), nodeId, FileStorageBaseType.BACKUP);
fileStorageService.createStorageBase("Crawl Data", Path.of("/storage"), nodeId, FileStorageBaseType.STORAGE);
@@ -17,6 +17,8 @@
import nu.marginalia.crawl.spec.CrawlSpecProvider;
import nu.marginalia.crawl.spec.DbCrawlSpecProvider;
import nu.marginalia.crawl.spec.ParquetCrawlSpecProvider;
import nu.marginalia.crawl.warc.WarcArchiverFactory;
import nu.marginalia.crawl.warc.WarcArchiverIf;
import nu.marginalia.crawling.io.CrawledDomainReader;
import nu.marginalia.crawling.io.CrawlerOutputFile;
import nu.marginalia.crawling.parquet.CrawledDocumentParquetRecordFileWriter;
@@ -59,6 +61,7 @@ public class CrawlerMain {
private final FileStorageService fileStorageService;
private final DbCrawlSpecProvider dbCrawlSpecProvider;
private final AnchorTagsSourceFactory anchorTagsSourceFactory;
private final WarcArchiverFactory warcArchiverFactory;
private final Gson gson;
private final int node;
private final SimpleBlockingThreadPool pool;
@@ -79,6 +82,7 @@ public CrawlerMain(UserAgent userAgent,
ProcessConfiguration processConfiguration,
DbCrawlSpecProvider dbCrawlSpecProvider,
AnchorTagsSourceFactory anchorTagsSourceFactory,
WarcArchiverFactory warcArchiverFactory,
Gson gson) {
this.userAgent = userAgent;
this.heartbeat = heartbeat;
@@ -87,6 +91,7 @@ public CrawlerMain(UserAgent userAgent,
this.fileStorageService = fileStorageService;
this.dbCrawlSpecProvider = dbCrawlSpecProvider;
this.anchorTagsSourceFactory = anchorTagsSourceFactory;
this.warcArchiverFactory = warcArchiverFactory;
this.gson = gson;
this.node = processConfiguration.node();

@@ -150,6 +155,7 @@ public void run(CrawlSpecProvider specProvider, Path outputDir) throws Interrupt
heartbeat.start();

try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler.log"));
WarcArchiverIf warcArchiver = warcArchiverFactory.get(outputDir);
AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(specProvider.getDomains())
) {

@@ -165,7 +171,7 @@ public void run(CrawlSpecProvider specProvider, Path outputDir) throws Interrupt
.takeWhile((e) -> abortMonitor.isAlive())
.filter(e -> !workLog.isJobFinished(e.domain))
.filter(e -> processingIds.put(e.domain, "") == null)
.map(e -> new CrawlTask(e, anchorTagsSource, outputDir, workLog))
.map(e -> new CrawlTask(e, anchorTagsSource, outputDir, warcArchiver, workLog))
.forEach(pool::submitQuietly);
}

@@ -202,15 +208,18 @@ class CrawlTask implements SimpleBlockingThreadPool.Task {

private final AnchorTagsSource anchorTagsSource;
private final Path outputDir;
private final WarcArchiverIf warcArchiver;
private final WorkLog workLog;

CrawlTask(CrawlSpecRecord specification,
AnchorTagsSource anchorTagsSource,
Path outputDir,
WarcArchiverIf warcArchiver,
WorkLog workLog) {
this.specification = specification;
this.anchorTagsSource = anchorTagsSource;
this.outputDir = outputDir;
this.warcArchiver = warcArchiver;
this.workLog = workLog;

this.domain = specification.domain;
@@ -253,6 +262,8 @@ public void run() throws Exception {
CrawledDocumentParquetRecordFileWriter
.convertWarc(domain, userAgent, newWarcFile, parquetFile);

warcArchiver.consumeWarc(newWarcFile, domain);

workLog.setJobToFinished(domain, parquetFile.toString(), size);
heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);

@@ -0,0 +1,122 @@
package nu.marginalia.crawl.warc;

import com.google.inject.Inject;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.nodecfg.NodeConfigurationService;
import org.apache.commons.io.IOUtils;

import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Factory for creating WarcArchiverIf instances. Depending on the node's configuration,
 * this creates either a shredder instance that just discards the WARC file, or a persisting
 * instance that writes a series of concatenated warc.gz files with an index.
 */
public class WarcArchiverFactory {
private final boolean keepWarcs;

@Inject
public WarcArchiverFactory(ProcessConfiguration processConfiguration,
NodeConfigurationService nodeConfigurationService)
throws Exception
{
keepWarcs = nodeConfigurationService.get(processConfiguration.node()).keepWarcs();
}

public WarcArchiverIf get(Path outputDir) throws IOException {
if (!keepWarcs) {
return new WarcArchiverShredder();
} else {
return new WarcArchiver(outputDir);
}
}

}

/** Dummy archiver that just deletes the warc file. */
class WarcArchiverShredder implements WarcArchiverIf {
@Override
public void consumeWarc(Path warcFile, String domain) throws IOException {
Files.deleteIfExists(warcFile);
}

@Override
public void close() {}
}

/** Archives warc files to disk. Concatenates the warc files into a series of
 * combined warc.gz files of up to roughly 1 GB each, and creates an index file
 * with the offsets and lengths of each domain segment.
 */
class WarcArchiver implements WarcArchiverIf {
// Specs say the recommended maximum size of a warc file is ~1GB, after which a new file should be created
private static final long MAX_COMBINED_WARC_FILE_SIZE = 1_000_000_000;


private PrintWriter indexWriter;
private OutputStream warcWriter;
private final Path warcDir;

String warcFileName = null;
String ts = LocalDateTime.now()
.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
.replace(':', '-');

long pos = 0;
int fileCounter = 0;

public WarcArchiver(Path outputDir) throws IOException {
warcDir = outputDir.resolve("warc");

if (!Files.exists(warcDir)) {
Files.createDirectories(warcDir);
}

switchFile();
}

private void switchFile() throws IOException {
if (warcWriter != null) warcWriter.close();

warcFileName = "marginalia-crawl-" + ts + "--" + String.format("%04d", fileCounter++) + ".warc.gz";

warcWriter = Files.newOutputStream(warcDir.resolve(warcFileName));

if (indexWriter == null) {
Path indexFile = warcDir.resolve("marginalia-crawl-" + ts + ".idx");
indexWriter = new PrintWriter(Files.newBufferedWriter(indexFile));
}
}

@Override
public void consumeWarc(Path warcFile, String domain) throws IOException {
try {
synchronized (this) {
// Specs say the recommended maximum size of a warc file is ~1GB
if (pos > MAX_COMBINED_WARC_FILE_SIZE) {
switchFile();
}

indexWriter.printf("%s %s %d %d\n", warcFileName, domain, pos, Files.size(warcFile));
indexWriter.flush();
try (var is = Files.newInputStream(warcFile)) {
pos += IOUtils.copy(is, warcWriter);
}
}
}
finally {
Files.deleteIfExists(warcFile);
}
}

@Override
public void close() throws IOException {
if (warcWriter != null) warcWriter.close();
if (indexWriter != null) indexWriter.close();
}
}
@@ -0,0 +1,12 @@
package nu.marginalia.crawl.warc;

import java.io.IOException;
import java.nio.file.Path;

public interface WarcArchiverIf extends AutoCloseable {
/** Process the warc file. After processing, the warc file is deleted.
* Processing may be a no-op, depending on the implementation.
*/
void consumeWarc(Path warcFile, String domain) throws IOException;
void close() throws IOException;
}
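The interface is small enough to admit other strategies. As a purely hypothetical illustration of the contract (the warc file must be gone after processing, whether by deletion or relocation), an implementation could move each file into long-term storage instead of concatenating it; no such class exists in this commit:

package nu.marginalia.crawl.warc;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** Hypothetical example, not part of this commit: satisfies the
 * WarcArchiverIf contract by moving each warc file into an archive
 * directory instead of concatenating it. */
class WarcArchiverMoveExample implements WarcArchiverIf {
    private final Path archiveDir;

    WarcArchiverMoveExample(Path archiveDir) throws IOException {
        this.archiveDir = Files.createDirectories(archiveDir);
    }

    @Override
    public void consumeWarc(Path warcFile, String domain) throws IOException {
        // The move both archives the file and removes the original,
        // as the interface contract requires
        Files.move(warcFile, archiveDir.resolve(domain + ".warc.gz"),
                StandardCopyOption.REPLACE_EXISTING);
    }

    @Override
    public void close() {}
}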
@@ -300,6 +300,7 @@ private Object updateConfigModel(Request request, Response response) throws SQLE
"on".equalsIgnoreCase(request.queryParams("acceptQueries")),
"on".equalsIgnoreCase(request.queryParams("autoClean")),
"on".equalsIgnoreCase(request.queryParams("includeInPrecession")),
"on".equalsIgnoreCase(request.queryParams("keepWarcs")),
"on".equalsIgnoreCase(request.queryParams("disabled"))
);

@@ -44,6 +44,17 @@
<div class="form-text">If true, this node will be included in the crawling precession.</div>
</div>

<div class="form-check form-switch">
<input class="form-check-input" type="checkbox" role="switch" name="keepWarcs" {{#if config.keepWarcs}}checked{{/if}}>
<label class="form-check-label" for="keepWarcs">Keep WARC files during crawling</label>

<div class="form-text">This toggle makes the crawler retain copies of the WebARChive data that is
normally an intermediate product of crawling. This is useful for debugging
and testing, but the WARC files are large and take up a lot of space. Unless
there is a need to export these files, it is recommended to leave this off.
</div>
</div>

<div class="form-check form-switch mb-3">
<input class="form-check-input" type="checkbox" role="switch" name="disabled" {{#if config.disabled}}checked{{/if}}>
<label class="form-check-label" for="disabled">Disabled</label>
