Skip to content

Commit

Permalink
Functions - add debug information
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed Nov 16, 2021
1 parent 5d69f0b commit 05d4590
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.swagger.annotations.ApiResponses;
import io.swagger.annotations.Example;
import io.swagger.annotations.ExampleProperty;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.functions.FunctionConfig;
Expand Down Expand Up @@ -52,7 +53,7 @@
import java.io.InputStream;
import java.util.List;
import java.util.function.Supplier;

@Slf4j
public class FunctionsBase extends AdminResource implements Supplier<WorkerService> {

private final FunctionsImpl functions;
Expand Down Expand Up @@ -709,7 +710,7 @@ public void updateFunctionOnWorkerLeader(final @PathParam("tenant") String tenan
final @PathParam("functionName") String functionName,
final @FormDataParam("functionMetaData") InputStream uploadedInputStream,
final @FormDataParam("delete") boolean delete) {

log.info("updateFunctionOnWorkerLeader {} {} {} {}", tenant, namespace, functionName, delete);
functions.updateFunctionOnWorkerLeader(tenant, namespace, functionName, uploadedInputStream,
delete, uri.getRequestUri(), clientAppId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.HttpResponseStatus;
import org.asynchttpclient.Request;
import org.asynchttpclient.RequestBuilder;
import org.asynchttpclient.request.body.multipart.ByteArrayPart;
import org.asynchttpclient.request.body.multipart.FilePart;
Expand Down Expand Up @@ -1066,10 +1067,14 @@ public CompletableFuture<Void> updateOnWorkerLeaderAsync(String tenant, String n
.path(function).getUri().toASCIIString())
.addBodyPart(new ByteArrayPart("functionMetaData", functionMetaData))
.addBodyPart(new StringPart("delete", Boolean.toString(delete)));

asyncHttpClient.executeRequest(addAuthHeaders(functions, builder).build())
final Request req = addAuthHeaders(functions, builder).build();
log.info("updateOnWorkerLeaderAsync {} {} {} delete={} request={}",
tenant, namespace, function, delete, req);
asyncHttpClient.executeRequest(req)
.toCompletableFuture()
.thenAccept(response -> {
log.info("updateOnWorkerLeaderAsync {} {} {} delete={} response {}",
tenant, namespace, function, delete, response);
if (response.getStatusCode() < 200 || response.getStatusCode() >= 300) {
future.completeExceptionally(
getApiException(Response
Expand All @@ -1081,6 +1086,8 @@ public CompletableFuture<Void> updateOnWorkerLeaderAsync(String tenant, String n
}
})
.exceptionally(throwable -> {
log.error("updateOnWorkerLeaderAsync {} {} {} delete={} ",
tenant, namespace, function, delete, throwable);
future.completeExceptionally(getApiException(throwable));
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ public void deregisterFunction(final String tenant,
newVersionedMetaData.getFunctionDetails().getNamespace(),
newVersionedMetaData.getFunctionDetails().getName(),
newVersionedMetaData, true,
String.format("Error deleting {} @ /{}/{}/{}",
String.format("Error deleting %s @ /%s/%s/%s",
ComponentTypeUtils.toString(componentType), tenant, namespace, componentName));

// clean up component files stored in BK
Expand Down Expand Up @@ -1564,7 +1564,10 @@ public boolean allowFunctionOps(NamespaceName namespaceName, String role,
private void internalProcessFunctionRequest(final String tenant, final String namespace, final String functionName,
final FunctionMetaData functionMetadata, boolean delete, String errorMsg) {
try {
if (worker().getLeaderService().isLeader()) {
boolean isLeader = worker().getLeaderService().isLeader();
log.info("internalProcessFunctionRequest {} {} {} {} delete={} isLeader={}", tenant, namespace, functionName,
functionMetadata, delete, isLeader);
if (isLeader) {
worker().getFunctionMetaDataManager().updateFunctionOnLeader(functionMetadata, delete);
} else {
FunctionsImpl functions = (FunctionsImpl) worker().getFunctionAdmin().functions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ public void updateFunction(final String tenant,
final String clientRole,
AuthenticationDataHttps clientAuthenticationDataHttps,
UpdateOptions updateOptions) {

log.info("updateFunction {} {} {}", tenant, namespace, functionName);
if (!isWorkerServiceAvailable()) {
throwUnavailableException();
}
Expand Down Expand Up @@ -685,14 +685,18 @@ public void updateFunctionOnWorkerLeader(final String tenant,
throw new RestException(Response.Status.BAD_REQUEST, "Corrupt Function MetaData");
}

log.info("updateFunctionOnWorkerLeader leader={}", worker().getLeaderService().isLeader());
// Redirect if we are not the leader
if (!worker().getLeaderService().isLeader()) {
WorkerInfo workerInfo = worker().getMembershipManager().getLeader();
log.info("currentLeader is {}", workerInfo);
log.info("local worker id is {}", worker().getWorkerConfig().getWorkerId());
if (workerInfo.getWorkerId().equals(worker().getWorkerConfig().getWorkerId())) {
throw new RestException(Response.Status.SERVICE_UNAVAILABLE,
"Leader not yet ready. Please retry again");
}
URI redirect = UriBuilder.fromUri(uri).host(workerInfo.getWorkerHostname()).port(workerInfo.getPort()).build();
log.info("redirect to {}", redirect);
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ public void updateFunctionOnWorkerLeader(final @PathParam("tenant") String tenan
final @PathParam("functionName") String functionName,
final @FormDataParam("functionMetaData") InputStream uploadedInputStream,
final @FormDataParam("delete") boolean delete) {

log.info("updateFunctionOnWorkerLeader {} {} {} {}", tenant, namespace, functionName, delete);
functions.updateFunctionOnWorkerLeader(tenant, namespace, functionName, uploadedInputStream,
delete, uri.getRequestUri(), clientAppId());
}
Expand Down

0 comments on commit 05d4590

Please sign in to comment.