Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.Utils;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
Expand Down Expand Up @@ -70,6 +71,7 @@ class ProcessRuntime implements Runtime {
private final SecretsProviderConfigurator secretsProviderConfigurator;
private final String extraDependenciesDir;
private static final long GRPC_TIMEOUT_SECS = 5;
private final String funcLogDir;

ProcessRuntime(InstanceConfig instanceConfig,
String instanceFile,
Expand All @@ -86,6 +88,7 @@ class ProcessRuntime implements Runtime {
this.metricsPort = Utils.findAvailablePort();
this.expectedHealthCheckInterval = expectedHealthCheckInterval;
this.secretsProviderConfigurator = secretsProviderConfigurator;
this.funcLogDir = RuntimeUtils.genFunctionLogFolder(logDirectory, instanceConfig);
String logConfigFile = null;
String secretsProviderClassName = secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails());
String secretsProviderConfig = null;
Expand Down Expand Up @@ -131,6 +134,19 @@ class ProcessRuntime implements Runtime {
@Override
public void start() {
java.lang.Runtime.getRuntime().addShutdownHook(new Thread(() -> process.destroy()));

// Note: we create the expected log folder before the function process logger attempts to create it
// This is because if multiple instances are launched they can encounter a race condition creation of the dir.
log.info("Creating function log directory {}", funcLogDir);
boolean success = createFolder(funcLogDir);

if (!success) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So what if the directory is already there? Shouldn't throw exception because directory is already there

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine since new File(path).mkdirs() is idempotent

log.error("Log folder could not be created : {}", funcLogDir);
throw new RuntimeException("Log folder creation error");
}

log.info("Created function log directory {}", funcLogDir);

startProcess();
if (channel == null && stub == null) {
channel = ManagedChannelBuilder.forAddress("127.0.0.1", instancePort)
Expand Down Expand Up @@ -335,6 +351,11 @@ public boolean isAlive() {
return true;
}

private boolean createFolder(final String path) {
final boolean success = new File(path).mkdirs();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should only attempt to create directories if they are not present

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine since new File(path).mkdirs() is idempotent

return success;
}

private void tryExtractingDeathException() {
InputStream errorStream = process.getErrorStream();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,7 @@ public static List<String> composeArgs(InstanceConfig instanceConfig,
args.add(String.format("-D%s=%s", FUNCTIONS_EXTRA_DEPS_PROPERTY, extraDependenciesDir));
}
args.add("-Dlog4j.configurationFile=" + logConfigFile);
args.add("-Dpulsar.function.log.dir=" + String.format(
"%s/%s",
logDirectory,
FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails())));
args.add("-Dpulsar.function.log.dir=" + genFunctionLogFolder(logDirectory, instanceConfig));
args.add("-Dpulsar.function.log.file=" + String.format(
"%s-%s",
instanceConfig.getFunctionDetails().getName(),
Expand Down Expand Up @@ -194,6 +191,13 @@ && isNotBlank(authConfig.getClientAuthenticationParameters())) {
return args;
}

public static String genFunctionLogFolder(String logDirectory, InstanceConfig instanceConfig) {
return String.format(
"%s/%s",
logDirectory,
FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
}

public static String getPrometheusMetrics(int metricsPort) throws IOException{
StringBuilder result = new StringBuilder();
URL url = new URL(String.format("http://%s:%s", InetAddress.getLocalHost().getHostAddress(), metricsPort));
Expand Down