Skip to content

Commit

Permalink
Added Submit Function capability to the pulsar-functions cli (#33)
Browse files Browse the repository at this point in the history
* Create pulsar-functions module (#1)

* Create pulsar-functions module

* rename `sdk` package to `api`

* Added the first cut of the Java interface for Pulsar functions (#2)

* Added the first cut of the pulsar submit

* Working
  • Loading branch information
srkukarni authored and sijie committed Mar 4, 2018
1 parent 4dd4089 commit 48d4aa4
Show file tree
Hide file tree
Showing 11 changed files with 364 additions and 18 deletions.
Expand Up @@ -25,6 +25,7 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Properties; import java.util.Properties;
import java.util.function.BiFunction;


import java.util.function.BiFunction; import java.util.function.BiFunction;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
Expand Down
2 changes: 1 addition & 1 deletion pulsar-functions/conf/example.yml
Expand Up @@ -17,7 +17,7 @@
# under the License. # under the License.
# #


tenant: "test"
nameSpace: "test" nameSpace: "test"
userName: "test"
name: "example" name: "example"
className: "org.apache.pulsar.functions.api.examples.ExclamationFunction" className: "org.apache.pulsar.functions.api.examples.ExclamationFunction"
2 changes: 1 addition & 1 deletion pulsar-functions/run-examples.sh
Expand Up @@ -17,7 +17,7 @@
# under the License. # under the License.
# #


bin/pulsar-functions functions run \ bin/pulsar-functions functions localrun \
--function-config conf/example.yml \ --function-config conf/example.yml \
--sink-topic persistent://sample/standalone/ns1/test_result \ --sink-topic persistent://sample/standalone/ns1/test_result \
--source-topic persistent://sample/standalone/ns1/test_src \ --source-topic persistent://sample/standalone/ns1/test_src \
Expand Down
16 changes: 16 additions & 0 deletions pulsar-functions/runtime/pom.xml
Expand Up @@ -86,6 +86,22 @@
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>


<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-client</artifactId>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-json-jackson</artifactId>
</dependency>

<dependency>
<groupId>org.glassfish.jersey.media</groupId>
<artifactId>jersey-media-multipart</artifactId>
<version>2.23</version>
</dependency>

<dependency> <dependency>
<groupId>org.mockito</groupId> <groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId> <artifactId>mockito-core</artifactId>
Expand Down
Expand Up @@ -24,15 +24,15 @@
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import lombok.Getter; import lombok.Getter;
import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarFunctionsAdmin;
import org.apache.pulsar.functions.fs.FunctionConfig; import org.apache.pulsar.functions.fs.FunctionConfig;
import org.apache.pulsar.functions.runtime.spawner.LimitsConfig; import org.apache.pulsar.functions.runtime.spawner.LimitsConfig;
import org.apache.pulsar.functions.runtime.spawner.Spawner; import org.apache.pulsar.functions.runtime.spawner.Spawner;


@Parameters(commandDescription = "Operations about functions") @Parameters(commandDescription = "Operations about functions")
public class CmdFunctions extends CmdBase { public class CmdFunctions extends CmdBase {

private LocalRunner localRunner;
private final LocalRunner cmdRunner; private SubmitFunction submitter;

@Getter @Getter
abstract class FunctionsCommand extends CliCommand { abstract class FunctionsCommand extends CliCommand {
@Parameter(names = "--name", description = "Function Name\n") @Parameter(names = "--name", description = "Function Name\n")
Expand Down Expand Up @@ -94,7 +94,6 @@ void run() throws Exception {
abstract void run_functions_cmd() throws Exception; abstract void run_functions_cmd() throws Exception;
} }


@Getter
@Parameters(commandDescription = "Run function locally") @Parameters(commandDescription = "Run function locally")
class LocalRunner extends FunctionsCommand { class LocalRunner extends FunctionsCommand {


Expand All @@ -114,17 +113,32 @@ void run_functions_cmd() throws Exception {
spawner.start(); spawner.start();
spawner.join(); spawner.join();
} }
}


@Parameters(commandDescription = "Submit function")
class SubmitFunction extends FunctionsCommand {
@Override
void run_functions_cmd() throws Exception {
PulsarFunctionsAdmin a = (PulsarFunctionsAdmin)admin;
a.functions().createFunction(functionConfig, functionConfig.loadCodeFile());
}
} }


public CmdFunctions(PulsarAdmin admin) { public CmdFunctions(PulsarAdmin admin) {
super("functions", admin); super("functions", admin);
cmdRunner = new LocalRunner(); localRunner = new LocalRunner();
jcommander.addCommand("run", cmdRunner); submitter = new SubmitFunction();
jcommander.addCommand("localrun", localRunner);
jcommander.addCommand("submit", submitter);
}

@VisibleForTesting
LocalRunner getLocalRunner() {
return localRunner;
} }


@VisibleForTesting @VisibleForTesting
LocalRunner getCmdRunner() { SubmitFunction getSubmitter() {
return cmdRunner; return submitter;
} }
} }
Expand Up @@ -19,6 +19,8 @@


package org.apache.pulsar.admin.cli; package org.apache.pulsar.admin.cli;


import org.apache.pulsar.client.admin.PulsarFunctionsAdmin;

import java.io.FileInputStream; import java.io.FileInputStream;
import java.util.Arrays; import java.util.Arrays;
import java.util.Properties; import java.util.Properties;
Expand Down Expand Up @@ -50,7 +52,15 @@ public static void main(String[] args) throws Exception {


FunctionsTool tool = new FunctionsTool(properties); FunctionsTool tool = new FunctionsTool(properties);


if (tool.run(Arrays.copyOfRange(args, 1, args.length))) { if (tool.run(Arrays.copyOfRange(args, 1, args.length), (url, config) -> {
try {
return new PulsarFunctionsAdmin(url, config);
} catch (Exception ex) {
System.err.println(ex.getClass() + ": " + ex.getMessage());
System.exit(1);
return null;
}
})) {
System.exit(0); System.exit(0);
} else { } else {
System.exit(1); System.exit(1);
Expand Down
@@ -0,0 +1,135 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.admin;

import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.functions.fs.FunctionConfig;

import java.util.List;

/**
* Admin interface for function management.
*/
public interface Functions {
/**
* Get the list of functions.
* <p>
* Get the list of all the Pulsar functions.
* <p>
* Response Example:
*
* <pre>
* <code>["f1", "f2", "f3"]</code>
* </pre>
*
* @throws NotAuthorizedException
* Don't have admin permission
* @throws PulsarAdminException
* Unexpected error
*/
List<String> getFunctions(String tenant, String namespace) throws PulsarAdminException;

/**
* Get the configuration for the specified function.
* <p>
* Response Example:
*
* <pre>
* <code>{ serviceUrl : "http://my-broker.example.com:8080/" }</code>
* </pre>
*
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param function
* Function name
*
* @return the function configuration
*
* @throws NotAuthorizedException
* You don't have admin permission to get the configuration of the cluster
* @throws NotFoundException
* Cluster doesn't exist
* @throws PulsarAdminException
* Unexpected error
*/
FunctionConfig getFunction(String tenant, String namespace, String function) throws PulsarAdminException;

/**
* Create a new cluster.
* <p>
* Provisions a new cluster. This operation requires Pulsar super-user privileges.
* <p>
* The name cannot contain '/' characters.
*
* @param functionConfig
* the function configuration object
*
* @throws NotAuthorized
* You don't have admin permission to create the cluster
* @throws ConflictException
* Cluster already exists
* @throws PulsarAdminException
* Unexpected error
*/
void createFunction(FunctionConfig functionConfig, byte[] code) throws PulsarAdminException;

/**
* Update the configuration for a function.
* <p>
*
* @param functionConfig
* the function configuration object
*
* @throws NotAuthorizedException
* You don't have admin permission to create the cluster
* @throws NotFoundException
* Cluster doesn't exist
* @throws PulsarAdminException
* Unexpected error
*/
void updateFunction(FunctionConfig functionConfig) throws PulsarAdminException;

/**
* Delete an existing function
* <p>
* Delete a function
*
* @param tenant
* Tenant name
* @param namespace
* Namespace name
* @param function
* Function name
*
* @throws NotAuthorizedException
* You don't have admin permission
* @throws NotFoundException
* Cluster does not exist
* @throws PreconditionFailedException
* Cluster is not empty
* @throws PulsarAdminException
* Unexpected error
*/
void deleteFunction(String tenant, String namespace, String function) throws PulsarAdminException;
}
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.admin;

import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.client.api.*;

import org.glassfish.jersey.media.multipart.MultiPartFeature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URL;


/**
* Pulsar client admin API client.
*/
public class PulsarFunctionsAdmin extends PulsarAdmin {
private static final Logger LOG = LoggerFactory.getLogger(PulsarFunctionsAdmin.class);

private final Functions functions;

/**
* Construct a new Pulsar Admin client object.
* <p>
* This client object can be used to perform many subsquent API calls
*
* @param serviceUrl
* the Pulsar service URL (eg. "http://my-broker.example.com:8080")
* @param pulsarConfig
* the ClientConfiguration object to be used to talk with Pulsar
*/
public PulsarFunctionsAdmin(URL serviceUrl, ClientConfiguration pulsarConfig) throws PulsarClientException {
super(serviceUrl, pulsarConfig);
this.functions = new FunctionsImpl(web, auth);
}

/**
* @return the function management object
*/
public Functions functions() {
return functions;
}
}

0 comments on commit 48d4aa4

Please sign in to comment.