Skip to content

Commit

Permalink
HDDS-8076. Use container cache in Key listing API. (#4346)
Browse files Browse the repository at this point in the history
  • Loading branch information
duongkame committed Mar 15, 2023
1 parent 62692bc commit f83b008
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersion;
Expand Down Expand Up @@ -134,6 +136,7 @@
import static org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType.KEY;
import static org.apache.hadoop.util.Time.monotonicNow;

import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -1520,7 +1523,8 @@ public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
if (args.getLatestVersionLocation()) {
slimLocationVersion(keyInfoList.toArray(new OmKeyInfo[0]));
}
refreshPipeline(keyInfoList);

refreshPipelineFromCache(keyInfoList);

if (args.getSortDatanodes()) {
sortDatanodes(clientAddress, keyInfoList.toArray(new OmKeyInfo[0]));
Expand Down Expand Up @@ -1652,7 +1656,7 @@ private List<OzoneFileStatus> sortPipelineInfo(
// refreshPipeline flag check has been removed as part of
// https://issues.apache.org/jira/browse/HDDS-3658.
// Please refer this jira for more details.
refreshPipeline(keyInfoList);
refreshPipelineFromCache(keyInfoList);

if (omKeyArgs.getSortDatanodes()) {
sortDatanodes(clientAddress, keyInfoList.toArray(new OmKeyInfo[0]));
Expand Down Expand Up @@ -1914,18 +1918,39 @@ public OmKeyInfo getKeyInfo(OmKeyArgs args, String clientAddress)
return value;
}

private void refreshPipelineFromCache(Iterable<OmKeyInfo> keyInfos)
throws IOException {
Set<Long> containerIds = new HashSet<>();
for (OmKeyInfo keyInfo : keyInfos) {
extractContainerIDs(keyInfo).forEach(containerIds::add);
}

// List API never force cache refresh. If a client detects a block
// location is outdated, it'll call getKeyInfo with cacheRefresh=true
// to request cache refresh on individual container.
Map<Long, Pipeline> containerLocations =
scmClient.getContainerLocations(containerIds, false);

for (OmKeyInfo keyInfo : keyInfos) {
setUpdatedContainerLocation(keyInfo, containerLocations);
}
}

protected void refreshPipelineFromCache(OmKeyInfo keyInfo,
boolean forceRefresh)
throws IOException {
Set<Long> containerIds = keyInfo.getKeyLocationVersions().stream()
.flatMap(v -> v.getLocationList().stream())
.map(BlockLocationInfo::getContainerID)
Set<Long> containerIds = extractContainerIDs(keyInfo)
.collect(Collectors.toSet());

metrics.setForceContainerCacheRefresh(forceRefresh);
Map<Long, Pipeline> containerLocations =
scmClient.getContainerLocations(containerIds, forceRefresh);

setUpdatedContainerLocation(keyInfo, containerLocations);
}

private void setUpdatedContainerLocation(OmKeyInfo keyInfo,
Map<Long, Pipeline> containerLocations) {
for (OmKeyLocationInfoGroup key : keyInfo.getKeyLocationVersions()) {
for (List<OmKeyLocationInfo> omKeyLocationInfoList :
key.getLocationLists()) {
Expand All @@ -1940,4 +1965,11 @@ protected void refreshPipelineFromCache(OmKeyInfo keyInfo,
}
}
}

@NotNull
private Stream<Long> extractContainerIDs(OmKeyInfo keyInfo) {
return keyInfo.getKeyLocationVersions().stream()
.flatMap(v -> v.getLocationList().stream())
.map(BlockLocationInfo::getContainerID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,18 @@

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.CacheLoader.InvalidCacheLoadException;
import com.google.common.cache.LoadingCache;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.update.client.SCMUpdateServiceGrpcClient;
import org.apache.hadoop.util.CacheMetrics;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -49,7 +50,6 @@ public class ScmClient {
private final StorageContainerLocationProtocol containerClient;
private final LoadingCache<Long, Pipeline> containerLocationCache;
private final CacheMetrics containerCacheMetrics;
private SCMUpdateServiceGrpcClient updateServiceGrpcClient;

ScmClient(ScmBlockLocationProtocol blockClient,
StorageContainerLocationProtocol containerClient,
Expand Down Expand Up @@ -104,15 +104,6 @@ public StorageContainerLocationProtocol getContainerClient() {
return this.containerClient;
}

public void setUpdateServiceGrpcClient(
SCMUpdateServiceGrpcClient updateClient) {
this.updateServiceGrpcClient = updateClient;
}

public SCMUpdateServiceGrpcClient getUpdateServiceGrpcClient() {
return updateServiceGrpcClient;
}

public Map<Long, Pipeline> getContainerLocations(Iterable<Long> containerIds,
boolean forceRefresh)
throws IOException {
Expand All @@ -123,6 +114,18 @@ public Map<Long, Pipeline> getContainerLocations(Iterable<Long> containerIds,
return containerLocationCache.getAll(containerIds);
} catch (ExecutionException e) {
return handleCacheExecutionException(e);
} catch (InvalidCacheLoadException e) {
// this is thrown when a container is not found from SCM.
// In this case, return available, instead of propagating the
// exception to client code.
Map<Long, Pipeline> result = new HashMap<>();
for (Long containerId : containerIds) {
Pipeline p = containerLocationCache.getIfPresent(containerId);
if (p != null) {
result.put(containerId, p);
}
}
return result;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.HashMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -84,6 +85,7 @@
import static java.util.Collections.singletonList;
import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toList;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -574,7 +576,7 @@ public void listStatus() throws Exception {
.map(DatanodeDetails::getUuidString)
.collect(toList());

List<Long> containerIDs = new ArrayList<>();
Set<Long> containerIDs = new HashSet<>();
List<ContainerWithPipeline> containersWithPipeline = new ArrayList<>();
for (long i = 1; i <= 10; i++) {
final OmKeyLocationInfo keyLocationInfo = new OmKeyLocationInfo.Builder()
Expand Down Expand Up @@ -622,6 +624,12 @@ public void listStatus() throws Exception {
Assert.assertEquals(10, fileStatusList.size());
verify(containerClient).getContainerWithPipelineBatch(containerIDs);
verify(blockClient).sortDatanodes(nodes, client);

// call list status the second time, and verify no more calls to
// SCM.
keyManager.listStatus(builder.build(), false,
null, Long.MAX_VALUE, client);
verify(containerClient, times(1)).getContainerWithPipelineBatch(anySet());
}

@Test
Expand Down

0 comments on commit f83b008

Please sign in to comment.