Skip to content

Commit

Permalink
Java CDK (destinations): add tests for 0-record state messages (#34419)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Jan 22, 2024
1 parent 6b4f215 commit 71038ce
Showing 1 changed file with 64 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ class GlobalAsyncStateManagerTest {
.withType(Type.STATE)
.withState(new PartialAirbyteStateMessage()
.withType(AirbyteStateType.GLOBAL));
private static final PartialAirbyteMessage GLOBAL_STATE_MESSAGE3 = new PartialAirbyteMessage()
.withType(Type.STATE)
.withState(new PartialAirbyteStateMessage()
.withType(AirbyteStateType.GLOBAL));
private static final PartialAirbyteMessage STREAM1_STATE_MESSAGE1 = new PartialAirbyteMessage()
.withType(Type.STATE)
.withState(new PartialAirbyteStateMessage()
Expand All @@ -58,6 +62,12 @@ class GlobalAsyncStateManagerTest {
.withState(new PartialAirbyteStateMessage()
.withType(AirbyteStateType.STREAM)
.withStream(new PartialAirbyteStreamState().withStreamDescriptor(STREAM1_DESC)));

private static final PartialAirbyteMessage STREAM1_STATE_MESSAGE3 = new PartialAirbyteMessage()
.withType(Type.STATE)
.withState(new PartialAirbyteStateMessage()
.withType(AirbyteStateType.STREAM)
.withStream(new PartialAirbyteStreamState().withStreamDescriptor(STREAM1_DESC)));
private static final PartialAirbyteMessage STREAM2_STATE_MESSAGE = new PartialAirbyteMessage()
.withType(Type.STATE)
.withState(new PartialAirbyteStateMessage()
Expand Down Expand Up @@ -149,6 +159,33 @@ void testCorrectFlushingOneStream() {
assertEquals(List.of(new AirbyteStateStats().withRecordCount(10.0)), stateWithStats2.values().stream().toList());
}

@Test
void testZeroRecordFlushing() {
final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES));

final var preConvertId0 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager);
stateManager.trackState(GLOBAL_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE);
stateManager.decrement(preConvertId0, 10);
final Map<PartialAirbyteMessage, AirbyteStateStats> stateWithStats = stateManager.flushStates().stream()
.collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats));
assertEquals(List.of(GLOBAL_STATE_MESSAGE1), stateWithStats.keySet().stream().toList());
assertEquals(List.of(new AirbyteStateStats().withRecordCount(10.0)), stateWithStats.values().stream().toList());

stateManager.trackState(GLOBAL_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE);
final Map<PartialAirbyteMessage, AirbyteStateStats> stateWithStats2 = stateManager.flushStates().stream()
.collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats));
assertEquals(List.of(GLOBAL_STATE_MESSAGE2), stateWithStats2.keySet().stream().toList());
assertEquals(List.of(new AirbyteStateStats().withRecordCount(0.0)), stateWithStats2.values().stream().toList());

final var afterConvertId2 = simulateIncomingRecords(STREAM1_DESC, 10, stateManager);
stateManager.trackState(GLOBAL_STATE_MESSAGE3, STATE_MSG_SIZE, DEFAULT_NAMESPACE);
stateManager.decrement(afterConvertId2, 10);
final Map<PartialAirbyteMessage, AirbyteStateStats> stateWithStats3 = stateManager.flushStates().stream()
.collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats));
assertEquals(List.of(GLOBAL_STATE_MESSAGE3), stateWithStats3.keySet().stream().toList());
assertEquals(List.of(new AirbyteStateStats().withRecordCount(10.0)), stateWithStats3.values().stream().toList());
}

@Test
void testCorrectFlushingManyStreams() {
final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES));
Expand Down Expand Up @@ -215,6 +252,33 @@ void testCorrectFlushingOneStream() {
assertEquals(List.of(new AirbyteStateStats().withRecordCount(10.0)), stateWithStats2.values().stream().toList());
}

@Test
void testZeroRecordFlushing() {
final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES));

var stateId = simulateIncomingRecords(STREAM1_DESC, 3, stateManager);
stateManager.trackState(STREAM1_STATE_MESSAGE1, STATE_MSG_SIZE, DEFAULT_NAMESPACE);
stateManager.decrement(stateId, 3);
final Map<PartialAirbyteMessage, AirbyteStateStats> stateWithStats = stateManager.flushStates().stream()
.collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats));
assertEquals(List.of(STREAM1_STATE_MESSAGE1), stateWithStats.keySet().stream().toList());
assertEquals(List.of(new AirbyteStateStats().withRecordCount(3.0)), stateWithStats.values().stream().toList());

stateManager.trackState(STREAM1_STATE_MESSAGE2, STATE_MSG_SIZE, DEFAULT_NAMESPACE);
final Map<PartialAirbyteMessage, AirbyteStateStats> stateWithStats2 = stateManager.flushStates().stream()
.collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats));
assertEquals(List.of(STREAM1_STATE_MESSAGE2), stateWithStats2.keySet().stream().toList());
assertEquals(List.of(new AirbyteStateStats().withRecordCount(0.0)), stateWithStats2.values().stream().toList());

stateId = simulateIncomingRecords(STREAM1_DESC, 10, stateManager);
stateManager.trackState(STREAM1_STATE_MESSAGE3, STATE_MSG_SIZE, DEFAULT_NAMESPACE);
stateManager.decrement(stateId, 10);
final Map<PartialAirbyteMessage, AirbyteStateStats> stateWithStats3 = stateManager.flushStates().stream()
.collect(Collectors.toMap(PartialStateWithDestinationStats::stateMessage, PartialStateWithDestinationStats::stats));
assertEquals(List.of(STREAM1_STATE_MESSAGE3), stateWithStats3.keySet().stream().toList());
assertEquals(List.of(new AirbyteStateStats().withRecordCount(10.0)), stateWithStats3.values().stream().toList());
}

@Test
void testCorrectFlushingManyStream() {
final GlobalAsyncStateManager stateManager = new GlobalAsyncStateManager(new GlobalMemoryManager(TOTAL_QUEUES_MAX_SIZE_LIMIT_BYTES));
Expand Down

0 comments on commit 71038ce

Please sign in to comment.