Skip to content

Commit

Permalink
Merge pull request #2436 from gchq/gh-2435-FederatedStore-changeGraph…
Browse files Browse the repository at this point in the history
…Id-table-name-fix

gh-2435 federated store change graph id table name fix
  • Loading branch information
p3430233 committed Aug 25, 2021
2 parents 1586f43 + cae6d6b commit 4393151
Show file tree
Hide file tree
Showing 7 changed files with 303 additions and 31 deletions.
1 change: 0 additions & 1 deletion store-implementation/federated-store/pom.xml
Expand Up @@ -44,7 +44,6 @@
<groupId>uk.gov.gchq.gaffer</groupId>
<artifactId>accumulo-store</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>uk.gov.gchq.gaffer</groupId>
Expand Down
Expand Up @@ -17,9 +17,13 @@
package uk.gov.gchq.gaffer.federatedstore;

import com.google.common.collect.Sets;
import org.apache.accumulo.core.client.Connector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import uk.gov.gchq.gaffer.accumulostore.AccumuloProperties;
import uk.gov.gchq.gaffer.accumulostore.AccumuloStore;
import uk.gov.gchq.gaffer.accumulostore.utils.TableUtils;
import uk.gov.gchq.gaffer.cache.CacheServiceLoader;
import uk.gov.gchq.gaffer.cache.exception.CacheOperationException;
import uk.gov.gchq.gaffer.commonutil.JsonUtil;
Expand Down Expand Up @@ -227,7 +231,7 @@ private boolean remove(final String graphId, final Predicate<Entry<FederatedAcce

private void deleteFromCache(final String graphId) {
if (isCacheEnabled()) {
federatedStoreCache.deleteFromCache(graphId);
federatedStoreCache.deleteGraphFromCache(graphId);
}
}

Expand Down Expand Up @@ -550,16 +554,31 @@ public boolean changeGraphAccessAsAdmin(final String graphId, final FederatedAcc

private boolean changeGraphAccess(final String graphId, final FederatedAccess newFederatedAccess, final Predicate<FederatedAccess> accessPredicate) throws StorageException {
boolean rtn;
Graph graphToMove = getGraphToMove(graphId, accessPredicate);
final Graph graphToMove = getGraphToMove(graphId, accessPredicate);

if (nonNull(graphToMove)) {
//remove graph to be moved
FederatedAccess oldAccess = null;
for (final Entry<FederatedAccess, Set<Graph>> entry : storage.entrySet()) {
entry.getValue().removeIf(graph -> graph.getGraphId().equals(graphId));
oldAccess = entry.getKey();
}

//add the graph being moved.
this.put(new GraphSerialisable.Builder().graph(graphToMove).build(), newFederatedAccess);

if (isCacheEnabled()) {
//Update cache
try {
federatedStoreCache.addGraphToCache(graphToMove, newFederatedAccess, true/*true because graphLibrary should have throw error*/);
} catch (final CacheOperationException e) {
//TODO FS recovery
String s = "Error occurred updating graphAccess. GraphStorage=updated, Cache=outdated. graphId:" + graphId;
LOGGER.error(s + " graphStorage access:{} cache access:{}", newFederatedAccess, oldAccess);
throw new StorageException(s, e);
}
}

rtn = true;
} else {
rtn = false;
Expand All @@ -568,28 +587,20 @@ private boolean changeGraphAccess(final String graphId, final FederatedAccess ne
}

public boolean changeGraphId(final String graphId, final String newGraphId, final User requestingUser) throws StorageException {
final Graph graphToMove = getGraphToMove(graphId, access -> access.hasWriteAccess(requestingUser));
return changeGraphId(graphId, newGraphId, graphToMove);
return changeGraphId(graphId, newGraphId, access -> access.hasWriteAccess(requestingUser));
}

public boolean changeGraphId(final String graphId, final String newGraphId, final User requestingUser, final String adminAuth) throws StorageException {
final Graph graphToMove = getGraphToMove(graphId, access -> access.hasWriteAccess(requestingUser, adminAuth));
return changeGraphId(graphId, newGraphId, graphToMove);
}


@Deprecated
public boolean changeGraphIdAsAdmin(final String graphId, final String newGraphId) throws StorageException {
final Graph graphToMove = getGraphToMove(graphId, access -> true);
return changeGraphId(graphId, newGraphId, graphToMove);
return changeGraphId(graphId, newGraphId, access -> access.hasWriteAccess(requestingUser, adminAuth));
}

private boolean changeGraphId(final String graphId, final String newGraphId, final Graph graphToMove) throws StorageException {
private boolean changeGraphId(final String graphId, final String newGraphId, final Predicate<FederatedAccess> accessPredicate) throws StorageException {
boolean rtn;
final Graph graphToMove = getGraphToMove(graphId, accessPredicate);

if (nonNull(graphToMove)) {
FederatedAccess key = null;
//remove graph to be moved
//remove graph to be moved from storage
for (final Entry<FederatedAccess, Set<Graph>> entry : storage.entrySet()) {
final boolean removed = entry.getValue().removeIf(graph -> graph.getGraphId().equals(graphId));
if (removed) {
Expand All @@ -598,23 +609,68 @@ private boolean changeGraphId(final String graphId, final String newGraphId, fin
}
}

final GraphConfig configWithNewGraphId = new GraphConfig.Builder()
.json(new GraphSerialisable.Builder().graph(graphToMove).build().getConfig())
.graphId(newGraphId)
.build();
//Update Tables
String storeClass = graphToMove.getStoreProperties().getStoreClass();
if (nonNull(storeClass) && storeClass.startsWith(AccumuloStore.class.getPackage().getName())) {
/*
* uk.gov.gchq.gaffer.accumulostore.[AccumuloStore, SingleUseAccumuloStore,
* SingleUseMockAccumuloStore, MockAccumuloStore, MiniAccumuloStore]
*/
try {
AccumuloProperties tmpAccumuloProps = (AccumuloProperties) graphToMove.getStoreProperties();
Connector connection = TableUtils.getConnector(tmpAccumuloProps.getInstance(),
tmpAccumuloProps.getZookeepers(),
tmpAccumuloProps.getUser(),
tmpAccumuloProps.getPassword());

if (connection.tableOperations().exists(graphId)) {
connection.tableOperations().offline(graphId);
connection.tableOperations().clone(graphId, newGraphId, true, null, null);
connection.tableOperations().online(newGraphId);
connection.tableOperations().delete(graphId);
}
} catch (final Exception e) {
LOGGER.warn("Error trying to update tables for graphID:{} graphToMove:{}", graphId, graphToMove);
LOGGER.warn("Error trying to update tables.", e);
}
}

final GraphConfig configWithNewGraphId = cloneGraphConfigWithNewGraphId(newGraphId, graphToMove);

//add the graph being renamed.
this.put(new GraphSerialisable.Builder()
GraphSerialisable newGraphSerialisable = new GraphSerialisable.Builder()
.graph(graphToMove)
.config(configWithNewGraphId)
.build(), key);
.build();
this.put(newGraphSerialisable, key);

//Update cache
if (isCacheEnabled()) {
try {
federatedStoreCache.addGraphToCache(newGraphSerialisable, key, true/*true because graphLibrary should have throw error*/);
} catch (final CacheOperationException e) {
//TODO FS recovery
String s = "Error occurred updating graphId. GraphStorage=updated, Cache=outdated graphId.";
LOGGER.error(s + " graphStorage graphId:{} cache graphId:{}", newGraphId, graphId);
throw new StorageException(s, e);
}
federatedStoreCache.deleteGraphFromCache(graphId);
}

rtn = true;
} else {
rtn = false;
}
return rtn;
}

private GraphConfig cloneGraphConfigWithNewGraphId(final String newGraphId, final Graph graphToMove) {
return new GraphConfig.Builder()
.json(new GraphSerialisable.Builder().graph(graphToMove).build().getConfig())
.graphId(newGraphId)
.build();
}

private Graph getGraphToMove(final String graphId, final Predicate<FederatedAccess> accessPredicate) {
Graph graphToMove = null;
for (final Entry<FederatedAccess, Set<Graph>> entry : storage.entrySet()) {
Expand Down
Expand Up @@ -55,15 +55,31 @@ public Set<String> getAllGraphIds() {
* @throws CacheOperationException if there was an error trying to add to the cache
*/
public void addGraphToCache(final Graph graph, final FederatedAccess access, final boolean overwrite) throws CacheOperationException {
String graphId = graph.getGraphId();
Pair<GraphSerialisable, FederatedAccess> pair = new Pair<>(new GraphSerialisable.Builder().graph(graph).build(), access);
addGraphToCache(new GraphSerialisable.Builder().graph(graph).build(), access, overwrite);
}

/**
* Add the specified {@link Graph} to the cache.
*
* @param graphSerialisable the serialised {@link Graph} to be added
* @param access Access for the graph being stored.
* @param overwrite if true, overwrite any graphs already in the cache with the same ID
* @throws CacheOperationException if there was an error trying to add to the cache
*/
public void addGraphToCache(final GraphSerialisable graphSerialisable, final FederatedAccess access, final boolean overwrite) throws CacheOperationException {
String graphId = graphSerialisable.getDeserialisedConfig().getGraphId();
Pair<GraphSerialisable, FederatedAccess> pair = new Pair<>(graphSerialisable, access);
try {
addToCache(graphId, pair, overwrite);
} catch (final CacheOperationException e) {
throw new CacheOperationException(String.format(ERROR_ADDING_GRAPH_TO_CACHE_GRAPH_ID_S, graphId), e.getCause());
}
}

public void deleteGraphFromCache(final String graphId) {
super.deleteFromCache(graphId);
}

/**
* Retrieve the {@link Graph} with the specified ID from the cache.
*
Expand All @@ -88,6 +104,6 @@ public GraphSerialisable getGraphSerialisableFromCache(final String graphId) {

public FederatedAccess getAccessFromCache(final String graphId) {
final Pair<GraphSerialisable, FederatedAccess> fromCache = getFromCache(graphId);
return fromCache.getSecond();
return (isNull(fromCache)) ? null : fromCache.getSecond();
}
}
Expand Up @@ -92,7 +92,7 @@ public void shouldDeleteFromCache() throws CacheOperationException {
assertEquals(1, cachedGraphIds.size());
assertTrue(cachedGraphIds.contains(testGraph.getGraphId()));

federatedStoreCache.deleteFromCache(testGraph.getGraphId());
federatedStoreCache.deleteGraphFromCache(testGraph.getGraphId());
Set<String> cachedGraphIdsAfterDelete = federatedStoreCache.getAllGraphIds();
assertEquals(0, cachedGraphIdsAfterDelete.size());
}
Expand All @@ -111,7 +111,7 @@ public void shouldThrowExceptionIfGraphAlreadyExistsInCache() throws CacheOperat
@Test
public void shouldThrowExceptionIfGraphIdToBeRemovedIsNull() throws CacheOperationException {
federatedStoreCache.addGraphToCache(testGraph, null, false);
federatedStoreCache.deleteFromCache(null);
federatedStoreCache.deleteGraphFromCache(null);
assertEquals(1, federatedStoreCache.getAllGraphIds().size());
}

Expand Down
Expand Up @@ -16,8 +16,10 @@

package uk.gov.gchq.gaffer.federatedstore.integration;

import org.junit.AfterClass;
import org.junit.jupiter.api.BeforeEach;

import uk.gov.gchq.gaffer.cache.CacheServiceLoader;
import uk.gov.gchq.gaffer.commonutil.StreamUtil;
import uk.gov.gchq.gaffer.federatedstore.FederatedStoreProperties;
import uk.gov.gchq.gaffer.graph.Graph;
Expand All @@ -36,6 +38,16 @@ public void setUp() throws Exception {
_setUp();
}

@AfterClass
public void tearDown() throws Exception {
CacheServiceLoader.shutdown();
_tearDown();
}

private void _tearDown() throws Exception {
}


protected void _setUp() throws Exception {
// Override if required;
}
Expand Down

0 comments on commit 4393151

Please sign in to comment.