Commit 1792700

fix: refactor taskmanager config and support deleting HDFS files when dropping tables (#3369)

tobegit3hub committed Sep 15, 2023
1 parent 330d171 commit 1792700
Showing 18 changed files with 581 additions and 275 deletions.

Large diffs are not rendered by default.

JobIdGenerator.java

@@ -28,28 +28,29 @@ public class JobIdGenerator {
    static {
        try {
            zkClient = new ZKClient(ZKConfig.builder()
-                   .cluster(TaskManagerConfig.ZK_CLUSTER)
-                   .namespace(TaskManagerConfig.ZK_ROOT_PATH)
-                   .sessionTimeout(TaskManagerConfig.ZK_SESSION_TIMEOUT)
-                   .baseSleepTime(TaskManagerConfig.ZK_BASE_SLEEP_TIME)
-                   .connectionTimeout(TaskManagerConfig.ZK_CONNECTION_TIMEOUT)
-                   .maxConnectWaitTime(TaskManagerConfig.ZK_MAX_CONNECT_WAIT_TIME)
-                   .maxRetries(TaskManagerConfig.ZK_MAX_RETRIES)
+                   .cluster(TaskManagerConfig.getZkCluster())
+                   .namespace(TaskManagerConfig.getZkRootPath())
+                   .sessionTimeout(TaskManagerConfig.getZkSessionTimeout())
+                   .baseSleepTime(TaskManagerConfig.getZkBaseSleepTime())
+                   .connectionTimeout(TaskManagerConfig.getZkConnectionTimeout())
+                   .maxConnectWaitTime(TaskManagerConfig.getZkMaxConnectWaitTime())
+                   .maxRetries(TaskManagerConfig.getZkMaxRetries())
                    .build());
            zkClient.connect();

            // Initialize zk nodes
-           zkClient.createNode(TaskManagerConfig.ZK_ROOT_PATH, "".getBytes());
-           zkClient.createNode(TaskManagerConfig.ZK_TASKMANAGER_PATH, "".getBytes());
+           zkClient.createNode(TaskManagerConfig.getZkRootPath(), "".getBytes());
+           zkClient.createNode(TaskManagerConfig.getZkTaskmanagerPath(), "".getBytes());

            int lastMaxJobId = 0;
-           if (zkClient.checkExists(TaskManagerConfig.ZK_MAX_JOB_ID_PATH)) {
+           if (zkClient.checkExists(TaskManagerConfig.getZkMaxJobIdPath())) {
                // Get last max job id from zk
-               lastMaxJobId = Integer.parseInt(zkClient.getNodeValue(TaskManagerConfig.ZK_MAX_JOB_ID_PATH));
+               lastMaxJobId = Integer.parseInt(zkClient.getNodeValue(TaskManagerConfig.getZkMaxJobIdPath()));
            }
            currentJobId = lastMaxJobId;
-           maxJobId = lastMaxJobId + TaskManagerConfig.PREFETCH_JOBID_NUM;
+           maxJobId = lastMaxJobId + TaskManagerConfig.getPrefetchJobidNum();
            // set max job id in zk
-           zkClient.setNodeValue(TaskManagerConfig.ZK_MAX_JOB_ID_PATH, String.valueOf(maxJobId).getBytes());
+           zkClient.setNodeValue(TaskManagerConfig.getZkMaxJobIdPath(), String.valueOf(maxJobId).getBytes());

        } catch (Exception e) {
            zkClient = null;
@@ -67,8 +68,8 @@ public static int getUniqueId() throws Exception {
        currentJobId += 1;
        if (currentJobId > maxJobId) {
            // Update zk before returning job id
-           maxJobId += TaskManagerConfig.PREFETCH_JOBID_NUM;
-           zkClient.setNodeValue(TaskManagerConfig.ZK_MAX_JOB_ID_PATH, String.valueOf(maxJobId).getBytes());
+           maxJobId += TaskManagerConfig.getPrefetchJobidNum();
+           zkClient.setNodeValue(TaskManagerConfig.getZkMaxJobIdPath(), String.valueOf(maxJobId).getBytes());
        }
        return currentJobId;
    }
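The pattern above recurs through the whole commit: public static fields such as TaskManagerConfig.ZK_CLUSTER become getters such as TaskManagerConfig.getZkCluster(). Together with the TaskManagerServer change further down, where the constructor's TaskManagerConfig.parse() call becomes TaskManagerConfig.print(), this suggests the config now parses itself lazily. A minimal sketch of that getter pattern, with hypothetical property keys and method bodies (the real TaskManagerConfig is not shown in this diff):

    import java.io.InputStream;
    import java.util.Properties;

    // Sketch only: assumes a Properties-backed config that loads itself on
    // first access. Key names and defaults here are illustrative.
    public class TaskManagerConfig {
        private static volatile Properties props;

        private static Properties props() {
            if (props == null) {
                synchronized (TaskManagerConfig.class) {
                    if (props == null) {
                        Properties p = new Properties();
                        try (InputStream in = TaskManagerConfig.class
                                .getClassLoader().getResourceAsStream("taskmanager.properties")) {
                            p.load(in);
                        } catch (Exception e) {
                            throw new RuntimeException("fail to load taskmanager.properties", e);
                        }
                        props = p;
                    }
                }
            }
            return props;
        }

        public static String getZkCluster() {
            return props().getProperty("zookeeper.cluster");
        }

        public static int getZkSessionTimeout() {
            return Integer.parseInt(props().getProperty("zookeeper.session_timeout", "5000"));
        }
    }

With this shape, callers never need an explicit parse() before reading a value, which is consistent with the TaskManagerServer.java constructor change below.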
JobResultSaver.java

@@ -105,7 +105,7 @@ public boolean saveFile(int resultId, String jsonData) {
            return true;
        }
        // save to <log path>/tmp_result/<result_id>/<unique file name>
-       String savePath = String.format("%s/tmp_result/%d", TaskManagerConfig.JOB_LOG_PATH, resultId);
+       String savePath = String.format("%s/tmp_result/%d", TaskManagerConfig.getJobLogPath(), resultId);
        synchronized (this) {
            File saveP = new File(savePath);
            if (!saveP.exists()) {
@@ -151,7 +151,7 @@ public String readResult(int resultId, long timeoutMs) throws InterruptedException {
        }
        String output = "";
        // all finished, read csv from savePath
-       String savePath = String.format("%s/tmp_result/%d", TaskManagerConfig.JOB_LOG_PATH, resultId);
+       String savePath = String.format("%s/tmp_result/%d", TaskManagerConfig.getJobLogPath(), resultId);
        File saveP = new File(savePath);
        // If saveP not exists, means no real result saved. But it may use a uncleaned
        // path, whether read result succeed or not, we should delete it.
@@ -225,7 +225,7 @@ public void reset() throws IOException {
        synchronized (idStatus) {
            Collections.fill(idStatus, 0);
        }
-       String tmpResultDir = String.format("%s/tmp_result", TaskManagerConfig.JOB_LOG_PATH);
+       String tmpResultDir = String.format("%s/tmp_result", TaskManagerConfig.getJobLogPath());
        // delete anyway
        FileUtils.forceDelete(new File(tmpResultDir));
    }
TaskManagerServer.java

@@ -18,7 +18,6 @@

 import com._4paradigm.openmldb.taskmanager.config.ConfigException;
 import com._4paradigm.openmldb.taskmanager.tracker.JobTrackerService;
-import com._4paradigm.openmldb.taskmanager.util.VersionUtil;
 import com._4paradigm.openmldb.taskmanager.zk.FailoverWatcher;
 import lombok.extern.slf4j.Slf4j;
 import com._4paradigm.openmldb.taskmanager.config.TaskManagerConfig;
@@ -45,7 +44,7 @@ public class TaskManagerServer {
     * @throws ConfigException if config file does not exist or some configs are incorrect.
     */
    public TaskManagerServer() throws ConfigException {
-       TaskManagerConfig.parse();
+       TaskManagerConfig.print();
    }

    /**
@@ -69,7 +68,7 @@ public void start(Boolean blocking) throws ConfigException, IOException, InterruptedException {
        logger.info("The server runs and prepares for leader election");
        if (failoverWatcher.blockUntilActive()) {
            logger.info("The server becomes active master and prepare to do business logic");
-           if (TaskManagerConfig.TRACK_UNFINISHED_JOBS) {
+           if (TaskManagerConfig.getTrackUnfinishedJobs()) {
                // Start threads to track unfinished jobs
                JobTrackerService.startTrackerThreads();
            }
@@ -97,14 +96,14 @@ public void startRpcServer(Boolean blocking) throws ConfigException, InterruptedException {
        RpcServerOptions options = new RpcServerOptions();
        options.setReceiveBufferSize(64 * 1024 * 1024);
        options.setSendBufferSize(64 * 1024 * 1024);
-       options.setIoThreadNum(TaskManagerConfig.IO_THREAD);
-       options.setWorkThreadNum(TaskManagerConfig.WORKER_THREAD);
-       options.setKeepAliveTime(TaskManagerConfig.CHANNEL_KEEP_ALIVE_TIME);
-       rpcServer = new RpcServer(TaskManagerConfig.PORT, options);
+       options.setIoThreadNum(TaskManagerConfig.getServerIoThreads());
+       options.setWorkThreadNum(TaskManagerConfig.getServerWorkerThreads());
+       options.setKeepAliveTime(TaskManagerConfig.getChannelKeepAliveTime());
+       rpcServer = new RpcServer(TaskManagerConfig.getServerPort(), options);
        rpcServer.registerService(new TaskManagerImpl());
        rpcServer.start();
-       log.info("Start TaskManager on {} with worker thread number {}", TaskManagerConfig.PORT,
-               TaskManagerConfig.WORKER_THREAD);
+       log.info("Start TaskManager on {} with worker thread number {}", TaskManagerConfig.getServerPort(),
+               TaskManagerConfig.getServerWorkerThreads());

        if (blocking) {
            // make server keep running
TaskManagerImpl.java

@@ -73,17 +73,17 @@ public TaskManagerImpl() throws InterruptedException, ConfigException {
     */
    private void initExternalFunction() throws InterruptedException {
        ZKClient zkClient = new ZKClient(ZKConfig.builder()
-               .cluster(TaskManagerConfig.ZK_CLUSTER)
-               .namespace(TaskManagerConfig.ZK_ROOT_PATH)
-               .sessionTimeout(TaskManagerConfig.ZK_SESSION_TIMEOUT)
-               .baseSleepTime(TaskManagerConfig.ZK_BASE_SLEEP_TIME)
-               .connectionTimeout(TaskManagerConfig.ZK_CONNECTION_TIMEOUT)
-               .maxConnectWaitTime(TaskManagerConfig.ZK_MAX_CONNECT_WAIT_TIME)
-               .maxRetries(TaskManagerConfig.ZK_MAX_RETRIES)
+               .cluster(TaskManagerConfig.getZkCluster())
+               .namespace(TaskManagerConfig.getZkRootPath())
+               .sessionTimeout(TaskManagerConfig.getZkSessionTimeout())
+               .baseSleepTime(TaskManagerConfig.getZkBaseSleepTime())
+               .connectionTimeout(TaskManagerConfig.getZkConnectionTimeout())
+               .maxConnectWaitTime(TaskManagerConfig.getZkMaxConnectWaitTime())
+               .maxRetries(TaskManagerConfig.getZkMaxRetries())
                .build());
        zkClient.connect();

-       String funPath = TaskManagerConfig.ZK_ROOT_PATH + "/data/function";
+       String funPath = TaskManagerConfig.getZkRootPath() + "/data/function";
        try {
            List<String> funNames = zkClient.getChildren(funPath);
            for (String name : funNames) {
@@ -220,7 +220,7 @@ public TaskManager.RunBatchSqlResponse RunBatchSql(TaskManager.RunBatchSqlRequest request) {
        // HOST can't be 0.0.0.0 if distributed or spark is not local
        confMap.put("spark.openmldb.savejobresult.http",
                String.format("http://%s:%d/openmldb.taskmanager.TaskManagerServer/SaveJobResult",
-                       TaskManagerConfig.HOST, TaskManagerConfig.PORT));
+                       TaskManagerConfig.getServerHost(), TaskManagerConfig.getServerPort()));
        // we can't get spark job id here, so we use JobResultSaver id, != spark job id
        // if too much running jobs to save result, throw exception
        int resultId = jobResultSaver.genResultId();
@@ -234,7 +234,7 @@ public TaskManager.RunBatchSqlResponse RunBatchSql(TaskManager.RunBatchSqlRequest request) {
        if (finalJobInfo.isSuccess()) {
            // wait for all files of result saved and read them, large timeout
            // TODO: Test for K8S backend
-           String output = jobResultSaver.readResult(resultId, TaskManagerConfig.BATCH_JOB_RESULT_MAX_WAIT_TIME);
+           String output = jobResultSaver.readResult(resultId, TaskManagerConfig.getBatchJobResultMaxWaitTime());
            return TaskManager.RunBatchSqlResponse.newBuilder().setCode(StatusCode.SUCCESS).setOutput(output)
                    .build();
        } else {
@@ -253,7 +253,7 @@ public TaskManager.RunBatchSqlResponse RunBatchSql(TaskManager.RunBatchSqlRequest request) {
    // rpc max time is CHANNEL_KEEP_ALIVE_TIME, so we don't need to wait too long
    private JobInfo busyWaitJobInfo(int jobId, int waitSeconds) throws InterruptedException {
        long maxWaitEnd = System.currentTimeMillis()
-               + (waitSeconds == 0 ? TaskManagerConfig.CHANNEL_KEEP_ALIVE_TIME : waitSeconds) * 1000;
+               + (waitSeconds == 0 ? TaskManagerConfig.getChannelKeepAliveTime() : waitSeconds) * 1000;
        while (System.currentTimeMillis() < maxWaitEnd) {
            Option<JobInfo> info = JobInfoManager.getJob(jobId);
            if (info.isEmpty()) {
ExternalFunctionManager.java

@@ -32,7 +32,7 @@ public class ExternalFunctionManager {
    static private Map<String, String> nameFileMap = new ConcurrentHashMap<>();

    static public String getLibraryFilePath(String libraryFileName) {
-       return Paths.get(TaskManagerConfig.EXTERNAL_FUNCTION_DIR, libraryFileName).toString();
+       return Paths.get(TaskManagerConfig.getExternalFunctionDir(), libraryFileName).toString();
    }

    static public void addFunction(String fnName, String libraryFileName) throws Exception {
FailoverWatcher.java

@@ -51,13 +51,13 @@ public class FailoverWatcher implements Watcher {
     */
    public FailoverWatcher() throws IOException {

-       baseZnode = TaskManagerConfig.ZK_ROOT_PATH + "/taskmanager";
+       baseZnode = TaskManagerConfig.getZkRootPath() + "/taskmanager";
        masterZnode = baseZnode + "/leader";
-       zkQuorum = TaskManagerConfig.ZK_CLUSTER;
-       sessionTimeout = TaskManagerConfig.ZK_SESSION_TIMEOUT;
+       zkQuorum = TaskManagerConfig.getZkCluster();
+       sessionTimeout = TaskManagerConfig.getZkSessionTimeout();
        connectRetryTimes = 3;
-       String serverHost = TaskManagerConfig.HOST;
-       int serverPort = TaskManagerConfig.PORT;
+       String serverHost = TaskManagerConfig.getServerHost();
+       int serverPort = TaskManagerConfig.getServerPort();
        hostPort = new HostPort(serverHost, serverPort);

        connectZooKeeper();
@@ -91,7 +91,7 @@ protected void connectZooKeeper() throws IOException {
     */
    protected void initZnode() {
        try {
-           ZooKeeperUtil.createAndFailSilent(this, TaskManagerConfig.ZK_ROOT_PATH);
+           ZooKeeperUtil.createAndFailSilent(this, TaskManagerConfig.getZkRootPath());
            ZooKeeperUtil.createAndFailSilent(this, baseZnode);
        } catch (Exception e) {
            LOG.fatal("Error to create znode " + baseZnode
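For orientation, the znodes touched by FailoverWatcher live under the configured ZooKeeper root path. Assuming zookeeper.root_path is /openmldb (an example value, not a default from this diff), the layout would be roughly:

    /openmldb                  # zkRootPath (example value)
        /taskmanager           # baseZnode, created by initZnode()
            /leader            # masterZnode for leader election, presumably ephemeral

The max-job-id node maintained by JobIdGenerator also lives under this subtree; its exact path is not shown in this excerpt.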
taskmanager.properties

@@ -27,6 +27,5 @@ spark.default.conf=
 spark.eventLog.dir=
 spark.yarn.maxAppAttempts=1
 batchjob.jar.path=
-namenode.uri=
 offline.data.prefix=file:///tmp/openmldb_offline_storage/
 hadoop.conf.dir=
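With namenode.uri removed, HDFS access is configured solely through hadoop.conf.dir, letting the Hadoop client resolve the namenode from its standard configuration files. An illustrative (non-default) setup for an HDFS-backed offline store:

    # Example values, not defaults: store offline data on HDFS and let the
    # Hadoop client configuration under hadoop.conf.dir resolve the namenode.
    offline.data.prefix=hdfs:///openmldb_offline_storage/
    hadoop.conf.dir=/etc/hadoop/conf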
JobInfoManager.scala

@@ -20,6 +20,7 @@ import com._4paradigm.openmldb.sdk.SdkOption
 import com._4paradigm.openmldb.sdk.impl.SqlClusterExecutor
 import com._4paradigm.openmldb.taskmanager.config.TaskManagerConfig
 import com._4paradigm.openmldb.taskmanager.dao.{JobIdGenerator, JobInfo}
+import com._4paradigm.openmldb.taskmanager.util.HdfsUtil
 import com._4paradigm.openmldb.taskmanager.yarn.YarnClientUtil
 import org.slf4j.LoggerFactory
 import org.apache.hadoop.fs.{FileSystem, LocalFileSystem, Path}
@@ -42,8 +43,8 @@ object JobInfoManager {
  private val JOB_INFO_TABLE_NAME = "JOB_INFO"

  private val option = new SdkOption
- option.setZkCluster(TaskManagerConfig.ZK_CLUSTER)
- option.setZkPath(TaskManagerConfig.ZK_ROOT_PATH)
+ option.setZkCluster(TaskManagerConfig.getZkCluster)
+ option.setZkPath(TaskManagerConfig.getZkRootPath)
  val sqlExecutor = new SqlClusterExecutor(option)
  sqlExecutor.executeSQL("", "set @@execute_mode='online';")

@@ -52,7 +53,7 @@
    val startTime = new java.sql.Timestamp(Calendar.getInstance.getTime().getTime())
    val initialState = "Submitted"
    val parameter = if (args != null && args.length>0) args.mkString(",") else ""
-   val cluster = sparkConf.getOrElse("spark.master", TaskManagerConfig.SPARK_MASTER)
+   val cluster = sparkConf.getOrElse("spark.master", TaskManagerConfig.getSparkMaster)

    // TODO: Parse if run in yarn or local
    val jobInfo = new JobInfo(jobId, jobType, initialState, startTime, null, parameter, cluster, "", "")
@@ -210,12 +211,8 @@ object JobInfoManager {
      FileUtils.deleteDirectory(dir)

    } else if (filePath.startsWith("hdfs://")) {
-     val conf = new Configuration();
-     // TODO: Get namenode uri from config file
-     val namenodeUri = TaskManagerConfig.NAMENODE_URI
-     val hdfs = FileSystem.get(URI.create(s"hdfs://$namenodeUri"), conf)
-     hdfs.delete(new Path(filePath), true)
-
+     logger.info(s"Try to delete the HDFS path ${filePath}")
+     HdfsUtil.deleteHdfsDir(filePath)
    } else {
      throw new Exception(s"Get unsupported file path: $filePath")
    }
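The new HdfsUtil.deleteHdfsDir helper replaces the inline FileSystem code and the hard-coded NAMENODE_URI; its implementation is among the files not rendered above. A minimal Java sketch of what such a helper could look like, assuming it builds the Hadoop Configuration from the hadoop.conf.dir setting (method and getter names are assumptions):

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsUtil {
        // Sketch only: recursively delete an hdfs:// path, resolving the
        // namenode from core-site.xml/hdfs-site.xml under hadoop.conf.dir
        // instead of a separately configured namenode.uri.
        public static void deleteHdfsDir(String filePath) throws Exception {
            Configuration conf = new Configuration();
            String confDir = TaskManagerConfig.getHadoopConfDir(); // getter name assumed
            conf.addResource(new Path(confDir, "core-site.xml"));
            conf.addResource(new Path(confDir, "hdfs-site.xml"));
            // The fully qualified hdfs:// URI selects the matching FileSystem.
            FileSystem hdfs = FileSystem.get(URI.create(filePath), conf);
            hdfs.delete(new Path(filePath), true); // true = recursive
        }
    }

Deleting by the file's own URI also removes the old code's need to prepend hdfs://$namenodeUri before opening the FileSystem.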
LogManager.scala

@@ -26,11 +26,11 @@ object LogManager {
  private val logger = LoggerFactory.getLogger(this.getClass)

  def getJobLogFile(id: Int): File = {
-   Paths.get(TaskManagerConfig.JOB_LOG_PATH, s"job_${id}.log").toFile
+   Paths.get(TaskManagerConfig.getJobLogPath, s"job_${id}.log").toFile
  }

  def getJobErrorLogFile(id: Int): File = {
-   Paths.get(TaskManagerConfig.JOB_LOG_PATH, s"job_${id}_error.log").toFile
+   Paths.get(TaskManagerConfig.getJobLogPath, s"job_${id}_error.log").toFile
  }

  def getFileContent(inputFile: File): String = {