Skip to content

Commit

Permalink
Merge pull request #41 from Wikidata/dump-processing-example
Browse files Browse the repository at this point in the history
Complete dump processing pipeline + example
  • Loading branch information
Fredo Erxleben committed Mar 20, 2014
2 parents cea2338 + 270da12 commit 984bd23
Show file tree
Hide file tree
Showing 10 changed files with 485 additions and 41 deletions.
8 changes: 0 additions & 8 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,6 @@
<artifactId>slf4j-api</artifactId>
<version>${slf4jVersion}</version>
</dependency>
<dependency>
<!-- Simply forward log messages to System.err. -->
<!-- Users can use their own logging framework instead. -->
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4jVersion}</version>
<optional>true</optional>
</dependency>
<dependency>
<!-- Useful helpers for cleaner coding -->
<groupId>org.apache.commons</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package org.wikidata.wdtk.dumpfiles;

import org.wikidata.wdtk.datamodel.interfaces.EntityDocument;
import org.wikidata.wdtk.datamodel.interfaces.ItemDocument;
import org.wikidata.wdtk.datamodel.interfaces.PropertyDocument;

/**
 * Processor interface for {@link EntityDocument} objects. An implementation
 * can be registered as a listener and will then be fed entity documents one
 * by one, e.g., while a dump file is being parsed.
 *
 * @author Markus Kroetzsch
 *
 */
public interface EntityDocumentProcessor {

	/**
	 * Processes the given ItemDocument.
	 *
	 * @param itemDocument
	 *            the ItemDocument to process
	 */
	void processItemDocument(ItemDocument itemDocument);

	/**
	 * Processes the given PropertyDocument.
	 *
	 * @param propertyDocument
	 *            the PropertyDocument to process
	 */
	void processPropertyDocument(PropertyDocument propertyDocument);

	/**
	 * Signals that the current batch of entity documents is exhausted, giving
	 * the implementation a chance to run any final (clean-up, flushing, ...)
	 * actions. This is usually called after a whole dumpfile was completely
	 * processed.
	 */
	void finishProcessingEntityDocuments();

}
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,14 @@ public int compare(MwDumpFile a, MwDumpFile b) {
* if the dump file contents could not be accessed
*/
public BufferedReader getDumpFileReader() throws IOException;

/**
 * Prepares the dump file for access via {@link #getDumpFileStream()} or
 * {@link #getDumpFileReader()}. In particular, this will download any
 * remote files. Implementations that already have the dump contents
 * available locally may implement this as a no-op.
 *
 * @throws IOException
 *             if there was a problem preparing the files
 */
public void prepareDumpFile() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package org.wikidata.wdtk.dumpfiles;

import java.util.Map;

import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wikidata.wdtk.datamodel.implementation.DataObjectFactoryImpl;
import org.wikidata.wdtk.datamodel.interfaces.DataObjectFactory;
import org.wikidata.wdtk.datamodel.interfaces.ItemDocument;
import org.wikidata.wdtk.datamodel.interfaces.PropertyDocument;

/**
* A revision processor that processes Wikibase entity content from a dump file.
* Revisions are parsed to obtain EntityDocument objects.
*
* @author Markus Kroetzsch
*
*/
public class WikibaseRevisionProcessor implements MwRevisionProcessor {

	static final Logger logger = LoggerFactory
			.getLogger(WikibaseRevisionProcessor.class);

	// Created lazily in startRevisionProcessing(), since it needs the base
	// URL/IRI that is only known once processing starts.
	JsonConverter jsonConverter;
	final DataObjectFactory dataObjectFactory;
	final EntityDocumentProcessor entityDocumentProcessor;

	/**
	 * Constructor.
	 *
	 * @param entityDocumentProcessor
	 *            the processor that will receive the EntityDocument objects
	 *            obtained from parsed revisions
	 */
	public WikibaseRevisionProcessor(
			EntityDocumentProcessor entityDocumentProcessor) {
		this.dataObjectFactory = new DataObjectFactoryImpl();
		this.entityDocumentProcessor = entityDocumentProcessor;
	}

	@Override
	public void startRevisionProcessing(String siteName, String baseUrl,
			Map<Integer, String> namespaces) {
		// FIXME the baseUrl from the dump is not the baseIri we need here
		this.jsonConverter = new JsonConverter(baseUrl, this.dataObjectFactory);
	}

	@Override
	public void processRevision(MwRevision mwRevision) {
		if (MwRevision.MODEL_WIKIBASE_ITEM.equals(mwRevision.getModel())) {
			processItemRevision(mwRevision);
		} else if (MwRevision.MODEL_WIKIBASE_PROPERTY.equals(mwRevision
				.getModel())) {
			processPropertyRevision(mwRevision);
		} // else: ignore this revision
	}

	/**
	 * Parses the JSON text of the given revision as an item document and
	 * forwards the result to the registered EntityDocumentProcessor. Parse
	 * failures are logged and otherwise ignored, so that processing of
	 * further revisions can continue.
	 *
	 * @param mwRevision
	 *            the revision whose text holds the item JSON
	 */
	public void processItemRevision(MwRevision mwRevision) {
		try {
			JSONObject jsonObject = new JSONObject(mwRevision.getText());
			ItemDocument itemDocument = this.jsonConverter
					.convertToItemDocument(jsonObject, mwRevision.getTitle());
			this.entityDocumentProcessor.processItemDocument(itemDocument);
		} catch (JSONException e) {
			// Parameterized logging with the exception as last argument
			// preserves the stack trace (string concatenation with
			// e.toString() discarded it).
			WikibaseRevisionProcessor.logger.error(
					"Failed to process JSON for item {}", mwRevision, e);
		}
	}

	/**
	 * Parses the JSON text of the given revision as a property document and
	 * forwards the result to the registered EntityDocumentProcessor. Parse
	 * failures are logged and otherwise ignored, so that processing of
	 * further revisions can continue.
	 *
	 * @param mwRevision
	 *            the revision whose text holds the property JSON
	 */
	public void processPropertyRevision(MwRevision mwRevision) {
		try {
			JSONObject jsonObject = new JSONObject(mwRevision.getText());
			PropertyDocument propertyDocument = this.jsonConverter
					.convertToPropertyDocument(jsonObject,
							mwRevision.getTitle());
			this.entityDocumentProcessor
					.processPropertyDocument(propertyDocument);
		} catch (JSONException e) {
			// See processItemRevision(): keep the stack trace.
			WikibaseRevisionProcessor.logger.error(
					"Failed to process JSON for property {}", mwRevision, e);
		}
	}

	@Override
	public void finishRevisionProcessing() {
		// Nothing to do
	}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -66,6 +67,11 @@ public class WmfDumpFileManager {
*/
static final String DATE_STAMP_PATTERN = "\\d\\d\\d\\d\\d\\d\\d\\d";

/**
* The name of the directory where downloaded dump files are stored.
*/
public static final String DOWNLOAD_DIRECTORY_NAME = "dumpfiles";

final String projectName;
final DirectoryManager dumpfileDirectoryManager;
final WebResourceFetcher webResourceFetcher;
Expand All @@ -92,9 +98,13 @@ public WmfDumpFileManager(String projectName,
WebResourceFetcher webResourceFetcher) throws IOException {
this.projectName = projectName;
this.dumpfileDirectoryManager = downloadDirectoryManager
.getSubdirectoryManager("dumpfiles").getSubdirectoryManager(
projectName);
.getSubdirectoryManager(
WmfDumpFileManager.DOWNLOAD_DIRECTORY_NAME)
.getSubdirectoryManager(projectName);
this.webResourceFetcher = webResourceFetcher;

WmfDumpFileManager.logger.info("Using download directory "
+ this.dumpfileDirectoryManager.toString());
}

/**
Expand All @@ -110,8 +120,15 @@ public void processAllRecentDumps(MwDumpFileProcessor dumpFileProcessor,

for (MwDumpFile dumpFile : findAllRelevantDumps(preferCurrent)) {
try (InputStream inputStream = dumpFile.getDumpFileStream()) {
logger.info("Processing dump file " + dumpFile.toString());
dumpFileProcessor
.processDumpFileContents(inputStream, dumpFile);
} catch (FileAlreadyExistsException e) {
logger.error("Dump file "
+ dumpFile.toString()
+ " could not be processed since file "
+ e.getFile()
+ " already exists. Try deleting the file or dumpfile directory to attempt a new download.");
} catch (IOException e) {
logger.error("Dump file " + dumpFile.toString()
+ " could not be processed: " + e.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ public InputStream getDumpFileStream() throws IOException {
this.dumpContentType, this.projectName, this.dateStamp));
}

/**
 * Prepares the dump file for processing. Nothing needs to be done here:
 * NOTE(review): presumably this class reads an already-available dump (the
 * enclosing class header is outside this view — confirm), so there is no
 * remote file to download.
 */
@Override
public void prepareDumpFile() throws IOException {
	// nothing to do
}

@Override
protected Long fetchMaximalRevisionId() {
String inputLine;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wikidata.wdtk.util.DirectoryManager;
import org.wikidata.wdtk.util.WebResourceFetcher;

Expand All @@ -39,6 +41,9 @@
*/
class WmfOnlineDailyDumpFile extends WmfDumpFile {

static final Logger logger = LoggerFactory
.getLogger(WmfOnlineDailyDumpFile.class);

final WebResourceFetcher webResourceFetcher;
final DirectoryManager dumpfileDirectoryManager;

Expand Down Expand Up @@ -70,10 +75,26 @@ public DumpContentType getDumpContentType() {

/**
 * Returns an input stream for the contents of this daily dump file,
 * first making sure via {@link #prepareDumpFile()} that the file is
 * available locally.
 */
@Override
public InputStream getDumpFileStream() throws IOException {
	// Download the file if it is not present yet:
	prepareDumpFile();

	DirectoryManager localDirectory = this.dumpfileDirectoryManager
			.getSubdirectoryManager(WmfDumpFile.getDumpFileDirectoryName(
					DumpContentType.DAILY, this.dateStamp));
	String localFileName = WmfDumpFile.getDumpFileName(
			DumpContentType.DAILY, this.projectName, this.dateStamp);

	return localDirectory.getInputStreamForBz2File(localFileName);
}

@Override
public void prepareDumpFile() throws IOException {
String fileName = WmfDumpFile.getDumpFileName(DumpContentType.DAILY,
this.projectName, this.dateStamp);
String urlString = getBaseUrl() + fileName;

logger.info("Downloading daily dump file " + fileName + " from "
+ urlString + " ...");

if (this.getMaximalRevisionId() == -1L) {
throw new IOException(
"Failed to retrieve maximal revision id. Aborting dump retrieval.");
Expand All @@ -91,7 +112,8 @@ public InputStream getDumpFileStream() throws IOException {
dailyDirectoryManager.createFile(WmfDumpFile.LOCAL_FILENAME_MAXREVID,
this.getMaximalRevisionId().toString());

return dailyDirectoryManager.getInputStreamForBz2File(fileName);
logger.info("... Completed download of daily dump file " + fileName
+ " from " + urlString);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.wikidata.wdtk.util.DirectoryManager;
import org.wikidata.wdtk.util.WebResourceFetcher;

Expand All @@ -41,6 +43,9 @@
*/
public class WmfOnlineStandardDumpFile extends WmfDumpFile {

static final Logger logger = LoggerFactory
.getLogger(WmfOnlineStandardDumpFile.class);

final WebResourceFetcher webResourceFetcher;
final DirectoryManager dumpfileDirectoryManager;
final DumpContentType dumpContentType;
Expand Down Expand Up @@ -78,10 +83,28 @@ public DumpContentType getDumpContentType() {

/**
 * Returns an input stream for the contents of this dump file, first
 * making sure via {@link #prepareDumpFile()} that the file is available
 * locally.
 */
@Override
public InputStream getDumpFileStream() throws IOException {
	// Download the file if it is not present yet:
	prepareDumpFile();

	DirectoryManager localDirectory = this.dumpfileDirectoryManager
			.getSubdirectoryManager(WmfDumpFile.getDumpFileDirectoryName(
					this.dumpContentType, this.dateStamp));
	String localFileName = WmfDumpFile.getDumpFileName(
			this.dumpContentType, this.projectName, this.dateStamp);

	return localDirectory.getInputStreamForBz2File(localFileName);
}

@Override
public void prepareDumpFile() throws IOException {
String fileName = WmfDumpFile.getDumpFileName(this.dumpContentType,
this.projectName, this.dateStamp);
String urlString = getBaseUrl() + fileName;

logger.info("Downloading "
+ this.dumpContentType.toString().toLowerCase() + " dump file "
+ fileName + " from " + urlString + " ...");

if (this.getMaximalRevisionId() == -1) {
throw new IOException(
"Failed to retrieve maximal revision id. Aborting dump retrieval.");
Expand All @@ -100,7 +123,9 @@ public InputStream getDumpFileStream() throws IOException {
WmfDumpFile.LOCAL_FILENAME_MAXREVID, this
.getMaximalRevisionId().toString());

return thisDumpDirectoryManager.getInputStreamForBz2File(fileName);
logger.info("... Completed download of "
+ this.dumpContentType.toString().toLowerCase() + " dump file "
+ fileName + " from " + urlString);
}

@Override
Expand Down Expand Up @@ -142,35 +167,6 @@ protected Long fetchMaximalRevisionId() {
return maxRevId;
}

// Old code below that extracts data from the site stats table.
// It is unclear how this data is related to the dumps (which are generated
// at different times)
// protected Long fetchMaximalRevisionId() {
// Long maxRevId = -1L;
// String urlString = getBaseUrl() + this.projectName + "-" + dateStamp
// + "-site_stats.sql.gz";
// try (BufferedReader in = this.webResourceFetcher
// .getBufferedReaderForGzipUrl(urlString)) {
// String inputLine;
// while (maxRevId < 0 && (inputLine = in.readLine()) != null) {
// if (inputLine.startsWith("INSERT INTO `site_stats` VALUES (")) {
// String[] values = inputLine.split(",", 4);
// if (values.length == 4) {
// try {
// maxRevId = new Long(values[2]);
// } catch (NumberFormatException e) {
// // could not parse number; just continue and
// // probably return -1
// }
// }
// }
// }
// } catch (IOException e) {
// // file not found or not readable; just fall through to return -1
// }
// return maxRevId;
// }

@Override
protected boolean fetchIsDone() {
boolean found = false;
Expand Down
Loading

0 comments on commit 984bd23

Please sign in to comment.