Skip to content

Commit

Permalink
use cache for partition values at fetch projector
Browse files Browse the repository at this point in the history
  • Loading branch information
seut committed Mar 23, 2015
1 parent 54056ec commit ffeeb7d
Showing 1 changed file with 13 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ public class FetchProjector implements Projector, RowDownstreamHandle {
private final List<String> executionNodes;
private final int numNodes;
private final AtomicInteger remainingRequests = new AtomicInteger(0);
private final Map<String, Object[]> partitionValuesCache = new HashMap<>();
private final Object partitionValuesCacheLock = new Object();

private long inputCursor = 0;
private boolean consumedRows = false;
Expand Down Expand Up @@ -220,10 +222,17 @@ public void fail(Throwable throwable) {
@Nullable
private Row partitionedByRow(String index) {
if (!partitionedBy.isEmpty() && PartitionName.isPartition(index)) {
List<BytesRef> partitionRawValues = PartitionName.fromStringSafe(index).values();
Object[] partitionValues = new Object[partitionRawValues.size()];
for (int i = 0; i < partitionRawValues.size(); i++) {
partitionValues[i] = partitionedBy.get(i).type().value(partitionRawValues.get(i));
Object[] partitionValues;
synchronized (partitionValuesCacheLock) {
partitionValues = partitionValuesCache.get(index);
if (partitionValues == null) {
List<BytesRef> partitionRawValues = PartitionName.fromStringSafe(index).values();
partitionValues = new Object[partitionRawValues.size()];
for (int i = 0; i < partitionRawValues.size(); i++) {
partitionValues[i] = partitionedBy.get(i).type().value(partitionRawValues.get(i));
}
partitionValuesCache.put(index, partitionValues);
}
}
return new RowN(partitionValues);
}
Expand Down

0 comments on commit ffeeb7d

Please sign in to comment.