diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index e892dc92e0..e74d842e98 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -40,6 +40,7 @@ import java.net.InetSocketAddress; import java.util.Map; import java.util.TimeZone; +import java.util.concurrent.TimeUnit; public class TajoConf extends Configuration { private static TimeZone SYSTEM_TIMEZONE; @@ -154,6 +155,7 @@ public static enum ConfVars implements ConfigKey { // QueryMaster resource TAJO_QUERYMASTER_DISK_SLOT("tajo.qm.resource.disk.slots", 0.0f, Validators.min("0.0f")), TAJO_QUERYMASTER_MEMORY_MB("tajo.qm.resource.memory-mb", 512, Validators.min("64")), + TAJO_QUERYMASTER_ALLOCATION_TIMEOUT("tajo.qm.resource.allocation.timeout", "3 sec"), // Tajo Worker Service Addresses WORKER_INFO_ADDRESS("tajo.worker.info-http.address", "0.0.0.0:28080", Validators.networkAddr()), @@ -580,6 +582,72 @@ public void setBoolVar(ConfVars var, boolean val) { setBoolVar(this, var, val); } + // borrowed from HIVE-5799 + public static long getTimeVar(Configuration conf, ConfVars var, TimeUnit outUnit) { + return toTime(getVar(conf, var), outUnit); + } + + public static void setTimeVar(Configuration conf, ConfVars var, long time, TimeUnit timeunit) { + assert (var.valClass == String.class) : var.varname; + conf.set(var.varname, time + stringFor(timeunit)); + } + + public long getTimeVar(ConfVars var, TimeUnit outUnit) { + return getTimeVar(this, var, outUnit); + } + + public void setTimeVar(ConfVars var, long time, TimeUnit outUnit) { + setTimeVar(this, var, time, outUnit); + } + + public static long toTime(String value, TimeUnit outUnit) { + String[] parsed = parseTime(value.trim()); + return outUnit.convert(Long.valueOf(parsed[0].trim()), unitFor(parsed[1].trim())); + } + + private static String[] parseTime(String value) { + char[] chars = value.toCharArray(); + int i = 0; + for (; i < chars.length && (chars[i] == '-' || Character.isDigit(chars[i])); i++) { + } + return new String[] {value.substring(0, i), value.substring(i)}; + } + + public static TimeUnit unitFor(String unit) { + unit = unit.trim().toLowerCase(); + if (unit.isEmpty() || unit.equals("l")) { + return TimeUnit.MILLISECONDS; + } else if (unit.equals("d") || unit.startsWith("day")) { + return TimeUnit.DAYS; + } else if (unit.equals("h") || unit.startsWith("hour")) { + return TimeUnit.HOURS; + } else if (unit.equals("m") || unit.startsWith("min")) { + return TimeUnit.MINUTES; + } else if (unit.equals("s") || unit.startsWith("sec")) { + return TimeUnit.SECONDS; + } else if (unit.equals("ms") || unit.startsWith("msec")) { + return TimeUnit.MILLISECONDS; + } else if (unit.equals("us") || unit.startsWith("usec")) { + return TimeUnit.MICROSECONDS; + } else if (unit.equals("ns") || unit.startsWith("nsec")) { + return TimeUnit.NANOSECONDS; + } + throw new IllegalArgumentException("Invalid time unit " + unit); + } + + public static String stringFor(TimeUnit timeunit) { + switch (timeunit) { + case DAYS: return "day"; + case HOURS: return "hour"; + case MINUTES: return "min"; + case SECONDS: return "sec"; + case MILLISECONDS: return "msec"; + case MICROSECONDS: return "usec"; + case NANOSECONDS: return "nsec"; + } + throw new IllegalArgumentException("Invalid timeunit " + timeunit); + } + public void setClassVar(ConfVars var, Class clazz) { setVar(var, clazz.getCanonicalName()); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java index 0d830eaa34..293948d8ff 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java @@ -37,7 +37,7 @@ import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; import org.apache.tajo.master.QueryInProgress; import org.apache.tajo.master.TajoMaster; -import org.apache.tajo.rpc.CallFuture; +import org.apache.tajo.rpc.CancelableRpcCallback; import org.apache.tajo.util.ApplicationIdUtils; import java.io.IOException; @@ -209,23 +209,41 @@ private WorkerResourceAllocationRequest createQMResourceRequest(QueryId queryId) @Override public WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress) { + + // 3 seconds, by default + long timeout = masterContext.getConf().getTimeVar( + TajoConf.ConfVars.TAJO_QUERYMASTER_ALLOCATION_TIMEOUT, TimeUnit.MILLISECONDS); + // Create a resource request for a query master WorkerResourceAllocationRequest qmResourceRequest = createQMResourceRequest(queryInProgress.getQueryId()); // call future for async call - CallFuture callFuture = new CallFuture(); + final CancelableRpcCallback callFuture = + new CancelableRpcCallback() { + @Override + protected void cancel(WorkerResourceAllocationResponse canceled) { + if (canceled != null && !canceled.getWorkerAllocatedResourceList().isEmpty()) { + LOG.info("Canceling resources allocated"); + WorkerAllocatedResource resource = canceled.getWorkerAllocatedResource(0); + releaseWorkerResource(resource.getContainerId()); + } + } + }; allocateWorkerResources(qmResourceRequest, callFuture); - // Wait for 3 seconds WorkerResourceAllocationResponse response = null; try { - response = callFuture.get(3, TimeUnit.SECONDS); + response = callFuture.get(timeout, TimeUnit.MILLISECONDS); } catch (Throwable t) { - LOG.error(t, t); - return null; + response = callFuture.cancel(); // try cancel + if (response == null) { + // canceled successfuly + LOG.warn("Got exception waiting resources for query master " + queryInProgress.getQueryId(), t); + return null; + } } - if (response.getWorkerAllocatedResourceList().size() == 0) { + if (response == null || response.getWorkerAllocatedResourceList().size() == 0) { return null; } diff --git a/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CancelableRpcCallback.java b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CancelableRpcCallback.java new file mode 100644 index 0000000000..80bf76c30b --- /dev/null +++ b/tajo-rpc/tajo-rpc-protobuf/src/main/java/org/apache/tajo/rpc/CancelableRpcCallback.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import com.google.protobuf.RpcCallback; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +// message exchange between threads which can be only a success(run) or fail(cancel) +// successful cancel will make all following run() invocations to cancel the object, by calling cancel(T) +public class CancelableRpcCallback implements RpcCallback { + + private static final int INITIAL = 0; + private static final int RESULT = 1; + private static final int CANCELED = 2; + + private volatile T result; + private final AtomicInteger state = new AtomicInteger(INITIAL); + private final Semaphore semaphore = new Semaphore(0); + + @Override + public void run(T result) { + assert result != null; + try { + if (state.compareAndSet(INITIAL, RESULT)) { + this.result = result; + } else { + cancel(result); + } + } finally { + semaphore.release(); + } + } + + public T cancel() { + try { + if (state.compareAndSet(INITIAL, CANCELED)) { + return null; + } + return state.get() == RESULT ? result : null; + } finally { + semaphore.release(); + } + } + + public T get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { + if (semaphore.tryAcquire(timeout, unit) && state.get() == RESULT) { + return result; + } + throw new TimeoutException(); + } + + protected void cancel(T canceled) { + } +}