Skip to content

Commit

Permalink
remove function package on function delete (#1385)
Browse files Browse the repository at this point in the history
* remove function package on function delete

* using apache commons for deletion

* refactoring

* removing unnecessary import

* removing unncessary dependency
  • Loading branch information
jerrypeng authored and merlimat committed Mar 15, 2018
1 parent e0c9e7a commit 5c31b2e
Showing 1 changed file with 29 additions and 10 deletions.
Expand Up @@ -18,11 +18,14 @@
*/ */
package org.apache.pulsar.functions.worker; package org.apache.pulsar.functions.worker;


import java.io.IOException;
import java.nio.file.FileAlreadyExistsException; import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Paths; import java.nio.file.Paths;

import lombok.*; import lombok.*;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function;
Expand Down Expand Up @@ -112,13 +115,7 @@ private void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Excep
functionMetaData.getFunctionConfig().getName(), instance.getInstanceId()); functionMetaData.getFunctionConfig().getName(), instance.getInstanceId());
File pkgDir = new File( File pkgDir = new File(
workerConfig.getDownloadDirectory(), workerConfig.getDownloadDirectory(),
StringUtils.join( getDownloadPackagePath(functionMetaData));
new String[]{
functionMetaData.getFunctionConfig().getTenant(),
functionMetaData.getFunctionConfig().getNamespace(),
functionMetaData.getFunctionConfig().getName(),
},
File.separatorChar));
pkgDir.mkdirs(); pkgDir.mkdirs();


int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId(); int instanceId = functionRuntimeInfo.getFunctionInstance().getInstanceId();
Expand Down Expand Up @@ -178,16 +175,38 @@ private void startFunction(FunctionRuntimeInfo functionRuntimeInfo) throws Excep
runtimeSpawner.start(); runtimeSpawner.start();
} }


private boolean stopFunction(FunctionRuntimeInfo functionRuntimeInfo) { private void stopFunction(FunctionRuntimeInfo functionRuntimeInfo) {
Function.Instance instance = functionRuntimeInfo.getFunctionInstance(); Function.Instance instance = functionRuntimeInfo.getFunctionInstance();
FunctionMetaData functionMetaData = instance.getFunctionMetaData(); FunctionMetaData functionMetaData = instance.getFunctionMetaData();
log.info("Stopping function {} - {}...", log.info("Stopping function {} - {}...",
functionMetaData.getFunctionConfig().getName(), instance.getInstanceId()); functionMetaData.getFunctionConfig().getName(), instance.getInstanceId());
if (functionRuntimeInfo.getRuntimeSpawner() != null) { if (functionRuntimeInfo.getRuntimeSpawner() != null) {
functionRuntimeInfo.getRuntimeSpawner().close(); functionRuntimeInfo.getRuntimeSpawner().close();
functionRuntimeInfo.setRuntimeSpawner(null); functionRuntimeInfo.setRuntimeSpawner(null);
return true;
} }
return false;
// clean up function package
File pkgDir = new File(
workerConfig.getDownloadDirectory(),
getDownloadPackagePath(functionMetaData));

if (pkgDir.exists()) {
try {
FileUtils.deleteDirectory(pkgDir);
} catch (IOException e) {
log.warn("Failed to delete package for function: {}",
FunctionConfigUtils.getFullyQualifiedName(functionMetaData.getFunctionConfig()), e);
}
}
}

private String getDownloadPackagePath(FunctionMetaData functionMetaData) {
return StringUtils.join(
new String[]{
functionMetaData.getFunctionConfig().getTenant(),
functionMetaData.getFunctionConfig().getNamespace(),
functionMetaData.getFunctionConfig().getName(),
},
File.separatorChar);
} }
} }

0 comments on commit 5c31b2e

Please sign in to comment.