Skip to content

Commit

Permalink
implement caching for pageable task results
Browse files Browse the repository at this point in the history
  • Loading branch information
msbt committed Feb 18, 2015
1 parent b2a3f35 commit 057a5cf
Show file tree
Hide file tree
Showing 25 changed files with 705 additions and 439 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import com.google.common.base.Preconditions;

import java.util.Arrays;
import java.util.Locale;

/**
Expand Down Expand Up @@ -50,7 +51,6 @@ public abstract class AbstractMultiArrayBigArray<T, ArrayType> implements Iterab
@SafeVarargs
public AbstractMultiArrayBigArray(long offset, long size, ArrayType ... backingArrays) {
long arraysSize = arraysSize(backingArrays);
Preconditions.checkArgument(backingArrays.length > 0, "empty backing arrays");
Preconditions.checkArgument(offset <= arraysSize, "offset exceeds backing arrays");
this.backingArrays = backingArrays;
this.offset = offset;
Expand Down Expand Up @@ -79,6 +79,7 @@ protected long[] arraysIdx(ArrayType[] arrays, long index) {
int arrayIdx = 0;
long accumulatedSize = 0L;


long curArrayLen;
while (arrayIdx < arrays.length) {
curArrayLen = arrayLength(arrays[arrayIdx]);
Expand All @@ -100,7 +101,7 @@ protected long[] arraysIdx(long index) {

@Override
public T get(long index) {
if (index >= size || index < 0) {
if (index >= size || index < 0 || backingArrays.length == 0) {
throw new ArrayIndexOutOfBoundsException(
String.format(Locale.ENGLISH, "index %d exceeds bounds of backing arrays", index));
}
Expand All @@ -110,7 +111,7 @@ public T get(long index) {

@Override
public T set(long index, T value) {
if (index > size) {
if (index > size || backingArrays.length == 0) {
throw new ArrayIndexOutOfBoundsException(
String.format(Locale.ENGLISH, "index %d exceeds bounds of backing arrays", index));
}
Expand All @@ -122,4 +123,27 @@ public T set(long index, T value) {
public long size() {
return size;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

AbstractMultiArrayBigArray that = (AbstractMultiArrayBigArray) o;

if (offset != that.offset) return false;
if (size != that.size) return false;
if (!Arrays.deepEquals(backingArrays, that.backingArrays))
return false;

return true;
}

@Override
public int hashCode() {
int result = Arrays.deepHashCode(backingArrays);
result = 31 * result + (int) (offset ^ (offset >>> 32));
result = 31 * result + (int) (size ^ (size >>> 32));
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ private boolean switchArray() {
@Override
protected T computeNext() {
T value;
if (backingArraysIdx < 0) {
endOfData();
return null;
}
if (backingArraysIdx < arraysLen && curArrayIdx >= getArrayLength(backingArraysIdx)) {
if (!switchArray()) {
endOfData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,62 @@

package io.crate.core.bigarray;

import com.google.common.base.MoreObjects;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.ObjectArray;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

public class MultiObjectArrayBigArray<T> extends AbstractMultiArrayBigArray<T, ObjectArray<T>> {

public static class Builder<T> {

private Long offset = null;
private Long size = null;
private List<ObjectArray<T>> arrays = new LinkedList<>();

public Builder<T> offset(long offset) {
this.offset = offset;
return this;
}

public Builder<T> size(long size) {
this.size = size;
return this;
}

public Builder<T> add(ObjectArray<T> array) {
this.arrays.add(array);
return this;
}

public Builder<T> useArraysSize() {
offset = 0L;
size = 0L;
for (ObjectArray<?> array : arrays) {
size += array.size();
}
return this;
}

@SuppressWarnings("unchecked")
public MultiObjectArrayBigArray<T> build() {
ObjectArray<T>[] arraysArray = new ObjectArray[arrays.size()];
arraysArray = arrays.toArray(arraysArray);
if (size == null) {
useArraysSize();
}
return new MultiObjectArrayBigArray<>(
MoreObjects.firstNonNull(offset, 0L),
size,
arraysArray
);
}
}

@SafeVarargs
public MultiObjectArrayBigArray(long offset, long size, ObjectArray<T>... backingArrays) {
super(offset, size, backingArrays);
Expand Down Expand Up @@ -69,4 +117,8 @@ public Iterator<T> iterator() {
public void close() throws ElasticsearchException {
Releasables.close(backingArrays);
}

public static <T> Builder<T> builder() {
return new Builder<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -242,4 +242,22 @@ public void testIterator() throws Exception {
assertThat(i, is(20));
}

@Test
public void testEmptyArraysGet() throws Exception {
MultiNativeArrayBigArray<Object[]> bigArray = new MultiNativeArrayBigArray<Object[]>(0, 10);
assertThat(bigArray.size(), is(0L));
assertThat(bigArray.iterator().hasNext(), is(false));
assertThat(bigArray.ramBytesUsed(), is(0L));

expectedException.expect(ArrayIndexOutOfBoundsException.class);
bigArray.get(0L);
}

@Test
public void testEmptyArraysSet() throws Exception {
MultiNativeArrayBigArray<Object[]> bigArray = new MultiNativeArrayBigArray<Object[]>(0, 10);
expectedException.expect(ArrayIndexOutOfBoundsException.class);
bigArray.set(0L, new Object[]{1, 2, 3});
}

}
9 changes: 7 additions & 2 deletions sql/src/main/java/io/crate/executor/PageableTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,15 @@
public interface PageableTask<T extends Closeable> extends Task {

/**
* Start a paged execution.
* Start a paged execution without caching single pages.
*/
public void start(PageInfo pageInfo);

public void fetchNew(PageInfo pageInfo, FutureCallback<TaskResult> callback);
/**
* Start a paged execution with an implementation dependent caching strategy
*/
public void startCached(PageInfo pageInfo);

public void fetchNew(PageInfo pageInfo, T context, FutureCallback<TaskResult> callback);
public void fetchMore(PageInfo pageInfo, T context, FutureCallback<TaskResult> callback);
}
17 changes: 8 additions & 9 deletions sql/src/main/java/io/crate/executor/PageableTaskResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package io.crate.executor;


import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -35,6 +36,7 @@

public class PageableTaskResult<T extends Closeable> implements TaskResult {


private final PageableTask<T> task;
private final PageInfo pageInfo;
private final Page page;
Expand All @@ -49,14 +51,15 @@ public PageableTaskResult(PageableTask<T> task, PageInfo pageInfo, Page page, T

@Override
public Object[][] rows() {
return new Object[0][];
return Iterables.toArray(page, Object[].class);
}

@Override
public ListenableFuture<TaskResult> fetch(final PageInfo newPageInfo) {
if (context == null) {
throw new IllegalStateException("TaskResult already closed");
}

if (newPageInfo.position() == pageInfo.position()) {
if (newPageInfo.size() == pageInfo.size()) {
return Futures.immediateFuture((TaskResult) this);
Expand All @@ -66,6 +69,7 @@ public ListenableFuture<TaskResult> fetch(final PageInfo newPageInfo) {
throw new UnsupportedOperationException("fetching a smaller page than the current one is not supported");
}
}

if (newPageInfo.position() == (pageInfo.position() + pageInfo.size())) {
final SettableFuture<TaskResult> future = SettableFuture.create();
FutureCallback<TaskResult> forwardingFutureCallback = new ForwardingFutureCallback<>(future);
Expand All @@ -79,7 +83,7 @@ public ListenableFuture<TaskResult> fetch(final PageInfo newPageInfo) {

private ListenableFuture<TaskResult> pageWithOverlap(final PageInfo newPageInfo) {
final SettableFuture<TaskResult> future = SettableFuture.create();
PageInfo remainingPageInfo = new PageInfo(pageInfo.size(), newPageInfo.size() - pageInfo.size());
PageInfo remainingPageInfo = new PageInfo(pageInfo.position() + pageInfo.size(), newPageInfo.size() - pageInfo.size());

task.fetchMore(remainingPageInfo, context, new FutureCallback<TaskResult>() {
@Override
Expand All @@ -98,13 +102,8 @@ public void onFailure(@Nonnull Throwable t) {
private ListenableFuture<TaskResult> fetchNew(PageInfo newPageInfo) {
final SettableFuture<TaskResult> future = SettableFuture.create();
FutureCallback<TaskResult> forwardingFutureCallback = new ForwardingFutureCallback<>(future);
try {
context.close();
context = null;
} catch (IOException e) {
future.setException(e);
}
task.fetchNew(newPageInfo, forwardingFutureCallback);
task.fetchNew(newPageInfo, context, forwardingFutureCallback);
context = null;
return future;
}

Expand Down
1 change: 0 additions & 1 deletion sql/src/main/java/io/crate/executor/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.common.util.concurrent.ListenableFuture;

import java.util.List;
import java.util.UUID;

/**
* A task gets executed as part of or in the context of a
Expand Down
56 changes: 56 additions & 0 deletions sql/src/main/java/io/crate/executor/pageable/CachingPageCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.executor.pageable;

import io.crate.executor.Page;
import io.crate.executor.PageInfo;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CachingPageCache implements PageCache {

private final long maxRowSize;
private final Map<PageInfo, Page> cache;

private long currentSize;

public CachingPageCache(long maxRowSize) {
this.cache = new ConcurrentHashMap<>();
this.maxRowSize = maxRowSize;
this.currentSize = 0;
}

@Override
public Page get(PageInfo pageInfo) {
return cache.get(pageInfo);
}

@Override
public void put(PageInfo pageInfo, Page page) {
if (currentSize + pageInfo.size() <= maxRowSize) {
if (cache.put(pageInfo, page) == null) {
currentSize += pageInfo.size();
}
}
}
}
44 changes: 44 additions & 0 deletions sql/src/main/java/io/crate/executor/pageable/NoOpPageCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to CRATE Technology GmbH ("Crate") under one or more contributor
* license agreements. See the NOTICE file distributed with this work for
* additional information regarding copyright ownership. Crate licenses
* this file to you under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. You may
* obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*
* However, if you have executed another commercial license agreement
* with Crate these terms will supersede the license and you may use the
* software solely pursuant to the terms of the relevant commercial agreement.
*/

package io.crate.executor.pageable;

import io.crate.executor.Page;
import io.crate.executor.PageInfo;

import javax.annotation.Nullable;

public class NoOpPageCache implements PageCache {

public static final NoOpPageCache INSTANCE = new NoOpPageCache();


@Nullable
@Override
public Page get(PageInfo pageInfo) {
return null;
}

@Override
public void put(PageInfo pageInfo, Page page) {

}
}
Loading

0 comments on commit 057a5cf

Please sign in to comment.