fix #169: MapJoin failed, Configuration and input path are inconsistent #173

Closed · wants to merge 1 commit
177 changes: 100 additions & 77 deletions src/main/java/org/elasticsearch/hadoop/hive/EsStorageHandler.java
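In short, as the diff below suggests: `init` previously built its `Settings` on top of the shared job `Configuration`, so each per-table `setProperty()` call mutated job-wide state, and in a MapJoin the second table could pick up the first table's input path. The patch backs the settings with a throwaway `Properties` object and publishes the collected options through Hive's per-table `jobProperties` map instead. A minimal sketch of the pattern (names mirror the diff; the elided calls stand in for the read/write branches):

    // Sketch only -- mirrors the shape of the fix, not the full PR code.
    private void init(TableDesc tableDesc, Map<String, String> jobProperties, boolean read) {
        // Back the settings with a throwaway Properties object, NOT the shared
        // job Configuration, so per-table setProperty() calls cannot leak
        // across the tables of a MapJoin.
        Properties tmpProperties = new Properties();
        Settings settings = SettingsManager.loadFrom(tmpProperties).merge(tableDesc.getProperties());

        // ... per-table read/write configuration via settings.setProperty(...) ...

        // Publish the collected options through Hive's jobProperties map,
        // which Hive applies to the job scoped to this table.
        for (Map.Entry<Object, Object> entry : tmpProperties.entrySet()) {
            jobProperties.put((String) entry.getKey(), (String) entry.getValue());
        }
    }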
@@ -18,8 +18,12 @@
  */
 package org.elasticsearch.hadoop.hive;

+import static org.elasticsearch.hadoop.hive.HiveConstants.TABLE_LOCATION;
+
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Properties;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,88 +44,107 @@
 import org.elasticsearch.hadoop.util.Assert;
 import org.elasticsearch.hadoop.util.StringUtils;

-import static org.elasticsearch.hadoop.hive.HiveConstants.*;
-
 /**
  * Hive storage for writing data into an ElasticSearch index.
  *
- * The ElasticSearch host/port can be specified through Hadoop properties (see package description)
- * or passed to {@link #EsStorageHandler} through Hive <tt>TBLPROPERTIES</tt>
+ * The ElasticSearch host/port can be specified through Hadoop properties (see
+ * package description) or passed to {@link #EsStorageHandler} through Hive
+ * <tt>TBLPROPERTIES</tt>
  */
 @SuppressWarnings({ "deprecation", "rawtypes" })
 public class EsStorageHandler extends DefaultStorageHandler {

-    private static Log log = LogFactory.getLog(EsStorageHandler.class);
-
-    @Override
-    public Class<? extends InputFormat> getInputFormatClass() {
-        return EsHiveInputFormat.class;
-    }
-
-    @Override
-    public Class<? extends OutputFormat> getOutputFormatClass() {
-        return EsHiveOutputFormat.class;
-    }
-
-    @Override
-    public Class<? extends SerDe> getSerDeClass() {
-        return EsSerDe.class;
-    }
-
-    @Override
-    public HiveMetaHook getMetaHook() {
-        //TODO: add metahook support
-        return null;
-    }
-
-    @Override
-    public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
-        init(tableDesc, true);
-    }
-
-    @Override
-    public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
-        init(tableDesc, false);
-    }
-
-    private void init(TableDesc tableDesc, boolean read) {
-        Configuration cfg = getConf();
-        Settings settings = SettingsManager.loadFrom(cfg).merge(tableDesc.getProperties());
-
-        // NB: ESSerDe is already initialized at this stage but should still have a reference to the same cfg object
-        // NB: the value writer is not needed by Hive but it's set for consistency and debugging purposes
-
-        InitializationUtils.checkIdForOperation(settings);
-        if (read) {
-            InitializationUtils.setValueReaderIfNotSet(settings, HiveValueReader.class, log);
-            settings.setProperty(InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS, StringUtils.concatenate(HiveUtils.columnToAlias(settings), ","));
-            // set read resource
-            settings.setResourceRead(settings.getResourceRead());
-        }
-        else {
-            InitializationUtils.setValueWriterIfNotSet(settings, HiveValueWriter.class, log);
-            InitializationUtils.setBytesConverterIfNeeded(settings, HiveBytesConverter.class, log);
-            // replace the default committer when using the old API
+    private static Log log = LogFactory.getLog(EsStorageHandler.class);
+
+    @Override
+    public Class<? extends InputFormat> getInputFormatClass() {
+        return EsHiveInputFormat.class;
+    }
+
+    @Override
+    public Class<? extends OutputFormat> getOutputFormatClass() {
+        return EsHiveOutputFormat.class;
+    }
+
+    @Override
+    public Class<? extends SerDe> getSerDeClass() {
+        return EsSerDe.class;
+    }
+
+    @Override
+    public HiveMetaHook getMetaHook() {
+        // TODO: add metahook support
+        return null;
+    }
+
+    @Override
+    public void configureInputJobProperties(TableDesc tableDesc,
+            Map<String, String> jobProperties) {
+        init(tableDesc, jobProperties, true);
+    }
+
+    @Override
+    public void configureOutputJobProperties(TableDesc tableDesc,
+            Map<String, String> jobProperties) {
+        init(tableDesc, jobProperties, false);
+    }
+
+    private void init(TableDesc tableDesc, Map<String, String> jobProperties,
+            boolean read) {
+
+        // NB: ESSerDe is already initialized at this stage but should still
+        // have a reference to the same cfg object
+        // NB: the value writer is not needed by Hive but it's set for
+        // consistency and debugging purposes
+        Configuration cfg = getConf();
+        Properties tmpProperties = new Properties();
+        Settings settings = SettingsManager.loadFrom(tmpProperties).merge(
+                tableDesc.getProperties());
+        InitializationUtils.checkIdForOperation(settings);
+        if (read) {
+            InitializationUtils.setValueReaderIfNotSet(settings,
+                    HiveValueReader.class, log);
+            settings.setProperty(
+                    InternalConfigurationOptions.INTERNAL_ES_TARGET_FIELDS,
+                    StringUtils.concatenate(HiveUtils.columnToAlias(settings),
+                            ","));
+            // set read resource
+            settings.setResourceRead(settings.getResourceRead());
+        } else {
+            InitializationUtils.setValueWriterIfNotSet(settings,
+                    HiveValueWriter.class, log);
+            InitializationUtils.setBytesConverterIfNeeded(settings,
+                    HiveBytesConverter.class, log);
+            // replace the default committer when using the old API
             HadoopCfgUtils.setOutputCommitterClass(cfg, EsOutputFormat.ESOutputCommitter.class.getName());
-            // set write resource
-            settings.setResourceWrite(settings.getResourceWrite());
-        }
-
-        InitializationUtils.setFieldExtractorIfNotSet(settings, HiveFieldExtractor.class, log);
-        try {
-            InitializationUtils.discoverEsVersion(settings, log);
-        } catch (IOException ex) {
-            throw new EsHadoopIllegalStateException("Cannot discover Elasticsearch version", ex);
-        }
-
-        Assert.hasText(tableDesc.getProperties().getProperty(TABLE_LOCATION), String.format(
-                "no table location [%s] declared by Hive resulting in abnormal execution;", TABLE_LOCATION));
-    }
-
-    @Override
-    @Deprecated
-    public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
-        throw new UnsupportedOperationException();
-    }
+            // set write resource
+            settings.setResourceWrite(settings.getResourceWrite());
+        }
+        InitializationUtils.setFieldExtractorIfNotSet(settings,
+                HiveFieldExtractor.class, log);
+        try {
+            InitializationUtils.discoverEsVersion(settings, log);
+        } catch (IOException ex) {
+            throw new EsHadoopIllegalStateException(
+                    "Cannot discover Elasticsearch version", ex);
+        }
+        // save tmpProperties to jobProperties
+        Iterator it = tmpProperties.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry entry = (Map.Entry) it.next();
+            jobProperties.put((String) entry.getKey(), (String) entry.getValue());
+        }
+        Assert.hasText(
+                tableDesc.getProperties().getProperty(TABLE_LOCATION),
+                String.format(
+                        "no table location [%s] declared by Hive resulting in abnormal execution;",
+                        TABLE_LOCATION));
+    }
+
+    @Override
+    @Deprecated
+    public void configureTableJobProperties(TableDesc tableDesc,
+            Map<String, String> jobProperties) {
+        throw new UnsupportedOperationException();
+    }
 }
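One note on the copy loop at the end of `init`: the raw `Iterator` version compiles with unchecked-cast warnings. If warning-free code were preferred, an equivalent copy (assuming Java 6+, since it uses `stringPropertyNames()`) would be:

    // Equivalent copy without the unchecked Map.Entry casts; also walks the
    // Properties defaults chain, which entrySet() iteration does not.
    for (String name : tmpProperties.stringPropertyNames()) {
        jobProperties.put(name, tmpProperties.getProperty(name));
    }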