From bd951ebd6bf23c2e794d1af0df53c223d08fae96 Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Sat, 11 Jan 2020 15:44:24 +0800 Subject: [PATCH] KYLIN-4334 Auto discard all jobs while disable the Real Time Cube --- .../kylin/rest/service/CubeService.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java index 8dd7dd9832c..9f7e616f6dd 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java +++ b/server-base/src/main/java/org/apache/kylin/rest/service/CubeService.java @@ -32,6 +32,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.lock.DistributedLock; import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.common.util.CliCommandExecutor; import org.apache.kylin.common.util.HadoopUtil; @@ -47,6 +48,7 @@ import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.mr.CubingJob; import org.apache.kylin.engine.mr.JobBuilderSupport; +import org.apache.kylin.engine.mr.common.CubeJobLockUtil; import org.apache.kylin.engine.mr.common.CuboidRecommenderUtil; import org.apache.kylin.job.JobInstance; import org.apache.kylin.job.common.PatternedLogger; @@ -436,6 +438,10 @@ public CubeInstance disableCube(CubeInstance cube) throws IOException { } //unAssign cube getStreamingCoordinator().unAssignCube(cubeName); + + //discard jobs + releaseAllJobs(cubeInstance); + } return cubeInstance; } catch (Exception e) { @@ -665,6 +671,22 @@ protected void releaseAllJobs(CubeInstance cube) { final ExecutableState status = cubingJob.getStatus(); if (status != ExecutableState.SUCCEED && status != ExecutableState.DISCARDED) { getExecutableManager().discardJob(cubingJob.getId()); + + //release global dict lock if exists + DistributedLock lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory() + .lockForCurrentThread(); + if (lock.isLocked(CubeJobLockUtil.getLockPath(cube.getName(), cubingJob.getId()))) {//release cube job dict lock if exists + lock.purgeLocks(CubeJobLockUtil.getLockPath(cube.getName(), null)); + logger.info("{} unlock cube job global lock path({}) success", cubingJob.getId(), + CubeJobLockUtil.getLockPath(cube.getName(), null)); + + if (lock.isLocked(CubeJobLockUtil.getEphemeralLockPath(cube.getName()))) {//release cube job Ephemeral lock if exists + lock.purgeLocks(CubeJobLockUtil.getEphemeralLockPath(cube.getName())); + logger.info("{} unlock cube job ephemeral lock path({}) success", cubingJob.getId(), + CubeJobLockUtil.getEphemeralLockPath(cube.getName())); + } + } + } } }