Skip to content
This repository has been archived by the owner on Apr 4, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' of https://github.com/apache/falcon
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepSamudrala committed Dec 15, 2016
2 parents 194f36a + d1052bf commit 0a433fb
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public class ExtensionBean {
@Column(name = "creation_time")
private Date creationTime;

@Basic
@NotNull
@Column(name = "extension_owner")
private String extensionOwner;

public ExtensionType getExtensionType() {
return extensionType;
}
Expand All @@ -90,7 +95,6 @@ public void setCreationTime(Date creationTime) {
this.creationTime = creationTime;
}


public String getExtensionName() {
return extensionName;
}
Expand All @@ -114,4 +118,13 @@ public String getDescription() {
public void setDescription(String description) {
this.description = description;
}

public String getExtensionOwner() {
return extensionOwner;
}

public void setExtensionOwner(String extensionOwner) {
this.extensionOwner = extensionOwner;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ public void setCreationTime(Date creationTime) {
this.creationTime = creationTime;
}


public byte[] getConfig() {
return config;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
public class HdfsClassLoader extends URLClassLoader {

private static final Logger LOG = LoggerFactory.getLogger(HdfsClassLoader.class);
private static Map<String, HdfsClassLoader> classLoaderCache = new ConcurrentHashMap<String, HdfsClassLoader>();
private static Map<String, HdfsClassLoader> classLoaderCache = new ConcurrentHashMap<>();
private static final Object LOCK = new Object();

public static ClassLoader load(final String name, final List<String> jarHdfsPath) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ private EntityManager getEntityManager() {
}

public void storeExtensionBean(String extensionName, String location, ExtensionType extensionType,
String description){
String description, String extensionOwner) {
ExtensionBean extensionBean = new ExtensionBean();
extensionBean.setLocation(location);
extensionBean.setExtensionName(extensionName);
extensionBean.setExtensionType(extensionType);
extensionBean.setCreationTime(new Date(System.currentTimeMillis()));
extensionBean.setDescription(description);
extensionBean.setExtensionOwner(extensionOwner);
EntityManager entityManager = getEntityManager();
try {
beginTransaction(entityManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.falcon.extensions.ExtensionType;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
Expand Down Expand Up @@ -94,7 +95,7 @@ private ExtensionStore() {
}

private void initializeDbTable() {
try{
try {
metaStore.deleteExtensionsOfType(ExtensionType.TRUSTED);
List<String> extensions = getExtensions();
for (String extension : extensions) {
Expand All @@ -103,9 +104,10 @@ private void initializeDbTable() {
String description = getShortDescription(extension);
String recipeName = extension;
String location = storePath.toString() + '/' + extension;
metaStore.storeExtensionBean(recipeName, location, extensionType, description);
String extensionOwner = CurrentUser.getUser();
metaStore.storeExtensionBean(recipeName, location, extensionType, description, extensionOwner);
}
} catch (FalconException e){
} catch (FalconException e) {
LOG.error("Exception in ExtensionMetaStore:", e);
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -251,63 +253,65 @@ public List<String> getExtensions() throws StoreAccessException {
}
return extensionList;
}
public String deleteExtension(final String extensionName) throws ValidationException{

public String deleteExtension(final String extensionName, String currentUser) throws FalconException {
ExtensionType extensionType = AbstractExtension.isExtensionTrusted(extensionName) ? ExtensionType.TRUSTED
: ExtensionType.CUSTOM;
if (extensionType.equals(ExtensionType.TRUSTED)){
if (extensionType.equals(ExtensionType.TRUSTED)) {
throw new ValidationException(extensionName + " is trusted cannot be deleted.");
}
if (metaStore.checkIfExtensionExists(extensionName)) {
} else if (!metaStore.checkIfExtensionExists(extensionName)) {
throw new FalconException("Extension:" + extensionName + " is not registered with Falcon.");
} else if (!metaStore.getDetail(extensionName).getExtensionOwner().equals(currentUser)) {
throw new FalconException("User: " + currentUser + " is not allowed to delete extension: " + extensionName);
} else {
metaStore.deleteExtension(extensionName);
return "Deleted extension:" + extensionName;
} else {
return "Extension:" + extensionName + " is not registered with Falcon.";
}
}

public String registerExtension(final String extensionName, final String path, final String description)
throws URISyntaxException, FalconException {
public String registerExtension(final String extensionName, final String path, final String description,
String extensionOwner) throws URISyntaxException, FalconException {
Configuration conf = new Configuration();
URI uri = new URI(path);
conf.set("fs.defaultFS", uri.getScheme() + "://" + uri.getAuthority());
FileSystem fileSystem = HadoopClientFactory.get().createFalconFileSystem(uri);
FileSystem fileSystem = HadoopClientFactory.get().createFalconFileSystem(uri);
try {
fileSystem.listStatus(new Path(uri.getPath() + "/README"));
} catch (IOException e){
} catch (IOException e) {
LOG.error("Exception in registering Extension:{}", extensionName, e);
throw new ValidationException("README file is not present in the " + path);
}
PathFilter filter=new PathFilter(){
public boolean accept(Path file){
PathFilter filter = new PathFilter() {
public boolean accept(Path file) {
return file.getName().endsWith(".jar");
}
};
FileStatus[] jarStatus;
try {
jarStatus = fileSystem.listStatus(new Path(uri.getPath(), "libs/build"), filter);
if (jarStatus.length <=0) {
if (jarStatus.length <= 0) {
throw new ValidationException("Jars are not present in the " + uri.getPath() + "/libs/build.");
}
} catch (IOException e){
} catch (IOException e) {
LOG.error("Exception in registering Extension:{}", extensionName, e);
throw new ValidationException("Jars are not present in the " + uri.getPath() + "/libs/build.");
}
FileStatus[] propStatus;
try{
try {
propStatus = fileSystem.listStatus(new Path(uri.getPath() + "/META"));
if (propStatus.length <=0){
if (propStatus.length <= 0) {
throw new ValidationException("No properties file is not present in the " + uri.getPath() + "/META"
+ " structure.");
}
} catch (IOException e){
} catch (IOException e) {
LOG.error("Exception in registering Extension:{}", extensionName, e);
throw new ValidationException("Directory is not present in the " + uri.getPath() + "/META"
+ " structure.");
}

if (!metaStore.checkIfExtensionExists(extensionName)){
metaStore.storeExtensionBean(extensionName, path, ExtensionType.CUSTOM, description);
}else{
if (!metaStore.checkIfExtensionExists(extensionName)) {
metaStore.storeExtensionBean(extensionName, path, ExtensionType.CUSTOM, description, extensionOwner);
} else {
throw new ValidationException(extensionName + " already exists.");
}
return "Extension :" + extensionName + " registered successfully.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ public void init() {
@Test
public void testExtension(){
//insert
stateStore.storeExtensionBean("test1", "test_location", ExtensionType.TRUSTED, "test_description");
stateStore.storeExtensionBean("test1", "test_location", ExtensionType.TRUSTED, "test_description",
"falconUser");

Assert.assertEquals(stateStore.getAllExtensions().size(), 1);
//check data
Expand All @@ -73,7 +74,8 @@ public void testExtension(){

@Test
public void testExtensionJob() {
stateStore.storeExtensionBean("test2", "test_location", ExtensionType.CUSTOM, "test2_description");
stateStore.storeExtensionBean("test2", "test_location", ExtensionType.CUSTOM, "test2_description",
"falconUser");
List<String> processes = new ArrayList<>();
processes.add("testProcess");
List<String> feeds = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import java.util.Map;

/**
* Tests for extension store.
* Tests for extension store.
*/
public class ExtensionStoreTest extends AbstractTestExtensionStore {
private static Map<String, String> resourcesMap;
Expand Down Expand Up @@ -104,60 +104,85 @@ public void clean() {
}
}


@Test
public void testRegisterExtension() throws IOException, URISyntaxException, FalconException{
createLibs();
createReadmeAndJar();
createMETA();
public void testRegisterExtension() throws IOException, URISyntaxException, FalconException {
String extensionPath = EXTENSION_PATH + "testRegister";
createLibs(extensionPath);
createReadmeAndJar(extensionPath);
createMETA(extensionPath);
store = ExtensionStore.get();
store.registerExtension("test", STORAGE_URL + EXTENSION_PATH, "test desc");
store.registerExtension("test", STORAGE_URL + extensionPath, "test desc", "falconUser");
ExtensionMetaStore metaStore = new ExtensionMetaStore();
Assert.assertEquals(metaStore.getAllExtensions().size(), 1);
}

@Test(expectedExceptions=ValidationException.class)
public void testFailureCaseRegisterExtension() throws IOException, URISyntaxException, FalconException{
@Test(expectedExceptions = ValidationException.class)
public void testFailureCaseRegisterExtension() throws IOException, URISyntaxException, FalconException {
String extensionPath = EXTENSION_PATH + "testRegister";
store = ExtensionStore.get();
createLibs(extensionPath);
store.registerExtension("test", STORAGE_URL + EXTENSION_PATH, "test desc", "falconUser");
}

@Test
public void testDeleteExtension() throws IOException, URISyntaxException, FalconException {
String extensionPath = EXTENSION_PATH + "testDelete";
createLibs(extensionPath);
createReadmeAndJar(extensionPath);
createMETA(extensionPath);
store = ExtensionStore.get();
store.registerExtension("toBeDeleted", STORAGE_URL + extensionPath, "test desc", "falconUser");
store.deleteExtension("toBeDeleted", "falconUser");
ExtensionMetaStore metaStore = new ExtensionMetaStore();
Assert.assertEquals(metaStore.getAllExtensions().size(), 0);
}

@Test(expectedExceptions = FalconException.class)
public void testFailureDeleteExtension() throws IOException, URISyntaxException, FalconException {
String extensionPath = EXTENSION_PATH + "testACLOnDelete";
createLibs(extensionPath);
createReadmeAndJar(extensionPath);
createMETA(extensionPath);
store = ExtensionStore.get();
createLibs();
store.registerExtension("test", STORAGE_URL + EXTENSION_PATH, "test desc");
store.registerExtension("ACLFailure", STORAGE_URL + extensionPath, "test desc", "oozieUser");
store.deleteExtension("ACLFailure", "falconUser");
}

private void createMETA() throws IOException{
Path path = new Path(EXTENSION_PATH + "/META");
if (fs.exists(path)){
private void createMETA(String extensionPath) throws IOException {
Path path = new Path(extensionPath + "/META");
if (fs.exists(path)) {
fs.delete(path, true);
}
fs.mkdirs(path);
path = new Path(EXTENSION_PATH + "/META/test.properties");
path = new Path(extensionPath + "/META/test.properties");
OutputStream os = fs.create(path);
BufferedWriter br = new BufferedWriter(new OutputStreamWriter(os, "UTF-8"));
br.write("Hello World");
if (fs.exists(path)){
if (fs.exists(path)) {
fs.delete(path, true);
}
br.write("test properties");
fs.create(path);
br.close();
}

private void createLibs() throws IOException{
Path path = new Path(EXTENSION_PATH);
if (fs.exists(path)){
private void createLibs(String extensionPath) throws IOException {
Path path = new Path(extensionPath);
if (fs.exists(path)) {
fs.delete(path, true);
}
fs.mkdirs(path);
path = new Path(EXTENSION_PATH + "/libs//libs/build");
path = new Path(extensionPath + "/libs//libs/build");
fs.mkdirs(path);
}

private void createReadmeAndJar() throws IOException{
Path path = new Path(EXTENSION_PATH + "/README");
if (fs.exists(path)){
private void createReadmeAndJar(String extensionPath) throws IOException {
Path path = new Path(extensionPath + "/README");
if (fs.exists(path)) {
fs.delete(path, true);
}
fs.create(path);
path = new Path(EXTENSION_PATH + "/libs/build/test.jar");
path = new Path(extensionPath + "/libs/build/test.jar");
OutputStream os = fs.create(path);
BufferedWriter br = new BufferedWriter(new OutputStreamWriter(os, "UTF-8"));
br.write("Hello World");
Expand All @@ -177,6 +202,5 @@ private void clearDB() {
em.close();
}
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.falcon.resource.ExtensionInstanceList;
import org.apache.falcon.resource.ExtensionJobList;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.service.Services;
import org.apache.falcon.util.DeploymentUtil;
import org.apache.hadoop.security.authorize.AuthorizationException;
Expand Down Expand Up @@ -591,7 +592,7 @@ public String deleteExtensionMetadata(
checkIfExtensionServiceIsEnabled();
validateExtensionName(extensionName);
try {
return ExtensionStore.get().deleteExtension(extensionName);
return ExtensionStore.get().deleteExtension(extensionName, CurrentUser.getUser());
} catch (Throwable e) {
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
Expand All @@ -608,7 +609,7 @@ public String registerExtensionMetadata(
checkIfExtensionServiceIsEnabled();
validateExtensionName(extensionName);
try {
return ExtensionStore.get().registerExtension(extensionName, path, description);
return ExtensionStore.get().registerExtension(extensionName, path, description, CurrentUser.getUser());
} catch (Throwable e) {
throw FalconWebException.newAPIException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
Expand Down

0 comments on commit 0a433fb

Please sign in to comment.