diff --git a/linkis-commons/linkis-message-scheduler/src/main/java/org/apache/linkis/message/scheduler/DefaultMessageExecutor.java b/linkis-commons/linkis-message-scheduler/src/main/java/org/apache/linkis/message/scheduler/DefaultMessageExecutor.java index 186200c6c7..14dd03d597 100644 --- a/linkis-commons/linkis-message-scheduler/src/main/java/org/apache/linkis/message/scheduler/DefaultMessageExecutor.java +++ b/linkis-commons/linkis-message-scheduler/src/main/java/org/apache/linkis/message/scheduler/DefaultMessageExecutor.java @@ -57,25 +57,25 @@ public long getId() { @Override public ExecuteResponse execute(ExecuteRequest executeRequest) { if (event instanceof MessageJob) { - TransactionManager txManager = ((MessageJob) event).getContext().getTxManager(); - Object o = txManager.begin(); +// TransactionManager txManager = ((MessageJob) event).getContext().getTxManager(); +// Object o = txManager.begin(); try { run((MessageJob) event); - txManager.commit(o); + //txManager.commit(o); return new SuccessExecuteResponse(); } catch (InterruptedException ie) { //handle InterruptedException logger().error("message job execution interrupted", ie); - txManager.rollback(o); + // txManager.rollback(o); return new ErrorExecuteResponse("message job execution interrupted", ie); } catch (MessageWarnException mwe) { //handle method call failed logger().error("method call normal error return"); - txManager.rollback(o); + // txManager.rollback(o); return new ErrorExecuteResponse("method call failed", mwe); } catch (Throwable t) { logger().debug("unexpected error occur", t); - txManager.rollback(o); + //txManager.rollback(o); return new ErrorExecuteResponse("unexpected error", t); } } diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java index edfe83e5c7..6c868a8ab9 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java +++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/persistence/QueryPersistenceManager.java @@ -139,6 +139,9 @@ public void onProgressUpdate(Job job, float progress, JobProgressInfo[] progress // todo check updatedProgress = -1 * progress; } + if(Double.isNaN(updatedProgress)){ + return ; + } job.setProgress(updatedProgress); EntranceJob entranceJob = (EntranceJob) job; entranceJob.getJobRequest().setProgress(String.valueOf(updatedProgress)); diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cs/CSEntranceHelper.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cs/CSEntranceHelper.scala index f3d55c4c96..385e0b52f9 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cs/CSEntranceHelper.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/cs/CSEntranceHelper.scala @@ -168,11 +168,18 @@ object CSEntranceHelper extends Logging { variableMap.put(linkisVariable.getKey, linkisVariable.getValue) } } - if(variableMap.nonEmpty){ - TaskUtils.addVariableMap(requestPersistTask.getParams.asInstanceOf[util.Map[String, Any]], variableMap) + if(variableMap.nonEmpty) { + // 1.cs priority is low, the same ones are not added + val varMap = TaskUtils.getVariableMap(requestPersistTask.getParams.asInstanceOf[util.Map[String, Any]]) + variableMap.foreach { keyAndValue => + if (! varMap.containsKey(keyAndValue._1)) { + varMap.put(keyAndValue._1, keyAndValue._2) + } + } + TaskUtils.addVariableMap(requestPersistTask.getParams.asInstanceOf[util.Map[String, Any]], varMap) } info(s"parse variable end nodeName:$nodeNameStr") } } -} +} \ No newline at end of file diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogManager.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogManager.scala index 69af696550..5f062af11f 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogManager.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/CacheLogManager.scala @@ -17,40 +17,45 @@ package org.apache.linkis.entrance.log +import org.apache.linkis.common.utils.Logging import org.apache.linkis.entrance.conf.EntranceConfiguration import org.apache.linkis.entrance.exception.{CacheNotReadyException, EntranceErrorCode} import org.apache.linkis.entrance.job.EntranceExecutionJob -import org.apache.linkis.governance.common.entity.task.RequestPersistTask import org.apache.linkis.scheduler.queue.Job /** * LogManager implementation, using a singleton class * LogManager 的实现, 采用单例类进行 */ -class CacheLogManager extends LogManager { +class CacheLogManager extends LogManager with Logging { override def getLogReader(execId: String): LogReader = { - var retLogReader:LogReader = null - this.entranceContext.getOrCreateScheduler().get(execId).foreach { - case entranceExecutionJob: EntranceExecutionJob => - retLogReader = entranceExecutionJob.getLogReader.getOrElse({ - this.synchronized { - val logWriter: CacheLogWriter = - entranceExecutionJob.getLogWriter.getOrElse(createLogWriter(entranceExecutionJob)).asInstanceOf[CacheLogWriter] - val sharedCache: Cache = logWriter.getCache. - getOrElse(throw CacheNotReadyException(EntranceErrorCode.CACHE_NOT_READY.getErrCode, EntranceErrorCode.CACHE_NOT_READY.getDesc)) - val logPath: String = entranceExecutionJob.getJobRequest.getLogPath - new CacheLogReader(logPath, EntranceConfiguration.DEFAULT_LOG_CHARSET.getValue, sharedCache, entranceExecutionJob.getUser) - } - }) - entranceExecutionJob.setLogReader(retLogReader) - case _ => null - } + var retLogReader: LogReader = null + this.entranceContext.getOrCreateScheduler().get(execId).foreach { + case entranceExecutionJob: EntranceExecutionJob => + retLogReader = entranceExecutionJob.getLogReader.getOrElse({ + this.synchronized { + val sharedCache: Cache = + entranceExecutionJob.getLogWriter.getOrElse(createLogWriter(entranceExecutionJob)) match { + case cacheLogWriter: CacheLogWriter => + cacheLogWriter.getCache.getOrElse(throw CacheNotReadyException(EntranceErrorCode.CACHE_NOT_READY.getErrCode, EntranceErrorCode.CACHE_NOT_READY.getDesc)) + case _ => + Cache(1) + } + val logPath: String = entranceExecutionJob.getJobRequest.getLogPath + new CacheLogReader(logPath, EntranceConfiguration.DEFAULT_LOG_CHARSET.getValue, sharedCache, entranceExecutionJob.getUser) + } + }) + entranceExecutionJob.setLogReader(retLogReader) + case _ => null + } retLogReader } override def createLogWriter(job: Job): LogWriter = { - + if (null != job && job.isCompleted) { + return null + } job match { case entranceExecutionJob: EntranceExecutionJob => { val cache: Cache = Cache(EntranceConfiguration.DEFAULT_CACHE_MAX.getValue) @@ -58,6 +63,7 @@ class CacheLogManager extends LogManager { val cacheLogWriter: CacheLogWriter = new CacheLogWriter(logPath, EntranceConfiguration.DEFAULT_LOG_CHARSET.getValue, cache, entranceExecutionJob.getUser) entranceExecutionJob.setLogWriter(cacheLogWriter) + logger.info(s"job ${entranceExecutionJob.getJobRequest.getId} create cacheLogWriter") val webSocketCacheLogReader: WebSocketCacheLogReader = new WebSocketCacheLogReader(logPath, EntranceConfiguration.DEFAULT_LOG_CHARSET.getValue, cache, entranceExecutionJob.getUser) entranceExecutionJob.setWebSocketLogReader(webSocketCacheLogReader) diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogManager.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogManager.scala index cd22eb7284..89107b3643 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogManager.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogManager.scala @@ -45,7 +45,12 @@ abstract class LogManager extends LogListener with Logging with EntranceLogListe job match{ case entranceExecutionJob: EntranceExecutionJob => if (entranceExecutionJob.getLogWriter.isEmpty) entranceExecutionJob synchronized { - if (entranceExecutionJob.getLogWriter.isEmpty) createLogWriter(entranceExecutionJob) + if (entranceExecutionJob.getLogWriter.isEmpty) { + val logWriter = createLogWriter(entranceExecutionJob) + if (null == logWriter) { + return + } + } } entranceExecutionJob.getLogWriter.foreach(logWriter => logWriter.write(log)) entranceExecutionJob.getWebSocketLogWriter.foreach(writer => writer.write(log)) diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogWriter.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogWriter.scala index ea7272a53a..513ad76f4e 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogWriter.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/log/LogWriter.scala @@ -31,65 +31,64 @@ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream abstract class LogWriter(charset: String) extends Closeable with Flushable with Logging { - private var firstWrite = true + private var firstWrite = true - protected val outputStream: OutputStream + protected var outputStream: OutputStream - def write(msg: String): Unit = synchronized { - val log = if (!firstWrite) "\n" + msg else { - firstWrite = false - msg + def write(msg: String): Unit = synchronized { + val log = if (!firstWrite) "\n" + msg else { + logger.info(s"$toString write first one line log") + firstWrite = false + msg + } + Utils.tryAndWarnMsg{ + outputStream.write(log.getBytes(charset)) + outputStream.flush() + }(s"$toString error when write query log to outputStream.") } - Utils.tryQuietly({ - outputStream.write(log.getBytes(charset)) - outputStream.flush() - }, t => { - warn("error when write query log to outputStream.", t) - info(msg) - }) - } - def flush(): Unit = Utils.tryAndWarnMsg[Unit] { - outputStream match { - case hdfs: HdfsDataOutputStream => - // todo check - hdfs.hsync() - case _ => - outputStream.flush() - } - }("Error encounters when flush log, ") - - def close(): Unit = { - info(s" $toString logWriter close") - flush() - if (outputStream != null) { - Utils.tryCatch{ - outputStream.close() - }{ - case t:Throwable => //ignore + def flush(): Unit = Utils.tryAndWarnMsg[Unit] { + outputStream match { + case hdfs: HdfsDataOutputStream => + // todo check + hdfs.hflush() + case _ => + outputStream.flush() + } + }(s"$toString Error encounters when flush log, ") + + def close(): Unit = { + logger.info(s" $toString logWriter close") + flush() + if (outputStream != null) { + Utils.tryQuietly(outputStream.close()) + outputStream = null } } } -} - -abstract class AbstractLogWriter(logPath: String, - user: String, - charset: String) extends LogWriter(charset) { - if(StringUtils.isBlank(logPath)) throw new EntranceErrorException(20301, "logPath cannot be empty.") - protected val fileSystem = FSFactory.getFsByProxyUser(new FsPath(logPath), user) - fileSystem.init(new util.HashMap[String, String]()) - - protected val outputStream: OutputStream = { - FileSystemUtils.createNewFile(new FsPath(logPath), user, true) - fileSystem.write(new FsPath(logPath), true) - } - override def close(): Unit = { - super.close() - if (fileSystem != null) Utils.tryQuietly(fileSystem.close(), t => { - warn("Error encounters when closing fileSystem", t) - }) - } -} \ No newline at end of file + abstract class AbstractLogWriter(logPath: String, + user: String, + charset: String) extends LogWriter(charset) { + if (StringUtils.isBlank(logPath)) throw new EntranceErrorException(20301, "logPath cannot be empty.") + protected var fileSystem = FSFactory.getFsByProxyUser(new FsPath(logPath), user) + fileSystem.init(new util.HashMap[String, String]()) + + protected var outputStream: OutputStream = { + FileSystemUtils.createNewFile(new FsPath(logPath), user, true) + fileSystem.write(new FsPath(logPath), true) + } + + + override def close(): Unit = { + super.close() + if (fileSystem != null) Utils.tryAndWarnMsg{ + fileSystem.close() + fileSystem = null + }(s"$toString Error encounters when closing fileSystem") + } + + override def toString: String = logPath + } \ No newline at end of file diff --git a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala index 5f43f5d44a..af073d0a75 100644 --- a/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala +++ b/linkis-engineconn-plugins/engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/config/SparkConfiguration.scala @@ -65,7 +65,7 @@ object SparkConfiguration extends Logging { val SPARK_DRIVER_CLASSPATH = CommonVars[String]("spark.driver.extraClassPath", "") - val SPARK_DRIVER_EXTRA_JAVA_OPTIONS = CommonVars[String]("spark.driver.extraJavaOptions", "\"-Dwds.linkis.configuration=linkis-engine.properties " + "\"") + val SPARK_DRIVER_EXTRA_JAVA_OPTIONS = CommonVars[String]("spark.driver.extraJavaOptions", "\"-Dwds.linkis.server.conf=linkis-engine.properties " + "\"") val SPARK_DEFAULT_EXTERNAL_JARS_PATH = CommonVars[String]("spark.external.default.jars", "") diff --git a/linkis-engineconn-plugins/linkis-engineconn-plugin-framework/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala b/linkis-engineconn-plugins/linkis-engineconn-plugin-framework/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala index 21c2e2c965..bfdf176f37 100644 --- a/linkis-engineconn-plugins/linkis-engineconn-plugin-framework/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala +++ b/linkis-engineconn-plugins/linkis-engineconn-plugin-framework/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala @@ -34,7 +34,7 @@ object EnvConfiguration { val ENGINE_CONN_CLASSPATH_FILES = CommonVars("wds.linkis.engineConn.files", "", "engineConn额外的配置文件") val ENGINE_CONN_DEFAULT_JAVA_OPTS = CommonVars[String]("wds.linkis.engineConn.javaOpts.default", s"-XX:+UseG1GC -XX:MaxPermSize=250m -XX:PermSize=128m " + - s"-Xloggc:%s -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Dwds.linkis.configuration=linkis-engineconn.properties -Dwds.linkis.gateway.url=${Configuration.getGateWayURL()}") + s"-Xloggc:%s -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Dwds.linkis.server.conf=linkis-engineconn.properties -Dwds.linkis.gateway.url=${Configuration.getGateWayURL()}") val ENGINE_CONN_MEMORY = CommonVars("wds.linkis.engineConn.memory", new ByteType("2g"), "Specify the memory size of the java client(指定java进程的内存大小)") @@ -49,5 +49,7 @@ object EnvConfiguration { val LOG4J2_XML_FILE = CommonVars[String]("wds.linkis.engineconn.log4j2.xml.file", "log4j2-engineconn.xml") - val LINKIS_PUBLIC_MODULE_PATH = CommonVars("wds.linkis.public_module.path", Configuration.LINKIS_HOME.getValue + "/lib/linkis-commons/public-module") + val LINKIS_PUBLIC_MODULE_PATH = CommonVars("wds.linkis.public_module.path", Configuration.getLinkisHome + "/lib/linkis-commons/public-module") + + val LINKIS_CONF_DIR = CommonVars("LINKIS_CONF_DIR", Configuration.getLinkisHome() + "/conf") } diff --git a/linkis-engineconn-plugins/linkis-engineconn-plugin-framework/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/JavaProcessEngineConnLaunchBuilder.scala b/linkis-engineconn-plugins/linkis-engineconn-plugin-framework/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/JavaProcessEngineConnLaunchBuilder.scala index e41bfd2789..bfcb63c3eb 100644 --- a/linkis-engineconn-plugins/linkis-engineconn-plugin-framework/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/JavaProcessEngineConnLaunchBuilder.scala +++ b/linkis-engineconn-plugins/linkis-engineconn-plugin-framework/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/launch/process/JavaProcessEngineConnLaunchBuilder.scala @@ -85,7 +85,9 @@ abstract class JavaProcessEngineConnLaunchBuilder extends ProcessEngineConnLaunc // addPathToClassPath(environment, variable(PWD)) // first, add engineconn conf dirs. addPathToClassPath(environment, Seq(variable(PWD), ENGINE_CONN_CONF_DIR_NAME)) - // second, add engineconn libs. + // then, add LINKIS_CONF_DIR conf dirs. + addPathToClassPath(environment, Seq(EnvConfiguration.LINKIS_CONF_DIR.getValue)) + // then, add engineconn libs. addPathToClassPath(environment, Seq(variable(PWD), ENGINE_CONN_LIB_DIR_NAME + "/*")) // then, add public modules. if (!enablePublicModule) { @@ -116,6 +118,7 @@ abstract class JavaProcessEngineConnLaunchBuilder extends ProcessEngineConnLaunc environment } + override protected def getNecessaryEnvironment(implicit engineConnBuildRequest: EngineConnBuildRequest): Array[String] = if(!ifAddHiveConfigPath) Array.empty else Array(HADOOP_CONF_DIR.toString, HIVE_CONF_DIR.toString)