Skip to content

Commit

Permalink
KYLIN-3194 use BrokenExecutable to tolerate missing job classes
Browse files Browse the repository at this point in the history
  • Loading branch information
liyang-kylin committed Feb 4, 2018
1 parent c771833 commit f797c84
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 110 deletions.
Expand Up @@ -259,7 +259,7 @@ public final void setId(String id) {
}

@Override
public final ExecutableState getStatus() {
public ExecutableState getStatus() {
ExecutableManager manager = getManager();
return manager.getOutput(this.getId()).getState();
}
Expand Down
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kylin.job.execution;

import org.apache.kylin.job.exception.ExecuteException;

/**
* A special Executable used to indicate any executable whose metadata is broken.
*/
public class BrokenExecutable extends AbstractExecutable {

public BrokenExecutable() {
super();
}

@Override
public String getName() {
return "[BROKEN] " + super.getName();
}

@Override
public ExecutableState getStatus() {
return ExecutableState.DISCARDED;
}

@Override
protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
throw new UnsupportedOperationException();
}

}
Expand Up @@ -23,7 +23,6 @@
import static org.apache.kylin.job.constant.ExecutableConstants.YARN_APP_URL;

import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.IllegalFormatException;
import java.util.List;
Expand Down Expand Up @@ -239,45 +238,6 @@ public List<AbstractExecutable> getAllExecutables(long timeStartInMillis, long t
}
}

/**
* Since ExecutableManager will instantiate all AbstractExecutable class by Class.forName(), but for each version release,
* new classes are introduced, old classes are deprecated, renamed or removed. The Class.forName() will throw out
* ClassNotFoundException. This API is used to retrieve the Executable Object list, not for calling the object method,
* so we could just instance the parent common class instead of the concrete class. It will tolerate the class missing issue.
*
* @param timeStartInMillis
* @param timeEndInMillis
* @param expectedClass
* @return
*/
public List<AbstractExecutable> getAllAbstractExecutables(long timeStartInMillis, long timeEndInMillis,
Class<? extends AbstractExecutable> expectedClass) {
try {
List<AbstractExecutable> ret = Lists.newArrayList();
for (ExecutablePO po : executableDao.getJobs(timeStartInMillis, timeEndInMillis)) {
try {
AbstractExecutable ae = parseToAbstract(po, expectedClass);
ret.add(ae);
} catch (IllegalArgumentException e) {
logger.error("error parsing one executabePO: ", e);
}
}
return ret;
} catch (PersistentException e) {
logger.error("error get All Jobs", e);
throw new RuntimeException(e);
}
}

public AbstractExecutable getAbstractExecutable(String uuid, Class<? extends AbstractExecutable> expectedClass) {
try {
return parseToAbstract(executableDao.getJob(uuid), expectedClass);
} catch (PersistentException e) {
logger.error("fail to get job:" + uuid, e);
throw new RuntimeException(e);
}
}

public List<String> getAllJobIds() {
try {
return executableDao.getJobIds();
Expand Down Expand Up @@ -536,70 +496,40 @@ private AbstractExecutable parseTo(ExecutablePO executablePO) {
return null;
}
String type = executablePO.getType();
try {
Class<? extends AbstractExecutable> clazz = ClassUtil.forName(type, AbstractExecutable.class);
Constructor<? extends AbstractExecutable> constructor = clazz.getConstructor();
AbstractExecutable result = constructor.newInstance();
result.initConfig(config);
result.setId(executablePO.getUuid());
result.setName(executablePO.getName());
result.setParams(executablePO.getParams());
List<ExecutablePO> tasks = executablePO.getTasks();
if (tasks != null && !tasks.isEmpty()) {
Preconditions.checkArgument(result instanceof ChainedExecutable);
for (ExecutablePO subTask : tasks) {
((ChainedExecutable) result).addTask(parseTo(subTask));
}
AbstractExecutable result = newExecutable(type);
result.initConfig(config);
result.setId(executablePO.getUuid());
result.setName(executablePO.getName());
result.setParams(executablePO.getParams());
List<ExecutablePO> tasks = executablePO.getTasks();
if (tasks != null && !tasks.isEmpty()) {
Preconditions.checkArgument(result instanceof ChainedExecutable);
for (ExecutablePO subTask : tasks) {
((ChainedExecutable) result).addTask(parseTo(subTask));
}
List<ExecutablePO> tasksForCheck = executablePO.getTasksForCheck();
if (tasksForCheck != null && !tasksForCheck.isEmpty()) {
Preconditions.checkArgument(result instanceof CheckpointExecutable);
for (ExecutablePO subTaskForCheck : tasksForCheck) {
((CheckpointExecutable) result).addTaskForCheck(parseTo(subTaskForCheck));
}
}
List<ExecutablePO> tasksForCheck = executablePO.getTasksForCheck();
if (tasksForCheck != null && !tasksForCheck.isEmpty()) {
Preconditions.checkArgument(result instanceof CheckpointExecutable);
for (ExecutablePO subTaskForCheck : tasksForCheck) {
((CheckpointExecutable) result).addTaskForCheck(parseTo(subTaskForCheck));
}
return result;
} catch (ReflectiveOperationException e) {
throw new IllegalStateException("cannot parse this job:" + executablePO.getId(), e);
}
return result;
}

private AbstractExecutable parseToAbstract(ExecutablePO executablePO,
Class<? extends AbstractExecutable> expectedClass) {
if (executablePO == null) {
logger.warn("executablePO is null");
return null;
private AbstractExecutable newExecutable(String type) {
Class<? extends AbstractExecutable> clazz;
try {
clazz = ClassUtil.forName(type, AbstractExecutable.class);
} catch (ClassNotFoundException ex) {
clazz = BrokenExecutable.class;
logger.error("Unknown executable type '" + type + "', using BrokenExecutable");
}
String type = executablePO.getType();
try {
Class<? extends AbstractExecutable> clazz = null;
try {
clazz = ClassUtil.forName(type, AbstractExecutable.class);
} catch (ClassNotFoundException e) {
clazz = ClassUtil.forName(expectedClass.getName(), AbstractExecutable.class);
}
Constructor<? extends AbstractExecutable> constructor = clazz.getConstructor();
AbstractExecutable result = constructor.newInstance();
result.initConfig(config);
result.setId(executablePO.getUuid());
result.setName(executablePO.getName());
result.setParams(executablePO.getParams());
List<ExecutablePO> tasks = executablePO.getTasks();
if (tasks != null && !tasks.isEmpty()) {
Preconditions.checkArgument(result instanceof ChainedExecutable);
for (ExecutablePO subTask : tasks) {
AbstractExecutable parseToTask = null;
try {
parseToTask = parseTo(subTask);
} catch (IllegalStateException e) {
parseToTask = parseToAbstract(subTask, DefaultChainedExecutable.class);
}
((ChainedExecutable) result).addTask(parseToTask);
}
}
return result;
} catch (ReflectiveOperationException e) {
throw new IllegalStateException("cannot parse this job:" + executablePO.getId(), e);
return clazz.getConstructor().newInstance();
} catch (Exception e) {
throw new RuntimeException("Failed to instantiate " + clazz, e);
}
}
}
Expand Up @@ -47,12 +47,6 @@ public class ExecutableManagerTest extends LocalFileMetadataTestCase {
public void setup() throws Exception {
createTestMetadata();
service = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());

for (String jobId : service.getAllJobIds()) {
System.out.println("deleting " + jobId);
service.deleteJob(jobId);
}

}

@After
Expand All @@ -63,13 +57,21 @@ public void after() throws Exception {
@Test
public void test() throws Exception {
assertNotNull(service);

// all existing are broken jobs
List<AbstractExecutable> existing = service.getAllExecutables();
for (AbstractExecutable exec : existing) {
assertEquals("BrokenExecutable", exec.getClass().getSimpleName());
assertEquals(ExecutableState.DISCARDED, exec.getStatus());
}

BaseTestExecutable executable = new SucceedTestExecutable();
executable.setParam("test1", "test1");
executable.setParam("test2", "test2");
executable.setParam("test3", "test3");
service.addJob(executable);
List<AbstractExecutable> result = service.getAllExecutables();
assertEquals(1, result.size());
assertEquals(existing.size() + 1, result.size());
AbstractExecutable another = service.getJob(executable.getId());
assertJobEqual(executable, another);

Expand Down
@@ -0,0 +1,14 @@
{
"uuid" : "d9a2b721-9916-4607-8047-148ceb2473b1",
"last_modified" : 1516778161249,
"version" : "2.3.0",
"name" : null,
"tasks" : null,
"tasks_check" : null,
"type" : "org.apache.kylin.job.BadClassName",
"params" : {
"test2" : "test2",
"test3" : "test3",
"test1" : "test1"
}
}
Expand Up @@ -725,9 +725,9 @@ public boolean apply(@Nullable JobInstance input) {
public List<CubingJob> innerSearchCubingJobs(final String cubeName, final String jobName,
final Set<ExecutableState> statusList, long timeStartInMillis, long timeEndInMillis,
final Map<String, Output> allOutputs, final boolean nameExactMatch, final String projectName) {
List<CubingJob> results = Lists.newArrayList(FluentIterable.from(
getExecutableManager().getAllAbstractExecutables(timeStartInMillis, timeEndInMillis, CubingJob.class))
.filter(new Predicate<AbstractExecutable>() {
List<CubingJob> results = Lists.newArrayList(
FluentIterable.from(getExecutableManager().getAllExecutables(timeStartInMillis, timeEndInMillis))
.filter(new Predicate<AbstractExecutable>() {
@Override
public boolean apply(AbstractExecutable executable) {
if (executable instanceof CubingJob) {
Expand Down Expand Up @@ -824,8 +824,7 @@ public List<CheckpointExecutable> innerSearchCheckpointJobs(final String cubeNam
List<CheckpointExecutable> results = Lists
.newArrayList(
FluentIterable
.from(getExecutableManager().getAllAbstractExecutables(timeStartInMillis,
timeEndInMillis, CheckpointExecutable.class))
.from(getExecutableManager().getAllExecutables(timeStartInMillis, timeEndInMillis))
.filter(new Predicate<AbstractExecutable>() {
@Override
public boolean apply(AbstractExecutable executable) {
Expand Down

0 comments on commit f797c84

Please sign in to comment.