Merge pull request #1205 from casionone/bugfix_1
Add LINKIS_CONF_DIR to the EC classpath and fix an OOM bug in LogWriter
peacewong committed Dec 15, 2021
2 parents 1db0bd6 + e2cc2a7 commit 83ad189
Showing 9 changed files with 110 additions and 85 deletions.
@@ -57,25 +57,25 @@ public long getId() {
   @Override
   public ExecuteResponse execute(ExecuteRequest executeRequest) {
     if (event instanceof MessageJob) {
-      TransactionManager txManager = ((MessageJob) event).getContext().getTxManager();
-      Object o = txManager.begin();
+      // TransactionManager txManager = ((MessageJob) event).getContext().getTxManager();
+      // Object o = txManager.begin();
       try {
         run((MessageJob) event);
-        txManager.commit(o);
+        //txManager.commit(o);
         return new SuccessExecuteResponse();
       } catch (InterruptedException ie) {
         //handle InterruptedException
         logger().error("message job execution interrupted", ie);
-        txManager.rollback(o);
+        // txManager.rollback(o);
         return new ErrorExecuteResponse("message job execution interrupted", ie);
       } catch (MessageWarnException mwe) {
         //handle method call failed
         logger().error("method call normal error return");
-        txManager.rollback(o);
+        // txManager.rollback(o);
         return new ErrorExecuteResponse("method call failed", mwe);
       } catch (Throwable t) {
         logger().debug("unexpected error occur", t);
-        txManager.rollback(o);
+        //txManager.rollback(o);
         return new ErrorExecuteResponse("unexpected error", t);
       }
     }
@@ -139,6 +139,9 @@ public void onProgressUpdate(Job job, float progress, JobProgressInfo[] progress
       // todo check
       updatedProgress = -1 * progress;
     }
+    if(Double.isNaN(updatedProgress)){
+      return ;
+    }
     job.setProgress(updatedProgress);
     EntranceJob entranceJob = (EntranceJob) job;
     entranceJob.getJobRequest().setProgress(String.valueOf(updatedProgress));
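
Why the guard is needed: a float progress can arrive as NaN, for example when it is derived from 0 succeeded out of 0 total tasks, and NaN would otherwise be set on the job and stringified into the persisted job request. A minimal sketch with illustrative values (Scala):

    // Progress computed from empty task counts is NaN; the values are illustrative.
    val succeedTasks = 0f
    val totalTasks = 0f
    val updatedProgress = succeedTasks / totalTasks
    assert(updatedProgress.isNaN)
    // Without the guard, this is what would be persisted:
    println(String.valueOf(updatedProgress)) // prints "NaN"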
@@ -168,11 +168,18 @@ object CSEntranceHelper extends Logging {
           variableMap.put(linkisVariable.getKey, linkisVariable.getValue)
         }
       }
-      if(variableMap.nonEmpty){
-        TaskUtils.addVariableMap(requestPersistTask.getParams.asInstanceOf[util.Map[String, Any]], variableMap)
+      if(variableMap.nonEmpty) {
+        // 1.cs priority is low, the same ones are not added
+        val varMap = TaskUtils.getVariableMap(requestPersistTask.getParams.asInstanceOf[util.Map[String, Any]])
+        variableMap.foreach { keyAndValue =>
+          if (! varMap.containsKey(keyAndValue._1)) {
+            varMap.put(keyAndValue._1, keyAndValue._2)
+          }
+        }
+        TaskUtils.addVariableMap(requestPersistTask.getParams.asInstanceOf[util.Map[String, Any]], varMap)
       }
 
       info(s"parse variable end nodeName:$nodeNameStr")
     }
   }
 }
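
The merge order means a variable already present in the task's own params wins; the context-service value only fills keys the user did not set. A short sketch of that semantics, with hypothetical values (Scala):

    import scala.collection.mutable

    val taskVars = mutable.Map("run_date" -> "20211201")          // already in the submit params
    val csVars   = Map("run_date" -> "20211130", "env" -> "prod") // from the context service
    csVars.foreach { case (k, v) => if (!taskVars.contains(k)) taskVars(k) = v }
    // taskVars is now Map(run_date -> 20211201, env -> prod): the CS run_date lost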
@@ -17,47 +17,53 @@
 
 package org.apache.linkis.entrance.log
 
 import org.apache.linkis.common.utils.Logging
 import org.apache.linkis.entrance.conf.EntranceConfiguration
 import org.apache.linkis.entrance.exception.{CacheNotReadyException, EntranceErrorCode}
 import org.apache.linkis.entrance.job.EntranceExecutionJob
 import org.apache.linkis.governance.common.entity.task.RequestPersistTask
 import org.apache.linkis.scheduler.queue.Job
 
 /**
  * LogManager implementation, using a singleton class
  * LogManager 的实现, 采用单例类进行
  */
-class CacheLogManager extends LogManager {
+class CacheLogManager extends LogManager with Logging {
 
   override def getLogReader(execId: String): LogReader = {
-    var retLogReader:LogReader = null
-    this.entranceContext.getOrCreateScheduler().get(execId).foreach {
-      case entranceExecutionJob: EntranceExecutionJob =>
-        retLogReader = entranceExecutionJob.getLogReader.getOrElse({
-          this.synchronized {
-            val logWriter: CacheLogWriter =
-              entranceExecutionJob.getLogWriter.getOrElse(createLogWriter(entranceExecutionJob)).asInstanceOf[CacheLogWriter]
-            val sharedCache: Cache = logWriter.getCache.
-              getOrElse(throw CacheNotReadyException(EntranceErrorCode.CACHE_NOT_READY.getErrCode, EntranceErrorCode.CACHE_NOT_READY.getDesc))
-            val logPath: String = entranceExecutionJob.getJobRequest.getLogPath
-            new CacheLogReader(logPath, EntranceConfiguration.DEFAULT_LOG_CHARSET.getValue, sharedCache, entranceExecutionJob.getUser)
-          }
-        })
-        entranceExecutionJob.setLogReader(retLogReader)
-      case _ => null
-    }
+    var retLogReader: LogReader = null
+    this.entranceContext.getOrCreateScheduler().get(execId).foreach {
+      case entranceExecutionJob: EntranceExecutionJob =>
+        retLogReader = entranceExecutionJob.getLogReader.getOrElse({
+          this.synchronized {
+            val sharedCache: Cache =
+              entranceExecutionJob.getLogWriter.getOrElse(createLogWriter(entranceExecutionJob)) match {
+                case cacheLogWriter: CacheLogWriter =>
+                  cacheLogWriter.getCache.getOrElse(throw CacheNotReadyException(EntranceErrorCode.CACHE_NOT_READY.getErrCode, EntranceErrorCode.CACHE_NOT_READY.getDesc))
+                case _ =>
+                  Cache(1)
+              }
+            val logPath: String = entranceExecutionJob.getJobRequest.getLogPath
+            new CacheLogReader(logPath, EntranceConfiguration.DEFAULT_LOG_CHARSET.getValue, sharedCache, entranceExecutionJob.getUser)
+          }
+        })
+        entranceExecutionJob.setLogReader(retLogReader)
+      case _ => null
+    }
     retLogReader
   }
 
 
   override def createLogWriter(job: Job): LogWriter = {
 
+    if (null != job && job.isCompleted) {
+      return null
+    }
     job match {
       case entranceExecutionJob: EntranceExecutionJob => {
         val cache: Cache = Cache(EntranceConfiguration.DEFAULT_CACHE_MAX.getValue)
         val logPath: String = entranceExecutionJob.getJobRequest.getLogPath
         val cacheLogWriter: CacheLogWriter =
           new CacheLogWriter(logPath, EntranceConfiguration.DEFAULT_LOG_CHARSET.getValue, cache, entranceExecutionJob.getUser)
         entranceExecutionJob.setLogWriter(cacheLogWriter)
         logger.info(s"job ${entranceExecutionJob.getJobRequest.getId} create cacheLogWriter")
         val webSocketCacheLogReader: WebSocketCacheLogReader =
           new WebSocketCacheLogReader(logPath, EntranceConfiguration.DEFAULT_LOG_CHARSET.getValue, cache, entranceExecutionJob.getUser)
         entranceExecutionJob.setWebSocketLogReader(webSocketCacheLogReader)
@@ -45,7 +45,12 @@ abstract class LogManager extends LogListener with Logging with EntranceLogListe
     job match{
       case entranceExecutionJob: EntranceExecutionJob =>
         if (entranceExecutionJob.getLogWriter.isEmpty) entranceExecutionJob synchronized {
-          if (entranceExecutionJob.getLogWriter.isEmpty) createLogWriter(entranceExecutionJob)
+          if (entranceExecutionJob.getLogWriter.isEmpty) {
+            val logWriter = createLogWriter(entranceExecutionJob)
+            if (null == logWriter) {
+              return
+            }
+          }
         }
         entranceExecutionJob.getLogWriter.foreach(logWriter => logWriter.write(log))
         entranceExecutionJob.getWebSocketLogWriter.foreach(writer => writer.write(log))
@@ -31,65 +31,64 @@ import org.apache.hadoop.hdfs.client.HdfsDataOutputStream
 
 abstract class LogWriter(charset: String) extends Closeable with Flushable with Logging {
 
   private var firstWrite = true
 
-  protected val outputStream: OutputStream
+  protected var outputStream: OutputStream
 
-  def write(msg: String): Unit = synchronized {
-    val log = if (!firstWrite) "\n" + msg else {
-      firstWrite = false
-      msg
-    }
-    Utils.tryQuietly({
-      outputStream.write(log.getBytes(charset))
-      outputStream.flush()
-    }, t => {
-      warn("error when write query log to outputStream.", t)
-      info(msg)
-    })
-  }
+  def write(msg: String): Unit = synchronized {
+    val log = if (!firstWrite) "\n" + msg else {
+      logger.info(s"$toString write first one line log")
+      firstWrite = false
+      msg
+    }
+    Utils.tryAndWarnMsg{
+      outputStream.write(log.getBytes(charset))
+      outputStream.flush()
+    }(s"$toString error when write query log to outputStream.")
+  }
 
-  def flush(): Unit = Utils.tryAndWarnMsg[Unit] {
-    outputStream match {
-      case hdfs: HdfsDataOutputStream =>
-        // todo check
-        hdfs.hsync()
-      case _ =>
-        outputStream.flush()
-    }
-  }("Error encounters when flush log, ")
+  def flush(): Unit = Utils.tryAndWarnMsg[Unit] {
+    outputStream match {
+      case hdfs: HdfsDataOutputStream =>
+        // todo check
+        hdfs.hflush()
+      case _ =>
+        outputStream.flush()
+    }
+  }(s"$toString Error encounters when flush log, ")
 
-  def close(): Unit = {
-    info(s" $toString logWriter close")
-    flush()
-    if (outputStream != null) {
-      Utils.tryCatch{
-        outputStream.close()
-      }{
-        case t:Throwable => //ignore
-      }
-    }
-  }
+  def close(): Unit = {
+    logger.info(s" $toString logWriter close")
+    flush()
+    if (outputStream != null) {
+      Utils.tryQuietly(outputStream.close())
+      outputStream = null
+    }
+  }
 }
 
-abstract class AbstractLogWriter(logPath: String,
-                                 user: String,
-                                 charset: String) extends LogWriter(charset) {
-  if(StringUtils.isBlank(logPath)) throw new EntranceErrorException(20301, "logPath cannot be empty.")
-  protected val fileSystem = FSFactory.getFsByProxyUser(new FsPath(logPath), user)
-  fileSystem.init(new util.HashMap[String, String]())
-
-  protected val outputStream: OutputStream = {
-    FileSystemUtils.createNewFile(new FsPath(logPath), user, true)
-    fileSystem.write(new FsPath(logPath), true)
-  }
-
-  override def close(): Unit = {
-    super.close()
-    if (fileSystem != null) Utils.tryQuietly(fileSystem.close(), t => {
-      warn("Error encounters when closing fileSystem", t)
-    })
-  }
-}
+abstract class AbstractLogWriter(logPath: String,
+                                 user: String,
+                                 charset: String) extends LogWriter(charset) {
+  if (StringUtils.isBlank(logPath)) throw new EntranceErrorException(20301, "logPath cannot be empty.")
+  protected var fileSystem = FSFactory.getFsByProxyUser(new FsPath(logPath), user)
+  fileSystem.init(new util.HashMap[String, String]())
+
+  protected var outputStream: OutputStream = {
+    FileSystemUtils.createNewFile(new FsPath(logPath), user, true)
+    fileSystem.write(new FsPath(logPath), true)
+  }
+
+  override def close(): Unit = {
+    super.close()
+    if (fileSystem != null) Utils.tryAndWarnMsg{
+      fileSystem.close()
+      fileSystem = null
+    }(s"$toString Error encounters when closing fileSystem")
+  }
+
+  override def toString: String = logPath
+}
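
The OOM fix is the val-to-var change plus the null-out on close(): entrance keeps completed jobs cached for a while, and a job holds its writer, so with a val field the output stream and filesystem handle stayed reachable, buffers included, for as long as the job stayed cached. A minimal sketch of the pattern, under that assumption (Scala):

    import java.io.{ByteArrayOutputStream, OutputStream}

    // The holder object outlives the stream it owns, as a cached job outlives its writer.
    class SketchWriter {
      protected var outputStream: OutputStream = new ByteArrayOutputStream(8 * 1024 * 1024)
      def close(): Unit = if (outputStream != null) {
        outputStream.close()
        // With a val this line would be impossible, and the 8 MB buffer would stay
        // reachable for as long as the SketchWriter itself does.
        outputStream = null
      }
    }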
@@ -65,7 +65,7 @@ object SparkConfiguration extends Logging {
 
   val SPARK_DRIVER_CLASSPATH = CommonVars[String]("spark.driver.extraClassPath", "")
 
-  val SPARK_DRIVER_EXTRA_JAVA_OPTIONS = CommonVars[String]("spark.driver.extraJavaOptions", "\"-Dwds.linkis.configuration=linkis-engine.properties " + "\"")
+  val SPARK_DRIVER_EXTRA_JAVA_OPTIONS = CommonVars[String]("spark.driver.extraJavaOptions", "\"-Dwds.linkis.server.conf=linkis-engine.properties " + "\"")
 
   val SPARK_DEFAULT_EXTERNAL_JARS_PATH = CommonVars[String]("spark.external.default.jars", "")
@@ -34,7 +34,7 @@ object EnvConfiguration {
   val ENGINE_CONN_CLASSPATH_FILES = CommonVars("wds.linkis.engineConn.files", "", "engineConn额外的配置文件")
 
   val ENGINE_CONN_DEFAULT_JAVA_OPTS = CommonVars[String]("wds.linkis.engineConn.javaOpts.default", s"-XX:+UseG1GC -XX:MaxPermSize=250m -XX:PermSize=128m " +
-    s"-Xloggc:%s -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Dwds.linkis.configuration=linkis-engineconn.properties -Dwds.linkis.gateway.url=${Configuration.getGateWayURL()}")
+    s"-Xloggc:%s -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Dwds.linkis.server.conf=linkis-engineconn.properties -Dwds.linkis.gateway.url=${Configuration.getGateWayURL()}")
 
   val ENGINE_CONN_MEMORY = CommonVars("wds.linkis.engineConn.memory", new ByteType("2g"), "Specify the memory size of the java client(指定java进程的内存大小)")
 
@@ -49,5 +49,7 @@
 
   val LOG4J2_XML_FILE = CommonVars[String]("wds.linkis.engineconn.log4j2.xml.file", "log4j2-engineconn.xml")
 
-  val LINKIS_PUBLIC_MODULE_PATH = CommonVars("wds.linkis.public_module.path", Configuration.LINKIS_HOME.getValue + "/lib/linkis-commons/public-module")
+  val LINKIS_PUBLIC_MODULE_PATH = CommonVars("wds.linkis.public_module.path", Configuration.getLinkisHome + "/lib/linkis-commons/public-module")
+
+  val LINKIS_CONF_DIR = CommonVars("LINKIS_CONF_DIR", Configuration.getLinkisHome() + "/conf")
 }
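
A hedged usage sketch: as I read CommonVars, CommonVars("LINKIS_CONF_DIR", default) resolves the key from the runtime configuration (system properties and the loaded linkis properties files) and falls back to the default, so callers such as the launch builder below simply read:

    // Resolves to the configured conf dir, or $LINKIS_HOME/conf when LINKIS_CONF_DIR is unset.
    val confDir: String = EnvConfiguration.LINKIS_CONF_DIR.getValue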
@@ -85,7 +85,9 @@ abstract class JavaProcessEngineConnLaunchBuilder extends ProcessEngineConnLaunc
     // addPathToClassPath(environment, variable(PWD))
     // first, add engineconn conf dirs.
     addPathToClassPath(environment, Seq(variable(PWD), ENGINE_CONN_CONF_DIR_NAME))
-    // second, add engineconn libs.
+    // then, add LINKIS_CONF_DIR conf dirs.
+    addPathToClassPath(environment, Seq(EnvConfiguration.LINKIS_CONF_DIR.getValue))
+    // then, add engineconn libs.
     addPathToClassPath(environment, Seq(variable(PWD), ENGINE_CONN_LIB_DIR_NAME + "/*"))
     // then, add public modules.
     if (!enablePublicModule) {
 
@@ -116,6 +118,7 @@ abstract class JavaProcessEngineConnLaunchBuilder extends ProcessEngineConnLaunc
     environment
   }
 
+
   override protected def getNecessaryEnvironment(implicit engineConnBuildRequest: EngineConnBuildRequest): Array[String] =
     if(!ifAddHiveConfigPath) Array.empty else Array(HADOOP_CONF_DIR.toString, HIVE_CONF_DIR.toString)
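
Assuming addPathToClassPath appends entries in call order, the engineconn's effective classpath now starts with the engine's own conf dir, then the shared LINKIS_CONF_DIR, then the libs, so engine-local settings still shadow the shared ones. An illustrative rendering (paths are hypothetical):

    // Hypothetical CLASSPATH the builder assembles, in order:
    val classpath = Seq(
      "$PWD/conf",                          // engineconn's own conf dir
      "/appcom/Install/LinkisInstall/conf", // EnvConfiguration.LINKIS_CONF_DIR.getValue
      "$PWD/lib/*"                          // engineconn libs
    ).mkString(":")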

