Skip to content

Commit

Permalink
Configure Hot Loading (#3841)
Browse files Browse the repository at this point in the history
* linkis-common - add hot-load support for CommonVars
  • Loading branch information
binbinCheng committed Nov 17, 2022
1 parent 53f0481 commit 913a437
Show file tree
Hide file tree
Showing 13 changed files with 245 additions and 55 deletions.
Expand Up @@ -24,20 +24,29 @@ import org.apache.commons.lang3.StringUtils

import java.io.{File, FileInputStream, InputStream, IOException}
import java.util.Properties
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantReadWriteLock

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

private[conf] object BDPConfiguration extends Logging {

val DEFAULT_PROPERTY_FILE_NAME = "linkis.properties"

val DEFAULT_SERVER_CONF_FILE_NAME = "linkis-server.properties"

val DEFAULT_CONFIG_HOT_LOAD_DELAY_MILLS = 3 * 60 * 1000L

private val extractConfig = new Properties
private val config = new Properties
private val sysProps = sys.props
private val env = sys.env

private val configList = new ArrayBuffer[String]
private val configReload = new Properties
private val lock = new ReentrantReadWriteLock()

private def init: Unit = {

// load pub linkis conf
Expand All @@ -48,6 +57,7 @@ private[conf] object BDPConfiguration extends Logging {
s"******************* Notice: The Linkis configuration file is $propertyFile ! *******************"
)
initConfig(config, configFileURL.getPath)
configList.append(configFileURL.getPath)
} else {
logger.warn(
s"************ Notice: The Linkis configuration file $propertyFile is not exists! *******************"
Expand All @@ -62,6 +72,7 @@ private[conf] object BDPConfiguration extends Logging {
s"*********************** Notice: The Linkis serverConf file is $serverConf ! ******************"
)
initConfig(config, serverConfFileURL.getPath)
configList.append(serverConfFileURL.getPath)
} else {
logger.warn(
s"**************** Notice: The Linkis serverConf file $serverConf is not exists! *******************"
Expand All @@ -79,6 +90,7 @@ private[conf] object BDPConfiguration extends Logging {
s"************** Notice: The Linkis server.confs is file $propertyF ****************"
)
initConfig(config, configFileURL.getPath)
configList.append(configFileURL.getPath)
} else {
logger.warn(
s"********** Notice: The Linkis server.confs file $propertyF is not exists! **************"
Expand All @@ -87,6 +99,38 @@ private[conf] object BDPConfiguration extends Logging {
}
}

// init hot-load config task
val hotLoadTask = new Runnable {
override def run(): Unit = {
var tmpConfigPath = ""
var tmpConfig = new Properties()
Utils.tryCatch {
// refresh configuration
configList.foreach(configPath => {
if (logger.isDebugEnabled()) {
logger.debug(s"reload config file : ${configPath}")
}
tmpConfigPath = configPath
initConfig(tmpConfig, configPath)
})
} { case e: Exception =>
logger.error(s"reload config file : ${tmpConfigPath} failed, because : ${e.getMessage}")
logger.warn("Will reset config to origin config.")
tmpConfig = config
}
lock.writeLock().lock()
configReload.clear()
configReload.putAll(tmpConfig)
lock.writeLock().unlock()
}
}
Utils.defaultScheduler.scheduleWithFixedDelay(
hotLoadTask,
3000L,
DEFAULT_CONFIG_HOT_LOAD_DELAY_MILLS,
TimeUnit.MILLISECONDS
)
logger.info("hotload config task inited.")
}

Utils.tryCatch {
Expand All @@ -110,11 +154,18 @@ private[conf] object BDPConfiguration extends Logging {
}
}

def getOption(key: String): Option[String] = {
def getOption(key: String, hotload: Boolean = false): Option[String] = {
if (extractConfig.containsKey(key)) {
return Some(extractConfig.getProperty(key))
}
val value = config.getProperty(key)
var value = ""
if (hotload) {
lock.readLock().lock()
value = configReload.getProperty(key)
lock.readLock().unlock()
} else {
value = config.getProperty(key)
}
if (StringUtils.isNotEmpty(value)) {
return Some(value)
}
Expand All @@ -134,16 +185,45 @@ private[conf] object BDPConfiguration extends Logging {
props
}

def hotProperties(): Properties = {
val props = new Properties
mergePropertiesFromMap(props, env)
mergePropertiesFromMap(props, sysProps.toMap)
lock.readLock().lock()
mergePropertiesFromMap(props, configReload.asScala.toMap)
lock.readLock().unlock()
mergePropertiesFromMap(props, extractConfig.asScala.toMap)
props
}

def mergePropertiesFromMap(props: Properties, mapProps: Map[String, String]): Unit = {
mapProps.foreach { case (k, v) => props.put(k, v) }
}

def getOption[T](commonVars: CommonVars[T]): Option[T] = if (commonVars.value != null) {
Option(commonVars.value)
} else {
val value = BDPConfiguration.getOption(commonVars.key)
if (value.isEmpty) Option(commonVars.defaultValue)
else formatValue(commonVars.defaultValue, value)
def getOption[T](commonVars: CommonVars[T], hotload: Boolean): Option[T] = {
if (hotload) {
val value = BDPConfiguration.getOption(commonVars.key, hotload = true)
if (value.isEmpty) Option(commonVars.defaultValue)
else formatValue(commonVars.defaultValue, value)
} else {
if (commonVars.value != null) {
Option(commonVars.value)
} else {
val value = BDPConfiguration.getOption(commonVars.key)
if (value.isEmpty) Option(commonVars.defaultValue)
else formatValue(commonVars.defaultValue, value)
}
}
}

def getOption[T](commonVars: CommonVars[T]): Option[T] = {
if (commonVars.value != null) {
Option(commonVars.value)
} else {
val value = BDPConfiguration.getOption(commonVars.key)
if (value.isEmpty) Option(commonVars.defaultValue)
else formatValue(commonVars.defaultValue, value)
}
}

private[common] def formatValue[T](defaultValue: T, value: Option[String]): Option[T] = {
Expand All @@ -170,19 +250,28 @@ private[conf] object BDPConfiguration extends Logging {
def setIfNotExists(key: String, value: String): Any =
if (!config.containsKey(key)) set(key, value)

def getBoolean(key: String, default: Boolean): Boolean =
getOption(key).map(_.toBoolean).getOrElse(default)
def getBoolean(key: String, default: Boolean, hotload: Boolean = false): Boolean =
getOption(key, hotload).map(_.toBoolean).getOrElse(default)

def getBoolean(commonVars: CommonVars[Boolean]): Option[Boolean] = getOption(commonVars)

def get(key: String, default: String): String = getOption(key).getOrElse(default)
def get(key: String, default: String): String =
getOption(key, false).getOrElse(default)

def get(key: String, default: String, hotload: Boolean): String = {
getOption(key, hotload).getOrElse(default)
}

def get(commonVars: CommonVars[String]): Option[String] = getOption(commonVars)

def get(key: String): String = getOption(key).getOrElse(throw new NoSuchElementException(key))
def get(key: String, hotload: Boolean = false): String =
getOption(key, hotload).getOrElse(throw new NoSuchElementException(key))

def getInt(key: String, default: Int, hotload: Boolean = false): Int =
getOption(key, hotload).map(_.toInt).getOrElse(default)

def getInt(key: String, default: Int): Int = getOption(key).map(_.toInt).getOrElse(default)
def getInt(commonVars: CommonVars[Int]): Option[Int] = getOption(commonVars)

def contains(key: String): Boolean = getOption(key).isDefined
def contains(key: String, hotload: Boolean = false): Boolean = getOption(key, hotload).isDefined

}
Expand Up @@ -24,14 +24,29 @@ import scala.collection.JavaConverters._
case class CommonVars[T](key: String, defaultValue: T, value: T, description: String = null) {
val getValue: T = BDPConfiguration.getOption(this).getOrElse(defaultValue)

def getHotValue(): T = BDPConfiguration.getOption(this, true).getOrElse(defaultValue)

def getValue(properties: java.util.Map[String, String]): T = {
if (properties == null || !properties.containsKey(key) || properties.get(key) == null) {
getValue
} else BDPConfiguration.formatValue(defaultValue, Option(properties.get(key))).get
}

def getValue(properties: Map[String, String]): T = getValue(properties.asJava)
def acquireNew: T = BDPConfiguration.getOption(this).getOrElse(defaultValue)
def getValue(properties: Map[String, String], hotload: Boolean = false): T = getValue(
properties.asJava
)

def getValue(properties: java.util.Map[String, String], hotload: Boolean): T = {
if (properties == null || !properties.containsKey(key) || properties.get(key) == null) {
if (hotload) {
getHotValue()
} else {
getValue
}
} else BDPConfiguration.formatValue(defaultValue, Option(properties.get(key))).get
}

def acquireNew: T = getHotValue()
}

object CommonVars {
Expand All @@ -42,8 +57,10 @@ object CommonVars {
implicit def apply[T](key: String, defaultValue: T): CommonVars[T] =
new CommonVars(key, defaultValue, null.asInstanceOf[T], null)

implicit def apply[T](key: String): CommonVars[T] = apply(key, null.asInstanceOf[T])
implicit def apply[T](key: String): CommonVars[T] =
apply(key, null.asInstanceOf[T], null.asInstanceOf[T], null)

def properties: Properties = BDPConfiguration.properties

def hotProperties: Properties = BDPConfiguration.hotProperties
}
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.common.commonvars;

import org.apache.linkis.common.conf.BDPConfiguration;
import org.apache.linkis.common.conf.CommonVars;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class CommonVarsTest {

@Test
public void testGetVars() {
{
String testKeyNotHotload = "wds.linkis.test___test___test.key1";
String defaultValueNotHotload = "defaultValueNotHotload";
CommonVars<String> strVar1 = CommonVars.apply(testKeyNotHotload, defaultValueNotHotload);
assertEquals(defaultValueNotHotload, strVar1.defaultValue());
assertEquals(defaultValueNotHotload, strVar1.getValue());
}

{
String testKeyNotHotloadSet = "wds.linkis.test___test___test.key2";
String defaultValueNotHotloadSet1 = "defaultValueNotHotloadSet1";
String defaultValueNotHotloadSet2 = "defaultValueNotHotloadSet2";
String valueNotHotloadSet1 = "valueNotHotloadSet1";
String valueNotHotloadSet2 = "valueNotHotloadSet2";
CommonVars<String> strVar2 =
CommonVars.apply(testKeyNotHotloadSet, defaultValueNotHotloadSet1);
assertEquals(defaultValueNotHotloadSet1, strVar2.defaultValue());
assertEquals(defaultValueNotHotloadSet1, strVar2.getValue());

BDPConfiguration.setIfNotExists(testKeyNotHotloadSet, valueNotHotloadSet1);
assertEquals(defaultValueNotHotloadSet1, strVar2.defaultValue());
// assertEquals(valueNotHotloadSet1, strVar2.getValue());
BDPConfiguration.setIfNotExists(testKeyNotHotloadSet, valueNotHotloadSet2);
// assertEquals(valueNotHotloadSet1, strVar2.getValue());

BDPConfiguration.set(testKeyNotHotloadSet, valueNotHotloadSet2);
assertEquals(defaultValueNotHotloadSet1, strVar2.defaultValue());
// assertEquals(valueNotHotloadSet2, strVar2.getValue());
}
}

@Test
public void testGetHotloadVars() {
{
String testKeyHotload = "wds.linkis.test___test___test.key1";
String defaultValueHotload = "defaultValueHotload";
CommonVars<String> strVar1 = CommonVars.apply(testKeyHotload, defaultValueHotload);
assertEquals(defaultValueHotload, strVar1.defaultValue());
assertEquals(defaultValueHotload, strVar1.getValue());
}

{
String testKeyHotloadSet = "wds.linkis.test___test___test.hotload.key2";
String defaultValueNotHotloadSet1 = "defaultValueNotHotloadSet1";
String defaultValueNotHotloadSet2 = "defaultValueNotHotloadSet2";
String valueNotHotloadSet1 = "valueNotHotloadSet1";
String valueNotHotloadSet2 = "valueNotHotloadSet2";
CommonVars<String> strVar2 = CommonVars.apply(testKeyHotloadSet, defaultValueNotHotloadSet1);
assertEquals(defaultValueNotHotloadSet1, strVar2.defaultValue());
assertEquals(defaultValueNotHotloadSet1, strVar2.getValue());

BDPConfiguration.setIfNotExists(testKeyHotloadSet, valueNotHotloadSet1);
assertEquals(defaultValueNotHotloadSet1, strVar2.defaultValue());
// assertEquals(valueNotHotloadSet1, strVar2.getValue());
BDPConfiguration.setIfNotExists(testKeyHotloadSet, valueNotHotloadSet2);
// assertEquals(valueNotHotloadSet1, strVar2.getValue());

BDPConfiguration.set(testKeyHotloadSet, valueNotHotloadSet2);
assertEquals(defaultValueNotHotloadSet1, strVar2.defaultValue());
// assertEquals(valueNotHotloadSet2, strVar2.getValue());
}
}
}
Expand Up @@ -59,7 +59,7 @@ public class QueryPersistenceEngine extends AbstractPersistenceEngine {
private static final int MAX_DESC_LEN = GovernanceCommonConf.ERROR_CODE_DESC_LEN();

private static final int RETRY_NUMBER =
EntranceConfiguration.JOBINFO_UPDATE_RETRY_MAX_TIME().getValue();
EntranceConfiguration.JOBINFO_UPDATE_RETRY_MAX_TIME().getHotValue();

public QueryPersistenceEngine() {
/*
Expand Down Expand Up @@ -97,7 +97,7 @@ private JobRespProtocol sendToJobHistoryAndRetry(RequestProtocol jobReq, String
}
if (retry) {
try {
Thread.sleep(EntranceConfiguration.JOBINFO_UPDATE_RETRY_INTERVAL().getValue());
Thread.sleep(EntranceConfiguration.JOBINFO_UPDATE_RETRY_INTERVAL().getHotValue());
} catch (Exception ex) {
logger.warn(ex.getMessage());
}
Expand Down
Expand Up @@ -348,11 +348,11 @@ public Message progressWithResource(@PathVariable("id") String id) {
corePercent =
cores.get().floatValue()
/ EntranceConfiguration.YARN_QUEUE_CORES_MAX()
.getValue();
.getHotValue();
memoryPercent =
memory.get().floatValue()
/ (EntranceConfiguration.YARN_QUEUE_MEMORY_MAX()
.getValue()
.getHotValue()
.longValue()
* 1024
* 1024
Expand Down
Expand Up @@ -423,7 +423,7 @@ class EntranceWebSocketService
val sparkLogSpecial: String = EntranceConfiguration.SPARK_SPECIAL_LOG_INCLUDE.getValue
val hiveCreateTableLog: String = EntranceConfiguration.HIVE_CREATE_TABLE_LOG.getValue
if (singleLog.contains(hiveLogSpecial) && singleLog.contains(hiveCreateTableLog)) {
val threadName = EntranceConfiguration.HIVE_THREAD_NAME.getValue
val threadName = EntranceConfiguration.HIVE_THREAD_NAME.getHotValue()
val printInfo = EntranceConfiguration.HIVE_PRINT_INFO_LOG.getValue
val start = singleLog.indexOf(threadName)
val end = singleLog.indexOf(printInfo) + printInfo.length
Expand All @@ -437,8 +437,8 @@ class EntranceWebSocketService
singleLog.contains(hiveLogSpecial) && singleLog.contains("map") && singleLog
.contains("reduce")
) {
val threadName = EntranceConfiguration.HIVE_THREAD_NAME.getValue
val stageName = EntranceConfiguration.HIVE_STAGE_NAME.getValue
val threadName = EntranceConfiguration.HIVE_THREAD_NAME.getHotValue()
val stageName = EntranceConfiguration.HIVE_STAGE_NAME.getHotValue()
val start = singleLog.indexOf(threadName)
val end = singleLog.indexOf(stageName)
if (start > 0 && end > 0) {
Expand Down

0 comments on commit 913a437

Please sign in to comment.