From 843d48c9649ba68c6f17f9bed96c0179e9e54c36 Mon Sep 17 00:00:00 2001 From: Minni Mittal Date: Wed, 29 Dec 2021 12:38:13 +0530 Subject: [PATCH] YARN-11025. Implement distributed decommissioning --- .../hadoop/yarn/api/MaintenanceTracker.java | 34 ++++ .../SetMaintenanceModeRequest.java | 63 ++++++++ .../SetMaintenanceModeResponse.java | 52 ++++++ .../hadoop/yarn/conf/YarnConfiguration.java | 22 +++ .../src/main/proto/maintenance_tracker.proto | 35 ++++ .../src/main/proto/yarn_service_protos.proto | 12 ++ .../hadoop/yarn/api/MaintenanceTrackerPB.java | 34 ++++ .../MaintenanceTrackerPBClientImpl.java | 80 ++++++++++ .../MaintenanceTrackerPBServiceImpl.java | 58 +++++++ .../pb/SetMaintenanceModeRequestPBImpl.java | 120 ++++++++++++++ .../pb/SetMaintenanceModeResponsePBImpl.java | 96 +++++++++++ .../yarn/server/api/records/NodeStatus.java | 33 ++++ .../api/records/impl/pb/NodeStatusPBImpl.java | 15 ++ .../proto/yarn_server_common_protos.proto | 1 + .../yarn/server/nodemanager/NodeManager.java | 27 ++++ .../nodemanager/NodeStatusUpdaterImpl.java | 10 +- .../MaintenanceModeConfiguration.java | 55 +++++++ .../maintenance/MaintenanceModeConsumer.java | 149 ++++++++++++++++++ .../maintenance/MaintenanceModeEvent.java | 51 ++++++ .../maintenance/MaintenanceModeEventType.java | 28 ++++ .../maintenance/MaintenanceModeProvider.java | 125 +++++++++++++++ .../maintenance/MaintenanceModeService.java | 148 +++++++++++++++++ ...MHeartbeatMaintenanceModeConsumerImpl.java | 57 +++++++ .../MockMaintenanceModeConsumer.java | 41 +++++ .../MockMaintenanceModeProvider.java | 46 ++++++ .../TestMaintenanceModeService.java | 147 +++++++++++++++++ .../DecommissioningNodesWatcher.java | 34 +++- .../resourcemanager/rmnode/RMNodeImpl.java | 7 +- .../rmnode/RMNodeStatusEvent.java | 4 +- 29 files changed, 1574 insertions(+), 10 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/MaintenanceTracker.java create mode 100644 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SetMaintenanceModeRequest.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SetMaintenanceModeResponse.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/maintenance_tracker.proto create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/MaintenanceTrackerPB.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/MaintenanceTrackerPBClientImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/MaintenanceTrackerPBServiceImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SetMaintenanceModeRequestPBImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SetMaintenanceModeResponsePBImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeConfiguration.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeConsumer.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeEvent.java create mode 100644 
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeEventType.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeProvider.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeService.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/NMHeartbeatMaintenanceModeConsumerImpl.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MockMaintenanceModeConsumer.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MockMaintenanceModeProvider.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/TestMaintenanceModeService.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/MaintenanceTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/MaintenanceTracker.java new file mode 100644 index 0000000000000..e9f6b40ce087f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/MaintenanceTracker.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.protocolrecords.SetMaintenanceModeRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SetMaintenanceModeResponse; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * Sets the maintenance mode of the node and returns the number of running + * containers on the node. + */ +public interface MaintenanceTracker { + SetMaintenanceModeResponse setMaintenanceMode( + SetMaintenanceModeRequest request) throws YarnException, IOException; +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SetMaintenanceModeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SetMaintenanceModeRequest.java new file mode 100644 index 0000000000000..74fae26a8881e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SetMaintenanceModeRequest.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.yarn.util.Records; + +/** + * This is used by DecommissioningProviders to set a node into Maintenance Mode. + * This class is used by the interface MaintenanceTracker. + */ +public abstract class SetMaintenanceModeRequest { + /** + * Instantiates a new SetMaintenanceModeRequest with the given settings. + * + * @param maintenance true to turn maintenance mode ON, false to turn it OFF + * @param minimumMaintenanceTime minimum maintenance time for the request + * @return the newly created request + */ + public static SetMaintenanceModeRequest newInstance(boolean maintenance, + long minimumMaintenanceTime) { + SetMaintenanceModeRequest request = + Records.newRecord(SetMaintenanceModeRequest.class); + request.setMaintenance(maintenance); + request.setMinimumMaintenanceTime(minimumMaintenanceTime); + return request; + } + + /** + * Returns the maintenance state. + * + * @return true if maintenance is ON, false otherwise + */ + public abstract boolean isMaintenance(); + + public abstract void setMaintenance(boolean maintenance); + + /** + * Gets the minimum maintenance time if set in the request; otherwise the + * default value is returned. 
+ * + * @return the minimum maintenance time + */ + public abstract long getMinimumMaintenanceTime(); + + public abstract void setMinimumMaintenanceTime(long value); +} + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SetMaintenanceModeResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SetMaintenanceModeResponse.java new file mode 100644 index 0000000000000..1c0f74b750c56 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/SetMaintenanceModeResponse.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords; + +import org.apache.hadoop.yarn.util.Records; + +/** + * The response the services get when a request is sent to put a node into + * maintenance ON/OFF. + */ +public abstract class SetMaintenanceModeResponse { + public static SetMaintenanceModeResponse newInstance( + long runningContainersCount, + long elapsedTimeSinceLastRetentionInMilliseconds) { + SetMaintenanceModeResponse response = + Records.newRecord(SetMaintenanceModeResponse.class); + response.setRunningContainersCount(runningContainersCount); + response.setElapsedTimeSinceLastRetentionInMilliseconds( + elapsedTimeSinceLastRetentionInMilliseconds); + return response; + } + + /** + * Gets the number of running containers on the node. 
+ * + * @return the number of running containers + */ + public abstract long getRunningContainersCount(); + + public abstract void setRunningContainersCount(long value); + + public abstract long getElapsedTimeSinceLastRetentionInMilliseconds(); + + public abstract void setElapsedTimeSinceLastRetentionInMilliseconds( + long value); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 57cc247a941cb..5cfd9723f2a1b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -4620,6 +4620,28 @@ public static boolean areNodeLabelsEnabled( public static final String DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX = "workflowid:"; + /** Maintenance tracker ports and host name properties. */ + public static final int DEFAULT_NM_MAINTENANCE_TRACKER_PORT = 9034; + public static final String DEFAULT_NM_MAINTENANCE_TRACKER_ADDRESS = + "0.0.0.0:" + DEFAULT_NM_MAINTENANCE_TRACKER_PORT; + + /** Timeout for a Node to react to the maintenance command. */ + public static final String MAINTENANCE_COMMAND_TIMEOUT = + YarnConfiguration.YARN_PREFIX + "rpc.maintenance-command-timeout"; + + public static final int DEFAULT_MAINTENANCE_COMMAND_TIMEOUT_MS = 60000; + + public static final String NM_MAINTENANCE_MODE_PROVIDER_CLASSNAMES = + YarnConfiguration.NM_PREFIX + "maintenance.mode.provider.classnames"; + public static final String NM_MAINTENANCE_MODE_CONSUMER_CLASSNAMES = + YarnConfiguration.NM_PREFIX + "maintenance.mode.consumer.classnames"; + + /** + * All required NM Services classes need to be added in config for them to + * be started. 
+ */ + public static final String YARNPP_NM_SERVICES = "yarnpp.nm.services"; + public YarnConfiguration() { super(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/maintenance_tracker.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/maintenance_tracker.proto new file mode 100644 index 0000000000000..6af6773325235 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/maintenance_tracker.proto @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * These .proto interfaces are public and stable. + * Please see http://wiki.apache.org/hadoop/Compatibility + * for what changes are allowed for a *stable* .proto interface. 
+ */ + +option java_package = "org.apache.hadoop.yarn.proto"; +option java_outer_classname = "MaintenanceTracker"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +package hadoop.yarn; + +import "yarn_service_protos.proto"; + +service MaintenanceTrackerService { + rpc setMaintenanceMode(SetMaintenanceModeRequestProto) returns (SetMaintenanceModeResponseProto); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto index 8a0273d7a792e..9f1287b509902 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto @@ -573,3 +573,15 @@ message ContainerLocalizationStatusesProto { optional ContainerIdProto container_id = 1; repeated LocalizationStatusProto localization_statuses = 2; } + +/////// MaintenanceTracker protos ///////////////// +////////////////////////////////////////////////////// +message SetMaintenanceModeRequestProto { + optional bool maintenance = 1; + optional int64 minimum_maintenance_time = 2; +} + +message SetMaintenanceModeResponseProto { + optional int64 running_containers_count = 1; + optional int64 elapsed_time_since_last_retention_in_milliseconds = 2 [default = 18000000]; +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/MaintenanceTrackerPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/MaintenanceTrackerPB.java new file mode 100644 index 0000000000000..0b2dd069ff198 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/MaintenanceTrackerPB.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.ipc.ProtocolInfo; +import org.apache.hadoop.yarn.proto.MaintenanceTracker.MaintenanceTrackerService; + +@Private +@Unstable +@ProtocolInfo( + protocolName = "org.apache.hadoop.yarn.api.MaintenanceTrackerPB", + protocolVersion = 1) +public interface MaintenanceTrackerPB extends + MaintenanceTrackerService.BlockingInterface { + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/MaintenanceTrackerPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/MaintenanceTrackerPBClientImpl.java new file mode 100644 index 0000000000000..51338d6dd288b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/MaintenanceTrackerPBClientImpl.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.impl.pb.client; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.ProtobufRpcEngine2; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.MaintenanceTracker; +import org.apache.hadoop.yarn.api.MaintenanceTrackerPB; +import org.apache.hadoop.yarn.api.protocolrecords.SetMaintenanceModeRequest; +import org.apache.hadoop.yarn.api.protocolrecords.SetMaintenanceModeResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SetMaintenanceModeRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SetMaintenanceModeResponsePBImpl; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.ipc.RPCUtil; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SetMaintenanceModeRequestProto; + +import org.apache.hadoop.thirdparty.protobuf.ServiceException; + +/** + * Client used to set the maintenance mode of a node. 
+ */ +public class MaintenanceTrackerPBClientImpl + implements MaintenanceTracker, Closeable { + private MaintenanceTrackerPB proxy; + + public MaintenanceTrackerPBClientImpl(long clientVersion, + InetSocketAddress addr, Configuration conf) throws IOException { + RPC.setProtocolEngine(conf, MaintenanceTrackerPB.class, + ProtobufRpcEngine2.class); + UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + this.proxy = (MaintenanceTrackerPB) RPC + .getProxy(MaintenanceTrackerPB.class, clientVersion, addr, ugi, conf, + NetUtils.getDefaultSocketFactory(conf), + conf.getInt(YarnConfiguration.MAINTENANCE_COMMAND_TIMEOUT, + YarnConfiguration.DEFAULT_MAINTENANCE_COMMAND_TIMEOUT_MS)); + } + + @Override public void close() throws IOException { + if (this.proxy != null) { + RPC.stopProxy(this.proxy); + } + } + + @Override public SetMaintenanceModeResponse setMaintenanceMode( + SetMaintenanceModeRequest request) throws YarnException, IOException { + SetMaintenanceModeRequestProto protoRequest = + ((SetMaintenanceModeRequestPBImpl) request).getProto(); + try { + return new SetMaintenanceModeResponsePBImpl( + this.proxy.setMaintenanceMode(null, protoRequest)); + } catch (ServiceException e) { + RPCUtil.unwrapAndThrowException(e); + return null; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/MaintenanceTrackerPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/MaintenanceTrackerPBServiceImpl.java new file mode 100644 index 0000000000000..92682eb41507a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/service/MaintenanceTrackerPBServiceImpl.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.impl.pb.service; + +import org.apache.hadoop.thirdparty.protobuf.RpcController; +import org.apache.hadoop.thirdparty.protobuf.ServiceException; +import org.apache.hadoop.yarn.api.MaintenanceTracker; +import org.apache.hadoop.yarn.api.MaintenanceTrackerPB; +import org.apache.hadoop.yarn.api.protocolrecords.SetMaintenanceModeResponse; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SetMaintenanceModeRequestPBImpl; +import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SetMaintenanceModeResponsePBImpl; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.proto.YarnServiceProtos; + +import java.io.IOException; + +public class MaintenanceTrackerPBServiceImpl implements MaintenanceTrackerPB { + + private MaintenanceTracker real; + + public MaintenanceTrackerPBServiceImpl(MaintenanceTracker impl) { + this.real = impl; + } + + @Override + public YarnServiceProtos.SetMaintenanceModeResponseProto setMaintenanceMode( + RpcController controller, + YarnServiceProtos.SetMaintenanceModeRequestProto proto) + throws ServiceException { + SetMaintenanceModeRequestPBImpl request = + new SetMaintenanceModeRequestPBImpl(proto); + SetMaintenanceModeResponse response; + try { + response = real.setMaintenanceMode(request); + } catch (YarnException e) { + throw new ServiceException(e); + } catch (IOException e) { + throw new ServiceException(e); + } + return ((SetMaintenanceModeResponsePBImpl) response).getProto(); + } +} diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SetMaintenanceModeRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SetMaintenanceModeRequestPBImpl.java new file mode 100644 index 0000000000000..e376b5361f4c5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SetMaintenanceModeRequestPBImpl.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.protocolrecords.SetMaintenanceModeRequest; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SetMaintenanceModeRequestProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SetMaintenanceModeRequestProtoOrBuilder; + +public class SetMaintenanceModeRequestPBImpl extends SetMaintenanceModeRequest { + SetMaintenanceModeRequestProto proto = + SetMaintenanceModeRequestProto.getDefaultInstance(); + SetMaintenanceModeRequestProto.Builder builder = null; + boolean viaProto = false; + + private static final long DEFAULT_MAINTENANCE_TIME = 0L; + private long minimumMaintenanceTime = DEFAULT_MAINTENANCE_TIME; + + public SetMaintenanceModeRequestPBImpl() { + builder = SetMaintenanceModeRequestProto.newBuilder(); + } + + public SetMaintenanceModeRequestPBImpl(SetMaintenanceModeRequestProto proto) { + this.proto = proto; + viaProto = true; + } + + public SetMaintenanceModeRequestProto getProto() { + mergeLocalToProto(); + proto = viaProto ? 
proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + private void mergeLocalToProto() { + if (viaProto) + maybeInitBuilder(); + mergeLocalToBuilder(); + proto = builder.build(); + viaProto = true; + } + + private void mergeLocalToBuilder() { + if (minimumMaintenanceTime >= DEFAULT_MAINTENANCE_TIME) { + this.builder.setMinimumMaintenanceTime(this.minimumMaintenanceTime); + } + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SetMaintenanceModeRequestProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public void setMaintenance(boolean maintenance) { + maybeInitBuilder(); + builder.setMaintenance(maintenance); + } + + @Override + public boolean isMaintenance() { + SetMaintenanceModeRequestProtoOrBuilder p = viaProto ? proto : builder; + if (!p.hasMaintenance()) { + return false; + } + return p.getMaintenance(); + } + + @Override + public long getMinimumMaintenanceTime() { + if (this.minimumMaintenanceTime <= DEFAULT_MAINTENANCE_TIME) { + SetMaintenanceModeRequestProtoOrBuilder p = viaProto ? proto : builder; + this.minimumMaintenanceTime = p.hasMinimumMaintenanceTime() ? 
+ p.getMinimumMaintenanceTime() : + DEFAULT_MAINTENANCE_TIME; + } + return this.minimumMaintenanceTime; + } + + @Override + public void setMinimumMaintenanceTime(long value) { + maybeInitBuilder(); + if (value < DEFAULT_MAINTENANCE_TIME) { + this.builder.clearMinimumMaintenanceTime(); + } + this.minimumMaintenanceTime = value; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SetMaintenanceModeResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SetMaintenanceModeResponsePBImpl.java new file mode 100644 index 0000000000000..cf592ed64ff7d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/SetMaintenanceModeResponsePBImpl.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.protocolrecords.impl.pb; + +import org.apache.hadoop.yarn.api.protocolrecords.SetMaintenanceModeResponse; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SetMaintenanceModeResponseProto; +import org.apache.hadoop.yarn.proto.YarnServiceProtos.SetMaintenanceModeResponseProtoOrBuilder; + +public class SetMaintenanceModeResponsePBImpl + extends SetMaintenanceModeResponse { + SetMaintenanceModeResponseProto proto = + SetMaintenanceModeResponseProto.getDefaultInstance(); + SetMaintenanceModeResponseProto.Builder builder = null; + boolean viaProto = false; + + public SetMaintenanceModeResponsePBImpl() { + builder = SetMaintenanceModeResponseProto.newBuilder(); + } + + public SetMaintenanceModeResponsePBImpl( + SetMaintenanceModeResponseProto proto) { + this.proto = proto; + viaProto = true; + } + + public SetMaintenanceModeResponseProto getProto() { + proto = viaProto ? proto : builder.build(); + viaProto = true; + return proto; + } + + @Override + public int hashCode() { + return getProto().hashCode(); + } + + @Override + public boolean equals(Object other) { + if (other == null) + return false; + if (other.getClass().isAssignableFrom(this.getClass())) { + return this.getProto().equals(this.getClass().cast(other).getProto()); + } + return false; + } + + private void maybeInitBuilder() { + if (viaProto || builder == null) { + builder = SetMaintenanceModeResponseProto.newBuilder(proto); + } + viaProto = false; + } + + @Override + public long getRunningContainersCount() { + SetMaintenanceModeResponseProtoOrBuilder p = + viaProto ? 
this.proto : this.builder; + return p.getRunningContainersCount(); + } + + @Override + public void setRunningContainersCount(long value) { + maybeInitBuilder(); + this.builder.setRunningContainersCount(value); + } + + @Override + public long getElapsedTimeSinceLastRetentionInMilliseconds() { + SetMaintenanceModeResponseProtoOrBuilder p = + viaProto ? this.proto : this.builder; + return p.getElapsedTimeSinceLastRetentionInMilliseconds(); + } + + @Override + public void setElapsedTimeSinceLastRetentionInMilliseconds( + long value) { + maybeInitBuilder(); + this.builder.setElapsedTimeSinceLastRetentionInMilliseconds(value); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java index 440cd0a290294..78da85c0758c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java @@ -72,6 +72,33 @@ public static NodeStatus newInstance(NodeId nodeId, int responseId, return nodeStatus; } + /** + * Create a new {@code NodeStatus}. + * @param nodeId Identifier for this node. + * @param responseId Identifier for the response. + * @param containerStatuses Status of the containers running in this node. + * @param keepAliveApplications Applications to keep alive. + * @param nodeHealthStatus Health status of the node. + * @param containersUtilization Utilization of the containers in this node. + * @param nodeUtilization Utilization of the node. + * @param increasedContainers Containers whose resource has been increased. + * @param maintenance Node is marked for maintenance. 
+ * @return New {@code NodeStatus} with the provided information. + */ + public static NodeStatus newInstance(NodeId nodeId, int responseId, + List containerStatuses, + List keepAliveApplications, + NodeHealthStatus nodeHealthStatus, + ResourceUtilization containersUtilization, + ResourceUtilization nodeUtilization, + List increasedContainers, boolean maintenance) { + NodeStatus nodeStatus = newInstance(nodeId, responseId, containerStatuses, + keepAliveApplications, nodeHealthStatus, containersUtilization, + nodeUtilization, increasedContainers); + nodeStatus.setMaintenance(maintenance); + return nodeStatus; + } + public abstract NodeId getNodeId(); public abstract int getResponseId(); @@ -132,4 +159,10 @@ public abstract void setIncreasedContainers( @Unstable public abstract void setOpportunisticContainersStatus( OpportunisticContainersStatus opportunisticContainersStatus); + + @Private + public abstract void setMaintenance(boolean maintenance); + + @Private + public abstract boolean isMaintenance(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index 8aebc6fa913ce..cc87f554301c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -425,6 +425,21 @@ public synchronized void setOpportunisticContainersStatus( convertToProtoFormat(opportunisticContainersStatus)); } + @Override + public void setMaintenance(boolean decommissioningModeState) { + maybeInitBuilder(); + 
this.builder.setMaintenance(decommissioningModeState); + } + + @Override + public boolean isMaintenance() { + NodeStatusProtoOrBuilder p = this.viaProto ? this.proto : this.builder; + if (!p.hasMaintenance()) { + return false; + } + return p.getMaintenance(); + } + private NodeIdProto convertToProtoFormat(NodeId nodeId) { return ((NodeIdPBImpl)nodeId).getProto(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index ea8df4fb800dc..492cf3f9af500 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -41,6 +41,7 @@ message NodeStatusProto { optional ResourceUtilizationProto node_utilization = 7; repeated ContainerProto increased_containers = 8; optional OpportunisticContainersStatusProto opportunistic_containers_status = 9; + optional bool maintenance = 10; } message OpportunisticContainersStatusProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 81e60361dff16..590ef500eb3e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -29,6 +29,7 @@ import org.apache.hadoop.security.Credentials; 
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.GenericOptionsParser; @@ -85,6 +86,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.lang.reflect.Constructor; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -481,6 +483,22 @@ protected void serviceInit(Configuration conf) throws Exception { ((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater); nmStore.setNodeStatusUpdater(nodeStatusUpdater); + String[] yarnppServices = + conf.getStrings(YarnConfiguration.YARNPP_NM_SERVICES); + if (yarnppServices != null) { + for (String yarnppService : yarnppServices) { + Class clazz = conf.getClassByNameOrNull(yarnppService); + + if (clazz == null || !AbstractService.class.isAssignableFrom(clazz)) + throw new RuntimeException( + clazz + " does not implement " + AbstractService.class); + + Constructor cons = clazz.getConstructor(NodeManager.class); + AbstractService service = (AbstractService) cons.newInstance(this); + addService(service); + } + } + // Do secure login before calling init for added services. 
try { doSecureLogin(); @@ -647,6 +665,7 @@ public static class NMContext implements Context { private boolean isDecommissioned = false; private final ConcurrentLinkedQueue logAggregationReportForApps; + private boolean maintenance = false; private NodeStatusUpdater nodeStatusUpdater; private final boolean isDistSchedulingEnabled; private DeletionService deletionService; @@ -804,6 +823,14 @@ public void setSystemCrendentialsForApps( return this.logAggregationReportForApps; } + public boolean isMaintenance() { + return this.maintenance; + } + + public void setMaintenance(boolean maintenance) { + this.maintenance = maintenance; + } + public NodeStatusUpdater getNodeStatusUpdater() { return this.nodeStatusUpdater; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 4dadf9c62e4c5..64208f8a70e73 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -525,10 +525,12 @@ protected NodeStatus getNodeStatus(int responseId) throws IOException { ResourceUtilization nodeUtilization = getNodeUtilization(); List increasedContainers = getIncreasedContainers(); - NodeStatus nodeStatus = - NodeStatus.newInstance(nodeId, responseId, containersStatuses, - createKeepAliveApplicationList(), nodeHealthStatus, - containersUtilization, nodeUtilization, increasedContainers); + boolean decommissioningMode = ((NMContext) context).isMaintenance(); + NodeStatus nodeStatus = NodeStatus + .newInstance(nodeId, 
responseId, containersStatuses, + createKeepAliveApplicationList(), nodeHealthStatus, + containersUtilization, nodeUtilization, increasedContainers, + decommissioningMode); nodeStatus.setOpportunisticContainersStatus( getOpportunisticContainersStatus()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeConfiguration.java new file mode 100644 index 0000000000000..92475cb523102 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeConfiguration.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.maintenance; + +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +public class MaintenanceModeConfiguration { + + /** + * Maintenance consumer heartbeat expiry time + */ + public static final String NM_MAINTENANCE_MODE_CONSUMER_HEARTBEAT_EXPIRY_SECONDS = + YarnConfiguration.NM_PREFIX + + "maintenance.mode.consumer.heartbeat.expiry.seconds"; + public static final int NM_MAINTENANCE_MODE_CONSUMER_DEFAULT_HEARTBEAT_EXPIRY_SECONDS = + 5 * 60; + + /** + * Maintenance provider heartbeat interval + */ + public static final String NM_MAINTENANCE_MODE_PROVIDER_HEARTBEAT_INTERVAL_SECONDS = + YarnConfiguration.NM_PREFIX + + "maintenance.mode.provider.heartbeat.interval.seconds"; + public static final int NM_MAINTENANCE_MODE_PROVIDER_DEFAULT_HEARTBEAT_INTERVAL_SECONDS = + 30; + + /** + * Environment flag in container launch context that determines whether the + * container should be counted when in maintenance mode. + */ + public static final String CONTAINER_DRAIN_FLAG = + "yarn.nodemanager.yarnpp.container-drain-flag"; + + /** + * Default flag. 
+ */ + public static final String CONTAINER_DRAIN_FLAG_DEFAULT = + "container_drain_enabled"; +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeConsumer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeConsumer.java new file mode 100644 index 0000000000000..7d9238f55652b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeConsumer.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.maintenance; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.Preconditions; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Maintenance mode consumer class that has the common functionality for + * all the maintenance mode consumers. This class maintains a background + * expiry task that is rescheduled on each heartbeat from a provider. If no + * heartbeat arrives within the configured expiry time, the task runs and + * maintenance mode is turned off on this consumer. + */ +public abstract class MaintenanceModeConsumer extends AbstractService implements + EventHandler { + private static final Logger LOG = LoggerFactory + .getLogger(MaintenanceModeConsumer.class); + + private final NodeManager nodeManager; + private int heartBeatExpirySeconds; + private boolean isMaintenanceModeOn; + private ScheduledFuture heartBeatTaskFuture; + private final ScheduledExecutorService heartBeatExecutor; + + /** + * Instantiates a new maintenance mode consumer. 
+ * + * @param serviceName the service name + * @param nodeManager the node manager + */ + public MaintenanceModeConsumer(String serviceName, NodeManager nodeManager) { + super(serviceName); + this.isMaintenanceModeOn = false; + this.nodeManager = nodeManager; + this.heartBeatExecutor = Executors.newSingleThreadScheduledExecutor(); + } + + /** + * Gets the context. + * + * @return the context + */ + public Context getContext() { + return nodeManager.getNMContext(); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + heartBeatExpirySeconds = + conf.getInt( + MaintenanceModeConfiguration.NM_MAINTENANCE_MODE_CONSUMER_HEARTBEAT_EXPIRY_SECONDS, + MaintenanceModeConfiguration.NM_MAINTENANCE_MODE_CONSUMER_DEFAULT_HEARTBEAT_EXPIRY_SECONDS); + super.serviceInit(conf); + } + + @Override + synchronized public void handle(MaintenanceModeEvent event) { + switch (event.getType()) { + case MAINTENANCE_MODE_ON_HEARTBEAT: { + if (heartBeatTaskFuture != null) { + // remove the previously scheduled event, and schedule another. + // (More comments above on the class level) + heartBeatTaskFuture.cancel(false); + } + LOG.debug("Receiving heartbeat for maintenance mode"); + heartBeatTaskFuture = heartBeatExecutor.schedule(new Runnable() { + @Override + public void run() { + maintenanceModeEndedInternal(); + } + }, heartBeatExpirySeconds, TimeUnit.SECONDS); + maintenanceModeStartedInternal(); + } + break; + default: { + Preconditions + .checkArgument(false, "Unknown maintenance mode event type: %s", + event.getType().toString()); + } + } + } + + /** + * Calls maintenance mode started with try catch as maintenanceModeStarted + * should be implemented by derived class. 
+ */ + private void maintenanceModeStartedInternal() { + try { + if (!isMaintenanceModeOn) { + LOG.info("Maintenance mode started on consumer service {}", getName()); + maintenanceModeStarted(); + isMaintenanceModeOn = true; + } + } catch (Throwable t) { + LOG.error("Exception calling maintenance mode started on service {}. " + + "MaintenanceMode is {}", getName(), isMaintenanceModeOn, t); + } + } + + /** + * Calls maintenanceModeEnded (implemented by the derived class) in a try catch. + * Runs on the expiry thread, so it takes the same monitor as handle(). + */ + private synchronized void maintenanceModeEndedInternal() { + try { + if (isMaintenanceModeOn) { + LOG.info("Maintenance mode ended on consumer service {}", getName()); + maintenanceModeEnded(); + isMaintenanceModeOn = false; + } + } catch (Throwable t) { + LOG.error("Exception calling maintenance mode ended on service {}. " + + "MaintenanceMode is {}", getName(), isMaintenanceModeOn, t); + } + + } + + protected abstract void maintenanceModeStarted(); + + protected abstract void maintenanceModeEnded(); +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeEvent.java new file mode 100644 index 0000000000000..9a866a225455a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeEvent.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.maintenance; + +import org.apache.hadoop.yarn.event.AbstractEvent; + +/** + * Maintenance mode event that carries the name of the provider that created it. + */ +public class MaintenanceModeEvent + extends AbstractEvent { + + private final String maintenanceModeProviderName; + + /** + * Instantiates a new maintenance mode event. + * + * @param eventType the event type + * @param providerName maintenance mode provider name + */ + public MaintenanceModeEvent(MaintenanceModeEventType eventType, + String providerName) { + super(eventType); + maintenanceModeProviderName = providerName; + } + + /** + * Gets the maintenance mode provider name. + * + * @return the maintenance mode provider name + */ + public String getMaintenanceModeProviderName() { + return maintenanceModeProviderName; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeEventType.java new file mode 100644 index 0000000000000..66e45f49cda02 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeEventType.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.maintenance; + +/** + * Event types for MaintenanceModeEvent, used to send heartbeats to + * maintenance mode consumers when providers want to enable + * maintenance mode. + */ +public enum MaintenanceModeEventType { + MAINTENANCE_MODE_ON_HEARTBEAT +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeProvider.java new file mode 100644 index 0000000000000..6cc487cc05004 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeProvider.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.maintenance; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.nodemanager.Context; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MaintenanceModeProvider provides common functionality across all the + * maintenance mode provider classes. When the derived class calls + * startMaintenanceMode, a background heartbeat thread is started that fires + * events so that all the consumers have the maintenance mode set. + */ +public abstract class MaintenanceModeProvider extends AbstractService { + + private static final Logger LOG = + LoggerFactory.getLogger(MaintenanceModeProvider.class); + + private final NodeManager nodeManager; + private final Dispatcher dispatcher; + private final ScheduledExecutorService taskExecutor; + // Future of the heartbeat task. This should be used to + // cancel the heartbeat task. + private ScheduledFuture heartBeatTaskFuture; + private int heartBeatIntervalSeconds; + + /** + * Instantiates a new maintenance mode provider. 
+ * + * @param serviceName the service name + * @param nodeManager the node manager + * @param dispatcher the dispatcher + */ + public MaintenanceModeProvider(String serviceName, NodeManager nodeManager, + Dispatcher dispatcher) { + super(serviceName); + this.nodeManager = nodeManager; + this.dispatcher = dispatcher; + this.taskExecutor = Executors.newSingleThreadScheduledExecutor(); + } + + /** + * Gets the context. + * + * @return the context + */ + public Context getContext() { + return nodeManager.getNMContext(); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + heartBeatIntervalSeconds = conf.getInt( + MaintenanceModeConfiguration.NM_MAINTENANCE_MODE_PROVIDER_HEARTBEAT_INTERVAL_SECONDS, + MaintenanceModeConfiguration.NM_MAINTENANCE_MODE_PROVIDER_DEFAULT_HEARTBEAT_INTERVAL_SECONDS); + super.serviceInit(conf); + } + + /** + * Start maintenance mode. + */ + synchronized public void startMaintenanceMode() { + // If there is heart beat already going on then just return. + if (heartBeatTaskFuture == null) { + LOG.info("Maintenance mode started by provider service {}", getName()); + heartBeatTaskFuture = taskExecutor.scheduleWithFixedDelay(new Runnable() { + @Override public void run() { + dispatchMaintenanceEvent( + MaintenanceModeEventType.MAINTENANCE_MODE_ON_HEARTBEAT); + } + }, 0, heartBeatIntervalSeconds, TimeUnit.SECONDS); + } + } + + /** + * Stop maintenance mode. + */ + synchronized public void stopMaintenanceMode() { + if (heartBeatTaskFuture != null) { + LOG.info("Maintenance mode ended by provider service {}", getName()); + heartBeatTaskFuture.cancel(false); + heartBeatTaskFuture = null; + } + } + + /** + * Dispatch maintenance event. 
+ * + * @param type the type + */ + @SuppressWarnings("unchecked") + protected void dispatchMaintenanceEvent( + MaintenanceModeEventType type) { + @SuppressWarnings("rawtypes") EventHandler eventHandler = + dispatcher.getEventHandler(); + eventHandler.handle(new MaintenanceModeEvent(type, getName())); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeService.java new file mode 100644 index 0000000000000..6f820507cb912 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MaintenanceModeService.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.nodemanager.maintenance; + +import java.lang.reflect.Constructor; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Main class that instantiates providers and consumers, and connects them using + * an event dispatcher. Providers provide the maintenance on/off feature and + * consumers act on it. While maintenance mode is on, providers periodically + * send heartbeat events to consumers, and consumers extend the maintenance time + * by a configurable amount on each heartbeat. Once the heartbeat events stop + * and the heartbeat expiry time elapses on the consumer side, the consumer turns + * off maintenance mode. + */ +public class MaintenanceModeService extends AbstractService { + private AsyncDispatcher dispatcher; + private NodeManager nodeManager; + + private static final Logger LOG = + LoggerFactory.getLogger(MaintenanceModeService.class); + + private List maintenanceModeProviderList; + private List maintenanceModeConsumerList; + + /** + * Instantiates a new maintenance mode service. 
+ * + * @param nodeManager the node manager + */ + public MaintenanceModeService(NodeManager nodeManager) { + super(MaintenanceModeService.class.getName()); + this.dispatcher = new AsyncDispatcher(); + this.nodeManager = nodeManager; + maintenanceModeProviderList = new ArrayList(); + maintenanceModeConsumerList = new ArrayList(); + } + + /** + * Creates and populates provider and consumer list from the config + * + * @param conf the conf + * @throws Exception the exception + */ + void populateProviderConsumerList(Configuration conf) throws Exception { + Class[] providerClasses = conf.getClasses( + YarnConfiguration.NM_MAINTENANCE_MODE_PROVIDER_CLASSNAMES); + + if (providerClasses == null || providerClasses.length == 0) { + LOG.warn("Maintenance mode disabled as provider list is empty."); + return; + } + + // Create instances of the provider list + for (int i = 0; i < providerClasses.length; i++) { + Constructor cons = providerClasses[i] + .getConstructor(NodeManager.class, Dispatcher.class); + this.maintenanceModeProviderList.add( + (MaintenanceModeProvider) cons.newInstance(nodeManager, dispatcher)); + } + + Class[] consumerClasses = conf.getClasses( + YarnConfiguration.NM_MAINTENANCE_MODE_CONSUMER_CLASSNAMES); + if (consumerClasses != null && consumerClasses.length > 0) { + // Create instances of the consumer list + for (int i = 0; i < consumerClasses.length; i++) { + Constructor cons = + consumerClasses[i].getConstructor(NodeManager.class); + this.maintenanceModeConsumerList + .add((MaintenanceModeConsumer) cons.newInstance(nodeManager)); + } + } else { + // No consumers specified for given providers + String temp = String.format( + "No maintenance mode consumers present for given providers."); + throw new YarnRuntimeException(temp); + } + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + populateProviderConsumerList(conf); + dispatcher.setDrainEventsOnStop(); + dispatcher.init(conf); + for (MaintenanceModeProvider provider : 
maintenanceModeProviderList) { + provider.init(conf); + } + for (MaintenanceModeConsumer consumer : maintenanceModeConsumerList) { + consumer.init(conf); + } + super.serviceInit(conf); + } + + @Override + protected void serviceStart() throws Exception { + dispatcher.start(); + for (MaintenanceModeConsumer consumer : maintenanceModeConsumerList) { + consumer.start(); + dispatcher.register(MaintenanceModeEventType.class, consumer); + } + + for (MaintenanceModeProvider provider : maintenanceModeProviderList) { + provider.start(); + } + + super.serviceStart(); + } + + @Override + protected void serviceStop() throws Exception { + for (MaintenanceModeProvider provider : maintenanceModeProviderList) { + provider.stop(); + } + for (MaintenanceModeConsumer consumer : maintenanceModeConsumerList) { + consumer.stop(); + } + super.serviceStop(); + dispatcher.stop(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/NMHeartbeatMaintenanceModeConsumerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/NMHeartbeatMaintenanceModeConsumerImpl.java new file mode 100644 index 0000000000000..42178b0f6e507 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/NMHeartbeatMaintenanceModeConsumerImpl.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.maintenance;
+
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Maintenance mode consumer that switches the maintenance flag of the
+ * NM context on and off.
+ */
+public class NMHeartbeatMaintenanceModeConsumerImpl
+    extends MaintenanceModeConsumer {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(NMHeartbeatMaintenanceModeConsumerImpl.class);
+
+  public NMHeartbeatMaintenanceModeConsumerImpl(NodeManager nodeManager) {
+    super(NMHeartbeatMaintenanceModeConsumerImpl.class.getName(), nodeManager);
+  }
+
+  /**
+   * Sets the NMContext maintenance flag to true.
+   */
+  @Override
+  protected void maintenanceModeStarted() {
+    LOG.info("Starting Maintenance mode on NM");
+    ((NMContext) getContext()).setMaintenance(true);
+  }
+
+  /**
+   * Sets the NMContext maintenance flag to false.
+   */
+  @Override
+  protected void maintenanceModeEnded() {
+    LOG.info("Stopping Maintenance mode on NM");
+    ((NMContext) getContext()).setMaintenance(false);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MockMaintenanceModeConsumer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MockMaintenanceModeConsumer.java
new file mode 100644
index 0000000000000..2262226a0897b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MockMaintenanceModeConsumer.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.maintenance;
+
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Test consumer that records the last maintenance mode transition it
+ * received in a static flag so that tests can observe it.
+ */
+public class MockMaintenanceModeConsumer extends MaintenanceModeConsumer {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MockMaintenanceModeConsumer.class);
+
+  /** True after a start transition, false after an end transition. */
+  public static AtomicBoolean maintenance = new AtomicBoolean(false);
+
+  public MockMaintenanceModeConsumer(NodeManager nodeManager) {
+    super(MockMaintenanceModeConsumer.class.getName(), nodeManager);
+  }
+
+  /**
+   * @return whether the last transition seen was a maintenance mode start
+   */
+  public static boolean isMaintenance() {
+    return maintenance.get();
+  }
+
+  @Override
+  protected void maintenanceModeStarted() {
+    LOG.info("Setting Maintenance mode ON");
+    maintenance.set(true);
+  }
+
+  @Override
+  protected void maintenanceModeEnded() {
+    LOG.info("Maintenance mode OFF");
+    maintenance.set(false);
+  }
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MockMaintenanceModeProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MockMaintenanceModeProvider.java
new file mode 100644
index 0000000000000..ae3d161b1ff5a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/MockMaintenanceModeProvider.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.maintenance;
+
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Test provider that polls a static flag once per second and calls
+ * startMaintenanceMode() or stopMaintenanceMode() accordingly.
+ */
+public class MockMaintenanceModeProvider extends MaintenanceModeProvider {
+  private final ScheduledExecutorService taskExecutor =
+      Executors.newSingleThreadScheduledExecutor();
+
+  /** Set by tests to the maintenance mode the provider should report. */
+  public static AtomicBoolean maintenance = new AtomicBoolean(false);
+
+  public MockMaintenanceModeProvider(NodeManager nodeManager,
+      Dispatcher dispatcher) {
+    super(MockMaintenanceModeProvider.class.getName(), nodeManager, dispatcher);
+  }
+
+  /**
+   * Starts the provider and schedules the task that polls
+   * {@link #maintenance} every second.
+   */
+  @Override
+  public void start() {
+    super.start();
+    taskExecutor.scheduleAtFixedRate(new Runnable() {
+      @Override
+      public void run() {
+        if (maintenance.get()) {
+          startMaintenanceMode();
+        } else {
+          stopMaintenanceMode();
+        }
+      }
+    }, 0, 1, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Cancels the polling task before stopping the provider. Without this the
+   * executor's non-daemon thread keeps running, and keeps calling into the
+   * provider, after the owning service has been stopped.
+   */
+  @Override
+  public void stop() {
+    taskExecutor.shutdownNow();
+    super.stop();
+  }
+
+  @Override
+  public synchronized void startMaintenanceMode() {
+    super.startMaintenanceMode();
+  }
+
+  @Override
+  public synchronized void stopMaintenanceMode() {
+    super.stopMaintenanceMode();
+  }
+}
diff --git
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/TestMaintenanceModeService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/TestMaintenanceModeService.java
new file mode 100644
index 0000000000000..ba95b1d895730
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/maintenance/TestMaintenanceModeService.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.maintenance;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests {@link MaintenanceModeService} with the mock provider and consumer.
+ */
+public class TestMaintenanceModeService {
+
+  /** Number of on/off maintenance mode cycles run by the e2e test. */
+  private static final int MAINTENANCE_CYCLES = 3;
+
+  /**
+   * Test maintenance mode service with valid config. This also tests
+   * maintenance mode service e2e.
+   *
+   * @throws InterruptedException the interrupted exception
+   */
+  @Test
+  public void testMaintenanceModeServiceWithValidConfig()
+      throws InterruptedException {
+    Configuration config = getDefaultMaintenanceModeServiceTestConfig();
+    testMaintenanceModeServiceWithConfig(config);
+  }
+
+  /**
+   * Test maintenance mode service with an empty config, i.e. without any
+   * providers or consumers: the service must still init, start and stop.
+   *
+   * @throws InterruptedException the interrupted exception
+   */
+  @Test
+  public void testMaintenanceModeServiceWithNullConfig()
+      throws InterruptedException {
+    Configuration conf = new Configuration();
+    NodeManager nodeManager = mock(NodeManager.class);
+    MaintenanceModeService ms = new MaintenanceModeService(nodeManager);
+    try {
+      ms.init(conf);
+      ms.start();
+    } finally {
+      // Always stop, otherwise the dispatcher thread outlives the test.
+      ms.stop();
+    }
+  }
+
+  /**
+   * Runs the service with the given config through several maintenance
+   * mode on/off cycles.
+   *
+   * @param conf the conf
+   * @throws InterruptedException the interrupted exception
+   */
+  private void testMaintenanceModeServiceWithConfig(Configuration conf)
+      throws InterruptedException {
+    // The mocks keep their state in static fields; start from a known state.
+    MockMaintenanceModeProvider.maintenance.set(false);
+    MockMaintenanceModeConsumer.maintenance.set(false);
+
+    MaintenanceModeService ms = new MaintenanceModeService(null);
+    try {
+      ms.init(conf);
+      ms.start();
+      for (int i = 0; i < MAINTENANCE_CYCLES; i++) {
+        maintenanceModeCycle();
+      }
+    } finally {
+      // Stop even when an assertion fails so no threads leak into later tests.
+      ms.stop();
+      MockMaintenanceModeProvider.maintenance.set(false);
+    }
+  }
+
+  /**
+   * Gets the default maintenance mode service test configuration.
+   *
+   * @return the default maintenance mode service test configuration
+   */
+  private Configuration getDefaultMaintenanceModeServiceTestConfig() {
+    Configuration conf = new Configuration();
+    // Set mock provider and consumer
+    conf.set(YarnConfiguration.NM_MAINTENANCE_MODE_PROVIDER_CLASSNAMES,
+        MockMaintenanceModeProvider.class.getName());
+    conf.set(YarnConfiguration.NM_MAINTENANCE_MODE_CONSUMER_CLASSNAMES,
+        MockMaintenanceModeConsumer.class.getName());
+
+    // Set heartbeat settings for producer and consumer
+    conf.set(
+        MaintenanceModeConfiguration
+            .NM_MAINTENANCE_MODE_CONSUMER_HEARTBEAT_EXPIRY_SECONDS,
+        "10");
+    conf.set(
+        MaintenanceModeConfiguration
+            .NM_MAINTENANCE_MODE_PROVIDER_HEARTBEAT_INTERVAL_SECONDS,
+        "2");
+    return conf;
+  }
+
+  /**
+   * Runs one maintenance mode cycle: off, on, then off again.
+   *
+   * @throws InterruptedException the interrupted exception
+   */
+  private void maintenanceModeCycle() throws InterruptedException {
+    // 1) Consumer should not be in maintenance mode at the start of a cycle
+    Assert.assertFalse(MockMaintenanceModeConsumer.isMaintenance());
+
+    // 2) Set maintenance mode on in mock provider.
+    MockMaintenanceModeProvider.maintenance.set(true);
+
+    // 3) Verify that the consumer has received the maintenance mode event
+    waitForMaintenanceModeAndAssertOnTimeout(true, 10);
+
+    // 4) Set maintenance mode OFF
+    MockMaintenanceModeProvider.maintenance.set(false);
+
+    // 5) Verify that the maintenanceMode is off on consumer
+    waitForMaintenanceModeAndAssertOnTimeout(false, 20);
+  }
+
+  /**
+   * Polls the mock consumer once per second until it reports the expected
+   * maintenance mode, and fails if the timeout elapses first.
+   *
+   * @param isOnMode the maintenance mode expected on the consumer
+   * @param timeoutInSeconds the timeout in seconds
+   * @throws InterruptedException the interrupted exception
+   */
+  private void waitForMaintenanceModeAndAssertOnTimeout(boolean isOnMode,
+      int timeoutInSeconds) throws InterruptedException {
+    boolean modeSet = false;
+    int elapsedTime = 0;
+    while (elapsedTime < timeoutInSeconds) {
+      if (isOnMode == MockMaintenanceModeConsumer.isMaintenance()) {
+        modeSet = true;
+        break;
+      }
+      Thread.sleep(1000L);
+      elapsedTime++;
+    }
+    Assert.assertTrue("Consumer did not reach maintenance mode=" + isOnMode
+        + " within " + timeoutInSeconds + "s", modeSet);
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
index 61691690df878..d8b75402678f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java
+++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DecommissioningNodesWatcher.java @@ -27,6 +27,7 @@ import java.util.Timer; import java.util.TimerTask; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecommissioningEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -96,6 +97,10 @@ class DecommissioningNodeContext { private long lastUpdateTime; + // Maintenance is equivalent to Decommissioning except the node will + // not be put into Decommissioned state. + private boolean maintenance; + public DecommissioningNodeContext(NodeId nodeId, int timeoutSec) { this.nodeId = nodeId; this.appIds = new ArrayList<>(); @@ -106,6 +111,14 @@ public DecommissioningNodeContext(NodeId nodeId, int timeoutSec) { void updateTimeout(int timeoutSec) { this.timeoutMs = 1000L * timeoutSec; } + + public void setMaintenance(boolean maintenance) { + this.maintenance = maintenance; + } + + public boolean isMaintenance() { + return maintenance; + } } // All DECOMMISSIONING nodes to track. @@ -149,15 +162,28 @@ public synchronized void update(RMNode rmNode, NodeStatus remoteNodeStatus) { } else if (now - context.decommissionedTime > 60000L) { decomNodes.remove(rmNode.getNodeID()); } - } else if (rmNode.getState() == NodeState.DECOMMISSIONING) { + } else if (rmNode.getState() == NodeState.DECOMMISSIONING || + remoteNodeStatus.isMaintenance()) { + Integer decommissioningTimeout = rmNode.getDecommissioningTimeout() != null ? 
+ rmNode.getDecommissioningTimeout() : -1; if (context == null) { context = new DecommissioningNodeContext(rmNode.getNodeID(), - rmNode.getDecommissioningTimeout()); + decommissioningTimeout); + if(remoteNodeStatus.isMaintenance() && !context.isMaintenance()) { + context.setMaintenance(true); + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeDecommissioningEvent(rmNode.getNodeID(), -1)); + } decomNodes.put(rmNode.getNodeID(), context); context.nodeState = rmNode.getState(); context.decommissionedTime = 0; + } else if (!remoteNodeStatus.isMaintenance() && context.isMaintenance()) { + decomNodes.remove(rmNode.getNodeID()); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMNodeEvent(rmNode.getNodeID(), RMNodeEventType.RECOMMISSION)); + return; } - context.updateTimeout(rmNode.getDecommissioningTimeout()); + context.updateTimeout(decommissioningTimeout); context.lastUpdateTime = now; context.appIds = rmNode.getRunningApps(); @@ -231,7 +257,7 @@ public boolean checkReadyToBeDecommissioned(NodeId nodeId) { public DecommissioningNodeStatus checkDecommissioningStatus(NodeId nodeId) { DecommissioningNodeContext context = decomNodes.get(nodeId); - if (context == null) { + if (context == null || context.isMaintenance()) { return DecommissioningNodeStatus.NONE; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index b8aaea5de330c..fcc99e60a0c36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -1369,7 +1369,12 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { NodeState initialState = rmNode.getState(); boolean isNodeDecommissioning = initialState.equals(NodeState.DECOMMISSIONING); - if (isNodeDecommissioning) { + /** + * Since, Graceful Decommissioning is used as Maintenance Mode + * the node should not be allowed to be DECOMMISSIONED even though there + * are no applications running if and only if the Node is in maintenance. + */ + if (isNodeDecommissioning && !statusEvent.isMaintenance()) { List keepAliveApps = statusEvent.getKeepAliveAppIds(); // hasScheduledAMContainers solves the following race condition - // 1. launch AM container on a node with 0 containers. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java index 5f5fe24d173c5..a09332ef347d5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java @@ -85,5 +85,7 @@ public List getNMReportedIncreasedContainers() { Collections.emptyList() : this.nodeStatus.getIncreasedContainers(); } - + public boolean isMaintenance() { + return this.nodeStatus.isMaintenance(); + } } \ No newline at end of file