
Commit

Merge branch 'develop' into gh-1612-extractwalkvertexfromhop
m607123 authored Jan 15, 2018
2 parents 0de8c76 + ee374d5 commit 0b7d5fd
Showing 4 changed files with 60 additions and 23 deletions.
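Taken together, the merged changes route column-family-to-group decoding through a single getGroupFromColumnFamily call on the element converter (replacing inline UTF-8 decoding and its checked-exception handling), make the AddElementsFromHdfs handler generate a default splits file path only when the operation has not already set one, and add a unit test verifying that the AggregatorIterator obtains the group via the element converter.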
@@ -456,12 +456,7 @@ protected Edge getEdgeFromKey(final Key key, final byte[] row, final boolean inc
     matchedVertex = EdgeId.MatchedVertex.SOURCE;
 }
 
-String group;
-try {
-    group = new String(key.getColumnFamilyData().getBackingArray(), CommonConstants.UTF_8);
-} catch (final UnsupportedEncodingException e) {
-    throw new AccumuloElementConversionException(e.getMessage(), e);
-}
+final String group = getGroupFromColumnFamily(key.getColumnFamilyData().getBackingArray());
 try {
     final Edge edge = new Edge(group, ((ToBytesSerialiser) schema.getVertexSerialiser()).deserialise(result[0]),
             ((ToBytesSerialiser) schema.getVertexSerialiser()).deserialise(result[1]), direction.isDirected(), matchedVertex, null);
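The removed inline decoding now sits behind getGroupFromColumnFamily on the element converter. A minimal sketch of what that helper is assumed to do, reusing the logic the diff removes (the committed implementation may differ):

    protected String getGroupFromColumnFamily(final byte[] columnFamily) {
        // Decode the column family bytes into the group name, converting the checked
        // UnsupportedEncodingException into the store's unchecked conversion exception.
        try {
            return new String(columnFamily, CommonConstants.UTF_8);
        } catch (final UnsupportedEncodingException e) {
            throw new AccumuloElementConversionException(e.getMessage(), e);
        }
    }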
@@ -57,13 +57,7 @@ public Value reduce(final Key key, final Iterator<Value> iter) {
     if (!iter.hasNext()) {
         return value;
     }
-    final String group;
-    try {
-        group = new String(key.getColumnFamilyData().getBackingArray(), CommonConstants.UTF_8);
-    } catch (final UnsupportedEncodingException e) {
-        throw new AggregationException("Failed to recreate a graph element from a key and value", e);
-    }
-
+    final String group = elementConverter.getGroupFromColumnFamily(key.getColumnFamilyData().getBackingArray());
     Properties properties;
     final ElementAggregator aggregator = schema.getElement(group).getIngestAggregator();
     try {
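For context, the elementConverter used above is created by the iterator from its options; the option keys appear in the new test at the bottom of this commit. A hedged sketch of how such a converter might be instantiated (the Schema-taking constructor and the helper name createConverter are assumptions, not part of the committed code):

    private AccumuloElementConverter createConverter(final Map<String, String> options, final Schema schema) {
        try {
            // The converter class name is supplied by the store via the iterator options.
            return Class.forName(options.get(AccumuloStoreConstants.ACCUMULO_ELEMENT_CONVERTER_CLASS))
                    .asSubclass(AccumuloElementConverter.class)
                    .getConstructor(Schema.class)
                    .newInstance(schema);
        } catch (final Exception e) {
            throw new IllegalArgumentException("Unable to create the element converter from iterator options", e);
        }
    }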
@@ -60,9 +60,11 @@ public void doOperation(final AddElementsFromHdfs operation,
         throws OperationException {
     validateOperation(operation);
 
-    final String splitsFilePath = getPathWithSlashSuffix(operation.getWorkingPath()) + context.getJobId() + "/splits";
-    LOGGER.info("Using working directory for splits files: " + splitsFilePath);
-    operation.setSplitsFilePath(splitsFilePath);
+    if (null == operation.getSplitsFilePath()) {
+        final String splitsFilePath = getPathWithSlashSuffix(operation.getWorkingPath()) + context.getJobId() + "/splits";
+        LOGGER.info("Using working directory for splits files: " + splitsFilePath);
+        operation.setSplitsFilePath(splitsFilePath);
+    }
 
     try {
         checkHdfsDirectories(operation, store);
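The handler now honours a caller-supplied splits file path and only falls back to a path under the working directory when none is set. A hedged usage sketch (only setSplitsFilePath/getSplitsFilePath are confirmed by the diff; the operation construction and the path value are illustrative):

    final AddElementsFromHdfs operation = new AddElementsFromHdfs();
    // Pre-set path: with this change the handler keeps it instead of overwriting it.
    operation.setSplitsFilePath("/tmp/gaffer/splits/my-splits");
    // If the path is left unset, the handler derives "<workingPath>/<jobId>/splits"
    // and logs "Using working directory for splits files: ...".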
@@ -20,6 +20,11 @@
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.iterators.IteratorEnvironment;
+import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -28,8 +33,12 @@
 import uk.gov.gchq.gaffer.accumulostore.AccumuloProperties;
 import uk.gov.gchq.gaffer.accumulostore.AccumuloStore;
 import uk.gov.gchq.gaffer.accumulostore.SingleUseMockAccumuloStore;
+import uk.gov.gchq.gaffer.accumulostore.key.AccumuloElementConverter;
+import uk.gov.gchq.gaffer.accumulostore.key.MockAccumuloElementConverter;
 import uk.gov.gchq.gaffer.accumulostore.utils.AccumuloPropertyNames;
+import uk.gov.gchq.gaffer.accumulostore.utils.AccumuloStoreConstants;
 import uk.gov.gchq.gaffer.commonutil.StreamUtil;
+import uk.gov.gchq.gaffer.commonutil.StringUtil;
 import uk.gov.gchq.gaffer.commonutil.TestGroups;
 import uk.gov.gchq.gaffer.data.element.Edge;
 import uk.gov.gchq.gaffer.data.element.Element;
@@ -40,12 +49,20 @@
 import uk.gov.gchq.gaffer.operation.impl.get.GetElements;
 import uk.gov.gchq.gaffer.store.StoreException;
 import uk.gov.gchq.gaffer.store.schema.Schema;
+import uk.gov.gchq.gaffer.store.schema.SchemaEdgeDefinition;
 import uk.gov.gchq.gaffer.user.User;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 
 public class AggregatorIteratorTest {
 
@@ -54,26 +71,19 @@ public class AggregatorIteratorTest {
             .storeProps(AggregatorIteratorTest.class));
     private static final AccumuloProperties CLASSIC_PROPERTIES = AccumuloProperties
             .loadStoreProperties(StreamUtil.openStream(AggregatorIteratorTest.class, "/accumuloStoreClassicKeys.properties"));
-    private static View defaultView;
     private static AccumuloStore byteEntityStore;
     private static AccumuloStore gaffer1KeyStore;
 
     @BeforeClass
     public static void setup() throws IOException, StoreException {
         byteEntityStore = new SingleUseMockAccumuloStore();
         gaffer1KeyStore = new SingleUseMockAccumuloStore();
-
-        defaultView = new View.Builder()
-                .edge(TestGroups.EDGE)
-                .entity(TestGroups.ENTITY)
-                .build();
     }
 
     @AfterClass
     public static void tearDown() {
         byteEntityStore = null;
         gaffer1KeyStore = null;
-        defaultView = null;
     }
 
     @Before
@@ -164,4 +174,40 @@ private void test(final AccumuloStore store) throws OperationException {
         assertEquals(expectedResult, aggregatedEdge);
         assertEquals(expectedResult.getProperties(), aggregatedEdge.getProperties());
     }
+
+    @Test
+    public void shouldGetGroupFromElementConverter() throws IOException {
+        MockAccumuloElementConverter.cleanUp();
+        // Given
+        MockAccumuloElementConverter.mock = mock(AccumuloElementConverter.class);
+        final Key key = mock(Key.class);
+        final List<Value> values = Arrays.asList(mock(Value.class), mock(Value.class));
+        final Schema schema = new Schema.Builder()
+                .edge(TestGroups.ENTITY, new SchemaEdgeDefinition())
+                .build();
+        final ByteSequence colFamData = mock(ByteSequence.class);
+        final byte[] colFam = StringUtil.toBytes(TestGroups.ENTITY);
+        final SortedKeyValueIterator sortedKeyValueIterator = mock(SortedKeyValueIterator.class);
+        final IteratorEnvironment iteratorEnvironment = mock(IteratorEnvironment.class);
+        final Map<String, String> options = new HashMap();
+
+        options.put("columns", "test");
+        options.put(AccumuloStoreConstants.SCHEMA, new String(schema.toCompactJson()));
+        options.put(AccumuloStoreConstants.ACCUMULO_ELEMENT_CONVERTER_CLASS, MockAccumuloElementConverter.class.getName());
+
+        given(colFamData.getBackingArray()).willReturn(colFam);
+        given(key.getColumnFamilyData()).willReturn(colFamData);
+        given(MockAccumuloElementConverter.mock.getGroupFromColumnFamily(colFam)).willReturn(TestGroups.ENTITY);
+
+        final AggregatorIterator aggregatorIterator = new AggregatorIterator();
+
+        // When
+        aggregatorIterator.init(sortedKeyValueIterator, options, iteratorEnvironment);
+        aggregatorIterator.reduce(key, values.iterator());
+
+        // Then
+        verify(MockAccumuloElementConverter.mock, times(1)).getGroupFromColumnFamily(colFam);
+
+        MockAccumuloElementConverter.cleanUp();
+    }
 }
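The new test relies on MockAccumuloElementConverter being instantiated by the iterator (via the ACCUMULO_ELEMENT_CONVERTER_CLASS option) while forwarding calls to a static Mockito mock that the test stubs and verifies. A hedged, self-contained illustration of that delegate pattern (the real class implements the full AccumuloElementConverter interface; the names and nested interface here are illustrative only):

    public class MockConverterSketch {
        // Shared Mockito mock, stubbed with given(...) and checked with verify(...) by tests.
        public static ConverterApi mock;

        public static void cleanUp() {
            mock = null; // reset between tests so stubbing does not leak across them
        }

        // Instance methods simply forward to the shared mock.
        public String getGroupFromColumnFamily(final byte[] columnFamily) {
            return mock.getGroupFromColumnFamily(columnFamily);
        }

        // Minimal stand-in for the converter methods exercised by this test.
        public interface ConverterApi {
            String getGroupFromColumnFamily(byte[] columnFamily);
        }
    }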
