Skip to content

Commit

Permalink
fix bug: add check file isUploaded, add table for file transport stat…
Browse files Browse the repository at this point in the history
…us (#631)
  • Loading branch information
anyspa committed Jun 30, 2020
1 parent a564768 commit cd4ce22
Show file tree
Hide file tree
Showing 37 changed files with 501 additions and 209 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,4 @@ public class WeEventConfig {

@Value("${mqtt.broker.keepalive:60}")
private Integer keepAlive;

@Value("${file.path:./logs/file}")
private String filePath;

@Value("${file.chunk.size:1048576}")
private int fileChunkSize;
}
6 changes: 1 addition & 5 deletions weevent-broker/src/main/resources/weevent.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,4 @@ block.chain.type=fisco
stomp.heartbeats=30
#mqtt broker
#mqtt.broker.tcp.port=7001
mqtt.broker.keepalive=60
#upload/download path
file.chunk.path=~/file
#chunk size default 1MB
file.chunk.size=1048576
mqtt.broker.keepalive=60
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,4 @@ public SendResult(SendResultStatus status) {
this.status = status;
}

public SendResult(SendResultStatus status, String topic, String eventId) {
this.status = status;
this.topic = topic;
this.eventId = eventId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ static IWeEventFileClient build(String groupId, String filePath, FtpInfo ftpInfo
* @param topic topic name
* @param publicPem public pem path string
* @throws BrokerException broker exception
* @throws BrokerException BrokerException
* @throws IOException IOException
*/
void openTransport4Sender(String topic, String publicPem) throws BrokerException, IOException;

Expand All @@ -59,7 +59,6 @@ static IWeEventFileClient build(String groupId, String filePath, FtpInfo ftpInfo
* @return send result, SendResult.SUCCESS if success, and return SendResult.eventId
* @throws BrokerException broker exception
* @throws IOException IOException
* @throws BrokerException BrokerException
*/
FileChunksMeta publishFile(String topic, String localFile, boolean overwrite) throws BrokerException, IOException;

Expand Down Expand Up @@ -172,5 +171,14 @@ interface FileListener {
*/
void genPemFile(String filePath) throws BrokerException;

/**
* Check if the receiver end has a file.
*
* @param fileName file name
* @param topic topic name
* @param groupId group id
* @return is file exist
* @throws BrokerException BrokerException
*/
boolean isFileExist(String fileName, String topic, String groupId) throws BrokerException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -365,8 +365,6 @@ public void genPemFile(String filePath) throws BrokerException {
PemFile privatePemFile = new PemFile(pair.getPrivate(), PRIVATE_KEY_DESC);
PemFile publicPemFile = new PemFile(pair.getPublic(), PUBLIC_KEY_DESC);


System.out.println(filePath + PATH_SEPARATOR + account + PRIVATE_KEY_SUFFIX);
privatePemFile.write(filePath + PATH_SEPARATOR + account + PRIVATE_KEY_SUFFIX);
publicPemFile.write(filePath + PATH_SEPARATOR + account + PUBLIC_KEY_SUFFIX);
} catch (IOException | NoSuchProviderException | NoSuchAlgorithmException | InvalidAlgorithmParameterException e) {
Expand Down Expand Up @@ -416,7 +414,7 @@ public interface EventListener {
void onException(Throwable e);
}

static class FileEventListener implements EventListener{
static class FileEventListener implements EventListener {
private final String receivePath;
private final FtpInfo ftpInfo;
private final FileListener fileListener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,11 @@ public class ConstantProperties {
public final static int UPLOAD_CHUNK_FAIL_RETRY_COUNT = 5;
public final static long WAIT1S = 1000;


// file transport status
public final static String UPLOADING = "0";
public final static String SUCCESS = "1";
public final static String FAILED = "2";


}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ public enum ErrorCode {
FILE_CHUNK_INDEX_ILLEGAL(102007, "the upload file chunk idx is illegal"),
FILE_CHUNK_DATA_IS_NULL(102008, "the upload file chunk data is null"),
FILE_UPLOAD_FAILED(102009, "file upload failed"),
FILE_NOT_EXIST(1020010, "file not exist"),
FILE_DOWNLOAD_ERROR(102011, "file download failed"),
GENERATE_PEM_FAILED(102012, "generate pem key failed"),
CHECK_FILE_IS_UPLOADED_ERROR(102013, "check file is uploaded error"),

PARSE_CHUNK_REQUEST_ERROR(102020, "parse chunk request error"),
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import com.webank.weevent.governance.common.GovernanceException;
import com.webank.weevent.governance.common.GovernanceResult;
import com.webank.weevent.governance.entity.FileTransportEntity;
import com.webank.weevent.governance.entity.FileTransportChannelEntity;
import com.webank.weevent.governance.service.FileService;
import com.webank.weevent.governance.utils.ParamCheckUtils;

Expand Down Expand Up @@ -52,7 +52,7 @@ public void setFileService(FileService fileService) {

@PostMapping(path = "/openTransport")
@ResponseBody
public GovernanceResult openTransport(@RequestBody FileTransportEntity fileTransport) throws GovernanceException {
public GovernanceResult openTransport(@RequestBody FileTransportChannelEntity fileTransport) throws GovernanceException {
log.info("openTransport, fileTransport:{}.", fileTransport.toString());

return this.fileService.openTransport(fileTransport);
Expand Down Expand Up @@ -80,20 +80,19 @@ public GovernanceResult prepareUploadFile(@RequestParam(name = "groupId") String
}

@RequestMapping(path = "/download")
public void download(@RequestParam(name = "groupId") String groupId,
@RequestParam(name = "brokerId") Integer brokerId,
@RequestParam(name = "fileId") String fileId,
public void download(@RequestParam(name = "topic") String topic,
@RequestParam(name = "fileName") String fileName,
HttpServletResponse response) throws GovernanceException {
log.info("download file, groupId:{}, brokerId:{}, fileId:{}.", groupId, brokerId, fileId);
log.info("download file, topic:{}, fileName:{}.", topic, fileName);
response.setHeader("content-type", "application/octet-stream");
response.setContentType("application/octet-stream; charset=UTF-8");

ParamCheckUtils.validateFileId(fileId);
String downloadFile = this.fileService.downloadFile(groupId, brokerId, fileId);
ParamCheckUtils.validateFileName(fileName);
String downloadFile = this.fileService.downloadFile(topic, fileName);
if (StringUtils.isBlank(downloadFile)) {
throw new GovernanceException("download file not exist");
}
String fileName = downloadFile.substring(downloadFile.lastIndexOf("/") + 1);

try {
response.setHeader("filename", URLEncoder.encode(fileName, StandardCharsets.UTF_8.toString()));
} catch (UnsupportedEncodingException e) {
Expand All @@ -110,9 +109,9 @@ public void download(@RequestParam(name = "groupId") String groupId,
os.flush();
i = bis.read(buffer);
}
log.info("download file success, fileId:{}, fileName:{}", fileId, fileName);
log.info("download file success, topic:{}, fileName:{}", topic, fileName);
} catch (IOException e) {
log.error("download file error, groupId:{} fileId:{}", groupId, fileId, e);
log.error("download file error, topic:{} fileName:{}", topic, fileName, e);
throw new GovernanceException("download file error", e);
}
}
Expand All @@ -126,14 +125,22 @@ public GovernanceResult listFile(@RequestParam(name = "groupId") String groupId,
return this.fileService.listFile(groupId, brokerId, topicName);
}

@RequestMapping(path = "/status")
@RequestMapping(path = "/downLoadStatus")
@ResponseBody
public GovernanceResult downLoadStatus(@RequestParam(name = "groupId") String groupId,
@RequestParam(name = "brokerId") Integer brokerId,
@RequestParam(name = "topicName") String topicName) throws GovernanceException {
log.info("status, groupId:{}, topic:{}.", groupId, topicName);
return this.fileService.downLoadStatus(groupId, brokerId, topicName);
}

@RequestMapping(path = "/uploadStatus")
@ResponseBody
public GovernanceResult status(@RequestParam(name = "groupId") String groupId,
@RequestParam(name = "brokerId") Integer brokerId,
@RequestParam(name = "topicName") String topicName,
@RequestParam(name = "role") String role) throws GovernanceException {
log.info("status, groupId:{}, topic:{}, role:{}.", groupId, topicName, role);
return this.fileService.status(groupId, brokerId, topicName, role);
public GovernanceResult uploadStatus(@RequestParam(name = "groupId") String groupId,
@RequestParam(name = "brokerId") Integer brokerId,
@RequestParam(name = "topicName") String topicName) throws GovernanceException {
log.info("status, groupId:{}, topic:{}.", groupId, topicName);
return this.fileService.uploadStatus(groupId, brokerId, topicName);
}

@RequestMapping(path = "/listTransport")
Expand All @@ -146,7 +153,7 @@ public GovernanceResult listTransport(@RequestParam(name = "groupId") String gro

@PostMapping(path = "/closeTransport")
@ResponseBody
public GovernanceResult closeTransport(@RequestBody FileTransportEntity fileTransport) throws GovernanceException {
public GovernanceResult closeTransport(@RequestBody FileTransportChannelEntity fileTransport) throws GovernanceException {
log.info("closeTransport, groupId:{}, brokerId:{}, transportId:{}, roleId:{}, topic:{}.", fileTransport.getGroupId(),
fileTransport.getBrokerId(), fileTransport.getId(), fileTransport.getRole(), fileTransport.getTopicName());
return fileService.closeTransport(fileTransport);
Expand All @@ -160,4 +167,14 @@ public void genPemFile(@RequestParam(name = "groupId") String groupId,
this.fileService.genPemFile(groupId, brokerId, filePath);
}

@RequestMapping(path = "/checkUploaded")
@ResponseBody
public GovernanceResult checkFileIsUploaded(@RequestParam(name = "groupId") String groupId,
@RequestParam(name = "brokerId") Integer brokerId,
@RequestParam(name = "topicName") String topicName,
@RequestParam(name = "fileName") String fileName) throws GovernanceException {
log.info("checkFileIsUploaded, groupId:{}, topic:{}, fileName:{}.", groupId, topicName, fileName);
return this.fileService.checkFileIsUploaded(groupId, brokerId, topicName, fileName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import lombok.Setter;

/**
* FileTransportEntity class.
* FileTransportChannelEntity class.
*
* @author v_wbhwliu
* @version 1.3
Expand All @@ -23,10 +23,10 @@
@Getter
@EqualsAndHashCode(callSuper = false)
@Entity
@Table(name = "t_file_transport",
@Table(name = "t_file_transport_channel",
uniqueConstraints = {@UniqueConstraint(name = "topicBrokerGroupDelete",
columnNames = {"topic_name", "broker_id", "group_id"})})
public class FileTransportEntity extends TopicBase {
public class FileTransportChannelEntity extends TopicBase {

@Column(name = "role", columnDefinition = "varchar(1)")
private String role;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package com.webank.weevent.governance.entity;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Table;
import javax.persistence.Transient;
import javax.persistence.UniqueConstraint;

import com.webank.weevent.governance.entity.base.TopicBase;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;

/**
* FileTransportStatusEntity class.
*
* @author v_wbhwliu
* @version 1.3
* @since 2020/5/20
*/
@Setter
@Getter
@EqualsAndHashCode(callSuper = false)
@Entity
@Table(name = "t_file_transport_status",
uniqueConstraints = {@UniqueConstraint(name = "topicBrokerGroupFileName",
columnNames = {"broker_id", "group_id", "topic_name", "file_name"})})
public class FileTransportStatusEntity extends TopicBase {

@Column(name = "file_name", columnDefinition = "varchar(256)")
private String fileName;

@Column(name = "transport_status", columnDefinition = "varchar(1)")
private String status;

@Column(name = "file_md5", columnDefinition = "varchar(32)")
private String fileMD5;

@Column(name = "file_size")
private Long fileSize;

// cost time in second
@Transient
private String time;
// sender ready chunk
@Transient
private int readyChunk;
// processing
@Transient
private String process;
// speed in Byte/s
@Transient
private String speed;

// @Transient
// private FileChunksMetaStatus fileChunksMetaStatus;


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.webank.weevent.governance.repository;

import java.util.List;

import com.webank.weevent.governance.entity.FileTransportChannelEntity;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface TransportChannelRepository extends JpaRepository<FileTransportChannelEntity, Long> {

List<FileTransportChannelEntity> queryByBrokerIdAndGroupId(Integer brokerId, String groupId);
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.webank.weevent.governance.repository;

import java.util.List;

import com.webank.weevent.governance.entity.FileTransportStatusEntity;

import org.apache.ibatis.annotations.Param;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;

@Repository
public interface TransportStatusRepository extends JpaRepository<FileTransportStatusEntity, Long> {

List<FileTransportStatusEntity> queryByBrokerIdAndGroupIdAndTopicName(Integer brokerId, String groupId, String topicName);

FileTransportStatusEntity queryByBrokerIdAndGroupIdAndTopicNameAndFileName(Integer brokerId, String groupId, String topicName, String fileName);

@Transactional
@Modifying
@Query(value = "update t_file_transport_status set transport_status=:status where id =:id", nativeQuery = true)
void updateTransportStatus(@Param("transport_status") String status, @Param("id") Long id);
}

0 comments on commit cd4ce22

Please sign in to comment.