Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDDS-9439. Refactor sortDatanodes to OM #5428

Closed
wants to merge 12 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ public NetworkTopologyImpl(ConfigurationSource conf) {
schemaManager.getCost(NetConstants.ROOT_LEVEL));
}

/**
 * Builds a topology whose schema is loaded from the given schema file.
 * The shared {@link NodeSchemaManager} instance is (re)initialized from
 * that file before the root of the cluster tree is created.
 *
 * @param schemaFile the schema file handed to the schema manager
 */
public NetworkTopologyImpl(String schemaFile) {
  this.schemaManager = NodeSchemaManager.getInstance();
  // The manager must be initialized before max level and cost are queried.
  this.schemaManager.init(schemaFile);
  this.maxLevel = this.schemaManager.getMaxLevel();
  this.factory = InnerNodeImpl.FACTORY;
  this.clusterTree = this.factory.newInnerNode(
      ROOT,
      null,
      null,
      NetConstants.ROOT_LEVEL,
      this.schemaManager.getCost(NetConstants.ROOT_LEVEL));
}

@VisibleForTesting
public NetworkTopologyImpl(NodeSchemaManager manager) {
schemaManager = manager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,21 @@ public void init(ConfigurationSource conf) {
}
}

/**
 * Initializes the schema manager from the given schema file.
 * On success the schema list, the enforce-prefix flag and the maximum
 * level are replaced with the values loaded from the file.
 *
 * @param schemaFile the schema file passed to the schema loader
 * @throws RuntimeException if loading fails for any reason; the original
 *         failure is logged and attached as the cause
 */
public void init(String schemaFile) {
  try {
    final NodeSchemaLoadResult loaded =
        NodeSchemaLoader.getInstance().loadSchemaFromFile(schemaFile);
    allSchema = loaded.getSchemaList();
    enforcePrefix = loaded.isEnforePrefix();
    maxLevel = allSchema.size();
  } catch (Throwable failure) {
    final String errorMessage = "Failed to load schema file:" + schemaFile
        + ", error: " + failure.getMessage();
    LOG.error(errorMessage, failure);
    throw new RuntimeException(errorMessage, failure);
  }
}

@VisibleForTesting
public void init(NodeSchema[] schemas, boolean enforce) {
allSchema = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,6 +662,18 @@ public final class OzoneConfigKeys {
public static final String OZONE_SCM_CLOSE_CONTAINER_WAIT_DURATION =
"ozone.scm.close.container.wait.duration";

/**
 * Config key for the duration at which the updated network topology
 * schema file is periodically fetched from SCM.
 */
public static final String
OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_REFRESH_DURATION =
"ozone.scm.network.topology.schema.file.refresh.duration";
/** Default refresh duration: three hours. */
public static final String
OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_REFRESH_DURATION_DEFAULT = "3h";

/**
 * Config key for the duration at which it is periodically checked whether
 * it is time to fetch the updated network topology schema file from SCM.
 */
public static final String
OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_CHECK_DURATION =
"ozone.scm.network.topology.schema.file.check.duration";
/** Default check duration: five minutes. */
public static final String
OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_CHECK_DURATION_DEFAULT = "5m";

/**
* There is no need to instantiate this class.
*/
Expand Down
17 changes: 17 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3649,6 +3649,23 @@
<description>Wait duration before which close container
is send to DN.</description>
</property>
<property>
<name>ozone.scm.network.topology.schema.file.refresh.duration</name>
<value>3h</value>
<tag>SCM, OZONE</tag>
<description>The duration at which we periodically fetch the updated network topology schema file from SCM.
</description>
</property>
<property>
<name>ozone.scm.network.topology.schema.file.check.duration</name>
<value>5m</value>
<tag>SCM, OZONE, RECON</tag>
<description>The interval at which we periodically check whether it is time to fetch the updated network
topology schema file from SCM. Example: If ozone.scm.network.topology.schema.file.refresh.duration=3h and
ozone.scm.network.topology.schema.file.check.duration=5m, the actual refresh will occur every
3h +/- 5m.
</description>
</property>
<property>
<name>ozone.scm.ha.ratis.server.snapshot.creation.gap</name>
<value>1024</value>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.client;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_REFRESH_DURATION;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_REFRESH_DURATION_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_CHECK_DURATION;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_CHECK_DURATION_DEFAULT;

/**
 * This client implements a background thread which periodically checks and
 * gets the latest network topology schema file from SCM.
 *
 * <p>The most recently fetched value is cached and served by
 * {@link #getTopologyInformation()}. A single daemon thread wakes up every
 * "check" interval and refetches the value once at least one "refresh"
 * duration has elapsed since the last successful fetch.
 */
public class ScmBlockLocationClient {
  private static final Logger LOG =
      LoggerFactory.getLogger(ScmBlockLocationClient.class);

  private final ScmBlockLocationProtocol scmBlockLocationProtocol;
  private final AtomicReference<String> cache = new AtomicReference<>();
  /**
   * Time of the last successful fetch from SCM. Starts at the epoch so that
   * a fetch is always considered due before {@link #start} has run.
   */
  private final AtomicReference<Instant> lastRefresh =
      new AtomicReference<>(Instant.EPOCH);
  private ScheduledExecutorService executorService;

  /**
   * @param scmBlockLocationProtocol protocol used to query SCM
   */
  public ScmBlockLocationClient(
      ScmBlockLocationProtocol scmBlockLocationProtocol) {
    this.scmBlockLocationProtocol = scmBlockLocationProtocol;
  }

  /**
   * Returns the cached topology information.
   *
   * @return the most recently fetched value
   * @throws NullPointerException if nothing has been fetched yet
   */
  public String getTopologyInformation() {
    return requireNonNull(cache.get(),
        "ScmBlockLocationClient must have been initialized already.");
  }

  /**
   * Unconditionally fetches the topology information from SCM and updates
   * the cache if the value changed.
   *
   * @throws UncheckedIOException if the request to SCM fails
   */
  public void refetchTopologyInformation() {
    // Forced refresh: do not depend on comparing two clock readings, which
    // may be identical when taken back to back.
    refresh();
  }

  /**
   * Fetches the initial topology information and starts the background
   * poller.
   *
   * @param conf configuration providing the refresh and check durations
   * @throws IOException if the initial fetch from SCM fails
   */
  public void start(ConfigurationSource conf) throws IOException {
    final String initialTopology =
        scmBlockLocationProtocol.getTopologyInformation();
    LOG.info("Initial topology information fetched from SCM: {}.",
        initialTopology);
    cache.set(initialTopology);
    final Instant fetchedAt = Instant.now();
    lastRefresh.set(fetchedAt);
    scheduleTopologyPoller(conf, fetchedAt);
  }

  /**
   * Stops the background poller, waiting up to one minute for a running
   * fetch to finish before forcing termination.
   */
  public void stop() {
    if (executorService != null) {
      executorService.shutdown();
      try {
        // Force termination only when the graceful shutdown timed out.
        if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
          executorService.shutdownNow();
        }
      } catch (InterruptedException e) {
        LOG.error("Interrupted while shutting down executor service.", e);
        executorService.shutdownNow();
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Creates the single-threaded daemon scheduler. The first check runs one
   * refresh duration after {@code initialInvocation}; subsequent checks run
   * at the configured check interval.
   */
  private void scheduleTopologyPoller(ConfigurationSource conf,
      Instant initialInvocation) {
    Duration refreshDuration = parseRefreshDuration(conf);
    Instant nextRefresh = initialInvocation.plus(refreshDuration);
    ThreadFactory threadFactory = new ThreadFactoryBuilder()
        .setNameFormat("NetworkTopologyPoller")
        .setDaemon(true)
        .build();
    executorService = Executors.newScheduledThreadPool(1, threadFactory);
    Duration interval = parseRefreshCheckDuration(conf);
    Duration initialDelay = Duration.between(Instant.now(), nextRefresh);

    LOG.info("Scheduling NetworkTopologyPoller with initial delay of {} " +
        "and interval of {}", initialDelay, interval);
    executorService.scheduleAtFixedRate(
        () -> pollOnce(refreshDuration),
        initialDelay.toMillis(), interval.toMillis(),
        TimeUnit.MILLISECONDS);
  }

  /**
   * Reads the refresh duration from the configuration.
   *
   * @param conf configuration source
   * @return the configured refresh duration
   */
  public static Duration parseRefreshDuration(ConfigurationSource conf) {
    long refreshDurationInMs = conf.getTimeDuration(
        OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_REFRESH_DURATION,
        OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_REFRESH_DURATION_DEFAULT,
        TimeUnit.MILLISECONDS);
    return Duration.ofMillis(refreshDurationInMs);
  }

  /**
   * Reads the check interval from the configuration.
   *
   * @param conf configuration source
   * @return the configured check interval
   */
  public static Duration parseRefreshCheckDuration(ConfigurationSource conf) {
    long refreshCheckInMs = conf.getTimeDuration(
        OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_CHECK_DURATION,
        OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_CHECK_DURATION_DEFAULT,
        TimeUnit.MILLISECONDS);
    return Duration.ofMillis(refreshCheckInMs);
  }

  /**
   * Body of the scheduled task. Failures are logged instead of propagated:
   * an exception escaping a task scheduled with scheduleAtFixedRate
   * suppresses all subsequent executions, which would silently stop the
   * poller after a single failed request.
   */
  private void pollOnce(Duration refreshDuration) {
    try {
      checkAndRefresh(refreshDuration);
    } catch (RuntimeException e) {
      LOG.error("Failed to refresh network topology schema file from SCM, " +
          "will retry at the next check.", e);
    }
  }

  /**
   * Refreshes the cache if at least {@code refreshDuration} has elapsed
   * since the last successful fetch.
   */
  private synchronized void checkAndRefresh(Duration refreshDuration) {
    Instant nextRefresh = lastRefresh.get().plus(refreshDuration);
    if (!nextRefresh.isAfter(Instant.now())) {
      refresh();
    }
  }

  /**
   * Fetches the topology information from SCM, records the fetch time and
   * updates the cache when the value differs from the cached one. The
   * fetch time is only advanced on success so that a failed fetch is
   * retried at the next check.
   *
   * @throws UncheckedIOException if the request to SCM fails
   */
  private synchronized void refresh() {
    String current = cache.get();
    try {
      String newTopology = scmBlockLocationProtocol.getTopologyInformation();
      lastRefresh.set(Instant.now());
      if (!newTopology.equals(current)) {
        cache.set(newTopology);
        LOG.info("Updated network topology schema file fetched from " +
            "SCM: {}.", newTopology);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(
          "Error fetching updated network topology schema file from SCM", e);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* <p>
* Freon related helper classes used for load testing.
*/

/**
* Contains SCM client related classes.
*/
package org.apache.hadoop.hdds.scm.client;
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,6 @@ List<AllocatedBlock> allocateBlock(long size, int numBlocks,
*/
List<DatanodeDetails> sortDatanodes(List<String> nodes,
String clientMachine) throws IOException;

/**
 * Retrieves the network topology information from SCM.
 *
 * @return the topology information as a string
 *         (NOTE(review): the SCM server implementation appears to return
 *         its configured network topology schema file value - confirm)
 * @throws IOException if the request fails
 */
String getTopologyInformation() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.GetTopologyInformationRequestProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.GetTopologyInformationResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.KeyBlocks;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.SortDatanodesRequestProto;
Expand Down Expand Up @@ -320,6 +322,23 @@ public List<DatanodeDetails> sortDatanodes(List<String> nodes,
return results;
}

/**
 * Sends a GetTopologyInformation request to SCM and unwraps the schema
 * file string carried by the response.
 *
 * @return the schemaFile field of the response
 * @throws IOException if submitting the request or handling the response
 *         fails
 */
@Override
public String getTopologyInformation() throws IOException {
  final SCMBlockLocationRequest scmRequest =
      createSCMBlockRequest(Type.GetTopologyInformation)
          .setGetTopologyInformationRequest(
              GetTopologyInformationRequestProto.newBuilder().build())
          .build();

  final SCMBlockLocationResponse scmResponse =
      handleError(submitRequest(scmRequest));
  return scmResponse.getGetTopologyInformationResponse().getSchemaFile();
}

@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ enum Type {
GetScmInfo = 13;
SortDatanodes = 14;
AddScm = 15;
GetTopologyInformation = 16;
}

message SCMBlockLocationRequest {
Expand All @@ -56,6 +57,7 @@ message SCMBlockLocationRequest {
optional hadoop.hdds.GetScmInfoRequestProto getScmInfoRequest = 13;
optional SortDatanodesRequestProto sortDatanodesRequest = 14;
optional hadoop.hdds.AddScmRequestProto addScmRequestProto = 15;
optional GetTopologyInformationRequestProto getTopologyInformationRequest = 16;
}

message SCMBlockLocationResponse {
Expand All @@ -80,6 +82,7 @@ message SCMBlockLocationResponse {
optional hadoop.hdds.GetScmInfoResponseProto getScmInfoResponse = 13;
optional SortDatanodesResponseProto sortDatanodesResponse = 14;
optional hadoop.hdds.AddScmResponseProto addScmResponse = 15;
optional GetTopologyInformationResponseProto getTopologyInformationResponse = 16;
}

/**
Expand Down Expand Up @@ -226,6 +229,13 @@ message SortDatanodesResponseProto{
repeated DatanodeDetailsProto node = 1;
}

/**
 * Request for the topology information; carries no fields.
 */
message GetTopologyInformationRequestProto {
}

/**
 * Response to GetTopologyInformationRequestProto.
 */
message GetTopologyInformationResponseProto {
// Schema file string returned by the server.
required string schemaFile = 1;
}

/**
* Protocol used from OzoneManager to StorageContainerManager.
* See request and response messages for details of the RPC calls.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteKeyBlocksResultProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.GetTopologyInformationResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationRequest;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationResponse;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SortDatanodesRequestProto;
Expand Down Expand Up @@ -159,6 +160,9 @@ private SCMBlockLocationResponse processMessage(
request.getSortDatanodesRequest(), request.getVersion()
));
break;
case GetTopologyInformation:
response.setGetTopologyInformationResponse(getTopologyInformation());
break;
default:
// Should never happen
throw new IOException("Unknown Operation " + request.getCmdType() +
Expand Down Expand Up @@ -275,4 +279,13 @@ public SortDatanodesResponseProto sortDatanodes(
throw new ServiceException(ex);
}
}

/**
 * Asks the underlying protocol implementation for the topology
 * information and wraps it into a response message.
 *
 * @return response carrying the schema file string
 * @throws IOException if the underlying implementation fails
 */
public GetTopologyInformationResponseProto getTopologyInformation()
    throws IOException {
  return GetTopologyInformationResponseProto.newBuilder()
      .setSchemaFile(impl.getTopologyInformation())
      .build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
Expand Down Expand Up @@ -335,6 +336,14 @@ public boolean addSCM(AddSCMRequest request) throws IOException {
}
}

/**
 * Returns the configured network topology schema file setting, falling
 * back to the default when the key is not set.
 *
 * @return the value of the topology schema file configuration key
 */
@Override
public String getTopologyInformation() {
  return conf.get(
      ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE,
      ScmConfigKeys.OZONE_SCM_NETWORK_TOPOLOGY_SCHEMA_FILE_DEFAULT);
}

@Override
public List<DatanodeDetails> sortDatanodes(List<String> nodes,
String clientMachine) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ public static boolean isReadOnly(
case TenantListUser:
case ListSnapshot:
case RefetchSecretKey:
case RefetchTopologyInformation:
case RangerBGSync:
// RangerBGSync is a read operation in the sense that it doesn't directly
// write to OM DB. And therefore it doesn't need a OMClientRequest.
Expand Down
Loading