Skip to content

Commit

Permalink
Fix issue caused by refreshing index with patterns
Browse files Browse the repository at this point in the history
Relates elastic#181
  • Loading branch information
costin committed Mar 30, 2014
1 parent 710ac5c commit 2b80a3a
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 16 deletions.
8 changes: 5 additions & 3 deletions src/main/java/org/elasticsearch/hadoop/mr/EsOutputFormat.java
Expand Up @@ -41,6 +41,7 @@
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.cfg.SettingsManager;
import org.elasticsearch.hadoop.rest.InitializationUtils;
import org.elasticsearch.hadoop.rest.Resource;
import org.elasticsearch.hadoop.rest.RestRepository;
import org.elasticsearch.hadoop.rest.dto.Node;
import org.elasticsearch.hadoop.rest.dto.Shard;
Expand Down Expand Up @@ -138,7 +139,8 @@ protected static class EsRecordWriter extends RecordWriter implements org.apache
protected boolean initialized = false;

protected RestRepository client;
private String uri, resource;
private String uri;
private Resource resource;

private HeartBeat beat;
private Progressable progressable;
Expand Down Expand Up @@ -185,11 +187,11 @@ protected void init() throws IOException {
beat = new HeartBeat(progressable, cfg, settings.getHeartBeatLead(), log);
beat.start();

resource = settings.getResourceWrite();
resource = new Resource(settings, false);

// single index vs multi indices
IndexFormat iformat = ObjectUtils.instantiate(settings.getMappingIndexFormatClassName(), settings);
iformat.compile(resource);
iformat.compile(resource.toString());
if (iformat.hasPattern()) {
initMultiIndices(settings, currentInstance);
}
Expand Down
16 changes: 6 additions & 10 deletions src/main/java/org/elasticsearch/hadoop/rest/Resource.java
Expand Up @@ -35,6 +35,7 @@ public class Resource {
private final String type;
private final String index;
private final String bulk;
private final String refresh;

public Resource(Settings settings, boolean read) {
String resource = (read ? settings.getResourceRead() : settings.getResourceWrite());
Expand Down Expand Up @@ -70,14 +71,7 @@ public Resource(Settings settings, boolean read) {
}
}

String res = resource.trim();

if (res.startsWith("/")) {
res = res.substring(1);
}
if (res.endsWith("/")) {
res = res.substring(0, res.length() - 1);
}
String res = StringUtils.sanitizeResource(resource);

int slash = res.indexOf("/");
Assert.isTrue(slash >= 0 && slash < res.length() - 1, errorMessage);
Expand All @@ -90,7 +84,9 @@ public Resource(Settings settings, boolean read) {
indexAndType = index + "/" + type;

// check bulk
bulk = (indexAndType.contains("{") ? "/_bulk" : indexAndType + "/_bulk");
boolean hasPattern = indexAndType.contains("{");
bulk = (hasPattern ? "/_bulk" : indexAndType + "/_bulk");
refresh = (hasPattern ? "/_refresh" : index + "/_refresh");
}

String bulk() {
Expand Down Expand Up @@ -124,6 +120,6 @@ public String toString() {
}

public String refresh() {
return index + "/_refresh";
return refresh;
}
}
14 changes: 14 additions & 0 deletions src/main/java/org/elasticsearch/hadoop/rest/SimpleRequest.java
Expand Up @@ -73,4 +73,18 @@ public CharSequence params() {
public ByteSequence body() {
return body;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder(method.name());
sb.append("@");
sb.append(uri);
sb.append("/");
sb.append(path);
if (params != null) {
sb.append("?");
sb.append(params);
}
return sb.toString();
}
}
Expand Up @@ -269,6 +269,13 @@ public Response execute(Request request) throws IOException {
// NB: initialize the path _after_ the URI otherwise the path gets reset to /
http.setPath(prefixPath(request.path().toString()));

try {
// validate new URI
http.getURI();
} catch (URIException uriex) {
throw new EsHadoopTransportException("Invalid target URI " + request, uriex);
}

CharSequence params = request.params();
if (StringUtils.hasText(params)) {
http.setQueryString(params.toString());
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.Resource;
import org.elasticsearch.hadoop.serialization.IndexFormat;
import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
import org.elasticsearch.hadoop.serialization.command.TemplatedCommand.FieldWriter;
Expand Down Expand Up @@ -102,7 +103,7 @@ private void initFieldExtractors(Settings settings) {

// create adapter
IndexFormat iformat = ObjectUtils.<IndexFormat> instantiate(settings.getMappingIndexFormatClassName(), settings);
iformat.compile(settings.getResourceWrite());
iformat.compile(new Resource(settings, false).toString());

if (iformat.hasPattern()) {
indexFormat = iformat;
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.Resource;
import org.elasticsearch.hadoop.serialization.AbstractIndexFormat;
import org.elasticsearch.hadoop.serialization.IndexFormat;
import org.elasticsearch.hadoop.serialization.ParsingUtils;
Expand Down Expand Up @@ -89,7 +90,7 @@ protected Object createFieldExtractor(String fieldName) {
return createJsonFieldExtractor(fieldName, jsonPaths);
}
};
indexFormat.compile(settings.getResourceWrite());
indexFormat.compile(new Resource(settings, false).toString());

// if there's no pattern, simply remove it
indexFormat = (indexFormat.hasPattern() ? indexFormat : null);
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/org/elasticsearch/hadoop/util/StringUtils.java
Expand Up @@ -185,4 +185,15 @@ else if (dist == maxDistance) {

return list;
}

public static String sanitizeResource(String resource) {
String res = resource.trim();
if (res.startsWith("/")) {
res = res.substring(1);
}
if (res.endsWith("/")) {
res = res.substring(0, res.length() - 1);
}
return res;
}
}
Expand Up @@ -207,7 +207,16 @@ public void testParentChild() throws Exception {
@Test
public void testIndexPattern() throws Exception {
JobConf conf = createJobConf();
conf.set(ConfigurationOptions.ES_RESOURCE, "mroldapi/pattern-{number}");
conf.set(ConfigurationOptions.ES_RESOURCE, "/mroldapi/pattern-{number}");
conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "yes");

runJob(conf);
}

@Test
public void testAlmostValidIndexPattern() throws Exception {
JobConf conf = createJobConf();
conf.set(ConfigurationOptions.ES_RESOURCE, "mroldapi-{number}/pattern");
conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "yes");

runJob(conf);
Expand Down

0 comments on commit 2b80a3a

Please sign in to comment.