Skip to content

Commit

Permalink
[docker run] Some improvements (#420)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed Sep 15, 2023
1 parent 688c865 commit efcdbca
Show file tree
Hide file tree
Showing 15 changed files with 186 additions and 47 deletions.
7 changes: 7 additions & 0 deletions examples/applications/python-processor-exclamation/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ The code in `example.py` adds an exclamation mark to the end of a string message
./bin/langstream apps deploy test -app examples/applications/python-processor-exclamation -i examples/instances/kafka-kubernetes.yaml -s examples/secrets/secrets.yaml
```

## Talk with the Chat bot using the CLI
Since the application opens a gateway, we can use the gateway API to send and consume messages.

```
./bin/langstream gateway chat test -cg consume-output -pg produce-input -p sessionId=$(uuidgen)
```

## Start a Producer
```
kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.35.1-kafka-3.4.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic input-topic
Expand Down
15 changes: 4 additions & 11 deletions examples/applications/query-jdbc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ Insert some data:
./bin/langstream apps deploy test -app examples/applications/query-jdbc -i examples/instances/kafka-kubernetes.yaml -s examples/secrets/secrets.yaml
```

## Start a Producer
## Talk with the Chat bot using the CLI
Since the application opens a gateway, we can use the gateway API to send and consume messages.

```
kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.35.1-kafka-3.4.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic input-topic
./bin/langstream gateway chat test -cg consume-output -pg produce-input -p sessionId=$(uuidgen)
```

Insert a JSON with "id", "name" and "description":
Expand All @@ -71,12 +73,3 @@ Insert a JSON with "id", "name" and "description":
{"id": 1, "name": "test", "description": "test"}
```


## Start a Consumer

Start a Kafka Consumer on a terminal

```
kubectl -n kafka run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.35.1-kafka-3.4.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic output-topic --from-beginning
```

39 changes: 39 additions & 0 deletions examples/applications/query-jdbc/gateways.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#
#
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

gateways:
- id: produce-input
type: produce
topic: input-topic
parameters:
- sessionId
produceOptions:
headers:
- key: langstream-client-session-id
valueFromParameters: sessionId

- id: consume-output
type: consume
topic: output-topic
parameters:
- sessionId
consumeOptions:
filters:
headers:
- key: langstream-client-session-id
valueFromParameters: sessionId

Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,13 @@ public AgentNode getAgentImplementation(Module module, String id) {

public void registerAgent(Module module, String id, AgentNode agentImplementation) {
String internalId = module.getId() + "#" + id;
log.info(
"registering agent {} for module {} with id {}",
agentImplementation,
module.getId(),
id);
if (log.isDebugEnabled()) {
log.debug(
"registering agent {} for module {} with id {}",
agentImplementation,
module.getId(),
id);
}
agents.put(internalId, agentImplementation);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ public class LocalRunApplicationCmd extends BaseDockerCmd {

@CommandLine.Option(
names = {"-i", "--instance"},
description = "Instance file path",
required = true)
description = "Instance file path")
private String instanceFilePath;

@CommandLine.Option(
Expand Down Expand Up @@ -118,7 +117,13 @@ public void run() {
}

final File appDirectory = checkFileExistsOrDownload(appPath);
final File instanceFile = checkFileExistsOrDownload(instanceFilePath);
final File instanceFile;

if (instanceFilePath != null) {
instanceFile = checkFileExistsOrDownload(instanceFilePath);
} else {
instanceFile = null;
}
final File secretsFile = checkFileExistsOrDownload(secretFilePath);

log("Tenant " + getTenant());
Expand All @@ -129,16 +134,16 @@ public void run() {
} else {
log("Running all the agents in the application");
}
log("Instance file: " + instanceFile.getAbsolutePath());
log("Instance file: " + instanceFile);
if (secretsFile != null) {
log("Secrets file: " + secretsFile.getAbsolutePath());
}
log("Start broker: " + startBroker);
log("Start S3: " + startS3);
log("Start Webservices " + startWebservices);

if ((appDirectory == null || instanceFile == null)) {
throw new IllegalArgumentException("application and instance files are required");
if (appDirectory == null) {
throw new IllegalArgumentException("application files are required");
}

downloadDependencies(appDirectory.toPath(), getClient(), this::log);
Expand All @@ -147,9 +152,27 @@ public void run() {
final String instanceContents;

try {
instanceContents =
LocalFileReferenceResolver.resolveFileReferencesInYAMLFile(
instanceFile.toPath());
if (instanceFile != null) {

instanceContents =
LocalFileReferenceResolver.resolveFileReferencesInYAMLFile(
instanceFile.toPath());
} else {
if (startBroker) {
instanceContents =
"instance:\n"
+ " streamingCluster:\n"
+ " type: \"kafka\"\n"
+ " configuration:\n"
+ " admin:\n"
+ " bootstrap.servers: localhost:9092";
log(
"Using default instance file that connects to the Kafka broker inside the docker container");
} else {
throw new IllegalArgumentException(
"instance file is required if broker is not started");
}
}
} catch (Exception e) {
log(
"Failed to resolve instance file references. Please double check the file path: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ public SerializedApplicationInstance(
this.instance = applicationInstance.getInstance();
this.gateways = applicationInstance.getGateways();
this.agentRunners = new HashMap<>();
log.info(
"Serializing application instance {} executionPlan {}",
applicationInstance,
executionPlan);
if (log.isDebugEnabled()) {
log.debug(
"Serializing application instance {} executionPlan {}",
applicationInstance,
executionPlan);
}
if (executionPlan != null && executionPlan.getAgents() != null) {
for (Map.Entry<String, AgentNode> entry : executionPlan.getAgents().entrySet()) {
AgentNode agentNode = entry.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,10 +248,12 @@ private void collectAgentCustomResourceAndSecret(
StreamingClusterRuntime streamingClusterRuntime,
ExecutionPlan applicationInstance,
String codeStorageArchiveId) {
log.info(
"Building configuration for Agent {}, codeStorageArchiveId {}",
agent,
codeStorageArchiveId);
if (log.isDebugEnabled()) {
log.debug(
"Building configuration for Agent {}, codeStorageArchiveId {}",
agent,
codeStorageArchiveId);
}
if (!(agent instanceof DefaultAgentNode defaultAgentImplementation)) {
throw new UnsupportedOperationException(
"Only default agent implementations are supported");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,14 +147,16 @@ public void run(
Runnable beforeStopSource,
boolean startHttpServer)
throws Exception {
log.info("Pod Configuration {}", configuration);
if (log.isDebugEnabled()) {
log.debug("Pod Configuration {}", configuration);
}

// agentId is the identity of the agent in the cluster
// it is shared by all the instances of the agent
String agentId =
configuration.agent().applicationId() + "-" + configuration.agent().agentId();

log.info("Starting agent {} with configuration {}", agentId, configuration.agent());
log.info("Starting agent {}", agentId);

List<URL> customLibClasspath = buildCustomLibClasspath(codeDirectory);
try (NarFileHandler narFileHandler =
Expand Down Expand Up @@ -915,7 +917,10 @@ public PermanentFailureException(Throwable cause) {
private static AgentCodeAndLoader initAgent(
RuntimePodConfiguration configuration, AgentCodeRegistry agentCodeRegistry)
throws Exception {
log.info("Bootstrapping agent with configuration {}", configuration.agent());
log.info(
"Bootstrapping agent {} type {}",
configuration.agent().agentId(),
configuration.agent().agentType());
return initAgent(
configuration.agent().agentId(),
configuration.agent().agentType(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
# limitations under the License.
#

echo "Hosts:"
cat /etc/hosts

START_KAFKA=${START_BROKER:-true}
if [ "$START_BROKER" = "true" ]; then
echo "Starting Broker"
Expand All @@ -30,4 +27,4 @@ if [ "$START_MINIO" = "true" ]; then
/minio/minio server /tmp &
fi

exec java ${JAVA_OPTS} -Djdk.lang.Process.launchMechanism=vfork -cp "/app/lib/*:/app/tester/lib/*" "ai.langstream.runtime.tester.Main"
exec java ${JAVA_OPTS} -Dlogging.config=/app/logback.xml -Djdk.lang.Process.launchMechanism=vfork -cp "/app/lib/*:/app/tester/lib/*" "ai.langstream.runtime.tester.Main"
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@
<lineEnding>unix</lineEnding>
<fileMode>755</fileMode>
</fileSet>
<fileSet>
<directory>src/main/assemble/</directory>
<outputDirectory>.</outputDirectory>
<includes>
<include>logback.xml</include>
</includes>
<lineEnding>unix</lineEnding>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?xml version="1.0" encoding="UTF-8" ?>
<!--
Copyright DataStax, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<!DOCTYPE configuration>
<configuration>
<import class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"/>
<import class="ch.qos.logback.core.ConsoleAppender"/>

<appender name="STDOUT" class="ConsoleAppender">
<encoder class="PatternLayoutEncoder">
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} -%kvp- %msg%n</pattern>
</encoder>
</appender>

<root level="info">
<appender-ref ref="STDOUT"/>
</root>

<!-- shutdown noisy loggers -->
<logger name="org.apache.kafka.clients" level="WARN"/>

</configuration>
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ RUN mkdir /minio \
ADD maven/lib /app/tester/lib
RUN rm -f /app/tester/lib/*netty*
ADD maven/entrypoint.sh /app/entrypoint.sh
ADD maven/logback.xml /app/logback.xml

WORKDIR /app

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@
import ai.langstream.runtime.agent.AgentRunner;
import ai.langstream.runtime.agent.api.AgentInfo;
import ai.langstream.runtime.api.agent.RuntimePodConfiguration;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import io.fabric8.kubernetes.api.model.Secret;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -44,11 +49,14 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;

@Slf4j
public class LocalApplicationRunner
implements AutoCloseable, InMemoryApplicationStore.AgentInfoCollector {

private static final ObjectMapper MAPPER = new ObjectMapper(new YAMLFactory());

final KubeTestServer kubeServer = new KubeTestServer();
final InMemoryApplicationStore applicationStore = new InMemoryApplicationStore();
final ApplicationDeployer applicationDeployer;
Expand Down Expand Up @@ -162,11 +170,13 @@ public AgentRunResult executeAgentRunners(ApplicationRuntime runtime, List<Strin
RuntimePodConfiguration runtimePodConfiguration =
AgentResourcesFactory
.readRuntimePodConfigurationFromSecret(secret);
log.info(
"{} Pod configuration {} = {}",
runnerExecutionId,
key,
runtimePodConfiguration);
if (log.isDebugEnabled()) {
log.debug(
"{} Pod configuration {} = {}",
runnerExecutionId,
key,
runtimePodConfiguration);
}
pods.add(runtimePodConfiguration);
} else {
log.info("Agent {} won't be executed", key);
Expand All @@ -177,6 +187,7 @@ public AgentRunResult executeAgentRunners(ApplicationRuntime runtime, List<Strin
ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture> futures = new ArrayList<>();
for (RuntimePodConfiguration podConfiguration : pods) {
Path podRuntimeConfigurationFile = persistPodConfiguration(podConfiguration);
CompletableFuture<?> handle = new CompletableFuture<>();
futures.add(handle);
executorService.submit(
Expand All @@ -196,7 +207,7 @@ public AgentRunResult executeAgentRunners(ApplicationRuntime runtime, List<Strin
allAgentsInfo.put(podConfiguration.agent().agentId(), agentInfo);
AgentRunner.runAgent(
podConfiguration,
null,
podRuntimeConfigurationFile,
codeDirectory,
agentsDirectory,
agentInfo,
Expand Down Expand Up @@ -248,6 +259,16 @@ public AgentRunResult executeAgentRunners(ApplicationRuntime runtime, List<Strin
return new AgentRunResult(allAgentsInfo);
}

@NotNull
private static Path persistPodConfiguration(RuntimePodConfiguration podConfiguration)
throws IOException {
Path podRuntimeConfigurationFile = Files.createTempFile("podruntime", ".yaml");
try (OutputStream out = Files.newOutputStream(podRuntimeConfigurationFile)) {
MAPPER.writeValue(out, podConfiguration);
}
return podRuntimeConfigurationFile;
}

public void close() {
continueLoop.set(false);

Expand Down
Loading

0 comments on commit efcdbca

Please sign in to comment.