From 0d6ca33ec2d5b6f5be8bd3d78385b620548a7efb Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Fri, 15 May 2015 12:03:59 +0900 Subject: [PATCH 01/12] TAJO-1599 --- .../java/org/apache/tajo/conf/TajoConf.java | 3 +- tajo-common/src/main/proto/tajo_protos.proto | 6 + .../resource/DefaultResourceCalculator.java | 109 ++++++++++ .../apache/tajo/resource/NodeResource.java | 166 +++++++++++++++ .../apache/tajo/resource/NodeResources.java | 191 ++++++++++++++++++ .../tajo/resource/ResourceCalculator.java | 169 ++++++++++++++++ .../tajo/worker/NodeMonitorService.java | 146 +++++++++++++ .../event/NodeResourceAllocateEvent.java | 25 +++ .../event/NodeResourceDeAllocateEvent.java | 25 +++ .../tajo/worker/event/NodeResourceEvent.java | 33 +++ .../apache/tajo/resource/TestResources.java | 48 +++++ .../org/apache/tajo/storage/DiskUtil.java | 4 +- 12 files changed, 922 insertions(+), 3 deletions(-) create mode 100644 tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java create mode 100644 tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java create mode 100644 tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java create mode 100644 tajo-core/src/main/java/org/apache/tajo/resource/ResourceCalculator.java create mode 100644 tajo-core/src/main/java/org/apache/tajo/worker/NodeMonitorService.java create mode 100644 tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java create mode 100644 tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeAllocateEvent.java create mode 100644 tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java create mode 100644 tajo-core/src/test/java/org/apache/tajo/resource/TestResources.java diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 46e7618a40..07782c1bfa 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ 
b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -169,7 +169,8 @@ public static enum ConfVars implements ConfigKey { WORKER_TEMPORAL_DIR_CLEANUP("tajo.worker.tmpdir.cleanup-at-startup", false, Validators.bool()), // Tajo Worker Resources - WORKER_RESOURCE_AVAILABLE_CPU_CORES("tajo.worker.resource.cpu-cores", 1, Validators.min("1")), + WORKER_RESOURCE_AVAILABLE_CPU_CORES("tajo.worker.resource.cpu-cores", + Runtime.getRuntime().availableProcessors(), Validators.min("1")), WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 1024, Validators.min("64")), WORKER_RESOURCE_AVAILABLE_DISKS("tajo.worker.resource.disks", 1.0f), WORKER_EXECUTION_MAX_SLOTS("tajo.worker.parallel-execution.max-num", 2), diff --git a/tajo-common/src/main/proto/tajo_protos.proto b/tajo-common/src/main/proto/tajo_protos.proto index b6cd9efb6d..8474f546db 100644 --- a/tajo-common/src/main/proto/tajo_protos.proto +++ b/tajo-common/src/main/proto/tajo_protos.proto @@ -62,4 +62,10 @@ message WorkerConnectionInfoProto { optional int32 queryMasterPort = 5; required int32 clientPort = 6; required int32 httpInfoPort = 7; +} + +message NodeResourceProto { + optional int32 memory = 1; + optional int32 virtual_cores = 2; + optional int32 disks = 3; } \ No newline at end of file diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java b/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java new file mode 100644 index 0000000000..58b8a26868 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/resource/DefaultResourceCalculator.java @@ -0,0 +1,109 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.tajo.resource; + + +public class DefaultResourceCalculator extends ResourceCalculator { + + @Override + public int compare(NodeResource unused, NodeResource lhs, NodeResource rhs) { + return lhs.compareTo(rhs); + } + + @Override + public int computeAvailableContainers(NodeResource available, NodeResource required) { + return Math.min(Math.min( + available.getMemory() / required.getMemory(), + available.getDisks() / required.getDisks()), + available.getVirtualCores() / required.getVirtualCores()); + } + + @Override + public float divide(NodeResource unused, + NodeResource numerator, NodeResource denominator) { + return ratio(numerator, denominator); + } + + public boolean isInvalidDivisor(NodeResource r) { + if (r.getMemory() == 0.0f) { + return true; + } + return false; + } + + @Override + public float ratio(NodeResource a, NodeResource b) { + return (float)a.getMemory() / b.getMemory(); + } + + @Override + public NodeResource divideAndCeil(NodeResource numerator, int denominator) { + return NodeResources.createResource( + divideAndCeil(numerator.getMemory(), denominator)); + } + + @Override + public NodeResource normalize(NodeResource r, NodeResource minimumResource, + NodeResource maximumResource, NodeResource stepFactor) { + int normalizedMemory = Math.min( + roundUp( + Math.max(r.getMemory(), minimumResource.getMemory()), + stepFactor.getMemory()), + 
maximumResource.getMemory()); + return NodeResources.createResource(normalizedMemory); + } + + @Override + public NodeResource normalize(NodeResource r, NodeResource minimumResource, + NodeResource maximumResource) { + return normalize(r, minimumResource, maximumResource, minimumResource); + } + + @Override + public NodeResource roundUp(NodeResource r, NodeResource stepFactor) { + return NodeResources.createResource( + roundUp(r.getMemory(), stepFactor.getMemory()) + ); + } + + @Override + public NodeResource roundDown(NodeResource r, NodeResource stepFactor) { + return NodeResources.createResource( + roundDown(r.getMemory(), stepFactor.getMemory())); + } + + @Override + public NodeResource multiplyAndNormalizeUp(NodeResource r, double by, + NodeResource stepFactor) { + return NodeResources.createResource( + roundUp((int) (r.getMemory() * by + 0.5), stepFactor.getMemory()) + ); + } + + @Override + public NodeResource multiplyAndNormalizeDown(NodeResource r, double by, + NodeResource stepFactor) { + return NodeResources.createResource( + roundDown( + (int) (r.getMemory() * by), + stepFactor.getMemory() + ) + ); + } + +} diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java new file mode 100644 index 0000000000..c2512b971c --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.resource; + +import com.google.common.base.Objects; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.common.ProtoObject; + +/** + *

NodeResource models a set of computer resources in the + * cluster.

+ *

+ *

Currently it models memory and disk and CPU.

+ *

+ *

The unit for memory is megabytes. The unit for disks is the number of disk. + * CPU is modeled with virtual cores (vcores), a unit for expressing parallelism. + * A node's capacity should be configured with virtual cores equal to its number of physical cores. + * A task should be requested with the number of cores it can saturate.

+ *

+ */ + +public class NodeResource implements ProtoObject, Comparable { + + private int memory; + private int disks; + private int vCores; + + public NodeResource(TajoProtos.NodeResourceProto proto) { + this.memory = proto.getMemory(); + this.vCores = proto.getVirtualCores(); + this.disks = proto.getDisks(); + } + + private NodeResource() { + + } + + public static NodeResource createResource(int memory, int disks, int vCores) { + return new NodeResource().setMemory(memory).setDisks(disks).setVirtualCores(vCores); + } + + /** + * Get memory of the resource. + * + * @return memory of the resource + */ + public int getMemory() { + return memory; + } + + /** + * Set memory of the resource. + * + * @param memory memory of the resource + */ + public NodeResource setMemory(int memory) { + this.memory = memory; + return this; + } + + + /** + * Get number of disks of the resource. + * + * @return number of disks of the resource + */ + public int getDisks() { + return disks; + } + + /** + * Set number of disks of the resource. + * + * @param disks number of disks of the resource + */ + public NodeResource setDisks(int disks) { + this.disks = disks; + return this; + } + + /** + * Get number of virtual cpu cores of the resource. + * Virtual cores are a unit for expressing CPU parallelism. A node's capacity + * should be configured with virtual cores equal to its number of physical cores. + * + * @return num of virtual cpu cores of the resource + */ + public int getVirtualCores() { + return vCores; + } + + + /** + * Set number of virtual cpu cores of the resource. 
+ * + * @param vCores number of virtual cpu cores of the resource + */ + public NodeResource setVirtualCores(int vCores) { + this.vCores = vCores; + return this; + } + + @Override + public TajoProtos.NodeResourceProto getProto() { + TajoProtos.NodeResourceProto.Builder builder = TajoProtos.NodeResourceProto.newBuilder(); + builder.setMemory(memory) + .setDisks(disks) + .setVirtualCores(vCores); + return builder.build(); + } + + @Override + public int hashCode() { + return Objects.hashCode(getMemory(), getDisks(), getVirtualCores()); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (!(obj instanceof NodeResource)) + return false; + NodeResource other = (NodeResource) obj; + if (getMemory() != other.getMemory() || + getDisks() != other.getDisks() || + getVirtualCores() != other.getVirtualCores()) { + return false; + } + return true; + } + + @Override + public int compareTo(NodeResource other) { + int diff = this.getMemory() - other.getMemory(); + if (diff == 0) { + diff = this.getDisks() - other.getDisks(); + } + if (diff == 0) { + diff = this.getVirtualCores() - other.getVirtualCores(); + } + return diff; + } + + @Override + public String toString() { + return ""; + } +} diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java new file mode 100644 index 0000000000..51526b6b0a --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java @@ -0,0 +1,191 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tajo.resource; + + +public class NodeResources { + + public static NodeResource createResource(int memory) { + return createResource(memory, 0); + } + + public static NodeResource createResource(int memory, int disks) { + return NodeResource.createResource(memory, disks, (memory > 0) ? 1 : 0); + } + + public static NodeResource createResource(int memory, int disks, int vCores) { + return NodeResource.createResource(memory, disks, vCores); + } + + public static NodeResource clone(NodeResource res) { + return NodeResource.createResource(res.getMemory(), res.getVirtualCores(), res.getDisks()); + } + + public static NodeResource addTo(NodeResource lhs, NodeResource rhs) { + lhs.setMemory(lhs.getMemory() + rhs.getMemory()) + .setVirtualCores(lhs.getVirtualCores() + rhs.getVirtualCores()) + .setDisks(lhs.getDisks() + rhs.getDisks()); + return lhs; + } + + public static NodeResource add(NodeResource lhs, NodeResource rhs) { + return addTo(clone(lhs), rhs); + } + + public static NodeResource subtractFrom(NodeResource lhs, NodeResource rhs) { + lhs.setMemory(lhs.getMemory() - rhs.getMemory()) + .setVirtualCores(lhs.getVirtualCores() - rhs.getVirtualCores()) + .setDisks(lhs.getDisks() - rhs.getDisks()); + return lhs; + } + + public static NodeResource subtract(NodeResource lhs, NodeResource rhs) { + return subtractFrom(clone(lhs), rhs); + } + + public static NodeResource multiplyTo(NodeResource lhs, double by) { + lhs.setMemory((int) (lhs.getMemory() * by)) + .setVirtualCores((int) (lhs.getVirtualCores() * by)) + .setDisks((int) (lhs.getDisks() 
* by)); + return lhs; + } + + public static NodeResource multiply(NodeResource lhs, double by) { + return multiplyTo(clone(lhs), by); + } + + public static NodeResource multiplyAndNormalizeUp( + ResourceCalculator calculator,NodeResource lhs, double by, NodeResource factor) { + return calculator.multiplyAndNormalizeUp(lhs, by, factor); + } + + public static NodeResource multiplyAndNormalizeDown( + ResourceCalculator calculator,NodeResource lhs, double by, NodeResource factor) { + return calculator.multiplyAndNormalizeDown(lhs, by, factor); + } + + public static NodeResource multiplyAndRoundDown(NodeResource lhs, double by) { + NodeResource out = clone(lhs); + out.setMemory((int)(lhs.getMemory() * by)); + out.setDisks((int)(lhs.getDisks() * by)); + out.setVirtualCores((int)(lhs.getVirtualCores() * by)); + return out; + } + + public static NodeResource normalize( + ResourceCalculator calculator, NodeResource lhs, NodeResource min, + NodeResource max, NodeResource increment) { + return calculator.normalize(lhs, min, max, increment); + } + + public static NodeResource roundUp( + ResourceCalculator calculator, NodeResource lhs, NodeResource factor) { + return calculator.roundUp(lhs, factor); + } + + public static NodeResource roundDown( + ResourceCalculator calculator, NodeResource lhs, NodeResource factor) { + return calculator.roundDown(lhs, factor); + } + + public static boolean isInvalidDivisor( + ResourceCalculator resourceCalculator, NodeResource divisor) { + return resourceCalculator.isInvalidDivisor(divisor); + } + + public static float ratio( + ResourceCalculator resourceCalculator, NodeResource lhs, NodeResource rhs) { + return resourceCalculator.ratio(lhs, rhs); + } + + public static float divide( + ResourceCalculator resourceCalculator, + NodeResource clusterResource, NodeResource lhs, NodeResource rhs) { + return resourceCalculator.divide(clusterResource, lhs, rhs); + } + + public static NodeResource divideAndCeil( + ResourceCalculator resourceCalculator, 
NodeResource lhs, int rhs) { + return resourceCalculator.divideAndCeil(lhs, rhs); + } + + public static boolean equals(NodeResource lhs, NodeResource rhs) { + return lhs.equals(rhs); + } + + public static boolean lessThan( + ResourceCalculator resourceCalculator, + NodeResource clusterResource, + NodeResource lhs, NodeResource rhs) { + return (resourceCalculator.compare(clusterResource, lhs, rhs) < 0); + } + + public static boolean lessThanOrEqual( + ResourceCalculator resourceCalculator, + NodeResource clusterResource, + NodeResource lhs, NodeResource rhs) { + return (resourceCalculator.compare(clusterResource, lhs, rhs) <= 0); + } + + public static boolean greaterThan( + ResourceCalculator resourceCalculator, + NodeResource clusterResource, + NodeResource lhs, NodeResource rhs) { + return resourceCalculator.compare(clusterResource, lhs, rhs) > 0; + } + + public static boolean greaterThanOrEqual( + ResourceCalculator resourceCalculator, + NodeResource clusterResource, + NodeResource lhs, NodeResource rhs) { + return resourceCalculator.compare(clusterResource, lhs, rhs) >= 0; + } + + public static NodeResource min( + ResourceCalculator resourceCalculator, + NodeResource clusterResource, + NodeResource lhs, NodeResource rhs) { + return resourceCalculator.compare(clusterResource, lhs, rhs) <= 0 ? lhs : rhs; + } + + public static NodeResource max( + ResourceCalculator resourceCalculator, + NodeResource clusterResource, + NodeResource lhs, NodeResource rhs) { + return resourceCalculator.compare(clusterResource, lhs, rhs) >= 0 ? 
lhs : rhs; + } + + public static boolean fitsIn(NodeResource smaller, NodeResource bigger) { + return smaller.getMemory() <= bigger.getMemory() && + smaller.getDisks() <= bigger.getDisks() && + smaller.getVirtualCores() <= bigger.getVirtualCores(); + } + + public static NodeResource componentwiseMin(NodeResource lhs, NodeResource rhs) { + return createResource(Math.min(lhs.getMemory(), rhs.getMemory()), + Math.min(lhs.getDisks(), rhs.getDisks()), + Math.min(lhs.getVirtualCores(), rhs.getVirtualCores())); + } + + public static NodeResource componentwiseMax(NodeResource lhs, NodeResource rhs) { + return createResource(Math.max(lhs.getMemory(), rhs.getMemory()), + Math.max(lhs.getDisks(), rhs.getDisks()), + Math.max(lhs.getVirtualCores(), rhs.getVirtualCores())); + } +} diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/ResourceCalculator.java b/tajo-core/src/main/java/org/apache/tajo/resource/ResourceCalculator.java new file mode 100644 index 0000000000..b08228f026 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/resource/ResourceCalculator.java @@ -0,0 +1,169 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*/ +package org.apache.tajo.resource; + + +/** + * A set of {@link NodeResource} comparison and manipulation interfaces. + */ + +public abstract class ResourceCalculator { + + public abstract int + compare(NodeResource clusterResource, NodeResource lhs, NodeResource rhs); + + public static int divideAndCeil(int a, int b) { + if (b == 0) { + return 0; + } + return (a + (b - 1)) / b; + } + + public static int roundUp(int a, int b) { + return divideAndCeil(a, b) * b; + } + + public static int roundDown(int a, int b) { + return (a / b) * b; + } + + /** + * Compute the number of containers which can be allocated given + * available and required resources. + * + * @param available available resources + * @param required required resources + * @return number of containers which can be allocated + */ + public abstract int computeAvailableContainers( + NodeResource available, NodeResource required); + /** + * Multiply resource r by factor by + * and normalize up using step-factor stepFactor. + * + * @param r resource to be multiplied + * @param by multiplier + * @param stepFactor factor by which to normalize up + * @return resulting normalized resource + */ + public abstract NodeResource multiplyAndNormalizeUp( + NodeResource r, double by, NodeResource stepFactor); + + /** + * Multiply resource r by factor by + * and normalize down using step-factor stepFactor. 
+ * + * @param r resource to be multiplied + * @param by multiplier + * @param stepFactor factor by which to normalize down + * @return resulting normalized resource + */ + public abstract NodeResource multiplyAndNormalizeDown( + NodeResource r, double by, NodeResource stepFactor); + + /** + * Normalize resource r given the base + * minimumResource and verify against max allowed + * maximumResource + * + * @param r resource + * @param minimumResource step-factor + * @param maximumResource the upper bound of the resource to be allocated + * @return normalized resource + */ + public NodeResource normalize(NodeResource r, NodeResource minimumResource, + NodeResource maximumResource) { + return normalize(r, minimumResource, maximumResource, minimumResource); + } + + /** + * Normalize resource r given the base + * minimumResource and verify against max allowed + * maximumResource using a step factor for hte normalization. + * + * @param r resource + * @param minimumResource minimum value + * @param maximumResource the upper bound of the resource to be allocated + * @param stepFactor the increment for resources to be allocated + * @return normalized resource + */ + public abstract NodeResource normalize(NodeResource r, NodeResource minimumResource, + NodeResource maximumResource, + NodeResource stepFactor); + + + /** + * Round-up resource r given factor stepFactor. + * + * @param r resource + * @param stepFactor step-factor + * @return rounded resource + */ + public abstract NodeResource roundUp(NodeResource r, NodeResource stepFactor); + + /** + * Round-down resource r given factor stepFactor. + * + * @param r resource + * @param stepFactor step-factor + * @return rounded resource + */ + public abstract NodeResource roundDown(NodeResource r, NodeResource stepFactor); + + /** + * Divide resource numerator by resource denominator + * using specified policy (domination, average, fairness etc.); hence overall + * clusterResource is provided for context. 
+ * + * @param clusterResource cluster resources + * @param numerator numerator + * @param denominator denominator + * @return numerator/denominator + * using specific policy + */ + public abstract float divide( + NodeResource clusterResource, NodeResource numerator, NodeResource denominator); + + /** + * Determine if a resource is not suitable for use as a divisor + * (will result in divide by 0, etc) + * + * @param r resource + * @return true if divisor is invalid (should not be used), false else + */ + public abstract boolean isInvalidDivisor(NodeResource r); + + /** + * Ratio of resource a to resource b. + * + * @param a resource + * @param b resource + * @return ratio of resource a to resource b + */ + public abstract float ratio(NodeResource a, NodeResource b); + + /** + * Divide-and-ceil numerator by denominator. + * + * @param numerator numerator resource + * @param denominator denominator + * @return resultant resource + */ + public abstract NodeResource divideAndCeil(NodeResource numerator, int denominator); + +} diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeMonitorService.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeMonitorService.java new file mode 100644 index 0000000000..344f62c81a --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeMonitorService.java @@ -0,0 +1,146 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tajo.worker; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputByteBuffer; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.VersionUtil; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; +import org.apache.hadoop.yarn.server.api.ResourceTracker; +import org.apache.hadoop.yarn.server.api.ServerRMProxy; +import org.apache.hadoop.yarn.server.api.protocolrecords.*; +import org.apache.hadoop.yarn.server.api.records.MasterKey; +import org.apache.hadoop.yarn.server.api.records.NodeAction; +import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import 
org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; +import org.apache.hadoop.yarn.util.Records; +import org.apache.hadoop.yarn.util.YarnVersionInfo; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.resource.NodeResource; +import org.apache.tajo.storage.DiskUtil; +import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.worker.event.NodeResourceEvent; +import org.apache.tajo.worker.event.TaskRunnerEvent; + +import java.io.IOException; +import java.net.ConnectException; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentLinkedQueue; + +public class NodeMonitorService extends AbstractService implements EventHandler { + private static final Log LOG = LogFactory.getLog(NodeMonitorService.class); + + private final Object heartbeatMonitor = new Object(); + private final Dispatcher dispatcher; + private final TajoWorker.WorkerContext context; + private NodeResource totalResource; + private TajoConf tajoConf; + + + private volatile boolean isStopped; + public NodeMonitorService(TajoWorker.WorkerContext context, Dispatcher dispatcher){ + super(NodeMonitorService.class.getName()); + this.dispatcher = dispatcher; + this.context = context; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + if (!(conf instanceof TajoConf)) { + throw new IllegalArgumentException("Configuration must be a TajoConf instance"); + } + this.tajoConf = (TajoConf)conf; + this.dispatcher.register(NodeResourceEvent.EventType.class, this); + 
this.totalResource = createWorkerResource(tajoConf); + + super.serviceInit(conf); + LOG.info("Initialized NodeMonitorService for " + totalResource); + } + + @Override + protected void serviceStart() throws Exception { + + // NodeManager is the last service to start, so NodeId is available. + this.nodeId = this.context.getNodeId(); + this.httpPort = this.context.getHttpPort(); + this.nodeManagerVersionId = YarnVersionInfo.getVersion(); + try { + // Registration has to be in start so that ContainerManager can get the + // perNM tokens needed to authenticate ContainerTokens. + this.resourceTracker = getRMClient(); + registerWithRM(); + super.serviceStart(); + startStatusUpdater(); + } catch (Exception e) { + String errorMessage = "Unexpected error starting NodeStatusUpdater"; + LOG.error(errorMessage, e); + throw new YarnRuntimeException(e); + } + } + + @Override + protected void serviceStop() throws Exception { + // Interrupt the updater. + this.isStopped = true; + super.serviceStop(); + } + + @Override + public void handle(NodeResourceEvent event) { + + } + + private NodeResource createWorkerResource(TajoConf conf) { + + int memoryMb = Math.min((int) (Runtime.getRuntime().maxMemory() / StorageUnit.MB), + conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB)); + int vCores = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES); + int disks = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS); + + int dataNodeStorageSize = DiskUtil.getDataNodeStorageSize(); + if (conf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DFS_DIR_AWARE) && dataNodeStorageSize > 0) { + disks = dataNodeStorageSize; + } + return NodeResource.createResource(memoryMb, disks, vCores); + } +} diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java new file mode 100644 index 0000000000..a3d1d879c2 --- /dev/null +++ 
b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker.event; + +public class NodeResourceAllocateEvent extends NodeResourceEvent { + public NodeResourceAllocateEvent() { + super(EventType.ALLOCATE); + } +} diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeAllocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeAllocateEvent.java new file mode 100644 index 0000000000..12dcb7e4de --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeAllocateEvent.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+/**
+ * A {@link NodeResourceEvent} whose type is fixed to {@code EventType.DEALLOCATE}.
+ * This version of the event carries no payload.
+ */
+// NOTE(review): the diff header for this hunk names the file NodeResourceDeAllocateEvent.java
+// (capital 'A'), while the public class is NodeResourceDeallocateEvent. javac requires a public
+// top-level class to be declared in a file of exactly the same name - confirm and rename the file.
+public class NodeResourceDeallocateEvent extends NodeResourceEvent {
+  /** Creates the event with its type set to {@code EventType.DEALLOCATE}. */
+  public NodeResourceDeallocateEvent() {
+    super(EventType.DEALLOCATE);
+  }
+}
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java
new file mode 100644
index 0000000000..a5a349ec46
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker.event;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.tajo.ExecutionBlockId;
+
+/**
+ * Base class of the node resource events; the event type is one of {@link EventType}.
+ */
+// NOTE(review): ExecutionBlockId is imported above but not referenced in this class.
+public class NodeResourceEvent extends AbstractEvent {
+  /** The kinds of node resource event: resource allocation and resource deallocation. */
+  public enum EventType {
+    ALLOCATE,
+    DEALLOCATE
+  }
+
+  /**
+   * @param eventType the type passed through to the {@code AbstractEvent} constructor
+   */
+  public NodeResourceEvent(EventType eventType) {
+    super(eventType);
+  }
+}
diff --git a/tajo-core/src/test/java/org/apache/tajo/resource/TestResources.java b/tajo-core/src/test/java/org/apache/tajo/resource/TestResources.java
new file mode 100644
index 0000000000..eb0d732dd8
--- /dev/null
+++ b/tajo-core/src/test/java/org/apache/tajo/resource/TestResources.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.resource;
+
+import org.junit.Test;
+
+import static org.apache.tajo.resource.NodeResources.componentwiseMin;
+import static org.apache.tajo.resource.NodeResources.createResource;
+import static org.apache.tajo.resource.NodeResources.fitsIn;
+import static org.junit.Assert.*;
+
+/**
+ * Unit tests for the static helpers {@code fitsIn} and {@code componentwiseMin} of
+ * {@link NodeResources}.
+ */
+public class TestResources {
+  /**
+   * {@code fitsIn(smaller, bigger)} is expected to be true when the first resource equals the
+   * second or is smaller in its first two components, and false as soon as any single component
+   * of the first resource exceeds the second.
+   */
+  @Test
+  public void testFitsIn() {
+    assertTrue(fitsIn(createResource(512, 1, 1), createResource(1024, 2, 1)));
+    assertTrue(fitsIn(createResource(1024, 2, 1), createResource(1024, 2, 1)));
+    assertFalse(fitsIn(createResource(1024, 2, 1), createResource(512, 1, 1)));
+    assertFalse(fitsIn(createResource(512, 2, 1), createResource(1024, 1, 1)));
+    assertFalse(fitsIn(createResource(1024, 1, 1), createResource(512, 2, 1)));
+    assertFalse(fitsIn(createResource(512, 1, 2), createResource(512, 1, 1)));
+  }
+
+  /**
+   * {@code componentwiseMin} is expected to return, per component, the smaller of the two inputs
+   * regardless of argument order.
+   */
+  // NOTE(review): this test calls createResource with two arguments while testFitsIn uses three;
+  // presumably NodeResources offers both overloads - verify against NodeResources.
+  @Test
+  public void testComponentwiseMin() {
+    assertEquals(createResource(1, 1),
+        componentwiseMin(createResource(1, 1), createResource(2, 2)));
+    assertEquals(createResource(1, 1),
+        componentwiseMin(createResource(2, 2), createResource(1, 1)));
+    assertEquals(createResource(1, 1),
+        componentwiseMin(createResource(1, 2), createResource(2, 1)));
+  }
+}
diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java
index 0bcd5eca3c..19e08e8066 100644
--- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java
+++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/DiskUtil.java
@@ -189,10 +189,10 @@ private static void setDeviceMountInfo(List deviceInfos) throws
   }
 
   public static int getDataNodeStorageSize(){
-    return getStorageDirs().size();
+    return getDataNodeStorageDirs().size();
   }
 
-  public static List getStorageDirs(){
+  public static List getDataNodeStorageDirs(){
     Configuration conf = new HdfsConfiguration();
     Collection dirNames =
conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY); return Util.stringCollectionAsURIs(dirNames); From 211f000bb8c14af81b5d77be9ae355b6d0b56cb1 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 18 May 2015 16:15:14 +0900 Subject: [PATCH 02/12] TAJO-1599 --- .../tajo/master/rm/TajoResourceTracker.java | 7 + .../tajo/worker/NodeMonitorService.java | 146 ---------- .../worker/NodeResourceManagerService.java | 147 ++++++++++ .../tajo/worker/NodeStatusUpdaterService.java | 260 ++++++++++++++++++ .../tajo/worker/WorkerHeartbeatService.java | 1 + .../event/NodeResourceAllocateEvent.java | 25 +- .../event/NodeResourceDeAllocateEvent.java | 14 +- ...ent.java => NodeResourceManagerEvent.java} | 5 +- .../tajo/worker/event/NodeStatusEvent.java | 40 +++ .../main/proto/ResourceTrackerProtocol.proto | 27 ++ .../src/main/proto/TajoWorkerProtocol.proto | 16 ++ 11 files changed, 536 insertions(+), 152 deletions(-) delete mode 100644 tajo-core/src/main/java/org/apache/tajo/worker/NodeMonitorService.java create mode 100644 tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java create mode 100644 tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdaterService.java rename tajo-core/src/main/java/org/apache/tajo/worker/event/{NodeResourceEvent.java => NodeResourceManagerEvent.java} (83%) create mode 100644 tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java index 4f3b66a736..a65c8a546f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java @@ -182,6 +182,13 @@ public void heartbeat( } } + @Override + public void nodeHeartbeat(RpcController controller, TajoResourceTrackerProtocol.NodeHeartbeatRequestProto request, + RpcCallback done) { + //TODO 
implement with ResourceManager for scheduler + + } + private Worker createWorkerResource(NodeHeartbeat request) { WorkerResource workerResource = new WorkerResource(); diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeMonitorService.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeMonitorService.java deleted file mode 100644 index 344f62c81a..0000000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeMonitorService.java +++ /dev/null @@ -1,146 +0,0 @@ -/** -* Licensed to the Apache Software Foundation (ASF) under one -* or more contributor license agreements. See the NOTICE file -* distributed with this work for additional information -* regarding copyright ownership. The ASF licenses this file -* to you under the Apache License, Version 2.0 (the -* "License"); you may not use this file except in compliance -* with the License. You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. 
-*/ - -package org.apache.tajo.worker; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.DataInputByteBuffer; -import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.util.VersionUtil; -import org.apache.hadoop.yarn.api.records.*; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; -import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; -import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; -import org.apache.hadoop.yarn.server.api.ResourceTracker; -import org.apache.hadoop.yarn.server.api.ServerRMProxy; -import org.apache.hadoop.yarn.server.api.protocolrecords.*; -import org.apache.hadoop.yarn.server.api.records.MasterKey; -import org.apache.hadoop.yarn.server.api.records.NodeAction; -import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; -import org.apache.hadoop.yarn.server.api.records.NodeStatus; -import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; -import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; -import 
org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; -import org.apache.hadoop.yarn.util.Records; -import org.apache.hadoop.yarn.util.YarnVersionInfo; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.resource.NodeResource; -import org.apache.tajo.storage.DiskUtil; -import org.apache.tajo.unit.StorageUnit; -import org.apache.tajo.worker.event.NodeResourceEvent; -import org.apache.tajo.worker.event.TaskRunnerEvent; - -import java.io.IOException; -import java.net.ConnectException; -import java.nio.ByteBuffer; -import java.util.*; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentLinkedQueue; - -public class NodeMonitorService extends AbstractService implements EventHandler { - private static final Log LOG = LogFactory.getLog(NodeMonitorService.class); - - private final Object heartbeatMonitor = new Object(); - private final Dispatcher dispatcher; - private final TajoWorker.WorkerContext context; - private NodeResource totalResource; - private TajoConf tajoConf; - - - private volatile boolean isStopped; - public NodeMonitorService(TajoWorker.WorkerContext context, Dispatcher dispatcher){ - super(NodeMonitorService.class.getName()); - this.dispatcher = dispatcher; - this.context = context; - } - - @Override - protected void serviceInit(Configuration conf) throws Exception { - if (!(conf instanceof TajoConf)) { - throw new IllegalArgumentException("Configuration must be a TajoConf instance"); - } - this.tajoConf = (TajoConf)conf; - this.dispatcher.register(NodeResourceEvent.EventType.class, this); - this.totalResource = createWorkerResource(tajoConf); - - super.serviceInit(conf); - LOG.info("Initialized NodeMonitorService for " + totalResource); - } - - @Override - protected void serviceStart() throws Exception { - - // NodeManager is the last service to start, so NodeId is available. 
- this.nodeId = this.context.getNodeId(); - this.httpPort = this.context.getHttpPort(); - this.nodeManagerVersionId = YarnVersionInfo.getVersion(); - try { - // Registration has to be in start so that ContainerManager can get the - // perNM tokens needed to authenticate ContainerTokens. - this.resourceTracker = getRMClient(); - registerWithRM(); - super.serviceStart(); - startStatusUpdater(); - } catch (Exception e) { - String errorMessage = "Unexpected error starting NodeStatusUpdater"; - LOG.error(errorMessage, e); - throw new YarnRuntimeException(e); - } - } - - @Override - protected void serviceStop() throws Exception { - // Interrupt the updater. - this.isStopped = true; - super.serviceStop(); - } - - @Override - public void handle(NodeResourceEvent event) { - - } - - private NodeResource createWorkerResource(TajoConf conf) { - - int memoryMb = Math.min((int) (Runtime.getRuntime().maxMemory() / StorageUnit.MB), - conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB)); - int vCores = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES); - int disks = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS); - - int dataNodeStorageSize = DiskUtil.getDataNodeStorageSize(); - if (conf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DFS_DIR_AWARE) && dataNodeStorageSize > 0) { - disks = dataNodeStorageSize; - } - return NodeResource.createResource(memoryMb, disks, vCores); - } -} diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java new file mode 100644 index 0000000000..7a2a68d2fc --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java @@ -0,0 +1,147 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. 
See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tajo.worker;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.resource.NodeResources;
+import org.apache.tajo.storage.DiskUtil;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.worker.event.NodeResourceAllocateEvent;
+import org.apache.tajo.worker.event.NodeResourceDeallocateEvent;
+import org.apache.tajo.worker.event.NodeResourceManagerEvent;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.tajo.ipc.TajoWorkerProtocol.*;
+
+/**
+ * Keeps the resource budget of this worker node. The total resource is computed once from the
+ * configuration in {@link #serviceInit(Configuration)}; the available resource starts as a copy
+ * of the total, shrinks when an ALLOCATE event is granted and grows back on DEALLOCATE.
+ */
+// NOTE(review): availableResource is mutated in handle() and read through getAvailableResource()
+// without synchronization; confirm that both always happen on the same thread or add locking.
+public class NodeResourceManagerService extends AbstractService implements EventHandler {
+  private static final Log LOG = LogFactory.getLog(NodeResourceManagerService.class);
+
+  private final Dispatcher dispatcher;
+  private final TajoWorker.WorkerContext context;
+  private NodeResource totalResource;
+  private NodeResource availableResource;
+  private AtomicInteger running;
+  private TajoConf tajoConf;
+
+  private volatile boolean isStopped;
+  public NodeResourceManagerService(TajoWorker.WorkerContext context, Dispatcher dispatcher){
+    super(NodeResourceManagerService.class.getName());
+    this.dispatcher = dispatcher;
+    this.context = context;
+  }
+
+  /**
+   * Computes the node's total resource, initializes the available resource as a copy of it and
+   * registers this service as the handler of {@link NodeResourceManagerEvent}s.
+   *
+   * @param conf must be a {@link TajoConf}
+   * @throws IllegalArgumentException if {@code conf} is not a {@link TajoConf}
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    if (!(conf instanceof TajoConf)) {
+      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
+    }
+    this.tajoConf = (TajoConf)conf;
+    this.totalResource = createWorkerResource(tajoConf);
+    this.availableResource = NodeResources.clone(totalResource);
+    this.dispatcher.register(NodeResourceManagerEvent.EventType.class, this);
+    this.running = new AtomicInteger();
+    super.serviceInit(conf);
+    // The message used to name the former NodeMonitorService class.
+    LOG.info("Initialized NodeResourceManagerService for " + totalResource);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    // Only records the stopped state; this service owns no thread of its own.
+    this.isStopped = true;
+    super.serviceStop();
+  }
+
+  /**
+   * Handles resource events.
+   * <ul>
+   *   <li>ALLOCATE: tries to reserve the resource of every task request in the batch; requests
+   *   that do not fit are returned to the caller as cancellation tasks through the callback.</li>
+   *   <li>DEALLOCATE: gives the resource carried by the event back to the available pool.</li>
+   * </ul>
+   */
+  @Override
+  public void handle(NodeResourceManagerEvent event) {
+    switch (event.getType()) {
+      case ALLOCATE: {
+        NodeResourceAllocateEvent allocateEvent = (NodeResourceAllocateEvent) event;
+        BatchAllocationResponseProto.Builder response = BatchAllocationResponseProto.newBuilder();
+        for (TaskAllocationRequestProto request : allocateEvent.getRequest().getTaskRequestList()) {
+          NodeResource resource = new NodeResource(request.getResource());
+          if (allocate(resource)) {
+            running.incrementAndGet();
+            //TODO send task event to taskExecutorService
+          } else {
+            response.addCancellationTask(request);
+          }
+        }
+        allocateEvent.getCallback().run(response.build());
+        break;
+      }
+      case DEALLOCATE: {
+        running.decrementAndGet();
+        NodeResourceDeallocateEvent deallocateEvent = (NodeResourceDeallocateEvent) event;
+        release(deallocateEvent.getResource());
+        break;
+      }
+    }
+  }
+
+  public Dispatcher getDispatcher() {
+    return dispatcher;
+  }
+
+  /** @return a copy of the total resource of this node */
+  public NodeResource getTotalResource() {
+    return NodeResources.clone(totalResource);
+  }
+
+  /** @return a copy of the currently available resource of this node */
+  public NodeResource getAvailableResource() {
+    return NodeResources.clone(availableResource);
+  }
+
+  /**
+   * Reserves {@code resource} if it fits into the available resource.
+   *
+   * @return true if the resource was reserved, false if it does not fit
+   */
+  private boolean allocate(NodeResource resource) {
+    if (NodeResources.fitsIn(resource, availableResource)) {
+      // Reserving must shrink the pool. The previous code called addTo here, so every granted
+      // request enlarged the pool and nothing was ever rejected.
+      NodeResources.subtractFrom(availableResource, resource);
+      return true;
+    }
+    return false;
+  }
+
+  /** Returns a previously reserved {@code resource} to the available pool. */
+  private void release(NodeResource resource) {
+    // Counterpart of allocate(): releasing grows the pool again (was subtractFrom).
+    NodeResources.addTo(availableResource, resource);
+  }
+
+  /**
+   * Builds the total resource of this worker: memory is the smaller of the JVM max heap and the
+   * configured memory, cores and disks come from the configuration, and the disk count is
+   * replaced by the number of datanode storage directories when the dfs-dir-aware option is on
+   * and at least one directory is configured.
+   */
+  private NodeResource createWorkerResource(TajoConf conf) {
+
+    int memoryMb = Math.min((int) (Runtime.getRuntime().maxMemory() / StorageUnit.MB),
+        conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB));
+    int vCores = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES);
+    // NOTE(review): WORKER_RESOURCE_AVAILABLE_DISKS is declared with a float default (1.0f) in
+    // TajoConf; verify that getIntVar returns the configured value for a float-typed variable.
+    int disks = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS);
+
+    int dataNodeStorageSize = DiskUtil.getDataNodeStorageSize();
+    if (conf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DFS_DIR_AWARE) && dataNodeStorageSize > 0) {
+      disks = dataNodeStorageSize;
+    }
+    return NodeResource.createResource(memoryMb, disks, vCores);
+  }
+}
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdaterService.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdaterService.java
new file mode 100644
index 0000000000..5ac100f0d9
--- /dev/null
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdaterService.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.worker;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.ipc.TajoResourceTrackerProtocol;
+import org.apache.tajo.resource.NodeResource;
+import org.apache.tajo.rpc.*;
+import org.apache.tajo.service.ServiceTracker;
+import org.apache.tajo.worker.event.NodeStatusEvent;
+
+import java.net.ConnectException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.*;
+
+/**
+ * It periodically sends heartbeat to {@link org.apache.tajo.master.rm.TajoResourceTracker} via asynchronous rpc.
+ *
+ * Incoming {@link NodeStatusEvent}s are queued by {@link #handle(NodeStatusEvent)}; a dedicated
+ * updater thread drains the queue and reports the most recent resource, or sends a plain
+ * heartbeat when nothing was queued during one heartbeat interval.
+ */
+public class NodeStatusUpdaterService extends AbstractService implements EventHandler {
+
+  private final static Log LOG = LogFactory.getLog(NodeStatusUpdaterService.class);
+
+  private TajoConf tajoConf;
+  private StatusUpdaterThread updaterThread;
+  private AtomicBoolean flushReport = new AtomicBoolean();
+  private volatile boolean isStopped;
+  private volatile long heartBeatInterval;
+  private BlockingQueue heartBeatRequestQueue;
+  private TajoWorker.WorkerContext context;
+  private NodeResourceManagerService nodeResourceManagerService;
+  private AsyncRpcClient rmClient;
+  private TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker;
+
+  public NodeStatusUpdaterService(TajoWorker.WorkerContext context, NodeResourceManagerService resourceManagerService) {
+    super(NodeStatusUpdaterService.class.getSimpleName());
+    this.context = context;
+    this.nodeResourceManagerService = resourceManagerService;
+    this.heartBeatInterval = 10 * 1000; //10 sec
+  }
+
+  /**
+   * Creates the event queue and the (not yet started) updater thread.
+   *
+   * @param conf must be a {@link TajoConf}
+   * @throws IllegalArgumentException if {@code conf} is not a {@link TajoConf}
+   */
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    if (!(conf instanceof TajoConf)) {
+      throw new IllegalArgumentException("Configuration must be a TajoConf instance");
+    }
+    this.tajoConf = (TajoConf) conf;
+    this.heartBeatRequestQueue = Queues.newLinkedBlockingQueue();
+    this.updaterThread = new StatusUpdaterThread();
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    updaterThread.start();
+    super.serviceStart();
+    LOG.info("Node status updater started.");
+  }
+
+  /** Marks the service as stopped and wakes the updater thread so that it can observe the flag. */
+  @Override
+  public void serviceStop() throws Exception {
+    this.isStopped = true;
+
+    synchronized (updaterThread) {
+      updaterThread.notifyAll();
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Queues the event for the updater thread. A FLUSH_REPORTS event additionally wakes the
+   * updater thread so that the report is sent without waiting for the heartbeat interval.
+   */
+  @Override
+  public void handle(NodeStatusEvent event) {
+    switch (event.getType()) {
+      case REPORT_RESOURCE:
+        heartBeatRequestQueue.add(event);
+        break;
+      case FLUSH_REPORTS:
+        heartBeatRequestQueue.add(event);
+        synchronized (updaterThread) {
+          updaterThread.notifyAll();
+        }
+        break;
+    }
+  }
+
+  /** Builds a heartbeat carrying the worker id and the given available resource. */
+  private NodeHeartbeatRequestProto createResourceReport(NodeResource resource) {
+    NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder();
+    requestProto.setAvailableResource(resource.getProto());
+    requestProto.setWorkerId(context.getConnectionInfo().getId());
+    return requestProto.build();
+  }
+
+  /** Builds a plain heartbeat carrying only the worker id. */
+  private NodeHeartbeatRequestProto createHeartBeatReport() {
+    NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder();
+    requestProto.setWorkerId(context.getConnectionInfo().getId());
+    return requestProto.build();
+  }
+
+  /** Builds a full report: total and available resource, worker id and connection info. */
+  private NodeHeartbeatRequestProto createNodeStatusReport() {
+    NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder();
+    requestProto.setTotalResource(nodeResourceManagerService.getTotalResource().getProto());
+    requestProto.setAvailableResource(nodeResourceManagerService.getAvailableResource().getProto());
+    requestProto.setWorkerId(context.getConnectionInfo().getId());
+    requestProto.setConnectionInfo(context.getConnectionInfo().getProto());
+
+    //TODO set node status to requestProto.setStatus()
+    return requestProto.build();
+  }
+
+  /** Cleans up the previous RPC client, if any, and opens a new one to the resource tracker. */
+  private TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService newStub()
+      throws NoSuchMethodException, ConnectException, ClassNotFoundException {
+    ServiceTracker serviceTracker = context.getServiceTracker();
+    RpcClientManager.cleanup(rmClient);
+
+    RpcClientManager rpcManager = RpcClientManager.getInstance();
+    rmClient = rpcManager.newClient(serviceTracker.getResourceTrackerAddress(),
+        TajoResourceTrackerProtocol.class, true, rpcManager.getRetries(),
+        rpcManager.getTimeoutSeconds(), TimeUnit.SECONDS, false);
+    return rmClient.getStub();
+  }
+
+  /**
+   * Sends one heartbeat and waits for the response.
+   *
+   * @return the response, or null if the wait timed out or the calling thread was interrupted
+   * @throws ExecutionException if the call failed; the stub is dropped so that the next call
+   *         reconnects
+   */
+  protected NodeHeartbeatResponseProto sendHeartbeat(NodeHeartbeatRequestProto requestProto)
+      throws NoSuchMethodException, ClassNotFoundException, ConnectException, ExecutionException {
+    if (resourceTracker == null) {
+      resourceTracker = newStub();
+    }
+
+    NodeHeartbeatResponseProto response = null;
+    try {
+      CallFuture callBack = new CallFuture();
+
+      resourceTracker.nodeHeartbeat(callBack.getController(), requestProto, callBack);
+      response = callBack.get(RpcConstants.DEFAULT_FUTURE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+    } catch (InterruptedException e) {
+      LOG.warn(e.getMessage());
+      // Restore the interrupt status; the updater loop checks it to terminate.
+      Thread.currentThread().interrupt();
+    } catch (TimeoutException te) {
+      LOG.warn("Heartbeat response is being delayed.", te);
+    } catch (ExecutionException ee) {
+      LOG.warn("TajoMaster failure: " + ee.getMessage());
+      resourceTracker = null;
+      throw ee;
+    }
+    return response;
+  }
+
+  /** Thread that sends the heartbeats; it loops until the service is stopped or interrupted. */
+  class StatusUpdaterThread extends Thread {
+
+    public StatusUpdaterThread() {
+      super("NodeStatusUpdater");
+    }
+
+    /**
+     * Moves up to {@code numElements} queued events into {@code buffer}, waiting on the updater
+     * thread's monitor for at most {@code timeout}. A wake-up before the deadline (for example
+     * by a FLUSH_REPORTS event or by serviceStop) ends the wait after one more drain.
+     *
+     * @return the number of events added to {@code buffer}
+     */
+    private int drain(Collection buffer, int numElements,
+                      long timeout, TimeUnit unit) throws InterruptedException {
+
+      // All deadline arithmetic uses System.nanoTime(). The previous code compared this
+      // nanoTime-based deadline with System.currentTimeMillis(), which mixes two unrelated
+      // clocks and units, so the wait was either skipped or of arbitrary length.
+      long deadline = System.nanoTime() + unit.toNanos(timeout);
+      int added = 0;
+      while (added < numElements) {
+        added += heartBeatRequestQueue.drainTo(buffer, numElements - added);
+        if (added < numElements) { // not enough elements immediately available; will have to wait
+          long remainingNanos = deadline - System.nanoTime();
+          if (remainingNanos <= 0) {
+            break;
+          } else {
+            synchronized (updaterThread) {
+              // timedWait never degrades to an unbounded wait(0) for sub-millisecond remainders.
+              TimeUnit.NANOSECONDS.timedWait(updaterThread, remainingNanos);
+              if (deadline - System.nanoTime() > 0) {
+                // Woken up before the deadline: take what is queued and report immediately.
+                added += heartBeatRequestQueue.drainTo(buffer, numElements - added);
+                break;
+              }
+            }
+          }
+        }
+      }
+      return added;
+    }
+
+    @Override
+    public void run() {
+      NodeHeartbeatResponseProto lastResponse = null;
+      while (!isStopped && !Thread.interrupted()) {
+
+        try {
+          if (lastResponse != null) {
+            if (lastResponse.getCommand() == ResponseCommand.NORMAL) {
+              List events = Lists.newArrayList();
+              try {
+                drain(events,
+                    Math.max(nodeResourceManagerService.getTotalResource().getVirtualCores() / 2, 1),
+                    heartBeatInterval, TimeUnit.MILLISECONDS);
+              } catch (InterruptedException e) {
+                break;
+              }
+
+              if (!events.isEmpty()) {
+                // send last available resource;
+                lastResponse = sendHeartbeat(createResourceReport(events.get(events.size() - 1).getResource()));
+              } else {
+                lastResponse = sendHeartbeat(createHeartBeatReport());
+              }
+
+            } else if (lastResponse.getCommand() == ResponseCommand.MEMBERSHIP) {
+              lastResponse = sendHeartbeat(createNodeStatusReport());
+            } else {
+              //TODO abort failure queries (ABORT_QUERY) and handle SHUTDOWN
+              // Until those commands are implemented, pause for one interval and ping again.
+              // Previously lastResponse was left untouched here, so the loop spun on the same
+              // response without ever sending another heartbeat.
+              synchronized (updaterThread) {
+                updaterThread.wait(heartBeatInterval);
+              }
+              lastResponse = sendHeartbeat(createHeartBeatReport());
+            }
+          } else {
+            lastResponse = sendHeartbeat(createNodeStatusReport());
+          }
+        } catch (NoSuchMethodException e) {
+          LOG.fatal(e.getMessage(), e);
+          Runtime.getRuntime().halt(1);
+        } catch (ClassNotFoundException e) {
+          LOG.fatal(e.getMessage(), e);
+          Runtime.getRuntime().halt(1);
+        } catch (InterruptedException e) {
+          // Interrupted while pausing: terminate like the interrupted drain above.
+          break;
+        } catch (Exception e) {
+          LOG.error(e.getMessage(), e);
+          if (!isStopped) {
+            synchronized (updaterThread) {
+              try {
+                updaterThread.wait(heartBeatInterval);
+              } catch (InterruptedException e1) {
+                // Restore the interrupt status so that the loop condition ends the thread.
+                Thread.currentThread().interrupt();
+              }
+            }
+          }
+        }
+      }
+
+      LOG.info("Heartbeat Thread stopped.");
+    }
+  }
+}
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
index 9afee5a003..2c43049b78 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java
@@ -47,6 +47,7 @@
 /**
  * It periodically sends heartbeat to {@link org.apache.tajo.master.rm.TajoResourceTracker} via asynchronous rpc.
*/ +@Deprecated public class WorkerHeartbeatService extends AbstractService { /** class logger */ private final static Log LOG = LogFactory.getLog(WorkerHeartbeatService.class); diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java index a3d1d879c2..2f411e8c5c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceAllocateEvent.java @@ -18,8 +18,29 @@ package org.apache.tajo.worker.event; -public class NodeResourceAllocateEvent extends NodeResourceEvent { - public NodeResourceAllocateEvent() { + +import com.google.protobuf.RpcCallback; + +import static org.apache.tajo.ipc.TajoWorkerProtocol.BatchAllocationRequestProto; +import static org.apache.tajo.ipc.TajoWorkerProtocol.BatchAllocationResponseProto; + +public class NodeResourceAllocateEvent extends NodeResourceManagerEvent { + + private BatchAllocationRequestProto request; + private RpcCallback callback; + + public NodeResourceAllocateEvent(BatchAllocationRequestProto request, + RpcCallback callback) { super(EventType.ALLOCATE); + this.callback = callback; + this.request = request; + } + + public BatchAllocationRequestProto getRequest() { + return request; + } + + public RpcCallback getCallback() { + return callback; } } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeAllocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeAllocateEvent.java index 12dcb7e4de..7011101b05 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeAllocateEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeAllocateEvent.java @@ -18,8 +18,18 @@ package org.apache.tajo.worker.event; -public class NodeResourceDeallocateEvent extends NodeResourceEvent { - public 
NodeResourceDeallocateEvent() { +import org.apache.tajo.resource.NodeResource; + +public class NodeResourceDeallocateEvent extends NodeResourceManagerEvent { + + private NodeResource resource; + + public NodeResourceDeallocateEvent(NodeResource resource) { super(EventType.DEALLOCATE); + this.resource = resource; + } + + public NodeResource getResource() { + return resource; } } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java similarity index 83% rename from tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java rename to tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java index a5a349ec46..bcb34486c0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceManagerEvent.java @@ -20,14 +20,15 @@ import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.resource.NodeResource; -public class NodeResourceEvent extends AbstractEvent { +public class NodeResourceManagerEvent extends AbstractEvent { public enum EventType { ALLOCATE, DEALLOCATE } - public NodeResourceEvent(EventType eventType) { + public NodeResourceManagerEvent(EventType eventType) { super(eventType); } } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java new file mode 100644 index 0000000000..58ab74af0a --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeStatusEvent.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker.event; + +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.tajo.resource.NodeResource; + +public class NodeStatusEvent extends AbstractEvent { + private final NodeResource resource; + + public enum EventType { + REPORT_RESOURCE, + FLUSH_REPORTS + } + + public NodeStatusEvent(EventType eventType, NodeResource resource) { + super(eventType); + this.resource = resource; + } + + public NodeResource getResource() { + return resource; + } +} diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto index 40aeab7360..dffd8c9ee4 100644 --- a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto +++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto @@ -25,15 +25,42 @@ option java_generate_equals_and_hash = true; import "QueryCoordinatorProtocol.proto"; import "ContainerProtocol.proto"; import "tajo_protos.proto"; +import "TajoIdProtos.proto"; package hadoop.yarn; +// deprecated message NodeHeartbeat { required WorkerConnectionInfoProto connectionInfo = 1; optional ServerStatusProto serverStatus = 2; optional string statusMessage = 3; } +message NodeHeartbeatRequestProto { + required int32 workerId = 1; + optional NodeResourceProto totalResource = 2; + optional NodeResourceProto availableResource = 3; + optional WorkerConnectionInfoProto connectionInfo = 4; + 
optional NodeStatusProto status = 5; +} + +message NodeHeartbeatResponseProto { + required ResponseCommand command = 1 [default = NORMAL]; + repeated QueryIdProto queryId = 2; +} + +enum ResponseCommand { + NORMAL = 1; //ping + MEMBERSHIP = 2; // request membership to worker node + ABORT_QUERY = 3; //query master failure + SHUTDOWN = 4; // black list +} + +//TODO add node health information +message NodeStatusProto { +} + service TajoResourceTrackerProtocolService { rpc heartbeat(NodeHeartbeat) returns (TajoHeartbeatResponse); + rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto); } \ No newline at end of file diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto index bf9bbde509..2324596b1b 100644 --- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto +++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto @@ -116,6 +116,7 @@ message ExecutionBlockReport { repeated IntermediateEntryProto intermediateEntries = 5; } +// deprecated message TaskResponseProto { required string id = 1; required QueryState status = 2; @@ -161,6 +162,7 @@ message QueryExecutionRequestProto { optional StringProto logicalPlanJson = 6; } +// deprecated message GetTaskRequestProto { required int32 workerId = 1; required TajoContainerIdProto containerId = 2; @@ -198,6 +200,20 @@ message ExecutionBlockListProto { repeated ExecutionBlockIdProto executionBlockId = 1; } +message TaskAllocationRequestProto { + required TaskRequestProto taskRequest = 1; + required NodeResourceProto resource = 2; +} + +message BatchAllocationRequestProto { + required ExecutionBlockIdProto executionBlockId = 1; + repeated TaskAllocationRequestProto taskRequest = 2; +} + +message BatchAllocationResponseProto { + repeated TaskAllocationRequestProto cancellationTask = 2; +} + service TajoWorkerProtocolService { rpc ping (TaskAttemptIdProto) returns (BoolProto); From c0257fdc33009f26e31cbd8a1043f3bd6f7600ce Mon Sep 17 00:00:00 2001 
From: Jinho Kim Date: Tue, 19 May 2015 01:42:49 +0900 Subject: [PATCH 03/12] add test cases --- .../java/org/apache/tajo/conf/TajoConf.java | 4 +- .../tajo/master/rm/TajoResourceTracker.java | 4 +- .../apache/tajo/resource/NodeResources.java | 6 +- .../worker/NodeResourceManagerService.java | 60 ++--- .../tajo/worker/NodeStatusUpdaterService.java | 92 ++++--- .../event/NodeResourceDeAllocateEvent.java | 4 + .../worker/MockNodeStatusUpdaterService.java | 105 ++++++++ .../TestNodeResourceManagerService.java | 242 ++++++++++++++++++ .../worker/TestNodeStatusUpdaterService.java | 115 +++++++++ 9 files changed, 561 insertions(+), 71 deletions(-) create mode 100644 tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdaterService.java create mode 100644 tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManagerService.java create mode 100644 tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdaterService.java diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index d107eb8b9c..995b8d639c 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -174,7 +174,9 @@ public static enum ConfVars implements ConfigKey { WORKER_RESOURCE_AVAILABLE_CPU_CORES("tajo.worker.resource.cpu-cores", Runtime.getRuntime().availableProcessors(), Validators.min("1")), WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 1024, Validators.min("64")), + @Deprecated WORKER_RESOURCE_AVAILABLE_DISKS("tajo.worker.resource.disks", 1.0f), + WORKER_RESOURCE_AVAILABLE_DISK_NUM("tajo.worker.resource.disk.num", 1), WORKER_EXECUTION_MAX_SLOTS("tajo.worker.parallel-execution.max-num", 2), WORKER_RESOURCE_DFS_DIR_AWARE("tajo.worker.resource.dfs-dir-aware", false, Validators.bool()), @@ -187,7 +189,7 @@ public static enum ConfVars implements ConfigKey { 
WORKER_HISTORY_EXPIRE_PERIOD("tajo.worker.history.expire-interval-minutes", 60), // 1 hours QUERYMASTER_HISTORY_EXPIRE_PERIOD("tajo.qm.history.expire-interval-minutes", 6 * 60), // 6 hours - WORKER_HEARTBEAT_TIMEOUT("tajo.worker.heartbeat.timeout", 120 * 1000), // 120 sec + WORKER_HEARTBEAT_INTERVAL("tajo.worker.heartbeat.interval", 10 * 1000), // 10 sec // Resource Manager RESOURCE_MANAGER_CLASS("tajo.resource.manager", "org.apache.tajo.master.rm.TajoWorkerResourceManager", diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java index a65c8a546f..af28886446 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java @@ -20,10 +20,12 @@ import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; +import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; +import org.apache.tajo.common.exception.NotImplementedException; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse; import org.apache.tajo.ipc.TajoResourceTrackerProtocol; @@ -186,7 +188,7 @@ public void heartbeat( public void nodeHeartbeat(RpcController controller, TajoResourceTrackerProtocol.NodeHeartbeatRequestProto request, RpcCallback done) { //TODO implement with ResourceManager for scheduler - + throw new RuntimeException(new ServiceException(new NotImplementedException().getMessage())); } private Worker createWorkerResource(NodeHeartbeat request) { diff --git a/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java index 51526b6b0a..01e9dcf5bc 100644 --- 
a/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java +++ b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResources.java @@ -34,7 +34,11 @@ public static NodeResource createResource(int memory, int disks, int vCores) { } public static NodeResource clone(NodeResource res) { - return NodeResource.createResource(res.getMemory(), res.getVirtualCores(), res.getDisks()); + return NodeResource.createResource(res.getMemory(), res.getDisks(), res.getVirtualCores()); + } + + public static NodeResource update(NodeResource lhs, NodeResource rhs) { + return lhs.setMemory(rhs.getMemory()).setDisks(rhs.getDisks()).setVirtualCores(rhs.getVirtualCores()); } public static NodeResource addTo(NodeResource lhs, NodeResource rhs) { diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java index 7a2a68d2fc..c4c57cb628 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java @@ -29,9 +29,11 @@ import org.apache.tajo.resource.NodeResources; import org.apache.tajo.storage.DiskUtil; import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.worker.event.NodeResourceAllocateEvent; import org.apache.tajo.worker.event.NodeResourceDeallocateEvent; import org.apache.tajo.worker.event.NodeResourceManagerEvent; +import org.apache.tajo.worker.event.NodeStatusEvent; import java.util.concurrent.atomic.AtomicInteger; @@ -41,17 +43,14 @@ public class NodeResourceManagerService extends AbstractService implements Event private static final Log LOG = LogFactory.getLog(NodeResourceManagerService.class); private final Dispatcher dispatcher; - private final TajoWorker.WorkerContext context; private NodeResource totalResource; private NodeResource availableResource; - private AtomicInteger running; + 
private AtomicInteger allocatedSize; private TajoConf tajoConf; - private volatile boolean isStopped; - public NodeResourceManagerService(TajoWorker.WorkerContext context, Dispatcher dispatcher){ + public NodeResourceManagerService(Dispatcher dispatcher){ super(NodeResourceManagerService.class.getName()); this.dispatcher = dispatcher; - this.context = context; } @Override @@ -63,21 +62,9 @@ protected void serviceInit(Configuration conf) throws Exception { this.totalResource = createWorkerResource(tajoConf); this.availableResource = NodeResources.clone(totalResource); this.dispatcher.register(NodeResourceManagerEvent.EventType.class, this); - this.running = new AtomicInteger(); + this.allocatedSize = new AtomicInteger(); super.serviceInit(conf); - LOG.info("Initialized NodeMonitorService for " + totalResource); - } - - @Override - protected void serviceStart() throws Exception { - super.serviceStart(); - } - - @Override - protected void serviceStop() throws Exception { - // Interrupt the updater. 
- this.isStopped = true; - super.serviceStop(); + LOG.info("Initialized NodeResourceManagerService for " + totalResource); } @Override @@ -89,7 +76,7 @@ public void handle(NodeResourceManagerEvent event) { for (TaskAllocationRequestProto request : allocateEvent.getRequest().getTaskRequestList()) { NodeResource resource = new NodeResource(request.getResource()); if (allocate(resource)) { - running.incrementAndGet(); + allocatedSize.incrementAndGet(); //TODO send task event to taskExecutorService } else { response.addCancellationTask(request); @@ -99,44 +86,59 @@ public void handle(NodeResourceManagerEvent event) { break; } case DEALLOCATE: { - running.decrementAndGet(); + allocatedSize.decrementAndGet(); NodeResourceDeallocateEvent deallocateEvent = (NodeResourceDeallocateEvent) event; release(deallocateEvent.getResource()); + + // send current resource to ResourceTracker + getDispatcher().getEventHandler().handle( + new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE, getAvailableResource())); break; } } } - public Dispatcher getDispatcher() { + protected Dispatcher getDispatcher() { return dispatcher; } - public NodeResource getTotalResource() { + protected NodeResource getTotalResource() { return NodeResources.clone(totalResource); } - public NodeResource getAvailableResource() { + protected NodeResource getAvailableResource() { return NodeResources.clone(availableResource); } + public int getAllocatedSize() { + return allocatedSize.get(); + } + private boolean allocate(NodeResource resource) { + //TODO consider the jvm free memory if (NodeResources.fitsIn(resource, availableResource)) { - NodeResources.addTo(availableResource, resource); + NodeResources.subtractFrom(availableResource, resource); return true; } return false; } private void release(NodeResource resource) { - NodeResources.subtractFrom(availableResource, resource); + NodeResources.addTo(availableResource, resource); } private NodeResource createWorkerResource(TajoConf conf) { + int 
memoryMb; + + if (conf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { + memoryMb = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB); + } else { + memoryMb = Math.min((int) (Runtime.getRuntime().maxMemory() / StorageUnit.MB), + conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB)); + } - int memoryMb = Math.min((int) (Runtime.getRuntime().maxMemory() / StorageUnit.MB), - conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB)); int vCores = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES); - int disks = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS); + int disks = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_NUM); int dataNodeStorageSize = DiskUtil.getDataNodeStorageSize(); if (conf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DFS_DIR_AWARE) && dataNodeStorageSize > 0) { diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdaterService.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdaterService.java index 5ac100f0d9..b16b46174c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdaterService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdaterService.java @@ -27,9 +27,11 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.TajoResourceTrackerProtocol; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.resource.NodeResource; import org.apache.tajo.rpc.*; import org.apache.tajo.service.ServiceTracker; +import org.apache.tajo.service.ServiceTrackerFactory; import org.apache.tajo.worker.event.NodeStatusEvent; import java.net.ConnectException; @@ -39,7 +41,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; import static 
org.apache.tajo.ipc.TajoResourceTrackerProtocol.*; @@ -53,20 +54,20 @@ public class NodeStatusUpdaterService extends AbstractService implements EventHa private TajoConf tajoConf; private StatusUpdaterThread updaterThread; - private AtomicBoolean flushReport = new AtomicBoolean(); private volatile boolean isStopped; private volatile long heartBeatInterval; - private BlockingQueue heartBeatRequestQueue; - private TajoWorker.WorkerContext context; - private NodeResourceManagerService nodeResourceManagerService; + private BlockingQueue heartBeatRequestQueue; + private final WorkerConnectionInfo connectionInfo; + private final NodeResourceManagerService nodeResourceManagerService; private AsyncRpcClient rmClient; - private TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker; + private ServiceTracker serviceTracker; + private TajoResourceTrackerProtocolService.Interface resourceTracker; + private int queueingLimit; - public NodeStatusUpdaterService(TajoWorker.WorkerContext context, NodeResourceManagerService resourceManagerService) { + public NodeStatusUpdaterService(WorkerConnectionInfo connectionInfo, NodeResourceManagerService resourceManagerService) { super(NodeStatusUpdaterService.class.getSimpleName()); - this.context = context; + this.connectionInfo = connectionInfo; this.nodeResourceManagerService = resourceManagerService; - this.heartBeatInterval = 10 * 1000; //10 sec } @Override @@ -76,15 +77,21 @@ public void serviceInit(Configuration conf) throws Exception { } this.tajoConf = (TajoConf) conf; this.heartBeatRequestQueue = Queues.newLinkedBlockingQueue(); + this.serviceTracker = ServiceTrackerFactory.get(tajoConf); + this.nodeResourceManagerService.getDispatcher().register(NodeStatusEvent.EventType.class, this); + this.heartBeatInterval = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_INTERVAL); this.updaterThread = new StatusUpdaterThread(); super.serviceInit(conf); } @Override public void serviceStart() throws Exception { + 
// send a report once the queued resource events reach half of the total virtual cores + this.queueingLimit = nodeResourceManagerService.getTotalResource().getVirtualCores() / 2; + updaterThread.start(); super.serviceStart(); - LOG.info("Node status updater started."); + LOG.info("NodeStatusUpdater started."); } @Override @@ -95,33 +102,39 @@ public void serviceStop() throws Exception { updaterThread.notifyAll(); } super.serviceStop(); + LOG.info("NodeStatusUpdater stopped."); } @Override public void handle(NodeStatusEvent event) { switch (event.getType()) { case REPORT_RESOURCE: - heartBeatRequestQueue.add(event); + heartBeatRequestQueue.add(event); //batch report to ResourceTracker break; case FLUSH_REPORTS: - heartBeatRequestQueue.add(event); - synchronized (updaterThread) { - updaterThread.notifyAll(); - } + heartBeatRequestQueue.add(event); //flush report to ResourceTracker break; } } + public int getQueueSize() { + return heartBeatRequestQueue.size(); + } + + public int getQueueingLimit() { + return queueingLimit; + } + private NodeHeartbeatRequestProto createResourceReport(NodeResource resource) { NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder(); requestProto.setAvailableResource(resource.getProto()); - requestProto.setWorkerId(context.getConnectionInfo().getId()); + requestProto.setWorkerId(connectionInfo.getId()); return requestProto.build(); } private NodeHeartbeatRequestProto createHeartBeatReport() { NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder(); - requestProto.setWorkerId(context.getConnectionInfo().getId()); + requestProto.setWorkerId(connectionInfo.getId()); return requestProto.build(); } @@ -129,16 +142,15 @@ private NodeHeartbeatRequestProto createNodeStatusReport() { NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder(); requestProto.setTotalResource(nodeResourceManagerService.getTotalResource().getProto());
requestProto.setAvailableResource(nodeResourceManagerService.getAvailableResource().getProto()); - requestProto.setWorkerId(context.getConnectionInfo().getId()); - requestProto.setConnectionInfo(context.getConnectionInfo().getProto()); + requestProto.setWorkerId(connectionInfo.getId()); + requestProto.setConnectionInfo(connectionInfo.getProto()); //TODO set node status to requestProto.setStatus() return requestProto.build(); } - private TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService newStub() + protected TajoResourceTrackerProtocolService.Interface newStub() throws NoSuchMethodException, ConnectException, ClassNotFoundException { - ServiceTracker serviceTracker = context.getServiceTracker(); RpcClientManager.cleanup(rmClient); RpcClientManager rpcManager = RpcClientManager.getInstance(); @@ -178,7 +190,7 @@ public StatusUpdaterThread() { super("NodeStatusUpdater"); } - private int drain(Collection buffer, int numElements, + private int drain(Collection buffer, int numElements, long timeout, TimeUnit unit) throws InterruptedException { long deadline = System.nanoTime() + unit.toNanos(timeout); @@ -186,22 +198,22 @@ private int drain(Collection buffer, int numElements, while (added < numElements) { added += heartBeatRequestQueue.drainTo(buffer, numElements - added); if (added < numElements) { // not enough elements immediately available; will have to wait - if (deadline <= System.currentTimeMillis()) { + NodeStatusEvent e = heartBeatRequestQueue.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS); + if (e == null) { + break; // we already waited enough, and there are no more elements in sight + } + buffer.add(e); + added++; + + if (e.getType() == NodeStatusEvent.EventType.FLUSH_REPORTS) { break; - } else { - synchronized (updaterThread) { - updaterThread.wait(deadline - System.currentTimeMillis()); - if (deadline > System.nanoTime()) { - added += heartBeatRequestQueue.drainTo(buffer, numElements - added); - break; - } - } } } } return added; } + 
/* The node periodically sends heartbeats with its resource and status to the master. */ @Override public void run() { NodeHeartbeatResponseProto lastResponse = null; @@ -212,9 +224,8 @@ public void run() { if (lastResponse.getCommand() == ResponseCommand.NORMAL) { List events = Lists.newArrayList(); try { - drain(events, - Math.max(nodeResourceManagerService.getTotalResource().getVirtualCores() / 2, 1), - heartBeatInterval, TimeUnit.MILLISECONDS); + /* batch update to ResourceTracker */ + drain(events, Math.max(queueingLimit, 1), heartBeatInterval, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { break; } @@ -223,22 +234,25 @@ public void run() { // send last available resource; lastResponse = sendHeartbeat(createResourceReport(events.get(events.size() - 1).getResource())); } else { + // send ping; lastResponse = sendHeartbeat(createHeartBeatReport()); } } else if (lastResponse.getCommand() == ResponseCommand.MEMBERSHIP) { + // Membership changed lastResponse = sendHeartbeat(createNodeStatusReport()); } else if (lastResponse.getCommand() == ResponseCommand.ABORT_QUERY) { //TODO abort failure queries } } else { + // Node registration on startup lastResponse = sendHeartbeat(createNodeStatusReport()); } - } catch (NoSuchMethodException e) { - LOG.fatal(e.getMessage(), e); + } catch (NoSuchMethodException nsme) { + LOG.fatal(nsme.getMessage(), nsme); Runtime.getRuntime().halt(1); - } catch (ClassNotFoundException e) { - LOG.fatal(e.getMessage(), e); + } catch (ClassNotFoundException cnfe) { + LOG.fatal(cnfe.getMessage(), cnfe); Runtime.getRuntime().halt(1); } catch (Exception e) { LOG.error(e.getMessage(), e); @@ -246,7 +260,7 @@ public void run() { synchronized (updaterThread) { try { updaterThread.wait(heartBeatInterval); - } catch (InterruptedException e1) { + } catch (InterruptedException ie) { // Do Nothing } } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeAllocateEvent.java
b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeAllocateEvent.java index 7011101b05..78a7796b33 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeAllocateEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeAllocateEvent.java @@ -18,12 +18,16 @@ package org.apache.tajo.worker.event; +import org.apache.tajo.TajoProtos; import org.apache.tajo.resource.NodeResource; public class NodeResourceDeallocateEvent extends NodeResourceManagerEvent { private NodeResource resource; + public NodeResourceDeallocateEvent(TajoProtos.NodeResourceProto proto) { + this(new NodeResource(proto)); + } public NodeResourceDeallocateEvent(NodeResource resource) { super(EventType.DEALLOCATE); this.resource = resource; diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdaterService.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdaterService.java new file mode 100644 index 0000000000..934f29259c --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdaterService.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.worker; + +import com.google.common.collect.Maps; +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import org.apache.tajo.ipc.QueryCoordinatorProtocol; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.resource.NodeResource; +import org.apache.tajo.resource.NodeResources; + +import java.net.ConnectException; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.*; + +public class MockNodeStatusUpdaterService extends NodeStatusUpdaterService { + + private CountDownLatch barrier; + private Map membership = Maps.newHashMap(); + private Map resources = Maps.newHashMap(); + private MockResourceTracker resourceTracker; + + public MockNodeStatusUpdaterService(CountDownLatch barrier, WorkerConnectionInfo connectionInfo, + NodeResourceManagerService resourceManagerService) { + super(connectionInfo, resourceManagerService); + this.barrier = barrier; + this.resourceTracker = new MockResourceTracker(); + } + + @Override + protected TajoResourceTrackerProtocolService.Interface newStub() + throws NoSuchMethodException, ConnectException, ClassNotFoundException { + + return resourceTracker; + } + + protected MockResourceTracker getResourceTracker() { + return resourceTracker; + } + + class MockResourceTracker implements TajoResourceTrackerProtocolService.Interface { + private NodeHeartbeatRequestProto lastRequest; + + protected Map getTotalResource() { + return membership; + } + + protected Map getAvailableResource() { + return resources; /* available resources per worker id, not the total-resource membership map */ + } + + protected NodeHeartbeatRequestProto getLastRequest() { + return lastRequest; + } + + @Override + public void heartbeat(RpcController controller, NodeHeartbeat request, + RpcCallback done) { + + } + + @Override + public void nodeHeartbeat(RpcController controller, NodeHeartbeatRequestProto request, + RpcCallback done) { + + NodeHeartbeatResponseProto.Builder
response = NodeHeartbeatResponseProto.newBuilder(); + if (membership.containsKey(request.getWorkerId())) { + if (request.hasAvailableResource()) { + NodeResource resource = resources.get(request.getWorkerId()); + NodeResources.update(resource, new NodeResource(request.getAvailableResource())); + } + done.run(response.setCommand(ResponseCommand.NORMAL).build()); + } else { + if (request.hasConnectionInfo()) { + membership.put(request.getWorkerId(), new NodeResource(request.getTotalResource())); + resources.put(request.getWorkerId(), new NodeResource(request.getAvailableResource())); + done.run(response.setCommand(ResponseCommand.NORMAL).build()); + } else { + done.run(response.setCommand(ResponseCommand.MEMBERSHIP).build()); + } + } + lastRequest = request; + barrier.countDown(); + } + } +} diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManagerService.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManagerService.java new file mode 100644 index 0000000000..49b1d2ba50 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManagerService.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.worker; + +import com.google.common.collect.Lists; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.tajo.*; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.plan.serder.PlanProto; +import org.apache.tajo.resource.NodeResources; +import org.apache.tajo.rpc.CallFuture; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.worker.event.NodeResourceAllocateEvent; +import org.apache.tajo.worker.event.NodeResourceDeallocateEvent; +import org.junit.*; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.*; + +import static org.apache.tajo.ipc.TajoWorkerProtocol.*; +public class TestNodeResourceManagerService { + + private NodeResourceManagerService resourceManagerService; + private MockNodeStatusUpdaterService statusUpdater; + private AsyncDispatcher dispatcher; + private int taskMemory; + private TajoConf conf; + + @Before + public void setup() { + conf = new TajoConf(); + conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE); + + taskMemory = 512; + conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 4); + conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB, + taskMemory * conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES)); + conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_NUM, 4); + + dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + + resourceManagerService = new NodeResourceManagerService(dispatcher); + resourceManagerService.init(conf); + resourceManagerService.start(); + + WorkerConnectionInfo 
worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); + statusUpdater = new MockNodeStatusUpdaterService(new CountDownLatch(0), worker, resourceManagerService); + statusUpdater.init(conf); + statusUpdater.start(); + } + + @After + public void tearDown() { + resourceManagerService.stop(); + statusUpdater.stop(); + dispatcher.stop(); + } + + @Test + public void testNodeResourceAllocateEvent() throws Exception { + int requestSize = 4; + + CallFuture callFuture = new CallFuture(); + BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder(); + ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); + requestProto.setExecutionBlockId(ebId.getProto()); + + assertEquals(resourceManagerService.getTotalResource(), resourceManagerService.getAvailableResource()); + requestProto.addAllTaskRequest(createTaskRequests(taskMemory, requestSize)); + + dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture)); + + BatchAllocationResponseProto responseProto = callFuture.get(); + assertNotEquals(resourceManagerService.getTotalResource(), resourceManagerService.getAvailableResource()); + assertEquals(0, responseProto.getCancellationTaskCount()); + assertEquals(requestSize, resourceManagerService.getAllocatedSize()); + } + + + @Test + public void testNodeResourceCancellation() throws Exception { + int requestSize = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES); + int overSize = 10; + + CallFuture callFuture = new CallFuture(); + BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder(); + ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); + requestProto.setExecutionBlockId(ebId.getProto()); + + assertEquals(resourceManagerService.getTotalResource(), resourceManagerService.getAvailableResource()); + requestProto.addAllTaskRequest(createTaskRequests(taskMemory, 
requestSize + overSize)); + + dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture)); + BatchAllocationResponseProto responseProto = callFuture.get(); + + assertEquals(overSize, responseProto.getCancellationTaskCount()); + assertEquals(requestSize, resourceManagerService.getAllocatedSize()); + } + + @Test + public void testNodeResourceDeallocateEvent() throws Exception { + int requestSize = 4; + + CallFuture callFuture = new CallFuture(); + BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder(); + ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); + requestProto.setExecutionBlockId(ebId.getProto()); + + assertEquals(resourceManagerService.getTotalResource(), resourceManagerService.getAvailableResource()); + requestProto.addAllTaskRequest(createTaskRequests(taskMemory, requestSize)); + + dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture)); + + BatchAllocationResponseProto responseProto = callFuture.get(); + assertNotEquals(resourceManagerService.getTotalResource(), resourceManagerService.getAvailableResource()); + assertEquals(0, responseProto.getCancellationTaskCount()); + assertEquals(requestSize, resourceManagerService.getAllocatedSize()); + + //deallocate + for(TaskAllocationRequestProto allocationRequestProto : requestProto.getTaskRequestList()) { + // direct invoke handler for testing + resourceManagerService.handle(new NodeResourceDeallocateEvent(allocationRequestProto.getResource())); + } + assertEquals(0, resourceManagerService.getAllocatedSize()); + assertEquals(resourceManagerService.getTotalResource(), resourceManagerService.getAvailableResource()); + } + + @Test(timeout = 30000) + public void testParallelRequest() throws Exception { + final int parallelCount = Runtime.getRuntime().availableProcessors(); + final int taskSize = 100000; + final AtomicInteger totalComplete = new 
AtomicInteger(); + final AtomicInteger totalCanceled = new AtomicInteger(); + + final ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); + final List totalTasks = + Collections.synchronizedList(createTaskRequests(taskMemory, taskSize)); + + ExecutorService executor = Executors.newFixedThreadPool(parallelCount); + List futureList = Lists.newArrayList(); + + long startTime = System.currentTimeMillis(); + for (int i = 0; i < parallelCount; i++) { + futureList.add(executor.submit(new Runnable() { + @Override + public void run() { + int complete = 0; + while (true) { + TaskAllocationRequestProto task; + synchronized (totalTasks) { + if (totalTasks.isEmpty()) break; + else { + task = totalTasks.remove(0); + } + } + + BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder(); + requestProto.addTaskRequest(task); + requestProto.setExecutionBlockId(ebId.getProto()); + + CallFuture callFuture = new CallFuture(); + dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture)); + try { + BatchAllocationResponseProto proto = callFuture.get(); + if (proto.getCancellationTaskCount() > 0) { + totalTasks.addAll(proto.getCancellationTaskList()); + totalCanceled.addAndGet(proto.getCancellationTaskCount()); + } else { + complete++; + dispatcher.getEventHandler().handle(new NodeResourceDeallocateEvent(task.getResource())); + } + } catch (Exception e) { + fail(e.getMessage()); + } + } + System.out.println(Thread.currentThread().getName() + " complete task: " + complete); + totalComplete.addAndGet(complete); + } + }) + ); + } + + for (Future future : futureList) { + future.get(); + } + + System.out.println(parallelCount + " Thread, completed requests: " + totalComplete.get() + ", canceled requests:" + + totalCanceled.get() + ", " + +(System.currentTimeMillis() - startTime) + " ms elapsed"); + executor.shutdown(); + assertEquals(taskSize, totalComplete.get()); + } + + protected 
static List createTaskRequests(int memory, int size) { + List requestProtoList = Lists.newArrayList(); + for (int i = 0; i < size; i++) { + + ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0); + TaskAttemptId taskAttemptId = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, i), 0); + + TajoWorkerProtocol.TaskRequestProto.Builder builder = + TajoWorkerProtocol.TaskRequestProto.newBuilder(); + builder.setId(taskAttemptId.getProto()); + builder.setShouldDie(true); + builder.setOutputTable(""); + builder.setPlan(PlanProto.LogicalNodeTree.newBuilder()); + builder.setClusteredOutput(false); + + + requestProtoList.add(TaskAllocationRequestProto.newBuilder() + .setResource(NodeResources.createResource(memory).getProto()) + .setTaskRequest(builder.build()).build()); + } + return requestProtoList; + } +} diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdaterService.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdaterService.java new file mode 100644 index 0000000000..ae0fdda2eb --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdaterService.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.TajoResourceTrackerProtocol; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.worker.event.NodeStatusEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; +import static org.junit.Assert.*; + +public class TestNodeStatusUpdaterService { + + private NodeResourceManagerService resourceManagerService; + private MockNodeStatusUpdaterService statusUpdater; + private AsyncDispatcher dispatcher; + private TajoConf conf; + + @Before + public void setup() { + conf = new TajoConf(); + conf.set(CommonTestingUtil.TAJO_TEST_KEY, CommonTestingUtil.TAJO_TEST_TRUE); + + conf.setIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_INTERVAL, 1000); + dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + + resourceManagerService = new NodeResourceManagerService(dispatcher); + resourceManagerService.init(conf); + resourceManagerService.start(); + } + + @After + public void tearDown() { + resourceManagerService.stop(); + if (statusUpdater != null) statusUpdater.stop(); + dispatcher.stop(); + } + + @Test(timeout = 20000) + public void testNodeMembership() throws Exception { + CountDownLatch barrier = new CountDownLatch(1); + WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); + statusUpdater = new MockNodeStatusUpdaterService(barrier, worker, resourceManagerService); + statusUpdater.init(conf); + statusUpdater.start(); + + MockNodeStatusUpdaterService.MockResourceTracker resourceTracker = statusUpdater.getResourceTracker(); + barrier.await(); + + 
assertTrue(resourceTracker.getTotalResource().containsKey(worker.getId())); + assertEquals(resourceManagerService.getTotalResource(), + resourceTracker.getTotalResource().get(worker.getId())); + + assertEquals(resourceManagerService.getAvailableResource(), + resourceTracker.getAvailableResource().get(worker.getId())); + } + + @Test(timeout = 20000) + public void testPing() throws Exception { + CountDownLatch barrier = new CountDownLatch(2); + WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); + statusUpdater = new MockNodeStatusUpdaterService(barrier, worker, resourceManagerService); + statusUpdater.init(conf); + statusUpdater.start(); + + MockNodeStatusUpdaterService.MockResourceTracker resourceTracker = statusUpdater.getResourceTracker(); + barrier.await(); + + TajoResourceTrackerProtocol.NodeHeartbeatRequestProto lastRequest = resourceTracker.getLastRequest(); + assertTrue(lastRequest.hasWorkerId()); + assertFalse(lastRequest.hasAvailableResource()); + assertFalse(lastRequest.hasTotalResource()); + assertFalse(lastRequest.hasConnectionInfo()); + } + + @Test(timeout = 20000) + public void testResourceReport() throws Exception { + CountDownLatch barrier = new CountDownLatch(2); + WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); + statusUpdater = new MockNodeStatusUpdaterService(barrier, worker, resourceManagerService); + statusUpdater.init(conf); + statusUpdater.start(); + + for (int i = 0; i < statusUpdater.getQueueingLimit(); i++) { + dispatcher.getEventHandler().handle(new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE, + resourceManagerService.getAvailableResource())); + } + barrier.await(); + assertEquals(0, statusUpdater.getQueueSize()); + } +} From 24a845cdaf1467610c897e35ed6b98ece8cb7d02 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Tue, 19 May 2015 02:24:21 +0900 Subject: [PATCH 04/12] trigger CI --- CHANGES | 1 + 1 file changed, 1 insertion(+) 
diff --git a/CHANGES b/CHANGES index ba9daf2ca0..43e80c06e9 100644 --- a/CHANGES +++ b/CHANGES @@ -1,5 +1,6 @@ Tajo Change Log + Release 0.11.0 - unreleased NEW FEATURES From 798c1451efacba27ac50e25baf9fef95af9a4880 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Tue, 19 May 2015 10:51:43 +0900 Subject: [PATCH 05/12] TAJO-1599 --- .../src/main/java/org/apache/tajo/conf/TajoConf.java | 4 +++- .../org/apache/tajo/worker/NodeResourceManagerService.java | 6 ++++-- .../apache/tajo/worker/TestNodeResourceManagerService.java | 3 ++- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 995b8d639c..e20658b1a9 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -176,7 +176,9 @@ public static enum ConfVars implements ConfigKey { WORKER_RESOURCE_AVAILABLE_MEMORY_MB("tajo.worker.resource.memory-mb", 1024, Validators.min("64")), @Deprecated WORKER_RESOURCE_AVAILABLE_DISKS("tajo.worker.resource.disks", 1.0f), - WORKER_RESOURCE_AVAILABLE_DISK_NUM("tajo.worker.resource.disk.num", 1), + WORKER_RESOURCE_AVAILABLE_DISKS_NUM("tajo.worker.resource.disks.num", 1, Validators.min("1")), + WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM("tajo.worker.resource.disk.parallel-execution.num", 2, + Validators.min("1")), WORKER_EXECUTION_MAX_SLOTS("tajo.worker.parallel-execution.max-num", 2), WORKER_RESOURCE_DFS_DIR_AWARE("tajo.worker.resource.dfs-dir-aware", false, Validators.bool()), diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java index c4c57cb628..26bdba5006 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java @@ -138,12 +138,14 
@@ private NodeResource createWorkerResource(TajoConf conf) { } int vCores = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES); - int disks = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_NUM); + int disks = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS_NUM); int dataNodeStorageSize = DiskUtil.getDataNodeStorageSize(); if (conf.getBoolVar(TajoConf.ConfVars.WORKER_RESOURCE_DFS_DIR_AWARE) && dataNodeStorageSize > 0) { disks = dataNodeStorageSize; } - return NodeResource.createResource(memoryMb, disks, vCores); + + int diskParallels = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM); + return NodeResource.createResource(memoryMb, disks * diskParallels, vCores); } } diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManagerService.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManagerService.java index 49b1d2ba50..0bcaedb266 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManagerService.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManagerService.java @@ -60,7 +60,8 @@ public void setup() { conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES, 4); conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_MEMORY_MB, taskMemory * conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES)); - conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_NUM, 4); + conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISKS_NUM, 4); + conf.setIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_DISK_PARALLEL_NUM, 1); dispatcher = new AsyncDispatcher(); dispatcher.init(conf); From 9b7a9f30e48e51d79ed85264a762d71836e10c66 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Tue, 19 May 2015 10:56:06 +0900 Subject: [PATCH 06/12] I can't rename file. 
maybe git bug --- .../apache/tajo/worker/event/NodeResourceDeAllocateEvent.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeAllocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeAllocateEvent.java index 78a7796b33..a298d77e20 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeAllocateEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeAllocateEvent.java @@ -28,6 +28,7 @@ public class NodeResourceDeallocateEvent extends NodeResourceManagerEvent { public NodeResourceDeallocateEvent(TajoProtos.NodeResourceProto proto) { this(new NodeResource(proto)); } + public NodeResourceDeallocateEvent(NodeResource resource) { super(EventType.DEALLOCATE); this.resource = resource; From 9b40d944e5252cd3c34350ddb2866f8a6cf60b21 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Tue, 19 May 2015 11:06:30 +0900 Subject: [PATCH 07/12] delete old file --- .../event/NodeResourceDeAllocateEvent.java | 40 ------------------- 1 file changed, 40 deletions(-) delete mode 100644 tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeAllocateEvent.java diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeAllocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeAllocateEvent.java deleted file mode 100644 index a298d77e20..0000000000 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeAllocateEvent.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.worker.event; - -import org.apache.tajo.TajoProtos; -import org.apache.tajo.resource.NodeResource; - -public class NodeResourceDeallocateEvent extends NodeResourceManagerEvent { - - private NodeResource resource; - - public NodeResourceDeallocateEvent(TajoProtos.NodeResourceProto proto) { - this(new NodeResource(proto)); - } - - public NodeResourceDeallocateEvent(NodeResource resource) { - super(EventType.DEALLOCATE); - this.resource = resource; - } - - public NodeResource getResource() { - return resource; - } -} From 4448a0bd8ecde82225ea49b72cb015d781e1a0ad Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Tue, 19 May 2015 11:07:17 +0900 Subject: [PATCH 08/12] add renamed file --- .../event/NodeResourceDeallocateEvent.java | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java new file mode 100644 index 0000000000..a298d77e20 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/NodeResourceDeallocateEvent.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker.event; + +import org.apache.tajo.TajoProtos; +import org.apache.tajo.resource.NodeResource; + +public class NodeResourceDeallocateEvent extends NodeResourceManagerEvent { + + private NodeResource resource; + + public NodeResourceDeallocateEvent(TajoProtos.NodeResourceProto proto) { + this(new NodeResource(proto)); + } + + public NodeResourceDeallocateEvent(NodeResource resource) { + super(EventType.DEALLOCATE); + this.resource = resource; + } + + public NodeResource getResource() { + return resource; + } +} From 5d75eb3d20ec9ba29d21e7f626a08dbd689e7505 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Tue, 19 May 2015 11:09:28 +0900 Subject: [PATCH 09/12] revert changes --- CHANGES | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGES b/CHANGES index 43e80c06e9..ba9daf2ca0 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,5 @@ Tajo Change Log - Release 0.11.0 - unreleased NEW FEATURES From ec57a13f0349f29135a00b02fb24c0a70006a40f Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Tue, 19 May 2015 23:47:15 +0900 Subject: [PATCH 10/12] change to atomic updater and update test case --- .../apache/tajo/resource/NodeResource.java | 40 ++++++++++++++----- .../worker/NodeResourceManagerService.java | 4 +- .../TestNodeResourceManagerService.java | 26 +++++------- 3 files changed, 42 insertions(+), 28 deletions(-) diff --git 
a/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java index c2512b971c..f51fc07e12 100644 --- a/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/resource/NodeResource.java @@ -19,9 +19,12 @@ package org.apache.tajo.resource; import com.google.common.base.Objects; +import io.netty.util.internal.PlatformDependent; import org.apache.tajo.TajoProtos; import org.apache.tajo.common.ProtoObject; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + /** *

NodeResource models a set of computer resources in the * cluster.

@@ -37,14 +40,30 @@ public class NodeResource implements ProtoObject, Comparable { - private int memory; - private int disks; - private int vCores; + private volatile int memory; + private volatile int disks; + private volatile int vCores; + + private static AtomicIntegerFieldUpdater MEMORY_UPDATER; + private static AtomicIntegerFieldUpdater DISKS_UPDATER; + private static AtomicIntegerFieldUpdater VCORES_UPDATER; + + static { + MEMORY_UPDATER = PlatformDependent.newAtomicIntegerFieldUpdater(NodeResource.class, "memory"); + if (MEMORY_UPDATER == null) { + MEMORY_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NodeResource.class, "memory"); + DISKS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NodeResource.class, "disks"); + VCORES_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NodeResource.class, "vCores"); + } else { + DISKS_UPDATER = PlatformDependent.newAtomicIntegerFieldUpdater(NodeResource.class, "disks"); + VCORES_UPDATER = PlatformDependent.newAtomicIntegerFieldUpdater(NodeResource.class, "vCores"); + } + } public NodeResource(TajoProtos.NodeResourceProto proto) { - this.memory = proto.getMemory(); - this.vCores = proto.getVirtualCores(); - this.disks = proto.getDisks(); + setMemory(proto.getMemory()); + setDisks(proto.getDisks()); + setVirtualCores(proto.getVirtualCores()); } private NodeResource() { @@ -69,8 +88,9 @@ public int getMemory() { * * @param memory memory of the resource */ + @SuppressWarnings("unchecked") public NodeResource setMemory(int memory) { - this.memory = memory; + MEMORY_UPDATER.lazySet(this, memory); return this; } @@ -89,8 +109,9 @@ public int getDisks() { * * @param disks number of disks of the resource */ + @SuppressWarnings("unchecked") public NodeResource setDisks(int disks) { - this.disks = disks; + DISKS_UPDATER.lazySet(this, disks); return this; } @@ -111,8 +132,9 @@ public int getVirtualCores() { * * @param vCores number of virtual cpu cores of the resource */ + @SuppressWarnings("unchecked") public NodeResource 
setVirtualCores(int vCores) { - this.vCores = vCores; + VCORES_UPDATER.lazySet(this, vCores); return this; } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java index 26bdba5006..4b070223cb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java @@ -103,11 +103,11 @@ protected Dispatcher getDispatcher() { } protected NodeResource getTotalResource() { - return NodeResources.clone(totalResource); + return totalResource; } protected NodeResource getAvailableResource() { - return NodeResources.clone(availableResource); + return availableResource; } public int getAllocatedSize() { diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManagerService.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManagerService.java index 0bcaedb266..ea6c2390b6 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManagerService.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManagerService.java @@ -32,12 +32,9 @@ import org.apache.tajo.worker.event.NodeResourceDeallocateEvent; import org.junit.*; -import java.util.Collections; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.Queue; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.*; @@ -161,8 +158,7 @@ public void testParallelRequest() throws Exception { final AtomicInteger totalCanceled = new AtomicInteger(); final ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); - final List totalTasks = - Collections.synchronizedList(createTaskRequests(taskMemory, 
taskSize)); + final Queue totalTasks = createTaskRequests(taskMemory, taskSize); ExecutorService executor = Executors.newFixedThreadPool(parallelCount); List futureList = Lists.newArrayList(); @@ -174,13 +170,9 @@ public void testParallelRequest() throws Exception { public void run() { int complete = 0; while (true) { - TaskAllocationRequestProto task; - synchronized (totalTasks) { - if (totalTasks.isEmpty()) break; - else { - task = totalTasks.remove(0); - } - } + TaskAllocationRequestProto task = totalTasks.poll(); + if (task == null) break; + BatchAllocationRequestProto.Builder requestProto = BatchAllocationRequestProto.newBuilder(); requestProto.addTaskRequest(task); @@ -201,7 +193,7 @@ public void run() { fail(e.getMessage()); } } - System.out.println(Thread.currentThread().getName() + " complete task: " + complete); + System.out.println(Thread.currentThread().getName() + " complete requests: " + complete); totalComplete.addAndGet(complete); } }) @@ -218,8 +210,8 @@ public void run() { assertEquals(taskSize, totalComplete.get()); } - protected static List createTaskRequests(int memory, int size) { - List requestProtoList = Lists.newArrayList(); + protected static Queue createTaskRequests(int memory, int size) { + Queue requestProtoList = new LinkedBlockingQueue(); for (int i = 0; i < size; i++) { ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0); From 9902e57fc0b613b8a7ebad22b801ced8bdf680f4 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Wed, 20 May 2015 00:04:33 +0900 Subject: [PATCH 11/12] remove findbugs warnings --- .../worker/NodeResourceManagerService.java | 45 +++++++++---------- .../TestNodeResourceManagerService.java | 2 +- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java index 4b070223cb..08d6b5abc6 100644 --- 
a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java @@ -69,32 +69,29 @@ protected void serviceInit(Configuration conf) throws Exception { @Override public void handle(NodeResourceManagerEvent event) { - switch (event.getType()) { - case ALLOCATE: { - NodeResourceAllocateEvent allocateEvent = (NodeResourceAllocateEvent) event; - BatchAllocationResponseProto.Builder response = BatchAllocationResponseProto.newBuilder(); - for (TaskAllocationRequestProto request : allocateEvent.getRequest().getTaskRequestList()) { - NodeResource resource = new NodeResource(request.getResource()); - if (allocate(resource)) { - allocatedSize.incrementAndGet(); - //TODO send task event to taskExecutorService - } else { - response.addCancellationTask(request); - } + + if (event instanceof NodeResourceAllocateEvent) { + NodeResourceAllocateEvent allocateEvent = (NodeResourceAllocateEvent) event; + BatchAllocationResponseProto.Builder response = BatchAllocationResponseProto.newBuilder(); + for (TaskAllocationRequestProto request : allocateEvent.getRequest().getTaskRequestList()) { + NodeResource resource = new NodeResource(request.getResource()); + if (allocate(resource)) { + allocatedSize.incrementAndGet(); + //TODO send task event to taskExecutorService + } else { + response.addCancellationTask(request); } - allocateEvent.getCallback().run(response.build()); - break; - } - case DEALLOCATE: { - allocatedSize.decrementAndGet(); - NodeResourceDeallocateEvent deallocateEvent = (NodeResourceDeallocateEvent) event; - release(deallocateEvent.getResource()); - - // send current resource to ResourceTracker - getDispatcher().getEventHandler().handle( - new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE, getAvailableResource())); - break; } + allocateEvent.getCallback().run(response.build()); + + } else if (event instanceof NodeResourceDeallocateEvent) { + 
allocatedSize.decrementAndGet(); + NodeResourceDeallocateEvent deallocateEvent = (NodeResourceDeallocateEvent) event; + release(deallocateEvent.getResource()); + + // send current resource to ResourceTracker + getDispatcher().getEventHandler().handle( + new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE, getAvailableResource())); } } diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManagerService.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManagerService.java index ea6c2390b6..7586f37ec2 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManagerService.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManagerService.java @@ -152,7 +152,7 @@ public void testNodeResourceDeallocateEvent() throws Exception { @Test(timeout = 30000) public void testParallelRequest() throws Exception { - final int parallelCount = Runtime.getRuntime().availableProcessors(); + final int parallelCount = conf.getIntVar(TajoConf.ConfVars.WORKER_RESOURCE_AVAILABLE_CPU_CORES) * 2; final int taskSize = 100000; final AtomicInteger totalComplete = new AtomicInteger(); final AtomicInteger totalCanceled = new AtomicInteger(); From 1d33d36c917f1f1bb376225dcccf18f61568fcc9 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Wed, 20 May 2015 13:59:53 +0900 Subject: [PATCH 12/12] rename service to manager --- ...rService.java => NodeResourceManager.java} | 12 +++--- ...terService.java => NodeStatusUpdater.java} | 20 +++++----- ...ervice.java => MockNodeStatusUpdater.java} | 8 ++-- ...vice.java => TestNodeResourceManager.java} | 38 +++++++++---------- ...ervice.java => TestNodeStatusUpdater.java} | 30 +++++++-------- 5 files changed, 54 insertions(+), 54 deletions(-) rename tajo-core/src/main/java/org/apache/tajo/worker/{NodeResourceManagerService.java => NodeResourceManager.java} (93%) rename tajo-core/src/main/java/org/apache/tajo/worker/{NodeStatusUpdaterService.java => NodeStatusUpdater.java} 
(92%) rename tajo-core/src/test/java/org/apache/tajo/worker/{MockNodeStatusUpdaterService.java => MockNodeStatusUpdater.java} (92%) rename tajo-core/src/test/java/org/apache/tajo/worker/{TestNodeResourceManagerService.java => TestNodeResourceManager.java} (85%) rename tajo-core/src/test/java/org/apache/tajo/worker/{TestNodeStatusUpdaterService.java => TestNodeStatusUpdater.java} (77%) diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java similarity index 93% rename from tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java rename to tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java index 08d6b5abc6..20eec6b4e2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeResourceManager.java @@ -39,8 +39,8 @@ import static org.apache.tajo.ipc.TajoWorkerProtocol.*; -public class NodeResourceManagerService extends AbstractService implements EventHandler { - private static final Log LOG = LogFactory.getLog(NodeResourceManagerService.class); +public class NodeResourceManager extends AbstractService implements EventHandler { + private static final Log LOG = LogFactory.getLog(NodeResourceManager.class); private final Dispatcher dispatcher; private NodeResource totalResource; @@ -48,8 +48,8 @@ public class NodeResourceManagerService extends AbstractService implements Event private AtomicInteger allocatedSize; private TajoConf tajoConf; - public NodeResourceManagerService(Dispatcher dispatcher){ - super(NodeResourceManagerService.class.getName()); + public NodeResourceManager(Dispatcher dispatcher){ + super(NodeResourceManager.class.getName()); this.dispatcher = dispatcher; } @@ -64,7 +64,7 @@ protected void serviceInit(Configuration conf) throws Exception { this.dispatcher.register(NodeResourceManagerEvent.EventType.class, this); 
this.allocatedSize = new AtomicInteger(); super.serviceInit(conf); - LOG.info("Initialized NodeResourceManagerService for " + totalResource); + LOG.info("Initialized NodeResourceManager for " + totalResource); } @Override @@ -77,7 +77,7 @@ public void handle(NodeResourceManagerEvent event) { NodeResource resource = new NodeResource(request.getResource()); if (allocate(resource)) { allocatedSize.incrementAndGet(); - //TODO send task event to taskExecutorService + //TODO send task event to taskExecutor } else { response.addCancellationTask(request); } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdaterService.java b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java similarity index 92% rename from tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdaterService.java rename to tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java index b16b46174c..84ac419066 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdaterService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/NodeStatusUpdater.java @@ -48,9 +48,9 @@ /** * It periodically sends heartbeat to {@link org.apache.tajo.master.rm.TajoResourceTracker} via asynchronous rpc. 
*/ -public class NodeStatusUpdaterService extends AbstractService implements EventHandler { +public class NodeStatusUpdater extends AbstractService implements EventHandler { - private final static Log LOG = LogFactory.getLog(NodeStatusUpdaterService.class); + private final static Log LOG = LogFactory.getLog(NodeStatusUpdater.class); private TajoConf tajoConf; private StatusUpdaterThread updaterThread; @@ -58,16 +58,16 @@ public class NodeStatusUpdaterService extends AbstractService implements EventHa private volatile long heartBeatInterval; private BlockingQueue heartBeatRequestQueue; private final WorkerConnectionInfo connectionInfo; - private final NodeResourceManagerService nodeResourceManagerService; + private final NodeResourceManager nodeResourceManager; private AsyncRpcClient rmClient; private ServiceTracker serviceTracker; private TajoResourceTrackerProtocolService.Interface resourceTracker; private int queueingLimit; - public NodeStatusUpdaterService(WorkerConnectionInfo connectionInfo, NodeResourceManagerService resourceManagerService) { - super(NodeStatusUpdaterService.class.getSimpleName()); + public NodeStatusUpdater(WorkerConnectionInfo connectionInfo, NodeResourceManager resourceManager) { + super(NodeStatusUpdater.class.getSimpleName()); this.connectionInfo = connectionInfo; - this.nodeResourceManagerService = resourceManagerService; + this.nodeResourceManager = resourceManager; } @Override @@ -78,7 +78,7 @@ public void serviceInit(Configuration conf) throws Exception { this.tajoConf = (TajoConf) conf; this.heartBeatRequestQueue = Queues.newLinkedBlockingQueue(); this.serviceTracker = ServiceTrackerFactory.get(tajoConf); - this.nodeResourceManagerService.getDispatcher().register(NodeStatusEvent.EventType.class, this); + this.nodeResourceManager.getDispatcher().register(NodeStatusEvent.EventType.class, this); this.heartBeatInterval = tajoConf.getIntVar(TajoConf.ConfVars.WORKER_HEARTBEAT_INTERVAL); this.updaterThread = new StatusUpdaterThread(); 
super.serviceInit(conf); @@ -87,7 +87,7 @@ public void serviceInit(Configuration conf) throws Exception { @Override public void serviceStart() throws Exception { // if resource changed over than 50%, send reports - this.queueingLimit = nodeResourceManagerService.getTotalResource().getVirtualCores() / 2; + this.queueingLimit = nodeResourceManager.getTotalResource().getVirtualCores() / 2; updaterThread.start(); super.serviceStart(); @@ -140,8 +140,8 @@ private NodeHeartbeatRequestProto createHeartBeatReport() { private NodeHeartbeatRequestProto createNodeStatusReport() { NodeHeartbeatRequestProto.Builder requestProto = NodeHeartbeatRequestProto.newBuilder(); - requestProto.setTotalResource(nodeResourceManagerService.getTotalResource().getProto()); - requestProto.setAvailableResource(nodeResourceManagerService.getAvailableResource().getProto()); + requestProto.setTotalResource(nodeResourceManager.getTotalResource().getProto()); + requestProto.setAvailableResource(nodeResourceManager.getAvailableResource().getProto()); requestProto.setWorkerId(connectionInfo.getId()); requestProto.setConnectionInfo(connectionInfo.getProto()); diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdaterService.java b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java similarity index 92% rename from tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdaterService.java rename to tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java index 934f29259c..2d7d0be6d8 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdaterService.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/MockNodeStatusUpdater.java @@ -32,16 +32,16 @@ import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.*; -public class MockNodeStatusUpdaterService extends NodeStatusUpdaterService { +public class MockNodeStatusUpdater extends NodeStatusUpdater { private CountDownLatch barrier; private Map membership = 
Maps.newHashMap(); private Map resources = Maps.newHashMap(); private MockResourceTracker resourceTracker; - public MockNodeStatusUpdaterService(CountDownLatch barrier, WorkerConnectionInfo connectionInfo, - NodeResourceManagerService resourceManagerService) { - super(connectionInfo, resourceManagerService); + public MockNodeStatusUpdater(CountDownLatch barrier, WorkerConnectionInfo connectionInfo, + NodeResourceManager resourceManager) { + super(connectionInfo, resourceManager); this.barrier = barrier; this.resourceTracker = new MockResourceTracker(); } diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManagerService.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java similarity index 85% rename from tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManagerService.java rename to tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java index 7586f37ec2..7407acc1f9 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManagerService.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeResourceManager.java @@ -40,10 +40,10 @@ import static org.junit.Assert.*; import static org.apache.tajo.ipc.TajoWorkerProtocol.*; -public class TestNodeResourceManagerService { +public class TestNodeResourceManager { - private NodeResourceManagerService resourceManagerService; - private MockNodeStatusUpdaterService statusUpdater; + private NodeResourceManager resourceManager; + private MockNodeStatusUpdater statusUpdater; private AsyncDispatcher dispatcher; private int taskMemory; private TajoConf conf; @@ -64,19 +64,19 @@ public void setup() { dispatcher.init(conf); dispatcher.start(); - resourceManagerService = new NodeResourceManagerService(dispatcher); - resourceManagerService.init(conf); - resourceManagerService.start(); + resourceManager = new NodeResourceManager(dispatcher); + resourceManager.init(conf); + resourceManager.start(); WorkerConnectionInfo 
worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); - statusUpdater = new MockNodeStatusUpdaterService(new CountDownLatch(0), worker, resourceManagerService); + statusUpdater = new MockNodeStatusUpdater(new CountDownLatch(0), worker, resourceManager); statusUpdater.init(conf); statusUpdater.start(); } @After public void tearDown() { - resourceManagerService.stop(); + resourceManager.stop(); statusUpdater.stop(); dispatcher.stop(); } @@ -90,15 +90,15 @@ public void testNodeResourceAllocateEvent() throws Exception { ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); requestProto.setExecutionBlockId(ebId.getProto()); - assertEquals(resourceManagerService.getTotalResource(), resourceManagerService.getAvailableResource()); + assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); requestProto.addAllTaskRequest(createTaskRequests(taskMemory, requestSize)); dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture)); BatchAllocationResponseProto responseProto = callFuture.get(); - assertNotEquals(resourceManagerService.getTotalResource(), resourceManagerService.getAvailableResource()); + assertNotEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); assertEquals(0, responseProto.getCancellationTaskCount()); - assertEquals(requestSize, resourceManagerService.getAllocatedSize()); + assertEquals(requestSize, resourceManager.getAllocatedSize()); } @@ -112,14 +112,14 @@ public void testNodeResourceCancellation() throws Exception { ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); requestProto.setExecutionBlockId(ebId.getProto()); - assertEquals(resourceManagerService.getTotalResource(), resourceManagerService.getAvailableResource()); + assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); 
requestProto.addAllTaskRequest(createTaskRequests(taskMemory, requestSize + overSize)); dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture)); BatchAllocationResponseProto responseProto = callFuture.get(); assertEquals(overSize, responseProto.getCancellationTaskCount()); - assertEquals(requestSize, resourceManagerService.getAllocatedSize()); + assertEquals(requestSize, resourceManager.getAllocatedSize()); } @Test @@ -131,23 +131,23 @@ public void testNodeResourceDeallocateEvent() throws Exception { ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); requestProto.setExecutionBlockId(ebId.getProto()); - assertEquals(resourceManagerService.getTotalResource(), resourceManagerService.getAvailableResource()); + assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); requestProto.addAllTaskRequest(createTaskRequests(taskMemory, requestSize)); dispatcher.getEventHandler().handle(new NodeResourceAllocateEvent(requestProto.build(), callFuture)); BatchAllocationResponseProto responseProto = callFuture.get(); - assertNotEquals(resourceManagerService.getTotalResource(), resourceManagerService.getAvailableResource()); + assertNotEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); assertEquals(0, responseProto.getCancellationTaskCount()); - assertEquals(requestSize, resourceManagerService.getAllocatedSize()); + assertEquals(requestSize, resourceManager.getAllocatedSize()); //deallocate for(TaskAllocationRequestProto allocationRequestProto : requestProto.getTaskRequestList()) { // direct invoke handler for testing - resourceManagerService.handle(new NodeResourceDeallocateEvent(allocationRequestProto.getResource())); + resourceManager.handle(new NodeResourceDeallocateEvent(allocationRequestProto.getResource())); } - assertEquals(0, resourceManagerService.getAllocatedSize()); - assertEquals(resourceManagerService.getTotalResource(), 
resourceManagerService.getAvailableResource()); + assertEquals(0, resourceManager.getAllocatedSize()); + assertEquals(resourceManager.getTotalResource(), resourceManager.getAvailableResource()); } @Test(timeout = 30000) diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdaterService.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java similarity index 77% rename from tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdaterService.java rename to tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java index ae0fdda2eb..fb3c77e27c 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdaterService.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestNodeStatusUpdater.java @@ -31,10 +31,10 @@ import java.util.concurrent.CountDownLatch; import static org.junit.Assert.*; -public class TestNodeStatusUpdaterService { +public class TestNodeStatusUpdater { - private NodeResourceManagerService resourceManagerService; - private MockNodeStatusUpdaterService statusUpdater; + private NodeResourceManager resourceManager; + private MockNodeStatusUpdater statusUpdater; private AsyncDispatcher dispatcher; private TajoConf conf; @@ -48,14 +48,14 @@ public void setup() { dispatcher.init(conf); dispatcher.start(); - resourceManagerService = new NodeResourceManagerService(dispatcher); - resourceManagerService.init(conf); - resourceManagerService.start(); + resourceManager = new NodeResourceManager(dispatcher); + resourceManager.init(conf); + resourceManager.start(); } @After public void tearDown() { - resourceManagerService.stop(); + resourceManager.stop(); if (statusUpdater != null) statusUpdater.stop(); dispatcher.stop(); } @@ -64,18 +64,18 @@ public void tearDown() { public void testNodeMembership() throws Exception { CountDownLatch barrier = new CountDownLatch(1); WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); - 
statusUpdater = new MockNodeStatusUpdaterService(barrier, worker, resourceManagerService); + statusUpdater = new MockNodeStatusUpdater(barrier, worker, resourceManager); statusUpdater.init(conf); statusUpdater.start(); - MockNodeStatusUpdaterService.MockResourceTracker resourceTracker = statusUpdater.getResourceTracker(); + MockNodeStatusUpdater.MockResourceTracker resourceTracker = statusUpdater.getResourceTracker(); barrier.await(); assertTrue(resourceTracker.getTotalResource().containsKey(worker.getId())); - assertEquals(resourceManagerService.getTotalResource(), + assertEquals(resourceManager.getTotalResource(), resourceTracker.getTotalResource().get(worker.getId())); - assertEquals(resourceManagerService.getAvailableResource(), + assertEquals(resourceManager.getAvailableResource(), resourceTracker.getAvailableResource().get(worker.getId())); } @@ -83,11 +83,11 @@ public void testNodeMembership() throws Exception { public void testPing() throws Exception { CountDownLatch barrier = new CountDownLatch(2); WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); - statusUpdater = new MockNodeStatusUpdaterService(barrier, worker, resourceManagerService); + statusUpdater = new MockNodeStatusUpdater(barrier, worker, resourceManager); statusUpdater.init(conf); statusUpdater.start(); - MockNodeStatusUpdaterService.MockResourceTracker resourceTracker = statusUpdater.getResourceTracker(); + MockNodeStatusUpdater.MockResourceTracker resourceTracker = statusUpdater.getResourceTracker(); barrier.await(); TajoResourceTrackerProtocol.NodeHeartbeatRequestProto lastRequest = resourceTracker.getLastRequest(); @@ -101,13 +101,13 @@ public void testPing() throws Exception { public void testResourceReport() throws Exception { CountDownLatch barrier = new CountDownLatch(2); WorkerConnectionInfo worker = new WorkerConnectionInfo("host", 28091, 28092, 21000, 28093, 28080); - statusUpdater = new MockNodeStatusUpdaterService(barrier, worker, 
resourceManagerService); + statusUpdater = new MockNodeStatusUpdater(barrier, worker, resourceManager); statusUpdater.init(conf); statusUpdater.start(); for (int i = 0; i < statusUpdater.getQueueingLimit(); i++) { dispatcher.getEventHandler().handle(new NodeStatusEvent(NodeStatusEvent.EventType.REPORT_RESOURCE, - resourceManagerService.getAvailableResource())); + resourceManager.getAvailableResource())); } barrier.await(); assertEquals(0, statusUpdater.getQueueSize());