diff --git a/docs/ReleaseNotes.md b/docs/ReleaseNotes.md index db5284b5e7..75682894c2 100644 --- a/docs/ReleaseNotes.md +++ b/docs/ReleaseNotes.md @@ -25,7 +25,7 @@ The Guava dependency version has been updated to 31.1. Projects may need to chec * **Performance** Improvement 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) * **Performance** Improvement 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) * **Performance** Improvement 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) -* **Feature** Feature 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) +* **Feature** Support additional streaming modes: LARGE, MEDIUM, SMALL [(Issue ##1915)](https://github.com/FoundationDB/fdb-record-layer/issues/#1915) * **Feature** Feature 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) * **Feature** Feature 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) * **Feature** Feature 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN) diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/CursorStreamingMode.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/CursorStreamingMode.java index e94b502a66..2df8d610e8 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/CursorStreamingMode.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/CursorStreamingMode.java @@ -32,5 +32,12 @@ public enum CursorStreamingMode { /** The client will process records one-at-a-time. */ ITERATOR, /** The client will load all records immediately, such as with {@link RecordCursor#asList}. */ - WANT_ALL + WANT_ALL, + /** Advanced. Transfer data in batches small enough to not be much more expensive than reading individual rows, + * to minimize cost if iteration stops early */ + SMALL, + /** Advanced. Transfer data in batches sized in between small and large */ + MEDIUM, + /** Advanced. Transfer data in batches large enough to be, in a high-concurrency environment, nearly as efficient as possible */ + LARGE } diff --git a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursor.java b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursor.java index 9e89036bf3..2f8a0cf3bc 100644 --- a/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursor.java +++ b/fdb-record-layer-core/src/main/java/com/apple/foundationdb/record/provider/foundationdb/KeyValueCursor.java @@ -252,8 +252,15 @@ public KeyValueCursor build() throws RecordCoreException { final int limit = scanProperties.getExecuteProperties().getReturnedRowLimit(); final StreamingMode streamingMode; - if (scanProperties.getCursorStreamingMode() == CursorStreamingMode.ITERATOR) { + CursorStreamingMode propertiesStreamingMode = scanProperties.getCursorStreamingMode(); + if (propertiesStreamingMode == CursorStreamingMode.ITERATOR) { streamingMode = StreamingMode.ITERATOR; + } else if (propertiesStreamingMode == CursorStreamingMode.LARGE) { + streamingMode = StreamingMode.LARGE; + } else if (propertiesStreamingMode == CursorStreamingMode.MEDIUM) { + streamingMode = StreamingMode.MEDIUM; + } else if (propertiesStreamingMode == CursorStreamingMode.SMALL) { + streamingMode = StreamingMode.SMALL; } else if (limit == ReadTransaction.ROW_LIMIT_UNLIMITED) { streamingMode = StreamingMode.WANT_ALL; } else { diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/RemoteFetchSplitRecordsTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/RemoteFetchSplitRecordsTest.java index c6c189c125..b51074ddf7 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/RemoteFetchSplitRecordsTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/RemoteFetchSplitRecordsTest.java @@ -20,6 +20,7 @@ package com.apple.foundationdb.record.provider.foundationdb; +import com.apple.foundationdb.record.CursorStreamingMode; import com.apple.foundationdb.record.TestRecords1Proto; import com.apple.foundationdb.record.IndexFetchMethod; import com.apple.foundationdb.record.query.plan.plans.RecordQueryPlan; @@ -28,6 +29,7 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; import javax.annotation.Nonnull; @@ -83,8 +85,8 @@ void indexPrefetchSplitRecordReverseTest(IndexFetchMethod useIndexPrefetch) thro } @ParameterizedTest(name = "indexPrefetchManySplitRecordTest(" + ARGUMENTS_WITH_NAMES_PLACEHOLDER + ")") - @EnumSource() - void indexPrefetchManySplitRecordTest(IndexFetchMethod useIndexPrefetch) throws Exception { + @MethodSource("fetchMethodAndStreamMode") + void indexPrefetchManySplitRecordTest(IndexFetchMethod useIndexPrefetch, CursorStreamingMode streamingMode) throws Exception { // TODO: This test actually runs the API in a way that returns results that are too large: Over 50MB // FDB will fix the issue to limit the bytes returned and then this test would need to adjust accordingly. int numTransactions = 8; @@ -96,7 +98,7 @@ void indexPrefetchManySplitRecordTest(IndexFetchMethod useIndexPrefetch) throws } RecordQueryPlan plan = plan(NUM_VALUES_LARGER_THAN_1000_REVERSE, useIndexPrefetch); - executeAndVerifyData(plan, numRecordsPerTransaction * numTransactions, (rec, i) -> { + executeAndVerifyData(plan, null, serializableWithStreamingMode(streamingMode), numRecordsPerTransaction * numTransactions, (rec, i) -> { int primaryKey = 200 + i; String strValue = ((primaryKey % 2) == 0) ? "even" : "odd"; int numValue = 2000 - i; diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/RemoteFetchTest.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/RemoteFetchTest.java index 90d8b06204..3d20f24c6d 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/RemoteFetchTest.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/RemoteFetchTest.java @@ -20,6 +20,7 @@ package com.apple.foundationdb.record.provider.foundationdb; +import com.apple.foundationdb.record.CursorStreamingMode; import com.apple.foundationdb.record.ExecuteProperties; import com.apple.foundationdb.record.IndexEntry; import com.apple.foundationdb.record.IndexScanType; @@ -45,6 +46,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; +import org.junit.jupiter.params.provider.MethodSource; import javax.annotation.Nonnull; import java.util.ArrayList; @@ -124,10 +126,10 @@ void indexPrefetchSimpleIndexReverseTest(IndexFetchMethod useIndexPrefetch) thro * @param useIndexPrefetch the fetch method mode to use */ @ParameterizedTest(name = "indexPrefetchPrimaryKeyIndexTest(" + ARGUMENTS_WITH_NAMES_PLACEHOLDER + ")") - @EnumSource() - void indexPrefetchPrimaryKeyIndexTest(IndexFetchMethod useIndexPrefetch) throws Exception { + @MethodSource("fetchMethodAndStreamMode") + void indexPrefetchPrimaryKeyIndexTest(IndexFetchMethod useIndexPrefetch, CursorStreamingMode streamingMode) throws Exception { RecordQueryPlan plan = plan(PRIMARY_KEY_EQUAL, useIndexPrefetch); - executeAndVerifyData(plan, 1, (rec, i) -> { + executeAndVerifyData(plan, null, serializableWithStreamingMode(streamingMode), 1, (rec, i) -> { int primaryKey = 1; String strValue = ((primaryKey % 2) == 0) ? "even" : "odd"; int numValue = 1000 - primaryKey; @@ -137,10 +139,11 @@ void indexPrefetchPrimaryKeyIndexTest(IndexFetchMethod useIndexPrefetch) throws } @ParameterizedTest(name = "indexPrefetchComplexIndexTest(" + ARGUMENTS_WITH_NAMES_PLACEHOLDER + ")") - @EnumSource() - void indexPrefetchComplexIndexTest(IndexFetchMethod useIndexPrefetch) throws Exception { + @MethodSource("fetchMethodAndStreamMode") + void indexPrefetchComplexIndexTest(IndexFetchMethod useIndexPrefetch, CursorStreamingMode streamingMode) throws Exception { RecordQueryPlan plan = plan(STR_VALUE_EVEN, useIndexPrefetch); - executeAndVerifyData(plan, 50, (rec, i) -> { + // Pass in every supported streaming mode. The result should not change. + executeAndVerifyData(plan, null, serializableWithStreamingMode(streamingMode), 50, (rec, i) -> { int primaryKey = i * 2; int numValue = 1000 - primaryKey; assertRecord(rec, primaryKey, "even", numValue, "MySimpleRecord$str_value_indexed", "even", primaryKey); // we are filtering out all odd entries, so count*2 are the keys of the even ones diff --git a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/RemoteFetchTestBase.java b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/RemoteFetchTestBase.java index 864bf6e4aa..3670a154e1 100644 --- a/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/RemoteFetchTestBase.java +++ b/fdb-record-layer-core/src/test/java/com/apple/foundationdb/record/provider/foundationdb/RemoteFetchTestBase.java @@ -20,8 +20,11 @@ package com.apple.foundationdb.record.provider.foundationdb; +import com.apple.foundationdb.record.CursorStreamingMode; import com.apple.foundationdb.record.ExecuteProperties; +import com.apple.foundationdb.record.ExecuteState; import com.apple.foundationdb.record.IndexEntry; +import com.apple.foundationdb.record.IsolationLevel; import com.apple.foundationdb.record.RecordCursor; import com.apple.foundationdb.record.RecordCursorIterator; import com.apple.foundationdb.record.ScanProperties; @@ -37,12 +40,14 @@ import com.apple.test.Tags; import com.google.protobuf.Message; import org.junit.jupiter.api.Tag; +import org.junit.jupiter.params.provider.Arguments; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.List; import java.util.Objects; import java.util.function.BiConsumer; +import java.util.stream.Stream; import static com.apple.foundationdb.record.provider.foundationdb.FDBStoreTimer.Counts.REMOTE_FETCH; import static com.apple.foundationdb.record.provider.foundationdb.FDBStoreTimer.Events.SCAN_REMOTE_FETCH_ENTRY; @@ -94,6 +99,12 @@ public class RemoteFetchTestBase extends FDBRecordStoreQueryTestBase { .setFilter(Query.field("rec_no").equalsValue(1L)) .build(); + public static Stream fetchMethodAndStreamMode() { + return Stream.of(IndexFetchMethod.values()) + .flatMap(indexFetchMethod -> Stream.of(CursorStreamingMode.ITERATOR, CursorStreamingMode.LARGE, CursorStreamingMode.MEDIUM, CursorStreamingMode.SMALL) + .map(streamMode -> Arguments.of(indexFetchMethod, streamMode))); + } + protected void assertRecord(final FDBQueriedRecord rec, final long primaryKey, final String strValue, final int numValue, final String indexName, Object indexedValue) { assertBaseRecord(rec, primaryKey, strValue, numValue, indexName, indexedValue); @@ -241,4 +252,14 @@ protected List> scanToList(FDBRecordContext context, S } return results; } + + @Nonnull + protected ExecuteProperties serializableWithStreamingMode(final CursorStreamingMode streamingMode) { + final ExecuteProperties execProperties = ExecuteProperties.newBuilder() + .setIsolationLevel(IsolationLevel.SERIALIZABLE) + .setState(ExecuteState.NO_LIMITS) + .setDefaultCursorStreamingMode(streamingMode) + .build(); + return execProperties; + } }