Skip to content
This repository has been archived by the owner on Apr 4, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon
Browse files Browse the repository at this point in the history
  • Loading branch information
PracheerAgarwal-zz committed Feb 2, 2017
2 parents ed65aa0 + 0825d80 commit ba60452
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 127 deletions.
Expand Up @@ -92,7 +92,6 @@ void extensionCommand(CommandLine commandLine, FalconClient client) throws IOExc
} else if (optionsList.contains(DEFINITION_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.getExtensionDefinition(extensionName).getMessage();
result = prettyPrintJson(result);
} else if (optionsList.contains(DESCRIBE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.getExtensionDescription(extensionName).getMessage();
Expand Down
Expand Up @@ -18,6 +18,15 @@

package org.apache.falcon.extensions.store;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.parser.ValidationException;
Expand All @@ -41,16 +50,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Store for Falcon extensions.
*/
Expand Down Expand Up @@ -114,8 +113,9 @@ private void initializeDbTable() {
}

private String getShortDescription(final String extensionName) throws FalconException {
String content = getResource(extensionName, extensionName.toLowerCase()
+ EXTENSION_PROPERTY_JSON_SUFFIX);
String location = storePath.toString() + "/" + extensionName + "/META/"
+ extensionName.toLowerCase() + EXTENSION_PROPERTY_JSON_SUFFIX;
String content = getResource(location);
String description;
try {
JSONObject jsonObject = new JSONObject(content);
Expand All @@ -141,40 +141,6 @@ private FileSystem initializeFileSystem() {
}
}

private Map<String, String> getExtensionArtifacts(final String extensionName) throws
FalconException {
Map<String, String> extensionFileMap = new HashMap<>();
Path extensionPath;
try {
RemoteIterator<LocatedFileStatus> fileStatusListIterator;
if (AbstractExtension.isExtensionTrusted(extensionName)) {
extensionPath = new Path(storePath, extensionName.toLowerCase());
fileStatusListIterator = fs.listFiles(extensionPath, true);
} else {
ExtensionBean extensionBean = metaStore.getDetail(extensionName);
if (null == extensionBean) {
throw new StoreAccessException("Extension not found:" + extensionName);
}
extensionPath = new Path(extensionBean.getLocation());
FileSystem fileSystem = getHdfsFileSystem(extensionBean.getLocation());
fileStatusListIterator = fileSystem.listFiles(extensionPath, true);
}

if (!fileStatusListIterator.hasNext()) {
throw new StoreAccessException(" For extension " + extensionName
+ " there are no artifacts at the extension store path " + storePath);
}
while (fileStatusListIterator.hasNext()) {
LocatedFileStatus fileStatus = fileStatusListIterator.next();
Path filePath = fileStatus.getPath();
extensionFileMap.put(filePath.getName(), filePath.toString());
}
} catch (IOException e) {
throw new StoreAccessException(e);
}
return extensionFileMap;
}


public Map<String, String> getExtensionResources(final String extensionName) throws StoreAccessException {
Map<String, String> extensionFileMap = new HashMap<>();
Expand Down Expand Up @@ -340,7 +306,7 @@ public boolean accept(Path file) {
}
FileStatus[] propStatus;
try {
propStatus = fileSystem.listStatus(new Path(uri.getPath() + "/META"));
propStatus = fileSystem.listStatus(new Path(uri.getPath() , "META"));
if (propStatus.length <= 0) {
throw new ValidationException("No properties file is not present in the " + uri.getPath() + "/META"
+ " structure.");
Expand All @@ -360,13 +326,30 @@ public boolean accept(Path file) {
return "Extension :" + extensionName + " registered successfully.";
}

public String getResource(final String extensionName, final String resourceName) throws FalconException {
Map<String, String> resources = getExtensionArtifacts(extensionName);
if (resources.isEmpty()) {
throw new StoreAccessException("No extension resources found for " + extensionName);
public String getResource(final String extensionResourcePath)
throws FalconException {
StringBuilder definition = new StringBuilder();
Path resourcePath = new Path(extensionResourcePath);
FileSystem fileSystem = HadoopClientFactory.get().createFalconFileSystem(resourcePath.toUri());
try {
if (fileSystem.isFile(resourcePath)) {
definition.append(getExtensionResource(extensionResourcePath.toString()));
} else {
RemoteIterator<LocatedFileStatus> fileStatusListIterator = fileSystem.listFiles(resourcePath, false);
while (fileStatusListIterator.hasNext()) {
LocatedFileStatus fileStatus = fileStatusListIterator.next();
Path filePath = fileStatus.getPath();
definition.append("Contents of file ").append(filePath.getName()).append(":\n");
definition.append(getExtensionResource(filePath.toString())).append("\n \n");
}
}
} catch (IOException e) {
LOG.error("Exception while getting file(s) with path : " + extensionResourcePath, e);
throw new StoreAccessException(e);
}

return getExtensionResource(resources.get(resourceName));
return definition.toString();

}

public Path getExtensionStorePath() {
Expand Down
Expand Up @@ -50,7 +50,7 @@
public class ExtensionStoreTest extends AbstractTestExtensionStore {
private static Map<String, String> resourcesMap;
private static JailedFileSystem fs;
protected static final String EXTENSION_PATH = "/projects/falcon/extension";
protected static final String EXTENSION_PATH = "/projects/falcon/extension/";
private static final String STORAGE_URL = "jail://global:00";

@BeforeClass
Expand Down Expand Up @@ -140,8 +140,7 @@ public void testDeleteExtension() throws IOException, URISyntaxException, Falcon
createMETA(extensionPath);
store = ExtensionStore.get();
store.registerExtension("toBeDeleted", STORAGE_URL + extensionPath, "test desc", "falconUser");
Assert.assertTrue(store.getResource("toBeDeleted", "README").equals("README"));
store.getResource("toBeDeleted", "README");
Assert.assertTrue(store.getResource(STORAGE_URL + extensionPath + "/README").equals("README"));
store.deleteExtension("toBeDeleted", "falconUser");
ExtensionMetaStore metaStore = new ExtensionMetaStore();
Assert.assertEquals(metaStore.getAllExtensions().size(), 0);
Expand Down
Expand Up @@ -37,10 +37,7 @@
import org.slf4j.LoggerFactory;
import javax.ws.rs.core.Response;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

Expand Down Expand Up @@ -111,36 +108,27 @@ public APIResult getExtensions() {
}

public ExtensionJobList getExtensionJobs(String extensionName, String sortOrder, String doAsUser) {

Comparator<ExtensionJobsBean> compareByJobName = new Comparator<ExtensionJobsBean>() {
@Override
public int compare(ExtensionJobsBean o1, ExtensionJobsBean o2) {
return o1.getJobName().compareToIgnoreCase(o2.getJobName());
}
};

Map<String, String> jobAndExtensionNames = new HashMap<>();
TreeMap<String, String> jobAndExtensionNames = new TreeMap<>();
List<ExtensionJobsBean> extensionJobs = null;

if (extensionName != null) {
extensionJobs = ExtensionStore.getMetaStore().getJobsForAnExtension(extensionName);
} else {
extensionJobs = ExtensionStore.getMetaStore().getAllExtensionJobs();
}

for (ExtensionJobsBean job : extensionJobs) {
jobAndExtensionNames.put(job.getJobName(), job.getExtensionName());
}

sortOrder = (sortOrder == null) ? ASCENDING_SORT_ORDER : sortOrder;
switch (sortOrder.toLowerCase()) {
case DESCENDING_SORT_ORDER:
Collections.sort(extensionJobs, Collections.reverseOrder(compareByJobName));
break;
return new ExtensionJobList(extensionJobs.size(), jobAndExtensionNames.descendingMap());

default:
Collections.sort(extensionJobs, compareByJobName);
return new ExtensionJobList(extensionJobs.size(), jobAndExtensionNames);
}

for (ExtensionJobsBean job : extensionJobs) {
jobAndExtensionNames.put(job.getJobName(), job.getExtensionName());
}
return new ExtensionJobList(extensionJobs.size(), jobAndExtensionNames);
}

public APIResult deleteExtensionMetadata(String extensionName) {
Expand Down Expand Up @@ -280,28 +268,23 @@ private static JSONArray buildEnumerateResult() throws FalconException {
}

protected static void checkIfExtensionIsEnabled(String extensionName) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionBean extensionBean = metaStore.getDetail(extensionName);
if (extensionBean == null) {
LOG.error("Extension not found: " + extensionName);
throw FalconWebException.newAPIException("Extension not found:" + extensionName,
Response.Status.NOT_FOUND);
}
ExtensionBean extensionBean = getExtensionIfExists(extensionName);
if (!extensionBean.getStatus().equals(ExtensionStatus.ENABLED)) {
LOG.error("Extension: " + extensionName + " is in disabled state.");
throw FalconWebException.newAPIException("Extension: " + extensionName + " is in disabled state.",
Response.Status.INTERNAL_SERVER_ERROR);
}
}

protected static void checkIfExtensionExists(String extensionName) {
protected static ExtensionBean getExtensionIfExists(String extensionName) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionBean extensionBean = metaStore.getDetail(extensionName);
if (extensionBean == null) {
LOG.error("Extension not found: " + extensionName);
throw FalconWebException.newAPIException("Extension not found:" + extensionName,
Response.Status.NOT_FOUND);
}
return extensionBean;
}

protected static void checkIfExtensionJobNameExists(String jobName, String extensionName) {
Expand Down
Expand Up @@ -20,6 +20,34 @@

import com.sun.jersey.multipart.FormDataBodyPart;
import com.sun.jersey.multipart.FormDataParam;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.xml.bind.JAXBException;
import org.apache.commons.io.IOUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
Expand All @@ -30,52 +58,24 @@
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.extensions.Extension;
import org.apache.falcon.extensions.ExtensionProperties;
import org.apache.falcon.extensions.ExtensionService;
import org.apache.falcon.extensions.ExtensionType;
import org.apache.falcon.extensions.ExtensionProperties;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.extensions.store.ExtensionStore;
import org.apache.falcon.persistence.ExtensionBean;
import org.apache.falcon.persistence.ExtensionJobsBean;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.AbstractExtensionManager;
import org.apache.falcon.resource.ExtensionInstanceList;
import org.apache.falcon.resource.ExtensionJobList;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.service.Services;
import org.apache.falcon.util.DeploymentUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.xml.bind.JAXBException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Set;
import java.util.Properties;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.SortedMap;
import java.io.IOException;
import java.io.InputStream;
import java.io.ByteArrayOutputStream;
import java.io.ByteArrayInputStream;

/**
* Jersey Resource for extension job operations.
*/
Expand All @@ -101,7 +101,7 @@ public ExtensionJobList getExtensionJobs(
@DefaultValue(ASCENDING_SORT_ORDER) @QueryParam("sortOrder") String sortOrder,
@DefaultValue("") @QueryParam("doAs") String doAsUser) {
checkIfExtensionServiceIsEnabled();
checkIfExtensionExists(extensionName);
getExtensionIfExists(extensionName);
try {
return super.getExtensionJobs(extensionName, sortOrder, doAsUser);
} catch (Throwable e) {
Expand Down Expand Up @@ -341,14 +341,7 @@ private SortedMap<EntityType, List<Entity>> getEntityList(String extensionName,
}

private ExtensionType getExtensionType(String extensionName) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionBean extensionDetails = metaStore.getDetail(extensionName);
if (extensionDetails == null) {
// return failure if the extension job doesn't exist
LOG.error("Extension not found: " + extensionName);
throw FalconWebException.newAPIException("Extension not found:" + extensionName,
Response.Status.NOT_FOUND);
}
ExtensionBean extensionDetails = getExtensionIfExists(extensionName);
return extensionDetails.getExtensionType();
}

Expand Down Expand Up @@ -623,9 +616,10 @@ public APIResult getExtensions() {
public APIResult getExtensionDescription(
@PathParam("extension-name") String extensionName) {
checkIfExtensionServiceIsEnabled();
validateExtensionName(extensionName);
ExtensionBean extensionBean = getExtensionIfExists(extensionName);
try {
return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().getResource(extensionName, README));
String extensionResourcePath = extensionBean.getLocation() + File.separator + README;
return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().getResource(extensionResourcePath));
} catch (FalconException e) {
throw FalconWebException.newAPIException(e, Response.Status.BAD_REQUEST);
} catch (Throwable e) {
Expand Down Expand Up @@ -694,9 +688,18 @@ public APIResult registerExtensionMetadata(
public APIResult getExtensionDefinition(
@PathParam("extension-name") String extensionName) {
checkIfExtensionServiceIsEnabled();
ExtensionBean extensionBean = getExtensionIfExists(extensionName);
try {
return new APIResult(APIResult.Status.SUCCEEDED, ExtensionStore.get().getResource(extensionName,
extensionName.toLowerCase() + EXTENSION_PROPERTY_JSON_SUFFIX));
ExtensionType extensionType = extensionBean.getExtensionType();
String extensionResourcePath;
if (ExtensionType.TRUSTED.equals(extensionType)) {
extensionResourcePath = extensionBean.getLocation() + "/META/"
+ extensionName.toLowerCase() + EXTENSION_PROPERTY_JSON_SUFFIX;
} else {
extensionResourcePath = extensionBean.getLocation() + "/META";
}
return new APIResult(APIResult.Status.SUCCEEDED,
ExtensionStore.get().getResource(extensionResourcePath));
} catch (FalconException e) {
throw FalconWebException.newAPIException(e, Response.Status.BAD_REQUEST);
} catch (Throwable e) {
Expand Down

0 comments on commit ba60452

Please sign in to comment.