From faae178c004c2e96b216192b8ee76892b671d44b Mon Sep 17 00:00:00 2001 From: pujav65 Date: Mon, 21 Dec 2015 10:12:30 -0500 Subject: [PATCH] RYA-21 adding input format and sample mappers/reducers --- dao/accumulo.rya/pom.xml | 7 + .../accumulo/mr/RyaStatementInputFormat.java | 94 ++++++++ .../rya/accumulo/mr/RyaStatementMapper.java | 85 +++++++ .../rya/accumulo/mr/RyaStatementReducer.java | 87 +++++++ .../accumulo/mr}/RyaStatementWritable.java | 24 +- .../mr/fileinput/RdfFileInputFormat.java | 12 +- .../mr/fileinput/RdfFileInputTool.java | 18 +- .../rya/accumulo/mr/RyaInputFormatTest.java | 223 ++++++++++++++++++ 8 files changed, 528 insertions(+), 22 deletions(-) create mode 100644 dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementInputFormat.java create mode 100644 dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementMapper.java create mode 100644 dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementReducer.java rename {common/rya.api/src/main/java/mvm/rya/api/domain/utils => dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr}/RyaStatementWritable.java (91%) create mode 100644 dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java diff --git a/dao/accumulo.rya/pom.xml b/dao/accumulo.rya/pom.xml index 5328945e9..d7810a843 100644 --- a/dao/accumulo.rya/pom.xml +++ b/dao/accumulo.rya/pom.xml @@ -66,6 +66,13 @@ under the License. junit test + + org.apache.mrunit + mrunit + hadoop2 + 1.1.0 + test + diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementInputFormat.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementInputFormat.java new file mode 100644 index 000000000..c6f12becb --- /dev/null +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementInputFormat.java @@ -0,0 +1,94 @@ +package mvm.rya.accumulo.mr; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import java.io.IOException; +import java.util.Map.Entry; + +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.mapreduce.AbstractInputFormat; +import org.apache.accumulo.core.client.mapreduce.RangeInputSplit; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.mr.utils.MRUtils; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.RyaTripleContext; +import mvm.rya.api.resolver.triple.TripleRow; +import mvm.rya.api.resolver.triple.TripleRowResolverException; + +public class RyaStatementInputFormat extends AbstractInputFormat { + @Override + public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + return new RyaStatementRecordReader(); + } + + + public static void setTableLayout(Job conf, TABLE_LAYOUT layout) { + conf.getConfiguration().set(MRUtils.TABLE_LAYOUT_PROP, layout.name()); + } + + public class RyaStatementRecordReader extends AbstractRecordReader { + + private RyaTripleContext ryaContext; + private TABLE_LAYOUT tableLayout; + + @Override + protected void setupIterators(TaskAttemptContext context, Scanner scanner, String tableName, RangeInputSplit split) { + + } + + @Override + public void initialize(InputSplit inSplit, TaskAttemptContext attempt) throws IOException { + super.initialize(inSplit, attempt); + this.tableLayout = TABLE_LAYOUT.valueOf(attempt.getConfiguration().get(MRUtils.TABLE_LAYOUT_PROP, TABLE_LAYOUT.OSP.toString())); + //TODO verify that this is correct + this.ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(attempt.getConfiguration())); + } + + @Override + public boolean nextKeyValue() throws IOException { + if (!scannerIterator.hasNext()) + return false; + + Entry entry = scannerIterator.next(); + ++numKeysRead; + currentKey = entry.getKey(); + + try { + currentK = currentKey.getRow(); + RyaStatement stmt = this.ryaContext.deserializeTriple(this.tableLayout, new TripleRow(entry.getKey().getRow().getBytes(), entry.getKey().getColumnFamily().getBytes(), entry.getKey().getColumnQualifier().getBytes())); + RyaStatementWritable writable = new RyaStatementWritable(); + writable.setRyaStatement(stmt); + currentV = writable; + } catch(TripleRowResolverException e) { + throw new IOException(e); + } + return true; + } + + } + +} \ No newline at end of file diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementMapper.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementMapper.java new file mode 100644 index 000000000..d90215b05 --- /dev/null +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementMapper.java @@ -0,0 +1,85 @@ + +package mvm.rya.accumulo.mr; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX; +import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_PO_SUFFIX; +import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_PRFX_DEF; +import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +import org.apache.accumulo.core.data.Mutation; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.RyaTableMutationsFactory; +import mvm.rya.accumulo.mr.utils.MRUtils; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.resolver.RyaTripleContext; + +public class RyaStatementMapper extends Mapper { + + private Text spoTable; + private Text poTable; + private Text ospTable; + private RyaTableMutationsFactory mutationsFactory; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + + String tablePrefix = context.getConfiguration().get(MRUtils.TABLE_PREFIX_PROPERTY, TBL_PRFX_DEF); + spoTable = new Text(tablePrefix + TBL_SPO_SUFFIX); + poTable = new Text(tablePrefix + TBL_PO_SUFFIX); + ospTable = new Text(tablePrefix + TBL_OSP_SUFFIX); + + RyaTripleContext ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(context.getConfiguration())); + mutationsFactory = new RyaTableMutationsFactory(ryaContext); + } + + @Override + protected void map(Text key, RyaStatementWritable value, Context context) throws IOException, InterruptedException { + + Map> mutations = mutationsFactory.serialize(value.getRyaStatement()); + + for(TABLE_LAYOUT layout : mutations.keySet()) { + + Text table = null; + switch (layout) { + case SPO: + table = spoTable; + break; + case OSP: + table = ospTable; + break; + case PO: + table = poTable; + break; + } + + for(Mutation mutation : mutations.get(layout)) { + context.write(table, mutation); + } + } + } +} \ No newline at end of file diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementReducer.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementReducer.java new file mode 100644 index 000000000..e353528bc --- /dev/null +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementReducer.java @@ -0,0 +1,87 @@ +package mvm.rya.accumulo.mr; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.RyaTableMutationsFactory; +import mvm.rya.accumulo.mr.utils.MRUtils; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.resolver.RyaTripleContext; +import org.apache.accumulo.core.data.Mutation; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Reducer; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_OSP_SUFFIX; +import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_PO_SUFFIX; +import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_PRFX_DEF; +import static mvm.rya.api.RdfCloudTripleStoreConstants.TBL_SPO_SUFFIX; + +public class RyaStatementReducer extends Reducer { + + private Text spoTable; + private Text poTable; + private Text ospTable; + private RyaTableMutationsFactory mutationsFactory; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + + String tablePrefix = context.getConfiguration().get(MRUtils.TABLE_PREFIX_PROPERTY, TBL_PRFX_DEF); + spoTable = new Text(tablePrefix + TBL_SPO_SUFFIX); + poTable = new Text(tablePrefix + TBL_PO_SUFFIX); + ospTable = new Text(tablePrefix + TBL_OSP_SUFFIX); + + RyaTripleContext ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration(context.getConfiguration())); + mutationsFactory = new RyaTableMutationsFactory(ryaContext); + } + + @Override + protected void reduce(WritableComparable key, Iterable values, Context context) throws IOException, InterruptedException { + + for(RyaStatementWritable value : values) { + Map> mutations = mutationsFactory.serialize(value.getRyaStatement()); + + for(TABLE_LAYOUT layout : mutations.keySet()) { + + Text table = null; + switch (layout) { + case SPO: + table = spoTable; + break; + case OSP: + table = ospTable; + break; + case PO: + table = poTable; + break; + } + + for(Mutation mutation : mutations.get(layout)) { + context.write(table, mutation); + } + } + + } + } +} \ No newline at end of file diff --git a/common/rya.api/src/main/java/mvm/rya/api/domain/utils/RyaStatementWritable.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java similarity index 91% rename from common/rya.api/src/main/java/mvm/rya/api/domain/utils/RyaStatementWritable.java rename to dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java index 13d82da94..87a94333a 100644 --- a/common/rya.api/src/main/java/mvm/rya/api/domain/utils/RyaStatementWritable.java +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/RyaStatementWritable.java @@ -1,4 +1,4 @@ -package mvm.rya.api.domain.utils; +package mvm.rya.accumulo.mr; /* * Licensed to the Apache Software Foundation (ASF) under one @@ -26,14 +26,16 @@ import java.io.IOException; import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.WritableComparable; + +import mvm.rya.accumulo.AccumuloRdfConfiguration; import mvm.rya.api.RdfCloudTripleStoreConstants; import mvm.rya.api.domain.RyaStatement; import mvm.rya.api.resolver.RyaTripleContext; import mvm.rya.api.resolver.triple.TripleRow; import mvm.rya.api.resolver.triple.TripleRowResolverException; -import org.apache.hadoop.io.WritableComparable; - /** * Date: 7/17/12 * Time: 1:29 PM @@ -42,12 +44,19 @@ public class RyaStatementWritable implements WritableComparable { private RyaTripleContext ryaContext; private RyaStatement ryaStatement; - + + public RyaStatementWritable(Configuration conf) { + this(); + } public RyaStatementWritable(RyaTripleContext ryaContext) { this.ryaContext = ryaContext; } - + + public RyaStatementWritable() { + this.ryaContext = RyaTripleContext.getInstance(new AccumuloRdfConfiguration()); + } + public RyaStatementWritable(RyaStatement ryaStatement, RyaTripleContext ryaContext) { this(ryaContext); this.ryaStatement = ryaStatement; @@ -87,8 +96,8 @@ public void write(DataOutput dataOutput) throws IOException { write(dataOutput, ryaStatement.getValue()); Long timestamp = ryaStatement.getTimestamp(); boolean b = timestamp != null; + dataOutput.writeBoolean(b); if (b) { - dataOutput.writeBoolean(b); dataOutput.writeLong(timestamp); } } catch (TripleRowResolverException e) { @@ -111,8 +120,9 @@ protected byte[] read(DataInput dataInput) throws IOException { byte[] bytes = new byte[len]; dataInput.readFully(bytes); return bytes; + }else { + return null; } - return null; } @Override diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputFormat.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputFormat.java index f20dfe3f2..3d2fd78c2 100644 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputFormat.java +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputFormat.java @@ -25,12 +25,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.mr.utils.MRUtils; -import mvm.rya.api.domain.utils.RyaStatementWritable; -import mvm.rya.api.resolver.RdfToRyaConversions; -import mvm.rya.api.resolver.RyaTripleContext; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -48,6 +42,12 @@ import org.openrdf.rio.RDFParser; import org.openrdf.rio.Rio; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.mr.RyaStatementWritable; +import mvm.rya.accumulo.mr.utils.MRUtils; +import mvm.rya.api.resolver.RdfToRyaConversions; +import mvm.rya.api.resolver.RyaTripleContext; + /** * Be able to input multiple rdf formatted files. Convert from rdf format to statements. * Class RdfFileInputFormat diff --git a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputTool.java b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputTool.java index 673d65fed..f44b6aa21 100644 --- a/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputTool.java +++ b/dao/accumulo.rya/src/main/java/mvm/rya/accumulo/mr/fileinput/RdfFileInputTool.java @@ -28,15 +28,6 @@ import java.util.Date; import java.util.Map; -import mvm.rya.accumulo.AccumuloRdfConfiguration; -import mvm.rya.accumulo.RyaTableMutationsFactory; -import mvm.rya.accumulo.mr.AbstractAccumuloMRTool; -import mvm.rya.accumulo.mr.utils.MRUtils; -import mvm.rya.api.RdfCloudTripleStoreConstants; -import mvm.rya.api.domain.RyaStatement; -import mvm.rya.api.domain.utils.RyaStatementWritable; -import mvm.rya.api.resolver.RyaTripleContext; - import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.data.Mutation; import org.apache.hadoop.conf.Configuration; @@ -49,6 +40,15 @@ import org.apache.hadoop.util.ToolRunner; import org.openrdf.rio.RDFFormat; +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.RyaTableMutationsFactory; +import mvm.rya.accumulo.mr.AbstractAccumuloMRTool; +import mvm.rya.accumulo.mr.RyaStatementWritable; +import mvm.rya.accumulo.mr.utils.MRUtils; +import mvm.rya.api.RdfCloudTripleStoreConstants; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.resolver.RyaTripleContext; + /** * Do bulk import of rdf files * Class RdfFileInputTool diff --git a/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java new file mode 100644 index 000000000..0e03fe368 --- /dev/null +++ b/dao/accumulo.rya/src/test/java/mvm/rya/accumulo/mr/RyaInputFormatTest.java @@ -0,0 +1,223 @@ +package mvm.rya.accumulo.mr; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import mvm.rya.accumulo.AccumuloRdfConfiguration; +import mvm.rya.accumulo.AccumuloRyaDAO; +import mvm.rya.accumulo.RyaTableMutationsFactory; +import mvm.rya.accumulo.mr.RyaStatementInputFormat.RyaStatementRecordReader; +import mvm.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT; +import mvm.rya.api.domain.RyaStatement; +import mvm.rya.api.domain.RyaURI; +import mvm.rya.api.resolver.RyaTripleContext; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat; +import org.apache.accumulo.core.client.mock.MockInstance; +import org.apache.accumulo.core.client.security.tokens.PasswordToken; +import org.apache.accumulo.core.data.Mutation; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.hadoop.mrunit.mapreduce.MapDriver; +import org.apache.hadoop.mrunit.mapreduce.ReduceDriver; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +public class RyaInputFormatTest { + + static String username = "root", table = "rya_spo"; + static PasswordToken password = new PasswordToken(""); + + static Instance instance; + static AccumuloRyaDAO apiImpl; + + @BeforeClass + public static void init() throws Exception { + instance = new MockInstance("mock_instance"); + Connector connector = instance.getConnector(username, password); + connector.tableOperations().create(table); + + AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration(); + conf.setTablePrefix("rya_"); + conf.setDisplayQueryPlan(false); + + apiImpl = new AccumuloRyaDAO(); + apiImpl.setConf(conf); + apiImpl.setConnector(connector); + } + + @Before + public void before() throws Exception { + apiImpl.init(); + } + + @After + public void after() throws Exception { + apiImpl.dropAndDestroy(); + } + + @Test + public void testInputFormat() throws Exception { + + + RyaStatement input = RyaStatement.builder() + .setSubject(new RyaURI("http://www.google.com")) + .setPredicate(new RyaURI("http://some_other_uri")) + .setObject(new RyaURI("http://www.yahoo.com")) + .build(); + + apiImpl.add(input); + + Job jobConf = Job.getInstance(); + + RyaStatementInputFormat.setMockInstance(jobConf, instance.getInstanceName()); + RyaStatementInputFormat.setConnectorInfo(jobConf, username, password); + RyaStatementInputFormat.setTableLayout(jobConf, TABLE_LAYOUT.SPO); + + AccumuloInputFormat.setInputTableName(jobConf, table); + AccumuloInputFormat.setInputTableName(jobConf, table); + AccumuloInputFormat.setScanIsolation(jobConf, false); + AccumuloInputFormat.setLocalIterators(jobConf, false); + AccumuloInputFormat.setOfflineTableScan(jobConf, false); + + RyaStatementInputFormat inputFormat = new RyaStatementInputFormat(); + + JobContext context = new JobContextImpl(jobConf.getConfiguration(), jobConf.getJobID()); + + List splits = inputFormat.getSplits(context); + + Assert.assertEquals(1, splits.size()); + + TaskAttemptContext taskAttemptContext = new TaskAttemptContextImpl(context.getConfiguration(), new TaskAttemptID(new TaskID(), 1)); + + RecordReader reader = inputFormat.createRecordReader(splits.get(0), taskAttemptContext); + + RyaStatementRecordReader ryaStatementRecordReader = (RyaStatementRecordReader)reader; + ryaStatementRecordReader.initialize(splits.get(0), taskAttemptContext); + + List results = new ArrayList(); + while(ryaStatementRecordReader.nextKeyValue()) { + RyaStatementWritable writable = ryaStatementRecordReader.getCurrentValue(); + RyaStatement value = writable.getRyaStatement(); + Text text = ryaStatementRecordReader.getCurrentKey(); + RyaStatement stmt = RyaStatement.builder() + .setSubject(value.getSubject()) + .setPredicate(value.getPredicate()) + .setObject(value.getObject()) + .setContext(value.getContext()) + .setQualifier(value.getQualifer()) + .setColumnVisibility(value.getColumnVisibility()) + .setValue(value.getValue()) + .build(); + results.add(stmt); + + System.out.println(text); + System.out.println(value); + } + + Assert.assertTrue(results.size() == 2); + Assert.assertTrue(results.contains(input)); + } + + @Test + public void mapperTest() throws Exception { + + RyaStatement input = RyaStatement.builder() + .setSubject(new RyaURI("http://www.google.com")) + .setPredicate(new RyaURI("http://some_other_uri")) + .setObject(new RyaURI("http://www.yahoo.com")) + .setValue(new byte[0]) + .setTimestamp(0L) + .build(); + + RyaStatementWritable writable = new RyaStatementWritable(); + writable.setRyaStatement(input); + + RyaStatementMapper mapper = new RyaStatementMapper(); + MapDriver mapDriver = MapDriver.newMapDriver(mapper); + + RyaTripleContext context = RyaTripleContext.getInstance(new AccumuloRdfConfiguration()); + RyaTableMutationsFactory mutationsFactory = new RyaTableMutationsFactory(context); + + Map> mutations = mutationsFactory.serialize(input); + + mapDriver.withInput(new Text("sometext"), writable); + + for(TABLE_LAYOUT key : mutations.keySet()) { + Collection mutationCollection = mutations.get(key); + for(Mutation m : mutationCollection) { + mapDriver.withOutput(new Text("rya_" + key.name().toLowerCase()), m); + } + } + + mapDriver.runTest(false); + + } + + @Test + public void reducerTest() throws Exception { + RyaStatement input = RyaStatement.builder() + .setSubject(new RyaURI("http://www.google.com")) + .setPredicate(new RyaURI("http://some_other_uri")) + .setObject(new RyaURI("http://www.yahoo.com")) + .setValue(new byte[0]) + .setTimestamp(0L) + .build(); + + RyaStatementWritable writable = new RyaStatementWritable(); + writable.setRyaStatement(input); + + RyaStatementReducer reducer = new RyaStatementReducer(); + ReduceDriver reduceDriver = ReduceDriver.newReduceDriver(reducer); + + RyaTripleContext context = RyaTripleContext.getInstance(new AccumuloRdfConfiguration()); + RyaTableMutationsFactory mutationsFactory = new RyaTableMutationsFactory(context); + + Map> mutations = mutationsFactory.serialize(input); + + reduceDriver.withInput(new Text("sometext"), Arrays.asList(writable)); + + for(TABLE_LAYOUT key : mutations.keySet()) { + Collection mutationCollection = mutations.get(key); + for(Mutation m : mutationCollection) { + reduceDriver.withOutput(new Text("rya_" + key.name().toLowerCase()), m); + } + } + + reduceDriver.runTest(false); + } + +} \ No newline at end of file