Skip to content

Commit

Permalink
[INLONG-2669][Manager] Optimizing inlong-manager source module code s…
Browse files Browse the repository at this point in the history
…tructure (#2669)
  • Loading branch information
herywang committed Feb 24, 2022
1 parent ff27e05 commit 9d0479c
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 159 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.service.source;

import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.pojo.source.SourceRequest;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import javax.validation.constraints.NotNull;
import java.util.Date;

public abstract class AbstractStreamSourceOperation implements StreamSourceOperation {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractStreamSourceOperation.class);
@Autowired
protected StreamSourceEntityMapper sourceMapper;

/**
* Setting the parameters of the latest entity.
*
* @param request source request
* @param targetEntity entity object which will set the new parameters.
*/
protected abstract void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity);

/**
* Getting the source type.
*
* @return source type string.
*/
protected abstract String getSourceType();

/**
* Creating source response object.
*
* @return response object.
*/
protected abstract SourceResponse getResponse();

@Override
public SourceResponse getById(@NotNull Integer id) {
StreamSourceEntity entity = sourceMapper.selectByPrimaryKey(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
String existType = entity.getSourceType();
Preconditions.checkTrue(getSourceType().equals(existType),
String.format(Constant.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
return this.getFromEntity(entity, this::getResponse);
}

@Override
public void updateOpt(SourceRequest request, String operator) {
StreamSourceEntity entity = sourceMapper.selectByPrimaryKey(request.getId());
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
// Setting updated parameters of stream source entity.
setTargetEntity(request, entity);
entity.setPreviousStatus(entity.getStatus());
entity.setStatus(EntityStatus.GROUP_CONFIG_ING.getCode());
entity.setModifier(operator);
entity.setModifyTime(new Date());
sourceMapper.updateByPrimaryKeySelective(entity);
LOGGER.info("success to update source of type={}", request.getSourceType());
}

@Override
public Integer saveOpt(SourceRequest request, String operator) {
StreamSourceEntity entity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new);
entity.setStatus(EntityStatus.SOURCE_NEW.getCode());
entity.setIsDeleted(EntityStatus.UN_DELETED.getCode());
entity.setCreator(operator);
entity.setModifier(operator);
Date now = new Date();
entity.setCreateTime(now);
entity.setModifyTime(now);
// get the ext params
setTargetEntity(request, entity);
sourceMapper.insert(entity);
return entity.getId();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,12 @@ public interface StreamSourceOperation {
Integer saveOpt(SourceRequest request, String operator);

/**
* Get source info by source type and source id.
* Get source info by source id.
*
* @param sourceType Source type.
* @param id Source id.
* @return Source info.
*/
SourceResponse getById(String sourceType, Integer id);
SourceResponse getById(Integer id);

/**
* Get the target from the given entity.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public Integer save(SourceRequest request, String operator) {
public SourceResponse get(Integer id, String sourceType) {
LOGGER.debug("begin to get source by id={}, sourceType={}", id, sourceType);
StreamSourceOperation operation = operationFactory.getInstance(SourceType.getType(sourceType));
SourceResponse sourceResponse = operation.getById(sourceType, id);
SourceResponse sourceResponse = operation.getById(id);
LOGGER.debug("success to get source info");
return sourceResponse;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.github.pagehelper.PageInfo;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
Expand All @@ -36,92 +35,34 @@
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.service.source.StreamSourceOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.inlong.manager.service.source.AbstractStreamSourceOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.validation.constraints.NotNull;
import java.util.Date;
import java.util.function.Supplier;

/**
* Binlog source operation
*/
@Service
public class BinlogStreamSourceOperation implements StreamSourceOperation {

private static final Logger LOGGER = LoggerFactory.getLogger(BinlogStreamSourceOperation.class);
public class BinlogStreamSourceOperation extends AbstractStreamSourceOperation {

@Autowired
private ObjectMapper objectMapper;
@Autowired
private StreamSourceEntityMapper sourceMapper;

@Override
public Boolean accept(SourceType sourceType) {
return SourceType.DB_BINLOG == sourceType;
}

@Override
public Integer saveOpt(SourceRequest request, String operator) {
String sourceType = request.getSourceType();
Preconditions.checkTrue(Constant.SOURCE_DB_BINLOG.equals(sourceType),
ErrorCodeEnum.SOURCE_TYPE_NOT_SUPPORT.getMessage() + ": " + sourceType);

BinlogSourceRequest sourceRequest = (BinlogSourceRequest) request;
StreamSourceEntity entity = CommonBeanUtils.copyProperties(sourceRequest, StreamSourceEntity::new);
entity.setStatus(EntityStatus.SOURCE_NEW.getCode());
entity.setIsDeleted(EntityStatus.UN_DELETED.getCode());
entity.setCreator(operator);
entity.setModifier(operator);
Date now = new Date();
entity.setCreateTime(now);
entity.setModifyTime(now);

// get the ext params
BinlogSourceDTO dto = BinlogSourceDTO.getFromRequest(sourceRequest);
try {
entity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
throw new BusinessException(ErrorCodeEnum.SOURCE_SAVE_FAILED);
}
sourceMapper.insert(entity);

Integer sourceId = entity.getId();
request.setId(sourceId);
return sourceId;
protected String getSourceType() {
return Constant.SOURCE_DB_BINLOG;
}

@Override
public SourceResponse getById(@NotNull String sourceType, @NotNull Integer id) {
StreamSourceEntity entity = sourceMapper.selectByPrimaryKey(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
String existType = entity.getSourceType();
Preconditions.checkTrue(Constant.SOURCE_DB_BINLOG.equals(existType),
String.format(Constant.SOURCE_TYPE_NOT_SAME, Constant.SOURCE_DB_BINLOG, existType));

return this.getFromEntity(entity, BinlogSourceResponse::new);
}

@Override
public <T> T getFromEntity(StreamSourceEntity entity, Supplier<T> target) {
T result = target.get();
if (entity == null) {
return result;
}

String existType = entity.getSourceType();
Preconditions.checkTrue(Constant.SOURCE_DB_BINLOG.equals(existType),
String.format(Constant.SOURCE_TYPE_NOT_SAME, Constant.SOURCE_DB_BINLOG, existType));

BinlogSourceDTO dto = BinlogSourceDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(entity, result, true);
CommonBeanUtils.copyProperties(dto, result, true);

return result;
protected SourceResponse getResponse() {
return new BinlogSourceResponse();
}

@Override
Expand All @@ -133,29 +74,29 @@ public PageInfo<? extends SourceListResponse> getPageInfo(Page<StreamSourceEntit
}

@Override
public void updateOpt(SourceRequest request, String operator) {
String sourceType = request.getSourceType();
Preconditions.checkTrue(Constant.SOURCE_DB_BINLOG.equals(sourceType),
String.format(Constant.SOURCE_TYPE_NOT_SAME, Constant.SOURCE_DB_BINLOG, sourceType));

StreamSourceEntity entity = sourceMapper.selectByPrimaryKey(request.getId());
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
protected void setTargetEntity(SourceRequest request, StreamSourceEntity targetEntity) {
BinlogSourceRequest sourceRequest = (BinlogSourceRequest) request;
CommonBeanUtils.copyProperties(sourceRequest, entity, true);
CommonBeanUtils.copyProperties(sourceRequest, targetEntity, true);
try {
BinlogSourceDTO dto = BinlogSourceDTO.getFromRequest(sourceRequest);
entity.setExtParams(objectMapper.writeValueAsString(dto));
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
}

entity.setPreviousStatus(entity.getStatus());
entity.setStatus(EntityStatus.GROUP_CONFIG_ING.getCode());
entity.setModifier(operator);
entity.setModifyTime(new Date());
sourceMapper.updateByPrimaryKeySelective(entity);

LOGGER.info("success to update source of type={}", sourceType);
}

@Override
public <T> T getFromEntity(StreamSourceEntity entity, Supplier<T> target) {
T result = target.get();
if (entity == null) {
return result;
}
String existType = entity.getSourceType();
Preconditions.checkTrue(getSourceType().equals(existType),
String.format(Constant.SOURCE_TYPE_NOT_SAME, getSourceType(), existType));
BinlogSourceDTO dto = BinlogSourceDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(entity, result, true);
CommonBeanUtils.copyProperties(dto, result, true);
return result;
}
}
Loading

0 comments on commit 9d0479c

Please sign in to comment.