Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolve #1915: add support for additional cursor streaming modes #1916

Merged
merged 1 commit into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ The Guava dependency version has been updated to 31.1. Projects may need to chec
* **Performance** Improvement 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Performance** Improvement 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Performance** Improvement 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Support additional streaming modes: LARGE, MEDIUM, SMALL [(Issue ##1915)](https://github.com/FoundationDB/fdb-record-layer/issues/#1915)
* **Feature** Feature 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Feature** Feature 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,12 @@ public enum CursorStreamingMode {
/** The client will process records one-at-a-time. */
ITERATOR,
/** The client will load all records immediately, such as with {@link RecordCursor#asList}. */
WANT_ALL
WANT_ALL,
/** Advanced. Transfer data in batches small enough to not be much more expensive than reading individual rows,
* to minimize cost if iteration stops early */
SMALL,
/** Advanced. Transfer data in batches sized in between small and large */
MEDIUM,
/** Advanced. Transfer data in batches large enough to be, in a high-concurrency environment, nearly as efficient as possible */
LARGE
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,15 @@ public KeyValueCursor build() throws RecordCoreException {

final int limit = scanProperties.getExecuteProperties().getReturnedRowLimit();
final StreamingMode streamingMode;
if (scanProperties.getCursorStreamingMode() == CursorStreamingMode.ITERATOR) {
CursorStreamingMode propertiesStreamingMode = scanProperties.getCursorStreamingMode();
if (propertiesStreamingMode == CursorStreamingMode.ITERATOR) {
streamingMode = StreamingMode.ITERATOR;
} else if (propertiesStreamingMode == CursorStreamingMode.LARGE) {
streamingMode = StreamingMode.LARGE;
} else if (propertiesStreamingMode == CursorStreamingMode.MEDIUM) {
streamingMode = StreamingMode.MEDIUM;
} else if (propertiesStreamingMode == CursorStreamingMode.SMALL) {
streamingMode = StreamingMode.SMALL;
} else if (limit == ReadTransaction.ROW_LIMIT_UNLIMITED) {
streamingMode = StreamingMode.WANT_ALL;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package com.apple.foundationdb.record.provider.foundationdb;

import com.apple.foundationdb.record.CursorStreamingMode;
import com.apple.foundationdb.record.TestRecords1Proto;
import com.apple.foundationdb.record.IndexFetchMethod;
import com.apple.foundationdb.record.query.plan.plans.RecordQueryPlan;
Expand All @@ -28,6 +29,7 @@
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;

import javax.annotation.Nonnull;

Expand Down Expand Up @@ -83,8 +85,8 @@ void indexPrefetchSplitRecordReverseTest(IndexFetchMethod useIndexPrefetch) thro
}

@ParameterizedTest(name = "indexPrefetchManySplitRecordTest(" + ARGUMENTS_WITH_NAMES_PLACEHOLDER + ")")
@EnumSource()
void indexPrefetchManySplitRecordTest(IndexFetchMethod useIndexPrefetch) throws Exception {
@MethodSource("fetchMethodAndStreamMode")
void indexPrefetchManySplitRecordTest(IndexFetchMethod useIndexPrefetch, CursorStreamingMode streamingMode) throws Exception {
// TODO: This test actually runs the API in a way that returns results that are too large: Over 50MB
// FDB will fix the issue to limit the bytes returned and then this test would need to adjust accordingly.
int numTransactions = 8;
Expand All @@ -96,7 +98,7 @@ void indexPrefetchManySplitRecordTest(IndexFetchMethod useIndexPrefetch) throws
}
RecordQueryPlan plan = plan(NUM_VALUES_LARGER_THAN_1000_REVERSE, useIndexPrefetch);

executeAndVerifyData(plan, numRecordsPerTransaction * numTransactions, (rec, i) -> {
executeAndVerifyData(plan, null, serializableWithStreamingMode(streamingMode), numRecordsPerTransaction * numTransactions, (rec, i) -> {
int primaryKey = 200 + i;
String strValue = ((primaryKey % 2) == 0) ? "even" : "odd";
int numValue = 2000 - i;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

package com.apple.foundationdb.record.provider.foundationdb;

import com.apple.foundationdb.record.CursorStreamingMode;
import com.apple.foundationdb.record.ExecuteProperties;
import com.apple.foundationdb.record.IndexEntry;
import com.apple.foundationdb.record.IndexScanType;
Expand All @@ -45,6 +46,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;

import javax.annotation.Nonnull;
import java.util.ArrayList;
Expand Down Expand Up @@ -124,10 +126,10 @@ void indexPrefetchSimpleIndexReverseTest(IndexFetchMethod useIndexPrefetch) thro
* @param useIndexPrefetch the fetch method mode to use
*/
@ParameterizedTest(name = "indexPrefetchPrimaryKeyIndexTest(" + ARGUMENTS_WITH_NAMES_PLACEHOLDER + ")")
@EnumSource()
void indexPrefetchPrimaryKeyIndexTest(IndexFetchMethod useIndexPrefetch) throws Exception {
@MethodSource("fetchMethodAndStreamMode")
void indexPrefetchPrimaryKeyIndexTest(IndexFetchMethod useIndexPrefetch, CursorStreamingMode streamingMode) throws Exception {
RecordQueryPlan plan = plan(PRIMARY_KEY_EQUAL, useIndexPrefetch);
executeAndVerifyData(plan, 1, (rec, i) -> {
executeAndVerifyData(plan, null, serializableWithStreamingMode(streamingMode), 1, (rec, i) -> {
int primaryKey = 1;
String strValue = ((primaryKey % 2) == 0) ? "even" : "odd";
int numValue = 1000 - primaryKey;
Expand All @@ -137,10 +139,11 @@ void indexPrefetchPrimaryKeyIndexTest(IndexFetchMethod useIndexPrefetch) throws
}

@ParameterizedTest(name = "indexPrefetchComplexIndexTest(" + ARGUMENTS_WITH_NAMES_PLACEHOLDER + ")")
@EnumSource()
void indexPrefetchComplexIndexTest(IndexFetchMethod useIndexPrefetch) throws Exception {
@MethodSource("fetchMethodAndStreamMode")
void indexPrefetchComplexIndexTest(IndexFetchMethod useIndexPrefetch, CursorStreamingMode streamingMode) throws Exception {
RecordQueryPlan plan = plan(STR_VALUE_EVEN, useIndexPrefetch);
executeAndVerifyData(plan, 50, (rec, i) -> {
// Pass in every supported streaming mode. The result should not change.
executeAndVerifyData(plan, null, serializableWithStreamingMode(streamingMode), 50, (rec, i) -> {
int primaryKey = i * 2;
int numValue = 1000 - primaryKey;
assertRecord(rec, primaryKey, "even", numValue, "MySimpleRecord$str_value_indexed", "even", primaryKey); // we are filtering out all odd entries, so count*2 are the keys of the even ones
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@

package com.apple.foundationdb.record.provider.foundationdb;

import com.apple.foundationdb.record.CursorStreamingMode;
import com.apple.foundationdb.record.ExecuteProperties;
import com.apple.foundationdb.record.ExecuteState;
import com.apple.foundationdb.record.IndexEntry;
import com.apple.foundationdb.record.IsolationLevel;
import com.apple.foundationdb.record.RecordCursor;
import com.apple.foundationdb.record.RecordCursorIterator;
import com.apple.foundationdb.record.ScanProperties;
Expand All @@ -37,12 +40,14 @@
import com.apple.test.Tags;
import com.google.protobuf.Message;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.provider.Arguments;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.stream.Stream;

import static com.apple.foundationdb.record.provider.foundationdb.FDBStoreTimer.Counts.REMOTE_FETCH;
import static com.apple.foundationdb.record.provider.foundationdb.FDBStoreTimer.Events.SCAN_REMOTE_FETCH_ENTRY;
Expand Down Expand Up @@ -94,6 +99,12 @@ public class RemoteFetchTestBase extends FDBRecordStoreQueryTestBase {
.setFilter(Query.field("rec_no").equalsValue(1L))
.build();

public static Stream<Arguments> fetchMethodAndStreamMode() {
return Stream.of(IndexFetchMethod.values())
.flatMap(indexFetchMethod -> Stream.of(CursorStreamingMode.ITERATOR, CursorStreamingMode.LARGE, CursorStreamingMode.MEDIUM, CursorStreamingMode.SMALL)
.map(streamMode -> Arguments.of(indexFetchMethod, streamMode)));
}

protected void assertRecord(final FDBQueriedRecord<Message> rec, final long primaryKey, final String strValue,
final int numValue, final String indexName, Object indexedValue) {
assertBaseRecord(rec, primaryKey, strValue, numValue, indexName, indexedValue);
Expand Down Expand Up @@ -241,4 +252,14 @@ protected List<FDBQueriedRecord<Message>> scanToList(FDBRecordContext context, S
}
return results;
}

@Nonnull
protected ExecuteProperties serializableWithStreamingMode(final CursorStreamingMode streamingMode) {
final ExecuteProperties execProperties = ExecuteProperties.newBuilder()
.setIsolationLevel(IsolationLevel.SERIALIZABLE)
.setState(ExecuteState.NO_LIMITS)
.setDefaultCursorStreamingMode(streamingMode)
.build();
return execProperties;
}
}