Skip to content

Commit

Permalink
implement transport action for fetch phase only
Browse files Browse the repository at this point in the history
  • Loading branch information
seut committed Mar 5, 2015
1 parent aa902d9 commit f9f22a8
Show file tree
Hide file tree
Showing 19 changed files with 1,085 additions and 36 deletions.
133 changes: 133 additions & 0 deletions sql/src/main/java/io/crate/executor/transport/NodeFetchRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.executor.transport;

import com.carrotsearch.hppc.IntArrayList;
import com.carrotsearch.hppc.cursors.IntCursor;
import io.crate.Streamer;
import io.crate.planner.symbol.Symbol;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportRequest;

import java.io.IOException;
import java.util.*;

public class NodeFetchRequest extends TransportRequest {

private UUID jobId;
private Map<Integer, IntArrayList> jobSearchContextDocIds = new HashMap<>();
private List<Symbol> toFetchSymbols;
private boolean closeContext = true;

public NodeFetchRequest() {
}

public void jobId(UUID jobId) {
this.jobId = jobId;
}

public UUID jobId() {
return jobId;
}

public void addDocId(int jobSearchContextId, int docId) {
IntArrayList docIds = jobSearchContextDocIds.get(jobSearchContextId);
if (docIds == null) {
docIds = new IntArrayList();
jobSearchContextDocIds.put(jobSearchContextId, docIds);
}
docIds.add(docId);
}

public Map<Integer, IntArrayList> jobSearchContextDocIds() {
return jobSearchContextDocIds;
}

public void toFetchSymbols(List<Symbol> toFetchSymbols) {
this.toFetchSymbols = toFetchSymbols;
}

public List<Symbol> toFetchSymbols() {
return toFetchSymbols;
}

public void closeContext(boolean closeContext) {
this.closeContext = closeContext;
}

public boolean closeContext() {
return closeContext;
}

public List<Streamer<?>> outputStreamer() {
List<Streamer<?>> streamers = new ArrayList<>(toFetchSymbols.size());
for (Symbol symbol : toFetchSymbols) {
streamers.add(symbol.valueType().streamer());
}
return streamers;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
jobId = new UUID(in.readLong(), in.readLong());
int mapSize = in.readVInt();
jobSearchContextDocIds = new HashMap<>(mapSize);
for (int i = 0; i < mapSize; i++) {
Integer jobSearchContextId = in.readVInt();
int docIdsSize = in.readVInt();
IntArrayList docIds = new IntArrayList(docIdsSize);
for (int j = 0; j < docIdsSize; j++) {
docIds.add(in.readVInt());
}
jobSearchContextDocIds.put(jobSearchContextId, docIds);
}
int symbolsSize = in.readVInt();
toFetchSymbols = new ArrayList<>(symbolsSize);
for (int i = 0; i < symbolsSize; i++) {
toFetchSymbols.add(Symbol.fromStream(in));
}
closeContext = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(jobId.getMostSignificantBits());
out.writeLong(jobId.getLeastSignificantBits());
out.writeVInt(jobSearchContextDocIds.size());
for (Map.Entry<Integer, IntArrayList> entry : jobSearchContextDocIds.entrySet()) {
out.writeVInt(entry.getKey());
out.writeVInt(entry.getValue().size());
for (IntCursor cursor : entry.getValue()) {
out.writeVInt(cursor.value);
}
}
out.writeVInt(toFetchSymbols.size());
for (Symbol symbol : toFetchSymbols) {
Symbol.toStream(symbol, out);
}
out.writeBoolean(closeContext);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.executor.transport;

import io.crate.Streamer;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.transport.TransportResponse;

import java.io.IOException;

public class NodeFetchResponse extends TransportResponse {

private Object[][] rows;
private final Streamer<?>[] streamers;


public NodeFetchResponse(Streamer<?>[] streamers) {
this.streamers = streamers;
}

public void rows(Object[][] rows) {
this.rows = rows;
}

public Object[][] rows() {
return rows;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
rows = new Object[in.readVInt()][];
for (int r = 0; r < rows.length; r++) {
rows[r] = new Object[streamers.length];
for (int c = 0; c < rows[r].length; c++) {
rows[r][c] = streamers[c].readValueFrom(in);
}
}
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(rows.length);
for (Object[] row : rows) {
for (int c = 0; c < streamers.length; c++) {
streamers[c].writeValueTo(out, row[c]);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ protected void configure() {
bind(TransportQueryShardAction.class).asEagerSingleton();
bind(SymbolBasedTransportShardUpsertAction.class).asEagerSingleton();
bind(TransportShardUpsertAction.class).asEagerSingleton();
bind(TransportFetchNodeAction.class).asEagerSingleton();

bind(CrateResultSorter.class).asEagerSingleton();

Expand Down
Loading

0 comments on commit f9f22a8

Please sign in to comment.