
Commit b3ef0a3

Support fetching _routing, _parent, _timestamp using realtime get when stored, closes #1289.
kimchy committed Aug 30, 2011
1 parent 6560a9e commit b3ef0a3
Showing 8 changed files with 139 additions and 54 deletions.
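
For orientation, a minimal sketch of how the feature added by this commit would be exercised from the Java client, mirroring the pattern used in the test changes below (the index, type, and id are illustrative, and the type is assumed to have _parent and _timestamp enabled and stored): a realtime get can now return _routing, _parent, and _timestamp straight from the translog, without waiting for a refresh.

GetResponse getResponse = client.prepareGet("test", "type1", "1")
        .setFields("_routing", "_parent", "_timestamp")
        .setRealtime(true)
        .execute().actionGet();
// each requested field comes back as a GetField when it could be resolved
long timestamp = ((Number) getResponse.field("_timestamp").value()).longValue();
Object routing = getResponse.field("_routing") == null ? null : getResponse.field("_routing").value();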
@@ -44,6 +44,17 @@ public BytesStreamInput(byte buf[], int offset, int length) {
this.count = Math.min(offset + length, buf.length);
}

@Override public long skip(long n) throws IOException {
if (pos + n > count) {
n = count - pos;
}
if (n < 0) {
return 0;
}
pos += n;
return n;
}

public int position() {
return this.pos;
}
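
The new skip(long) above follows java.io.InputStream semantics for the in-memory buffer: it clamps to the bytes remaining, returns how many were actually skipped, and returns 0 for a non-positive count. A tiny illustrative sketch (the byte values are made up):

BytesStreamInput in = new BytesStreamInput(new byte[]{1, 2, 3, 4, 5});
long skipped = in.skip(3);   // advances the read position past the next 3 bytes
// skipped == 3 and in.position() == 3; asking to skip past the end is clamped to what is left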
@@ -27,7 +27,6 @@
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.Query;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.CloseableComponent;
import org.elasticsearch.common.lease.Releasable;
@@ -743,13 +742,13 @@ public Get loadSource(boolean loadSource) {
static class GetResult {
private final boolean exists;
private final long version;
private final BytesHolder source;
private final Translog.Source source;
private final UidField.DocIdAndVersion docIdAndVersion;
private final Searcher searcher;

public static final GetResult NOT_EXISTS = new GetResult(false, -1, null);

public GetResult(boolean exists, long version, @Nullable BytesHolder source) {
public GetResult(boolean exists, long version, @Nullable Translog.Source source) {
this.source = source;
this.exists = exists;
this.version = version;
@@ -773,7 +772,7 @@ public long version() {
return this.version;
}

@Nullable public BytesHolder source() {
@Nullable public Translog.Source source() {
return source;
}

@@ -33,7 +33,6 @@
import org.apache.lucene.util.UnicodeUtil;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.bloom.BloomFilter;
@@ -304,7 +303,7 @@ public GetResult get(Get get) throws EngineException {
byte[] data = translog.read(versionValue.translogLocation());
if (data != null) {
try {
BytesHolder source = TranslogStreams.readSource(data);
Translog.Source source = TranslogStreams.readSource(data);
return new GetResult(true, versionValue.version(), source);
} catch (IOException e) {
// switched on us, read it from the reader
@@ -35,12 +35,16 @@
import org.elasticsearch.index.mapper.FieldMappers;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.mapper.internal.RoutingFieldMapper;
import org.elasticsearch.index.mapper.internal.TimestampFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.mapper.selector.FieldMappersFieldSelector;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.SearchScript;
import org.elasticsearch.search.lookup.SearchLookup;
@@ -224,7 +228,7 @@ public GetResult innerGet(String type, String id, String[] gFields, boolean real

return new GetResult(shardId.index().name(), type, id, get.version(), get.exists(), source == null ? null : new BytesHolder(source), fields);
} else {
BytesHolder source = get.source();
Translog.Source source = get.source();

Map<String, GetField> fields = null;
boolean sourceRequested = false;
@@ -236,56 +240,70 @@ public GetResult innerGet(String type, String id, String[] gFields, boolean real
// no fields, and no source
sourceRequested = false;
} else {
Map<String, Object> sourceAsMap = SourceLookup.sourceAsMap(source.bytes(), source.offset(), source.length());
Map<String, Object> sourceAsMap = null;
SearchLookup searchLookup = null;
for (String field : gFields) {
if (field.equals("_source")) {
sourceRequested = true;
continue;
}
String script = null;
if (field.contains("_source.")) {
script = field;
Object value = null;
if (field.equals(RoutingFieldMapper.NAME) && docMapper.routingFieldMapper().stored()) {
value = source.routing;
} else if (field.equals(ParentFieldMapper.NAME) && docMapper.parentFieldMapper().stored()) {
value = source.parent;
} else if (field.equals(TimestampFieldMapper.NAME) && docMapper.timestampFieldMapper().stored()) {
value = source.timestamp;
} else {
FieldMappers x = docMapper.mappers().smartName(field);
if (x != null) {
script = "_source." + x.mapper().names().fullName();
}
}
if (script != null) {
if (searchLookup == null) {
searchLookup = new SearchLookup(mapperService, indexCache.fieldData());
String script = null;
if (field.contains("_source.")) {
script = field;
} else {
FieldMappers x = docMapper.mappers().smartName(field);
if (x != null) {
script = "_source." + x.mapper().names().fullName();
}
}
SearchScript searchScript = scriptService.search(searchLookup, "mvel", script, null);
// we can't do this, only allow to run scripts against the source
//searchScript.setNextReader(docIdAndVersion.reader);
//searchScript.setNextDocId(docIdAndVersion.docId);

// but, we need to inject the parsed source into the script, so it will be used...
searchScript.setNextSource(sourceAsMap);

try {
Object value = searchScript.run();
if (fields == null) {
fields = newHashMapWithExpectedSize(2);
if (script != null) {
if (searchLookup == null) {
searchLookup = new SearchLookup(mapperService, indexCache.fieldData());
}
GetField getField = fields.get(field);
if (getField == null) {
getField = new GetField(field, new ArrayList<Object>(2));
fields.put(field, getField);
if (sourceAsMap == null) {
sourceAsMap = SourceLookup.sourceAsMap(source.source.bytes(), source.source.offset(), source.source.length());
}
getField.values().add(value);
} catch (RuntimeException e) {
if (logger.isTraceEnabled()) {
logger.trace("failed to execute get request script field [{}]", e, script);
SearchScript searchScript = scriptService.search(searchLookup, "mvel", script, null);
// we can't do this, only allow to run scripts against the source
//searchScript.setNextReader(docIdAndVersion.reader);
//searchScript.setNextDocId(docIdAndVersion.docId);

// but, we need to inject the parsed source into the script, so it will be used...
searchScript.setNextSource(sourceAsMap);

try {
value = searchScript.run();
} catch (RuntimeException e) {
if (logger.isTraceEnabled()) {
logger.trace("failed to execute get request script field [{}]", e, script);
}
// ignore
}
// ignore
}
}
if (value != null) {
if (fields == null) {
fields = newHashMapWithExpectedSize(2);
}
GetField getField = fields.get(field);
if (getField == null) {
getField = new GetField(field, new ArrayList<Object>(2));
fields.put(field, getField);
}
getField.values().add(value);
}
}
}

return new GetResult(shardId.index().name(), type, id, get.version(), get.exists(), sourceRequested ? source : null, fields);
return new GetResult(shardId.index().name(), type, id, get.version(), get.exists(), sourceRequested ? source.source : null, fields);
}
} finally {
get.release();
@@ -223,7 +223,21 @@ public static Type fromId(byte id) {

long estimateSize();

BytesHolder readSource(BytesStreamInput in) throws IOException;
Source readSource(BytesStreamInput in) throws IOException;
}

static class Source {
public final BytesHolder source;
public final String routing;
public final String parent;
public final long timestamp;

public Source(BytesHolder source, String routing, String parent, long timestamp) {
this.source = source;
this.routing = routing;
this.parent = parent;
this.timestamp = timestamp;
}
}
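
A minimal sketch of how the new Translog.Source carrier is consumed (here data is assumed to hold the serialized bytes of a Create or Index translog operation, as in TranslogStreams.readSource below):

Translog.Source source = TranslogStreams.readSource(data);
BytesHolder docSource = source.source;   // the raw _source bytes
String routing = source.routing;         // null when the operation carries no routing
String parent = source.parent;           // null when the operation carries no parent
long timestamp = source.timestamp;       // only meaningful for translog versions that wrote it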

static class Create implements Operation {
@@ -288,14 +302,32 @@ public long version() {
return this.version;
}

@Override public BytesHolder readSource(BytesStreamInput in) throws IOException {
@Override public Source readSource(BytesStreamInput in) throws IOException {
int version = in.readVInt(); // version
id = in.readUTF();
type = in.readUTF();

int length = in.readVInt();
int offset = in.position();
return new BytesHolder(in.underlyingBuffer(), offset, length);
BytesHolder source = new BytesHolder(in.underlyingBuffer(), offset, length);
in.skip(length);
if (version >= 1) {
if (in.readBoolean()) {
routing = in.readUTF();
}
}
if (version >= 2) {
if (in.readBoolean()) {
parent = in.readUTF();
}
}
if (version >= 3) {
this.version = in.readLong();
}
if (version >= 4) {
this.timestamp = in.readLong();
}
return new Source(source, routing, parent, timestamp);
}
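
For quick reference, the byte layout this readSource (and the identical Index.readSource further down) parses once TranslogStreams.readSource has consumed the size header and operation-type byte, derived only from the parsing logic above; the matching write side is not part of this diff:

// vInt   format version
// UTF    id
// UTF    type
// vInt   source length, followed by that many raw source bytes
// format >= 1: boolean present-flag + UTF routing
// format >= 2: boolean present-flag + UTF parent
// format >= 3: long document version
// format >= 4: long timestamp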

@Override public void readFrom(StreamInput in) throws IOException {
@@ -407,14 +439,32 @@ public long version() {
return this.version;
}

@Override public BytesHolder readSource(BytesStreamInput in) throws IOException {
@Override public Source readSource(BytesStreamInput in) throws IOException {
int version = in.readVInt(); // version
id = in.readUTF();
type = in.readUTF();

int length = in.readVInt();
int offset = in.position();
return new BytesHolder(in.underlyingBuffer(), offset, length);
BytesHolder source = new BytesHolder(in.underlyingBuffer(), offset, length);
in.skip(length);
if (version >= 1) {
if (in.readBoolean()) {
routing = in.readUTF();
}
}
if (version >= 2) {
if (in.readBoolean()) {
parent = in.readUTF();
}
}
if (version >= 3) {
this.version = in.readLong();
}
if (version >= 4) {
this.timestamp = in.readLong();
}
return new Source(source, routing, parent, timestamp);
}

@Override public void readFrom(StreamInput in) throws IOException {
@@ -496,7 +546,7 @@ public long version() {
return this.version;
}

@Override public BytesHolder readSource(BytesStreamInput in) throws IOException {
@Override public Source readSource(BytesStreamInput in) throws IOException {
throw new ElasticSearchIllegalStateException("trying to read doc source from delete operation");
}

@@ -554,7 +604,7 @@ public String[] types() {
return this.types;
}

@Override public BytesHolder readSource(BytesStreamInput in) throws IOException {
@Override public Source readSource(BytesStreamInput in) throws IOException {
throw new ElasticSearchIllegalStateException("trying to read doc source from delete_by_query operation");
}

@@ -19,7 +19,6 @@

package org.elasticsearch.index.translog;

import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -54,7 +53,7 @@ public static Translog.Operation readTranslogOperation(StreamInput in) throws IO
return operation;
}

public static BytesHolder readSource(byte[] data) throws IOException {
public static Translog.Source readSource(byte[] data) throws IOException {
BytesStreamInput in = new BytesStreamInput(data);
in.readInt(); // the size header
Translog.Operation.Type type = Translog.Operation.Type.fromId(in.readByte());
@@ -214,7 +214,7 @@ protected MergeSchedulerProvider createMergeScheduler() {
// but, we can still get it (in realtime)
Engine.GetResult getResult = engine.get(new Engine.Get(true, newUid("1")));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.source(), equalTo(new BytesHolder(B_1)));
assertThat(getResult.source().source, equalTo(new BytesHolder(B_1)));
assertThat(getResult.docIdAndVersion(), nullValue());

// but, not there non realtime
@@ -249,7 +249,7 @@ protected MergeSchedulerProvider createMergeScheduler() {
// but, we can still get it (in realtime)
getResult = engine.get(new Engine.Get(true, newUid("1")));
assertThat(getResult.exists(), equalTo(true));
assertThat(getResult.source(), equalTo(new BytesHolder(B_2)));
assertThat(getResult.source().source, equalTo(new BytesHolder(B_2)));
assertThat(getResult.docIdAndVersion(), nullValue());

// refresh and it should be updated
@@ -64,12 +64,21 @@ protected Client getClient() {
client.prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefresh(true).execute().actionGet();
long now2 = System.currentTimeMillis();

// we need to add support for fetching _timestamp from the translog in realtime case...
GetResponse getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(false).execute().actionGet();
// we check both realtime get and non realtime get
GetResponse getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(true).execute().actionGet();
long timestamp = ((Number) getResponse.field("_timestamp").value()).longValue();
assertThat(timestamp, greaterThanOrEqualTo(now1));
assertThat(timestamp, lessThanOrEqualTo(now2));
// verify it's the same timestamp when going to the replica
getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(true).execute().actionGet();
assertThat(((Number) getResponse.field("_timestamp").value()).longValue(), equalTo(timestamp));

// non realtime get (stored)
getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(false).execute().actionGet();
timestamp = ((Number) getResponse.field("_timestamp").value()).longValue();
assertThat(timestamp, greaterThanOrEqualTo(now1));
assertThat(timestamp, lessThanOrEqualTo(now2));
// verify it's the same timestamp when going to the replica
getResponse = client.prepareGet("test", "type1", "1").setFields("_timestamp").setRealtime(false).execute().actionGet();
assertThat(((Number) getResponse.field("_timestamp").value()).longValue(), equalTo(timestamp));

