")
email.getEmailContents.foreach {
case emailContent: ArrayEmailContent =>
- emailContent.getContent.foreach(content => sb.append("").append(content).append(" |
"))
+ if (emailContent.getContent != null) {
+ emailContent.getContent.foreach(content => sb.append("").append(content).append(" |
"))
+ }
case emailContent: StringEmailContent =>
sb.append("").append(emailContent.getContent).append(" |
")
}
@@ -45,4 +51,13 @@ class MultiEmailContentGenerator extends AbstractEmailContentGenerator with Logg
email.setContent(sb.toString)
}
+ protected def setHtmlContent(email: MultiContentEmail): Unit = {
+ email.getEmailContents.foreach {
+ case emailContent: StringEmailContent =>
+ if (emailContent.getContent != null) {
+ email.setContent(emailContent.getContent)
+ }
+ }
+ }
+
}
\ No newline at end of file
diff --git a/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/AbstractEmailContentParser.scala b/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/AbstractEmailContentParser.scala
index 2bbcb6c217..92bfaec8f6 100644
--- a/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/AbstractEmailContentParser.scala
+++ b/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/AbstractEmailContentParser.scala
@@ -26,7 +26,7 @@ import org.apache.linkis.common.io.resultset.ResultSetReader
import org.apache.linkis.common.io.{MetaData, Record}
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.storage.LineRecord
-import org.apache.linkis.storage.resultset.ResultSetReader
+import org.apache.linkis.storage.resultset.ResultSetReaderFactory
import org.apache.commons.io.IOUtils
abstract class AbstractEmailContentParser[T] extends EmailContentParser {
@@ -41,8 +41,8 @@ abstract class AbstractEmailContentParser[T] extends EmailContentParser {
case _ =>
}
- protected def getResultSetReader(fsPathStore: FsPathStoreEmailContent): ResultSetReader[_ <: MetaData, _ <: Record] = {
- val reader = ResultSetReader.getResultSetReader(fsPathStore.getFsPath.getSchemaPath)
+ protected def getResultSetReader(fsPathStore: FsPathStoreEmailContent): ResultSetReader[_, _ ] = {
+ val reader = ResultSetReaderFactory.getResultSetReader(fsPathStore.getFsPath.getSchemaPath)
reader.getMetaData
reader
}
diff --git a/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/HtmlEmailContentParser.scala b/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/HtmlEmailContentParser.scala
index cab083f70c..d2b5520d88 100644
--- a/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/HtmlEmailContentParser.scala
+++ b/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/HtmlEmailContentParser.scala
@@ -16,12 +16,40 @@
package com.webank.wedatasphere.dss.appconn.sendemail.emailcontent.parser
-import com.webank.wedatasphere.dss.appconn.sendemail.email.domain.MultiContentEmail
+import com.webank.wedatasphere.dss.appconn.sendemail.email.domain.{CsvAttachment, MultiContentEmail, PngAttachment}
+import com.webank.wedatasphere.dss.appconn.sendemail.emailcontent.HtmlItem
import com.webank.wedatasphere.dss.appconn.sendemail.emailcontent.domain.HtmlEmailContent
+import org.apache.linkis.server.JSONUtils
+
+import java.nio.charset.StandardCharsets
+import java.util.Base64
+import scala.sys.error
object HtmlEmailContentParser extends AbstractEmailContentParser[HtmlEmailContent] {
override protected def parseEmailContent(emailContent: HtmlEmailContent,
multiContentEmail: MultiContentEmail): Unit = {
- getFirstLineRecord(emailContent).foreach(emailContent.setContent)
+ getFirstLineRecord(emailContent).foreach(htmlStr =>
+ emailContent.getFileType match {
+ case "html" =>
+ val htmlItems: Array[HtmlItem] = JSONUtils.gson.fromJson(htmlStr, classOf[Array[HtmlItem]])
+ htmlItems.foreach {
+ case htmlItem: HtmlItem =>
+ if (htmlItem.getFileType.equals("attachment") && htmlItem.getContentType.equals("csv")) {
+ val csvName = htmlItem.getFileName
+ val csvContent = htmlItem.getContent
+ val csvContentBytes = csvContent.getBytes
+ val csvBase64BaseContent = new String(Base64.getEncoder.encode(csvContentBytes), StandardCharsets.UTF_8)
+ multiContentEmail.addAttachment(new CsvAttachment(csvName, csvBase64BaseContent))
+ } else if (htmlItem.getContentType.equals("image") && htmlItem.getFileType.equals("inline")) {
+ multiContentEmail.addAttachment(new PngAttachment(htmlItem.getContentId, htmlItem.getContent))
+ } else if (htmlItem.getContentType.equals("html")) {
+ emailContent.setContent(htmlItem.getContent)
+ } else {
+ error("unknow content type: " + emailContent.getFileType)
+ }
+ }
+ case _ =>
+ }
+ )
}
}
diff --git a/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/PictureEmailContentParser.scala b/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/PictureEmailContentParser.scala
index 5162486081..6a39b16a14 100644
--- a/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/PictureEmailContentParser.scala
+++ b/dss-appconn/appconns/dss-sendemail-appconn/sendemail-appconn-core/src/main/scala/com/webank/wedatasphere/dss/appconn/sendemail/emailcontent/parser/PictureEmailContentParser.scala
@@ -17,36 +17,82 @@
package com.webank.wedatasphere.dss.appconn.sendemail.emailcontent.parser
import java.awt.image.BufferedImage
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
-import java.util.{Base64, UUID}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
+import java.util
+import java.util.{Base64, Iterator, UUID}
-import com.webank.wedatasphere.dss.appconn.sendemail.email.domain.{AbstractEmail, MultiContentEmail, PngAttachment}
+import com.webank.wedatasphere.dss.appconn.sendemail.email.domain.{AbstractEmail, MultiContentEmail, PdfAttachment, PngAttachment}
import com.webank.wedatasphere.dss.appconn.sendemail.emailcontent.domain.PictureEmailContent
import org.apache.linkis.common.conf.Configuration
-import javax.imageio.ImageIO
+import javax.imageio.{ImageIO, ImageReader}
import org.apache.commons.codec.binary.Base64OutputStream
import com.webank.wedatasphere.dss.appconn.sendemail.conf.SendEmailAppConnConfiguration._
+import com.webank.wedatasphere.dss.appconn.sendemail.exception.EmailSendFailedException
+import javax.imageio.stream.ImageInputStream
+import org.apache.commons.io.IOUtils
+import org.apache.linkis.common.utils.Utils
object PictureEmailContentParser extends AbstractEmailContentParser[PictureEmailContent] {
override protected def parseEmailContent(emailContent: PictureEmailContent,
multiContentEmail: MultiContentEmail): Unit = {
getFirstLineRecord(emailContent).foreach { imageStr =>
- val decoder = Base64.getDecoder
- val byteArr = decoder.decode(imageStr)
- val inputStream = new ByteArrayInputStream(byteArr)
- val image = ImageIO.read(inputStream)
- val contents = generateImage(image, multiContentEmail)
- emailContent.setContent(contents)
+ emailContent.getFileType match {
+ case "checkData" =>
+ //对于邮件校验数据不进行处理
+ case "pdf" =>
+ val pdfUUID: String = UUID.randomUUID.toString
+ val pdfName = pdfUUID + ".pdf"
+ val decoder = Base64.getDecoder
+ val byteArr = decoder.decode(imageStr)
+ multiContentEmail.addAttachment(new PdfAttachment(pdfName, Base64.getEncoder.encodeToString(byteArr)))
+ case _ =>
+ var inputStream: ByteArrayInputStream = null
+ Utils.tryFinally({
+ val decoder = Base64.getDecoder
+ val byteArr = decoder.decode(imageStr)
+ if (CHECK_EMAIL_IMAGE_SWITCH.getValue) {
+ checkImageSize(byteArr)
+ }
+ inputStream = new ByteArrayInputStream(byteArr)
+ val image = ImageIO.read(inputStream)
+ val contents = generateImage(image, multiContentEmail)
+ emailContent.setContent(contents)
+ })(IOUtils.closeQuietly(inputStream))
+
+ }
}
}
+ protected def checkImageSize(byteArr: Array[Byte]): Unit = {
+ var reader: ImageReader = null
+ val inputStream: InputStream = new ByteArrayInputStream(byteArr)
+ var imageInputStream: ImageInputStream = null
+ Utils.tryFinally({
+ imageInputStream = ImageIO.createImageInputStream(inputStream)
+ val imageReaders: util.Iterator[ImageReader] = ImageIO.getImageReaders(imageInputStream)
+ if (!imageReaders.hasNext) throw new EmailSendFailedException(80002,"Unsupported image format!")
+ reader = imageReaders.next
+ reader.setInput(imageInputStream)
+ val height = reader.getHeight(0)
+ val width = reader.getWidth(0)
+ if ((height * width) > EMAIL_IMAGE_MAXSIZE.getValue) {
+ throw new EmailSendFailedException(80002, "too large picture size :" + (height * width) + ", expect max picture size is :" + EMAIL_IMAGE_MAXSIZE.getValue)
+ }
+ })({
+ if (reader != null) reader.dispose()
+ IOUtils.closeQuietly(imageInputStream)
+ IOUtils.closeQuietly(inputStream)
+ })
+
+ }
+
protected def generateImage(bufferedImage: BufferedImage, email: AbstractEmail): Array[String] = {
val imageUUID: String = UUID.randomUUID.toString
val width: Int = bufferedImage.getWidth
val height: Int = bufferedImage.getHeight
// 只支持修改visualis图片大小,后续如果有新增其他类型的邮件需要修改图片大小,需要在if中加上该邮件类型
- val imagesCuts = if (email.getEmailType.contains("visualis") && height > EMAIL_IMAGE_HEIGHT.getValue) {
+ val imagesCuts = if (height > EMAIL_IMAGE_HEIGHT.getValue) {
val numOfCut = Math.ceil(height.toDouble / EMAIL_IMAGE_HEIGHT.getValue).toInt
val realHeight = height / numOfCut
(0 until numOfCut).map(i => bufferedImage.getSubimage(0, i * realHeight, width, realHeight)).toArray
@@ -62,8 +108,7 @@ object PictureEmailContentParser extends AbstractEmailContentParser[PictureEmail
var iHeight = image.getHeight
var iWidth = image.getWidth
-
- if (email.getEmailType.contains("visualis") && iWidth > EMAIL_IMAGE_WIDTH.getValue) {
+ if (iWidth > EMAIL_IMAGE_WIDTH.getValue) {
iHeight = ((EMAIL_IMAGE_WIDTH.getValue.toDouble / iWidth.toDouble) * iHeight.toDouble).toInt
iWidth = EMAIL_IMAGE_WIDTH.getValue
}
diff --git a/dss-appconn/appconns/dss-sparketl-appconn/pom.xml b/dss-appconn/appconns/dss-sparketl-appconn/pom.xml
new file mode 100644
index 0000000000..5e79c264a3
--- /dev/null
+++ b/dss-appconn/appconns/dss-sparketl-appconn/pom.xml
@@ -0,0 +1,166 @@
+
+
+
+ dss
+ com.webank.wedatasphere.dss
+ 1.1.2
+ ../../../pom.xml
+
+ 4.0.0
+
+ dss-sparketl-appconn
+
+
+
+ com.webank.wedatasphere.dss
+ dss-appconn-core
+ ${dss.version}
+
+
+ linkis-common
+ org.apache.linkis
+
+
+ json4s-jackson_2.11
+ org.json4s
+
+
+ com.webank.wedatasphere.dss
+ dss-origin-sso-integration-standard
+
+
+
+
+
+ com.webank.wedatasphere.dss
+ dss-development-process-standard
+ ${dss.version}
+
+
+
+ com.webank.wedatasphere.dss
+ dss-development-process-standard-execution
+ ${dss.version}
+
+
+
+ org.apache.linkis
+ linkis-module
+ ${linkis.version}
+ provided
+
+
+ httpclient
+ org.apache.httpcomponents
+
+
+ true
+
+
+ org.apache.linkis
+ linkis-cs-common
+ ${linkis.version}
+ compile
+
+
+ linkis-bml-client
+
+
+ gson
+ com.google.code.gson
+
+
+ org.apache.linkis
+ ${linkis.version}
+ provided
+ true
+
+
+
+ org.apache.linkis
+ linkis-httpclient
+ ${linkis.version}
+
+
+ linkis-common
+ org.apache.linkis
+
+
+ json4s-jackson_2.11
+ org.json4s
+
+
+
+
+
+ org.apache.linkis
+ linkis-storage
+ ${linkis.version}
+ provided
+
+
+ linkis-common
+ org.apache.linkis
+
+
+
+
+
+ com.webank.wedatasphere.dss
+ dss-common
+ ${dss.version}
+ provided
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-deploy-plugin
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+ 2.3
+ false
+
+
+ make-assembly
+ package
+
+ single
+
+
+
+ src/main/assembly/distribution.xml
+
+
+
+
+
+ false
+ out
+ false
+ false
+
+ src/main/assembly/distribution.xml
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/dss-appconn/appconns/dss-sparketl-appconn/src/main/assembly/distribution.xml b/dss-appconn/appconns/dss-sparketl-appconn/src/main/assembly/distribution.xml
new file mode 100644
index 0000000000..e54763fcae
--- /dev/null
+++ b/dss-appconn/appconns/dss-sparketl-appconn/src/main/assembly/distribution.xml
@@ -0,0 +1,76 @@
+
+
+
+ dss-sparketl-appconn
+
+ dir
+
+ true
+ sparketl
+
+
+
+
+
+ lib
+ true
+ true
+ false
+ true
+ true
+
+
+
+
+
+ ${basedir}/src/main/resources
+
+ appconn.properties
+
+ 0777
+ /
+ unix
+
+
+
+ ${basedir}/src/main/resources
+
+ log4j.properties
+ log4j2.xml
+
+ 0777
+ conf
+ unix
+
+
+
+ ${basedir}/src/main/resources
+
+ init.sql
+
+ 0777
+ db
+
+
+
+
+
+
+
diff --git a/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/SparkEtlAppConn.java b/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/SparkEtlAppConn.java
new file mode 100644
index 0000000000..ce2c8e819d
--- /dev/null
+++ b/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/SparkEtlAppConn.java
@@ -0,0 +1,29 @@
+package com.webank.wedatasphere.dss.appconn.sparketl;
+
+import com.webank.wedatasphere.dss.appconn.core.ext.OnlyDevelopmentAppConn;
+import com.webank.wedatasphere.dss.appconn.core.impl.AbstractAppConn;
+import com.webank.wedatasphere.dss.appconn.sparketl.standard.SparkEtlDevelopmentStandard;
+import com.webank.wedatasphere.dss.standard.app.development.standard.DevelopmentIntegrationStandard;
+import org.apache.linkis.common.conf.CommonVars;
+
+
+public class SparkEtlAppConn extends AbstractAppConn implements OnlyDevelopmentAppConn {
+
+ public static final String SPARKETL_APPCONN_NAME = CommonVars.apply("wds.dss.appconn.sparketl.name", "sparketl").getValue();
+
+ private SparkEtlDevelopmentStandard developmentStandard;
+
+ @Override
+ protected void initialize() {
+ developmentStandard = new SparkEtlDevelopmentStandard();
+ }
+
+ @Override
+ public DevelopmentIntegrationStandard getOrCreateDevelopmentStandard() {
+ return developmentStandard;
+ }
+
+
+
+
+}
diff --git a/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/constant/SparkEtlTaskStatusEnum.java b/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/constant/SparkEtlTaskStatusEnum.java
new file mode 100644
index 0000000000..d7bc0dd50a
--- /dev/null
+++ b/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/constant/SparkEtlTaskStatusEnum.java
@@ -0,0 +1,43 @@
+package com.webank.wedatasphere.dss.appconn.sparketl.constant;
+
+public enum SparkEtlTaskStatusEnum {
+
+ /**
+ * Task status enum
+ */
+ SUBMITTED(1, "已提交", "SUBMITTED"),
+ INITED(2, "初始化", "INITED"),
+ RUNNING(3, "运行中", "RUNNING"),
+ SUCCEED(4, "成功", "SUCCEED"),
+ PASS_CHECKOUT(5, "通过校验", "PASS_CHECKOUT"),
+ FAIL_CHECKOUT(6, "未通过校验", "FAIL_CHECKOUT"),
+ FAILED(7, "失败", "FAILED"),
+ TASK_NOT_EXIST(8, "Task不存在", "TASK_NOT_EXIST"),
+ CANCELLED(9, "取消", "CANCELLED"),
+ TIMEOUT(10, "超时", "TIMEOUT"),
+ SCHEDULED(11, "调度中", "SCHEDULED"),
+ SUBMIT_PENDING(12, "提交阻塞", "SUBMIT_PENDING");
+
+ private Integer code;
+ private String message;
+ private String state;
+
+ SparkEtlTaskStatusEnum(Integer code, String message, String state) {
+ this.code = code;
+ this.message = message;
+ this.state = state;
+ }
+
+ public Integer getCode() {
+ return code;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public String getState() {
+ return state;
+ }
+
+}
diff --git a/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/execution/SparkEtlExecutionService.java b/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/execution/SparkEtlExecutionService.java
new file mode 100644
index 0000000000..b8e9cd0f34
--- /dev/null
+++ b/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/execution/SparkEtlExecutionService.java
@@ -0,0 +1,13 @@
+package com.webank.wedatasphere.dss.appconn.sparketl.execution;
+
+import com.webank.wedatasphere.dss.standard.app.development.operation.RefExecutionOperation;
+import com.webank.wedatasphere.dss.standard.app.development.service.AbstractRefExecutionService;
+
+public class SparkEtlExecutionService extends AbstractRefExecutionService {
+
+ @Override
+ public RefExecutionOperation createRefExecutionOperation() {
+ return new SparkEtlRefExecutionOperation();
+ }
+
+}
diff --git a/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/execution/SparkEtlRefExecutionAction.java b/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/execution/SparkEtlRefExecutionAction.java
new file mode 100644
index 0000000000..fc94e8c84b
--- /dev/null
+++ b/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/execution/SparkEtlRefExecutionAction.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2019 WeBank
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.webank.wedatasphere.dss.appconn.sparketl.execution;
+
+import com.webank.wedatasphere.dss.standard.app.development.listener.common.AbstractRefExecutionAction;
+import com.webank.wedatasphere.dss.standard.app.development.listener.common.LongTermRefExecutionAction;
+
+public class SparkEtlRefExecutionAction extends AbstractRefExecutionAction implements LongTermRefExecutionAction {
+
+ private String applicationId;
+ private String executionUser;
+ private int schedulerId;
+
+ public SparkEtlRefExecutionAction() {
+ }
+
+ public SparkEtlRefExecutionAction(String applicationId) {
+ this.applicationId = applicationId;
+ }
+
+ public SparkEtlRefExecutionAction(String applicationId, String executionUser) {
+ this.applicationId = applicationId;
+ this.executionUser = executionUser;
+ }
+
+ public String getApplicationId() {
+ return applicationId;
+ }
+
+ public void setApplicationId(String applicationId) {
+ this.applicationId = applicationId;
+ }
+
+ public String getExecutionUser() {
+ return executionUser;
+ }
+
+ public void setExecutionUser(String executionUser) {
+ this.executionUser = executionUser;
+ }
+
+ @Override
+ public void setSchedulerId(int schedulerId) {
+ this.schedulerId = schedulerId;
+ }
+
+ @Override
+ public int getSchedulerId() {
+ return schedulerId;
+ }
+}
diff --git a/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/execution/SparkEtlRefExecutionOperation.java b/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/execution/SparkEtlRefExecutionOperation.java
new file mode 100644
index 0000000000..354d83baf6
--- /dev/null
+++ b/dss-appconn/appconns/dss-sparketl-appconn/src/main/java/com/webank/wedatasphere/dss/appconn/sparketl/execution/SparkEtlRefExecutionOperation.java
@@ -0,0 +1,383 @@
+/*
+ * Copyright 2019 WeBank
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.webank.wedatasphere.dss.appconn.sparketl.execution;
+
+import com.google.gson.Gson;
+import com.webank.wedatasphere.dss.appconn.sparketl.constant.SparkEtlTaskStatusEnum;
+import com.webank.wedatasphere.dss.appconn.sparketl.utils.HttpUtils;
+import com.webank.wedatasphere.dss.standard.app.development.listener.common.RefExecutionState;
+import com.webank.wedatasphere.dss.standard.app.development.listener.common.RefExecutionAction;
+import com.webank.wedatasphere.dss.standard.app.development.listener.core.LongTermRefExecutionOperation;
+import com.webank.wedatasphere.dss.standard.app.development.listener.core.Killable;
+import com.webank.wedatasphere.dss.standard.app.development.listener.core.Procedure;
+import com.webank.wedatasphere.dss.standard.app.development.listener.ref.ExecutionResponseRef;
+import com.webank.wedatasphere.dss.standard.common.exception.operation.ExternalOperationFailedException;
+import com.webank.wedatasphere.dss.standard.app.development.listener.ref.RefExecutionRequestRef.RefExecutionProjectWithContextRequestRef;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.ws.rs.HttpMethod;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.web.client.RestTemplate;
+
+public class SparkEtlRefExecutionOperation extends LongTermRefExecutionOperation
+ implements Killable, Procedure {
+ private static final String SUBMIT_TASK_PATH = "sparketl/outer/api/v1/execution";
+ private static final String GET_TASK_STATUS_PATH = "sparketl/outer/api/v1/application/{applicationId}/status/";
+ private static final String GET_TASK_RESULT_PATH = "sparketl/outer/api/v1/application/{applicationId}/result/";
+ private static final String KILL_TASK_PATH = "sparketl/outer/api/v1/execution/application/kill/{applicationId}/{executionUser}";
+ private static final String NODE_NAME_KEY = "nodeName";
+ private static final String EXECUTION_USER_KEY = "executionUser";
+ private static final String WDS_SUBMIT_USER_KEY = "wds.dss.workflow.submit.user";
+
+ private static Logger LOGGER = LoggerFactory.getLogger(SparkEtlRefExecutionOperation.class);
+
+ private String appId = "linkis_id";
+ private String appToken = "a33693de51";
+
+ private static final String FILTER = "filter";
+
+ @Override
+ public boolean kill(RefExecutionAction action) {
+ String applicationId = ((SparkEtlRefExecutionAction) action).getApplicationId();
+ String executtionUser = ((SparkEtlRefExecutionAction) action).getExecutionUser();
+ if (applicationId == null) {
+ LOGGER.error("Cannot get application id from SparkEtlNodeExecutionAction. Kill SparkEtl job failed.");
+ return false;
+ }
+ try {
+ // Send request and get response
+ RestTemplate restTemplate = new RestTemplate();
+ HttpHeaders headers = new HttpHeaders();
+ HttpEntity entity = new HttpEntity(headers);
+
+ String path = KILL_TASK_PATH.replace("{applicationId}", applicationId).replace("{executionUser}", executtionUser);
+ URI url = HttpUtils.buildUrI(getBaseUrl(), path, appId, appToken, RandomStringUtils.randomNumeric(5), String.valueOf(System.currentTimeMillis()));
+ String startLog = String.format("Start to kill job. url: %s, method: %s, body: %s", url, HttpMethod.GET, entity);
+ LOGGER.info(startLog);
+ Map response = restTemplate.getForEntity(url.toString(), Map.class).getBody();
+
+ if (response == null) {
+ String errorMsg = String.format("Error! Can not get kill result, job_id: %s, response is null", applicationId);
+ LOGGER.error(errorMsg);
+ return false;
+ }
+
+ if (!checkResponse(response)) {
+ String message = (String) response.get("message");
+ String errorMsg = String.format("Error! Can not get kill result, exception: {}", message);
+ LOGGER.error(errorMsg);
+ return false;
+ }
+
+ String finishLog = String.format("Succeed to get kill result. response: %s", response);
+ LOGGER.info(finishLog);
+ return true;
+ } catch (Exception e) {
+ String errorMsg = String.format("Error! Can not kill job result, job_id: %s", applicationId);
+ LOGGER.error(errorMsg, e);
+ return false;
+ }
+
+ }
+
+ @Override
+ protected RefExecutionAction submit(RefExecutionProjectWithContextRequestRef requestRef) throws ExternalOperationFailedException {
+ try {
+ Map jobContent = requestRef.getRefJobContent();
+ Map runtimeMap = requestRef.getExecutionRequestRefContext().getRuntimeMap();
+ String executionUser = String.valueOf(runtimeMap.get(WDS_SUBMIT_USER_KEY).toString());
+ String realExecutionUser = String.valueOf(runtimeMap.get(EXECUTION_USER_KEY) != null ? runtimeMap.get(EXECUTION_USER_KEY).toString() : "");
+ String nodeName = String.valueOf(runtimeMap.get(NODE_NAME_KEY));
+ if (StringUtils.isEmpty(nodeName)) {
+ nodeName = requestRef.getName();
+ }
+ LOGGER.info("The node name: " + nodeName);
+
+ if (nodeName == null) {
+ String errorMsg = "Error! Can not submit job, node name is null";
+ LOGGER.error(errorMsg);
+ return null;
+ }
+ HttpHeaders headers = new HttpHeaders();
+ headers.setContentType(MediaType.APPLICATION_JSON);
+
+ Gson gson = new Gson();
+
+ Map requestPayLoad = new HashMap<>();
+ requestPayLoad.put("execution_user", StringUtils.isNotBlank(realExecutionUser) ? realExecutionUser : executionUser);
+ requestPayLoad.put("create_user", executionUser);
+ requestPayLoad.put("node_name", nodeName);
+
+ LOGGER.info("The execution user: " + (StringUtils.isNotBlank(realExecutionUser) ? realExecutionUser : executionUser));
+ // Get parameters.
+ LOGGER.info("The execution request: " + runtimeMap);
+
+ StringBuffer executionParam = new StringBuffer();
+
+ getVariables(requestRef, runtimeMap, requestPayLoad, executionParam);
+
+ HttpEntity