Skip to content

Commit

Permalink
Remove data field in the state aggregator if possible (#17538)
Browse files Browse the repository at this point in the history
* Remove data field in the state aggregator if possible

* Add comments

* Add comment and update test
  • Loading branch information
benmoriceau committed Oct 3, 2022
1 parent 65cd7e8 commit f45275e
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ public State getAggregated() {
if (state.getType() == null || state.getType() == AirbyteStateType.LEGACY) {
return new State().withState(state.getData());
} else {
/**
* The destination emit a Legacy state in order to be retro-compatible with old platform. If we are
* running this code, we know that the platform has been upgraded and we can thus discard the legacy
* state. Keeping the legacy state is causing issue because of its size
* (https://github.com/airbytehq/oncall/issues/731)
*/
state.setData(null);
return new State()
.withState(Jsons.jsonNode(List.of(state)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ class StreamStateAggregator implements StateAggregator {

@Override
public void ingest(final AirbyteStateMessage stateMessage) {
/**
* The destination emit a Legacy state in order to be retro-compatible with old platform. If we are
* running this code, we know that the platform has been upgraded and we can thus discard the legacy
* state. Keeping the legacy state is causing issue because of its size
* (https://github.com/airbytehq/oncall/issues/731)
*/
stateMessage.setData(null);
aggregatedState.put(stateMessage.getStream().getStreamDescriptor(), stateMessage);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,16 @@ void testGlobalState() {
final AirbyteStateMessage state1 = getGlobalMessage(1);
final AirbyteStateMessage state2 = getGlobalMessage(2);

stateAggregator.ingest(state1);
final AirbyteStateMessage state1NoData = getGlobalMessage(1).withData(null);
final AirbyteStateMessage state2NoData = getGlobalMessage(2).withData(null);

stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state1), AirbyteStateMessage.class));
Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State()
.withState(Jsons.jsonNode(List.of(state1))));
.withState(Jsons.jsonNode(List.of(state1NoData))));

stateAggregator.ingest(state2);
stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state2), AirbyteStateMessage.class));
Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State()
.withState(Jsons.jsonNode(List.of(state2))));
.withState(Jsons.jsonNode(List.of(state2NoData))));
}

@Test
Expand Down Expand Up @@ -126,19 +129,23 @@ void testStreamStateWithFeatureFlagOn() {
final AirbyteStateMessage state2 = getStreamMessage("b", 2);
final AirbyteStateMessage state3 = getStreamMessage("b", 3);

final AirbyteStateMessage state1NoData = getStreamMessage("a", 1).withData(null);
final AirbyteStateMessage state2NoData = getStreamMessage("b", 2).withData(null);
final AirbyteStateMessage state3NoData = getStreamMessage("b", 3).withData(null);

stateAggregator = new DefaultStateAggregator(USE_STREAM_CAPABLE_STATE);

stateAggregator.ingest(state1);
stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state1), AirbyteStateMessage.class));
Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State()
.withState(Jsons.jsonNode(List.of(state1))));
.withState(Jsons.jsonNode(List.of(state1NoData))));

stateAggregator.ingest(state2);
stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state2), AirbyteStateMessage.class));
Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State()
.withState(Jsons.jsonNode(List.of(state2, state1))));
.withState(Jsons.jsonNode(List.of(state2NoData, state1NoData))));

stateAggregator.ingest(state3);
stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state3), AirbyteStateMessage.class));
Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State()
.withState(Jsons.jsonNode(List.of(state3, state1))));
.withState(Jsons.jsonNode(List.of(state3NoData, state1NoData))));
}

private AirbyteStateMessage getNullMessage(final int stateValue) {
Expand All @@ -158,7 +165,8 @@ private AirbyteStateMessage getGlobalMessage(final int stateValue) {
.withStreamDescriptor(
new StreamDescriptor()
.withName("test"))
.withStreamState(Jsons.jsonNode(stateValue)))));
.withStreamState(Jsons.jsonNode(stateValue)))))
.withData(Jsons.jsonNode("HelloWorld"));
}

private AirbyteStateMessage getStreamMessage(final String streamName, final int stateValue) {
Expand All @@ -168,7 +176,8 @@ private AirbyteStateMessage getStreamMessage(final String streamName, final int
.withStreamDescriptor(
new StreamDescriptor()
.withName(streamName))
.withStreamState(Jsons.jsonNode(stateValue)));
.withStreamState(Jsons.jsonNode(stateValue)))
.withData(Jsons.jsonNode("Hello"));
}

private AirbyteStateMessage getEmptyMessage(final AirbyteStateType stateType) {
Expand Down

0 comments on commit f45275e

Please sign in to comment.