Skip to content
This repository has been archived by the owner on Apr 4, 2021. It is now read-only.

Commit

Permalink
FALCON-2200 Update API support for extension job (user extension)
Browse files Browse the repository at this point in the history
Author: sandeep <sandysmdl@gmail.com>

Reviewers: @pallavi-rao

Closes #331 from sandeepSamudrala/FALCON-2200 and squashes the following commits:

737fad3 [sandeep] FALCON-2200 fixed checkstyle issues. removed unused imports
1780416 [sandeep] Incorporated review comments. Removed entitychannel and config channel from ExtensionManager Proxy as they are now used from proxyUtil
8a4d035 [sandeep] FALCON-2200 Incorporated review comments. Moved common code from proxies to proxyutil and making 2 api calls to get location in case of update extension
c8d0ab7 [sandeep] FALCON-2200 Adding changes related to clusters being removed and clusters being added into entity definition
cc7c9e9 [sandeep] FALCON-2200 Update API support for extension job (user extension)
456d4ee [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0cf9af6 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
4a2e23e [sandeep] Merge branch 'master' of https://github.com/apache/falcon
b1546ed [sandeep] Merge branch 'master' of https://github.com/apache/falcon
0a433fb [sandeep] Merge branch 'master' of https://github.com/apache/falcon
194f36a [sandeep] Merge branch 'master' of https://github.com/apache/falcon
e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes
  • Loading branch information
sandeepSamudrala authored and Pallavi Rao committed Dec 30, 2016
1 parent bc4dcf9 commit 4f42dc1
Show file tree
Hide file tree
Showing 20 changed files with 421 additions and 214 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,13 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
result = client.registerExtension(extensionName, path, description).getMessage();
} else if (optionsList.contains(FalconCLIConstants.SUBMIT_AND_SCHEDULE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(jobName, JOB_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
result = client.submitAndScheduleExtensionJob(extensionName, jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.UPDATE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(jobName, JOB_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
result = client.updateExtensionJob(extensionName, filePath, doAsUser).getMessage();
result = client.updateExtensionJob(jobName, filePath, doAsUser).getMessage();
} else if (optionsList.contains(FalconCLIConstants.VALIDATE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
validateRequiredParameter(filePath, FalconCLIConstants.FILE_PATH_OPT);
Expand Down
11 changes: 11 additions & 0 deletions client/src/main/java/org/apache/falcon/ExtensionHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public final class ExtensionHandler {
private static final String TMP_BASE_DIR = String.format("file://%s", System.getProperty("java.io.tmpdir"));
private static final String LOCATION = "location";
private static final String TYPE = "type";
private static final String NAME = "name";
private static final String EXTENSION_BUILDER_INTERFACE_SERVICE_FILE =
"META-INF/services/org.apache.falcon.extensions.ExtensionBuilder";

Expand Down Expand Up @@ -220,4 +221,14 @@ public static String getExtensionType(String extensionName, JSONObject extensio
}
return extensionType;
}

public static String getExtensionName(String jobName, JSONObject extensionJobDetailJson) {
String extensionType;
try {
extensionType = extensionJobDetailJson.get(NAME).toString();
} catch (JSONException e) {
throw new FalconCLIException("Failed to get extension name for the given extension job:" + jobName, e);
}
return extensionType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,16 @@ public abstract APIResult submitExtensionJob(String extensionName, String jobNam
public abstract APIResult submitAndScheduleExtensionJob(String extensionName, String jobName, String configPath,
String doAsUser);

/**
* Prepares set of entities the extension has implemented and stage them to a local directory and updates them.
* @param jobName name to be used in all the extension entities' tagging that are built as part of
* loadAndPrepare.
* @param configPath path to extension parameters.
* @return
* @throws FalconCLIException
*/
public abstract APIResult updateExtensionJob(String jobName, String configPath, String doAsUser);

/**
* Prepares set of entities the extension has implemented to validate the extension job.
* @param jobName job name of the extension job.
Expand Down
31 changes: 24 additions & 7 deletions client/src/main/java/org/apache/falcon/client/FalconClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1028,9 +1028,12 @@ public APIResult getExtensionDetail(final String extensionName) {
}

public APIResult getExtensionJobDetails(final String jobName) {
ClientResponse clientResponse = new ResourceBuilder().path(ExtensionOperations.JOB_DETAILS.path, jobName)
return getResponse(APIResult.class, getExtensionJobDetailsResponse(jobName));
}

private ClientResponse getExtensionJobDetailsResponse(final String jobName) {
return new ResourceBuilder().path(ExtensionOperations.JOB_DETAILS.path, jobName)
.call(ExtensionOperations.JOB_DETAILS);
return getResponse(APIResult.class, clientResponse);
}

private ClientResponse getExtensionDetailResponse(final String extensionName) {
Expand Down Expand Up @@ -1097,7 +1100,11 @@ private FormDataMultiPart getEntitiesForm(String extensionName, String jobName,

private List<Entity> validateExtensionAndGetEntities(String extensionName, String jobName,
InputStream configStream) {
JSONObject extensionDetailJson = getExtensionDetailJson(extensionName);
JSONObject extensionDetailJson;
if (StringUtils.isBlank(extensionName)) {
extensionName = ExtensionHandler.getExtensionName(jobName, getExtensionJobDetailJson(jobName));
}
extensionDetailJson = getExtensionDetailJson(extensionName);
String extensionType = ExtensionHandler.getExtensionType(extensionName, extensionDetailJson);
String extensionBuildLocation = ExtensionHandler.getExtensionLocation(extensionName, extensionDetailJson);
return getEntities(extensionName, jobName, configStream, extensionType,
Expand All @@ -1115,6 +1122,16 @@ private JSONObject getExtensionDetailJson(String extensionName) {
}
return extensionDetailJson;
}
private JSONObject getExtensionJobDetailJson(String jobName) {
ClientResponse clientResponse = getExtensionJobDetailsResponse(jobName);
JSONObject extensionJobDetailJson;
try {
extensionJobDetailJson = new JSONObject(getResponse(APIResult.class, clientResponse).getMessage());
} catch (JSONException e) {
throw new FalconCLIException("Failed to get details for the given extension", e);
}
return extensionJobDetailJson;
}

private List<Entity> getEntities(String extensionName, String jobName, InputStream configStream,
String extensionType, String extensionBuildLocation) {
Expand Down Expand Up @@ -1144,12 +1161,12 @@ public APIResult submitAndScheduleExtensionJob(final String extensionName, final
return getResponse(APIResult.class, clientResponse);
}

public APIResult updateExtensionJob(final String extensionName, final String filePath, final String doAsUser) {
InputStream entityStream = getServletInputStream(filePath);
public APIResult updateExtensionJob(final String jobName, final String configPath, final String doAsUser) {
FormDataMultiPart entitiesForm = getEntitiesForm(null, jobName, configPath);
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.UPDATE.path, extensionName)
.path(ExtensionOperations.UPDATE.path, jobName)
.addQueryParam(DO_AS_OPT, doAsUser)
.call(ExtensionOperations.UPDATE, entityStream);
.call(ExtensionOperations.UPDATE, entitiesForm);
return getResponse(APIResult.class, clientResponse);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,12 +272,12 @@ private void createDB(String sqlFile, boolean run) throws Exception {
"create table FALCON_DB_PROPS (name varchar(100), data varchar(100))";

private void createFalconPropsTable(String sqlFile, boolean run, String version) throws Exception {
String insertDbVerion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')";
String insertDbVersion = "insert into FALCON_DB_PROPS (name, data) values ('db.version', '" + version + "')";

PrintWriter writer = new PrintWriter(new FileWriter(sqlFile, true));
writer.println();
writer.println(CREATE_FALCON_DB_PROPS);
writer.println(insertDbVerion);
writer.println(insertDbVersion);
writer.close();
System.out.println("Create FALCON_DB_PROPS table");
if (run) {
Expand All @@ -287,7 +287,7 @@ private void createFalconPropsTable(String sqlFile, boolean run, String version)
conn.setAutoCommit(true);
st = conn.createStatement();
st.executeUpdate(CREATE_FALCON_DB_PROPS);
st.executeUpdate(insertDbVerion);
st.executeUpdate(insertDbVersion);
st.close();
} catch (Exception ex) {
closeStatement(st);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,23 @@ public void deleteExtensionJob(String jobName) {
}
}

public void updateExtensionJob(String jobName, String extensionName, List<String> feedNames,
List<String> processNames, byte[] configBytes) {
EntityManager entityManager = getEntityManager();
ExtensionJobsBean extensionJobsBean = new ExtensionJobsBean();
extensionJobsBean.setJobName(jobName);
extensionJobsBean.setExtensionName(extensionName);
extensionJobsBean.setFeeds(feedNames);
extensionJobsBean.setProcesses(processNames);
extensionJobsBean.setConfig(configBytes);
try {
beginTransaction(entityManager);
entityManager.merge(extensionJobsBean);
} finally {
commitAndCloseTransaction(entityManager);
}
}

public ExtensionJobsBean getExtensionJobDetails(String jobName) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Expand Down
20 changes: 17 additions & 3 deletions extensions/src/test/java/org/apache/falcon/ExtensionExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,44 @@
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Schema;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.extensions.ExtensionBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.xml.bind.JAXBException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
* Extension Example for testing extension loading and preparing entities.
*/
public class ExtensionExample implements ExtensionBuilder{

public static final Logger LOG = LoggerFactory.getLogger(ExtensionExample.class);
public static final String PROCESS_XML = "/extension-example.xml";

@Override
public List<Entity> getEntities(String extensionName, InputStream extensionConfigStream) throws FalconException {
Entity process;
Process process;
try {
process = (Entity) EntityType.PROCESS.getUnmarshaller().unmarshal(
process = (Process) EntityType.PROCESS.getUnmarshaller().unmarshal(
getClass().getResourceAsStream(PROCESS_XML));
} catch (JAXBException e) {
throw new FalconException("Failed in un-marshalling the entity");
}
if (extensionConfigStream != null) {
Properties properties = new Properties();
try {
properties.load(extensionConfigStream);
} catch (IOException e) {
LOG.warn("Not able to load the configStream");
}
process.setPipelines(properties.getProperty("pipelines.name"));
}
List<Entity> entities = new ArrayList<>();
entities.add(process);
return entities;
Expand All @@ -52,7 +67,6 @@ public List<Entity> getEntities(String extensionName, InputStream extensionConfi
@Override
public void validateExtensionConfig(String extensionName, InputStream extensionConfigStream)
throws FalconException {

}

@Override
Expand Down
1 change: 0 additions & 1 deletion extensions/src/test/resources/extension-example.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
<parallel>1</parallel>
<order>LIFO</order>
<frequency>hours(1)</frequency>
<sla shouldStartIn="hours(2)" shouldEndIn="hours(4)"/>
<!-- how -->
<properties>
<property name="name1" value="value1"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void deleteEntityInstance(String entityName){
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.DELETE_ALL_BACKLOG_ENTITY_INSTANCES);
q.setParameter("entityName", entityName);
try{
try {
q.executeUpdate();
} finally {
commitAndCloseTransaction(entityManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ protected static void checkColo(String colo) {
}
}

protected Set<String> getAllColos() {
public static Set<String> getAllColos() {
if (DeploymentUtil.isEmbeddedMode()) {
return DeploymentUtil.getDefaultColos();
}
Expand All @@ -141,7 +141,7 @@ protected Set<String> getColosFromExpression(String coloExpr, String type, Strin
return colos;
}

protected Set<String> getApplicableColos(String type, String name) {
public static Set<String> getApplicableColos(String type, String name) {
try {
if (DeploymentUtil.isEmbeddedMode()) {
return DeploymentUtil.getDefaultColos();
Expand All @@ -157,7 +157,7 @@ protected Set<String> getApplicableColos(String type, String name) {
}
}

protected Set<String> getApplicableColos(String type, Entity entity) {
public static Set<String> getApplicableColos(String type, Entity entity) {
try {
if (DeploymentUtil.isEmbeddedMode()) {
return DeploymentUtil.getDefaultColos();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,27 +42,27 @@ public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
public static final Logger LOG = LoggerFactory.getLogger(AbstractExtensionManager.class);

private static final String JOB_NAME = "jobName";
public static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job=";
protected static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job=";
private static final String EXTENSION_NAME = "extensionName";
private static final String FEEDS = "feeds";
private static final String PROCESSES = "processes";
private static final String CONFIG = "config";
private static final String CREATION_TIME = "creationTime";
private static final String LAST_UPDATE_TIME = "lastUpdatedTime";

private static final String NAME = "name";
private static final String EXTENSION_TYPE = "type";
private static final String EXTENSION_DESC = "description";
private static final String EXTENSION_LOCATION = "location";
public static final String NAME = "name";
protected static final String EXTENSION_TYPE = "type";
protected static final String EXTENSION_DESC = "description";
protected static final String EXTENSION_LOCATION = "location";

public static void validateExtensionName(final String extensionName) {
protected static void validateExtensionName(final String extensionName) {
if (StringUtils.isBlank(extensionName)) {
throw FalconWebException.newAPIException("Extension name is mandatory and shouldn't be blank",
Response.Status.BAD_REQUEST);
}
}

public APIResult registerExtensionMetadata(String extensionName, String path, String description, String owner) {
protected APIResult registerExtensionMetadata(String extensionName, String path, String description, String owner) {
validateExtensionName(extensionName);
try {
return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().registerExtension(extensionName, path,
Expand All @@ -80,7 +80,7 @@ public APIResult getExtensionJobDetail(String jobName) {
}
}

public APIResult getExtensionDetail(String extensionName) {
protected APIResult getExtensionDetail(String extensionName) {
try {
return new APIResult(APIResult.Status.SUCCEEDED, buildExtensionDetailResult(extensionName).toString());
} catch (FalconException e) {
Expand Down Expand Up @@ -112,6 +112,7 @@ private JSONObject buildExtensionJobDetailResult(final String jobName) throws Fa
if (jobsBean == null) {
throw new ValidationException("Job name not found:" + jobName);
}
ExtensionBean extensionBean = metaStore.getDetail(jobsBean.getExtensionName());
JSONObject detailsObject = new JSONObject();
try {
detailsObject.put(JOB_NAME, jobsBean.getJobName());
Expand All @@ -121,6 +122,8 @@ private JSONObject buildExtensionJobDetailResult(final String jobName) throws Fa
detailsObject.put(CONFIG, jobsBean.getConfig());
detailsObject.put(CREATION_TIME, jobsBean.getCreationTime());
detailsObject.put(LAST_UPDATE_TIME, jobsBean.getLastUpdatedTime());
detailsObject.put(EXTENSION_LOCATION, extensionBean.getLocation());
detailsObject.put(EXTENSION_TYPE, extensionBean.getExtensionType());
} catch (JSONException e) {
LOG.error("Exception while building extension jon details for job {}", jobName, e);
}
Expand Down

0 comments on commit 4f42dc1

Please sign in to comment.