Permalink
Browse files

[#1160] Provide functions to drop and truncate tables for the Accumul…

…o store. (#1164)

fixes #1160
  • Loading branch information...
p-f authored and galpha committed Jan 24, 2019
1 parent fee46c1 commit f9f95126b02da3f5b324a33e04be5bac7ce12c77
@@ -227,6 +227,37 @@ public void close() {
}
}

/**
* Drop all tables used by this store instance.
*
* @throws IOException when deleting tables fails.
*/
public void dropTables() throws IOException {
for (String tableName : new String[] {
getVertexTableName(), getEdgeTableName(), getGraphHeadName()}) {
try {
dropTableIfExists(tableName);
} catch (AccumuloSecurityException | AccumuloException e) {
throw new IOException("Failed to delete table " + tableName, e);
}
}
}

/**
* Truncate all tables handled by this store instance, i.e. delete all rows.
* The current implementations simply drops all tables and creates new ones.
*
* @throws IOException when deleting or creating tables fails.
*/
public void truncateTables() throws IOException {
dropTables();
try {
createTablesIfNotExists();
} catch (AccumuloSecurityException | AccumuloException e) {
throw new IOException("Failed to create tables.", e);
}
}

@Nullable
@Override
public GraphHead readGraph(@Nonnull GradoopId graphId) throws IOException {
@@ -468,4 +499,22 @@ private void createTablesIfNotExists() throws AccumuloSecurityException, Accumul
}
}

/**
* Delete a table if it exists.
*
* @param tableName The table name.
* @throws AccumuloSecurityException if the user does not have persmissions to delete the table.
* @throws AccumuloException if a general Accumulo error occurs.
*/
private void dropTableIfExists(@Nonnull String tableName) throws AccumuloSecurityException,
AccumuloException {
if (conn.tableOperations().exists(tableName)) {
try {
conn.tableOperations().delete(tableName);
} catch (TableNotFoundException e) {
// We checked before if it exists, so this should not happen.
throw new IllegalStateException(e);
}
}
}
}
@@ -26,6 +26,7 @@
import org.gradoop.storage.impl.accumulo.io.outputformats.ElementOutputFormat;

import javax.annotation.Nonnull;
import java.io.IOException;

/**
* Write graph or graph collection into accumulo store
@@ -46,22 +47,25 @@ public AccumuloDataSink(
}

@Override
public void write(LogicalGraph logicalGraph) {
public void write(LogicalGraph logicalGraph) throws IOException {
write(logicalGraph, false);
}

@Override
public void write(GraphCollection graphCollection) {
public void write(GraphCollection graphCollection) throws IOException {
write(graphCollection, false);
}

@Override
public void write(LogicalGraph logicalGraph, boolean overwrite) {
public void write(LogicalGraph logicalGraph, boolean overwrite) throws IOException {
write(getFlinkConfig().getGraphCollectionFactory().fromGraph(logicalGraph), overwrite);
}

@Override
public void write(GraphCollection graphCollection, boolean overWrite) {
public void write(GraphCollection graphCollection, boolean overWrite) throws IOException {
if (overWrite) {
getStore().truncateTables();
}
graphCollection.getGraphHeads()
.output(new ElementOutputFormat<>(GraphHead.class, getAccumuloConfig()));
graphCollection.getVertices()
@@ -96,13 +96,23 @@ protected void doTest(
return ret;
}

/**
* Wraps a test function.
*/
public interface SocialTestContext {

/**
* Run the test.
*
* @param loader The loader used for access to a test graph.
* @param store The store instance to test.
* @param config The gradoop flink config used to run tests.
* @throws Throwable an Exception thrown by the test.
*/
void test(
AsciiGraphLoader<GraphHead, Vertex, Edge> loader,
AccumuloEPGMStore store,
GradoopFlinkConfig config
) throws Throwable;

}
}
@@ -19,6 +19,7 @@
import com.google.common.collect.Queues;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.gradoop.common.GradoopTestUtils;
import org.gradoop.common.config.GradoopConfig;
import org.gradoop.common.exceptions.UnsupportedTypeException;
@@ -83,8 +84,7 @@
import static org.gradoop.common.GradoopTestUtils.validateEPGMElements;
import static org.gradoop.common.GradoopTestUtils.validateEPGMGraphElementCollections;
import static org.gradoop.common.GradoopTestUtils.validateEPGMGraphElements;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

/**
* Accumulo graph store test
@@ -98,6 +98,66 @@
private static final String TEST04 = "basic_04";
private static final String TEST05 = "basic_05";

/**
* Creates tables, deletes them and checks if they were deleted.
*
* @throws AccumuloSecurityException when the user does not have permissions for some actions.
* @throws AccumuloException on general accumulo errors.
* @throws IOException when deleting tables fails.
*/
@Test
public void deleteTablesTest() throws AccumuloSecurityException, AccumuloException, IOException {
final String table = "test_to_delete";
GradoopAccumuloConfig config = AccumuloTestSuite.getAcConfig(table);
AccumuloEPGMStore store = new AccumuloEPGMStore(config);
Connector connector = store.createConnector();
// Make sure that the tables were created.
assertTrue(connector.tableOperations().exists(store.getVertexTableName()));
assertTrue(connector.tableOperations().exists(store.getEdgeTableName()));
assertTrue(connector.tableOperations().exists(store.getGraphHeadName()));
// Delete tables.
store.dropTables();
// Check if they were deleted.
assertFalse(connector.tableOperations().exists(store.getVertexTableName()));
assertFalse(connector.tableOperations().exists(store.getEdgeTableName()));
assertFalse(connector.tableOperations().exists(store.getGraphHeadName()));
}

/**
* Stores a graph, truncates tables and checks if the tables still exist and if they are empty.
*
* @throws AccumuloSecurityException when the user does not have permissions for some actions.
* @throws AccumuloException on general accumulo errors.
*/
@Test
public void truncateTablesTest() throws AccumuloSecurityException, AccumuloException,
IOException {
final String table = "test_to_truncate";
GradoopAccumuloConfig config = AccumuloTestSuite.getAcConfig(table);

AccumuloEPGMStore graphStore = new AccumuloEPGMStore(config);

AsciiGraphLoader<GraphHead, Vertex, Edge> loader = getMinimalFullFeaturedGraphLoader();

GraphHead graphHead = loader.getGraphHeads().iterator().next();
Vertex vertex = loader.getVertices().iterator().next();
Edge edge = loader.getEdges().iterator().next();

graphStore.writeGraphHead(graphHead);
graphStore.writeVertex(vertex);
graphStore.writeEdge(edge);

// re-open
graphStore.close();
graphStore = new AccumuloEPGMStore(config);
graphStore.truncateTables();

assertFalse(graphStore.getVertexSpace().hasNext());
assertFalse(graphStore.getEdgeSpace().hasNext());
assertFalse(graphStore.getGraphSpace().hasNext());
graphStore.close();
}

/**
* Creates persistent graph, vertex and edge data. Writes data to Accumulo,
* closes the store, opens it and reads/validates the data again.

0 comments on commit f9f9512

Please sign in to comment.