Skip to content

Commit

Permalink
fix NPE (#14353)
Browse files Browse the repository at this point in the history
* fix NPE

* Add test

* Fix trailing
  • Loading branch information
benmoriceau committed Jul 1, 2022
1 parent 9ffc7dd commit e5f01f3
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,22 @@ public void start(final WorkerSourceConfig workerSourceConfig, final Path jobRoo
*/
isResetBasedForConfig = false;
} else {
stateWrapper = StateMessageHelper.getTypedState(workerSourceConfig.getState().getState(), useStreamCapableState);
if (workerSourceConfig.getState() != null) {
stateWrapper = StateMessageHelper.getTypedState(workerSourceConfig.getState().getState(), useStreamCapableState);

if (stateWrapper.isPresent() &&
stateWrapper.get().getStateType() == StateType.LEGACY &&
!isResetAllStreamsInCatalog(workerSourceConfig)) {
log.error("The state a legacy one but we are trying to do a partial update, this is not supported.");
throw new IllegalStateException("Try to perform a partial reset on a legacy state");
if (stateWrapper.isPresent() &&
stateWrapper.get().getStateType() == StateType.LEGACY &&
!isResetAllStreamsInCatalog(workerSourceConfig)) {
log.error("The state a legacy one but we are trying to do a partial update, this is not supported.");
throw new IllegalStateException("Try to perform a partial reset on a legacy state");
}

isResetBasedForConfig = true;
} else {
/// No state
isResetBasedForConfig = false;
}

isResetBasedForConfig = true;
}
}
isStarted = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,39 @@ public void testLegacyWithNewConfig() throws Exception {
.isEmpty();
}

@Test
public void testLegacyWithNullState() throws Exception {
final List<StreamDescriptor> streamToReset = getConfigStreamDescriptorFromName(Lists.newArrayList("a", "b", "c"));

final ResetSourceConfiguration resetSourceConfiguration = new ResetSourceConfiguration()
.withStreamsToReset(streamToReset);
final ConfiguredAirbyteCatalog airbyteCatalogWithExtraStream = new ConfiguredAirbyteCatalog()
.withStreams(Lists.newArrayList(
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("a")),
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("b")),
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("c"))));

final WorkerSourceConfig workerSourceConfig = new WorkerSourceConfig()
.withSourceConnectionConfiguration(Jsons.jsonNode(resetSourceConfiguration))
.withCatalog(airbyteCatalogWithExtraStream);

emptyAirbyteSource.start(workerSourceConfig, null);

final Optional<AirbyteMessage> maybeMessage = emptyAirbyteSource.attemptRead();
Assertions.assertThat(maybeMessage)
.isNotEmpty();

final AirbyteMessage message = maybeMessage.get();
Assertions.assertThat(message.getType()).isEqualTo(Type.STATE);

final AirbyteStateMessage stateMessage = message.getState();
Assertions.assertThat(stateMessage.getType()).isEqualTo(AirbyteStateType.LEGACY);
Assertions.assertThat(stateMessage.getData()).isEqualTo(Jsons.emptyObject());

Assertions.assertThat(emptyAirbyteSource.attemptRead())
.isEmpty();
}

private void testReceiveNullStreamState(final StreamDescriptor streamDescriptor) {
final Optional<AirbyteMessage> maybeMessage = emptyAirbyteSource.attemptRead();
Assertions.assertThat(maybeMessage)
Expand Down

0 comments on commit e5f01f3

Please sign in to comment.