Adding an option for Halyard Stats to calculate/update statistics of a particular named graph context
Adam Sotona committed Aug 23, 2018
1 parent d91ede7 commit dc6a572
Showing 7 changed files with 233 additions and 15 deletions.
@@ -102,7 +102,7 @@ public final class HalyardTableUtils {

private static final int PREFIXES = 3;
private static final byte[] START_KEY = new byte[20];
static final byte[] STOP_KEY = new byte[20];
public static final byte[] STOP_KEY = new byte[20];
static {
Arrays.fill(START_KEY, (byte)0);
Arrays.fill(STOP_KEY, (byte)0xff); /* 0xff is 255 in decimal */
5 changes: 4 additions & 1 deletion docs/tools.md
@@ -251,7 +251,8 @@ hdfs:///my_tmp_workdir -t mydataset
### Halyard Stats
```
$ ./halyard stats -h
usage: halyard stats [-h] [-v] -s <dataset_table> [-t <target_url>] [-r <size>] [-g <target_graph>]
usage: halyard stats [-h] [-v] -s <dataset_table> [-t <target_url>] [-r <size>] [-c <graph_context>]
[-g <target_graph>]
Halyard Stats is a MapReduce application that calculates dataset statistics and stores them in the
named graph within the dataset or exports them into a file. The generated statistics are described
by the VoID vocabulary, its extensions, and the SPARQL 1.1 Service Description.
@@ -263,6 +264,8 @@ by the VoID vocabulary, its extensions, and the SPARQL 1.1 Service Description.
hdfs://<path>/<file_name>[{0}].<RDF_ext>[.<compression>]
-r,--threshold <size> Optional minimal size of a named graph to calculate
statistics for (default is 1000)
-c,--graph-context <graph_context> Optionally restrict stats calculation to the given named graph
context only
-g,--target-graph <target_graph> Optional target graph context of the exported statistics
(default is 'http://merck.github.io/Halyard/ns#statsContext')
Example: halyard stats -s my_dataset [-g 'http://whatever/mystats'] [-t
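For example, a run restricted to a single named graph (the dataset table and graph IRI here are only illustrations, mirroring the new tests below) would be `halyard stats -s my_dataset -c 'http://whatever/graph1'`; without `-t`, the previously stored statistics for that graph are replaced in the statistics context of the dataset.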
58 changes: 46 additions & 12 deletions tools/src/main/java/com/msd/gin/halyard/tools/HalyardStats.java
@@ -31,8 +31,10 @@
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.WeakHashMap;
@@ -47,6 +49,8 @@
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
import org.apache.hadoop.hbase.filter.MultiRowRangeFilter.RowRange;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
@@ -83,6 +87,7 @@ public final class HalyardStats extends AbstractHalyardTool {
private static final String SOURCE = "halyard.stats.source";
private static final String TARGET = "halyard.stats.target";
private static final String THRESHOLD = "halyard.stats.threshold";
private static final String TARGET_GRAPH = "halyard.stats.target.graph";
private static final String GRAPH_CONTEXT = "halyard.stats.graph.context";

private static final Charset UTF8 = Charset.forName("UTF-8");
@@ -95,7 +100,7 @@ static final class StatsMapper extends TableMapper<ImmutableBytesWritable, LongW
final SimpleValueFactory ssf = SimpleValueFactory.getInstance();

final byte[] lastKeyFragment = new byte[20], lastCtxFragment = new byte[20], lastClassFragment = new byte[20];
IRI statsContext;
IRI statsContext, graphContext;
byte[] statsContextHash;
byte lastRegion = -1;
long counter = 0;
@@ -114,7 +119,9 @@ protected void setup(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
update = conf.get(TARGET) == null;
threshold = conf.getLong(THRESHOLD, 1000);
statsContext = ssf.createIRI(conf.get(GRAPH_CONTEXT, HALYARD.STATS_GRAPH_CONTEXT.stringValue()));
statsContext = ssf.createIRI(conf.get(TARGET_GRAPH, HALYARD.STATS_GRAPH_CONTEXT.stringValue()));
String gc = conf.get(GRAPH_CONTEXT);
if (gc != null) graphContext = ssf.createIRI(gc);
statsContextHash = HalyardTableUtils.hashKey(NTriplesUtil.toNTriplesString(statsContext).getBytes(UTF8));
}

@@ -130,6 +137,14 @@ private boolean matchAndCopyKey(byte[] source, int offset, byte[] target) {
return match;
}

private boolean matchingGraphContext(Resource subject) {
return graphContext == null
|| subject.equals(graphContext)
|| subject.stringValue().startsWith(graphContext.stringValue() + "_subject_")
|| subject.stringValue().startsWith(graphContext.stringValue() + "_property_")
|| subject.stringValue().startsWith(graphContext.stringValue() + "_object_");
}

@Override
protected void map(ImmutableBytesWritable key, Result value, Context output) throws IOException, InterruptedException {
byte region = key.get()[key.getOffset()];
@@ -156,7 +171,7 @@ protected void map(ImmutableBytesWritable key, Result value, Context output) thr
sail.initialize();
}
for (Statement st : HalyardTableUtils.parseStatements(value)) {
if (statsContext.equals(st.getContext())) {
if (statsContext.equals(st.getContext()) && matchingGraphContext(st.getSubject())) {
sail.removeStatement(null, st.getSubject(), st.getPredicate(), st.getObject(), st.getContext());
removed++;
}
@@ -236,7 +251,7 @@ protected void map(ImmutableBytesWritable key, Result value, Context output) thr
}

private void report(Context output, IRI property, String partitionId, long value) throws IOException, InterruptedException {
if (value > 0) {
if (value > 0 && (graphContext == null || graphContext.equals(graph))) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (DataOutputStream dos = new DataOutputStream(baos)) {
dos.writeUTF(graph.stringValue());
Expand Down Expand Up @@ -322,7 +337,7 @@ static final class StatsReducer extends Reducer<ImmutableBytesWritable, LongWrit
@Override
protected void setup(Context context) throws IOException, InterruptedException {
Configuration conf = context.getConfiguration();
statsGraphContext = SVF.createIRI(conf.get(GRAPH_CONTEXT, HALYARD.STATS_GRAPH_CONTEXT.stringValue()));
statsGraphContext = SVF.createIRI(conf.get(TARGET_GRAPH, HALYARD.STATS_GRAPH_CONTEXT.stringValue()));
String targetUrl = conf.get(TARGET);
if (targetUrl == null) {
sail = new HBaseSail(conf, conf.get(SOURCE), false, 0, true, 0, null, null);
@@ -354,10 +369,12 @@ protected void setup(Context context) throws IOException, InterruptedException {
writer.handleNamespace(HALYARD.PREFIX, HALYARD.NAMESPACE);
writer.startRDF();
}
writeStatement(HALYARD.STATS_ROOT_NODE, RDF.TYPE, VOID.DATASET);
writeStatement(HALYARD.STATS_ROOT_NODE, RDF.TYPE, SD.DATASET);
writeStatement(HALYARD.STATS_ROOT_NODE, RDF.TYPE, SD.GRAPH_CLASS);
writeStatement(HALYARD.STATS_ROOT_NODE, SD.DEFAULT_GRAPH, HALYARD.STATS_ROOT_NODE);
if (conf.get(GRAPH_CONTEXT) == null) {
writeStatement(HALYARD.STATS_ROOT_NODE, RDF.TYPE, VOID.DATASET);
writeStatement(HALYARD.STATS_ROOT_NODE, RDF.TYPE, SD.DATASET);
writeStatement(HALYARD.STATS_ROOT_NODE, RDF.TYPE, SD.GRAPH_CLASS);
writeStatement(HALYARD.STATS_ROOT_NODE, SD.DEFAULT_GRAPH, HALYARD.STATS_ROOT_NODE);
}
graphs = new WeakHashMap<>();
}

@@ -438,14 +455,20 @@ public HalyardStats() {
addOption("s", "source-dataset", "dataset_table", "Source HBase table with Halyard RDF store", true, true);
addOption("t", "target-file", "target_url", "Optional target file to export the statistics (instead of update) hdfs://<path>/<file_name>[{0}].<RDF_ext>[.<compression>]", false, true);
addOption("r", "threshold", "size", "Optional minimal size of a named graph to calculate statistics for (default is 1000)", false, true);
addOption("c", "graph-context", "graph_context", "Optional restrict stats calculation to the given named graph context only", false, true);
addOption("g", "target-graph", "target_graph", "Optional target graph context of the exported statistics (default is '" + HALYARD.STATS_GRAPH_CONTEXT.stringValue() + "')", false, true);
}

private static RowRange rowRange(byte prefix, byte[] hash) {
return new RowRange(HalyardTableUtils.concat(prefix, false, hash), true, HalyardTableUtils.concat(prefix, true, hash, HalyardTableUtils.STOP_KEY, HalyardTableUtils.STOP_KEY, HalyardTableUtils.STOP_KEY), true);
}

@Override
public int run(CommandLine cmd) throws Exception {
String source = cmd.getOptionValue('s');
String target = cmd.getOptionValue('t');
String graph = cmd.getOptionValue('g');
String targetGraph = cmd.getOptionValue('g');
String graphContext = cmd.getOptionValue('c');
String thresh = cmd.getOptionValue('r');
TableMapReduceUtil.addDependencyJars(getConf(),
HalyardExport.class,
@@ -463,7 +486,8 @@ public int run(CommandLine cmd) throws Exception {
Job job = Job.getInstance(getConf(), "HalyardStats " + source + (target == null ? " update" : " -> " + target));
job.getConfiguration().set(SOURCE, source);
if (target != null) job.getConfiguration().set(TARGET, target);
if (graph != null) job.getConfiguration().set(GRAPH_CONTEXT, graph);
if (targetGraph != null) job.getConfiguration().set(TARGET_GRAPH, targetGraph);
if (graphContext != null) job.getConfiguration().set(GRAPH_CONTEXT, graphContext);
if (thresh != null) job.getConfiguration().setLong(THRESHOLD, Long.parseLong(thresh));
job.setJarByClass(HalyardStats.class);
TableMapReduceUtil.initCredentials(job);
@@ -473,7 +497,17 @@
scan.setMaxVersions(1);
scan.setBatch(10);
scan.setAllowPartialResults(true);

if (graphContext != null) { //restricting stats to scan given graph context only
List<RowRange> ranges = new ArrayList<>();
byte[] gcHash = HalyardTableUtils.hashKey(NTriplesUtil.toNTriplesString(SimpleValueFactory.getInstance().createIRI(graphContext)).getBytes(UTF8));
ranges.add(rowRange(HalyardTableUtils.CSPO_PREFIX, gcHash));
ranges.add(rowRange(HalyardTableUtils.CPOS_PREFIX, gcHash));
ranges.add(rowRange(HalyardTableUtils.COSP_PREFIX, gcHash));
if (target == null) { //add stats context to the scanned row ranges (when in update mode) to delete the related stats during MapReduce
ranges.add(rowRange(HalyardTableUtils.CSPO_PREFIX, HalyardTableUtils.hashKey(NTriplesUtil.toNTriplesString(targetGraph == null ? HALYARD.STATS_GRAPH_CONTEXT : SimpleValueFactory.getInstance().createIRI(targetGraph)).getBytes(UTF8))));
}
scan.setFilter(new MultiRowRangeFilter(ranges));
}
TableMapReduceUtil.initTableMapperJob(
source,
scan,
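The heart of the change to HalyardStats.run() above is the scan restriction: when a graph context is supplied, the MapReduce input Scan is limited with HBase's MultiRowRangeFilter to the CSPO, CPOS and COSP key ranges derived from the hash of that context IRI (plus, in update mode, the range of the statistics context itself, so stale statistics can be found and removed in the mapper). The following is a minimal, self-contained sketch of that general technique only; the index prefixes, key layout and SHA-1 hashing are illustrative assumptions and do not reproduce Halyard's actual HalyardTableUtils key scheme.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.MultiRowRangeFilter;
import org.apache.hadoop.hbase.filter.MultiRowRangeFilter.RowRange;

/**
 * Illustrative sketch only: restrict an HBase Scan to the key ranges of a single
 * "context", assuming rows are keyed as <index prefix byte><hash of context><suffix...>.
 * Halyard's real key layout and hashing live in HalyardTableUtils and differ in detail.
 */
public final class ContextScanSketch {

    /** Hypothetical stand-in for HalyardTableUtils.hashKey(). */
    static byte[] hash(String value) throws NoSuchAlgorithmException {
        return Arrays.copyOf(
                MessageDigest.getInstance("SHA-1").digest(value.getBytes(StandardCharsets.UTF_8)), 20);
    }

    /** Full range of rows starting with <prefix><hash>: suffix from all 0x00 to all 0xff. */
    static RowRange rowRange(byte prefix, byte[] hash) {
        byte[] start = new byte[1 + hash.length];
        byte[] stop = new byte[1 + hash.length + 20];
        start[0] = prefix;
        stop[0] = prefix;
        System.arraycopy(hash, 0, start, 1, hash.length);
        System.arraycopy(hash, 0, stop, 1, hash.length);
        Arrays.fill(stop, 1 + hash.length, stop.length, (byte) 0xff);
        return new RowRange(start, true, stop, true);
    }

    /** Build a Scan limited to the context's ranges under three hypothetical index prefixes. */
    static Scan contextScan(String contextIri) throws IOException, NoSuchAlgorithmException {
        byte[] ctxHash = hash(contextIri);
        List<RowRange> ranges = new ArrayList<>();
        for (byte prefix : new byte[]{3, 4, 5}) { // stand-ins for CSPO, CPOS, COSP prefixes
            ranges.add(rowRange(prefix, ctxHash));
        }
        Scan scan = new Scan();
        scan.setFilter(new MultiRowRangeFilter(ranges)); // server-side skip of all other rows
        return scan;
    }
}
```

Rows outside the requested ranges are skipped on the region servers rather than shipped to the mappers, so computing statistics for a single graph no longer requires scanning the whole table.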
@@ -146,6 +146,8 @@ static void assertEqualModels(Set<Statement> ref, Set<Statement> m) {
public void testStatsUpdate() throws Exception {
final HBaseSail sail = new HBaseSail(HBaseServerTestInstance.getInstanceConfig(), "statsTable2", true, -1, true, 0, null, null);
sail.initialize();

//load test data
try (InputStream ref = HalyardStatsTest.class.getResourceAsStream("testData.trig")) {
RDFParser p = Rio.createParser(RDFFormat.TRIG);
p.setPreserveBNodeIDs(true);
@@ -158,20 +160,87 @@ public void handleStatement(Statement st) throws RDFHandlerException {
}
sail.commit();

//update stats
assertEquals(0, ToolRunner.run(HBaseServerTestInstance.getInstanceConfig(), new HalyardStats(),
new String[]{"-s", "statsTable2", "-r", "100"}));

//verify with golden file
Set<Statement> statsM = new HashSet<>();
try (CloseableIteration<? extends Statement,SailException> it = sail.getStatements(null, null, null, true, HALYARD.STATS_GRAPH_CONTEXT)) {
while (it.hasNext()) {
statsM.add(it.next());
}
}
sail.close();
try (InputStream refStream = HalyardStatsTest.class.getResourceAsStream("testStatsTarget.trig")) {
Model refM = Rio.parse(refStream, "", RDFFormat.TRIG, new ParserConfig().set(BasicParserSettings.PRESERVE_BNODE_IDS, true), SimpleValueFactory.getInstance(), new ParseErrorLogger());
assertEqualModels(refM, statsM);
}

//load additional data
try (InputStream ref = HalyardStatsTest.class.getResourceAsStream("testMoreData.trig")) {
RDFParser p = Rio.createParser(RDFFormat.TRIG);
p.setPreserveBNodeIDs(true);
p.setRDFHandler(new AbstractRDFHandler() {
@Override
public void handleStatement(Statement st) throws RDFHandlerException {
sail.addStatement(st.getSubject(), st.getPredicate(), st.getObject(), st.getContext());
}
}).parse(ref, "");
}
sail.commit();

//update stats only for graph1
assertEquals(0, ToolRunner.run(HBaseServerTestInstance.getInstanceConfig(), new HalyardStats(),
new String[]{"-s", "statsTable2", "-r", "100", "-c", "http://whatever/graph1"}));

//verify with golden file
statsM = new HashSet<>();
try (CloseableIteration<? extends Statement,SailException> it = sail.getStatements(null, null, null, true, HALYARD.STATS_GRAPH_CONTEXT)) {
while (it.hasNext()) {
statsM.add(it.next());
}
}
try (InputStream refStream = HalyardStatsTest.class.getResourceAsStream("testStatsMoreTarget.trig")) {
Model refM = Rio.parse(refStream, "", RDFFormat.TRIG, new ParserConfig().set(BasicParserSettings.PRESERVE_BNODE_IDS, true), SimpleValueFactory.getInstance(), new ParseErrorLogger());
assertEqualModels(refM, statsM);
}

sail.close();
}

@Test
public void testStatsTargetPartial() throws Exception {
final HBaseSail sail = new HBaseSail(HBaseServerTestInstance.getInstanceConfig(), "statsTable3", true, -1, true, 0, null, null);
sail.initialize();
try (InputStream ref = HalyardStatsTest.class.getResourceAsStream("testData.trig")) {
RDFParser p = Rio.createParser(RDFFormat.TRIG);
p.setPreserveBNodeIDs(true);
p.setRDFHandler(new AbstractRDFHandler() {
@Override
public void handleStatement(Statement st) throws RDFHandlerException {
sail.addStatement(st.getSubject(), st.getPredicate(), st.getObject(), st.getContext());
}
}).parse(ref, "");
}
sail.commit();
sail.close();

File root = File.createTempFile("test_stats", "");
root.delete();
root.mkdirs();

assertEquals(0, ToolRunner.run(HBaseServerTestInstance.getInstanceConfig(), new HalyardStats(),
new String[]{"-s", "statsTable3", "-t", root.toURI().toURL().toString() + "stats{0}.trig", "-r", "100", "-g", "http://whatever/myStats", "-c", "http://whatever/graph0"}));

File stats = new File(root, "stats0.trig");
assertTrue(stats.isFile());
try (InputStream statsStream = new FileInputStream(stats)) {
try (InputStream refStream = HalyardStatsTest.class.getResourceAsStream("testStatsTargetPartial.trig")) {
Model statsM = Rio.parse(statsStream, "", RDFFormat.TRIG, new ParserConfig().set(BasicParserSettings.PRESERVE_BNODE_IDS, true), SimpleValueFactory.getInstance(), new ParseErrorLogger());
Model refM = Rio.parse(refStream, "", RDFFormat.TRIG, new ParserConfig().set(BasicParserSettings.PRESERVE_BNODE_IDS, true), SimpleValueFactory.getInstance(), new ParseErrorLogger(), SimpleValueFactory.getInstance().createIRI("http://whatever/myStats"));
assertEqualModels(refM, statsM);
}
}
}

public void testRunNoArgs() throws Exception {
@@ -0,0 +1,16 @@
@prefix : <http://whatever/> .
{
:subjA :predB "whatever value ABC".
}

:graph0 {
:subjA :predB "whatever value ABC".
}

:graph1 {
:subjA :predB "whatever value ABC".
}

:graph2 {
:subjA :predB "whatever value ABC".
}
@@ -0,0 +1,66 @@
@prefix : <http://whatever/> .
@prefix halyard: <http://merck.github.io/Halyard/ns#> .
@prefix void: <http://rdfs.org/ns/void#> .
@prefix void-ext: <http://ldf.fi/void-ext#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
@prefix sd: <http://www.w3.org/ns/sparql-service-description#> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .

halyard:statsContext {
halyard:statsRoot a void:Dataset , sd:Dataset , sd:Graph ;
sd:defaultGraph halyard:statsRoot ;
sd:namedGraph :graph0 , :graph1 , :graph2 ;
void:classes "1000"^^xsd:long ;
void:triples "2001"^^xsd:long ;
void:propertyPartition halyard:statsRoot_property_v0EPmHVxqhkyM3Yh_Wfu7gMOZGU ;
void:properties "14"^^xsd:long ;
void-ext:distinctLiterals "857"^^xsd:long ;
void:distinctObjects "1957"^^xsd:long ;
void:distinctSubjects "191"^^xsd:long ;
void-ext:distinctBlankNodeObjects "100"^^xsd:long ;
void-ext:distinctBlankNodeSubjects "91"^^xsd:long ;
void-ext:distinctIRIReferenceObjects "1000"^^xsd:long ;
void-ext:distinctIRIReferenceSubjects "100"^^xsd:long .

:graph0 sd:name :graph0 ;
sd:graph :graph0 ;
a sd:NamedGraph , sd:Graph , void:Dataset ;
void:classes "450"^^xsd:long ;
void:triples "900"^^xsd:long ;
void:propertyPartition :graph0_property_v0EPmHVxqhkyM3Yh_Wfu7gMOZGU ;
void:properties "14"^^xsd:long ;
void-ext:distinctLiterals "386"^^xsd:long ;
void:distinctObjects "886"^^xsd:long ;
void:distinctSubjects "91"^^xsd:long ;
void-ext:distinctBlankNodeObjects "50"^^xsd:long ;
void-ext:distinctBlankNodeSubjects "41"^^xsd:long ;
void-ext:distinctIRIReferenceObjects "450"^^xsd:long ;
void-ext:distinctIRIReferenceSubjects "50"^^xsd:long .

:graph0_property_v0EPmHVxqhkyM3Yh_Wfu7gMOZGU a void:Dataset ;
void:property rdf:type ;
void:triples "450"^^xsd:long .

:graph1 sd:name :graph1 ;
sd:graph :graph1 ;
a sd:NamedGraph , sd:Graph , void:Dataset ;
void:classes "450"^^xsd:long ;
void:triples "901"^^xsd:long ;
void:propertyPartition :graph1_property_v0EPmHVxqhkyM3Yh_Wfu7gMOZGU ;
void:properties "15"^^xsd:long ;
void-ext:distinctLiterals "387"^^xsd:long ;
void:distinctObjects "887"^^xsd:long ;
void:distinctSubjects "91"^^xsd:long ;
void-ext:distinctBlankNodeObjects "50"^^xsd:long ;
void-ext:distinctBlankNodeSubjects "40"^^xsd:long ;
void-ext:distinctIRIReferenceObjects "450"^^xsd:long ;
void-ext:distinctIRIReferenceSubjects "51"^^xsd:long .

:graph1_property_v0EPmHVxqhkyM3Yh_Wfu7gMOZGU a void:Dataset ;
void:property rdf:type ;
void:triples "450"^^xsd:long .

halyard:statsRoot_property_v0EPmHVxqhkyM3Yh_Wfu7gMOZGU a void:Dataset ;
void:property rdf:type ;
void:triples "1000"^^xsd:long .
}
