New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: HiveEngineConn supports concurrency.(#4175) #4359
feat: HiveEngineConn supports concurrency.(#4175) #4359
Conversation
@@ -40,4 +40,11 @@ object HiveEngineConfiguration { | |||
).getValue | |||
|
|||
val HIVE_ENGINE_TYPE = CommonVars[String]("wds.linkis.hive.engine.type", "mr").getValue | |||
|
|||
val HIVE_ENGINE_CONCURRENT_LIMIT = | |||
CommonVars[Int]("wds.linkis.hive.engineconn.concurrent.limit", 10).getValue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The new parameter can remove the wds prefix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok ,i will remove wds prefix, thanks for your remind.
@@ -34,6 +34,12 @@ | |||
<version>${project.version}</version> | |||
</dependency> | |||
|
|||
<dependency> | |||
<groupId>org.apache.linkis</groupId> | |||
<artifactId>linkis-common</artifactId> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this import necessary? I saw it was fine before
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah ,I forgot to revert it. I' ll fix it.
val options = engineCreationContext.getOptions | ||
// if hive engine support concurrent, hive session should init later | ||
if (HiveEngineConfiguration.HIVE_ENGINE_CONCURRENT_SUPPORT) { | ||
return null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should return a concurrent session better, so that newExecutor can case the new session and keep the case _
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea !
.toMap | ||
var engineConnConf = | ||
Map("ticketId" -> engineCreationContext.getTicketId, "user" -> engineCreationContext.getUser) | ||
engineConnConf = engineConnConf ++: labels |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
labels can remove?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
labels can be remove ,thanks for your kind advice.
LOG.info( | ||
s"beforeExecutorExecute engineExecutionContext addProperty key: ${kv._1} value: ${kv._2}" | ||
) | ||
engineExecutionContext.addProperty(kv._1, kv._2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the priority of engineExecutionContext higher than engineCreationContext?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Finally i reuse the hive session and remove the following class :HiveExecutionContextPropertyPrepareHook.scala.
…rent * dev-1.4.0: support map string value (apache#4409) Improve exception information and add path information (apache#4351) [fix bug] The s3a file cannot be written because FileSystem is closed prematurely (apache#4375) Optimization of upload file interface in FsRestfulApi.java (apache#4357)
1727f83
to
ecdf1c6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
CommonVars[Int]("linkis.hive.engineconn.concurrent.limit", 10).getValue | ||
|
||
val HIVE_ENGINE_CONCURRENT_SUPPORT = | ||
CommonVars[Boolean]("linkis.hive.engineconn.concurrent.support", true).getValue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The first version is recommended to be set to false first, and then adjusted to the default concurrent execution
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your advice, I would change the default value to false.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The first version is recommended to be set to false first, and then adjusted to the default concurrent execution
I have already changed the default linkis.hive.engineconn.concurrent.support value to false.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Get
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
What is the purpose of the change
HiveEngineConn supports concurrency.
Related issues/PRs
Related issues: #4175
Brief change log
Checklist