HIVE-243. ^C breaks out of running query, but not whole CLI (George Djabarov via Ning Zhang)

git-svn-id: https://svn.apache.org/repos/asf/hive/trunk@1104625 13f79535-47bb-0310-9956-ffa450edef68
Ning Zhang committed May 17, 2011
1 parent dc92949 commit 49bc14f
Showing 5 changed files with 313 additions and 150 deletions.
174 changes: 118 additions & 56 deletions cli/src/java/org/apache/hadoop/hive/cli/CliDriver.java
@@ -43,12 +43,14 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.HiveInterruptUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.HadoopJobExecHelper;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
@@ -61,6 +63,10 @@
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.thrift.TException;

import sun.misc.Signal;
import sun.misc.SignalHandler;


/**
* CliDriver.
*
@@ -155,48 +161,48 @@ public int processCmd(String cmd) {
}
}
} else if (ss.isRemoteMode()) { // remote mode -- connecting to remote hive server
HiveClient client = ss.getClient();
PrintStream out = ss.out;
PrintStream err = ss.err;
HiveClient client = ss.getClient();
PrintStream out = ss.out;
PrintStream err = ss.err;

try {
client.execute(cmd_trimmed);
List<String> results;
do {
results = client.fetchN(LINES_TO_FETCH);
for (String line: results) {
out.println(line);
}
} while (results.size() == LINES_TO_FETCH);
} catch (HiveServerException e) {
ret = e.getErrorCode();
if (ret != 0) { // OK if ret == 0 -- reached the EOF
String errMsg = e.getMessage();
if (errMsg == null) {
errMsg = e.toString();
}
ret = e.getErrorCode();
err.println("[Hive Error]: " + errMsg);
try {
client.execute(cmd_trimmed);
List<String> results;
do {
results = client.fetchN(LINES_TO_FETCH);
for (String line : results) {
out.println(line);
}
} catch (TException e) {
} while (results.size() == LINES_TO_FETCH);
} catch (HiveServerException e) {
ret = e.getErrorCode();
if (ret != 0) { // OK if ret == 0 -- reached the EOF
String errMsg = e.getMessage();
if (errMsg == null) {
errMsg = e.toString();
}
ret = -10002;
err.println("[Thrift Error]: " + errMsg);
} finally {
try {
client.clean();
} catch (TException e) {
String errMsg = e.getMessage();
if (errMsg == null) {
errMsg = e.toString();
}
err.println("[Thrift Error]: Hive server is not cleaned due to thrift exception: "
+ errMsg);
ret = e.getErrorCode();
err.println("[Hive Error]: " + errMsg);
}
} catch (TException e) {
String errMsg = e.getMessage();
if (errMsg == null) {
errMsg = e.toString();
}
ret = -10002;
err.println("[Thrift Error]: " + errMsg);
} finally {
try {
client.clean();
} catch (TException e) {
String errMsg = e.getMessage();
if (errMsg == null) {
errMsg = e.toString();
}
err.println("[Thrift Error]: Hive server is not cleaned due to thrift exception: "
+ errMsg);
}
}
} else { // local mode
CommandProcessor proc = CommandProcessorFactory.get(tokens[0], (HiveConf)conf);
int tryCount = 0;
@@ -284,32 +290,88 @@ public int processCmd(String cmd) {
}

public int processLine(String line) {
int lastRet = 0, ret = 0;
return processLine(line, false);
}

String command = "";
for (String oneCmd : line.split(";")) {
/**
* Processes a line of semicolon separated commands
*
* @param line
* The commands to process
* @param allowInterupting
* When true the function will handle SIG_INT (Ctrl+C) by interrupting the processing and
* returning -1
* @return
*/
public int processLine(String line, boolean allowInterupting) {
SignalHandler oldSignal = null;
Signal interupSignal = null;

if (allowInterupting) {
// Remember all threads that were running at the time we started line processing.
// Hook up the custom Ctrl+C handler while processing this line
interupSignal = new Signal("INT");
oldSignal = Signal.handle(interupSignal, new SignalHandler() {
private final Thread cliThread = Thread.currentThread();
private boolean interruptRequested;

@Override
public void handle(Signal signal) {
boolean initialRequest = !interruptRequested;
interruptRequested = true;

// Kill the VM on second ctrl+c
if (!initialRequest) {
console.printInfo("Exiting the JVM");
System.exit(127);
}

if (StringUtils.endsWith(oneCmd, "\\")) {
command += StringUtils.chop(oneCmd) + ";";
continue;
} else {
command += oneCmd;
}
if (StringUtils.isBlank(command)) {
continue;
}
// Interrupt the CLI thread to stop the current statement and return
// to prompt
console.printInfo("Interrupting... Be patient, this might take some time.");
console.printInfo("Press Ctrl+C again to kill JVM");

ret = processCmd(command);
command = "";
lastRet = ret;
boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS);
if (ret != 0 && !ignoreErrors) {
CommandProcessorFactory.clean((HiveConf)conf);
return ret;
// First, kill any running MR jobs
HadoopJobExecHelper.killRunningJobs();
HiveInterruptUtils.interrupt();
this.cliThread.interrupt();
}
});
}

try {
int lastRet = 0, ret = 0;

String command = "";
for (String oneCmd : line.split(";")) {

if (StringUtils.endsWith(oneCmd, "\\")) {
command += StringUtils.chop(oneCmd) + ";";
continue;
} else {
command += oneCmd;
}
if (StringUtils.isBlank(command)) {
continue;
}

ret = processCmd(command);
command = "";
lastRet = ret;
boolean ignoreErrors = HiveConf.getBoolVar(conf, HiveConf.ConfVars.CLIIGNOREERRORS);
if (ret != 0 && !ignoreErrors) {
CommandProcessorFactory.clean((HiveConf) conf);
return ret;
}
}
CommandProcessorFactory.clean((HiveConf) conf);
return lastRet;
} finally {
// Once we are done processing the line, restore the old handler
if (oldSignal != null && interupSignal != null) {
Signal.handle(interupSignal, oldSignal);
}
}
CommandProcessorFactory.clean((HiveConf)conf);
return lastRet;
}

public int processReader(BufferedReader r) throws IOException {
@@ -528,7 +590,7 @@ public static void main(String[] args) throws Exception {
}
if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
line = prefix + line;
ret = cli.processLine(line);
ret = cli.processLine(line, true);
prefix = "";
curPrompt = prompt;
} else {
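Note on the pattern used in processLine above: the SIGINT handler interrupts the CLI thread on the first Ctrl+C and exits the JVM on the second, and the previous handler is restored in a finally block once the line has been processed. A minimal standalone sketch of that pattern, not part of this patch, assuming only the JDK-internal sun.misc.Signal API that the patch itself relies on:

import sun.misc.Signal;
import sun.misc.SignalHandler;

public class TwoStageCtrlC {
  public static void main(String[] args) {
    final Thread worker = Thread.currentThread();
    Signal intSignal = new Signal("INT");

    // First Ctrl+C interrupts the worker thread; the second one exits the JVM.
    SignalHandler oldHandler = Signal.handle(intSignal, new SignalHandler() {
      private boolean interruptRequested;

      @Override
      public void handle(Signal signal) {
        if (interruptRequested) {
          System.err.println("Second Ctrl+C received, exiting the JVM");
          System.exit(127);
        }
        interruptRequested = true;
        System.err.println("Interrupting... press Ctrl+C again to kill the JVM");
        worker.interrupt();
      }
    });

    try {
      // Stand-in for query execution; interruption ends it early.
      Thread.sleep(60000);
      System.out.println("Finished normally");
    } catch (InterruptedException e) {
      System.out.println("Interrupted, returning to prompt");
    } finally {
      // Restore the previous handler, as processLine does in its finally block.
      Signal.handle(intSignal, oldHandler);
    }
  }
}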
26 changes: 26 additions & 0 deletions common/src/java/org/apache/hadoop/hive/common/HiveInterruptCallback.java
@@ -0,0 +1,26 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hive.common;

public interface HiveInterruptCallback {
/**
* Request interrupting of the processing
*/
void interrupt();
}
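As an illustration only (not part of this commit), a component that launches external work could register an implementation of this interface through the HiveInterruptUtils registry added in the next file; the class name and the child process below are hypothetical:

import org.apache.hadoop.hive.common.HiveInterruptCallback;
import org.apache.hadoop.hive.common.HiveInterruptUtils;

public class InterruptCallbackExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical external work that should be torn down when the user hits Ctrl+C.
    final Process child = new ProcessBuilder("sleep", "60").start();

    // Register a callback; HiveInterruptUtils.interrupt(), called from the CLI's
    // signal handler, will invoke it.
    HiveInterruptCallback callback = HiveInterruptUtils.add(new HiveInterruptCallback() {
      @Override
      public void interrupt() {
        child.destroy();
      }
    });
    try {
      child.waitFor();
    } finally {
      // Deregister so stale callbacks do not accumulate.
      HiveInterruptUtils.remove(callback);
    }
  }
}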
70 changes: 70 additions & 0 deletions common/src/java/org/apache/hadoop/hive/common/HiveInterruptUtils.java
@@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hive.common;

import java.util.ArrayList;
import java.util.List;

public class HiveInterruptUtils {

/**
* A list of currently running commands that need cleanup when the command is canceled
*/
private static List<HiveInterruptCallback> interruptCallbacks = new ArrayList<HiveInterruptCallback>();

public static HiveInterruptCallback add(HiveInterruptCallback command) {
synchronized (interruptCallbacks) {
interruptCallbacks.add(command);
}
return command;
}

public static HiveInterruptCallback remove(HiveInterruptCallback command) {
synchronized (interruptCallbacks) {
interruptCallbacks.remove(command);
}
return command;
}

/**
* Request interruption of current hive command
*/
public static void interrupt() {
synchronized (interruptCallbacks) {
for (HiveInterruptCallback resource : new ArrayList<HiveInterruptCallback>(interruptCallbacks)) {
resource.interrupt();
}
}
}

/**
* Checks if the current thread has been interrupted and throws RuntimeException if it has.
*/
public static void checkInterrupted() {
if (Thread.currentThread().isInterrupted()) {
InterruptedException interrupt = null;
try {
Thread.sleep(0);
} catch (InterruptedException e) {
interrupt = e;
}
throw new RuntimeException("Interuppted", interrupt);
}
}
}
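A possible usage sketch of checkInterrupted() for cooperative cancellation inside a long-running loop; the class and method names here are hypothetical, not from the patch:

import java.util.Iterator;

import org.apache.hadoop.hive.common.HiveInterruptUtils;

public class CancellableScan {
  // Hypothetical long-running loop that cooperates with the CLI's interrupt handling.
  public static void scan(Iterator<String> rows) {
    while (rows.hasNext()) {
      // Throws a RuntimeException if the signal handler interrupted this thread,
      // unwinding the current command instead of letting the loop keep running.
      HiveInterruptUtils.checkInterrupted();
      System.out.println(rows.next());
    }
  }
}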
42 changes: 23 additions & 19 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/HadoopJobExecHelper.java
@@ -158,29 +158,33 @@ public HadoopJobExecHelper(JobConf job, LogHelper console,
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
synchronized (runningJobKillURIs) {
for (String uri : runningJobKillURIs.values()) {
try {
System.err.println("killing job with: " + uri);
java.net.HttpURLConnection conn = (java.net.HttpURLConnection) new java.net.URL(uri)
.openConnection();
conn.setRequestMethod("POST");
int retCode = conn.getResponseCode();
if (retCode != 200) {
System.err.println("Got an error trying to kill job with URI: " + uri + " = "
+ retCode);
}
} catch (Exception e) {
System.err.println("trying to kill job, caught: " + e);
// do nothing
}
}
}
killRunningJobs();
}
});
}
}


public static void killRunningJobs() {
synchronized (runningJobKillURIs) {
for (String uri : runningJobKillURIs.values()) {
try {
System.err.println("killing job with: " + uri);
java.net.HttpURLConnection conn = (java.net.HttpURLConnection) new java.net.URL(uri)
.openConnection();
conn.setRequestMethod("POST");
int retCode = conn.getResponseCode();
if (retCode != 200) {
System.err.println("Got an error trying to kill job with URI: " + uri + " = "
+ retCode);
}
} catch (Exception e) {
System.err.println("trying to kill job, caught: " + e);
// do nothing
}
}
}
}

public boolean checkFatalErrors(Counters ctrs, StringBuilder errMsg) {
if (ctrs == null) {
// hadoop might return null if it cannot locate the job.
