diff --git a/examples/applications/vertexai-text-completions/pipeline.yaml b/examples/applications/vertexai-text-completions/pipeline.yaml
index 5db608388..0bfd73bb3 100644
--- a/examples/applications/vertexai-text-completions/pipeline.yaml
+++ b/examples/applications/vertexai-text-completions/pipeline.yaml
@@ -34,5 +34,6 @@ pipeline:
       completion-field: "value.answer"
       # we are also logging the prompt we sent to the LLM
       log-field: "value.prompt"
+      max-tokens: 20
       prompt:
         - "{{% value.question}}"
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/OpenAICompletionService.java b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/OpenAICompletionService.java
index 4aa3f6f0a..ec3a0be79 100644
--- a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/OpenAICompletionService.java
+++ b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/OpenAICompletionService.java
@@ -231,10 +231,11 @@ public CompletableFuture<String> getTextCompletions(
         // this is the default behavior, as it is async
         // it works even if the streamingChunksConsumer is null
+        final String model = (String) options.get("model");
         if (completionsOptions.isStream()) {
             CompletableFuture finished = new CompletableFuture<>();
             Flux flux =
-                    client.getCompletionsStream((String) options.get("model"), completionsOptions);
+                    client.getCompletionsStream(model, completionsOptions);

             TextCompletionsConsumer textCompletionsConsumer =
                     new TextCompletionsConsumer(
@@ -253,8 +254,7 @@ public CompletableFuture<String> getTextCompletions(
             return finished.thenApply(___ -> textCompletionsConsumer.totalAnswer.toString());
         } else {
             com.azure.ai.openai.models.Completions completions =
-                    client.getCompletions((String) options.get("model"), completionsOptions)
-                            .block();
+                    client.getCompletions(model, completionsOptions).block();
             final String text = completions.getChoices().get(0).getText();
             return CompletableFuture.completedFuture(text);
         }
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/VertexAIProvider.java b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/VertexAIProvider.java
index 1fa1b46b2..0f32a894b 100644
--- a/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/VertexAIProvider.java
+++ b/langstream-agents/langstream-ai-agents/src/main/java/ai/langstream/ai/agents/services/impl/VertexAIProvider.java
@@ -16,6 +16,7 @@
 package ai.langstream.ai.agents.services.impl;

 import ai.langstream.ai.agents.services.ServiceProviderProvider;
+import ai.langstream.api.util.ConfigurationUtils;
 import com.datastax.oss.streaming.ai.completions.ChatChoice;
 import com.datastax.oss.streaming.ai.completions.ChatCompletions;
 import com.datastax.oss.streaming.ai.completions.ChatMessage;
@@ -315,19 +316,33 @@ private void appendRequestParameters(
             Map<String, Object> additionalConfiguration,
             CompletionRequest request) {
         request.parameters = new HashMap<>();
-        if (additionalConfiguration.containsKey("temperature")) {
-            request.parameters.put(
-                    "temperature", additionalConfiguration.get("temperature"));
-        }
-        if (additionalConfiguration.containsKey("max-tokens")) {
-            request.parameters.put(
-                    "maxOutputTokens", additionalConfiguration.get("max-tokens"));
-        }
-        if (additionalConfiguration.containsKey("topP")) {
-            request.parameters.put("topP", additionalConfiguration.get("topP"));
+        appendDoubleValue("temperature", "temperature", additionalConfiguration, request);
+        appendIntValue("max-tokens", "maxOutputTokens", additionalConfiguration, request);
+        appendDoubleValue("topP", "topP", additionalConfiguration, request);
+        appendIntValue("topK", "topK", additionalConfiguration, request);
+    }
+
+    private void appendDoubleValue(
+            String key,
+            String toKey,
+            Map<String, Object> additionalConfiguration,
+            CompletionRequest request) {
+        final Double typedValue =
+                ConfigurationUtils.getDouble(key, null, additionalConfiguration);
+        if (typedValue != null) {
+            request.parameters.put(toKey, typedValue);
         }
-        if (additionalConfiguration.containsKey("topK")) {
-            request.parameters.put("topK", additionalConfiguration.get("topK"));
+    }
+
+    private void appendIntValue(
+            String key,
+            String toKey,
+            Map<String, Object> additionalConfiguration,
+            CompletionRequest request) {
+        final Integer typedValue =
+                ConfigurationUtils.getInteger(key, null, additionalConfiguration);
+        if (typedValue != null) {
+            request.parameters.put(toKey, typedValue);
         }
     }
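Review note on the VertexAIProvider change above: the old code copied whatever object happened to be in the configuration map straight into the request, so a templated value such as "20" went out as a JSON string, which the Vertex AI REST API rejects for numeric parameters. The new helpers delegate coercion to ConfigurationUtils. Below is a minimal sketch of the contract they appear to rely on; the class is a hypothetical stand-in, not the real ai.langstream.api.util.ConfigurationUtils:

```java
import java.util.Map;

// Hypothetical stand-in for ConfigurationUtils: coerce a loosely-typed config
// entry (String, Integer, Double, ...) into the numeric type the Vertex AI
// request expects, returning the supplied default when the key is absent.
public final class ConfigCoercion {

    public static Double getDouble(String key, Double defaultValue, Map<String, Object> config) {
        Object raw = config.get(key);
        if (raw == null) {
            return defaultValue;
        }
        if (raw instanceof Number number) {
            return number.doubleValue();
        }
        return Double.parseDouble(raw.toString());
    }

    public static Integer getInteger(String key, Integer defaultValue, Map<String, Object> config) {
        Object raw = config.get(key);
        if (raw == null) {
            return defaultValue;
        }
        if (raw instanceof Number number) {
            return number.intValue();
        }
        return Integer.parseInt(raw.toString());
    }

    public static void main(String[] args) {
        Map<String, Object> cfg = Map.of("max-tokens", "20", "temperature", 0.2);
        System.out.println(getInteger("max-tokens", null, cfg)); // 20 as Integer
        System.out.println(getDouble("temperature", null, cfg)); // 0.2 as Double
        System.out.println(getInteger("topK", null, cfg));       // null: key absent
    }
}
```

With null as the default, absent keys are simply left out of request.parameters, which is why the new topK mapping is safe to add unconditionally.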
appendDoubleValue("temperature", "temperature", additionalConfiguration, request); + appendIntValue("max-tokens", "maxOutputTokens", additionalConfiguration, request); + appendDoubleValue("topP", "topP", additionalConfiguration, request); + appendIntValue("topK", "topK", additionalConfiguration, request); + } + + private void appendDoubleValue( + String key, + String toKey, + Map additionalConfiguration, + CompletionRequest request) { + final Double typedValue = + ConfigurationUtils.getDouble(key, null, additionalConfiguration); + if (typedValue != null) { + request.parameters.put(toKey, typedValue); } - if (additionalConfiguration.containsKey("topK")) { - request.parameters.put("topK", additionalConfiguration.get("topK")); + } + + private void appendIntValue( + String key, + String toKey, + Map additionalConfiguration, + CompletionRequest request) { + final Integer typedValue = + ConfigurationUtils.getInteger(key, null, additionalConfiguration); + if (typedValue != null) { + request.parameters.put(toKey, typedValue); } } diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/ChatCompletionsStep.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/ChatCompletionsStep.java index c0912fb1c..cab1cd8b3 100644 --- a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/ChatCompletionsStep.java +++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/ChatCompletionsStep.java @@ -17,7 +17,6 @@ import static com.datastax.oss.streaming.ai.util.TransformFunctionUtil.convertToMap; -import com.azure.ai.openai.models.ChatCompletionsOptions; import com.datastax.oss.streaming.ai.completions.ChatChoice; import com.datastax.oss.streaming.ai.completions.ChatCompletions; import com.datastax.oss.streaming.ai.completions.ChatMessage; @@ -121,21 +120,8 @@ public CompletableFuture processAsync(TransformContext transformContext) { .execute(jsonRecord))) .collect(Collectors.toList()); - ChatCompletionsOptions chatCompletionsOptions = - new ChatCompletionsOptions(List.of()) - .setMaxTokens(config.getMaxTokens()) - .setTemperature(config.getTemperature()) - .setTopP(config.getTopP()) - .setLogitBias(config.getLogitBias()) - .setStream(config.isStream()) - .setUser(config.getUser()) - .setStop(config.getStop()) - .setPresencePenalty(config.getPresencePenalty()) - .setFrequencyPenalty(config.getFrequencyPenalty()); - Map options = convertToMap(chatCompletionsOptions); - options.put("model", config.getModel()); + Map options = convertToMap(config); options.put("min-chunks-per-message", config.getMinChunksPerMessage()); - options.remove("messages"); CompletableFuture chatCompletionsHandle = completionsService.getChatCompletions( diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/TextCompletionsStep.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/TextCompletionsStep.java index 304080a4d..e666b83bd 100644 --- a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/TextCompletionsStep.java +++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/TextCompletionsStep.java @@ -17,7 +17,6 @@ import static com.datastax.oss.streaming.ai.util.TransformFunctionUtil.convertToMap; -import com.azure.ai.openai.models.CompletionsOptions; import com.datastax.oss.streaming.ai.completions.Chunk; import 
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/TextCompletionsStep.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/TextCompletionsStep.java
index 304080a4d..e666b83bd 100644
--- a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/TextCompletionsStep.java
+++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/TextCompletionsStep.java
@@ -17,7 +17,6 @@

 import static com.datastax.oss.streaming.ai.util.TransformFunctionUtil.convertToMap;

-import com.azure.ai.openai.models.CompletionsOptions;
 import com.datastax.oss.streaming.ai.completions.Chunk;
 import com.datastax.oss.streaming.ai.completions.CompletionsService;
 import com.datastax.oss.streaming.ai.model.JsonRecord;
@@ -87,21 +86,8 @@ public CompletableFuture<?> processAsync(TransformContext transformContext) {
                         .map(p -> messageTemplates.get(p).execute(jsonRecord))
                         .collect(Collectors.toList());

-        CompletionsOptions completionsOptions =
-                new CompletionsOptions(List.of())
-                        .setMaxTokens(config.getMaxTokens())
-                        .setTemperature(config.getTemperature())
-                        .setTopP(config.getTopP())
-                        .setLogitBias(config.getLogitBias())
-                        .setStream(config.isStream())
-                        .setUser(config.getUser())
-                        .setStop(config.getStop())
-                        .setPresencePenalty(config.getPresencePenalty())
-                        .setFrequencyPenalty(config.getFrequencyPenalty());
-        Map<String, Object> options = convertToMap(completionsOptions);
-        options.put("model", config.getModel());
+        final Map<String, Object> options = convertToMap(config);
         options.put("min-chunks-per-message", config.getMinChunksPerMessage());
-        options.remove("messages");

         CompletableFuture<String> chatCompletionsHandle =
                 completionsService.getTextCompletions(
diff --git a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/services/ServiceProvider.java b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/services/ServiceProvider.java
index 920244703..7cde4d181 100644
--- a/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/services/ServiceProvider.java
+++ b/langstream-agents/langstream-ai-agents/src/main/java/com/datastax/oss/streaming/ai/services/ServiceProvider.java
@@ -30,7 +30,7 @@ EmbeddingsService getEmbeddingsService(Map<String, Object> additionalConfiguration);

     void close();

-    public static class NoopServiceProvider implements ServiceProvider {
+    class NoopServiceProvider implements ServiceProvider {
         @Override
         public CompletionsService getCompletionsService(
                 Map<String, Object> additionalConfiguration) {
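The ServiceProvider cleanup is purely cosmetic: members of a Java interface are implicitly public, and a class nested inside an interface is implicitly static, so the removed modifiers said nothing extra. A tiny self-contained example of the same rule:

```java
// Members of an interface are implicitly public, and a nested class in an
// interface is implicitly static, so "public static class" is redundant there.
public interface Greeter {
    String greet(String name);

    class Noop implements Greeter { // equivalent to: public static class Noop
        @Override
        public String greet(String name) {
            return name; // no-op fallback: just echo the input
        }
    }

    static void main(String[] args) { // interface static methods are implicitly public
        System.out.println(new Noop().greet("world"));
    }
}
```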
"\"max-tokens\":null,\"temperature\":null,\"top-p\":null,\"logit-bias\":null,\"user\":null," + + "\"stop\":null,\"presence-penalty\":null,\"frequency-penalty\":null}," + + "\"messages\":[{\"role\":\"user\",\"content\":\"value1 key2\"}],\"model\":\"test-model\"}", + valueAvroRecord.get("log").toString()); } @Test diff --git a/langstream-core/src/main/java/ai/langstream/impl/agents/ai/GenAIToolKitFunctionAgentProvider.java b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/GenAIToolKitFunctionAgentProvider.java index c16a19a42..c88852e85 100644 --- a/langstream-core/src/main/java/ai/langstream/impl/agents/ai/GenAIToolKitFunctionAgentProvider.java +++ b/langstream-core/src/main/java/ai/langstream/impl/agents/ai/GenAIToolKitFunctionAgentProvider.java @@ -403,8 +403,6 @@ private void generateAIProvidersConfiguration( } } else { for (Resource resource : applicationInstance.getResources().values()) { - Map configurationCopy = - clusterRuntime.getResourceImplementation(resource, pluginsRegistry); final String configKey = switch (resource.type()) { case SERVICE_VERTEX -> "vertex"; @@ -413,6 +411,8 @@ private void generateAIProvidersConfiguration( default -> null; }; if (configKey != null) { + Map configurationCopy = + clusterRuntime.getResourceImplementation(resource, pluginsRegistry); configuration.put(configKey, configurationCopy); } } diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/ChatCompletionsIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/ChatCompletionsIT.java index 7f45cec11..14a3c25ee 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/ChatCompletionsIT.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/ChatCompletionsIT.java @@ -15,6 +15,8 @@ */ package ai.langstream.tests; +import static ai.langstream.tests.TextCompletionsIT.getAppEnvForAIServiceProvider; + import ai.langstream.tests.util.BaseEndToEndTest; import ai.langstream.tests.util.ConsumeGatewayMessage; import java.util.List; @@ -36,13 +38,10 @@ public class ChatCompletionsIT extends BaseEndToEndTest { @BeforeAll public static void checkCredentials() { - appEnv = + appEnv = getAppEnvForAIServiceProvider(); + appEnv.putAll( getAppEnvMapFromSystem( - List.of( - "OPEN_AI_ACCESS_KEY", - "OPEN_AI_URL", - "OPEN_AI_CHAT_COMPLETIONS_MODEL", - "OPEN_AI_PROVIDER")); + List.of("CHAT_COMPLETIONS_MODEL", "CHAT_COMPLETIONS_SERVICE"))); } @Test diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/TextCompletionsIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/TextCompletionsIT.java index 693e5327a..6662aafae 100644 --- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/TextCompletionsIT.java +++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/TextCompletionsIT.java @@ -36,24 +36,25 @@ public class TextCompletionsIT extends BaseEndToEndTest { @BeforeAll public static void checkCredentials() { + appEnv = getAppEnvForAIServiceProvider(); + appEnv.putAll( + getAppEnvMapFromSystem( + List.of("TEXT_COMPLETIONS_MODEL", "TEXT_COMPLETIONS_SERVICE"))); + } + + public static Map getAppEnvForAIServiceProvider() { try { - appEnv = - getAppEnvMapFromSystem( - List.of("OPEN_AI_ACCESS_KEY", "OPEN_AI_URL", "OPEN_AI_PROVIDER")); + return getAppEnvMapFromSystem( + List.of("OPEN_AI_ACCESS_KEY", "OPEN_AI_URL", "OPEN_AI_PROVIDER")); } catch (Throwable t) { // no openai - try vertex - appEnv = - getAppEnvMapFromSystem( - List.of( - "VERTEX_AI_URL", - "VERTEX_AI_TOKEN", - "VERTEX_AI_REGION", - "VERTEX_AI_PROJECT")); + return 
diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/WebCrawlerToVectorIT.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/WebCrawlerToVectorIT.java
index 54f281591..cea83df7e 100644
--- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/WebCrawlerToVectorIT.java
+++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/WebCrawlerToVectorIT.java
@@ -15,6 +15,8 @@
  */
 package ai.langstream.tests;

+import static ai.langstream.tests.TextCompletionsIT.getAppEnvForAIServiceProvider;
+
 import ai.langstream.tests.util.BaseEndToEndTest;
 import ai.langstream.tests.util.ConsumeGatewayMessage;
 import java.util.List;
@@ -36,20 +38,20 @@ public class WebCrawlerToVectorIT extends BaseEndToEndTest {

     @BeforeAll
     public static void checkCredentials() {
-        appEnv =
+        appEnv = getAppEnvForAIServiceProvider();
+        appEnv.putAll(
+                getAppEnvMapFromSystem(
+                        List.of("CHAT_COMPLETIONS_MODEL", "CHAT_COMPLETIONS_SERVICE")));
+        appEnv.putAll(getAppEnvMapFromSystem(List.of("EMBEDDINGS_MODEL", "EMBEDDINGS_SERVICE")));
+
+        appEnv.putAll(
                 getAppEnvMapFromSystem(
                         List.of(
-                                "OPEN_AI_ACCESS_KEY",
-                                "OPEN_AI_URL",
-                                "OPEN_AI_EMBEDDINGS_MODEL",
-                                "OPEN_AI_CHAT_COMPLETIONS_MODEL",
-                                "OPEN_AI_PROVIDER",
                                 "ASTRA_TOKEN",
                                 "ASTRA_CLIENT_ID",
                                 "ASTRA_SECRET",
-                                "ASTRA_SECURE_BUNDLE",
                                 "ASTRA_ENVIRONMENT",
-                                "ASTRA_DATABASE"));
+                                "ASTRA_DATABASE")));
     }

     @Test
diff --git a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java
index ad52fedd8..7dec10a46 100644
--- a/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java
+++ b/langstream-e2e-tests/src/test/java/ai/langstream/tests/util/BaseEndToEndTest.java
@@ -526,6 +526,9 @@ private static KubeCluster getKubeCluster() {
     public void setupSingleTest() {
         // cleanup previous runs
         cleanupAllEndToEndTestsNamespaces();
+        codeStorageProvider.cleanup();
+        streamingClusterProvider.cleanup();
+
         namespace = "ls-test-" + UUID.randomUUID().toString().substring(0, 8);

         client.resource(
@@ -1132,7 +1135,6 @@ private static void deployLocalApplicationAndAwaitReady(
                 .pollInterval(5, TimeUnit.SECONDS)
                 .untilAsserted(
                         () -> {
-                            log.info("waiting new executors to be ready");
                             final List<Pod> pods =
                                     client.pods()
                                             .inNamespace(tenantNamespace)
                                             .withLabels(
                                                     Map.of(
                                                             "app",
                                                             "langstream-runtime"))
                                             .list()
                                             .getItems();
@@ -1144,6 +1146,10 @@
+                            log.info(
+                                    "waiting new executors to be ready, found {}, expected {}",
+                                    pods.size(),
+                                    expectedNumExecutors);
                             if (pods.size() != expectedNumExecutors) {
                                 fail("too many pods: " + pods.size());
                             }
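Moving the log statement after the pod query matters because the untilAsserted lambda is re-run on every poll (the .pollInterval/.untilAsserted chain above is Awaitility's API), so logging the observed and expected counts each round is what makes a hanging wait diagnosable. A toy standalone example of the same polling pattern, with a counter standing in for the pod list:

```java
import static org.awaitility.Awaitility.await;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Demonstrates logging state inside an Awaitility untilAsserted lambda:
// the lambda runs repeatedly until it stops throwing or the timeout expires.
public final class PollingExample {
    public static void main(String[] args) {
        AtomicInteger readyExecutors = new AtomicInteger();
        int expected = 3;

        // Background work that becomes "ready" over time, like executor pods starting.
        new Thread(() -> {
            for (int i = 0; i < expected; i++) {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException ignored) {
                }
                readyExecutors.incrementAndGet();
            }
        }).start();

        await().atMost(10, TimeUnit.SECONDS)
                .pollInterval(250, TimeUnit.MILLISECONDS)
                .untilAsserted(() -> {
                    int found = readyExecutors.get();
                    // Logged on every poll, so a stuck wait shows what was observed.
                    System.out.printf("waiting for executors, found %d, expected %d%n", found, expected);
                    if (found != expected) {
                        throw new AssertionError("not ready yet: " + found);
                    }
                });
    }
}
```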
diff --git a/langstream-e2e-tests/src/test/resources/apps/chat-completions/configuration.yaml b/langstream-e2e-tests/src/test/resources/apps/chat-completions/configuration.yaml
index 502376f20..79c593070 100644
--- a/langstream-e2e-tests/src/test/resources/apps/chat-completions/configuration.yaml
+++ b/langstream-e2e-tests/src/test/resources/apps/chat-completions/configuration.yaml
@@ -18,8 +18,17 @@
 configuration:
   resources:
     - type: "open-ai-configuration"
+      id: "open-ai"
       name: "OpenAI Azure configuration"
       configuration:
         url: "{{ secrets.open-ai.url }}"
         access-key: "{{ secrets.open-ai.access-key }}"
         provider: "{{ secrets.open-ai.provider }}"
+    - type: "vertex-configuration"
+      name: "Google Vertex AI configuration"
+      id: "vertex"
+      configuration:
+        url: "{{ secrets.vertex-ai.url }}"
+        token: "{{ secrets.vertex-ai.token }}"
+        region: "{{ secrets.vertex-ai.region }}"
+        project: "{{ secrets.vertex-ai.project }}"
diff --git a/langstream-e2e-tests/src/test/resources/apps/chat-completions/pipeline.yaml b/langstream-e2e-tests/src/test/resources/apps/chat-completions/pipeline.yaml
index 8c73a9e10..7810c27ae 100644
--- a/langstream-e2e-tests/src/test/resources/apps/chat-completions/pipeline.yaml
+++ b/langstream-e2e-tests/src/test/resources/apps/chat-completions/pipeline.yaml
@@ -31,12 +31,14 @@ pipeline:
     type: "ai-chat-completions"
     output: "ls-test-history-topic"
     configuration:
-      model: "{{{secrets.open-ai.chat-completions-model}}}"
+      ai-service: "{{{secrets.chat-completions.service}}}"
+      model: "{{{secrets.chat-completions.model}}}"
       completion-field: "value.answer"
       log-field: "value.prompt"
       stream-to-topic: "ls-test-output-topic"
       stream-response-completion-field: "value"
       min-chunks-per-message: 20
+      max-tokens: 20
       messages:
         - role: user
           content: "You are an helpful assistant. Below you can fine a question from the user. Please try to help them the best way you can.\n\n{{% value.question}}"
diff --git a/langstream-e2e-tests/src/test/resources/apps/text-completions/configuration.yaml b/langstream-e2e-tests/src/test/resources/apps/text-completions/configuration.yaml
index 32ee4534d..79c593070 100644
--- a/langstream-e2e-tests/src/test/resources/apps/text-completions/configuration.yaml
+++ b/langstream-e2e-tests/src/test/resources/apps/text-completions/configuration.yaml
@@ -18,6 +18,7 @@
 configuration:
   resources:
     - type: "open-ai-configuration"
+      id: "open-ai"
       name: "OpenAI Azure configuration"
       configuration:
         url: "{{ secrets.open-ai.url }}"
diff --git a/langstream-e2e-tests/src/test/resources/apps/text-completions/pipeline.yaml b/langstream-e2e-tests/src/test/resources/apps/text-completions/pipeline.yaml
index 2f78d66f5..2efa6ac93 100644
--- a/langstream-e2e-tests/src/test/resources/apps/text-completions/pipeline.yaml
+++ b/langstream-e2e-tests/src/test/resources/apps/text-completions/pipeline.yaml
@@ -38,5 +38,6 @@ pipeline:
       stream-to-topic: "ls-test-output-topic"
       stream-response-completion-field: "value"
       min-chunks-per-message: 20
+      max-tokens: 20
       prompt:
         - "{{% value.question}}"
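With both an "open-ai" and a "vertex" resource now declared in the same configuration.yaml, the explicit id fields are what an agent's ai-service value selects between. A hypothetical sketch of that resolution step, not the actual LangStream lookup code:

```java
import java.util.Map;
import java.util.Optional;

// Illustrative only: given the per-id provider configurations assembled by the
// planner, pick the one named by the agent's "ai-service" setting.
public final class AiServiceLookup {

    public static Map<String, Object> resolve(
            String aiService, Map<String, Map<String, Object>> configuredProviders) {
        return Optional.ofNullable(configuredProviders.get(aiService))
                .orElseThrow(
                        () -> new IllegalArgumentException(
                                "no AI service configured with id " + aiService));
    }

    public static void main(String[] args) {
        Map<String, Map<String, Object>> providers =
                Map.of(
                        "open-ai", Map.of("url", "https://example.openai.azure.com"),
                        "vertex", Map.of("region", "us-central1"));
        // e.g. CHAT_COMPLETIONS_SERVICE=vertex makes the secret select the Vertex config
        System.out.println(resolve("vertex", providers));
    }
}
```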
"value.answer" # we are also logging the prompt we sent to the LLM diff --git a/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/configuration.yaml b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/configuration.yaml index 525e68629..429ba9e7d 100644 --- a/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/configuration.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/configuration.yaml @@ -18,6 +18,7 @@ configuration: resources: - type: "open-ai-configuration" + id: "open-ai" name: "OpenAI Azure configuration" configuration: url: "{{ secrets.open-ai.url }}" @@ -29,7 +30,14 @@ configuration: service: "astra" clientId: "{{{ secrets.astra.clientId }}}" secret: "{{{ secrets.astra.secret }}}" - secureBundle: "{{{ secrets.astra.secureBundle }}}" database: "{{{ secrets.astra.database }}}" token: "{{{ secrets.astra.token }}}" - environment: "{{{ secrets.astra.environment }}}" \ No newline at end of file + environment: "{{{ secrets.astra.environment }}}" + - type: "vertex-configuration" + name: "Google Vertex AI configuration" + id: "vertex" + configuration: + url: "{{ secrets.vertex-ai.url }}" + token: "{{ secrets.vertex-ai.token }}" + region: "{{ secrets.vertex-ai.region }}" + project: "{{ secrets.vertex-ai.project }}" \ No newline at end of file diff --git a/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/crawler.yaml b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/crawler.yaml index 248f1abfa..1a6de0357 100644 --- a/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/crawler.yaml +++ b/langstream-e2e-tests/src/test/resources/apps/webcrawler-to-vector/crawler.yaml @@ -89,7 +89,8 @@ pipeline: type: "compute-ai-embeddings" output: "ls-test-chunks-topic" configuration: - model: "{{{secrets.open-ai.embeddings-model}}}" + ai-service: "{{{secrets.embeddings.service}}}" + model: "{{{secrets.embeddings.model}}}" embeddings-field: "value.embeddings_vector" text: "{{% value.text }}" batch-size: 10 diff --git a/langstream-e2e-tests/src/test/resources/secrets/secret1.yaml b/langstream-e2e-tests/src/test/resources/secrets/secret1.yaml index bd6f6e472..f8a31879a 100644 --- a/langstream-e2e-tests/src/test/resources/secrets/secret1.yaml +++ b/langstream-e2e-tests/src/test/resources/secrets/secret1.yaml @@ -25,6 +25,16 @@ secrets: data: model: "${TEXT_COMPLETIONS_MODEL}" service: "${TEXT_COMPLETIONS_SERVICE}" + - name: chat-completions + id: chat-completions + data: + model: "${CHAT_COMPLETIONS_MODEL}" + service: "${CHAT_COMPLETIONS_SERVICE}" + - name: embeddings + id: embeddings + data: + model: "${EMBEDDINGS_MODEL}" + service: "${EMBEDDINGS_SERVICE}" - name: cassandra id: cassandra @@ -51,7 +61,6 @@ secrets: secret: "${ASTRA_SECRET}" token: "${ASTRA_TOKEN}" database: "${ASTRA_DATABASE}" - secureBundle: "${ASTRA_SECURE_BUNDLE}" environment: "${ASTRA_ENVIRONMENT}" - id: kafka data: diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java index a87ffc680..70e15d8a8 100644 --- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java +++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java @@ -250,6 +250,8 @@ protected List waitForMessages(KafkaConsumer consumer, List e } else if (expectedValue instanceof byte[]) { 
diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java
index a87ffc680..70e15d8a8 100644
--- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java
+++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/AbstractApplicationRunner.java
@@ -250,6 +250,8 @@ protected List waitForMessages(KafkaConsumer consumer, List expected) {
                             } else if (expectedValue instanceof byte[]) {
                                 assertArrayEquals((byte[]) expectedValue, (byte[]) actualValue);
                             } else {
+                                log.info("expected: {}", expectedValue);
+                                log.info("got: {}", actualValue);
                                 assertEquals(expectedValue, actualValue);
                             }
                         }
diff --git a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ChatCompletionsIT.java b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ChatCompletionsIT.java
index 7121b99b9..d47e411e2 100644
--- a/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ChatCompletionsIT.java
+++ b/langstream-runtime/langstream-runtime-impl/src/test/java/ai/langstream/kafka/ChatCompletionsIT.java
@@ -157,13 +157,13 @@ tenant, appId, application, buildInstanceYaml(), expectedAgents)) {
             log.info("Topics {}", topics);
             assertTrue(topics.contains(applicationRuntime.getGlobal("input-topic")));
             assertTrue(topics.contains(applicationRuntime.getGlobal("output-topic")));
-            assertTrue(topics.contains(applicationRuntime.getGlobal("stream-topic")));
+            final String streamToTopic = applicationRuntime.getGlobal("stream-topic");
+            assertTrue(topics.contains(streamToTopic));

             try (KafkaProducer producer = createProducer();
                     KafkaConsumer consumer =
                             createConsumer(applicationRuntime.getGlobal("output-topic"));
-                    KafkaConsumer streamConsumer =
-                            createConsumer(applicationRuntime.getGlobal("stream-topic"))) {
+                    KafkaConsumer streamConsumer = createConsumer(streamToTopic)) {

                 // produce one message to the input-topic
                 // simulate a session-id header
@@ -182,7 +182,9 @@ tenant, appId, application, buildInstanceYaml(), expectedAgents)) {
                 waitForMessages(
                         consumer,
                         List.of(
-                                "{\"question\":\"the car\",\"session-id\":\"2139847128764192\",\"answer\":\"A car is a vehicle\",\"prompt\":\"{\\\"options\\\":{\\\"max_tokens\\\":null,\\\"temperature\\\":null,\\\"top_p\\\":null,\\\"logit_bias\\\":null,\\\"user\\\":null,\\\"n\\\":null,\\\"stop\\\":null,\\\"presence_penalty\\\":null,\\\"frequency_penalty\\\":null,\\\"stream\\\":true,\\\"model\\\":\\\"gpt-35-turbo\\\",\\\"functions\\\":null,\\\"function_call\\\":null,\\\"dataSources\\\":null,\\\"min-chunks-per-message\\\":3},\\\"messages\\\":[{\\\"role\\\":\\\"user\\\",\\\"content\\\":\\\"What can you tell me about the car ?\\\"}],\\\"model\\\":\\\"gpt-35-turbo\\\"}\"}"));
+                                """
+                                {"question":"the car","session-id":"2139847128764192","answer":"A car is a vehicle","prompt":"{\\"options\\":{\\"type\\":\\"ai-chat-completions\\",\\"when\\":null,\\"model\\":\\"gpt-35-turbo\\",\\"messages\\":[{\\"role\\":\\"user\\",\\"content\\":\\"What can you tell me about {{ value.question}} ?\\"}],\\"stream-to-topic\\":\\"%s\\",\\"stream-response-completion-field\\":\\"value\\",\\"min-chunks-per-message\\":3,\\"completion-field\\":\\"value.answer\\",\\"stream\\":true,\\"log-field\\":\\"value.prompt\\",\\"max-tokens\\":null,\\"temperature\\":null,\\"top-p\\":null,\\"logit-bias\\":null,\\"user\\":null,\\"stop\\":null,\\"presence-penalty\\":null,\\"frequency-penalty\\":null},\\"messages\\":[{\\"role\\":\\"user\\",\\"content\\":\\"What can you tell me about the car ?\\"}],\\"model\\":\\"gpt-35-turbo\\"}"}"""
+                                        .formatted(streamToTopic)));

                 ConsumerRecord record = mainOutputRecords.get(0);
                 assertNull(record.headers().lastHeader("stream-id"));
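The rewritten expectation uses a Java text block plus String.formatted, so the dynamically created stream topic can be spliced into the otherwise-literal JSON with %s instead of string concatenation. The same technique in isolation:

```java
// Text block + formatted(): keep a long, quote-heavy literal readable while
// still injecting one runtime value through a %s placeholder.
public final class TextBlockFormat {
    public static void main(String[] args) {
        String streamToTopic = "ls-test-stream-topic"; // hypothetical topic name
        String expected =
                """
                {"stream-to-topic":"%s","stream":true}"""
                        .formatted(streamToTopic);
        // Prints: {"stream-to-topic":"ls-test-stream-topic","stream":true}
        System.out.println(expected);
    }
}
```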