Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize the flink EngineConn and enhance the operator capabilities of ECM module and EC module. #1248

Merged
merged 19 commits into from Dec 28, 2021
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
7842b70
Resolve the problem of fetching progress with no information.
wushengyeyouya Dec 28, 2021
f8d4271
Resolve the problem of deploying flink application without yarn appId.
wushengyeyouya Dec 28, 2021
756eb87
Enhance the FlinkOnceJob, support to execute set, show grammar of fli…
wushengyeyouya Dec 28, 2021
f2bacc1
Enhance the FlinkOnceJobTest, add the support of using onceJob operat…
wushengyeyouya Dec 28, 2021
9b5e2d9
Since both ECM and EC need Operator module, so the Operator module ha…
wushengyeyouya Dec 28, 2021
2b13129
Since both ECM and EC need Operator module, so the Operator module ha…
wushengyeyouya Dec 28, 2021
dd46508
Add the capability of Operator to ECM, so LinkisManager can request t…
wushengyeyouya Dec 28, 2021
7eab04b
Add the capability of fetch EC logs from ECM, now we can get all logs…
wushengyeyouya Dec 28, 2021
bead7e1
Optimize the architecture of LinkisManager, so linkis-computation-cli…
wushengyeyouya Dec 28, 2021
e5bec14
Optimize the logic of EC creation, and fill in more EC information wh…
wushengyeyouya Dec 28, 2021
8314db4
Delete the EngineConnLogOperator in accessible-executor, since some e…
wushengyeyouya Dec 28, 2021
9b27740
Optimize the logic of EC creation, and fill in more EC information wh…
wushengyeyouya Dec 28, 2021
736520d
Add the capability of executing Operators to ECM in linkis-computatio…
wushengyeyouya Dec 28, 2021
a1fefc2
Add the capability of fetching EC logs, this operator will ask Linkis…
wushengyeyouya Dec 28, 2021
1d67577
Merge branch 'apache:dev-1.0.3' into dev-1.0.3
wushengyeyouya Dec 28, 2021
fc946d5
Merge branch 'dev-1.0.3' of github.com:/wushengyeyouya/Linkis into de…
wushengyeyouya Dec 28, 2021
b8e7ba0
Fixes some bugs to help compilation.
wushengyeyouya Dec 28, 2021
386a40b
Remove author information and WeBank copyright.
wushengyeyouya Dec 28, 2021
adabaac
Update Apache license.
wushengyeyouya Dec 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -77,6 +77,7 @@ trait AbstractLinkisJob extends LinkisJob with Logging {
override final def getOperator(operatorName: String): Operator[_] = {
var operator = OperatorFactory().createOperatorByName(operatorName)
operatorActions.foreach(action => operator = action(operator))
operator.initOperator(this)
operator
// operatorActions.foldLeft(operator)((operator, action) => action(operator))
}
Expand Down
Expand Up @@ -32,6 +32,7 @@ trait OnceJob extends AbstractLinkisJob {

protected var engineConnId: String = _
protected var serviceInstance: ServiceInstance = _
protected var ticketId: String = _

protected def wrapperEC[T](op: => T): T = wrapperObj(serviceInstance, "Please submit job first.")(op)

Expand All @@ -45,15 +46,20 @@ trait OnceJob extends AbstractLinkisJob {
.setInstance(serviceInstance.getInstance).setUser(user).build()).getNodeInfo
}

protected def getStatus(nodeInfo: util.Map[String, Any]): String = nodeInfo.get("nodeStatus") match {
case status: String => status
}
protected def getTicketId(nodeInfo: util.Map[String, Any]): String = getAs(nodeInfo, "ticketId")

protected def getStatus(nodeInfo: util.Map[String, Any]): String = getAs(nodeInfo, "nodeStatus")

protected def getServiceInstance(nodeInfo: util.Map[String, Any]): ServiceInstance = nodeInfo.get("serviceInstance") match {
case serviceInstance: util.Map[String, Any] =>
ServiceInstance(getAs(serviceInstance, "applicationName"), getAs(serviceInstance, "instance"))
}

protected def getECMServiceInstance(nodeInfo: util.Map[String, Any]): ServiceInstance = nodeInfo.get("ecmServiceInstance") match {
case serviceInstance: util.Map[String, Any] =>
ServiceInstance(getAs(serviceInstance, "applicationName"), getAs(serviceInstance, "instance"))
}

protected def getAs[T](map: util.Map[String, Any], key: String): T = map.get(key).asInstanceOf[T]

}
Expand Down
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.computation.client.once.action

class ECMOperateAction extends EngineConnOperateAction {

override def suffixURLs: Array[String] = Array("linkisManager", "executeECMOperation")

}

object ECMOperateAction {

def newBuilder(): Builder = new Builder

class Builder extends EngineConnOperateAction.Builder {
override protected def newEngineConnOperateAction(): EngineConnOperateAction =
new ECMOperateAction
}
}

class ECMOperateByECAction extends ECMOperateAction {
override def suffixURLs: Array[String] = Array("linkisManager", "executeECMOperationByEC")
}

object ECMOperateByECAction {
def newBuilder(): Builder = new Builder

class Builder extends EngineConnOperateAction.Builder {
override protected def newEngineConnOperateAction(): EngineConnOperateAction =
new ECMOperateByECAction
}
}
Expand Up @@ -37,7 +37,7 @@ object EngineConnOperateAction {

private var parameters: util.Map[String, Any] = new util.HashMap[String, Any]

def operatorName(operatorName: String): this.type = {
def operatorName(operatorName: String): this.type = {
this.operatorName = operatorName
this
}
Expand All @@ -55,8 +55,11 @@ object EngineConnOperateAction {
this
}

protected def newEngineConnOperateAction(): EngineConnOperateAction =
new EngineConnOperateAction

override protected def createGetEngineConnAction(): EngineConnOperateAction = {
val action = new EngineConnOperateAction
val action = newEngineConnOperateAction()
addParameter(OPERATOR_NAME_KEY, operatorName)
action.addRequestPayload("parameters", parameters)
action
Expand Down
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.computation.client.once.result

import org.apache.linkis.httpclient.dws.annotation.DWSHttpMessageResult

/**
* Created by enjoyyin on 2021/12/18.
*/
wushengyeyouya marked this conversation as resolved.
Show resolved Hide resolved
@DWSHttpMessageResult("/api/rest_j/v\\d+/linkisManager/executeECMOperation")
class ECMOperateResult extends EngineConnOperateResult {

}

@DWSHttpMessageResult("/api/rest_j/v\\d+/linkisManager/executeECMOperationByEC")
class ECMOperateByECResult extends EngineConnOperateResult {}
Expand Up @@ -43,7 +43,9 @@ class EngineConnOperateResult extends LinkisManagerResult {

def getAsOption[T](key: String): Option[T] = Option(getAs(key))

def getAs[T](key: String): T = if(getResult != null && result.containsKey(key)) result.get(key).asInstanceOf[T]
else null.asInstanceOf[T]
def getAs[T](key: String): T = getAs(key, null.asInstanceOf[T])

def getAs[T](key: String, defaultValue: T): T = if(getResult != null && result.containsKey(key)) result.get(key).asInstanceOf[T]
else defaultValue

}
Expand Up @@ -68,31 +68,41 @@ trait SimpleOnceJob extends OnceJob {
}

protected def transformToId(): Unit = {
engineConnId = serviceInstance.getApplicationName.length + serviceInstance.getApplicationName + serviceInstance.getInstance
engineConnId = ticketId.length + "_" + serviceInstance.getApplicationName.length +
ticketId + serviceInstance.getApplicationName + serviceInstance.getInstance
}

protected def transformToServiceInstance(): Unit = engineConnId match {
case SimpleOnceJob.ENGINE_CONN_ID_REGEX(len, serviceInstanceStr) =>
val length = len.toInt
serviceInstance = ServiceInstance(serviceInstanceStr.substring(0, length), serviceInstanceStr.substring(length))
case SimpleOnceJob.ENGINE_CONN_ID_REGEX(ticketIdLen, appNameLen, serviceInstanceStr) =>
val index1 = ticketIdLen.toInt
val index2 = index1 + appNameLen.toInt
ticketId = serviceInstanceStr.substring(0, index1)
serviceInstance = ServiceInstance(serviceInstanceStr.substring(index1, index2), serviceInstanceStr.substring(index2))
}

protected def initOnceOperatorActions(): Unit = addOperatorAction {
case onceJobOperator: OnceJobOperator[_] =>
onceJobOperator.setUser(user).setServiceInstance(serviceInstance).setLinkisManagerClient(linkisManagerClient)
onceJobOperator.setUser(user).setTicketId(ticketId).setServiceInstance(serviceInstance).setLinkisManagerClient(linkisManagerClient)
case operator => operator
}

}

class SubmittableSimpleOnceJob(protected override val linkisManagerClient: LinkisManagerClient,
createEngineConnAction: CreateEngineConnAction)
val createEngineConnAction: CreateEngineConnAction)
extends SimpleOnceJob with SubmittableOnceJob with AbstractSubmittableLinkisJob {

private var ecmServiceInstance: ServiceInstance = _

def getECMServiceInstance: ServiceInstance = ecmServiceInstance

override protected def doSubmit(): Unit = {
info(s"Ready to create a engineConn: ${createEngineConnAction.getRequestPayload}.")
val nodeInfo = linkisManagerClient.createEngineConn(createEngineConnAction)
lastNodeInfo = nodeInfo.getNodeInfo
serviceInstance = getServiceInstance(lastNodeInfo)
ticketId = getTicketId(lastNodeInfo)
ecmServiceInstance = getECMServiceInstance(lastNodeInfo)
lastEngineConnState = getStatus(lastNodeInfo)
info(s"EngineConn created with status $lastEngineConnState, the nodeInfo is $lastNodeInfo.")
initOnceOperatorActions()
Expand All @@ -102,7 +112,7 @@ class SubmittableSimpleOnceJob(protected override val linkisManagerClient: Linki
serviceInstance = getServiceInstance(lastNodeInfo)
info(s"EngineConn of $serviceInstance is in $lastEngineConnState.")
transformToId()
}else{
} else {
info(s"EngineConn $serviceInstance is aleady running, transform to id")
transformToId()
}
Expand All @@ -122,7 +132,7 @@ class ExistingSimpleOnceJob(protected override val linkisManagerClient: LinkisMa

object SimpleOnceJob {

private val ENGINE_CONN_ID_REGEX = "(\\d+)(.+)".r
private val ENGINE_CONN_ID_REGEX = "(\\d+)_(\\d+)(.+)".r

def builder(): SimpleOnceJobBuilder = new SimpleOnceJobBuilder

Expand Down
Expand Up @@ -5,16 +5,16 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.computation.client.operator

import org.apache.linkis.common.ServiceInstance
Expand All @@ -29,10 +29,12 @@ trait OnceJobOperator[T] extends Operator[T] with Logging {

private var user: String = _
private var serviceInstance: ServiceInstance = _
private var ticketId: String = _
private var linkisManagerClient: LinkisManagerClient = _

protected def getUser: String = user
protected def getServiceInstance: ServiceInstance = serviceInstance
protected def getTicketId: String = ticketId
protected def getLinkisManagerClient: LinkisManagerClient = linkisManagerClient

def setUser(user: String): this.type = {
Expand All @@ -45,25 +47,35 @@ trait OnceJobOperator[T] extends Operator[T] with Logging {
this
}

def setTicketId(ticketId: String): this.type = {
this.ticketId = ticketId
this
}

def setLinkisManagerClient(linkisManagerClient: LinkisManagerClient): this.type = {
this.linkisManagerClient = linkisManagerClient
this
}

protected def createOperateActionBuilder(): EngineConnOperateAction.Builder =
EngineConnOperateAction.newBuilder()

override def apply(): T = {
val builder = EngineConnOperateAction.newBuilder()
val builder = createOperateActionBuilder()
.operatorName(getName)
.setUser(user)
.setApplicationName(serviceInstance.getApplicationName)
.setInstance(serviceInstance.getInstance)
addParameters(builder)
val engineConnOperateAction = builder.build()
if(OnceJobOperator.ONCE_JOB_OPERATOR_LOG_ENABLE.getValue)
if (OnceJobOperator.ONCE_JOB_OPERATOR_LOG_ENABLE.getValue) {
info(s"$getUser try to ask EngineConn($serviceInstance) to execute $getName operation, parameters is ${engineConnOperateAction.getRequestPayload}.")
}
val result = linkisManagerClient.executeEngineConnOperation(engineConnOperateAction)
val resultStr = String.valueOf(result.getResult)
if(OnceJobOperator.ONCE_JOB_OPERATOR_LOG_ENABLE.getValue)
info(s"$getUser asked EngineConn($serviceInstance) to execute $getName operation, results is ${if(resultStr.length <= 250) resultStr else resultStr.substring(0, 250) + "..."} .")
if (OnceJobOperator.ONCE_JOB_OPERATOR_LOG_ENABLE.getValue) {
val resultStr = String.valueOf(result.getResult)
info(s"$getUser asked EngineConn($serviceInstance) to execute $getName operation, results is ${if (resultStr.length <= 250) resultStr else resultStr.substring(0, 250) + "..."} .")
}
resultToObject(result)
}

Expand All @@ -77,4 +89,4 @@ object OnceJobOperator {

val ONCE_JOB_OPERATOR_LOG_ENABLE = CommonVars("linkis.client.operator.once.log.enable", true)

}
}
Expand Up @@ -17,11 +17,15 @@

package org.apache.linkis.computation.client.operator

import org.apache.linkis.computation.client.LinkisJob


trait Operator[T] {

def getName: String

def apply(): T

def initOperator[U <: LinkisJob](job: U): Unit = {}

}