Skip to content

Commit

Permalink
Add comments about intermediate state emission (#16262)
Browse files Browse the repository at this point in the history
* Add comments about intermediate state emission

* Adjust wording

* Format code
  • Loading branch information
tuliren committed Sep 4, 2022
1 parent e9a8a05 commit 63bc323
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,30 @@ public class StateDecoratingIterator extends AbstractIterator<AirbyteMessage> im
private final AirbyteStreamNameNamespacePair pair;
private final String cursorField;
private final JsonSchemaPrimitive cursorType;
private final int stateEmissionFrequency;

private final String initialCursor;
private String maxCursor;
private boolean hasEmittedFinalState;

// The intermediateStateMessage is set to the latest state message.
// For every stateEmissionFrequency messages, emitIntermediateState is set to true and
// the latest intermediateStateMessage will be emitted.
/**
* These parameters are for intermediate state message emission. We can emit an intermediate state
* when the following two conditions are met.
* <p>
* 1. The records are sorted by the cursor field. This is true when
* {@code stateEmissionFrequency > 0}. This ordering is guaranteed in
* {@code AbstractJdbcSource#queryTableIncremental}, in which an "ORDER BY" clause is appended to
* the SQL query if {@code stateEmissionFrequency > 0}.
* <p>
* 2. There is a cursor value that is ready for emission. A cursor value is "ready" if there are no
* more records with the same value. We cannot emit a cursor at will, because there may be multiple
* records with the same cursor value. If we emit a cursor ignoring this condition, should the sync
* fail right after the emission, the next sync may skip some records with the same cursor value due
* to the {@code WHERE cursor_field > cursor} condition in
* {@code AbstractJdbcSource#queryTableIncremental}.
* <p>
* The {@code intermediateStateMessage} is set to the latest state message that is ready for
* emission. For every {@code stateEmissionFrequency} messages, {@code emitIntermediateState} is set
* to true and the latest "ready" state will be emitted in the next {@code computeNext} call.
*/
private final int stateEmissionFrequency;
private int totalRecordCount = 0;
private boolean emitIntermediateState = false;
private AirbyteMessage intermediateStateMessage = null;
Expand All @@ -47,9 +62,11 @@ public class StateDecoratingIterator extends AbstractIterator<AirbyteMessage> im
* @param cursorField Path to the comparator field used to track the records read so far
* @param initialCursor name of the initial cursor column
* @param cursorType ENUM type of primitive values that can be used as a cursor for checkpointing
* @param stateEmissionFrequency If larger than 0, intermediate states will be emitted for every
* stateEmissionFrequency records. Only emit intermediate states if the records are sorted by
* the cursor field.
* @param stateEmissionFrequency If larger than 0, the records are sorted by the cursor field, and
* intermediate states will be emitted for every {@code stateEmissionFrequency} records. The
* order of the records is guaranteed in {@code AbstractJdbcSource#queryTableIncremental}, in
* which an "ORDER BY" clause is appended to the SQL query if
* {@code stateEmissionFrequency > 0}.
*/
public StateDecoratingIterator(final Iterator<AirbyteMessage> messageIterator,
final StateManager stateManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -75,6 +74,7 @@ private static AirbyteMessage createStateMessage(final String recordValue) {

private Iterator<AirbyteMessage> createExceptionIterator() {
return new Iterator<AirbyteMessage>() {

final Iterator<AirbyteMessage> internalMessageIterator = MoreIterators.of(RECORD_MESSAGE_1, RECORD_MESSAGE_2,
RECORD_MESSAGE_2, RECORD_MESSAGE_3);

Expand All @@ -88,7 +88,8 @@ public AirbyteMessage next() {
if (internalMessageIterator.hasNext()) {
return internalMessageIterator.next();
} else {
// this line throws a RunTimeException wrapped around a SQLException to mimic the flow of when a SQLException is thrown and wrapped in
// This line throws a RuntimeException wrapping a SQLException to mimic the flow in which a
// SQLException is thrown and wrapped in StreamingJdbcDatabase#tryAdvance.
throw new RuntimeException(new SQLException("Connection marked broken because of SQLSTATE(080006)", "08006"));
}
Expand Down Expand Up @@ -186,10 +187,12 @@ void testIteratorCatchesExceptionWhenEmissionFrequencyNonZero() {
1);
assertEquals(RECORD_MESSAGE_1, iterator.next());
assertEquals(RECORD_MESSAGE_2, iterator.next());
// continues to emit RECORD_MESSAGE_2 since cursorField has not changed thus not satisfying the condition of "ready"
// continues to emit RECORD_MESSAGE_2 since cursorField has not changed thus not satisfying the
// condition of "ready"
assertEquals(RECORD_MESSAGE_2, iterator.next());
assertEquals(RECORD_MESSAGE_3, iterator.next());
// emits the first state message since the iterator has changed cursorFields (2 -> 3) and met the frequency minimum of 1 record
// emits the first state message since the iterator has changed cursorFields (2 -> 3) and met the
// frequency minimum of 1 record
assertEquals(STATE_MESSAGE_2, iterator.next());
// no further records to read since Exception was caught above and marked iterator as endOfData()
assertFalse(iterator.hasNext());
Expand All @@ -210,8 +213,10 @@ void testIteratorCatchesExceptionWhenEmissionFrequencyZero() {
assertEquals(RECORD_MESSAGE_2, iterator.next());
assertEquals(RECORD_MESSAGE_2, iterator.next());
assertEquals(RECORD_MESSAGE_3, iterator.next());
// since stateEmission is not set to emit frequently, this will catch the error but not emit state message since it wasn't in a ready state
// of having a frequency > 0 but will prevent an exception from causing the iterator to fail by marking iterator as endOfData()
// Since the state emission frequency is zero, no intermediate state message is emitted here.
// The exception is still caught, and the iterator is marked as endOfData() so that the
// exception does not cause the iterator to fail.
assertFalse(iterator.hasNext());
}

Expand Down

0 comments on commit 63bc323

Please sign in to comment.