Skip to content

Commit

Permalink
introduce dedicated es-hadoop exception
Browse files Browse the repository at this point in the history
Fixes #164
  • Loading branch information
costin committed Apr 8, 2014
1 parent e85f843 commit d59b7cc
Show file tree
Hide file tree
Showing 61 changed files with 489 additions and 222 deletions.
Expand Up @@ -16,24 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.serialization;

import java.io.IOException;
package org.elasticsearch.hadoop;

/**
*
* Base class for Elasticsearch Hadoop exceptions.
*/
public class SerializationException extends RuntimeException {
public class EsHadoopException extends RuntimeException {

public SerializationException(String message) {
super(message);
public EsHadoopException() {
super();
}

public EsHadoopException(String message, Throwable cause) {
super(message, cause);
}

public SerializationException(IOException cause) {
this("Cannot serialize/deserialize", cause);
public EsHadoopException(String message) {
super(message);
}

public SerializationException(String message, IOException cause) {
super(message, cause);
public EsHadoopException(Throwable cause) {
super(cause);
}
}
@@ -0,0 +1,39 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop;


/**
 * Exception indicating an illegal or inappropriate argument was passed to
 * an es-hadoop component. Dedicated replacement for
 * {@link IllegalArgumentException} within the es-hadoop exception hierarchy.
 */
public class EsHadoopIllegalArgumentException extends EsHadoopException {

    // explicit id keeps the serialized form stable across recompiles
    private static final long serialVersionUID = 1L;

    /** Constructs the exception with no detail message or cause. */
    public EsHadoopIllegalArgumentException() {
        super();
    }

    /**
     * Constructs the exception with the given detail message and cause.
     *
     * @param message detail message describing the invalid argument
     * @param cause   underlying cause, may be {@code null}
     */
    public EsHadoopIllegalArgumentException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * Constructs the exception with the given detail message.
     *
     * @param message detail message describing the invalid argument
     */
    public EsHadoopIllegalArgumentException(String message) {
        super(message);
    }

    /**
     * Constructs the exception with the given cause.
     *
     * @param cause underlying cause, may be {@code null}
     */
    public EsHadoopIllegalArgumentException(Throwable cause) {
        super(cause);
    }
}
@@ -0,0 +1,38 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop;

/**
 * Exception indicating a method was invoked at an illegal or inappropriate
 * time, or that internal state is inconsistent. Dedicated replacement for
 * {@link IllegalStateException} within the es-hadoop exception hierarchy.
 */
public class EsHadoopIllegalStateException extends EsHadoopException {

    // explicit id keeps the serialized form stable across recompiles
    private static final long serialVersionUID = 1L;

    /** Constructs the exception with no detail message or cause. */
    public EsHadoopIllegalStateException() {
        super();
    }

    /**
     * Constructs the exception with the given detail message and cause.
     *
     * @param message detail message describing the illegal state
     * @param cause   underlying cause, may be {@code null}
     */
    public EsHadoopIllegalStateException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * Constructs the exception with the given detail message.
     *
     * @param message detail message describing the illegal state
     */
    public EsHadoopIllegalStateException(String message) {
        super(message);
    }

    /**
     * Constructs the exception with the given cause.
     *
     * @param cause underlying cause, may be {@code null}
     */
    public EsHadoopIllegalStateException(Throwable cause) {
        super(cause);
    }
}
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.hadoop.mr.LinkedMapWritable;
import org.elasticsearch.hadoop.util.FieldAlias;
import org.elasticsearch.hadoop.util.ObjectUtils;
import org.elasticsearch.hadoop.util.ReflectionUtils;
import org.elasticsearch.hadoop.util.SettingsUtils;
import org.elasticsearch.hadoop.util.StringUtils;

Expand Down Expand Up @@ -121,14 +122,9 @@ static Collection<String> fieldToAlias(Settings settings, Fields fields) {
}

// Retrieves the backing "defaults" Properties that the given copy delegates to.
// Uses reflection since java.util.Properties does not expose its defaults field.
// NOTE(review): this span is diff residue mixing old and new bodies; reconstructed
// here as the post-commit ReflectionUtils-based version — confirm against the repo.
static Properties extractOriginalProperties(Properties copy) {
    // ReflectionUtils.findField/makeAccessible centralize the access-check and
    // error handling that the previous inline try/catch performed by hand
    Field field = ReflectionUtils.findField(Properties.class, "defaults", Properties.class);
    ReflectionUtils.makeAccessible(field);
    return ReflectionUtils.getField(field, copy);
}

static Settings init(Settings settings, String nodes, int port, String resource, String query) {
Expand Down Expand Up @@ -207,6 +203,7 @@ static Tuple coerceToString(SinkCall<?, ?> sinkCall) {
return (CASCADING_22_AVAILABLE ? CoercibleOps.coerceToString(sinkCall) : LegacyOps.coerceToString(sinkCall));
}

@SuppressWarnings("rawtypes")
public static Tap hadoopTap(String host, int port, String path, String query, Fields fields, Properties props) {
return new EsHadoopTap(host, port, path, query, fields, props);
}
Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.Properties;

import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.cascading.CascadingUtils;
import org.elasticsearch.hadoop.util.StringUtils;

Expand All @@ -35,6 +36,7 @@
@SuppressWarnings("rawtypes")
public class EsFactory {

@SuppressWarnings("serial")
public static class EsScheme extends Scheme {
Fields fields;

Expand Down Expand Up @@ -65,7 +67,7 @@ public void sink(FlowProcess flowProcess, SinkCall sinkCall) throws IOException

public Tap createTap(Scheme scheme, String path, SinkMode sinkMode, Properties properties) {
if (!(scheme instanceof EsScheme)) {
throw new IllegalArgumentException("Unknown scheme; expected " + EsScheme.class.getName());
throw new EsHadoopIllegalArgumentException("Unknown scheme; expected " + EsScheme.class.getName());
}

String host = properties.getProperty("host");
Expand Down
31 changes: 4 additions & 27 deletions src/main/java/org/elasticsearch/hadoop/cfg/Settings.java
Expand Up @@ -20,14 +20,14 @@

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.Enumeration;
import java.util.Locale;
import java.util.Properties;

import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.util.StringUtils;
import org.elasticsearch.hadoop.util.unit.Booleans;
import org.elasticsearch.hadoop.util.unit.ByteSizeValue;
Expand All @@ -36,7 +36,7 @@
/**
* Holder class containing the various configuration bits used by ElasticSearch Hadoop. Handles internally the fall back to defaults when looking for undefined, optional settings.
*/
public abstract class Settings implements Serializable, InternalConfigurationOptions {
public abstract class Settings implements InternalConfigurationOptions {

private static boolean ES_HOST_WARNING = true;

Expand Down Expand Up @@ -217,13 +217,6 @@ public Settings setQuery(String query) {
setProperty(ES_QUERY, StringUtils.hasText(query) ? query : "");
return this;
}

// aggregate the resource - computed / set / properties
// public String getTargetResource() {
// String resource = getProperty(INTERNAL_ES_TARGET_RESOURCE);
// return (StringUtils.hasText(targetResource) ? targetResource : StringUtils.hasText(resource) ? resource : getProperty(ES_RESOURCE));
// }

public String getResource() {
return getProperty(ES_RESOURCE);
}
Expand All @@ -238,22 +231,6 @@ public String getQuery() {
return getProperty(ES_QUERY);
}

// public Settings cleanHosts() {
// setProperty(INTERNAL_ES_HOSTS, "");
// return this;
// }
//
// public Settings cleanResource() {
// setProperty(INTERNAL_ES_TARGET_RESOURCE, "");
// return this;
// }
//
// public Settings clean() {
// cleanResource();
// cleanHosts();
// return this;
// }

public abstract InputStream loadResource(String location);

public abstract Settings copy();
Expand Down Expand Up @@ -306,7 +283,7 @@ public Settings load(String source) {
try {
copy.load(new StringReader(source));
} catch (IOException ex) {
throw new IllegalStateException(ex);
throw new EsHadoopIllegalArgumentException(ex);
}
merge(copy);
return this;
Expand All @@ -318,7 +295,7 @@ public String save() {
try {
copy.store(sw, "");
} catch (IOException ex) {
throw new IllegalStateException(ex);
throw new EsHadoopIllegalArgumentException(ex);
}
return sw.toString();
}
Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;

/**
* Factory for loading settings based on various configuration objects, such as Properties or Hadoop configuration.
Expand Down Expand Up @@ -53,6 +54,6 @@ public static Settings loadFrom(Object configuration) {
if (HADOOP_CONFIGURATION != null && HADOOP_CONFIGURATION.isInstance(configuration)) {
return FromHadoopConfiguration.create(configuration);
}
throw new IllegalArgumentException("Don't know how to create Settings from configuration " + configuration);
throw new EsHadoopIllegalArgumentException("Don't know how to create Settings from configuration " + configuration);
}
}
Expand Up @@ -29,6 +29,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.mr.EsOutputFormat;

/**
Expand Down Expand Up @@ -56,7 +57,7 @@ public void write(Writable w) throws IOException {
}
else {
// we could allow custom BAs
throw new IllegalArgumentException(String.format("Unexpected type; expected [%s], received [%s]", HiveBytesArrayWritable.class, w));
throw new EsHadoopIllegalArgumentException(String.format("Unexpected type; expected [%s], received [%s]", HiveBytesArrayWritable.class, w));
}
}

Expand Down
8 changes: 0 additions & 8 deletions src/main/java/org/elasticsearch/hadoop/hive/EsSerDe.java
Expand Up @@ -91,14 +91,6 @@ public Object deserialize(Writable blob) throws SerDeException {
if (!readInitialized) {
readInitialized = true;
IS_ES_10 = SettingsUtils.isEs10(settings);

// discover ES version since EsSerDe is initialized before the InputFormat (which does discovery)
// try {
// InitializationUtils.discoverEsVersion(settings, log);
// IS_ES_10 = SettingsUtils.isEs10(settings);
// } catch (IOException ex) {
// throw new SerDeException("Cannot detect Elasticsearch version", ex);
// }
}

if (blob == null || blob instanceof NullWritable) {
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.OutputFormat;
import org.elasticsearch.hadoop.EsHadoopIllegalStateException;
import org.elasticsearch.hadoop.cfg.InternalConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.cfg.SettingsManager;
Expand Down Expand Up @@ -98,7 +99,7 @@ private void init(TableDesc tableDesc) {
try {
InitializationUtils.discoverEsVersion(settings, log);
} catch (IOException ex) {
throw new IllegalStateException("Cannot discover Elasticsearch version", ex);
throw new EsHadoopIllegalStateException("Cannot discover Elasticsearch version", ex);
}


Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/elasticsearch/hadoop/mr/EsInputFormat.java
Expand Up @@ -40,6 +40,7 @@
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.cfg.ConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.cfg.SettingsManager;
Expand Down Expand Up @@ -422,7 +423,7 @@ public org.apache.hadoop.mapred.InputSplit[] getSplits(JobConf job, int numSplit
}
else {
client.close();
throw new IllegalArgumentException(
throw new EsHadoopIllegalArgumentException(
String.format("Index [%s] missing and settings [%s] is set to false", settings.getResource(), ConfigurationOptions.ES_FIELD_READ_EMPTY_AS_NULL));
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/elasticsearch/hadoop/mr/HadoopIOUtils.java
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;

public abstract class HadoopIOUtils {

Expand Down Expand Up @@ -76,7 +77,7 @@ public static InputStream open(String resource, Configuration conf) {
FileSystem fs = p.getFileSystem(conf);
return fs.open(p);
} catch (IOException ex) {
throw new IllegalArgumentException(String.format("Cannot open stream for resource %s", resource));
throw new EsHadoopIllegalArgumentException(String.format("Cannot open stream for resource %s", resource));
}
}
}
Expand Up @@ -22,6 +22,7 @@

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.elasticsearch.hadoop.EsHadoopIllegalStateException;

/**
* Utility handling the difference in built-in Writable between Hadoop 1 and 2.
Expand Down Expand Up @@ -54,7 +55,7 @@ static Object availableShort(short value) {
try {
return SHORT_CTOR.newInstance(value);
} catch (Exception e) {
throw new IllegalStateException(e);
throw new EsHadoopIllegalStateException(e);
}
}
return new IntWritable(value);
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/org/elasticsearch/hadoop/pig/EsStorage.java
Expand Up @@ -54,6 +54,7 @@
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;
import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.cfg.InternalConfigurationOptions;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.cfg.SettingsManager;
Expand Down Expand Up @@ -110,7 +111,7 @@ public EsStorage(String... configuration) {
properties.load(new StringReader(string));
}
} catch (IOException ex) {
throw new IllegalArgumentException("Cannot parse options " + Arrays.toString(configuration), ex);
throw new EsHadoopIllegalArgumentException("Cannot parse options " + Arrays.toString(configuration), ex);
}
}
}
Expand Down Expand Up @@ -186,7 +187,7 @@ public void putNext(Tuple t) throws IOException {
try {
writer.write(null, pigTuple);
} catch (InterruptedException ex) {
throw new IOException("interrupted", ex);
throw new EsHadoopIllegalArgumentException("interrupted", ex);
}
}

Expand Down

0 comments on commit d59b7cc

Please sign in to comment.