Skip to content

Commit

Permalink
Merge pull request #1726 from alibaba/opensource_2303
Browse files Browse the repository at this point in the history
Opensource 2303
  • Loading branch information
TrafalgarLuo committed Mar 22, 2023
2 parents f01836a + 85c2211 commit 2f5c9cf
Show file tree
Hide file tree
Showing 21 changed files with 24 additions and 327 deletions.
9 changes: 8 additions & 1 deletion README.md
Expand Up @@ -26,7 +26,7 @@ DataX本身作为数据同步框架,将不同数据源的同步抽象为从源

# Quick Start

##### Download [DataX下载地址](https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz)
##### Download [DataX下载地址](https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202303/datax.tar.gz)


##### 请点击:[Quick Start](https://github.com/alibaba/DataX/blob/master/userGuid.md)
Expand Down Expand Up @@ -70,6 +70,7 @@ DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、N
| | Databend | || [](https://github.com/alibaba/DataX/blob/master/databendwriter/doc/databendwriter.md) |
| | Hive ||| [](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md)[](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) |
| | kudu | || [](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) |
| | selectdb | || [](https://github.com/alibaba/DataX/blob/master/selectdbwriter/doc/selectdbwriter.md) |
| 无结构化数据存储 | TxtFile ||| [](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md)[](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md) |
| | FTP ||| [](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md)[](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md) |
| | HDFS ||| [](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md)[](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) |
Expand Down Expand Up @@ -108,6 +109,12 @@ DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、N

DataX 后续计划月度迭代更新,也欢迎感兴趣的同学提交 Pull requests,月度更新内容介绍如下。

- [datax_v202303](https://github.com/alibaba/DataX/releases/tag/datax_v202303)
- 精简代码
- 新增插件(adbmysqlwriter、databendwriter、selectdbwriter)
- 优化插件、修复问题(sqlserver、hdfs、cassandra、kudu、oss)
- fastjson 升级到 fastjson2

- [datax_v202210](https://github.com/alibaba/DataX/releases/tag/datax_v202210)
- 涉及通道能力更新(OceanBase、Tdengine、Doris等)

Expand Down
1 change: 0 additions & 1 deletion adswriter/doc/adswriter.md
Expand Up @@ -110,7 +110,6 @@ DataX 将数据直连ADS接口,利用ADS暴露的INSERT接口直写到ADS。
"account": "xxx@aliyun.com",
"odpsServer": "xxx",
"tunnelServer": "xxx",
"accountType": "aliyun",
"project": "transfer_project"
},
"writeMode": "load",
Expand Down
Expand Up @@ -12,15 +12,13 @@ public class TransferProjectConf {
public final static String KEY_ACCOUNT = "odps.account";
public final static String KEY_ODPS_SERVER = "odps.odpsServer";
public final static String KEY_ODPS_TUNNEL = "odps.tunnelServer";
public final static String KEY_ACCOUNT_TYPE = "odps.accountType";
public final static String KEY_PROJECT = "odps.project";

private String accessId;
private String accessKey;
private String account;
private String odpsServer;
private String odpsTunnel;
private String accountType;
private String project;

public static TransferProjectConf create(Configuration adsWriterConf) {
Expand All @@ -30,7 +28,6 @@ public static TransferProjectConf create(Configuration adsWriterConf) {
res.account = adsWriterConf.getString(KEY_ACCOUNT);
res.odpsServer = adsWriterConf.getString(KEY_ODPS_SERVER);
res.odpsTunnel = adsWriterConf.getString(KEY_ODPS_TUNNEL);
res.accountType = adsWriterConf.getString(KEY_ACCOUNT_TYPE, "aliyun");
res.project = adsWriterConf.getString(KEY_PROJECT);
return res;
}
Expand All @@ -55,9 +52,6 @@ public String getOdpsTunnel() {
return odpsTunnel;
}

public String getAccountType() {
return accountType;
}

public String getProject() {
return project;
Expand Down
Expand Up @@ -31,7 +31,6 @@ public class PerfTrace {
private int taskGroupId;
private int channelNumber;

private int priority;
private int batchSize = 500;
private volatile boolean perfReportEnable = true;

Expand All @@ -54,12 +53,12 @@ public class PerfTrace {
* @param taskGroupId
* @return
*/
public static PerfTrace getInstance(boolean isJob, long jobId, int taskGroupId, int priority, boolean enable) {
public static PerfTrace getInstance(boolean isJob, long jobId, int taskGroupId, boolean enable) {

if (instance == null) {
synchronized (lock) {
if (instance == null) {
instance = new PerfTrace(isJob, jobId, taskGroupId, priority, enable);
instance = new PerfTrace(isJob, jobId, taskGroupId, enable);
}
}
}
Expand All @@ -76,22 +75,21 @@ public static PerfTrace getInstance() {
LOG.error("PerfTrace instance not be init! must have some error! ");
synchronized (lock) {
if (instance == null) {
instance = new PerfTrace(false, -1111, -1111, 0, false);
instance = new PerfTrace(false, -1111, -1111, false);
}
}
}
return instance;
}

private PerfTrace(boolean isJob, long jobId, int taskGroupId, int priority, boolean enable) {
private PerfTrace(boolean isJob, long jobId, int taskGroupId, boolean enable) {
try {
this.perfTraceId = isJob ? "job_" + jobId : String.format("taskGroup_%s_%s", jobId, taskGroupId);
this.enable = enable;
this.isJob = isJob;
this.taskGroupId = taskGroupId;
this.instId = jobId;
this.priority = priority;
LOG.info(String.format("PerfTrace traceId=%s, isEnable=%s, priority=%s", this.perfTraceId, this.enable, this.priority));
LOG.info(String.format("PerfTrace traceId=%s, isEnable=%s", this.perfTraceId, this.enable));

} catch (Exception e) {
// do nothing
Expand Down Expand Up @@ -398,7 +396,6 @@ public synchronized JobStatisticsDto2 getReports(String mode) {
jdo.setWindowEnd(this.windowEnd);
jdo.setJobStartTime(jobStartTime);
jdo.setJobRunTimeMs(System.currentTimeMillis() - jobStartTime.getTime());
jdo.setJobPriority(this.priority);
jdo.setChannelNum(this.channelNumber);
jdo.setCluster(this.cluster);
jdo.setJobDomain(this.jobDomain);
Expand Down Expand Up @@ -609,7 +606,6 @@ class JobStatisticsDto2 {
private Date jobStartTime;
private Date jobEndTime;
private Long jobRunTimeMs;
private Integer jobPriority;
private Integer channelNum;
private String cluster;
private String jobDomain;
Expand Down Expand Up @@ -680,10 +676,6 @@ public Long getJobRunTimeMs() {
return jobRunTimeMs;
}

public Integer getJobPriority() {
return jobPriority;
}

/**
 * Returns the value currently held in the {@code channelNum} field.
 *
 * @return the stored channel number; presumably {@code null} until a value
 *         has been assigned through the setter (no initializer is visible
 *         in this view — confirm against the full class)
 */
public Integer getChannelNum() {
return channelNum;
}
Expand Down Expand Up @@ -816,10 +808,6 @@ public void setJobRunTimeMs(Long jobRunTimeMs) {
this.jobRunTimeMs = jobRunTimeMs;
}

public void setJobPriority(Integer jobPriority) {
this.jobPriority = jobPriority;
}

/**
 * Stores the given value in the {@code channelNum} field, replacing any
 * previous value. No validation is performed on the argument.
 *
 * @param channelNum the channel number to record
 */
public void setChannelNum(Integer channelNum) {
this.channelNum = channelNum;
}
Expand Down

This file was deleted.

9 changes: 1 addition & 8 deletions core/src/main/java/com/alibaba/datax/core/Engine.java
Expand Up @@ -79,16 +79,9 @@ public void start(Configuration allConf) {
perfReportEnable = false;
}

int priority = 0;
try {
priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY"));
}catch (NumberFormatException e){
LOG.warn("prioriy set to 0, because NumberFormatException, the value is: "+System.getProperty("PROIORY"));
}

Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
//初始化PerfTrace
PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, traceEnable);
perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber);
container.start();

Expand Down
Expand Up @@ -134,7 +134,7 @@ public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPlu
break;
case BOOLEAN:
synchronized (lock) {
row.addBoolean(name, Boolean.getBoolean(rawData));
row.addBoolean(name, Boolean.parseBoolean(rawData));
}
break;
case STRING:
Expand Down
3 changes: 1 addition & 2 deletions mongodbreader/doc/mongodbreader.md
Expand Up @@ -114,8 +114,7 @@ MongoDBReader通过Datax框架从MongoDB并行的读取数据,通过主控的J
"accessKey": "********************",
"truncate": true,
"odpsServer": "xxx/api",
"tunnelServer": "xxx",
"accountType": "aliyun"
"tunnelServer": "xxx"
}
}
}
Expand Down
Expand Up @@ -14,20 +14,9 @@ public class Constant {

public static final String PARTITION_SPLIT_MODE = "partition";

public static final String DEFAULT_ACCOUNT_TYPE = "aliyun";

public static final String TAOBAO_ACCOUNT_TYPE = "taobao";

// 常量字段用COLUMN_CONSTANT_FLAG 首尾包住即可
public final static String COLUMN_CONSTANT_FLAG = "'";

/**
* 以下是获取accesskey id 需要用到的常量值
*/
public static final String SKYNET_ACCESSID = "SKYNET_ACCESSID";

public static final String SKYNET_ACCESSKEY = "SKYNET_ACCESSKEY";

public static final String PARTITION_COLUMNS = "partitionColumns";

public static final String PARSED_COLUMNS = "parsedColumns";
Expand Down
Expand Up @@ -24,9 +24,6 @@ public class Key {
// 当值为:partition 则只切分到分区;当值为:record,则当按照分区切分后达不到adviceNum时,继续按照record切分
public final static String SPLIT_MODE = "splitMode";

// 账号类型,默认为aliyun,也可能为taobao等其他类型
public final static String ACCOUNT_TYPE = "accountType";

public final static String PACKAGE_AUTHORIZED_PROJECT = "packageAuthorizedProject";

public final static String IS_COMPRESS = "isCompress";
Expand Down
Expand Up @@ -42,12 +42,6 @@ public void init() {
this.originalConfig = super.getPluginJobConf();
this.successOnNoPartition = this.originalConfig.getBool(Key.SUCCESS_ON_NO_PATITION, false);

//如果用户没有配置accessId/accessKey,尝试从环境变量获取
String accountType = originalConfig.getString(Key.ACCOUNT_TYPE, Constant.DEFAULT_ACCOUNT_TYPE);
if (Constant.DEFAULT_ACCOUNT_TYPE.equalsIgnoreCase(accountType)) {
this.originalConfig = IdAndKeyUtil.parseAccessIdAndKey(this.originalConfig);
}

//检查必要的参数配置
OdpsUtil.checkNecessaryConfig(this.originalConfig);
//重试次数的配置检查
Expand Down

This file was deleted.

Expand Up @@ -76,19 +76,12 @@ public static Odps initOdps(Configuration originalConfig) {
defaultProject = packageAuthorizedProject;
}

String accountType = originalConfig.getString(Key.ACCOUNT_TYPE,
Constant.DEFAULT_ACCOUNT_TYPE);

Account account = null;
if (accountType.equalsIgnoreCase(Constant.DEFAULT_ACCOUNT_TYPE)) {
if (StringUtils.isNotBlank(securityToken)) {
account = new StsAccount(accessId, accessKey, securityToken);
} else {
account = new AliyunAccount(accessId, accessKey);
}
if (StringUtils.isNotBlank(securityToken)) {
account = new StsAccount(accessId, accessKey, securityToken);
} else {
throw DataXException.asDataXException(OdpsReaderErrorCode.ACCOUNT_TYPE_ERROR,
MESSAGE_SOURCE.message("odpsutil.3", accountType));
account = new AliyunAccount(accessId, accessKey);
}

Odps odps = new Odps(account);
Expand Down

0 comments on commit 2f5c9cf

Please sign in to comment.