Skip to content

Commit

Permalink
(control) Fully automatic conversion
Browse files Browse the repository at this point in the history
Removed the need to run an external tool to pre-process the data before stackexchange-style data can be loaded into the search engine.

Removed the tool itself.

This stirred up some dependency issues, caused by xz being both included as a :third-party module and imported as an external dependency. This has been fixed, and :third-party:xz was removed.
  • Loading branch information
vlofgren committed Jan 22, 2024
1 parent 3a32584 commit 40c9d20
Show file tree
Hide file tree
Showing 74 changed files with 336 additions and 4,063 deletions.
2 changes: 1 addition & 1 deletion code/features-convert/stackexchange-xml/build.gradle
Expand Up @@ -13,7 +13,6 @@ java {
dependencies {
implementation libs.bundles.slf4j

implementation 'org.tukaani:xz:1.8'
implementation project(':code:libraries:blocking-thread-pool')
implementation project(':code:common:model')
implementation libs.notnull
Expand All @@ -26,6 +25,7 @@ dependencies {
implementation libs.zstd
implementation libs.trove
implementation libs.commons.compress
implementation libs.xz

testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
Expand Down
Expand Up @@ -36,8 +36,8 @@ public class StackExchangePostsDb {
public static void create(String domain,
Path sqliteFile,
Path stackExchange7zFile) {
if (Files.exists(sqliteFile))
Files.delete(sqliteFile);
Files.deleteIfExists(sqliteFile);

String connStr = "jdbc:sqlite:" + sqliteFile;

try (var connection = DriverManager.getConnection(connStr);
Expand Down
2 changes: 0 additions & 2 deletions code/processes/converting-process/build.gradle
Expand Up @@ -95,8 +95,6 @@ dependencies {
testImplementation libs.bundles.junit
testImplementation libs.mockito

implementation 'org.tukaani:xz:1.8'

testImplementation project(':code:processes:test-data')
testImplementation project(':code:processes:crawling-process')
}
Expand Up @@ -240,50 +240,57 @@ private ConvertRequest fetchInstructions() throws Exception {
var msgOpt = getMessage(inbox, nu.marginalia.mqapi.converting.ConvertRequest.class.getSimpleName());
var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));

var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.converting.ConvertRequest.class);

return switch(request.action) {
case ConvertCrawlData -> {
var crawlData = fileStorageService.getStorage(request.crawlStorage);
var processData = fileStorageService.getStorage(request.processedDataStorage);

var plan = new CrawlPlan(null,
new CrawlPlan.WorkDir(crawlData.path(), "crawler.log"),
new CrawlPlan.WorkDir(processData.path(), "processor.log"));

yield new ConvertCrawlDataAction(plan, msg, inbox);
}
case SideloadEncyclopedia -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);

yield new SideloadAction(sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(Path.of(request.inputSource), request.baseUrl),
processData.asPath(),
msg, inbox);
}
case SideloadDirtree -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);

yield new SideloadAction(
sideloadSourceFactory.sideloadDirtree(Path.of(request.inputSource)),
processData.asPath(),
msg, inbox);
}
case SideloadWarc -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);

yield new SideloadAction(
sideloadSourceFactory.sideloadWarc(Path.of(request.inputSource)),
processData.asPath(),
msg, inbox);
}
case SideloadStackexchange -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
try {
var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.converting.ConvertRequest.class);

return switch (request.action) {
case ConvertCrawlData -> {
var crawlData = fileStorageService.getStorage(request.crawlStorage);
var processData = fileStorageService.getStorage(request.processedDataStorage);

var plan = new CrawlPlan(null,
new CrawlPlan.WorkDir(crawlData.path(), "crawler.log"),
new CrawlPlan.WorkDir(processData.path(), "processor.log"));

yield new ConvertCrawlDataAction(plan, msg, inbox);
}
case SideloadEncyclopedia -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);

yield new SideloadAction(sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(Path.of(request.inputSource), request.baseUrl),
processData.asPath(),
msg, inbox);
}
case SideloadDirtree -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);

yield new SideloadAction(
sideloadSourceFactory.sideloadDirtree(Path.of(request.inputSource)),
processData.asPath(),
msg, inbox);
}
case SideloadWarc -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);

yield new SideloadAction(
sideloadSourceFactory.sideloadWarc(Path.of(request.inputSource)),
processData.asPath(),
msg, inbox);
}
case SideloadStackexchange -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);

yield new SideloadAction(sideloadSourceFactory.sideloadStackexchange(Path.of(request.inputSource)),
processData.asPath(),
msg, inbox);
}
};
}
catch (Exception ex) {
inbox.sendResponse(msg, MqInboxResponse.err(STR."\{ex.getClass().getSimpleName()}: \{ex.getMessage()}"));

yield new SideloadAction(sideloadSourceFactory.sideloadStackexchange(Path.of(request.inputSource)),
processData.asPath(),
msg, inbox);
}
};
throw ex;
}
}

private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {
Expand Down
Expand Up @@ -16,6 +16,7 @@
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Collection;
import java.util.List;

public class SideloadSourceFactory {
private final Gson gson;
Expand Down Expand Up @@ -57,14 +58,21 @@ public Collection<? extends SideloadSource> sideloadWarc(Path pathToWarcFiles) t
return warcSideloadFactory.createSideloaders(pathToWarcFiles);
}

/** Do not use, this code isn't finished */
public Collection<? extends SideloadSource> sideloadStackexchange(Path pathToDbFileRoot) throws IOException {
try (var dirs = Files.walk(pathToDbFileRoot)) {
return dirs
.filter(Files::isRegularFile)
.filter(f -> f.toFile().getName().endsWith(".db"))
.map(dbFile -> new StackexchangeSideloader(dbFile, sentenceExtractorProvider, documentKeywordExtractor))
.toList();
if (Files.isRegularFile(pathToDbFileRoot)) {
return List.of(new StackexchangeSideloader(pathToDbFileRoot, sentenceExtractorProvider, documentKeywordExtractor));
}
else if (Files.isDirectory(pathToDbFileRoot)) {
try (var dirs = Files.walk(pathToDbFileRoot)) {
return dirs
.filter(Files::isRegularFile)
.filter(f -> f.toFile().getName().endsWith(".db"))
.map(dbFile -> new StackexchangeSideloader(dbFile, sentenceExtractorProvider, documentKeywordExtractor))
.toList();
}
}
else { // unix socket, etc
throw new IllegalArgumentException("Path to stackexchange db file(s) must be a file or directory");
}
}
}
Expand Up @@ -21,23 +21,33 @@ public WarcSideloadFactory(SideloaderProcessing processing) {
}

public Collection<? extends SideloadSource> createSideloaders(Path pathToWarcFiles) throws IOException {
final List<Path> files = new ArrayList<>();

try (var stream = Files.list(pathToWarcFiles)) {
stream
.filter(Files::isRegularFile)
.filter(this::isWarcFile)
.forEach(files::add);

if (Files.isRegularFile(pathToWarcFiles)) {
return List.of(new WarcSideloader(pathToWarcFiles, processing));
}
else if (Files.isDirectory(pathToWarcFiles)) {

List<WarcSideloader> sources = new ArrayList<>();
final List<Path> files = new ArrayList<>();

for (Path file : files) {
sources.add(new WarcSideloader(file, processing));
}
try (var stream = Files.list(pathToWarcFiles)) {
stream
.filter(Files::isRegularFile)
.filter(this::isWarcFile)
.forEach(files::add);

}

List<WarcSideloader> sources = new ArrayList<>();

return sources;
for (Path file : files) {
sources.add(new WarcSideloader(file, processing));
}

return sources;
}
else {
throw new IllegalArgumentException("Path " + pathToWarcFiles + " is neither a file nor a directory");
}
}

private boolean isWarcFile(Path path) {
Expand Down
1 change: 1 addition & 0 deletions code/services-core/executor-service/build.gradle
Expand Up @@ -40,6 +40,7 @@ dependencies {
implementation project(':code:process-models:crawling-model')
implementation project(':code:features-crawl:link-parser')
implementation project(':code:features-convert:data-extractors')
implementation project(':code:features-convert:stackexchange-xml')
implementation project(':code:features-index:index-journal')
implementation project(':code:api:index-api')
implementation project(':code:api:query-api')
Expand Down
Expand Up @@ -10,6 +10,8 @@
import nu.marginalia.encyclopedia.EncyclopediaConverter;
import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService;
import nu.marginalia.sideload.SideloadHelper;
import nu.marginalia.sideload.StackExchangeSideloadHelper;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageBaseType;
import nu.marginalia.storage.model.FileStorageId;
Expand All @@ -21,11 +23,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.CRC32;

@Singleton
public class ConvertActor extends RecordActorPrototype {
Expand Down Expand Up @@ -109,7 +108,7 @@ case ConvertEncyclopedia(String source, String baseUrl) -> {

if (source.toLowerCase().endsWith(".zim")) {
// If we're fed a ZIM file, we need to convert it to a sqlite database first
String hash = getCrc32FileHash(sourcePath);
String hash = SideloadHelper.getCrc32FileHash(sourcePath);

// To avoid re-converting the same file, we'll assign the file a name based on its hash
// and the original filename. This way, if we're fed the same file again, we'll be able to just
Expand Down Expand Up @@ -179,6 +178,10 @@ case ConvertStackexchange(String source) -> {

storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);

// Convert stackexchange data to sqlite database
// (we can't use a Predigest step here because the conversion is too complicated)
StackExchangeSideloadHelper.convertStackexchangeData(sourcePath);

// Pre-send convert request

yield new ConvertWait(
Expand All @@ -200,21 +203,7 @@ case ConvertWait(FileStorageId destFid, long msgId) -> {
};
}

/**
 * Computes the CRC32 checksum of a file's contents and returns it as a hex string.
 *
 * @param file path of the file whose bytes are checksummed
 * @return the CRC32 value rendered by {@link Long#toHexString(long)}
 * @throws IOException if the file cannot be opened or read
 */
private String getCrc32FileHash(Path file) throws IOException {
// Fixed-size buffer reused for every read, so the file is never held in memory as a whole
ByteBuffer buffer = ByteBuffer.allocate(8192);

try (var channel = Files.newByteChannel(file)) {
CRC32 crc = new CRC32();

// The loop stops as soon as a read delivers no bytes (0, or -1 at end of file)
while (channel.read(buffer) > 0) {
// flip() exposes the bytes just read to the checksum; clear() resets the buffer for the next read
buffer.flip();
crc.update(buffer);
buffer.clear();
}

return Long.toHexString(crc.getValue());
}
}

@Override
public String describe() {
Expand Down
@@ -0,0 +1,25 @@
package nu.marginalia.sideload;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.zip.CRC32;

/** Static helpers shared by the sideloading code paths. */
public class SideloadHelper {

    /**
     * Calculates the CRC32 checksum over the full contents of a file.
     *
     * <p>The file is streamed through a fixed 8192-byte buffer, so its size
     * does not affect memory use.
     *
     * @param file the file to checksum
     * @return the checksum formatted by {@link Long#toHexString(long)}
     * @throws IOException if the file cannot be opened or read
     */
    public static String getCrc32FileHash(Path file) throws IOException {
        final ByteBuffer chunk = ByteBuffer.allocate(8192);
        final CRC32 checksum = new CRC32();

        try (var source = Files.newByteChannel(file)) {
            for (;;) {
                final int bytesRead = source.read(chunk);
                if (bytesRead <= 0) {
                    // Nothing more was delivered (end of file): stop reading
                    break;
                }

                // Feed exactly the bytes just read into the checksum,
                // then reset the buffer for the next read.
                chunk.flip();
                checksum.update(chunk);
                chunk.clear();
            }
        }

        return Long.toHexString(checksum.getValue());
    }
}

0 comments on commit 40c9d20

Please sign in to comment.