Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: HiveEngineConn supports concurrency.(#4175) #4359

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,7 @@ wds.linkis.engine.connector.hooks=org.apache.linkis.engineconn.computation.execu
wds.linkis.engineconn.maintain.enable=true

# Depending on the engine selected by HIVE_ENGINE_TYPE, this controls which function is called when canceling a task in scripts.
wds.linkis.hive.engine.type=mr
wds.linkis.hive.engine.type=mr

# Support parallel (concurrent) task execution
wds.linkis.engineconn.support.parallelism=true
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,11 @@ object HiveEngineConfiguration {
).getValue

val HIVE_ENGINE_TYPE = CommonVars[String]("wds.linkis.hive.engine.type", "mr").getValue

// Maximum number of tasks a concurrent Hive EngineConn may run at the same time.
// NOTE(review): presumably only consulted when concurrent support is enabled — confirm.
val HIVE_ENGINE_CONCURRENT_LIMIT =
  CommonVars[Int]("linkis.hive.engineconn.concurrent.limit", 10).getValue

// Whether this Hive EngineConn supports concurrent task execution.
// Default is false: per review agreement, the first release keeps the original
// single-session behavior and users must opt in to concurrency explicitly.
// NOTE(review): key uses the "linkis." prefix while older settings use "wds.linkis." — confirm intended.
val HIVE_ENGINE_CONCURRENT_SUPPORT =
  CommonVars[Boolean]("linkis.hive.engineconn.concurrent.support", false).getValue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first version is recommended to be set to false first, and then adjusted to the default concurrent execution

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your advice, I would change the default value to false.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first version is recommended to be set to false first, and then adjusted to the default concurrent execution

I have already changed the default linkis.hive.engineconn.concurrent.support value to false.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Get


}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,18 @@ import org.apache.linkis.engineconn.computation.executor.creation.ComputationSin
import org.apache.linkis.engineconn.executor.entity.LabelExecutor
import org.apache.linkis.engineplugin.hive.common.HiveUtils
import org.apache.linkis.engineplugin.hive.conf.HiveEngineConfiguration
import org.apache.linkis.engineplugin.hive.entity.HiveSession
import org.apache.linkis.engineplugin.hive.entity.{
AbstractHiveSession,
HiveConcurrentSession,
HiveSession
}
import org.apache.linkis.engineplugin.hive.errorcode.HiveErrorCodeSummary.CREATE_HIVE_EXECUTOR_ERROR
import org.apache.linkis.engineplugin.hive.errorcode.HiveErrorCodeSummary.HIVE_EXEC_JAR_ERROR
import org.apache.linkis.engineplugin.hive.exception.HiveSessionStartFailedException
import org.apache.linkis.engineplugin.hive.executor.HiveEngineConnExecutor
import org.apache.linkis.engineplugin.hive.executor.{
HiveEngineConcurrentConnExecutor,
HiveEngineConnExecutor
}
import org.apache.linkis.hadoop.common.utils.HDFSUtils
import org.apache.linkis.manager.label.entity.engine.{EngineType, RunType}
import org.apache.linkis.manager.label.entity.engine.EngineType.EngineType
Expand All @@ -38,9 +45,11 @@ import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.security.UserGroupInformation

import java.io.{ByteArrayOutputStream, PrintStream}
import java.security.PrivilegedExceptionAction
import java.util

import scala.collection.JavaConverters._

Expand All @@ -63,6 +72,14 @@ class HiveEngineConnFactory extends ComputationSingleExecutorEngineConnFactory w
hiveSession.hiveConf,
hiveSession.baos
)
case hiveConcurrentSession: HiveConcurrentSession =>
new HiveEngineConcurrentConnExecutor(
id,
hiveConcurrentSession.sessionState,
hiveConcurrentSession.ugi,
hiveConcurrentSession.hiveConf,
hiveConcurrentSession.baos
)
case _ =>
throw HiveSessionStartFailedException(
CREATE_HIVE_EXECUTOR_ERROR.getErrorCode,
Expand All @@ -73,8 +90,48 @@ class HiveEngineConnFactory extends ComputationSingleExecutorEngineConnFactory w

/**
 * Create the session backing this engine connection.
 *
 * Returns a [[HiveConcurrentSession]] when concurrent execution is enabled via
 * `HiveEngineConfiguration.HIVE_ENGINE_CONCURRENT_SUPPORT`, otherwise a plain
 * [[HiveSession]]. Both variants share the parent type [[AbstractHiveSession]].
 *
 * @param engineCreationContext context carrying the engine creation options
 * @return the created session variant
 */
override protected def createEngineConnSession(
    engineCreationContext: EngineCreationContext
): AbstractHiveSession = {
  val options = engineCreationContext.getOptions
  // Expression-style branch instead of an early `return` (non-idiomatic in Scala).
  if (HiveEngineConfiguration.HIVE_ENGINE_CONCURRENT_SUPPORT) {
    doCreateHiveConcurrentSession(options)
  } else {
    doCreateHiveSession(options)
  }
}

/**
 * Build a [[HiveConcurrentSession]]: Hive conf resolved from the creation options,
 * a UGI for the JVM user, and a byte buffer that captures session output.
 */
def doCreateHiveConcurrentSession(options: util.Map[String, String]): HiveConcurrentSession = {
  val conf: HiveConf = getHiveConf(options)
  val userGroupInfo = HDFSUtils.getUserGroupInformation(Utils.getJvmUser)
  val outputBuffer = new ByteArrayOutputStream()
  val state: SessionState = getSessionState(conf, userGroupInfo, outputBuffer)
  HiveConcurrentSession(state, userGroupInfo, conf, outputBuffer)
}

/**
 * Build a classic [[HiveSession]]: Hive conf resolved from the creation options,
 * a UGI for the JVM user, and a byte buffer that captures session output.
 */
def doCreateHiveSession(options: util.Map[String, String]): HiveSession = {
  val conf: HiveConf = getHiveConf(options)
  val userGroupInfo = HDFSUtils.getUserGroupInformation(Utils.getJvmUser)
  val outputBuffer = new ByteArrayOutputStream()
  val state: SessionState = getSessionState(conf, userGroupInfo, outputBuffer)
  HiveSession(state, userGroupInfo, conf, outputBuffer)
}

/**
 * Create and start a Hive `SessionState` under the given user's identity.
 *
 * The session's standard output is redirected into `baos` so callers can read
 * results back; `info` and `err` are echoed to the process stdout.
 * NOTE(review): `err` is wired to `System.out` rather than `System.err` — confirm intentional.
 *
 * @param hiveConf resolved Hive configuration for this session
 * @param ugi      user to impersonate while constructing the SessionState
 * @param baos     buffer that receives the session's standard output
 * @return the started SessionState
 */
private def getSessionState(
    hiveConf: HiveConf,
    ugi: UserGroupInformation,
    baos: ByteArrayOutputStream
): SessionState = {
  // Construct the SessionState under the target user's identity.
  val sessionState: SessionState = ugi.doAs(new PrivilegedExceptionAction[SessionState] {
    override def run(): SessionState = new SessionState(hiveConf)
  })
  // Streams must be assigned before SessionState.start so startup output is captured.
  sessionState.out = new PrintStream(baos, true, "utf-8")
  sessionState.info = new PrintStream(System.out, true, "utf-8")
  sessionState.err = new PrintStream(System.out, true, "utf-8")
  SessionState.start(sessionState)
  sessionState
}

private def getHiveConf(options: util.Map[String, String]) = {
val hiveConf: HiveConf = HiveUtils.getHiveConf
hiveConf.setVar(
HiveConf.ConfVars.HIVEJAR,
Expand Down Expand Up @@ -126,17 +183,7 @@ class HiveEngineConnFactory extends ComputationSingleExecutorEngineConnFactory w
*/
// enable hive.stats.collect.scancols
hiveConf.setBoolean("hive.stats.collect.scancols", true)
val ugi = HDFSUtils.getUserGroupInformation(Utils.getJvmUser)
val sessionState: SessionState = ugi.doAs(new PrivilegedExceptionAction[SessionState] {
override def run(): SessionState = new SessionState(hiveConf)
})
val baos = new ByteArrayOutputStream()
sessionState.out = new PrintStream(baos, true, "utf-8")
sessionState.info = new PrintStream(System.out, true, "utf-8")
sessionState.err = new PrintStream(System.out, true, "utf-8")
SessionState.start(sessionState)

HiveSession(sessionState, ugi, hiveConf, baos)
hiveConf
}

override protected def getEngineConnType: EngineType = EngineType.HIVE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,18 @@ import org.apache.hadoop.security.UserGroupInformation

import java.io.ByteArrayOutputStream

/**
 * Common parent of the two session variants so the factory can return either
 * from `createEngineConnSession` and dispatch on the concrete type.
 */
abstract class AbstractHiveSession

/** Session backing the classic single-task Hive executor. */
case class HiveSession(
    sessionState: SessionState,
    ugi: UserGroupInformation,
    hiveConf: HiveConf,
    // NOTE(review): null default is un-idiomatic Scala; consider Option — kept for compatibility.
    baos: ByteArrayOutputStream = null
) extends AbstractHiveSession

/** Session backing the concurrent Hive executor; identical fields to HiveSession. */
case class HiveConcurrentSession(
    sessionState: SessionState,
    ugi: UserGroupInformation,
    hiveConf: HiveConf,
    baos: ByteArrayOutputStream = null
) extends AbstractHiveSession