Skip to content

Commit

Permalink
Remove poor-mans compression in InternalSearchHit and friends (#20472)
Browse files Browse the repository at this point in the history
We still use some crazy poor mans compression in InternalSearchHit that
uses a thread local and an unordered map as a lookup table if requested.
Stuff like this should be handled by compression on the transport layer
rather than in-line in the serialization code. This code is complex enough.
  • Loading branch information
s1monw committed Sep 14, 2016
1 parent 6c7da90 commit 4f65372
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 155 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,13 @@

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.transport.TransportResponse;

import java.io.IOException;

import static org.elasticsearch.search.internal.InternalSearchHits.StreamContext;

/**
*
*/
Expand Down Expand Up @@ -70,9 +69,17 @@ public void shardTarget(SearchShardTarget shardTarget) {
}

public void hits(InternalSearchHits hits) {
assert assertNoSearchTarget(hits);
this.hits = hits;
}

private boolean assertNoSearchTarget(InternalSearchHits hits) {
for (SearchHit hit : hits.hits()) {
assert hit.getShard() == null : "expected null but got: " + hit.getShard();
}
return true;
}

public InternalSearchHits hits() {
return hits;
}
Expand All @@ -96,13 +103,13 @@ public static FetchSearchResult readFetchSearchResult(StreamInput in) throws IOE
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readLong();
hits = InternalSearchHits.readSearchHits(in, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM));
hits = InternalSearchHits.readSearchHits(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
hits.writeTo(out, InternalSearchHits.streamContext().streamShardTarget(StreamContext.ShardTargetType.NO_STREAM));
hits.writeTo(out);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;
import org.elasticsearch.search.internal.InternalSearchHits.StreamContext.ShardTargetType;
import org.elasticsearch.search.lookup.SourceLookup;

import java.io.IOException;
Expand Down Expand Up @@ -554,18 +553,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

public static InternalSearchHit readSearchHit(StreamInput in, InternalSearchHits.StreamContext context) throws IOException {
public static InternalSearchHit readSearchHit(StreamInput in) throws IOException {
InternalSearchHit hit = new InternalSearchHit();
hit.readFrom(in, context);
hit.readFrom(in);
return hit;
}

@Override
public void readFrom(StreamInput in) throws IOException {
readFrom(in, InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM));
}

public void readFrom(StreamInput in, InternalSearchHits.StreamContext context) throws IOException {
score = in.readFloat();
id = in.readOptionalText();
type = in.readOptionalText();
Expand Down Expand Up @@ -644,37 +639,20 @@ public void readFrom(StreamInput in, InternalSearchHits.StreamContext context) t
matchedQueries[i] = in.readString();
}
}

if (context.streamShardTarget() == ShardTargetType.STREAM) {
if (in.readBoolean()) {
shard = new SearchShardTarget(in);
}
} else if (context.streamShardTarget() == ShardTargetType.LOOKUP) {
int lookupId = in.readVInt();
if (lookupId > 0) {
shard = context.handleShardLookup().get(lookupId);
}
}

shard = in.readOptionalWriteable(SearchShardTarget::new);
size = in.readVInt();
if (size > 0) {
innerHits = new HashMap<>(size);
for (int i = 0; i < size; i++) {
String key = in.readString();
ShardTargetType shardTarget = context.streamShardTarget();
InternalSearchHits value = InternalSearchHits.readSearchHits(in, context.streamShardTarget(ShardTargetType.NO_STREAM));
context.streamShardTarget(shardTarget);
InternalSearchHits value = InternalSearchHits.readSearchHits(in);
innerHits.put(key, value);
}
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
writeTo(out, InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM));
}

public void writeTo(StreamOutput out, InternalSearchHits.StreamContext context) throws IOException {
out.writeFloat(score);
out.writeOptionalText(id);
out.writeOptionalText(type);
Expand Down Expand Up @@ -752,31 +730,14 @@ public void writeTo(StreamOutput out, InternalSearchHits.StreamContext context)
out.writeString(matchedFilter);
}
}

if (context.streamShardTarget() == ShardTargetType.STREAM) {
if (shard == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
shard.writeTo(out);
}
} else if (context.streamShardTarget() == ShardTargetType.LOOKUP) {
if (shard == null) {
out.writeVInt(0);
} else {
out.writeVInt(context.shardHandleLookup().get(shard));
}
}

out.writeOptionalWriteable(shard);
if (innerHits == null) {
out.writeVInt(0);
} else {
out.writeVInt(innerHits.size());
for (Map.Entry<String, InternalSearchHits> entry : innerHits.entrySet()) {
out.writeString(entry.getKey());
ShardTargetType shardTarget = context.streamShardTarget();
entry.getValue().writeTo(out, context.streamShardTarget(ShardTargetType.NO_STREAM));
context.streamShardTarget(shardTarget);
entry.getValue().writeTo(out);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,54 +40,6 @@
*/
public class InternalSearchHits implements SearchHits {

public static class StreamContext {

public static enum ShardTargetType {
STREAM,
LOOKUP,
NO_STREAM
}

private IdentityHashMap<SearchShardTarget, Integer> shardHandleLookup = new IdentityHashMap<>();
private IntObjectHashMap<SearchShardTarget> handleShardLookup = new IntObjectHashMap<>();
private ShardTargetType streamShardTarget = ShardTargetType.STREAM;

public StreamContext reset() {
shardHandleLookup.clear();
handleShardLookup.clear();
streamShardTarget = ShardTargetType.STREAM;
return this;
}

public IdentityHashMap<SearchShardTarget, Integer> shardHandleLookup() {
return shardHandleLookup;
}

public IntObjectHashMap<SearchShardTarget> handleShardLookup() {
return handleShardLookup;
}

public ShardTargetType streamShardTarget() {
return streamShardTarget;
}

public StreamContext streamShardTarget(ShardTargetType streamShardTarget) {
this.streamShardTarget = streamShardTarget;
return this;
}
}

private static final ThreadLocal<StreamContext> cache = new ThreadLocal<StreamContext>() {
@Override
protected StreamContext initialValue() {
return new StreamContext();
}
};

public static StreamContext streamContext() {
return cache.get().reset();
}

public static InternalSearchHits empty() {
// We shouldn't use static final instance, since that could directly be returned by native transport clients
return new InternalSearchHits(EMPTY, 0, 0);
Expand Down Expand Up @@ -186,11 +138,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

public static InternalSearchHits readSearchHits(StreamInput in, StreamContext context) throws IOException {
InternalSearchHits hits = new InternalSearchHits();
hits.readFrom(in, context);
return hits;
}

public static InternalSearchHits readSearchHits(StreamInput in) throws IOException {
InternalSearchHits hits = new InternalSearchHits();
Expand All @@ -200,63 +147,27 @@ public static InternalSearchHits readSearchHits(StreamInput in) throws IOExcepti

@Override
public void readFrom(StreamInput in) throws IOException {
readFrom(in, streamContext().streamShardTarget(StreamContext.ShardTargetType.LOOKUP));
}

public void readFrom(StreamInput in, StreamContext context) throws IOException {
totalHits = in.readVLong();
maxScore = in.readFloat();
int size = in.readVInt();
if (size == 0) {
hits = EMPTY;
} else {
if (context.streamShardTarget() == StreamContext.ShardTargetType.LOOKUP) {
// read the lookup table first
int lookupSize = in.readVInt();
for (int i = 0; i < lookupSize; i++) {
context.handleShardLookup().put(in.readVInt(), new SearchShardTarget(in));
}
}

hits = new InternalSearchHit[size];
for (int i = 0; i < hits.length; i++) {
hits[i] = readSearchHit(in, context);
hits[i] = readSearchHit(in);
}
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
writeTo(out, streamContext().streamShardTarget(StreamContext.ShardTargetType.LOOKUP));
}

public void writeTo(StreamOutput out, StreamContext context) throws IOException {
out.writeVLong(totalHits);
out.writeFloat(maxScore);
out.writeVInt(hits.length);
if (hits.length > 0) {
if (context.streamShardTarget() == StreamContext.ShardTargetType.LOOKUP) {
// start from 1, 0 is for null!
int counter = 1;
for (InternalSearchHit hit : hits) {
if (hit.shard() != null) {
Integer handle = context.shardHandleLookup().get(hit.shard());
if (handle == null) {
context.shardHandleLookup().put(hit.shard(), counter++);
}
}
}
out.writeVInt(context.shardHandleLookup().size());
if (!context.shardHandleLookup().isEmpty()) {
for (Map.Entry<SearchShardTarget, Integer> entry : context.shardHandleLookup().entrySet()) {
out.writeVInt(entry.getValue());
entry.getKey().writeTo(out);
}
}
}

for (InternalSearchHit hit : hits) {
hit.writeTo(out, context);
hit.writeTo(out);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchHits.StreamContext.ShardTargetType;
import org.elasticsearch.search.suggest.Suggest;

import java.io.IOException;
Expand Down Expand Up @@ -261,8 +260,7 @@ public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
this.doc = Lucene.readScoreDoc(in);
if (in.readBoolean()) {
this.hit = InternalSearchHit.readSearchHit(in,
InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM));
this.hit = InternalSearchHit.readSearchHit(in);
}
int contextSize = in.readInt();
this.contexts = new LinkedHashMap<>(contextSize);
Expand All @@ -283,7 +281,7 @@ public void writeTo(StreamOutput out) throws IOException {
Lucene.writeScoreDoc(out, doc);
if (hit != null) {
out.writeBoolean(true);
hit.writeTo(out, InternalSearchHits.streamContext().streamShardTarget(ShardTargetType.STREAM));
hit.writeTo(out);
} else {
out.writeBoolean(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class InternalSearchHitTests extends ESTestCase {
Expand Down Expand Up @@ -63,19 +64,15 @@ public void testSerializeShardTarget() throws Exception {

InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[]{hit1, hit2}, 2, 1f);

InternalSearchHits.StreamContext context = new InternalSearchHits.StreamContext();
context.streamShardTarget(InternalSearchHits.StreamContext.ShardTargetType.STREAM);
BytesStreamOutput output = new BytesStreamOutput();
hits.writeTo(output, context);
hits.writeTo(output);
InputStream input = output.bytes().streamInput();
context = new InternalSearchHits.StreamContext();
context.streamShardTarget(InternalSearchHits.StreamContext.ShardTargetType.STREAM);
InternalSearchHits results = InternalSearchHits.readSearchHits(new InputStreamStreamInput(input), context);
InternalSearchHits results = InternalSearchHits.readSearchHits(new InputStreamStreamInput(input));
assertThat(results.getAt(0).shard(), equalTo(target));
assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).shard(), nullValue());
assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).getInnerHits().get("1").getAt(0).shard(), nullValue());
assertThat(results.getAt(0).getInnerHits().get("1").getAt(1).shard(), nullValue());
assertThat(results.getAt(0).getInnerHits().get("2").getAt(0).shard(), nullValue());
assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).shard(), notNullValue());
assertThat(results.getAt(0).getInnerHits().get("1").getAt(0).getInnerHits().get("1").getAt(0).shard(), notNullValue());
assertThat(results.getAt(0).getInnerHits().get("1").getAt(1).shard(), notNullValue());
assertThat(results.getAt(0).getInnerHits().get("2").getAt(0).shard(), notNullValue());
assertThat(results.getAt(1).shard(), equalTo(target));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ public void testIssue8226() {
}
}

@LuceneTestCase.BadApple(bugUrl = "simon is working on this")
public void testIssue6614() throws ExecutionException, InterruptedException {
List<IndexRequestBuilder> builders = new ArrayList<>();
boolean strictTimeBasedIndices = randomBoolean();
Expand Down

0 comments on commit 4f65372

Please sign in to comment.