Skip to content
Permalink
Browse files

Initial draft addressing index formatting

Relates to #187
  • Loading branch information...
costin committed Apr 14, 2014
1 parent 4c0a4a4 commit 96e4af436121b69929e854c2ef19898e4d21f88e
@@ -18,7 +18,8 @@
*/
package org.elasticsearch.hadoop.cfg;

import org.elasticsearch.hadoop.serialization.DefaultIndexFormat;
import org.elasticsearch.hadoop.serialization.field.DateIndexFormatter;
import org.elasticsearch.hadoop.serialization.field.DefaultIndexExtractor;

/**
* Class providing the various Configuration parameters used by the Elasticsearch Hadoop integration.
@@ -135,8 +136,11 @@
String ES_MAPPING_TTL_EXTRACTOR_CLASS = "es.mapping.ttl.extractor.class";
String ES_MAPPING_TIMESTAMP = "es.mapping.timestamp";
String ES_MAPPING_TIMESTAMP_EXTRACTOR_CLASS = "es.mapping.timestamp.extractor.class";
String ES_MAPPING_INDEX_FORMAT_CLASS = "es.mapping.index.format.class";
String ES_MAPPING_DEFAULT_INDEX_FORMAT_CLASS = DefaultIndexFormat.class.getName();
String ES_MAPPING_INDEX_EXTRACTOR_CLASS = "es.mapping.index.extractor.class";
String ES_MAPPING_DEFAULT_INDEX_EXTRACTOR_CLASS = DefaultIndexExtractor.class.getName();
String ES_MAPPING_INDEX_FORMATTER_CLASS = "es.mapping.index.formatter.class";
String ES_MAPPING_DEFAULT_INDEX_FORMATTER_CLASS = DateIndexFormatter.class.getName();


/** Operation types */
String ES_WRITE_OPERATION = "es.write.operation";
@@ -183,8 +183,12 @@ public String getMappingTimestampExtractorClassName() {
return getProperty(ES_MAPPING_TIMESTAMP_EXTRACTOR_CLASS, getMappingDefaultClassExtractor());
}

public String getMappingIndexFormatClassName() {
return getProperty(ES_MAPPING_INDEX_FORMAT_CLASS, ES_MAPPING_DEFAULT_INDEX_FORMAT_CLASS);
public String getMappingIndexExtractorClassName() {
return getProperty(ES_MAPPING_INDEX_EXTRACTOR_CLASS, ES_MAPPING_DEFAULT_INDEX_EXTRACTOR_CLASS);
}

public String getMappingIndexFormatterClassName() {
return getProperty(ES_MAPPING_INDEX_FORMATTER_CLASS, ES_MAPPING_DEFAULT_INDEX_FORMATTER_CLASS);
}

public boolean getUpsertDoc() {
@@ -46,7 +46,7 @@
import org.elasticsearch.hadoop.rest.RestRepository;
import org.elasticsearch.hadoop.rest.dto.Node;
import org.elasticsearch.hadoop.rest.dto.Shard;
import org.elasticsearch.hadoop.serialization.IndexFormat;
import org.elasticsearch.hadoop.serialization.field.IndexExtractor;
import org.elasticsearch.hadoop.serialization.field.MapWritableFieldExtractor;
import org.elasticsearch.hadoop.util.Assert;
import org.elasticsearch.hadoop.util.ObjectUtils;
@@ -191,7 +191,7 @@ protected void init() throws IOException {
resource = new Resource(settings, false);

// single index vs multi indices
IndexFormat iformat = ObjectUtils.instantiate(settings.getMappingIndexFormatClassName(), settings);
IndexExtractor iformat = ObjectUtils.instantiate(settings.getMappingIndexExtractorClassName(), settings);
iformat.compile(resource.toString());
if (iformat.hasPattern()) {
initMultiIndices(settings, currentInstance);
@@ -25,11 +25,11 @@
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.Resource;
import org.elasticsearch.hadoop.serialization.IndexFormat;
import org.elasticsearch.hadoop.serialization.builder.ValueWriter;
import org.elasticsearch.hadoop.serialization.command.TemplatedCommand.FieldWriter;
import org.elasticsearch.hadoop.serialization.field.ConstantFieldExtractor;
import org.elasticsearch.hadoop.serialization.field.FieldExtractor;
import org.elasticsearch.hadoop.serialization.field.IndexExtractor;
import org.elasticsearch.hadoop.serialization.field.JsonFieldExtractors;
import org.elasticsearch.hadoop.util.ObjectUtils;
import org.elasticsearch.hadoop.util.StringUtils;
@@ -45,7 +45,7 @@
private Settings settings;
private ValueWriter<?> valueWriter;
// used when specifying an index pattern
private IndexFormat indexFormat;
private IndexExtractor indexExtractor;
private FieldExtractor idExtractor, parentExtractor, routingExtractor, versionExtractor, ttlExtractor,
timestampExtractor;

@@ -64,7 +64,7 @@ private void initFieldExtractors(Settings settings) {
}

jsonExtractors = new JsonFieldExtractors(settings);
indexFormat = jsonExtractors.indexAndType();
indexExtractor = jsonExtractors.indexAndType();

idExtractor = jsonExtractors.id();
parentExtractor = jsonExtractors.parent();
@@ -102,11 +102,11 @@ private void initFieldExtractors(Settings settings) {
}

// create adapter
IndexFormat iformat = ObjectUtils.<IndexFormat> instantiate(settings.getMappingIndexFormatClassName(), settings);
IndexExtractor iformat = ObjectUtils.<IndexExtractor> instantiate(settings.getMappingIndexExtractorClassName(), settings);
iformat.compile(new Resource(settings, false).toString());

if (iformat.hasPattern()) {
indexFormat = iformat;
indexExtractor = iformat;
}


@@ -134,8 +134,8 @@ private void initFieldExtractors(Settings settings) {
}
}

protected IndexFormat index() {
return indexFormat;
protected IndexExtractor index() {
return indexExtractor;
}

protected FieldExtractor id() {
@@ -16,20 +16,20 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.serialization;
package org.elasticsearch.hadoop.serialization.field;

import java.util.ArrayList;
import java.util.List;

import org.elasticsearch.hadoop.EsHadoopIllegalArgumentException;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.serialization.field.FieldExtractor;
import org.elasticsearch.hadoop.serialization.SettingsAware;
import org.elasticsearch.hadoop.util.Assert;
import org.elasticsearch.hadoop.util.ObjectUtils;



public abstract class AbstractIndexFormat implements IndexFormat, SettingsAware {
public abstract class AbstractIndexExtractor implements IndexExtractor, SettingsAware {

protected Settings settings;
protected String pattern;
@@ -64,13 +64,35 @@ public void compile(String pattern) {
template.add(string.substring(0, startPattern));
int endPattern = string.indexOf("}");
Assert.isTrue(endPattern > startPattern + 1, "Invalid pattern given " + string);
template.add(createFieldExtractor(string.substring(startPattern + 1, endPattern)));
String nestedString = string.substring(startPattern + 1, endPattern);
int separator = nestedString.indexOf(":");
if (separator > 0) {
Assert.isTrue(nestedString.length() > separator + 1, "Invalid format given " + nestedString);
nestedString = nestedString.substring(0, separator);
String format = nestedString.substring(separator + 1);
template.add(wrapWithFormatter(format, createFieldExtractor(nestedString)));
}
else {
template.add(createFieldExtractor(nestedString));
}
string = string.substring(endPattern + 1).trim();
}
template.add(string);
return template;
}

private Object wrapWithFormatter(String format, final FieldExtractor createFieldExtractor) {
// instantiate field extractor
final IndexFormatter iformatter = ObjectUtils.instantiate(settings.getMappingIndexFormatterClassName(), settings);
iformatter.configure(format);
return new FieldExtractor() {
@Override
public String field(Object target) {
return iformatter.format(createFieldExtractor.field(target));
}
};
}

private void append(StringBuilder sb, List<Object> list, Object target) {
for (Object object : list) {
if (object instanceof FieldExtractor) {
@@ -106,5 +128,5 @@ public boolean hasPattern() {
return hasPattern;
}

protected abstract Object createFieldExtractor(String fieldName);
protected abstract FieldExtractor createFieldExtractor(String fieldName);
}
@@ -0,0 +1,43 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.serialization.field;

import java.text.SimpleDateFormat;
import java.util.Calendar;

import javax.xml.bind.DatatypeConverter;


public class DateIndexFormatter implements IndexFormatter {

private String format;
private SimpleDateFormat dateFormat;

@Override
public void configure(String format) {
this.format = format;
this.dateFormat = new SimpleDateFormat(format);
}

@Override
public String format(String value) {
Calendar calendar = DatatypeConverter.parseDateTime(value);
return dateFormat.format(calendar.getTime());
}
}
@@ -16,14 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.serialization;
package org.elasticsearch.hadoop.serialization.field;

import org.elasticsearch.hadoop.serialization.field.ConstantFieldExtractor;
import org.elasticsearch.hadoop.util.ObjectUtils;

public class DefaultIndexFormat extends AbstractIndexFormat {
public class DefaultIndexExtractor extends AbstractIndexExtractor {

protected Object createFieldExtractor(String fieldName) {
protected FieldExtractor createFieldExtractor(String fieldName) {
settings.setProperty(ConstantFieldExtractor.PROPERTY, fieldName);
return ObjectUtils.instantiate(settings.getMappingDefaultClassExtractor(), settings);
}
@@ -16,15 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.serialization;
package org.elasticsearch.hadoop.serialization.field;

import org.elasticsearch.hadoop.serialization.field.FieldExtractor;


/**
* Produces an index name based on the given pattern and arguments (field values).
*/
public interface IndexFormat extends FieldExtractor {
public interface IndexExtractor extends FieldExtractor {

void compile(String pattern);

@@ -0,0 +1,26 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.hadoop.serialization.field;

public interface IndexFormatter {

void configure(String format);

String format(String value);
}
@@ -26,8 +26,6 @@
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.rest.Resource;
import org.elasticsearch.hadoop.serialization.AbstractIndexFormat;
import org.elasticsearch.hadoop.serialization.IndexFormat;
import org.elasticsearch.hadoop.serialization.ParsingUtils;
import org.elasticsearch.hadoop.serialization.json.JacksonJsonParser;
import org.elasticsearch.hadoop.util.BytesArray;
@@ -44,7 +42,7 @@
private String[] paths;

private FieldExtractor id, parent, routing, ttl, version, timestamp;
private IndexFormat indexFormat;
private IndexExtractor indexExtractor;

class PrecomputedFieldExtractor implements FieldExtractor {

@@ -84,16 +82,16 @@ public JsonFieldExtractors(Settings settings) {
timestamp = init(settings.getMappingTimestamp(), jsonPaths);

// create index format
indexFormat = new AbstractIndexFormat() {
indexExtractor = new AbstractIndexExtractor() {
@Override
protected Object createFieldExtractor(String fieldName) {
protected FieldExtractor createFieldExtractor(String fieldName) {
return createJsonFieldExtractor(fieldName, jsonPaths);
}
};
indexFormat.compile(new Resource(settings, false).toString());
indexExtractor.compile(new Resource(settings, false).toString());

// if there's no pattern, simply remove it
indexFormat = (indexFormat.hasPattern() ? indexFormat : null);
indexExtractor = (indexExtractor.hasPattern() ? indexExtractor : null);

paths = jsonPaths.toArray(new String[jsonPaths.size()]);
}
@@ -123,8 +121,8 @@ private String initConstant(String field) {
return null;
}

public IndexFormat indexAndType() {
return indexFormat;
public IndexExtractor indexAndType() {
return indexExtractor;
}

public FieldExtractor id() {

0 comments on commit 96e4af4

Please sign in to comment.
You can’t perform that action at this time.