Skip to content

Commit

Permalink
KYLIN-4334 Auto discard all jobs while disable the Real Time Cube
Browse files Browse the repository at this point in the history
  • Loading branch information
wangxiaojing authored and hit-lacus committed Feb 4, 2020
1 parent 6e7c33d commit 2a68874
Showing 1 changed file with 22 additions and 0 deletions.
Expand Up @@ -32,6 +32,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.lock.DistributedLock;
import org.apache.kylin.common.persistence.RootPersistentEntity;
import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.HadoopUtil;
Expand All @@ -47,6 +48,7 @@
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.CubeJobLockUtil;
import org.apache.kylin.engine.mr.common.CuboidRecommenderUtil;
import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.common.PatternedLogger;
Expand Down Expand Up @@ -438,6 +440,10 @@ public CubeInstance disableCube(CubeInstance cube) throws IOException {
}
//unAssign cube
getStreamingCoordinator().unAssignCube(cubeName);

//discard jobs
releaseAllJobs(cubeInstance);

}
return cubeInstance;
} catch (Exception e) {
Expand Down Expand Up @@ -670,6 +676,22 @@ protected void releaseAllJobs(CubeInstance cube) {
final ExecutableState status = cubingJob.getStatus();
if (status != ExecutableState.SUCCEED && status != ExecutableState.DISCARDED) {
getExecutableManager().discardJob(cubingJob.getId());

//release global dict lock if exists
DistributedLock lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory()
.lockForCurrentThread();
if (lock.isLocked(CubeJobLockUtil.getLockPath(cube.getName(), cubingJob.getId()))) {//release cube job dict lock if exists
lock.purgeLocks(CubeJobLockUtil.getLockPath(cube.getName(), null));
logger.info("{} unlock cube job global lock path({}) success", cubingJob.getId(),
CubeJobLockUtil.getLockPath(cube.getName(), null));

if (lock.isLocked(CubeJobLockUtil.getEphemeralLockPath(cube.getName()))) {//release cube job Ephemeral lock if exists
lock.purgeLocks(CubeJobLockUtil.getEphemeralLockPath(cube.getName()));
logger.info("{} unlock cube job ephemeral lock path({}) success", cubingJob.getId(),
CubeJobLockUtil.getEphemeralLockPath(cube.getName()));
}
}

}
}
}
Expand Down

0 comments on commit 2a68874

Please sign in to comment.