Skip to content

Commit

Permalink
Realtime GET, closes #1060.
Browse files Browse the repository at this point in the history
  • Loading branch information
kimchy committed Jun 24, 2011
1 parent 4547bc3 commit 72ee0aa
Show file tree
Hide file tree
Showing 33 changed files with 874 additions and 144 deletions.
1 change: 1 addition & 0 deletions .idea/dictionaries/kimchy.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

@@ -0,0 +1,55 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.benchmark.get;

import org.elasticsearch.client.Client;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;

/**
 * Micro-benchmark for the GET action: indexes a single document, then issues
 * repeated gets against it and reports throughput. Supports either the
 * embedded node's client or a separate client-only node (see the toggle below).
 */
public class SimpleGetActionBenchmark {

    public static void main(String[] args) {
        final long operations = SizeValue.parseSizeValue("300k").singles();

        Node node = NodeBuilder.nodeBuilder().node();

        // Toggle: flip to true to benchmark through a separate client-only
        // node instead of the embedded node's own client.
        final boolean useClientNode = false;
        Client client = useClientNode
                ? NodeBuilder.nodeBuilder().client(true).node().client()
                : node.client();

        // Seed a single document that every GET below will fetch.
        client.prepareIndex("test", "type1", "1").setSource("field1", "value1").execute().actionGet();

        StopWatch stopWatch = new StopWatch().start();
        for (long iteration = 0; iteration < operations; iteration++) {
            client.prepareGet("test", "type1", "1").execute().actionGet();
        }
        stopWatch.stop();

        System.out.println("Ran in " + stopWatch.totalTime() + ", per second: " + (((double) operations) / stopWatch.totalTime().secondsFrac()));

        node.close();
    }
}
Expand Up @@ -44,6 +44,8 @@ public class GetRequest extends SingleShardOperationRequest {

private boolean refresh = false;

Boolean realtime;

GetRequest() {
}

Expand Down Expand Up @@ -140,6 +142,15 @@ public boolean refresh() {
return this.refresh;
}

/**
 * Whether this get should be realtime. The underlying flag is tri-state
 * ({@code Boolean}): when it was never explicitly set ({@code null}),
 * realtime defaults to {@code true}.
 */
public boolean realtime() {
    return this.realtime == null || this.realtime;
}

/**
 * Sets whether this get should be realtime. Accepts a tri-state
 * {@code Boolean}: {@code null} means "not explicitly set", which the
 * {@code realtime()} accessor treats as {@code true}.
 *
 * @param realtime {@code true}, {@code false}, or {@code null} (default)
 * @return this request, for method chaining
 */
public GetRequest realtime(Boolean realtime) {
    this.realtime = realtime;
    return this;
}

/**
* Should the listener be called on a separate thread if needed.
*/
Expand All @@ -166,6 +177,12 @@ public boolean refresh() {
fields[i] = in.readUTF();
}
}
byte realtime = in.readByte();
if (realtime == 0) {
this.realtime = false;
} else if (realtime == 1) {
this.realtime = true;
}
}

@Override public void writeTo(StreamOutput out) throws IOException {
Expand All @@ -179,6 +196,13 @@ public boolean refresh() {
out.writeUTF(field);
}
}
if (realtime == null) {
out.writeByte((byte) -1);
} else if (realtime == false) {
out.writeByte((byte) 0);
} else {
out.writeByte((byte) 1);
}
}

@Override public String toString() {
Expand Down
Expand Up @@ -21,15 +21,19 @@

import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.compress.lzf.LZFDecoder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.*;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
import org.elasticsearch.search.lookup.SourceLookup;

import java.io.IOException;
import java.util.Iterator;
Expand Down Expand Up @@ -62,12 +66,14 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel

private Map<String, Object> sourceAsMap;

private byte[] source;
private BytesHolder source;

private byte[] sourceAsBytes;

// Package-private no-arg constructor; presumably used for stream
// deserialization (the Streamable readFrom path) — fields are populated
// afterwards. TODO confirm against callers.
GetResponse() {
}

GetResponse(String index, String type, String id, long version, boolean exists, byte[] source, Map<String, GetField> fields) {
GetResponse(String index, String type, String id, long version, boolean exists, BytesHolder source, Map<String, GetField> fields) {
this.index = index;
this.type = type;
this.id = id;
Expand Down Expand Up @@ -157,9 +163,21 @@ public byte[] source() {
if (source == null) {
return null;
}
if (LZF.isCompressed(source)) {
if (sourceAsBytes != null) {
return sourceAsBytes;
}
this.sourceAsBytes = sourceRef().copyBytes();
return this.sourceAsBytes;
}

/**
* Returns the bytes reference, uncompressing the source first if needed.
*/
public BytesHolder sourceRef() {
if (LZF.isCompressed(source.bytes(), source.offset(), source.length())) {
try {
this.source = LZFDecoder.decode(source);
// TODO decompress without doing an extra copy!
this.source = new BytesHolder(LZFDecoder.decode(source.copyBytes()));
} catch (IOException e) {
throw new ElasticSearchParseException("failed to decompress source", e);
}
Expand All @@ -181,7 +199,8 @@ public String sourceAsString() {
if (source == null) {
return null;
}
return Unicode.fromBytes(source());
BytesHolder source = sourceRef();
return Unicode.fromBytes(source.bytes(), source.offset(), source.length());
}

/**
Expand All @@ -195,20 +214,9 @@ public Map<String, Object> sourceAsMap() throws ElasticSearchParseException {
if (sourceAsMap != null) {
return sourceAsMap;
}
byte[] source = source();
XContentParser parser = null;
try {
parser = XContentFactory.xContent(source).createParser(source);
sourceAsMap = parser.map();
parser.close();
return sourceAsMap;
} catch (Exception e) {
throw new ElasticSearchParseException("Failed to parse source to map", e);
} finally {
if (parser != null) {
parser.close();
}
}

sourceAsMap = SourceLookup.sourceAsMap(source.bytes(), source.offset(), source.length());
return sourceAsMap;
}

public Map<String, Object> getSource() {
Expand Down Expand Up @@ -258,7 +266,7 @@ static final class Fields {
builder.field(Fields._VERSION, version);
}
if (source != null) {
RestXContentBuilder.restDocumentSource(source, builder, params);
RestXContentBuilder.restDocumentSource(source.bytes(), source.offset(), source.length(), builder, params);
}

if (fields != null && !fields.isEmpty()) {
Expand Down Expand Up @@ -294,12 +302,10 @@ static final class Fields {
version = in.readLong();
exists = in.readBoolean();
if (exists) {
int size = in.readVInt();
if (size > 0) {
source = new byte[size];
in.readFully(source);
if (in.readBoolean()) {
source = BytesHolder.readBytesHolder(in);
}
size = in.readVInt();
int size = in.readVInt();
if (size == 0) {
fields = ImmutableMap.of();
} else {
Expand All @@ -320,10 +326,10 @@ static final class Fields {
out.writeBoolean(exists);
if (exists) {
if (source == null) {
out.writeVInt(0);
out.writeBoolean(false);
} else {
out.writeVInt(source.length);
out.writeBytes(source);
out.writeBoolean(true);
source.writeTo(out);
}
if (fields == null) {
out.writeVInt(0);
Expand Down

0 comments on commit 72ee0aa

Please sign in to comment.