Skip to content
This repository has been archived by the owner on Apr 4, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepSamudrala committed Jan 3, 2017
2 parents cc28658 + 8054727 commit 73fbf75
Show file tree
Hide file tree
Showing 32 changed files with 583 additions and 145 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ activemq-data
#log files
logs
*.log
*.patch

#Falcon UI NPM files
falcon-ui/dist
Expand Down
10 changes: 9 additions & 1 deletion cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public class FalconExtensionCLI {
public static final String UNREGISTER_OPT = "unregister";
public static final String DETAIL_OPT = "detail";
public static final String REGISTER_OPT = "register";
public static final String ENABLE_OPT = "enable";
public static final String DISABLE_OPT = "disable";

// Input parameters
public static final String EXTENSION_NAME_OPT = "extensionName";
Expand Down Expand Up @@ -153,10 +155,16 @@ public void extensionCommand(CommandLine commandLine, FalconClient client) {
commandLine.getOptionValue(FalconCLIConstants.OFFSET_OPT),
commandLine.getOptionValue(FalconCLIConstants.NUM_RESULTS_OPT));
result = instances != null ? instances.toString() : "No instance (" + jobName + ") found.";
} else if (optionsList.contains(ENABLE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.enableExtension(extensionName).getMessage();
} else if (optionsList.contains(DISABLE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.disableExtension(extensionName).getMessage();
} else {
throw new FalconCLIException("Invalid/missing extension command. Supported commands include "
+ "enumerate, definition, describe, list, instances, submit, submitAndSchedule, "
+ "schedule, suspend, resume, delete, update, validate. "
+ "schedule, suspend, resume, delete, update, validate, enable, disable. "
+ "Please refer to Falcon CLI twiki for more details.");
}
OUT.get().println(result);
Expand Down
4 changes: 1 addition & 3 deletions client/src/main/java/org/apache/falcon/ExtensionHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public final class ExtensionHandler {
private static final String TMP_BASE_DIR = String.format("file://%s", System.getProperty("java.io.tmpdir"));
private static final String LOCATION = "location";
private static final String TYPE = "type";
private static final String NAME = "name";
private static final String NAME = "extensionName";
private static final String EXTENSION_BUILDER_INTERFACE_SERVICE_FILE =
"META-INF/services/org.apache.falcon.extensions.ExtensionBuilder";

Expand Down Expand Up @@ -185,8 +185,6 @@ static List<URL> getFilesInPath(URL fileURL) throws MalformedURLException {
for (File innerFile : files) {
if (innerFile.isFile()) {
urls.add(innerFile.toURI().toURL());
} else {
urls.addAll(getFilesInPath(file.toURI().toURL()));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,27 @@ public abstract APIResult submitAndSchedule(String entityType, String filePath,
*/
public abstract APIResult unregisterExtension(String extensionName);

/**
*
* @param extensionName extensionName that needs to be enabled
* @return Result of the enableExtension operation
*/
public abstract APIResult enableExtension(String extensionName);

/**
*
* @param extensionName extensionName that needs to be disabled
* @return Result of the disableExtension operation
*/
public abstract APIResult disableExtension(String extensionName);

/**
* Prepares set of entities the extension has implemented and stage them to a local directory and submit them too.
* @param extensionName extension which is available in the store.
* @param jobName name to be used in all the extension entities' tagging that are built as part of
* loadAndPrepare.
* @param configPath path to extension parameters.
* @return
* @throws FalconCLIException
*/
public abstract APIResult submitExtensionJob(String extensionName, String jobName, String configPath,
String doAsUser);
Expand All @@ -216,7 +229,6 @@ public abstract APIResult submitExtensionJob(String extensionName, String jobNam
* loadAndPrepare.
* @param configPath path to extension parameters.
* @return
* @throws FalconCLIException
*/
public abstract APIResult submitAndScheduleExtensionJob(String extensionName, String jobName, String configPath,
String doAsUser);
Expand All @@ -227,10 +239,15 @@ public abstract APIResult submitAndScheduleExtensionJob(String extensionName, St
* loadAndPrepare.
* @param configPath path to extension parameters.
* @return
* @throws FalconCLIException
*/
public abstract APIResult updateExtensionJob(String jobName, String configPath, String doAsUser);

/**
* Deletes the entities that are part of the extension job and then deleted the job from the DB.
* @param jobName name of the extension job that needs to be deleted.
* @return APIResult status of the deletion query.
*/
public abstract APIResult deleteExtensionJob(final String jobName, final String doAsUser);
/**
* Prepares set of entities the extension has implemented to validate the extension job.
* @param jobName job name of the extension job.
Expand Down
16 changes: 15 additions & 1 deletion client/src/main/java/org/apache/falcon/client/FalconClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,9 @@ protected static enum ExtensionOperations {
UNREGISTER("api/extension/unregister/", HttpMethod.POST, MediaType.TEXT_XML),
DETAIL("api/extension/detail/", HttpMethod.GET, MediaType.APPLICATION_JSON),
JOB_DETAILS("api/extension/extensionJobDetails/", HttpMethod.GET, MediaType.APPLICATION_JSON),
REGISTER("api/extension/register/", HttpMethod.POST, MediaType.TEXT_XML);
REGISTER("api/extension/register/", HttpMethod.POST, MediaType.TEXT_XML),
ENABLE("api/extension/enable", HttpMethod.POST, MediaType.TEXT_XML),
DISABLE("api/extension/disable", HttpMethod.POST, MediaType.TEXT_XML);

private String path;
private String method;
Expand Down Expand Up @@ -1049,6 +1051,18 @@ public APIResult registerExtension(final String extensionName, final String pack
return getResponse(APIResult.class, clientResponse);
}

public APIResult enableExtension(final String extensionName) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.ENABLE.path, extensionName).call(ExtensionOperations.ENABLE);
return getResponse(APIResult.class, clientResponse);
}

public APIResult disableExtension(final String extensionName) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.DISABLE.path, extensionName).call(ExtensionOperations.DISABLE);
return getResponse(APIResult.class, clientResponse);
}

public APIResult getExtensionDefinition(final String extensionName) {
ClientResponse clientResponse = new ResourceBuilder()
.path(ExtensionOperations.DEFINITION.path, extensionName)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.extensions;

/**
* Enum to store ExtensionStatus.
*/
public enum ExtensionStatus {
ENABLED("enabled state"),
DISABLED("disabled state");

private final String text;

ExtensionStatus(final String text) {
this.text = text;
}

@Override
public String toString(){
return text;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,8 @@ public StoreAccessException(String message, Exception e) {
public StoreAccessException(Exception e) {
super(e);
}

public StoreAccessException(String message){
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.falcon.persistence;

import org.apache.falcon.extensions.ExtensionStatus;
import org.apache.falcon.extensions.ExtensionType;

import javax.persistence.Basic;
Expand All @@ -44,7 +45,8 @@
@NamedQuery(name = PersistenceConstants.GET_ALL_EXTENSIONS, query = "select OBJECT(a) from ExtensionBean a "),
@NamedQuery(name = PersistenceConstants.DELETE_EXTENSIONS_OF_TYPE, query = "delete from ExtensionBean a where a.extensionType = :extensionType "),
@NamedQuery(name = PersistenceConstants.DELETE_EXTENSION, query = "delete from ExtensionBean a where a.extensionName = :extensionName "),
@NamedQuery(name = PersistenceConstants.GET_EXTENSION, query = "select OBJECT(a) from ExtensionBean a where a.extensionName = :extensionName")
@NamedQuery(name = PersistenceConstants.GET_EXTENSION, query = "select OBJECT(a) from ExtensionBean a where a.extensionName = :extensionName"),
@NamedQuery(name = PersistenceConstants.CHANGE_EXTENSION_STATUS, query = "update ExtensionBean a set a.status = :extensionStatus where a.extensionName = :extensionName")
})
//RESUME CHECKSTYLE CHECK LineLengthCheck
public class ExtensionBean {
Expand Down Expand Up @@ -79,6 +81,12 @@ public class ExtensionBean {
@Column(name = "extension_owner")
private String extensionOwner;

@Basic
@NotNull
@Column(name = "status")
@Enumerated(EnumType.STRING)
private ExtensionStatus status;

public ExtensionType getExtensionType() {
return extensionType;
}
Expand Down Expand Up @@ -127,4 +135,12 @@ public void setExtensionOwner(String extensionOwner) {
this.extensionOwner = extensionOwner;
}

public ExtensionStatus getStatus() {
return status;
}

public void setStatus(ExtensionStatus status) {
this.status = status;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ private PersistenceConstants(){
public static final String DELETE_EXTENSIONS_OF_TYPE = "DELETE_EXTENSIONS_OF_TYPE";
public static final String DELETE_EXTENSION = "DELETE_EXTENSION";
public static final String GET_EXTENSION = "GET_EXTENSION";
public static final String CHANGE_EXTENSION_STATUS = "CHANGE_EXTENSION_STATUS";

public static final String GET_ALL_EXTENSION_JOBS = "GET_ALL_EXTENSION_JOBS";
public static final String DELETE_EXTENSION_JOB = "DELETE_EXTENSION_JOB";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public enum WorkflowExecutionArgs {
// workflow execution details
WORKFLOW_ID("workflowId", "current workflow-id of the instance"),
RUN_ID("runId", "current run-id of the instance"),
STATUS("status", "status of the user workflow isnstance"),
STATUS("status", "status of the user workflow instance"),
WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex:oozie", false),
USER_SUBFLOW_ID("subflowId", "external id of user workflow", false),
PARENT_ID("parentId", "The parent of the current workflow, typically coord action", false),
Expand All @@ -70,6 +70,7 @@ public enum WorkflowExecutionArgs {
// what outputs
OUTPUT_FEED_NAMES("feedNames", "name of the feeds which are generated/replicated/deleted"),
OUTPUT_FEED_PATHS("feedInstancePaths", "comma separated feed instance paths"),
OUTPUT_NAMES("feedInstanceNames", "comma separated list of names of outputs", false),

// broker related parameters
TOPIC_NAME("topicName", "name of the topic to be used to send JMS message", false),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.falcon.extensions.jdbc;

import org.apache.falcon.extensions.ExtensionStatus;
import org.apache.falcon.extensions.ExtensionType;
import org.apache.falcon.persistence.ExtensionBean;
import org.apache.falcon.persistence.ExtensionJobsBean;
Expand All @@ -36,6 +37,7 @@ public class ExtensionMetaStore {
private static final String EXTENSION_NAME = "extensionName";
private static final String JOB_NAME = "jobName";
private static final String EXTENSION_TYPE = "extensionType";
private static final String EXTENSION_STATUS = "extensionStatus";

private EntityManager getEntityManager() {
return FalconJPAService.get().getEntityManager();
Expand All @@ -50,6 +52,7 @@ public void storeExtensionBean(String extensionName, String location, ExtensionT
extensionBean.setCreationTime(new Date(System.currentTimeMillis()));
extensionBean.setDescription(description);
extensionBean.setExtensionOwner(extensionOwner);
extensionBean.setStatus(ExtensionStatus.ENABLED);
EntityManager entityManager = getEntityManager();
try {
beginTransaction(entityManager);
Expand Down Expand Up @@ -229,4 +232,16 @@ private void commitAndCloseTransaction(EntityManager entityManager) {
entityManager.close();
}
}

public void updateExtensionStatus(String extensionName, ExtensionStatus status) {
EntityManager entityManager = getEntityManager();
beginTransaction(entityManager);
Query q = entityManager.createNamedQuery(PersistenceConstants.CHANGE_EXTENSION_STATUS);
q.setParameter(EXTENSION_NAME, extensionName).setParameter(EXTENSION_STATUS, status);
try {
q.executeUpdate();
} finally {
commitAndCloseTransaction(entityManager);
}
}
}

0 comments on commit 73fbf75

Please sign in to comment.