Skip to content

Commit

Permalink
(stackexchange-integration) Tools for reading stackexchange xml files
Browse files Browse the repository at this point in the history
  • Loading branch information
vlofgren committed Sep 20, 2023
1 parent d895f83 commit 6bbf40d
Show file tree
Hide file tree
Showing 14 changed files with 612 additions and 0 deletions.
39 changes: 39 additions & 0 deletions code/features-convert/stackexchange-xml/build.gradle
@@ -0,0 +1,39 @@
// Build script for the stackexchange-xml feature module.
plugins {
id 'java'
// Lombok plugin; the sources in this module use Lombok annotations such as @SneakyThrows.
id "io.freefair.lombok" version "8.2.2"
id 'jvm-test-suite'
}

// Compile and run this module with a Java 20 toolchain.
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(20))
}
}

dependencies {
implementation libs.lombok
annotationProcessor libs.lombok
implementation libs.bundles.slf4j

// NOTE(review): presumably required at runtime by commons-compress to decode
// LZMA/XZ-compressed entries in the .7z archives - confirm before removing.
implementation 'org.tukaani:xz:1.8'
implementation project(':code:libraries:blocking-thread-pool')
implementation libs.notnull

implementation libs.jsoup
// SQLite is reached through JDBC ("jdbc:sqlite:" URLs) by the posts database code.
implementation libs.sqlite

implementation libs.guice
implementation libs.guava
// zstd compresses post bodies stored in the SQLite database.
implementation libs.zstd
// Trove provides the primitive int collections used when listing thread ids.
implementation libs.trove
// commons-compress provides the 7z archive reader (SevenZFile).
implementation libs.commons.compress

testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}

// Tests run on the JUnit Platform with an 8 GB maximum heap.
test {
maxHeapSize = "8G"
useJUnitPlatform()
}
@@ -0,0 +1,5 @@
package nu.marginalia.integration.stackexchange.model;

/**
 * A single comment taken from a Stack Exchange data dump.
 *
 * @param id     identifier of the comment itself
 * @param postId identifier of the post the comment is attached to
 * @param text   the comment text
 */
public record StackExchangeComment(
        int id,
        int postId,
        String text
) {
}
@@ -0,0 +1,16 @@
package nu.marginalia.integration.stackexchange.model;

import javax.annotation.Nullable;
import java.util.List;

/**
 * A single post taken from a Stack Exchange data dump.
 *
 * @param title      title of the post, or {@code null} when the post has none
 * @param tags       tags attached to the post
 * @param year       year associated with the post
 * @param id         identifier of the post
 * @param parentId   identifier of the parent post, or {@code null} when the
 *                   post has no parent
 * @param postTypeId numeric post type as given by the dump
 *                   (NOTE(review): the meaning of the individual values is not
 *                   visible here - confirm against the dump's documentation)
 * @param body       body text of the post
 */
public record StackExchangePost(
        @Nullable String title,
        List<String> tags,
        int year,
        int id,
        @Nullable Integer parentId,
        int postTypeId,
        String body
) {
}
@@ -0,0 +1,171 @@
package nu.marginalia.integration.stackexchange.sqlite;

import com.github.luben.zstd.Zstd;
import gnu.trove.list.TIntList;
import gnu.trove.list.array.TIntArrayList;
import gnu.trove.map.TIntIntMap;
import gnu.trove.map.hash.TIntIntHashMap;
import lombok.SneakyThrows;
import nu.marginalia.integration.stackexchange.xml.StackExchangeXmlPostReader;
import org.apache.commons.compress.compressors.zstandard.ZstdUtils;

import javax.xml.stream.XMLStreamException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.function.Predicate;

/**
 * Converts the posts of a Stack Exchange data dump into an SQLite database
 * and reads them back grouped by thread.
 *
 * <p>Post bodies are stored zstd-compressed, together with their uncompressed
 * size, so they can be decompressed again on read.
 */
public class StackExchangePostsDb {

    /** Number of queued inserts after which the pending batch is executed. */
    private static final int INSERT_BATCH_SIZE = 100;

    /** Year reported for a thread when no row was read for it. */
    private static final int DEFAULT_YEAR = 2023;

    /**
     * Creates a fresh SQLite database at {@code sqliteFile} and fills it with
     * the posts read from {@code stackExchange7zFile}.
     *
     * <p>An existing file at {@code sqliteFile} is deleted first. The schema is
     * loaded from the classpath resource {@code db/stackexchange.sql} and run
     * one {@code ;}-separated statement at a time.
     *
     * <p>{@link IOException}, {@link SQLException} and
     * {@link XMLStreamException} raised while populating the database are
     * printed to standard error and not rethrown.
     *
     * @param sqliteFile          location of the database file to (re)create
     * @param stackExchange7zFile the Stack Exchange 7z archive to read posts from
     * @throws NullPointerException if the schema resource is not on the classpath
     */
    @SneakyThrows
    public static void create(Path sqliteFile,
                              Path stackExchange7zFile) {
        Files.deleteIfExists(sqliteFile);

        String connStr = "jdbc:sqlite:" + sqliteFile;

        try (var connection = DriverManager.getConnection(connStr);
             var stream = ClassLoader.getSystemResourceAsStream("db/stackexchange.sql");
             var updateStmt = connection.createStatement()
        ) {
            // getSystemResourceAsStream() returns null for a missing resource;
            // fail with a message that names it instead of a bare NPE.
            Objects.requireNonNull(stream, "Classpath resource db/stackexchange.sql not found");

            var sql = new String(stream.readAllBytes(), StandardCharsets.UTF_8);

            for (var part : sql.split(";")) {
                if (part.isBlank()) {
                    continue;
                }
                updateStmt.executeUpdate(part);
            }
            updateStmt.execute("PRAGMA synchronous = OFF");

            var postReader = new StackExchangeXmlPostReader(
                    stackExchange7zFile
            );

            try (var insertPost = connection.prepareStatement("""
                    INSERT INTO post(id, threadId, postYear, title, body, origSize, tags)
                    VALUES (?, ?, ?, ?, ?, ?, ?)
                    """)) {

                var iter = postReader.iterator();

                int cnt = 0;
                while (iter.hasNext()) {
                    var post = iter.next();

                    // A post without a parent is the head of its own thread.
                    Integer parentId = post.parentId();
                    int threadId = (parentId != null) ? parentId : post.id();

                    String title = post.title();

                    // Keep the uncompressed length: it is needed to decompress the body later.
                    byte[] bodyBytes = post.body().getBytes(StandardCharsets.UTF_8);

                    insertPost.setInt(1, post.id());
                    insertPost.setInt(2, threadId);
                    insertPost.setInt(3, post.year());
                    insertPost.setString(4, (title != null) ? title : "");
                    insertPost.setBytes(5, Zstd.compress(bodyBytes));
                    insertPost.setInt(6, bodyBytes.length);
                    insertPost.setString(7, String.join(",", post.tags()));
                    insertPost.addBatch();

                    if (++cnt >= INSERT_BATCH_SIZE) {
                        insertPost.executeBatch();
                        cnt = 0;
                    }
                }

                // Flush whatever is left in the last, partially filled batch.
                if (cnt > 0) {
                    insertPost.executeBatch();
                }
            }
        }
        catch (IOException | SQLException | XMLStreamException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads the database at {@code sqliteFile} thread by thread and hands each
     * thread to {@code consumer} as a {@link CombinedPostModel}.
     *
     * <p>For every distinct thread id the bodies of all its posts are
     * decompressed on the common fork-join pool. The thread's title is the last
     * non-blank title seen among its posts and its year is the smallest post
     * year. Iteration stops as soon as {@code consumer} returns {@code false}.
     *
     * <p>An {@link SQLException} is printed to standard error and not rethrown.
     *
     * @param sqliteFile database file previously written by {@link #create}
     * @param consumer   receives each thread; return {@code false} to stop early
     */
    @SneakyThrows
    public static void forEachPost(
            Path sqliteFile,
            Predicate<CombinedPostModel> consumer) {

        String connStr = "jdbc:sqlite:" + sqliteFile;

        try (var connection = DriverManager.getConnection(connStr);
             var selectThreadIds = connection.prepareStatement("SELECT DISTINCT(threadId) FROM post");
             var queryPostContents = connection.prepareStatement("""
                     SELECT postYear, title, body, origSize, tags
                     FROM post
                     WHERE threadId = ?
                     """)
        ) {
            TIntList threadIds = new TIntArrayList(10_000);
            try (ResultSet rs = selectThreadIds.executeQuery()) {
                while (rs.next()) {
                    threadIds.add(rs.getInt(1));
                }
            }

            System.out.println("Got " + threadIds.size() + " IDs");

            var commonPool = ForkJoinPool.commonPool();
            var idIterator = threadIds.iterator();
            int ordinal = 0;

            while (idIterator.hasNext()) {
                queryPostContents.setInt(1, idIterator.next());

                String title = "";
                // Start from a sentinel instead of a fixed calendar year so that
                // threads made up only of newer posts keep their real year.
                int year = Integer.MAX_VALUE;
                List<Future<String>> partWork = new ArrayList<>();

                try (ResultSet rs = queryPostContents.executeQuery()) {
                    while (rs.next()) {
                        String maybeTitle = rs.getString("title");
                        if (maybeTitle != null && !maybeTitle.isBlank()) {
                            title = maybeTitle;
                        }

                        int origSize = rs.getInt("origSize");
                        year = Math.min(year, rs.getInt("postYear"));

                        byte[] bytes = rs.getBytes("body");
                        partWork.add(commonPool.submit(
                                () -> new String(Zstd.decompress(bytes, origSize), StandardCharsets.UTF_8)
                        ));
                    }
                }

                if (year == Integer.MAX_VALUE) {
                    year = DEFAULT_YEAR;
                }

                // Collect in submission order so bodies keep the order of the result set.
                List<String> parts = new ArrayList<>(partWork.size());
                for (var workItem : partWork) {
                    parts.add(workItem.get());
                }

                if (!consumer.test(new CombinedPostModel(ordinal++, title, year, parts))) {
                    break;
                }
            }
        }
        catch (SQLException ex) {
            ex.printStackTrace();
        }
    }

    /**
     * All posts of one thread, combined.
     *
     * @param ordinal zero-based position of the thread in the iteration
     * @param title   last non-blank title found among the thread's posts, or
     *                the empty string when there is none
     * @param year    smallest post year found among the thread's posts
     * @param bodies  decompressed bodies of the thread's posts
     */
    public record CombinedPostModel(int ordinal,
                                    String title,
                                    int year,
                                    List<String> bodies)
    {

    }

}
@@ -0,0 +1,58 @@
package nu.marginalia.integration.stackexchange.xml;

import org.apache.commons.compress.archivers.sevenz.SevenZArchiveEntry;
import org.apache.commons.compress.archivers.sevenz.SevenZFile;

import javax.xml.stream.XMLEventReader;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Path;

/**
 * An {@link XmlEventReaderSource} that streams one named XML file out of a
 * Stack Exchange 7z archive.
 *
 * <p>The archive stays open for as long as the reader is in use; call
 * {@link #close()} to release both.
 */
public class StackExchange7zXmlEventReaderSource implements XmlEventReaderSource {

    static {
        // The JDK's XML processor enforces a limit on the total size of
        // entities in a document, which the larger dump files exceed.
        // The limit is lifted here through a system property; per the JAXP
        // documentation a value of 0 means "no limit".
        System.setProperty("jdk.xml.totalEntitySizeLimit", "0");
    }

    private final XMLEventReader reader;
    private final SevenZFile postsFile;

    /**
     * Opens {@code pathTo7zFile} and positions an XML event reader on the entry
     * named {@code xmlFileName}.
     *
     * <p>If anything fails after the archive has been opened, the archive is
     * closed again before the exception propagates.
     *
     * @param pathTo7zFile the 7z archive to read
     * @param xmlFileName  exact name of the archive entry to read
     * @throws FileNotFoundException if the archive has no entry with that name
     * @throws IOException           if the archive cannot be read
     * @throws XMLStreamException    if the XML reader cannot be created
     */
    public StackExchange7zXmlEventReaderSource(Path pathTo7zFile, String xmlFileName)
            throws IOException, XMLStreamException
    {
        SevenZFile archive = new SevenZFile(pathTo7zFile.toFile());
        XMLEventReader eventReader;

        try {
            eventReader = openEntryReader(archive, xmlFileName);
        }
        catch (IOException | XMLStreamException | RuntimeException e) {
            // Do not leak the open archive handle when construction fails.
            try {
                archive.close();
            }
            catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }

        postsFile = archive;
        reader = eventReader;
    }

    /** Finds the entry named {@code xmlFileName} in {@code archive} and opens an XML event reader on it. */
    private static XMLEventReader openEntryReader(SevenZFile archive, String xmlFileName)
            throws IOException, XMLStreamException
    {
        SevenZArchiveEntry postsEntry = null;

        for (SevenZArchiveEntry entry : archive.getEntries()) {
            if (xmlFileName.equals(entry.getName())) {
                postsEntry = entry;
                break;
            }
        }

        if (postsEntry == null) {
            throw new FileNotFoundException("No " + xmlFileName + " in provided archive");
        }

        XMLInputFactory xmlInputFactory = XMLInputFactory.newInstance();
        // The dump is plain data; never resolve external entities from it.
        xmlInputFactory.setProperty(XMLInputFactory.IS_SUPPORTING_EXTERNAL_ENTITIES, Boolean.FALSE);

        return xmlInputFactory.createXMLEventReader(archive.getInputStream(postsEntry));
    }

    /** Returns the XML event reader positioned on the selected archive entry. */
    @Override
    public XMLEventReader reader() {
        return reader;
    }

    /**
     * Closes the XML reader and the underlying archive. The archive is closed
     * even when closing the reader fails.
     */
    @Override
    public void close() throws Exception {
        try {
            reader.close();
        }
        finally {
            postsFile.close();
        }
    }
}
@@ -0,0 +1,58 @@
package nu.marginalia.integration.stackexchange.xml;

import nu.marginalia.integration.stackexchange.model.StackExchangeComment;

import javax.xml.namespace.QName;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.events.XMLEvent;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Iterator;

/**
 * Reads {@link StackExchangeComment}s from the {@code Comments.xml} entry of a
 * Stack Exchange 7z archive.
 */
public class StackExchangeXmlCommentReader {
    private static final QName idName = new QName("Id");
    private static final QName postIdName = new QName("PostId");
    private static final QName textName = new QName("Text");

    private final Path pathTo7zFile;

    /**
     * @param pathTo7zFile the Stack Exchange 7z archive to read comments from
     */
    public StackExchangeXmlCommentReader(Path pathTo7zFile) {
        this.pathTo7zFile = pathTo7zFile;
    }

    /**
     * Opens {@code Comments.xml} in the archive and returns an iterator over
     * the comments it contains.
     *
     * @throws IOException        if the archive or the entry cannot be opened
     * @throws XMLStreamException if the XML reader cannot be created
     */
    public Iterator<StackExchangeComment> iterator() throws IOException, XMLStreamException {
        var archiveSource = new StackExchange7zXmlEventReaderSource(pathTo7zFile, "Comments.xml");
        return iterator(archiveSource);
    }

    // exposed for testability
    static Iterator<StackExchangeComment> iterator(XmlEventReaderSource source) {
        return new StackExchangeXmlIterator<>(source, StackExchangeXmlCommentReader::parseEvent);
    }

    /**
     * Turns one XML event into a comment.
     *
     * @return the comment described by a {@code row} start element that has
     *         {@code PostId}, {@code Id} and {@code Text} attributes;
     *         {@code null} for any other event
     */
    private static StackExchangeComment parseEvent(XMLEvent event) {
        if (!event.isStartElement()) {
            return null;
        }

        var element = event.asStartElement();
        String elementName = element.getName().getLocalPart();
        if (!"row".equals(elementName)) {
            return null;
        }

        // Each attribute is parsed as soon as it is known to be present and
        // before the next one is looked up.
        var rawPostId = element.getAttributeByName(postIdName);
        if (rawPostId == null) {
            return null;
        }
        int postId = Integer.parseInt(rawPostId.getValue());

        var rawCommentId = element.getAttributeByName(idName);
        if (rawCommentId == null) {
            return null;
        }
        int commentId = Integer.parseInt(rawCommentId.getValue());

        var rawText = element.getAttributeByName(textName);
        if (rawText == null) {
            return null;
        }

        return new StackExchangeComment(commentId, postId, rawText.getValue());
    }

}

0 comments on commit 6bbf40d

Please sign in to comment.