Add HCatalog support to HiveMultiInputFormat. As HCatalog and Hive store their metadata differently in the job conf, we need to support both use cases.
commit 7196ab3721311d628a2f933f37e2aa7c15679ead (parent: 4b28225)
Authored by @traviscrawford
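Before the diff itself, a minimal sketch of the distinction the commit message describes. This is illustrative only and not part of the commit: the class and method names are hypothetical, it assumes an already-populated JobConf, and the constants are the same ones the patch below checks.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hcatalog.common.HCatConstants;

// Illustrative sketch only (not part of the commit). A Hive query serializes its
// MapredWork plan under HiveConf.ConfVars.PLAN, while an HCatalog-driven job
// serializes an InputJobInfo under HCatConstants.HCAT_KEY_JOB_INFO. The patch
// checks for each in turn when deciding where to read table metadata from.
public final class MetadataSourceSketch {
  static boolean isHiveQuery(JobConf job) {
    // Non-empty when Hive has placed the path to its serialized query plan in the conf.
    return !"".equals(HiveConf.getVar(job, HiveConf.ConfVars.PLAN));
  }

  static boolean isHCatalogJob(JobConf job) {
    // Set when HCatalog has serialized an InputJobInfo into the conf.
    return job.get(HCatConstants.HCAT_KEY_JOB_INFO, null) != null;
  }
}

When neither condition holds, the patch leaves the table properties null and fails with the existing "Required property ... is null" error for SERIALIZATION_CLASS.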
ivy.xml
@@ -41,6 +41,7 @@
<dependency org="org.apache.mahout" name="mahout-collections" rev="${mahout-collections.version}" transitive="false"/>
<dependency org="org.apache.mahout" name="mahout-core" rev="${mahout.version}" transitive="false"/>
<dependency org="org.apache.mahout" name="mahout-math" rev="${mahout.version}" transitive="false"/>
+ <dependency org="org.apache.hcatalog" name="hcatalog" rev="0.5.0-dev" transitive="false"/>
<exclude org="javax.jms"/>
<exclude org="com.sun.jdmk"/>
<exclude org="com.sun.jmx"/>
lib/hcatalog-0.5.0-dev.jar (binary file, not shown)
src/java/com/twitter/elephantbird/mapred/input/HiveMultiInputFormat.java
@@ -3,6 +3,7 @@
import com.twitter.elephantbird.mapreduce.input.MultiInputFormat;
import com.twitter.elephantbird.mapreduce.io.BinaryWritable;
import com.twitter.elephantbird.util.TypeRef;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.serde.Constants;
@@ -12,6 +13,11 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
@@ -26,23 +32,40 @@
public class HiveMultiInputFormat
extends DeprecatedInputFormatWrapper<LongWritable, BinaryWritable> {
+ private static final Logger LOG = LoggerFactory.getLogger(HiveMultiInputFormat.class);
+
public HiveMultiInputFormat() {
super(new MultiInputFormat());
}
- private void initialize(FileSplit split, JobConf job) {
- Map<String, PartitionDesc> partitionDescMap =
- Utilities.getMapRedWork(job).getPathToPartitionInfo();
+ private void initialize(FileSplit split, JobConf job) throws IOException {
+ LOG.info("Initializing HiveMultiInputFormat for " + split + " with job " + job);
+
+ String thriftClassName = null;
+ Properties properties = null;
- if (!partitionDescMap.containsKey(split.getPath().getParent().toUri().toString())) {
- throw new RuntimeException("Failed locating partition description for "
- + split.getPath().toUri().toString());
+ if (!"".equals(HiveConf.getVar(job, HiveConf.ConfVars.PLAN))) {
+ // Running as a Hive query. Use MapredWork for metadata.
+ Map<String, PartitionDesc> partitionDescMap =
+ Utilities.getMapRedWork(job).getPathToPartitionInfo();
+
+ if (!partitionDescMap.containsKey(split.getPath().getParent().toUri().toString())) {
+ throw new RuntimeException("Failed locating partition description for "
+ + split.getPath().toUri().toString());
+ }
+ properties = partitionDescMap.get(split.getPath().getParent().toUri().toString())
+ .getTableDesc().getProperties();
+ } else if (job.get(HCatConstants.HCAT_KEY_JOB_INFO, null) != null) {
+ // Running as an HCatalog query. Use InputJobInfo for metadata.
+ InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(
+ job.get(HCatConstants.HCAT_KEY_JOB_INFO));
+ properties = inputJobInfo.getTableInfo().getStorerInfo().getProperties();
}
- Properties properties = partitionDescMap.get(split.getPath().getParent().toUri().toString())
- .getTableDesc().getProperties();
+ if (properties != null) {
+ thriftClassName = properties.getProperty(Constants.SERIALIZATION_CLASS);
+ }
- String thriftClassName = properties.getProperty(Constants.SERIALIZATION_CLASS, null);
if (thriftClassName == null) {
throw new RuntimeException(
"Required property " + Constants.SERIALIZATION_CLASS + " is null.");
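To make the new HCatalog branch concrete, the metadata it deserializes is placed into the job conf by HCatalog roughly as sketched below. This is a hedged, hypothetical helper, not part of the commit: it assumes an InputJobInfo that HCatalog has already built (for example via HCatInputFormat.setInput) and uses HCatUtil.serialize, the counterpart of the HCatUtil.deserialize call added above.

import java.io.IOException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.mapreduce.InputJobInfo;

// Hypothetical helper, not part of the commit: shows how a serialized InputJobInfo
// ends up under HCAT_KEY_JOB_INFO, which the new HCatalog branch in initialize()
// deserializes to find the table's storer properties.
public final class HCatJobInfoSketch {
  static void stashJobInfo(JobConf job, InputJobInfo inputJobInfo) throws IOException {
    job.set(HCatConstants.HCAT_KEY_JOB_INFO, HCatUtil.serialize(inputJobInfo));
  }
}

initialize() then reads SERIALIZATION_CLASS from the storer properties carried by that InputJobInfo, mirroring what the Hive branch reads from the partition's TableDesc.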