Skip to content

Commit

Permalink
PHOENIX-5120 Avoid using MappedByteBuffers for server side sorting.
Browse files Browse the repository at this point in the history
  • Loading branch information
lhofhansl committed Feb 5, 2019
1 parent 3cb1d32 commit 4f396c6
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 130 deletions.
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end;

import java.util.Map;

import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.BeforeClass;

import com.google.common.collect.Maps;

public class OrderByWithSpillingIT extends OrderByIT {
@BeforeClass
public static void doSetup() throws Exception {
Map<String, String> props = Maps.newHashMapWithExpectedSize(1);
// do lot's of spooling!
props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1));
setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator()));
}
}
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
import static org.apache.phoenix.util.NumberUtil.add; import static org.apache.phoenix.util.NumberUtil.add;
import static org.apache.phoenix.util.NumberUtil.getMin; import static org.apache.phoenix.util.NumberUtil.getMin;


import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.sql.ParameterMetaData; import java.sql.ParameterMetaData;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Collections; import java.util.Collections;
Expand Down Expand Up @@ -50,7 +51,7 @@
import org.apache.phoenix.execute.visitor.QueryPlanVisitor; import org.apache.phoenix.execute.visitor.QueryPlanVisitor;
import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.iterate.DefaultParallelScanGrouper; import org.apache.phoenix.iterate.DefaultParallelScanGrouper;
import org.apache.phoenix.iterate.MappedByteBufferQueue; import org.apache.phoenix.iterate.BufferedQueue;
import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ParallelScanGrouper;
import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.jdbc.PhoenixParameterMetaData; import org.apache.phoenix.jdbc.PhoenixParameterMetaData;
Expand Down Expand Up @@ -293,7 +294,7 @@ private class BasicJoinIterator implements ResultIterator {
private ValueBitSet lhsBitSet; private ValueBitSet lhsBitSet;
private ValueBitSet rhsBitSet; private ValueBitSet rhsBitSet;
private byte[] emptyProjectedValue; private byte[] emptyProjectedValue;
private MappedByteBufferTupleQueue queue; private BufferedTupleQueue queue;
private Iterator<Tuple> queueIterator; private Iterator<Tuple> queueIterator;


public BasicJoinIterator(ResultIterator lhsIterator, ResultIterator rhsIterator) { public BasicJoinIterator(ResultIterator lhsIterator, ResultIterator rhsIterator) {
Expand All @@ -315,7 +316,7 @@ public BasicJoinIterator(ResultIterator lhsIterator, ResultIterator rhsIterator)
int len = lhsBitSet.getEstimatedLength(); int len = lhsBitSet.getEstimatedLength();
this.emptyProjectedValue = new byte[len]; this.emptyProjectedValue = new byte[len];
lhsBitSet.toBytes(emptyProjectedValue, 0); lhsBitSet.toBytes(emptyProjectedValue, 0);
this.queue = new MappedByteBufferTupleQueue(thresholdBytes); this.queue = new BufferedTupleQueue(thresholdBytes);
this.queueIterator = null; this.queueIterator = null;
} }


Expand Down Expand Up @@ -609,24 +610,24 @@ public int compareTo(JoinKey other) {
} }
} }


private static class MappedByteBufferTupleQueue extends MappedByteBufferQueue<Tuple> { private static class BufferedTupleQueue extends BufferedQueue<Tuple> {


public MappedByteBufferTupleQueue(int thresholdBytes) { public BufferedTupleQueue(int thresholdBytes) {
super(thresholdBytes); super(thresholdBytes);
} }


@Override @Override
protected MappedByteBufferSegmentQueue<Tuple> createSegmentQueue( protected BufferedSegmentQueue<Tuple> createSegmentQueue(
int index, int thresholdBytes) { int index, int thresholdBytes) {
return new MappedByteBufferTupleSegmentQueue(index, thresholdBytes, false); return new BufferedTupleSegmentQueue(index, thresholdBytes, false);
} }


@Override @Override
protected Comparator<MappedByteBufferSegmentQueue<Tuple>> getSegmentQueueComparator() { protected Comparator<BufferedSegmentQueue<Tuple>> getSegmentQueueComparator() {
return new Comparator<MappedByteBufferSegmentQueue<Tuple>>() { return new Comparator<BufferedSegmentQueue<Tuple>>() {
@Override @Override
public int compare(MappedByteBufferSegmentQueue<Tuple> q1, public int compare(BufferedSegmentQueue<Tuple> q1,
MappedByteBufferSegmentQueue<Tuple> q2) { BufferedSegmentQueue<Tuple> q2) {
return q1.index() - q2.index(); return q1.index() - q2.index();
} }
}; };
Expand All @@ -635,7 +636,7 @@ public int compare(MappedByteBufferSegmentQueue<Tuple> q1,
@Override @Override
public Iterator<Tuple> iterator() { public Iterator<Tuple> iterator() {
return new Iterator<Tuple>() { return new Iterator<Tuple>() {
private Iterator<MappedByteBufferSegmentQueue<Tuple>> queueIter; private Iterator<BufferedSegmentQueue<Tuple>> queueIter;
private Iterator<Tuple> currentIter; private Iterator<Tuple> currentIter;
{ {
this.queueIter = getSegmentQueues().iterator(); this.queueIter = getSegmentQueues().iterator();
Expand Down Expand Up @@ -668,10 +669,10 @@ public void remove() {
}; };
} }


private static class MappedByteBufferTupleSegmentQueue extends MappedByteBufferSegmentQueue<Tuple> { private static class BufferedTupleSegmentQueue extends BufferedSegmentQueue<Tuple> {
private LinkedList<Tuple> results; private LinkedList<Tuple> results;


public MappedByteBufferTupleSegmentQueue(int index, public BufferedTupleSegmentQueue(int index,
int thresholdBytes, boolean hasMaxQueueSize) { int thresholdBytes, boolean hasMaxQueueSize) {
super(index, thresholdBytes, hasMaxQueueSize); super(index, thresholdBytes, hasMaxQueueSize);
this.results = Lists.newLinkedList(); this.results = Lists.newLinkedList();
Expand All @@ -688,23 +689,22 @@ protected int sizeOf(Tuple e) {
return Bytes.SIZEOF_INT * 2 + kv.getLength(); return Bytes.SIZEOF_INT * 2 + kv.getLength();
} }


@SuppressWarnings("deprecation")
@Override @Override
protected void writeToBuffer(MappedByteBuffer buffer, Tuple e) { protected void writeToStream(DataOutputStream out, Tuple e) throws IOException {
KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(e.getValue(0)); KeyValue kv = PhoenixKeyValueUtil.maybeCopyCell(e.getValue(0));
buffer.putInt(kv.getLength() + Bytes.SIZEOF_INT); out.writeInt(kv.getLength() + Bytes.SIZEOF_INT);
buffer.putInt(kv.getLength()); out.writeInt(kv.getLength());
buffer.put(kv.getBuffer(), kv.getOffset(), kv.getLength()); out.write(kv.getBuffer(), kv.getOffset(), kv.getLength());
} }


@Override @Override
protected Tuple readFromBuffer(MappedByteBuffer buffer) { protected Tuple readFromStream(DataInputStream in) throws IOException {
int length = buffer.getInt(); int length = in.readInt();
if (length < 0) if (length < 0)
return null; return null;


byte[] b = new byte[length]; byte[] b = new byte[length];
buffer.get(b); in.read(b);
Result result = ResultUtil.toResult(new ImmutableBytesWritable(b)); Result result = ResultUtil.toResult(new ImmutableBytesWritable(b));
return new ResultTuple(result); return new ResultTuple(result);
} }
Expand Down
Loading

0 comments on commit 4f396c6

Please sign in to comment.