
Commit

define fetch position as int instead of long
seut committed Mar 30, 2015
1 parent 60fa4e0 commit fba1d3c
Showing 8 changed files with 47 additions and 40 deletions.
@@ -78,7 +78,7 @@ public void setNextReader(AtomicReaderContext context) throws IOException {
}
}

- private void fetch(long position, int doc) throws Exception {
+ private void fetch(int position, int doc) throws Exception {
if (ramAccountingContext != null && ramAccountingContext.trippedBreaker()) {
// stop fetching because breaker limit was reached
throw new UnexpectedFetchTerminatedException(
@@ -64,7 +64,7 @@ public class NodeFetchOperation {
private final ThreadPoolExecutor executor;
private final int poolSize;

- private long inputCursor = 0;
+ private int inputCursor = 0;

private static final ESLogger LOGGER = Loggers.getLogger(NodeFetchOperation.class);

@@ -213,10 +213,10 @@ private void doFetch(ShardCollectFuture result, LuceneDocFetcher fetcher,

static class ShardDocIdsBucket {

- private final LongArrayList positions = new LongArrayList();
+ private final IntArrayList positions = new IntArrayList();
private final IntArrayList docIds = new IntArrayList();

- public void add(long position, int docId) {
+ public void add(int position, int docId) {
positions.add(position);
docIds.add(docId);
}
@@ -229,7 +229,7 @@ public int size() {
return docIds.size();
}

- public long position(int idx) {
+ public int position(int idx) {
return positions.get(idx);
}
}
@@ -32,7 +32,7 @@
import java.util.concurrent.atomic.AtomicInteger;

/**
- * Merge multiple upstream buckets, whereas every row is ordered by a positional unique long.
+ * Merge multiple upstream buckets, whereas every row is ordered by a positional unique integer.
* Emits rows as soon as possible. Buckets from one upstream can be consumed in an undefined
* order. The main purpose of this implementation is merging ordered node responses.
*/
@@ -42,8 +42,8 @@ public class PositionalBucketMerger implements RowUpstream {
private final AtomicInteger upstreamsRemaining = new AtomicInteger(0);
private final int orderingColumnIndex;
private final UpstreamBucket[] remainingBuckets;
- private volatile long outputCursor = 0;
- private volatile long leastBucketCursor = -1;
+ private volatile int outputCursor = 0;
+ private volatile int leastBucketCursor = -1;
private volatile int leastBucketId = -1;
private final AtomicBoolean consumeBuckets = new AtomicBoolean(true);

@@ -87,7 +87,7 @@ public void mergeBucket(List<Row> newBucket, int upstreamId) {
int idx = 0;
while(bucketIt.hasNext()) {
Row row = bucketIt.next();
- int compare = Long.compare((Long) row.get(orderingColumnIndex), (Long) newFirstRow.get(orderingColumnIndex));
+ int compare = Integer.compare((int) row.get(orderingColumnIndex), (int) newFirstRow.get(orderingColumnIndex));
if (compare == 1) {
remainingBucket.addAll(idx, newBucket);
return;
@@ -126,7 +126,7 @@ private void findLeastBucketIt() {
}
try {
Row row = bucketIt.getFirst();
- Long orderingValue = (Long)row.get(orderingColumnIndex);
+ int orderingValue = (int)row.get(orderingColumnIndex);
if (orderingValue == outputCursor) {
leastBucketCursor = orderingValue;
leastBucketId = i;
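Reviewer note: the class comment above describes the contract these cursors implement — every row carries a unique positional integer, and rows are emitted as soon as the row for the next expected position has arrived from some upstream. Below is a minimal sketch of that idea with hypothetical names; it is not code from this repository and leaves out all of the upstream bookkeeping and failure handling of the real PositionalBucketMerger.

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Sketch only: buffer rows by their positional integer and emit them in order
// as soon as the next expected position becomes available.
class PositionalEmitSketch<T> {

    private final Map<Integer, T> buffered = new HashMap<>();
    private final Consumer<T> downstream;
    private int outputCursor = 0;  // next position to emit; an int after this commit

    PositionalEmitSketch(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    synchronized void offer(int position, T row) {
        buffered.put(position, row);
        T next;
        // drain the consecutive run starting at the cursor
        while ((next = buffered.remove(outputCursor)) != null) {
            downstream.accept(next);
            outputCursor++;
        }
    }

    public static void main(String[] args) {
        PositionalEmitSketch<String> sketch = new PositionalEmitSketch<>(System.out::println);
        sketch.offer(1, "one");   // buffered, position 0 not seen yet
        sketch.offer(0, "zero");  // prints "zero" then "one"
        sketch.offer(2, "two");   // prints "two"
    }
}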
@@ -24,10 +24,10 @@
import io.crate.core.collections.Row;

public class PositionalRowDelegate implements Row {
- private final long position;
- private Object[] cells;
+ private final int position;
+ private final Object[] cells;

- public PositionalRowDelegate(Row delegate, long position) {
+ public PositionalRowDelegate(Row delegate, int position) {
this.cells = delegate.materialize();
this.position = position;
}
@@ -46,8 +46,8 @@ public class PositionalRowMerger implements Projector, RowDownstreamHandle {
private final AtomicInteger upstreamsRemaining = new AtomicInteger(0);
private final List<UpstreamBuffer> upstreamBuffers = new ArrayList<>();
private final int orderingColumnIndex;
- private volatile long outputCursor = 0;
- private volatile long leastUpstreamBufferCursor = -1;
+ private volatile int outputCursor = 0;
+ private volatile int leastUpstreamBufferCursor = -1;
private volatile int leastUpstreamBufferId = -1;
private final AtomicBoolean consumeRows = new AtomicBoolean(true);

@@ -86,7 +86,7 @@ private void findLeastUpstreamBufferId() {
}
try {
Row row = upstreamBuffer.first();
- Long orderingValue = (Long)row.get(orderingColumnIndex);
+ int orderingValue = (int)row.get(orderingColumnIndex);
if (orderingValue == outputCursor) {
leastUpstreamBufferCursor = orderingValue;
leastUpstreamBufferId = i;
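A note on the (Long) → (int) casts above and in PositionalBucketMerger: in modern Java (7+), casting an Object to int means casting it to Integer and unboxing, so it throws a ClassCastException if the cell still holds a Long. That is presumably why the test fixtures further down switch from literals like 4L to 4 — producers must now put Integer positions into the ordering column. A tiny stand-alone illustration of that behavior (hypothetical demo, not code from this repository):

// Casting Object -> int succeeds only when the object is actually an Integer.
public class UnboxCastDemo {
    public static void main(String[] args) {
        Object ok = 4;       // autoboxed to Integer
        Object bad = 4L;     // autoboxed to Long

        int a = (int) ok;    // fine: unboxes the Integer
        System.out.println(a);

        try {
            int b = (int) bad;   // throws: Long cannot be cast to Integer
            System.out.println(b);
        } catch (ClassCastException e) {
            System.out.println("Long cell rejected: " + e.getMessage());
        }
    }
}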
@@ -21,6 +21,7 @@

package io.crate.operation.projectors;

+ import com.carrotsearch.hppc.IntArrayList;
import com.carrotsearch.hppc.IntObjectOpenHashMap;
import com.carrotsearch.hppc.LongArrayList;
import io.crate.core.collections.Row;
@@ -78,7 +79,7 @@ public class FetchProjector implements Projector, RowDownstreamHandle {
private final Map<String, Object[]> partitionValuesCache = new HashMap<>();
private final Object partitionValuesCacheLock = new Object();

- private long inputCursor = 0;
+ private int inputCursor = 0;
private boolean consumedRows = false;

private static final ESLogger LOGGER = Loggers.getLogger(FetchProjector.class);
@@ -319,7 +320,7 @@ private static class NodeBucket {
private final String nodeId;
private final String index;
private final List<Row> inputRows = new ArrayList<>();
- private final LongArrayList cursors = new LongArrayList();
+ private final IntArrayList cursors = new IntArrayList();
private final LongArrayList docIds = new LongArrayList();

public NodeBucket(String nodeId, String index, int nodeIdx) {
@@ -328,7 +329,7 @@ public NodeBucket(String nodeId, String index, int nodeIdx) {
this.nodeIdx = nodeIdx;
}

- public void add(long cursor, Long docId, Row row) {
+ public void add(int cursor, Long docId, Row row) {
cursors.add(cursor);
docIds.add(docId);
inputRows.add(new RowN(row.materialize()));
@@ -342,7 +343,7 @@ public LongArrayList docIds() {
return docIds;
}

- public long cursor(int index) {
+ public int cursor(int index) {
return cursors.get(index);
}

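For context, NodeBucket above keeps two parallel hppc primitive lists per row: after this change the fetch cursor (the row's position in the input stream) is an int stored in an IntArrayList, while the Lucene doc id stays a long in a LongArrayList — presumably because the cursor only has to index rows within one request, whereas doc ids keep their existing representation. A minimal, hypothetical sketch of that pairing, outside the projector machinery:

import com.carrotsearch.hppc.IntArrayList;
import com.carrotsearch.hppc.LongArrayList;

// Sketch of the parallel-list pattern: int row cursors alongside long doc ids.
class CursorDocIdBucketSketch {

    private final IntArrayList cursors = new IntArrayList();
    private final LongArrayList docIds = new LongArrayList();

    void add(int cursor, long docId) {
        cursors.add(cursor);
        docIds.add(docId);
    }

    int cursor(int idx) {
        return cursors.get(idx);
    }

    long docId(int idx) {
        return docIds.get(idx);
    }

    int size() {
        return cursors.size();
    }
}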
@@ -62,13 +62,13 @@ public void testConcurrentSetNextBucket() throws Exception {

final List<List<List<Object[]>>> bucketsPerUpstream = new ArrayList<>(numUpstreams);
List<List<Object[]>> upstream1 = new ArrayList<>(2);
- upstream1.add(ImmutableList.of(new Object[]{4L}, new Object[]{6L}, new Object[]{7L}));
- upstream1.add(ImmutableList.of(new Object[]{0L}, new Object[]{1L}, new Object[]{3L}));
+ upstream1.add(ImmutableList.of(new Object[]{4}, new Object[]{6}, new Object[]{7}));
+ upstream1.add(ImmutableList.of(new Object[]{0}, new Object[]{1}, new Object[]{3}));
bucketsPerUpstream.add(upstream1);

List<List<Object[]>> upstream2 = new ArrayList<>(2);
- upstream2.add(ImmutableList.of(new Object[]{2L}, new Object[]{5L}));
- upstream2.add(ImmutableList.of(new Object[]{8L}, new Object[]{9L}));
+ upstream2.add(ImmutableList.of(new Object[]{2}, new Object[]{5}));
+ upstream2.add(ImmutableList.of(new Object[]{8}, new Object[]{9}));
bucketsPerUpstream.add(upstream2);

final List<Throwable> setNextRowExceptions = new ArrayList<>();
@@ -84,7 +84,7 @@ public void testConcurrentSetNextBucket() throws Exception {
public void run() {
List<Row> rows1 = new ArrayList<>();
for (Object[] row : bucket) {
- rows1.add(new PositionalRowDelegate(new RowN(row), (long) row[0]));
+ rows1.add(new PositionalRowDelegate(new RowN(row), (int) row[0]));
}
try {
bucketMerger.setNextBucket(rows1, upstreamId);
@@ -120,7 +120,7 @@ public void onFailure(Throwable t) {
assertThat(result.size(), is(10));
Iterator<Row> it = result.iterator();
for (int i = 0; i < 10; i++) {
- assertThat((long) it.next().get(0), is((long) i));
+ assertThat((int) it.next().get(0), is(i));
}

executorService.awaitTermination(1, TimeUnit.SECONDS);
@@ -135,13 +135,13 @@ public void testOneUpstreamWillFail() throws Exception {

final List<List<List<Object[]>>> bucketsPerUpstream = new ArrayList<>(numUpstreams);
List<List<Object[]>> upstream1 = new ArrayList<>(2);
- upstream1.add(ImmutableList.of(new Object[]{4L}, new Object[]{6L}, new Object[]{7L}));
- upstream1.add(ImmutableList.of(new Object[]{0L}, new Object[]{1L}, new Object[]{3L}));
+ upstream1.add(ImmutableList.of(new Object[]{4}, new Object[]{6}, new Object[]{7}));
+ upstream1.add(ImmutableList.of(new Object[]{0}, new Object[]{1}, new Object[]{3}));
bucketsPerUpstream.add(upstream1);

List<List<Object[]>> upstream2 = new ArrayList<>(2);
- upstream2.add(ImmutableList.of(new Object[]{2L}, new Object[]{5L}));
- upstream2.add(ImmutableList.of(new Object[]{8L}, new Object[]{9L}));
+ upstream2.add(ImmutableList.of(new Object[]{2}, new Object[]{5}));
+ upstream2.add(ImmutableList.of(new Object[]{8}, new Object[]{9}));
bucketsPerUpstream.add(upstream2);

final List<Throwable> setNextRowExceptions = new ArrayList<>();
@@ -170,7 +170,7 @@ public void testOneUpstreamWillFail() throws Exception {
public void run() {
List<Row> rows1 = new ArrayList<>();
for (Object[] row : bucket) {
- rows1.add(new PositionalRowDelegate(new RowN(row), (long) row[0]));
+ rows1.add(new PositionalRowDelegate(new RowN(row), (int) row[0]));
}
try {
bucketMerger.setNextBucket(rows1, upstreamId);
@@ -54,23 +54,29 @@ public void testConcurrentSetNextRow() throws Exception {
final PositionalRowMerger rowMerger = new PositionalRowMerger(resultProvider, 1);

final List<List<Object[]>> rowsPerUpstream = new ArrayList<>(numUpstreams);
- rowsPerUpstream.add(ImmutableList.of(new Object[]{0L}, new Object[]{2L}, new Object[]{6L}));
- rowsPerUpstream.add(ImmutableList.of(new Object[]{1L}, new Object[]{4L}, new Object[]{7L}));
- rowsPerUpstream.add(ImmutableList.of(new Object[]{3L}, new Object[]{5L}, new Object[]{8L}, new Object[]{9L}));
+ rowsPerUpstream.add(ImmutableList.of(new Object[]{0}, new Object[]{2}, new Object[]{6}));
+ rowsPerUpstream.add(ImmutableList.of(new Object[]{1}, new Object[]{4}, new Object[]{7}));
+ rowsPerUpstream.add(ImmutableList.of(new Object[]{3}, new Object[]{5}, new Object[]{8}, new Object[]{9}));

final List<Throwable> setNextRowExceptions = new ArrayList<>();
final CountDownLatch latch = new CountDownLatch(numUpstreams);
final ExecutorService executorService = Executors.newScheduledThreadPool(numUpstreams);

+ final List<RowDownstreamHandle> downstreamHandles = new ArrayList<>(numUpstreams);
+ // register upstreams
+ for (int i = 0; i < numUpstreams; i++) {
+     downstreamHandles.add(rowMerger.registerUpstream(null));
+ }
for (int i = 0; i < numUpstreams; i++) {
final int upstreamId = i;
- final RowDownstreamHandle upstreamBuffer = rowMerger.registerUpstream(null);
+ final RowDownstreamHandle upstreamBuffer = downstreamHandles.get(i);
executorService.execute(new Runnable() {
@Override
public void run() {
List<Object[]> rows = rowsPerUpstream.get(upstreamId);
for (Object[] row : rows) {
try {
- upstreamBuffer.setNextRow(new PositionalRowDelegate(new RowN(row), (long)row[0]));
+ upstreamBuffer.setNextRow(new PositionalRowDelegate(new RowN(row), (int)row[0]));
} catch (Exception e) {
setNextRowExceptions.add(e);
}
@@ -102,7 +108,7 @@ public void onFailure(Throwable t) {
assertThat(result.size(), is(10));
Iterator<Row> it = result.iterator();
for (int i = 0; i < 10; i++) {
- assertThat((long) it.next().get(0), is((long) i));
+ assertThat((int) it.next().get(0), is(i));
}

executorService.awaitTermination(1, TimeUnit.SECONDS);
@@ -116,8 +122,8 @@ public void testOneUpstreamFail() throws Exception {
final PositionalRowMerger rowMerger = new PositionalRowMerger(resultProvider, 1);

final List<List<Object[]>> rowsPerUpstream = new ArrayList<>(numUpstreams);
- rowsPerUpstream.add(ImmutableList.of(new Object[]{0L}, new Object[]{2L}));
- rowsPerUpstream.add(ImmutableList.of(new Object[]{1L}));
+ rowsPerUpstream.add(ImmutableList.of(new Object[]{0}, new Object[]{2}));
+ rowsPerUpstream.add(ImmutableList.of(new Object[]{1}));
rowsPerUpstream.add(ImmutableList.<Object[]>of());

final List<Throwable> setNextRowExceptions = new ArrayList<>();
Expand All @@ -142,7 +148,7 @@ public void run() {
List<Object[]> rows = rowsPerUpstream.get(upstreamId);
for (Object[] row : rows) {
try {
- upstreamBuffer.setNextRow(new PositionalRowDelegate(new RowN(row), (long) row[0]));
+ upstreamBuffer.setNextRow(new PositionalRowDelegate(new RowN(row), (int) row[0]));
} catch (Exception e) {
setNextRowExceptions.add(e);
}
