Skip to content
This repository has been archived by the owner on Apr 4, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepSamudrala committed Dec 22, 2016
2 parents 0a433fb + 0488de3 commit b1546ed
Show file tree
Hide file tree
Showing 10 changed files with 289 additions and 77 deletions.
2 changes: 1 addition & 1 deletion common/src/main/resources/startup.properties
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
#org.apache.falcon.metadata.MetadataMappingService,\

##Add if you want to use Trusted or User Extensions
## In case of distributed Mode enable ExtensionService only on Prism
## In case of distributed Mode enable ExtensionService only on Prism via prism.application.services
## It should come after FalconJPAService in application services
#org.apache.falcon.extensions.ExtensionService,\

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.falcon.extensions.ExtensionType;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand Down Expand Up @@ -104,7 +103,7 @@ private void initializeDbTable() {
String description = getShortDescription(extension);
String recipeName = extension;
String location = storePath.toString() + '/' + extension;
String extensionOwner = CurrentUser.getUser();
String extensionOwner = System.getProperty("user.name");
metaStore.storeExtensionBean(recipeName, location, extensionType, description, extensionOwner);
}
} catch (FalconException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.falcon.resource;

import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
import org.apache.falcon.entity.parser.ProcessEntityParser;
import org.apache.falcon.entity.parser.ValidationException;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.extensions.store.ExtensionStore;
import org.apache.falcon.persistence.ExtensionJobsBean;
import org.apache.falcon.security.CurrentUser;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* A base class for managing Extension Operations.
*/
public class AbstractExtensionManager extends AbstractSchedulableEntityManager {
public static final Logger LOG = LoggerFactory.getLogger(AbstractExtensionManager.class);

private static final String JOB_NAME = "jobName";
public static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job=";
private static final String EXTENSION_NAME = "extensionName";
private static final String FEEDS = "feeds";
private static final String PROCESSES = "processes";
private static final String CONFIG = "config";
private static final String CREATION_TIME = "creationTime";
private static final String LAST_UPDATE_TIME = "lastUpdatedTime";

public static void validateExtensionName(final String extensionName) {
if (StringUtils.isBlank(extensionName)) {
throw FalconWebException.newAPIException("Extension name is mandatory and shouldn't be blank",
Response.Status.BAD_REQUEST);
}
}

public String registerExtensionMetadata(String extensionName, String path, String description, String owner) {
validateExtensionName(extensionName);
try {
return ExtensionStore.get().registerExtension(extensionName, path, description, owner);
} catch (Throwable e) {
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
}

public String getExtensionJobDetail(String jobName) {
try {
return buildExtensionJobDetailResult(jobName).toString();
} catch (FalconException e) {
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
}

public String deleteExtensionMetadata(String extensionName){
validateExtensionName(extensionName);
try {
return ExtensionStore.get().deleteExtension(extensionName, CurrentUser.getUser());
} catch (Throwable e) {
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
}

private JSONObject buildExtensionJobDetailResult(final String jobName) throws FalconException {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionJobsBean jobsBean = metaStore.getExtensionJobDetails(jobName);
if (jobsBean == null) {
throw new ValidationException("Job name not found:" + jobName);
}
JSONObject detailsObject = new JSONObject();
try {
detailsObject.put(JOB_NAME, jobsBean.getJobName());
detailsObject.put(EXTENSION_NAME, jobsBean.getExtensionName());
detailsObject.put(FEEDS, StringUtils.join(jobsBean.getFeeds(), ","));
detailsObject.put(PROCESSES, StringUtils.join(jobsBean.getProcesses(), ","));
detailsObject.put(CONFIG, jobsBean.getConfig());
detailsObject.put(CREATION_TIME, jobsBean.getCreationTime());
detailsObject.put(LAST_UPDATE_TIME, jobsBean.getLastUpdatedTime());
} catch (JSONException e) {
LOG.error("Exception while building extension jon details for job {}", jobName, e);
}
return detailsObject;
}


protected void submitEntities(String extensionName, String doAsUser, String jobName,
Map<EntityType, List<Entity>> entityMap, InputStream configStream)
throws FalconException, IOException {
List<Entity> feeds = entityMap.get(EntityType.FEED);
List<Entity> processes = entityMap.get(EntityType.PROCESS);
validateFeeds(feeds);
validateProcesses(processes);
List<String> feedNames = new ArrayList<>();
List<String> processNames = new ArrayList<>();
for (Entity feed : feeds) {
submitInternal(feed, doAsUser);
feedNames.add(feed.getName());
}
for (Entity process: processes) {
submitInternal(process, doAsUser);
processNames.add(process.getName());
}

ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
byte[] configBytes = null;
if (configStream != null) {
configBytes = IOUtils.toByteArray(configStream);
}
metaStore.storeExtensionJob(jobName, extensionName, feedNames, processNames, configBytes);
}


private void validateFeeds(List<Entity> feeds) throws FalconException {
for (Entity feed : feeds) {
super.validate(feed);
}
}

private void validateProcesses(List<Entity> processes) throws FalconException {
ProcessEntityParser processEntityParser = new ProcessEntityParser();
for (Entity process : processes) {
processEntityParser.validate((Process)process, false);
}
}

protected void scheduleEntities(Map<EntityType, List<Entity>> entityMap) throws FalconException,
AuthorizationException {
for (Object feed: entityMap.get(EntityType.FEED)) {
scheduleInternal(EntityType.FEED.name(), ((Feed)feed).getName(), null, null);
}
for (Object process: entityMap.get(EntityType.PROCESS)) {
scheduleInternal(EntityType.PROCESS.name(), ((Process)process).getName(), null, null);
}
}

public static String getJobNameFromTag(String tags) {
int nameStart = tags.indexOf(TAG_PREFIX_EXTENSION_JOB);
if (nameStart == -1) {
return null;
}

nameStart = nameStart + TAG_PREFIX_EXTENSION_JOB.length();
int nameEnd = tags.indexOf(',', nameStart);
if (nameEnd == -1) {
nameEnd = tags.length();
}
return tags.substring(nameStart, nameEnd);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@
* limitations under the License.
*/

package org.apache.falcon.resource.extensions;
package org.apache.falcon.resource.proxy;

import com.sun.jersey.multipart.FormDataBodyPart;
import com.sun.jersey.multipart.FormDataParam;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
import org.apache.falcon.entity.EntityUtil;
Expand All @@ -39,13 +38,12 @@
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.extensions.store.ExtensionStore;
import org.apache.falcon.persistence.ExtensionBean;
import org.apache.falcon.persistence.ExtensionJobsBean;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.AbstractSchedulableEntityManager;
import org.apache.falcon.resource.AbstractExtensionManager;
import org.apache.falcon.resource.EntityList;
import org.apache.falcon.resource.ExtensionInstanceList;
import org.apache.falcon.resource.ExtensionJobList;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.service.Services;
import org.apache.falcon.util.DeploymentUtil;
Expand Down Expand Up @@ -83,11 +81,10 @@
* Jersey Resource for extension job operations.
*/
@Path("extension")
public class ExtensionManager extends AbstractSchedulableEntityManager {
public static final Logger LOG = LoggerFactory.getLogger(ExtensionManager.class);
public class ExtensionManagerProxy extends AbstractExtensionManager {
public static final Logger LOG = LoggerFactory.getLogger(ExtensionManagerProxy.class);

private static final String TAG_PREFIX_EXTENSION_NAME = "_falcon_extension_name=";
private static final String TAG_PREFIX_EXTENSION_JOB = "_falcon_extension_job=";
private static final String ASCENDING_SORT_ORDER = "asc";
private static final String DESCENDING_SORT_ORDER = "desc";

Expand All @@ -100,14 +97,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager {
private static final String EXTENSION_TYPE = "type";
private static final String EXTENSION_DESC = "description";
private static final String EXTENSION_LOCATION = "location";
private static final String JOB_NAME = "jobName";

private static final String EXTENSION_NAME = "extensionName";
private static final String FEEDS = "feeds";
private static final String PROCESSES = "processes";
private static final String CONFIG = "config";
private static final String CREATION_TIME = "creationTime";
private static final String LAST_UPDATE_TIME = "lastUpdatedTime";


private static final String EXTENSION_PROPERTY_JSON_SUFFIX = "-properties.json";
Expand Down Expand Up @@ -577,8 +567,8 @@ public Response getDetail(@PathParam("extension-name") String extensionName) {
public String getExtensionJobDetail(@PathParam("job-name") String jobName) {
checkIfExtensionServiceIsEnabled();
try {
return buildExtensionJobDetailResult(jobName).toString();
} catch (FalconException e) {
return super.getExtensionJobDetail(jobName);
} catch (Throwable e) {
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
}
Expand All @@ -590,9 +580,8 @@ public String getExtensionJobDetail(@PathParam("job-name") String jobName) {
public String deleteExtensionMetadata(
@PathParam("extension-name") String extensionName){
checkIfExtensionServiceIsEnabled();
validateExtensionName(extensionName);
try {
return ExtensionStore.get().deleteExtension(extensionName, CurrentUser.getUser());
return super.deleteExtensionMetadata(extensionName);
} catch (Throwable e) {
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
Expand All @@ -607,9 +596,8 @@ public String registerExtensionMetadata(
@QueryParam("path") String path,
@QueryParam("description") String description) {
checkIfExtensionServiceIsEnabled();
validateExtensionName(extensionName);
try {
return ExtensionStore.get().registerExtension(extensionName, path, description, CurrentUser.getUser());
return super.registerExtensionMetadata(extensionName, path, description, CurrentUser.getUser());
} catch (Throwable e) {
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
Expand All @@ -621,7 +609,6 @@ public String registerExtensionMetadata(
public String getExtensionDefinition(
@PathParam("extension-name") String extensionName) {
checkIfExtensionServiceIsEnabled();
validateExtensionName(extensionName);
try {
return ExtensionStore.get().getResource(extensionName,
extensionName.toLowerCase() + EXTENSION_PROPERTY_JSON_SUFFIX);
Expand All @@ -630,13 +617,6 @@ public String getExtensionDefinition(
}
}

private static void validateExtensionName(final String extensionName) {
if (StringUtils.isBlank(extensionName)) {
throw FalconWebException.newAPIException("Extension name is mandatory and shouldn't be blank",
Response.Status.BAD_REQUEST);
}
}

private static JSONArray buildEnumerateResult() throws FalconException {
JSONArray results = new JSONArray();
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
Expand Down Expand Up @@ -672,27 +652,6 @@ private List<Entity> generateEntities(String extensionName, InputStream configSt
return entities;
}

private JSONObject buildExtensionJobDetailResult(final String jobName) throws FalconException {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionJobsBean jobsBean = metaStore.getExtensionJobDetails(jobName);
if (jobsBean == null) {
throw new ValidationException("Job name not found:" + jobName);
}
JSONObject detailsObject = new JSONObject();
try {
detailsObject.put(JOB_NAME, jobsBean.getJobName());
detailsObject.put(EXTENSION_NAME, jobsBean.getExtensionName());
detailsObject.put(FEEDS, StringUtils.join(jobsBean.getFeeds(), ","));
detailsObject.put(PROCESSES, StringUtils.join(jobsBean.getProcesses(), ","));
detailsObject.put(CONFIG, jobsBean.getConfig());
detailsObject.put(CREATION_TIME, jobsBean.getCreationTime());
detailsObject.put(LAST_UPDATE_TIME, jobsBean.getLastUpdatedTime());
} catch (JSONException e) {
LOG.error("Exception while building extension jon details for job {}", jobName, e);
}
return detailsObject;
}

private JSONObject buildExtensionDetailResult(final String extensionName) throws FalconException {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();

Expand Down Expand Up @@ -726,20 +685,6 @@ private Map<String, List<Entity>> groupEntitiesByJob(List<Entity> entities) {
return groupedEntities;
}

public static String getJobNameFromTag(String tags) {
int nameStart = tags.indexOf(TAG_PREFIX_EXTENSION_JOB);
if (nameStart == -1) {
return null;
}

nameStart = nameStart + TAG_PREFIX_EXTENSION_JOB.length();
int nameEnd = tags.indexOf(',', nameStart);
if (nameEnd == -1) {
nameEnd = tags.length();
}
return tags.substring(nameStart, nameEnd);
}

private static void checkIfExtensionServiceIsEnabled() {
if (!Services.get().isRegistered(ExtensionService.SERVICE_NAME)) {
throw FalconWebException.newAPIException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.falcon.resource.SchedulableEntityInstanceResult;
import org.apache.falcon.resource.channel.Channel;
import org.apache.falcon.resource.channel.ChannelFactory;
import org.apache.falcon.resource.extensions.ExtensionManager;
import org.apache.falcon.resource.AbstractExtensionManager;
import org.apache.falcon.util.DeploymentUtil;

import javax.servlet.http.HttpServletRequest;
Expand Down Expand Up @@ -403,7 +403,7 @@ private void isEntityPartOfAnExtension(Entity entity) {
private void entityHasExtensionJobTag(Entity entity) {
String tags = entity.getTags();
if (StringUtils.isNotBlank(tags)) {
String jobName = ExtensionManager.getJobNameFromTag(tags);
String jobName = AbstractExtensionManager.getJobNameFromTag(tags);
if (StringUtils.isNotBlank(jobName)) {
throw FalconWebException.newAPIException("Entity has extension job name in the tag. Such entities need "
+ "to be submitted as extension jobs:" + jobName);
Expand All @@ -413,7 +413,7 @@ private void entityHasExtensionJobTag(Entity entity) {

private void checkExtensionJobExist(String tags) {
if (tags != null) {
String jobName = ExtensionManager.getJobNameFromTag(tags);
String jobName = AbstractExtensionManager.getJobNameFromTag(tags);
ExtensionMetaStore extensionMetaStore = ExtensionStore.getMetaStore();
if (jobName != null && extensionMetaStore.checkIfExtensionJobExists(jobName)) {
throw FalconWebException.newAPIException("Entity operation is not allowed on this entity as it is"
Expand Down
Loading

0 comments on commit b1546ed

Please sign in to comment.