Skip to content

Commit

Permalink
Introduce support for parallel scans into tempest2 (#190)
Browse files Browse the repository at this point in the history
* Introduce support for parallel scans into tempest2

* addressing comments

* internal method calls of Scan to use named args
  • Loading branch information
abmargb committed Sep 4, 2024
1 parent 9c4094e commit 8430cf3
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 43 deletions.
16 changes: 2 additions & 14 deletions docs/guide/query_scan.md
Original file line number Diff line number Diff line change
Expand Up @@ -514,19 +514,7 @@ worker can be a thread (in programming languages that support multithreading) or
system process. To perform a parallel scan, each worker issues its own Scan request with an
unique `WorkerId`.

=== "Kotlin - SDK 2.x"

```kotlin
Not supported
```

=== "Java - SDK 2.x"

```java
Not supported
```

=== "Kotlin - SDK 1.x"
=== "Kotlin"

```kotlin
private val table: MusicTable
Expand All @@ -545,7 +533,7 @@ unique `WorkerId`.
}
```

=== "Java - SDK 1.x"
=== "Java"

```java
private final MusicTable table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,28 @@
import app.cash.tempest2.Offset;
import app.cash.tempest2.Page;
import app.cash.tempest2.QueryConfig;
import app.cash.tempest2.ScanConfig;
import app.cash.tempest2.WorkerId;
import app.cash.tempest2.musiclibrary.java.AlbumTrack;
import app.cash.tempest2.musiclibrary.java.MusicTable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import software.amazon.awssdk.enhanced.dynamodb.Expression;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

public class QueryNScan {

private final MusicTable table;
private final ExecutorService executor;

public QueryNScan(MusicTable table) {
public QueryNScan(MusicTable table, ExecutorService executor) {
this.table = table;
this.executor = executor;
}

// Query - Key Condition - Partition Key and Entity Type.
Expand Down Expand Up @@ -164,5 +171,25 @@ public List<AlbumTrack> loadAllAlbumTracks() {
}

// Scan - Parallel.
// Not supported.
public List<AlbumTrack> loadAllAlbumTracks2() {
Future<List<AlbumTrack>> segment1 = executor.submit(() -> loadSegment(1));
Future<List<AlbumTrack>> segment2 = executor.submit(() -> loadSegment(2));
List<AlbumTrack> results = new ArrayList<>();
try {
results.addAll(segment1.get());
results.addAll(segment2.get());
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException("Failed to load tracks", e);
}
return results;
}

private List<AlbumTrack> loadSegment(int segment) {
Page<AlbumTrack.Key, AlbumTrack> page = table.albumTracks().scan(
new ScanConfig.Builder()
.workerId(new WorkerId(segment, /* totalSegments */ 2))
.build()
);
return page.getContents();
}
}
100 changes: 76 additions & 24 deletions tempest2/src/main/kotlin/app/cash/tempest2/Scan.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,41 @@ interface Scannable<K : Any, I : Any> {
/**
* Scans up to the [pageSize] items or a maximum of 1 MB of data. This limit applies before the
* filter expression is evaluated.
*
* @param workerId identifies a tuple of `segment` and `totalSegments` in the context of parallel
* scans.
*/
fun scan(
pageSize: Int = 100,
consistentRead: Boolean = false,
filterExpression: Expression? = null,
initialOffset: Offset<K>? = null
initialOffset: Offset<K>? = null,
workerId: WorkerId? = null,
): Page<K, I>

// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).

fun scan() = scan(
ScanConfig.Builder().build(),
config = ScanConfig.Builder().build(),
initialOffset = null
)

fun scan(initialOffset: Offset<K>?) = scan(
ScanConfig.Builder().build(),
config = ScanConfig.Builder().build(),
initialOffset = initialOffset
)

fun scan(config: ScanConfig) = scan(
config,
config = config,
initialOffset = null
)

fun scan(config: ScanConfig, initialOffset: Offset<K>?) = scan(
config.pageSize,
config.consistentRead,
config.filterExpression,
initialOffset
pageSize = config.pageSize,
consistentRead = config.consistentRead,
filterExpression = config.filterExpression,
initialOffset = initialOffset,
workerId = config.workerId
)

/**
Expand All @@ -68,26 +73,40 @@ interface Scannable<K : Any, I : Any> {
// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).

fun scanAll() = scanAll(
ScanConfig.Builder().build(),
config = ScanConfig.Builder().build(),
initialOffset = null
)

fun scanAll(initialOffset: Offset<K>?) = scanAll(
ScanConfig.Builder().build(),
config = ScanConfig.Builder().build(),
initialOffset = initialOffset
)

/**
* Executes a scan and returns a sequence of pages that contains all results, regardless of page size.
* New pages will be fetched as needed when the resulting sequence is enumerated.
*
* This method doesn't support parallel scans. `workerId`, if provided as part of `config`, will
* be ignored.
*/
fun scanAll(config: ScanConfig) = scanAll(
config,
config = config,
initialOffset = null
)

/**
* Executes a scan and returns a sequence of pages that contains all results, regardless of page size.
* New pages will be fetched as needed when the resulting sequence is enumerated.
*
* This method doesn't support parallel scans. `workerId`, if provided as part of `config`, will
* be ignored.
*/
fun scanAll(config: ScanConfig, initialOffset: Offset<K>?): Sequence<Page<K, I>> {
return scanAll(
config.pageSize,
config.consistentRead,
config.filterExpression,
initialOffset
pageSize = config.pageSize,
consistentRead = config.consistentRead,
filterExpression = config.filterExpression,
initialOffset = initialOffset
)
}

Expand All @@ -105,39 +124,69 @@ interface Scannable<K : Any, I : Any> {
// Overloaded functions for Java callers (Kotlin interfaces do not support `@JvmOverloads`).

fun scanAllContents() = scanAllContents(
ScanConfig.Builder().build(),
config = ScanConfig.Builder().build(),
initialOffset = null
)

fun scanAllContents(initialOffset: Offset<K>?) = scanAllContents(
ScanConfig.Builder().build(),
config = ScanConfig.Builder().build(),
initialOffset = initialOffset
)

/**
* Executes a scan and returns a sequence that contains all results, regardless of page size.
* New pages will be fetched as needed when the resulting sequence is enumerated.
*
* This method doesn't support parallel scans. `workerId`, if provided as part of `config`, will
* be ignored.
*/
fun scanAllContents(config: ScanConfig) = scanAllContents(
config,
config = config,
initialOffset = null
)

/**
* Executes a scan and returns a sequence that contains all results, regardless of page size.
* New pages will be fetched as needed when the resulting sequence is enumerated.
*
* This method doesn't support parallel scans. `workerId`, if provided as part of `config`, will
* be ignored.
*/
fun scanAllContents(config: ScanConfig, initialOffset: Offset<K>?): Sequence<I> {
return scanAllContents(
config.pageSize,
config.consistentRead,
config.filterExpression,
initialOffset
pageSize = config.pageSize,
consistentRead = config.consistentRead,
filterExpression = config.filterExpression,
initialOffset = initialOffset
)
}
}

/**
* In the context of parallel scans, a worker is analogous to a thread or an operating
* system process. Each worker then issues its own Scan request with a unique [WorkerId], which
* represents a tuple of `segment` and `totalSegments`.
*/
data class WorkerId(
val segment: Int,
val totalSegments: Int
) {
init {
require(segment < totalSegments) { "Expect $segment to be less than $totalSegments" }
}
}

data class ScanConfig internal constructor(
val pageSize: Int,
val consistentRead: Boolean,
val filterExpression: Expression?
val filterExpression: Expression?,
val workerId: WorkerId?
) {
class Builder {
private var pageSize = 100
private var consistentRead = false
private var filterExpression: Expression? = null
private var workerId: WorkerId? = null

fun pageSize(pageSize: Int) = apply { this.pageSize = pageSize }

Expand All @@ -146,10 +195,13 @@ data class ScanConfig internal constructor(
fun filterExpression(filterExpression: Expression) =
apply { this.filterExpression = filterExpression }

fun workerId(workerId: WorkerId) = apply { this.workerId = workerId }

fun build() = ScanConfig(
pageSize,
consistentRead,
filterExpression
filterExpression,
workerId
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import app.cash.tempest2.AsyncScannable
import app.cash.tempest2.Offset
import app.cash.tempest2.Page
import app.cash.tempest2.Scannable
import app.cash.tempest2.WorkerId
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
Expand Down Expand Up @@ -50,9 +51,10 @@ internal class DynamoDbScannable<K : Any, I : Any, R : Any>(
pageSize: Int,
consistentRead: Boolean,
filterExpression: Expression?,
initialOffset: Offset<K>?
initialOffset: Offset<K>?,
workerId: WorkerId?
): Page<K, I> {
val request = toScanRequest(consistentRead, pageSize, filterExpression, initialOffset)
val request = toScanRequest(consistentRead, pageSize, filterExpression, initialOffset, workerId)
val page = if (secondaryIndexName != null) {
dynamoDbTable.index(secondaryIndexName).scan(request)
} else {
Expand Down Expand Up @@ -118,7 +120,8 @@ internal class DynamoDbScannable<K : Any, I : Any, R : Any>(
consistentRead: Boolean,
pageSize: Int,
filterExpression: Expression?,
initialOffset: Offset<K>?
initialOffset: Offset<K>?,
workerId: WorkerId? = null
): ScanEnhancedRequest {
val scan = ScanEnhancedRequest.builder()
.consistentRead(consistentRead)
Expand All @@ -130,6 +133,10 @@ internal class DynamoDbScannable<K : Any, I : Any, R : Any>(
if (initialOffset != null) {
scan.exclusiveStartKey(initialOffset.encodeOffset())
}
if (workerId != null) {
scan.segment(workerId.segment)
scan.totalSegments(workerId.totalSegments)
}
return scan.build()
}

Expand Down
Loading

0 comments on commit 8430cf3

Please sign in to comment.