Skip to content

Commit

Permalink
[INLONG-7706][Manager] Fix sink is always in the configuration after …
Browse files Browse the repository at this point in the history
…being saved (#7707)
  • Loading branch information
fuweng11 committed Mar 27, 2023
1 parent 4d0c0b5 commit 9bedbac
Showing 1 changed file with 20 additions and 10 deletions.
Expand Up @@ -137,7 +137,10 @@ public Integer save(SinkRequest request, String operator) {
if (streamSuccess || StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus())) {
boolean enableCreateResource = InlongConstants.ENABLE_CREATE_RESOURCE.equals(
request.getEnableCreateResource());
SinkStatus nextStatus = enableCreateResource ? SinkStatus.CONFIG_ING : SinkStatus.CONFIG_SUCCESSFUL;
SinkStatus nextStatus = request.getStartProcess() ? SinkStatus.CONFIG_ING : SinkStatus.NEW;
if (!enableCreateResource) {
nextStatus = SinkStatus.CONFIG_SUCCESSFUL;
}
StreamSinkEntity sinkEntity = sinkMapper.selectByPrimaryKey(id);
sinkEntity.setStatus(nextStatus.getCode());
sinkMapper.updateStatus(sinkEntity);
Expand Down Expand Up @@ -196,7 +199,10 @@ public Integer save(SinkRequest request, UserInfo opInfo) {
if (streamSuccess || StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus())) {
boolean enableCreateResource = InlongConstants.ENABLE_CREATE_RESOURCE.equals(
request.getEnableCreateResource());
SinkStatus nextStatus = enableCreateResource ? SinkStatus.CONFIG_ING : SinkStatus.CONFIG_SUCCESSFUL;
SinkStatus nextStatus = request.getStartProcess() ? SinkStatus.CONFIG_ING : SinkStatus.NEW;
if (!enableCreateResource) {
nextStatus = SinkStatus.CONFIG_SUCCESSFUL;
}
StreamSinkEntity sinkEntity = sinkMapper.selectByPrimaryKey(id);
sinkEntity.setStatus(nextStatus.getCode());
sinkMapper.updateStatus(sinkEntity);
Expand Down Expand Up @@ -379,17 +385,19 @@ public Boolean update(SinkRequest request, String operator) {
}

SinkStatus nextStatus = null;
boolean streamSuccess = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus());
if (streamSuccess || StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus())) {
boolean enableConfig = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus())
|| StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus());
if (enableConfig) {
boolean enableCreateResource = InlongConstants.ENABLE_CREATE_RESOURCE.equals(
request.getEnableCreateResource());
nextStatus = enableCreateResource ? SinkStatus.CONFIG_ING : SinkStatus.CONFIG_SUCCESSFUL;
}
StreamSinkOperator sinkOperator = operatorFactory.getInstance(request.getSinkType());
sinkOperator.updateOpt(request, nextStatus, operator);

// If the stream is [CONFIG_SUCCESSFUL], then asynchronously start the [CREATE_STREAM_RESOURCE] process
if (streamSuccess && request.getStartProcess()) {
// If the stream is [CONFIG_SUCCESSFUL] or [CONFIG_FAILED], then asynchronously start the
// [CREATE_STREAM_RESOURCE] process
if (enableConfig && request.getStartProcess()) {
this.startProcessForSink(request.getInlongGroupId(), request.getInlongStreamId(), operator);
}

Expand Down Expand Up @@ -440,16 +448,18 @@ public Boolean update(SinkRequest request, UserInfo opInfo) {
}
// update record
SinkStatus nextStatus = null;
boolean streamSuccess = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus());
if (streamSuccess || StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus())) {
boolean enableConfig = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus())
|| StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus());
if (enableConfig) {
boolean enableCreateResource = InlongConstants.ENABLE_CREATE_RESOURCE.equals(
request.getEnableCreateResource());
nextStatus = enableCreateResource ? SinkStatus.CONFIG_ING : SinkStatus.CONFIG_SUCCESSFUL;
}
StreamSinkOperator sinkOperator = operatorFactory.getInstance(request.getSinkType());
sinkOperator.updateOpt(request, nextStatus, opInfo.getName());
// If the stream is [CONFIG_SUCCESSFUL], then asynchronously start the [CREATE_STREAM_RESOURCE] process
if (streamSuccess && request.getStartProcess()) {
// If the stream is [CONFIG_SUCCESSFUL] or [CONFIG_FAILED], then asynchronously start the
// [CREATE_STREAM_RESOURCE] process
if (enableConfig && request.getStartProcess()) {
this.startProcessForSink(request.getInlongGroupId(), request.getInlongStreamId(), opInfo.getName());
}
return true;
Expand Down

0 comments on commit 9bedbac

Please sign in to comment.