From 2b80a3a290d96c2466a1f20108d51635b0856f6d Mon Sep 17 00:00:00 2001 From: Costin Leau Date: Sun, 30 Mar 2014 17:49:23 +0300 Subject: [PATCH] Fix issue caused by refreshing index with patterns Relates #181 --- .../elasticsearch/hadoop/mr/EsOutputFormat.java | 8 +++++--- .../org/elasticsearch/hadoop/rest/Resource.java | 16 ++++++---------- .../elasticsearch/hadoop/rest/SimpleRequest.java | 14 ++++++++++++++ .../rest/commonshttp/CommonsHttpTransport.java | 7 +++++++ .../command/AbstractCommandFactory.java | 3 ++- .../serialization/field/JsonFieldExtractors.java | 3 ++- .../elasticsearch/hadoop/util/StringUtils.java | 11 +++++++++++ .../hadoop/integration/mr/MROldApiSaveTest.java | 11 ++++++++++- 8 files changed, 57 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java b/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java index 3822e61602de9d..301d07b8b01f15 100644 --- a/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java +++ b/src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java @@ -41,6 +41,7 @@ import org.elasticsearch.hadoop.cfg.Settings; import org.elasticsearch.hadoop.cfg.SettingsManager; import org.elasticsearch.hadoop.rest.InitializationUtils; +import org.elasticsearch.hadoop.rest.Resource; import org.elasticsearch.hadoop.rest.RestRepository; import org.elasticsearch.hadoop.rest.dto.Node; import org.elasticsearch.hadoop.rest.dto.Shard; @@ -138,7 +139,8 @@ protected static class EsRecordWriter extends RecordWriter implements org.apache protected boolean initialized = false; protected RestRepository client; - private String uri, resource; + private String uri; + private Resource resource; private HeartBeat beat; private Progressable progressable; @@ -185,11 +187,11 @@ protected void init() throws IOException { beat = new HeartBeat(progressable, cfg, settings.getHeartBeatLead(), log); beat.start(); - resource = settings.getResourceWrite(); + resource = new Resource(settings, false); // single index vs multi indices IndexFormat iformat = ObjectUtils.instantiate(settings.getMappingIndexFormatClassName(), settings); - iformat.compile(resource); + iformat.compile(resource.toString()); if (iformat.hasPattern()) { initMultiIndices(settings, currentInstance); } diff --git a/src/main/java/org/elasticsearch/hadoop/rest/Resource.java b/src/main/java/org/elasticsearch/hadoop/rest/Resource.java index 4f18272c0abdfc..b1689e4c20f60b 100644 --- a/src/main/java/org/elasticsearch/hadoop/rest/Resource.java +++ b/src/main/java/org/elasticsearch/hadoop/rest/Resource.java @@ -35,6 +35,7 @@ public class Resource { private final String type; private final String index; private final String bulk; + private final String refresh; public Resource(Settings settings, boolean read) { String resource = (read ? settings.getResourceRead() : settings.getResourceWrite()); @@ -70,14 +71,7 @@ public Resource(Settings settings, boolean read) { } } - String res = resource.trim(); - - if (res.startsWith("/")) { - res = res.substring(1); - } - if (res.endsWith("/")) { - res = res.substring(0, res.length() - 1); - } + String res = StringUtils.sanitizeResource(resource); int slash = res.indexOf("/"); Assert.isTrue(slash >= 0 && slash < res.length() - 1, errorMessage); @@ -90,7 +84,9 @@ public Resource(Settings settings, boolean read) { indexAndType = index + "/" + type; // check bulk - bulk = (indexAndType.contains("{") ? "/_bulk" : indexAndType + "/_bulk"); + boolean hasPattern = indexAndType.contains("{"); + bulk = (hasPattern ? "/_bulk" : indexAndType + "/_bulk"); + refresh = (hasPattern ? "/_refresh" : index + "/_refresh"); } String bulk() { @@ -124,6 +120,6 @@ public String toString() { } public String refresh() { - return index + "/_refresh"; + return refresh; } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/hadoop/rest/SimpleRequest.java b/src/main/java/org/elasticsearch/hadoop/rest/SimpleRequest.java index 7754ce9150a833..fb02af78727985 100644 --- a/src/main/java/org/elasticsearch/hadoop/rest/SimpleRequest.java +++ b/src/main/java/org/elasticsearch/hadoop/rest/SimpleRequest.java @@ -73,4 +73,18 @@ public CharSequence params() { public ByteSequence body() { return body; } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(method.name()); + sb.append("@"); + sb.append(uri); + sb.append("/"); + sb.append(path); + if (params != null) { + sb.append("?"); + sb.append(params); + } + return sb.toString(); + } } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/hadoop/rest/commonshttp/CommonsHttpTransport.java b/src/main/java/org/elasticsearch/hadoop/rest/commonshttp/CommonsHttpTransport.java index 62994aa428dc3e..3d577d78324636 100644 --- a/src/main/java/org/elasticsearch/hadoop/rest/commonshttp/CommonsHttpTransport.java +++ b/src/main/java/org/elasticsearch/hadoop/rest/commonshttp/CommonsHttpTransport.java @@ -269,6 +269,13 @@ public Response execute(Request request) throws IOException { // NB: initialize the path _after_ the URI otherwise the path gets reset to / http.setPath(prefixPath(request.path().toString())); + try { + // validate new URI + http.getURI(); + } catch (URIException uriex) { + throw new EsHadoopTransportException("Invalid target URI " + request, uriex); + } + CharSequence params = request.params(); if (StringUtils.hasText(params)) { http.setQueryString(params.toString()); diff --git a/src/main/java/org/elasticsearch/hadoop/serialization/command/AbstractCommandFactory.java b/src/main/java/org/elasticsearch/hadoop/serialization/command/AbstractCommandFactory.java index 9d1458ae84a80e..b197cb5c660e95 100644 --- a/src/main/java/org/elasticsearch/hadoop/serialization/command/AbstractCommandFactory.java +++ b/src/main/java/org/elasticsearch/hadoop/serialization/command/AbstractCommandFactory.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.elasticsearch.hadoop.cfg.Settings; +import org.elasticsearch.hadoop.rest.Resource; import org.elasticsearch.hadoop.serialization.IndexFormat; import org.elasticsearch.hadoop.serialization.builder.ValueWriter; import org.elasticsearch.hadoop.serialization.command.TemplatedCommand.FieldWriter; @@ -102,7 +103,7 @@ private void initFieldExtractors(Settings settings) { // create adapter IndexFormat iformat = ObjectUtils. instantiate(settings.getMappingIndexFormatClassName(), settings); - iformat.compile(settings.getResourceWrite()); + iformat.compile(new Resource(settings, false).toString()); if (iformat.hasPattern()) { indexFormat = iformat; diff --git a/src/main/java/org/elasticsearch/hadoop/serialization/field/JsonFieldExtractors.java b/src/main/java/org/elasticsearch/hadoop/serialization/field/JsonFieldExtractors.java index 1037ee7880e1ca..5947dbef215811 100644 --- a/src/main/java/org/elasticsearch/hadoop/serialization/field/JsonFieldExtractors.java +++ b/src/main/java/org/elasticsearch/hadoop/serialization/field/JsonFieldExtractors.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.elasticsearch.hadoop.cfg.Settings; +import org.elasticsearch.hadoop.rest.Resource; import org.elasticsearch.hadoop.serialization.AbstractIndexFormat; import org.elasticsearch.hadoop.serialization.IndexFormat; import org.elasticsearch.hadoop.serialization.ParsingUtils; @@ -89,7 +90,7 @@ protected Object createFieldExtractor(String fieldName) { return createJsonFieldExtractor(fieldName, jsonPaths); } }; - indexFormat.compile(settings.getResourceWrite()); + indexFormat.compile(new Resource(settings, false).toString()); // if there's no pattern, simply remove it indexFormat = (indexFormat.hasPattern() ? indexFormat : null); diff --git a/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java b/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java index fa2ea8be8c4e3c..fc108768220384 100644 --- a/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java +++ b/src/main/java/org/elasticsearch/hadoop/util/StringUtils.java @@ -185,4 +185,15 @@ else if (dist == maxDistance) { return list; } + + public static String sanitizeResource(String resource) { + String res = resource.trim(); + if (res.startsWith("/")) { + res = res.substring(1); + } + if (res.endsWith("/")) { + res = res.substring(0, res.length() - 1); + } + return res; + } } \ No newline at end of file diff --git a/src/test/java/org/elasticsearch/hadoop/integration/mr/MROldApiSaveTest.java b/src/test/java/org/elasticsearch/hadoop/integration/mr/MROldApiSaveTest.java index 7f6bb9cfe82d39..23d0c24347d5bf 100644 --- a/src/test/java/org/elasticsearch/hadoop/integration/mr/MROldApiSaveTest.java +++ b/src/test/java/org/elasticsearch/hadoop/integration/mr/MROldApiSaveTest.java @@ -207,7 +207,16 @@ public void testParentChild() throws Exception { @Test public void testIndexPattern() throws Exception { JobConf conf = createJobConf(); - conf.set(ConfigurationOptions.ES_RESOURCE, "mroldapi/pattern-{number}"); + conf.set(ConfigurationOptions.ES_RESOURCE, "/mroldapi/pattern-{number}"); + conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "yes"); + + runJob(conf); + } + + @Test + public void testAlmostValidIndexPattern() throws Exception { + JobConf conf = createJobConf(); + conf.set(ConfigurationOptions.ES_RESOURCE, "mroldapi-{number}/pattern"); conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "yes"); runJob(conf);