Merged (changes from all commits)
184 changes: 73 additions & 111 deletions README-ZH.md

Large diffs are not rendered by default.

36 changes: 19 additions & 17 deletions README.md
@@ -33,75 +33,77 @@ Please be patient, it will take some time to load gif.

       c. [Qualitis](https://github.com/WeBankFinTech/Qualitis) - Data Quality Management Tool

-       d. [Azkaban](https://azkaban.github.io/) - Batch workflow job scheduler
+       d. [Schedulis](https://github.com/WeBankFinTech/Schedulis) - Batch workflow job scheduler

       f. [Exchangis](https://github.com/WeBankFinTech/Exchangis) - Data Exchange Tool

![DSS one-stop video](images/en_US/readme/onestop.gif)

-### 2. AppJoint, based on Linkis,defines a unique design concept
+### 2. AppConn, based on Linkis, defines a unique design concept

-       AppJoint——application joint, defining unified front-end and back-end
+       AppConn——application connector, defining unified front-end and back-end
integration specifications, so external data application systems can be integrated quickly and easily,
making them part of DSS data application development.

-       DSS arranges multiple AppJoints in series to form a workflow that supports real-time execution and scheduled execution. Users can complete the entire process development of data applications with simple drag and drop operations.
+       DSS arranges multiple AppConns in series to form a workflow that supports real-time execution and scheduled execution. Users can complete the entire process development of data applications with simple drag-and-drop operations.

-       Since AppJoint is integrated with Linkis, the external data application system shares the capabilities of resource management, concurrent limiting, and high performance. AppJoint also allows sharable context across system level and completely gets away from application silos.
+       Since AppConn is integrated with Linkis, the external data application system shares Linkis's capabilities of resource management, concurrency limiting, and high performance. AppConn also enables context sharing across systems, breaking away from application silos entirely.
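
To make the integration contract concrete, here is a minimal sketch of how such a connector could be shaped. Every name in it (AppConn, NodeRequest, NodeResult, execute) is an illustrative assumption, not the actual DSS interface:

```java
import java.util.Map;

// Hypothetical shape of an application connector: DSS-side code only needs
// the connector's name and an execution hook for its workflow nodes.
interface AppConn {
    String getName();                         // e.g. "visualis", "qualitis"
    NodeResult execute(NodeRequest request);  // run one workflow node in the external system
}

// Minimal request/result carriers for the sketch above.
final class NodeRequest {
    final String nodeType;                    // e.g. "qualitis.check"
    final Map<String, Object> params;         // node configuration from the DSS workflow
    NodeRequest(String nodeType, Map<String, Object> params) {
        this.nodeType = nodeType;
        this.params = params;
    }
}

final class NodeResult {
    final boolean succeeded;
    final String message;
    NodeResult(boolean succeeded, String message) {
        this.succeeded = succeeded;
        this.message = message;
    }
}
```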

### 3. Project, as the management unit

       With Project as the management unit, DSS organizes and manages the business applications of each data application system, and defines a set of common standards for collaborative development of projects across data application systems.

### 4. Integrated data application components

-      a. Azkaban AppJoint —— Batch workflow job scheduler
+      a. Schedulis AppConn —— Batch workflow job scheduler

         Many data applications developed by users require periodic scheduling capabilities.

         At present, the open-source scheduling systems in the community are difficult for other data application systems to integrate with.

-         DSS implements Azkaban AppJoint, which allows users to publish DSS workflows to Azkaban for regular scheduling.
+         DSS implements Schedulis AppConn, which allows users to publish DSS workflows to Schedulis for periodic scheduling.

         DSS also defines standard and generic workflow parsing and publishing specifications for scheduling systems, allowing other scheduling systems to easily achieve low-cost integration with DSS.

![Azkaban](images/en_US/readme/Azkaban_AppJoint.gif)
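
The "workflow parsing and publishing specifications" mentioned above can be pictured with the following sketch; the names (SchedulerConverter, convert, publish) are assumptions for illustration, not the real DSS SPI:

```java
// Assumed shape of a scheduler integration point; the real DSS conversion SPI differs in detail.
interface SchedulerConverter {
    // Translate a DSS workflow definition into the target scheduler's format,
    // e.g. a Schedulis/Azkaban project zip containing one .job file per node.
    byte[] convert(String dssWorkflowJson);

    // Upload the converted artifact to the scheduler under the given project.
    void publish(String projectName, byte[] artifact);
}
```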

-      b. Scriptis AppJoint —— Data Development IDE Tool
+      b. Scriptis AppConn —— Data Development IDE Tool

         What is [Scriptis](https://github.com/WeBankFinTech/Scriptis)?

         Scriptis is for interactive data analysis with script development (SQL, PySpark, HiveQL), task submission (Spark, Hive), UDF and function management, resource management, and intelligent diagnosis.

-         Scriptis AppJoint integrates the data development capabilities of Scriptis to DSS, and allows various script types of Scriptis to serve as nodes in the DSS workflow to participate in the application development process.
+         Scriptis AppConn integrates the data development capabilities of Scriptis into DSS, allowing the various script types of Scriptis to serve as nodes in the DSS workflow and participate in the application development process.

         It currently supports HiveSQL, SparkSQL, PySpark, Scala, and other script node types.

![Scriptis](images/en_US/readme/Scriptis_AppJoint.gif)

-      c. Visualis AppJoint —— Data Visualization Tool
+      c. Visualis AppConn —— Data Visualization Tool

         What is [Visualis](https://github.com/WeBankFinTech/Visualis)?

         Visualis is a BI tool for data visualization. Built on the open-source project Davinci, contributed by CreditEase, it provides financial-grade data visualization capabilities on a foundation of data security and permission control.

-         Visualis AppJoint integrates data visualization capabilities to DSS, and allows displays and dashboards, as nodes of DSS workflows, to be associated with upstream data market.
+         Visualis AppConn integrates data visualization capabilities into DSS, and allows displays and dashboards, as nodes of DSS workflows, to be associated with the upstream data market.

![Visualis](images/en_US/readme/Visualis_AppJoint.gif)

-      d. Qualitis AppJoint —— Data quality management Tool
+      d. Qualitis AppConn —— Data Quality Management Tool

-         Qualitis AppJoint integrates data quality verification capabilities for DSS, allows Qualitis as a node in DSS workflow
+         Qualitis AppConn integrates data quality verification capabilities into DSS, allowing Qualitis to serve as a node in DSS workflows.

![Qualitis](images/en_US/readme/Qualitis_AppJoint.gif)

-      e. Data Sender——Sender AppJoint
+      e. Data Sender——Sender AppConn

-         Sender AppJoint provides data delivery capability for DSS. Currently it supports the SendEmail node type, and the result sets of all other nodes can be sent via email.
+         Sender AppConn provides data delivery capabilities for DSS. It currently supports the SendEmail node type, and the result sets of all other nodes can be sent via email.

         For example, the SendEmail node can directly send the screenshot of a display as an email.

-      f. Signal AppJoint —— Signal Nodes
+      f. Signal AppConn —— Signal Nodes

-         Signal AppJoint is used to strengthen the correlation between business and process while keeping them decoupled.
+         Signal AppConn is used to strengthen the correlation between business and process while keeping them decoupled.

         DataChecker Node: checks whether a table or partition exists.
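
A rough sketch of the DataChecker idea, assuming a hypothetical MetaStore abstraction (the real node has its own configuration keys and Linkis plumbing):

```java
// MetaStore is an assumed abstraction over the Hive metastore, not a real DSS/Linkis API.
interface MetaStore {
    boolean partitionExists(String db, String table, String partitionSpec);
}

final class DataCheckerSketch {
    // Poll until the awaited partition exists or the timeout expires.
    static boolean waitForPartition(MetaStore store, String db, String table,
                                    String partitionSpec, long timeoutMs, long pollMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (store.partitionExists(db, table, partitionSpec)) {
                return true;       // data has landed; downstream nodes may proceed
            }
            Thread.sleep(pollMs);  // back off before the next probe
        }
        return false;              // caller fails the node on timeout
    }
}
```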

7 changes: 4 additions & 3 deletions plugins/azkaban/linkis-jobtype/bin/install.sh
@@ -38,15 +38,16 @@ fi

echo "start to subsitution conf"
sed -i "s#jobtype.lib.dir.*#jobtype.lib.dir=$AZKABAN_JOBTYPE_DIR/linkis/lib#g" ${workDir}/private.properties
sed -i "s#wds.linkis.gateway.url.*#wds.linkis.gateway.url=$LINKIS_GATEWAY_URL#g" ${workDir}/plugin.properties
sed -i "s#wds.linkis.gateway.url.v0.*#wds.linkis.gateway.url.v0=$LINKIS_GATEWAY_URL#g" ${workDir}/plugin.properties
sed -i "s#wds.linkis.gateway.url.v1.*#wds.linkis.gateway.url.v1=$LINKIS_GATEWAY_URL#g" ${workDir}/plugin.properties
sed -i "s#wds.linkis.client.flow.author.user.token.*#wds.linkis.client.flow.author.user.token=$LINKIS_GATEWAY_TOKEN#g" ${workDir}/plugin.properties
isSuccess "substitute conf"

echo "$COPY Plugin"
-ssh -p $SSH_PORT $AZKABAN_EXECUTOR_HOST "cd $AZKABAN_JOBTYPE_DIR;rm -rf linkis-bak; mv -f linkis ../linkis-bak"
+##ssh -p $SSH_PORT $AZKABAN_EXECUTOR_HOST "cd $AZKABAN_JOBTYPE_DIR;rm -rf linkis-bak; mv -f linkis ../linkis-bak"

scp -P $SSH_PORT -r ${workDir} $AZKABAN_EXECUTOR_HOST:$AZKABAN_JOBTYPE_DIR

echo "reload jobType"

-curl $AZKABAN_EXECUTOR_URL
+##curl $AZKABAN_EXECUTOR_URL
27 changes: 20 additions & 7 deletions plugins/azkaban/linkis-jobtype/pom.xml
@@ -1,9 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2019 WeBank
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
@@ -23,12 +22,11 @@
<parent>
<artifactId>dss</artifactId>
<groupId>com.webank.wedatasphere.dss</groupId>
-<version>0.9.1</version>
+<version>1.0.0</version>
</parent>
<groupId>com.webank.wedatasphere.dss</groupId>
<artifactId>linkis-jobtype</artifactId>
<properties>
-<azkaban.version>2.5.0</azkaban.version>
+<azkaban.version>0.6.1</azkaban.version>
</properties>

<dependencies>
@@ -40,9 +38,19 @@
</dependency>

<dependency>
-<groupId>com.linkedin.azkaban</groupId>
-<artifactId>azkaban</artifactId>
+<groupId>com.webank.wedatasphere.schedulis</groupId>
+<artifactId>azkaban-common</artifactId>
<version>${azkaban.version}</version>
+<exclusions>
+    <exclusion>
+        <groupId>com.webank.azkaban</groupId>
+        <artifactId>azkaban-spi</artifactId>
+    </exclusion>
+    <exclusion>
+        <groupId>com.webank.azkaban</groupId>
+        <artifactId>azkaban-db</artifactId>
+    </exclusion>
+</exclusions>
<scope>provided</scope>
</dependency>

@@ -77,6 +85,11 @@
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
+<configuration>
+    <descriptorRefs>
+        <descriptorRef>jar-with-dependencies</descriptorRef>
+    </descriptorRefs>
+</configuration>
<executions>
<execution>
<id>package</id>
AzkabanDssJobType.java
@@ -1,8 +1,7 @@
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
@@ -22,24 +21,24 @@
import azkaban.utils.Props;
import com.webank.wedatasphere.dss.linkis.node.execution.conf.LinkisJobExecutionConfiguration;
import com.webank.wedatasphere.dss.linkis.node.execution.execution.impl.LinkisNodeExecutionImpl;
-import com.webank.wedatasphere.dss.linkis.node.execution.job.LinkisJob;
import com.webank.wedatasphere.dss.linkis.node.execution.job.Job;
import com.webank.wedatasphere.dss.linkis.node.execution.job.JobTypeEnum;
+import com.webank.wedatasphere.dss.linkis.node.execution.job.LinkisJob;
import com.webank.wedatasphere.dss.linkis.node.execution.listener.LinkisExecutionListener;
import com.webank.wedatasphere.dss.plugins.azkaban.linkis.jobtype.job.JobBuilder;
-import com.webank.wedatasphere.dss.plugins.azkaban.linkis.jobtype.log.AzkabanAppjointLog;
-import org.apache.log4j.Logger;
-import java.util.Map;
+import com.webank.wedatasphere.dss.plugins.azkaban.linkis.jobtype.log.AzkabanAppConnLog;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;

+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;


/**
* Created by peacewong on 2019/9/19.
*/
public class AzkabanDssJobType extends AbstractJob {



    private static final String SENSITIVE_JOB_PROP_NAME_SUFFIX = "_X";
    private static final String SENSITIVE_JOB_PROP_VALUE_PLACEHOLDER = "[MASKED]";
    private static final String JOB_DUMP_PROPERTIES_IN_LOG = "job.dump.properties";
@@ -82,23 +81,51 @@ public AzkabanDssJobType(String jobId, Props sysProps, Props jobProps, Logger lo

}


    @Override
    public void run() throws Exception {

        info("Start to execute job");
        logJobProperties();
+       String runDate = getRunDate();
+       if (StringUtils.isNotBlank(runDate)){
+           this.jobPropsMap.put("run_date", runDate);
+       }
        this.job = JobBuilder.getAzkanbanBuilder().setJobProps(this.jobPropsMap).build();
-       this.job.setLogObj(new AzkabanAppjointLog(this.log));
+       this.job.setLogObj(new AzkabanAppConnLog(this.log));
        if (JobTypeEnum.EmptyJob == ((LinkisJob) this.job).getJobType()){
            this.log.warn("This node is empty type");
            return;
        }
// info("runtimeMap is " + job.getRuntimeParams());
//job.getRuntimeParams().put("workspace", getWorkspace(job.getUser()));
info("runtimeMap is " + job.getRuntimeParams());
        LinkisNodeExecutionImpl.getLinkisNodeExecution().runJob(this.job);
-       LinkisNodeExecutionImpl.getLinkisNodeExecution().waitForComplete(this.job);

+       try {
+           LinkisNodeExecutionImpl.getLinkisNodeExecution().waitForComplete(this.job);
+       } catch (Exception e) {
+           this.log.warn("Failed to execute job", e);
+           //String reason = LinkisNodeExecutionImpl.getLinkisNodeExecution().getLog(this.job);
+           //this.log.error("Reason for failure: " + reason);
+           throw e;
+       }
+       try {
+           String endLog = LinkisNodeExecutionImpl.getLinkisNodeExecution().getLog(this.job);
+           this.log.info(endLog);
+       } catch (Throwable e){
+           this.log.info("Failed to get log", e);
+       }

        LinkisExecutionListener listener = (LinkisExecutionListener) LinkisNodeExecutionImpl.getLinkisNodeExecution();
        listener.onStatusChanged(null, LinkisNodeExecutionImpl.getLinkisNodeExecution().getState(this.job), this.job);
-       int resultSize = LinkisNodeExecutionImpl.getLinkisNodeExecution().getResultSize(this.job);
+       int resultSize = 0;
+       try {
+           resultSize = LinkisNodeExecutionImpl.getLinkisNodeExecution().getResultSize(this.job);
+       } catch (final Throwable t){
+           this.log.error("failed to get result size", t);
+           resultSize = -1;
+       }
        for (int i = 0; i < resultSize; i++){
            this.log.info("The content of the " + (i + 1) + "th resultset is :"
                    + LinkisNodeExecutionImpl.getLinkisNodeExecution().getResult(this.job, i, LinkisJobExecutionConfiguration.RESULT_PRINT_SIZE.getValue(this.jobPropsMap)));
@@ -109,7 +136,7 @@ public void run() throws Exception {

    @Override
    public void cancel() throws Exception {
-       super.cancel();
+       //super.cancel();
        LinkisNodeExecutionImpl.getLinkisNodeExecution().cancel(this.job);
        isCanceled = true;
        warn("This job has been canceled");
@@ -149,4 +176,37 @@ private void logJobProperties() {
}
}

+   private String getRunDate(){
+       this.info("begin to get run date");
+       if (this.jobProps != null &&
+               this.jobProps.getBoolean(JOB_DUMP_PROPERTIES_IN_LOG, true)) {
+           try {
+               for (final Map.Entry<String, String> entry : this.jobPropsMap.entrySet()) {
+                   final String key = entry.getKey();
+                   final String value = key.endsWith(SENSITIVE_JOB_PROP_NAME_SUFFIX) ?
+                           SENSITIVE_JOB_PROP_VALUE_PLACEHOLDER :
+                           entry.getValue();
+                   if ("azkaban.flow.start.timestamp".equals(key)){
+                       this.info("run time is " + value);
+                       String runDateNow = value.substring(0, 10).replaceAll("-", "");
+                       this.info("run date now is " + runDateNow);
+                       SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
+                       try {
+                           Date date = simpleDateFormat.parse(runDateNow);
+                           // date is already 00:00:00 of the current day; subtracting 24 hours yields yesterday
+                           String runDate = simpleDateFormat.format(new Date(date.getTime() - 24 * 60 * 60 * 1000));
+                           this.info("runDate is " + runDate);
+                           return runDate;
+                       } catch (ParseException e) {
+                           this.log.error("failed to parse run date " + runDateNow, e);
+                       }
+                   }
+               }
+           } catch (final Exception ex) {
+               this.log.error("failed to get run date ", ex);
+           }
+       }
+       return null;
+   }

}
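
For reference, the run_date derivation added above (the day before the flow's start timestamp, formatted as yyyyMMdd) can be reproduced with java.time. This is a standalone sketch under the assumption that azkaban.flow.start.timestamp begins with an ISO local date; it is not part of the patch:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class RunDateSketch {
    public static void main(String[] args) {
        // Example value; the first 10 characters are the ISO date, matching substring(0, 10) above.
        String startTimestamp = "2021-01-15T03:00:00.000+08:00";
        LocalDate startDate = LocalDate.parse(startTimestamp.substring(0, 10));
        // run_date is the day before the flow started, formatted as yyyyMMdd.
        String runDate = startDate.minusDays(1).format(DateTimeFormatter.ofPattern("yyyyMMdd"));
        System.out.println(runDate); // prints 20210114
    }
}
```
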
LinkisJobTypeConf.java
@@ -1,8 +1,7 @@
/*
* Copyright 2019 WeBank
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
@@ -19,15 +18,15 @@

import com.webank.wedatasphere.linkis.common.conf.CommonVars;

-/**
- * Created by peacewong on 2019/11/3.
- */

public class LinkisJobTypeConf {

    public static final String COMMAND = "command";

    public static final String JOB_ID = "azkaban.job.id";

+   public static final String DSS_LABELS_KEY = "labels";

    public static final String FLOW_NAME = "azkaban.flow.flowid";

    public static final String PROJECT_ID = "azkaban.flow.projectid";
Expand All @@ -41,12 +40,8 @@ public class LinkisJobTypeConf {

    public static final String FLOW_SUBMIT_USER = "azkaban.flow.submituser";

-   public static final String READ_NODE_TOKEN = "read.nodes";

-   public static final String SHARED_NODE_TOKEN = "share.num";

-   public static final String MSG_SAVE_KEY = "msg.savekey";

-   public final static CommonVars<String> SIGNAL_NODES = CommonVars.apply("wds.dss.flow.signal.nodes","linkis.appjoint.eventchecker.eventreceiver");
+   public final static CommonVars<String> SIGNAL_NODES = CommonVars.apply("wds.dss.flow.signal.nodes","linkis.appconn.eventchecker.eventreceiver");

}