Commit 700d34f

Merged in with latest trunk.
git-svn-id: https://svn.apache.org/repos/asf/hive/branches/ptf-windowing@1462671 13f79535-47bb-0310-9956-ffa450edef68
ashutoshc committed Mar 29, 2013
2 parents ef25044 + bd41852 commit 700d34f
Showing 13 changed files with 345 additions and 38 deletions.
27 changes: 27 additions & 0 deletions bin/ext/orcfiledump.sh
@@ -0,0 +1,27 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

THISSERVICE=orcfiledump
export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "

orcfiledump () {
CLASS=org.apache.hadoop.hive.ql.io.orc.FileDump
HIVE_OPTS=''
execHiveCmd $CLASS "$@"
}

orcfiledump_help () {
echo "usage ./hive orcfiledump <path_to_file>"
}
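
Once the service is registered, the dump tool can be run through the Hive launcher as ./hive --orcfiledump <path_to_file> (the --orcfiledump flag added to bin/hive below selects this service), or equivalently via ./hive --service orcfiledump <path_to_file>.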
4 changes: 4 additions & 0 deletions bin/hive
@@ -38,6 +38,10 @@ while [ $# -gt 0 ]; do
SERVICE=rcfilecat
shift
;;
+      --orcfiledump)
+        SERVICE=orcfiledump
+        shift
+        ;;
--help)
HELP=_help
shift
metastore/src/java/org/apache/hadoop/hive/metastore/RetryingHMSHandler.java
@@ -28,13 +28,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.classification.InterfaceAudience;
import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.MetaException;
- import org.apache.hadoop.hive.metastore.hooks.JDOConnectionURLHook;
- import org.apache.hadoop.util.ReflectionUtils;

@InterfaceAudience.Private
@InterfaceStability.Evolving
@@ -43,7 +40,7 @@ public class RetryingHMSHandler implements InvocationHandler {
private static final Log LOG = LogFactory.getLog(RetryingHMSHandler.class);

private final IHMSHandler base;
-  private MetaStoreInit.MetaStoreInitData metaStoreInitData =
+  private final MetaStoreInit.MetaStoreInitData metaStoreInitData =
new MetaStoreInit.MetaStoreInitData();
private final HiveConf hiveConf;

@@ -112,13 +109,15 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// Due to reflection, the jdo exception is wrapped in
// invocationTargetException
caughtException = e.getCause();
-        }
-        else {
+        } else if (e.getCause() instanceof MetaException && e.getCause().getCause() != null
+            && e.getCause().getCause() instanceof javax.jdo.JDOException) {
+          // The JDOException may be wrapped further in a MetaException
+          caughtException = e.getCause().getCause();
+        } else {
LOG.error(ExceptionUtils.getStackTrace(e.getCause()));
throw e.getCause();
}
-      }
-      else {
+      } else {
LOG.error(ExceptionUtils.getStackTrace(e));
throw e;
}
@@ -127,8 +126,11 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// Due to reflection, the jdo exception is wrapped in
// invocationTargetException
caughtException = e.getCause();
-      }
-      else {
+      } else if (e.getCause() instanceof MetaException && e.getCause().getCause() != null
+          && e.getCause().getCause() instanceof javax.jdo.JDOException) {
+        // The JDOException may be wrapped further in a MetaException
+        caughtException = e.getCause().getCause();
+      } else {
LOG.error(ExceptionUtils.getStackTrace(e.getCause()));
throw e.getCause();
}
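
The retry decision above now looks one level deeper into the cause chain: the retriable JDOException can arrive either directly as the cause of the reflective InvocationTargetException, or wrapped inside a MetaException. A minimal sketch of that unwrapping rule as a standalone helper (the class and method names are illustrative, not part of the patch):

    import java.lang.reflect.InvocationTargetException;

    import javax.jdo.JDOException;

    import org.apache.hadoop.hive.metastore.api.MetaException;

    // Illustrative helper, not in the patch: extract the retriable JDOException
    // from a reflective call failure, or rethrow the cause unchanged.
    public final class RetryCauseUnwrapper {
      private RetryCauseUnwrapper() {
      }

      public static Throwable unwrapForRetry(InvocationTargetException e) throws Throwable {
        Throwable cause = e.getCause();
        if (cause instanceof JDOException) {
          // Due to reflection, the JDOException is wrapped in the InvocationTargetException.
          return cause;
        }
        if (cause instanceof MetaException && cause.getCause() instanceof JDOException) {
          // The JDOException may be wrapped one level further in a MetaException.
          return cause.getCause();
        }
        // Anything else is not retriable; the handler logs it and rethrows.
        throw cause;
      }
    }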
62 changes: 62 additions & 0 deletions metastore/src/test/org/apache/hadoop/hive/metastore/AlternateFailurePreListener.java
@@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.metastore;

import javax.jdo.JDOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.events.PreEventContext;

/**
*
* AlternateFailurePreListener.
*
An implementation of MetaStorePreEventListener which fails every other time it's invoked,
* starting with the first time.
*
* It also records and makes available the number of times it's been invoked.
*/
public class AlternateFailurePreListener extends MetaStorePreEventListener {

private static int callCount = 0;
private static boolean throwException = true;

public AlternateFailurePreListener(Configuration config) {
super(config);
}

@Override
public void onEvent(PreEventContext context) throws MetaException, NoSuchObjectException,
InvalidOperationException {

callCount++;
if (throwException) {
throwException = false;
throw new JDOException();
}

throwException = true;
}

public static int getCallCount() {
return callCount;
}
}
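
Since throwException starts out true and flips on every invocation, the listener's contract is simply that odd-numbered calls throw and even-numbered calls succeed. A hypothetical snippet, not in the patch, spelling out the expected sequence (listener and ctx stand for an AlternateFailurePreListener and some PreEventContext; checked exceptions from onEvent elided):

    try {
      listener.onEvent(ctx);  // 1st call: throws the injected JDOException
    } catch (JDOException expected) {
      // expected on every odd-numbered call
    }
    listener.onEvent(ctx);    // 2nd call: succeeds
    assert AlternateFailurePreListener.getCallCount() == 2;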
116 changes: 116 additions & 0 deletions metastore/src/test/org/apache/hadoop/hive/metastore/TestRetryingHMSHandler.java
@@ -0,0 +1,116 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hive.metastore;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import junit.framework.Assert;
import junit.framework.TestCase;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.shims.ShimLoader;

/**
* TestRetryingHMSHandler. Test case for
* {@link org.apache.hadoop.hive.metastore.RetryingHMSHandler}
*/
public class TestRetryingHMSHandler extends TestCase {
private HiveConf hiveConf;
private HiveMetaStoreClient msc;

@Override
protected void setUp() throws Exception {

super.setUp();
System.setProperty("hive.metastore.pre.event.listeners",
AlternateFailurePreListener.class.getName());
int port = MetaStoreUtils.findFreePort();
MetaStoreUtils.startMetaStore(port, ShimLoader.getHadoopThriftAuthBridge());
hiveConf = new HiveConf(this.getClass());
hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + port);
hiveConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3);
hiveConf.setIntVar(HiveConf.ConfVars.HMSHANDLERATTEMPTS, 2);
hiveConf.setIntVar(HiveConf.ConfVars.HMSHANDLERINTERVAL, 0);
hiveConf.setBoolVar(HiveConf.ConfVars.HMSHANDLERFORCERELOADCONF, false);
msc = new HiveMetaStoreClient(hiveConf, null);
}

@Override
protected void tearDown() throws Exception {
super.tearDown();
}

// Create a database and a table in that database. Because the AlternateFailurePreListener is
// being used, each attempt to create something should require two calls by the RetryingHMSHandler.
public void testRetryingHMSHandler() throws Exception {
String dbName = "tmpdb";
String tblName = "tmptbl";

Database db = new Database();
db.setName(dbName);
msc.createDatabase(db);

Assert.assertEquals(2, AlternateFailurePreListener.getCallCount());

ArrayList<FieldSchema> cols = new ArrayList<FieldSchema>(2);
cols.add(new FieldSchema("c1", serdeConstants.STRING_TYPE_NAME, ""));
cols.add(new FieldSchema("c2", serdeConstants.INT_TYPE_NAME, ""));

Map<String, String> params = new HashMap<String, String>();
params.put("test_param_1", "Use this for comments etc");

Map<String, String> serdParams = new HashMap<String, String>();
serdParams.put(serdeConstants.SERIALIZATION_FORMAT, "1");

StorageDescriptor sd = new StorageDescriptor();

sd.setCols(cols);
sd.setCompressed(false);
sd.setNumBuckets(1);
sd.setParameters(params);
sd.setBucketCols(new ArrayList<String>(2));
sd.getBucketCols().add("name");
sd.setSerdeInfo(new SerDeInfo());
sd.getSerdeInfo().setName(tblName);
sd.getSerdeInfo().setParameters(serdParams);
sd.getSerdeInfo().getParameters()
.put(serdeConstants.SERIALIZATION_FORMAT, "1");
sd.setSortCols(new ArrayList<Order>());

Table tbl = new Table();
tbl.setDbName(dbName);
tbl.setTableName(tblName);
tbl.setSd(sd);
tbl.setLastAccessTime(0);

msc.createTable(tbl);

Assert.assertEquals(4, AlternateFailurePreListener.getCallCount());
}

}
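
The asserted counts follow from the setUp configuration: with HMSHANDLERATTEMPTS set to 2, each metastore write invokes the pre-event listener once per attempt, the first attempt failing with the injected JDOException and the retry succeeding, so the count is 2 after createDatabase and 4 after createTable.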
26 changes: 16 additions & 10 deletions ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -1144,8 +1144,8 @@ public int execute() throws CommandNeedRetryException {
int exitVal = tskRes.getExitVal();
if (exitVal != 0) {
if (tsk.ifRetryCmdWhenFail()) {
-            if (running.size() != 0) {
-              taskCleanup();
+            if (!running.isEmpty()) {
+              taskCleanup(running);
}
// in case we decided to run everything in local mode, restore the
// jobtracker setting to its initial value
@@ -1189,8 +1189,8 @@ public int execute() throws CommandNeedRetryException {
}
SQLState = "08S01";
console.printError(errorMessage);
-          if (running.size() != 0) {
-            taskCleanup();
+          if (!running.isEmpty()) {
+            taskCleanup(running);
}
// in case we decided to run everything in local mode, restore the
// jobtracker setting to its initial value
@@ -1355,12 +1355,18 @@ public void launchTask(Task<? extends Serializable> tsk, String queryId, boolean
/**
* Cleans up remaining tasks in case of failure
*/

-  public void taskCleanup() {
-    // The currently existing Shutdown hooks will be automatically called,
-    // killing the map-reduce processes.
-    // The non MR processes will be killed as well.
-    System.exit(9);
+  public void taskCleanup(Map<TaskResult, TaskRunner> running) {
+    for (Map.Entry<TaskResult, TaskRunner> entry : running.entrySet()) {
+      if (entry.getKey().isRunning()) {
+        Task<?> task = entry.getValue().getTask();
+        try {
+          task.shutdown();
+        } catch (Exception e) {
+          console.printError("Exception on shutting down task " + task.getId() + ": " + e);
+        }
+      }
+    }
+    running.clear();
}

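The old taskCleanup() was a blunt instrument: it killed the whole client JVM with System.exit(9) and relied on shutdown hooks to tear down the map-reduce jobs. The reworked version asks each still-running task to shut itself down and then clears the map, so the Driver can survive a failed query; the call sites above now pass their local running map explicitly.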
18 changes: 15 additions & 3 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
@@ -25,7 +25,6 @@
import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
- import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -89,7 +88,6 @@
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.FileAppender;
import org.apache.log4j.LogManager;
- import org.apache.log4j.PropertyConfigurator;
import org.apache.log4j.varia.NullAppender;

/**
@@ -107,6 +105,8 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, HadoopJobExecHook {

protected static transient final Log LOG = LogFactory.getLog(ExecDriver.class);

+  private RunningJob rj;

/**
* Constructor when invoked from QL.
*/
@@ -358,7 +358,6 @@ public int execute(DriverContext driverContext) {
initializeFiles("tmpfiles", addedFiles);
}
int returnVal = 0;
-    RunningJob rj = null;
boolean noName = StringUtils.isEmpty(HiveConf.getVar(job, HiveConf.ConfVars.HADOOPJOBNAME));

if (noName) {
@@ -980,4 +979,17 @@ public void updateCounters(Counters ctrs, RunningJob rj) throws IOException {
public void logPlanProgress(SessionState ss) throws IOException {
ss.getHiveHistory().logPlanProgress(queryPlan);
}

+  @Override
+  public void shutdown() {
+    super.shutdown();
+    if (rj != null) {
+      try {
+        rj.killJob();
+      } catch (Exception e) {
+        LOG.warn("failed to kill job " + rj.getID(), e);
+      }
+      rj = null;
+    }
+  }
}
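
These two changes work together: promoting rj from a local variable in execute() to an instance field is what lets the new shutdown() override reach the in-flight Hadoop job and kill it, and that override is exactly what Driver.taskCleanup() now triggers through task.shutdown().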
