Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove poor-mans compression in InternalSearchHit and friends #20472

Merged
merged 2 commits into from
Sep 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.transport.TransportResponse;

import java.io.IOException;

import static org.elasticsearch.search.internal.InternalSearchHits.StreamContext;

/**
*
*/
Expand Down Expand Up @@ -70,9 +69,17 @@ public void shardTarget(SearchShardTarget shardTarget) {
}

public void hits(InternalSearchHits hits) {
assert assertNoSearchTarget(hits);
this.hits = hits;
}

private boolean assertNoSearchTarget(InternalSearchHits hits) {
for (SearchHit hit : hits.hits()) {
assert hit.getShard() == null : "expected null but got: " + hit.getShard();
}
return true;
}

public InternalSearchHits hits() {
return hits;
}
Expand All @@ -96,13 +103,13 @@ public static FetchSearchResult readFetchSearchResult(StreamInput in) throws IOE
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readLong();
hits = InternalSearchHits.readSearchHits(in, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM));
hits = InternalSearchHits.readSearchHits(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
hits.writeTo(out, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM));
hits.writeTo(out);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.internal.InternalSearchHits.StreamContext.ShardTargetType;
import org.elasticsearch.search.lookup.SourceLookup;

import java.io.IOException;
Expand Down Expand Up @@ -554,18 +553,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

public static InternalSearchHit readSearchHit(StreamInput in, InternalSearchHits.StreamContext context) throws IOException {
public static InternalSearchHit readSearchHit(StreamInput in) throws IOException {
InternalSearchHit hit = new InternalSearchHit();
hit.readFrom(in, context);
hit.readFrom(in);
return hit;
}

@Override
public void readFrom(StreamInput in) throws IOException {
readFrom(in, InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM));
}

public void readFrom(StreamInput in, InternalSearchHits.StreamContext context) throws IOException {
score = in.readFloat();
id = in.readOptionalText();
type = in.readOptionalText();
Expand Down Expand Up @@ -644,37 +639,20 @@ public void readFrom(StreamInput in, InternalSearchHits.StreamContext context) t
matchedQueries[i] = in.readString();
}
}

if (context.streamShardTarget() == ShardTargetType.STREAM) {
if (in.readBoolean()) {
shard = new SearchShardTarget(in);
}
} else if (context.streamShardTarget() == ShardTargetType.LOOKUP) {
int lookupId = in.readVInt();
if (lookupId > 0) {
shard = context.handleShardLookup().get(lookupId);
}
}

shard = in.readOptionalWriteable(SearchShardTarget::new);
size = in.readVInt();
if (size > 0) {
innerHits = new HashMap<>(size);
for (int i = 0; i < size; i++) {
String key = in.readString();
ShardTargetType shardTarget = context.streamShardTarget();
InternalSearchHits value = InternalSearchHits.readSearchHits(in, context.streamShardTarget(ShardTargetType.NO_STREAM));
context.streamShardTarget(shardTarget);
InternalSearchHits value = InternalSearchHits.readSearchHits(in);
innerHits.put(key, value);
}
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
writeTo(out, InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM));
}

public void writeTo(StreamOutput out, InternalSearchHits.StreamContext context) throws IOException {
out.writeFloat(score);
out.writeOptionalText(id);
out.writeOptionalText(type);
Expand Down Expand Up @@ -752,31 +730,14 @@ public void writeTo(StreamOutput out, InternalSearchHits.StreamContext context)
out.writeString(matchedFilter);
}
}

if (context.streamShardTarget() == ShardTargetType.STREAM) {
if (shard == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
shard.writeTo(out);
}
} else if (context.streamShardTarget() == ShardTargetType.LOOKUP) {
if (shard == null) {
out.writeVInt(0);
} else {
out.writeVInt(context.shardHandleLookup().get(shard));
}
}

out.writeOptionalWriteable(shard);
if (innerHits == null) {
out.writeVInt(0);
} else {
out.writeVInt(innerHits.size());
for (Map.Entry<String, InternalSearchHits> entry : innerHits.entrySet()) {
out.writeString(entry.getKey());
ShardTargetType shardTarget = context.streamShardTarget();
entry.getValue().writeTo(out, context.streamShardTarget(ShardTargetType.NO_STREAM));
context.streamShardTarget(shardTarget);
entry.getValue().writeTo(out);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,54 +40,6 @@
*/
public class InternalSearchHits implements SearchHits {

public static class StreamContext {

public static enum ShardTargetType {
STREAM,
LOOKUP,
NO_STREAM
}

private IdentityHashMap<SearchShardTarget, Integer> shardHandleLookup = new IdentityHashMap<>();
private IntObjectHashMap<SearchShardTarget> handleShardLookup = new IntObjectHashMap<>();
private ShardTargetType streamShardTarget = ShardTargetType.STREAM;

public StreamContext reset() {
shardHandleLookup.clear();
handleShardLookup.clear();
streamShardTarget = ShardTargetType.STREAM;
return this;
}

public IdentityHashMap<SearchShardTarget, Integer> shardHandleLookup() {
return shardHandleLookup;
}

public IntObjectHashMap<SearchShardTarget> handleShardLookup() {
return handleShardLookup;
}

public ShardTargetType streamShardTarget() {
return streamShardTarget;
}

public StreamContext streamShardTarget(ShardTargetType streamShardTarget) {
this.streamShardTarget = streamShardTarget;
return this;
}
}

private static final ThreadLocal<StreamContext> cache = new ThreadLocal<StreamContext>() {
@Override
protected StreamContext initialValue() {
return new StreamContext();
}
};

public static StreamContext streamContext() {
return cache.get().reset();
}

public static InternalSearchHits empty() {
// We shouldn't use static final instance, since that could directly be returned by native transport clients
return new InternalSearchHits(EMPTY, 0, 0);
Expand Down Expand Up @@ -186,11 +138,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

public static InternalSearchHits readSearchHits(StreamInput in, StreamContext context) throws IOException {
InternalSearchHits hits = new InternalSearchHits();
hits.readFrom(in, context);
return hits;
}

public static InternalSearchHits readSearchHits(StreamInput in) throws IOException {
InternalSearchHits hits = new InternalSearchHits();
Expand All @@ -200,63 +147,27 @@ public static InternalSearchHits readSearchHits(StreamInput in) throws IOExcepti

@Override
public void readFrom(StreamInput in) throws IOException {
readFrom(in, streamContext().streamShardTarget(StreamContext.ShardTargetType.LOOKUP));
}

public void readFrom(StreamInput in, StreamContext context) throws IOException {
totalHits = in.readVLong();
maxScore = in.readFloat();
int size = in.readVInt();
if (size == 0) {
hits = EMPTY;
} else {
if (context.streamShardTarget() == StreamContext.ShardTargetType.LOOKUP) {
// read the lookup table first
int lookupSize = in.readVInt();
for (int i = 0; i < lookupSize; i++) {
context.handleShardLookup().put(in.readVInt(), new SearchShardTarget(in));
}
}

hits = new InternalSearchHit[size];
for (int i = 0; i < hits.length; i++) {
hits[i] = readSearchHit(in, context);
hits[i] = readSearchHit(in);
}
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
writeTo(out, streamContext().streamShardTarget(StreamContext.ShardTargetType.LOOKUP));
}

public void writeTo(StreamOutput out, StreamContext context) throws IOException {
out.writeVLong(totalHits);
out.writeFloat(maxScore);
out.writeVInt(hits.length);
if (hits.length > 0) {
if (context.streamShardTarget() == StreamContext.ShardTargetType.LOOKUP) {
// start from 1, 0 is for null!
int counter = 1;
for (InternalSearchHit hit : hits) {
if (hit.shard() != null) {
Integer handle = context.shardHandleLookup().get(hit.shard());
if (handle == null) {
context.shardHandleLookup().put(hit.shard(), counter++);
}
}
}
out.writeVInt(context.shardHandleLookup().size());
if (!context.shardHandleLookup().isEmpty()) {
for (Map.Entry<SearchShardTarget, Integer> entry : context.shardHandleLookup().entrySet()) {
out.writeVInt(entry.getValue());
entry.getKey().writeTo(out);
}
}
}

for (InternalSearchHit hit : hits) {
hit.writeTo(out, context);
hit.writeTo(out);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchHits.StreamContext.ShardTargetType;
import org.elasticsearch.search.suggest.Suggest;

import java.io.IOException;
Expand Down Expand Up @@ -261,8 +260,7 @@ public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.doc = Lucene.readScoreDoc(in);
if (in.readBoolean()) {
this.hit = InternalSearchHit.readSearchHit(in,
InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM));
this.hit = InternalSearchHit.readSearchHit(in);
}
int contextSize = in.readInt();
this.contexts = new LinkedHashMap<>(contextSize);
Expand All @@ -283,7 +281,7 @@ public void writeTo(StreamOutput out) throws IOException {
Lucene.writeScoreDoc(out, doc);
if (hit != null) {
out.writeBoolean(true);
hit.writeTo(out, InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM));
hit.writeTo(out);
} else {
out.writeBoolean(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class InternalSearchHitTests extends ESTestCase {
Expand Down Expand Up @@ -63,19 +64,15 @@ public void testSerializeShardTarget() throws Exception {

InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[]{hit1, hit2}, 2, 1f);

InternalSearchHits.StreamContext context = new InternalSearchHits.StreamContext();
context.streamShardTarget(InternalSearchHits.StreamContext.ShardTargetType.STREAM);
BytesStreamOutput output = new BytesStreamOutput();
hits.writeTo(output, context);
hits.writeTo(output);
InputStream input = output.bytes().streamInput();
context = new InternalSearchHits.StreamContext();
context.streamShardTarget(InternalSearchHits.StreamContext.ShardTargetType.STREAM);
InternalSearchHits results = InternalSearchHits.readSearchHits(new InputStreamStreamInput(input), context);
InternalSearchHits results = InternalSearchHits.readSearchHits(new InputStreamStreamInput(input));
assertThat(results.getAt(0).shard(), equalTo(target));
assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).shard(), nullValue());
assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).getInnerHits().get("1").getAt(0).shard(), nullValue());
assertThat(results.getAt(0).getInnerHits().get("1").getAt(1).shard(), nullValue());
assertThat(results.getAt(0).getInnerHits().get("2").getAt(0).shard(), nullValue());
assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).shard(), notNullValue());
assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).getInnerHits().get("1").getAt(0).shard(), notNullValue());
assertThat(results.getAt(0).getInnerHits().get("1").getAt(1).shard(), notNullValue());
assertThat(results.getAt(0).getInnerHits().get("2").getAt(0).shard(), notNullValue());
assertThat(results.getAt(1).shard(), equalTo(target));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ public void testIssue8226() {
}
}

@LuceneTestCase.BadApple(bugUrl = "simon is working on this")
public void testIssue6614() throws ExecutionException, InterruptedException {
List<IndexRequestBuilder> builders = new ArrayList<>();
boolean strictTimeBasedIndices = randomBoolean();
Expand Down