From 842aa28eeb161e601464dc77d055d4f09185d0ce Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 13 Sep 2023 15:26:30 +0200 Subject: [PATCH 1/2] [agentrunner] Split the docker image and clean up the logs --- .github/workflows/release.yml | 1 + docker/build.sh | 2 + .../webcrawler-source/crawler.yaml | 2 + .../pom.xml | 77 +++++ .../src/main/docker/Dockerfile | 53 +++ .../langstream-runtime-impl/pom.xml | 97 ------ .../src/main/docker/Dockerfile | 37 +-- .../langstream/runtime/agent/AgentRunner.java | 301 +++++++++++------- .../runtime/agent/nar/NarFileHandler.java | 20 +- langstream-runtime/pom.xml | 1 + 10 files changed, 328 insertions(+), 263 deletions(-) create mode 100644 langstream-runtime/langstream-runtime-base-docker-image/pom.xml create mode 100644 langstream-runtime/langstream-runtime-base-docker-image/src/main/docker/Dockerfile diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 35501880f..afb6460d3 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -49,6 +49,7 @@ jobs: docker tag langstream/$image:latest-dev $repo/$image:$docker_tag docker push $repo/$image:$docker_tag } + tag_and_push langstream-runtime-base tag_and_push langstream-runtime tag_and_push langstream-cli tag_and_push langstream-deployer diff --git a/docker/build.sh b/docker/build.sh index 73a2c434b..13969472b 100755 --- a/docker/build.sh +++ b/docker/build.sh @@ -41,6 +41,8 @@ if [ "$only_image" == "control-plane" ]; then build_docker_image langstream-webservice elif [ "$only_image" == "operator" ] || [ "$only_image" == "deployer" ]; then build_docker_image langstream-k8s-deployer/langstream-k8s-deployer-operator +elif [ "$only_image" == "runtime-base-docker-image" ]; then + build_docker_image langstream-runtime/langstream-runtime-base-docker-image elif [ "$only_image" == "runtime" ]; then build_docker_image langstream-runtime/langstream-runtime-impl elif [ "$only_image" == "runtime-tester" ]; then diff --git a/examples/applications/webcrawler-source/crawler.yaml b/examples/applications/webcrawler-source/crawler.yaml index 7cfb117f5..9b5d7b03c 100644 --- a/examples/applications/webcrawler-source/crawler.yaml +++ b/examples/applications/webcrawler-source/crawler.yaml @@ -18,6 +18,8 @@ name: "Crawl a website" topics: - name: "chunks-topic" creation-mode: create-if-not-exists +resources: + size: 2 pipeline: - name: "Crawl the WebSite" type: "webcrawler-source" diff --git a/langstream-runtime/langstream-runtime-base-docker-image/pom.xml b/langstream-runtime/langstream-runtime-base-docker-image/pom.xml new file mode 100644 index 000000000..38c96005f --- /dev/null +++ b/langstream-runtime/langstream-runtime-base-docker-image/pom.xml @@ -0,0 +1,77 @@ + + + + + langstream-runtime + ai.langstream + 0.0.17-SNAPSHOT + + 4.0.0 + + langstream-runtime-base-docker-image + + + linux/amd64 + false + + + + + + docker + + + docker + + + + + + io.fabric8 + docker-maven-plugin + + true + + + langstream/langstream-runtime-base:latest-dev + + ${project.basedir}/src/main/docker/Dockerfile + + + ${docker.platforms} + + + + + + + + + package + + build + + + + + + + + + diff --git a/langstream-runtime/langstream-runtime-base-docker-image/src/main/docker/Dockerfile b/langstream-runtime/langstream-runtime-base-docker-image/src/main/docker/Dockerfile new file mode 100644 index 000000000..d4d12cbed --- /dev/null +++ b/langstream-runtime/langstream-runtime-base-docker-image/src/main/docker/Dockerfile @@ -0,0 +1,53 @@ +# +# +# Copyright DataStax, Inc. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM ubuntu:22.04 + +ARG DEBIAN_FRONTEND=noninteractive + +# Install some utilities +RUN echo 'Acquire::http::Timeout "30";\nAcquire::ftp::Timeout "30";\nAcquire::Retries "3";' > /etc/apt/apt.conf.d/99timeout_and_retries \ + && apt-get update \ + && apt-get -y dist-upgrade \ + && apt-get -y install --no-install-recommends vim netcat dnsutils less procps net-tools iputils-ping unzip \ + curl ca-certificates wget apt-transport-https software-properties-common gpg-agent + +# Install Python3.11 +RUN apt-get update && add-apt-repository ppa:deadsnakes/ppa \ + && apt-get -y install --no-install-recommends python3.11-full \ + && update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.11 1 \ + && python3 -m ensurepip && python3 -m pip install pipenv + + +# Install Eclipse Temurin Package +RUN mkdir -p /etc/apt/keyrings \ + && wget -O - https://packages.adoptium.net/artifactory/api/gpg/key/public | tee /etc/apt/keyrings/adoptium.asc \ + && echo "deb [signed-by=/etc/apt/keyrings/adoptium.asc] https://packages.adoptium.net/artifactory/deb $(awk -F= '/^VERSION_CODENAME/{print$2}' /etc/os-release) main" | tee /etc/apt/sources.list.d/adoptium.list \ + && apt-get update \ + && apt-get -y dist-upgrade \ + && apt-get -y install temurin-17-jdk \ + && export ARCH=$(uname -m | sed -r 's/aarch64/arm64/g' | awk '!/arm64/{$0="amd64"}1') \ + && echo networkaddress.cache.ttl=1 >> /usr/lib/jvm/temurin-17-jdk-$ARCH/conf/security/java.security + +# Cleanup apt +RUN apt-get -y --purge autoremove \ + && apt-get autoclean \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +LABEL org.opencontainers.image.source=https://github.com/LangStream/langstream +LABEL org.opencontainers.image.licenses=Apache-2.0 \ No newline at end of file diff --git a/langstream-runtime/langstream-runtime-impl/pom.xml b/langstream-runtime/langstream-runtime-impl/pom.xml index 422e4bbad..debf905a6 100644 --- a/langstream-runtime/langstream-runtime-impl/pom.xml +++ b/langstream-runtime/langstream-runtime-impl/pom.xml @@ -27,7 +27,6 @@ langstream-runtime-impl - linux/amd64 false @@ -209,97 +208,6 @@ kubernetes-server-mock test - ${project.groupId} langstream-k8s-common @@ -653,11 +561,6 @@ ${project.basedir}/src/main/assemble/langstream-runtime.xml - - - ${docker.platforms} - - diff --git a/langstream-runtime/langstream-runtime-impl/src/main/docker/Dockerfile b/langstream-runtime/langstream-runtime-impl/src/main/docker/Dockerfile index 2d8767d7f..aeae4b175 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/docker/Dockerfile +++ b/langstream-runtime/langstream-runtime-impl/src/main/docker/Dockerfile @@ -15,41 +15,10 @@ # limitations under the License. 
# -FROM ubuntu:22.04 +FROM langstream/langstream-runtime-base:latest-dev ARG DEBIAN_FRONTEND=noninteractive -# Install some utilities -RUN echo 'Acquire::http::Timeout "30";\nAcquire::ftp::Timeout "30";\nAcquire::Retries "3";' > /etc/apt/apt.conf.d/99timeout_and_retries \ - && apt-get update \ - && apt-get -y dist-upgrade \ - && apt-get -y install --no-install-recommends vim netcat dnsutils less procps net-tools iputils-ping \ - curl ca-certificates wget apt-transport-https software-properties-common gpg-agent - -# Install Python3.11 -RUN apt-get update && add-apt-repository ppa:deadsnakes/ppa \ - && apt-get -y install --no-install-recommends python3.11-full \ - && update-alternatives --install /usr/bin/python3 python3 /usr/bin/python3.11 1 \ - && python3 -m ensurepip && python3 -m pip install pipenv - - -# Install Eclipse Temurin Package -RUN mkdir -p /etc/apt/keyrings \ - && wget -O - https://packages.adoptium.net/artifactory/api/gpg/key/public | tee /etc/apt/keyrings/adoptium.asc \ - && echo "deb [signed-by=/etc/apt/keyrings/adoptium.asc] https://packages.adoptium.net/artifactory/deb $(awk -F= '/^VERSION_CODENAME/{print$2}' /etc/os-release) main" | tee /etc/apt/sources.list.d/adoptium.list \ - && apt-get update \ - && apt-get -y dist-upgrade \ - && apt-get -y install temurin-17-jdk \ - && export ARCH=$(uname -m | sed -r 's/aarch64/arm64/g' | awk '!/arm64/{$0="amd64"}1') \ - && echo networkaddress.cache.ttl=1 >> /usr/lib/jvm/temurin-17-jdk-$ARCH/conf/security/java.security - -# Cleanup apt -RUN apt-get -y install unzip \ - && apt-get -y --purge autoremove \ - && apt-get autoclean \ - && apt-get clean \ - && rm -rf /var/lib/apt/lists/* - RUN mkdir /app && chmod g+w /app ADD maven/Pipfile.lock /app/Pipfile.lock @@ -58,8 +27,8 @@ ENV NLTK_DATA="/app/nltk_data" # Install python runtime deps RUN cd /app && pipenv requirements --categories="packages full" > /app/requirements.txt \ - && python3 -m pip install -r /app/requirements.txt \ - && python3 -m nltk.downloader -d /app/nltk_data punkt averaged_perceptron_tagger + && python3 -m pip install -r /app/requirements.txt \ + && python3 -m nltk.downloader -d /app/nltk_data punkt averaged_perceptron_tagger ENV PYTHONPATH="$PYTHONPATH:/app/python_libs" diff --git a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunner.java b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunner.java index 83c9d536a..bcf88278e 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunner.java +++ b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/AgentRunner.java @@ -55,6 +55,8 @@ import io.prometheus.client.hotspot.DefaultExports; import jakarta.servlet.Servlet; import java.io.IOException; +import java.lang.management.BufferPoolMXBean; +import java.lang.management.ManagementFactory; import java.net.InetAddress; import java.net.MalformedURLException; import java.net.URL; @@ -69,6 +71,10 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -285,151 +291,172 @@ private static void runJavaAgent( Runnable beforeStopSource) throws 
Exception { - topicConnectionsRuntime.init(configuration.streamingCluster()); - - // this is closed by the TopicSource - final TopicConsumer consumer; - TopicProducer deadLetterProducer = null; - if (configuration.input() != null && !configuration.input().isEmpty()) { - consumer = - topicConnectionsRuntime.createConsumer( - agentId, configuration.streamingCluster(), configuration.input()); - deadLetterProducer = - topicConnectionsRuntime.createDeadletterTopicProducer( - agentId, configuration.streamingCluster(), configuration.input()); - } else { - consumer = new NoopTopicConsumer(); - } - - if (deadLetterProducer == null) { - deadLetterProducer = new NoopTopicProducer(); - } - - // this is closed by the TopicSink - final TopicProducer producer; - if (configuration.output() != null && !configuration.output().isEmpty()) { - producer = - topicConnectionsRuntime.createProducer( - agentId, configuration.streamingCluster(), configuration.output()); - } else { - producer = new NoopTopicProducer(); - } - - ErrorsHandler errorsHandler = - new StandardErrorsHandler(configuration.agent().errorHandlerConfiguration()); + ScheduledExecutorService statsScheduler = Executors.newSingleThreadScheduledExecutor(); + try { - try (TopicAdmin topicAdmin = - topicConnectionsRuntime.createTopicAdmin( - agentId, configuration.streamingCluster(), configuration.output())) { - AgentProcessor mainProcessor; - if (agentCodeWithLoader.isProcessor()) { - mainProcessor = agentCodeWithLoader.asProcessor(); + topicConnectionsRuntime.init(configuration.streamingCluster()); + + // this is closed by the TopicSource + final TopicConsumer consumer; + TopicProducer deadLetterProducer = null; + if (configuration.input() != null && !configuration.input().isEmpty()) { + consumer = + topicConnectionsRuntime.createConsumer( + agentId, configuration.streamingCluster(), configuration.input()); + deadLetterProducer = + topicConnectionsRuntime.createDeadletterTopicProducer( + agentId, configuration.streamingCluster(), configuration.input()); } else { - mainProcessor = new IdentityAgentProvider.IdentityAgentCode(); - mainProcessor.setMetadata("identity", "identity", System.currentTimeMillis()); + consumer = new NoopTopicConsumer(); } - agentInfo.watchProcessor(mainProcessor); - AgentSource source = null; - if (agentCodeWithLoader.isSource()) { - source = agentCodeWithLoader.asSource(); - } else if (agentCodeWithLoader.is(code -> code instanceof CompositeAgentProcessor)) { - source = ((CompositeAgentProcessor) agentCodeWithLoader.agentCode()).getSource(); + if (deadLetterProducer == null) { + deadLetterProducer = new NoopTopicProducer(); } - if (source == null) { - source = new TopicConsumerSource(consumer, deadLetterProducer); - source.setMetadata("topic-source", "topic-source", System.currentTimeMillis()); - source.init(Map.of()); - } - agentInfo.watchSource(source); - - AgentSink sink = null; - if (agentCodeWithLoader.isSink()) { - sink = agentCodeWithLoader.asSink(); - } else if (agentCodeWithLoader.is(code -> code instanceof CompositeAgentProcessor)) { - sink = ((CompositeAgentProcessor) agentCodeWithLoader.agentCode()).getSink(); - } - if (sink == null) { - sink = new TopicProducerSink(producer); - sink.setMetadata("topic-sink", "topic-sink", System.currentTimeMillis()); - sink.init(Map.of()); + // this is closed by the TopicSink + final TopicProducer producer; + if (configuration.output() != null && !configuration.output().isEmpty()) { + producer = + topicConnectionsRuntime.createProducer( + agentId, configuration.streamingCluster(), 
configuration.output()); + } else { + producer = new NoopTopicProducer(); } - agentInfo.watchSink(sink); - String onBadRecord = - configuration - .agent() - .errorHandlerConfiguration() - .getOrDefault("onFailure", FAIL) - .toString(); - final BadRecordHandler brh = getBadRecordHandler(onBadRecord, deadLetterProducer); + ErrorsHandler errorsHandler = + new StandardErrorsHandler(configuration.agent().errorHandlerConfiguration()); + + try (TopicAdmin topicAdmin = + topicConnectionsRuntime.createTopicAdmin( + agentId, configuration.streamingCluster(), configuration.output())) { + AgentProcessor mainProcessor; + if (agentCodeWithLoader.isProcessor()) { + mainProcessor = agentCodeWithLoader.asProcessor(); + } else { + mainProcessor = new IdentityAgentProvider.IdentityAgentCode(); + mainProcessor.setMetadata("identity", "identity", System.currentTimeMillis()); + } + agentInfo.watchProcessor(mainProcessor); + + AgentSource source = null; + if (agentCodeWithLoader.isSource()) { + source = agentCodeWithLoader.asSource(); + } else if (agentCodeWithLoader.is( + code -> code instanceof CompositeAgentProcessor)) { + source = + ((CompositeAgentProcessor) agentCodeWithLoader.agentCode()).getSource(); + } - try { - topicAdmin.start(); - AgentContext agentContext = - new SimpleAgentContext( - agentId, - consumer, - producer, - topicAdmin, - brh, - new TopicConnectionProvider() { - @Override - public TopicConsumer createConsumer( - String agentId, Map config) { - return topicConnectionsRuntime.createConsumer( - agentId, configuration.streamingCluster(), config); - } + if (source == null) { + source = new TopicConsumerSource(consumer, deadLetterProducer); + source.setMetadata("topic-source", "topic-source", System.currentTimeMillis()); + source.init(Map.of()); + } + agentInfo.watchSource(source); + + AgentSink sink = null; + if (agentCodeWithLoader.isSink()) { + sink = agentCodeWithLoader.asSink(); + } else if (agentCodeWithLoader.is( + code -> code instanceof CompositeAgentProcessor)) { + sink = ((CompositeAgentProcessor) agentCodeWithLoader.agentCode()).getSink(); + } + if (sink == null) { + sink = new TopicProducerSink(producer); + sink.setMetadata("topic-sink", "topic-sink", System.currentTimeMillis()); + sink.init(Map.of()); + } + agentInfo.watchSink(sink); - @Override - public TopicProducer createProducer( - String agentId, Map config) { - return topicConnectionsRuntime.createProducer( - agentId, configuration.streamingCluster(), config); - } - }); - log.info("Source: {}", source); - log.info("Processor: {}", mainProcessor); - log.info("Sink: {}", sink); - PendingRecordsCounterSource pendingRecordsCounterSource = - new PendingRecordsCounterSource(source); - runMainLoop( - pendingRecordsCounterSource, - mainProcessor, - sink, - agentContext, - errorsHandler, - continueLoop); + String onBadRecord = + configuration + .agent() + .errorHandlerConfiguration() + .getOrDefault("onFailure", FAIL) + .toString(); + final BadRecordHandler brh = getBadRecordHandler(onBadRecord, deadLetterProducer); - pendingRecordsCounterSource.waitForNoPendingRecords(); + try { + topicAdmin.start(); + AgentContext agentContext = + new SimpleAgentContext( + agentId, + consumer, + producer, + topicAdmin, + brh, + new TopicConnectionProvider() { + @Override + public TopicConsumer createConsumer( + String agentId, Map config) { + return topicConnectionsRuntime.createConsumer( + agentId, + configuration.streamingCluster(), + config); + } + + @Override + public TopicProducer createProducer( + String agentId, Map config) { + return 
topicConnectionsRuntime.createProducer( + agentId, + configuration.streamingCluster(), + config); + } + }); + log.info("Source: {}", source); + log.info("Processor: {}", mainProcessor); + log.info("Sink: {}", sink); + PendingRecordsCounterSource pendingRecordsCounterSource = + new PendingRecordsCounterSource(source, sink.handlesCommit()); + + statsScheduler.scheduleAtFixedRate( + pendingRecordsCounterSource::dumpStats, 5, 5, TimeUnit.SECONDS); + + runMainLoop( + pendingRecordsCounterSource, + mainProcessor, + sink, + agentContext, + errorsHandler, + continueLoop); + + pendingRecordsCounterSource.waitForNoPendingRecords(); + + log.info("Main loop ended"); - log.info("Main loop ended"); + } finally { + mainProcessor.close(); + + if (beforeStopSource != null) { + // we want to perform validations after stopping the processors + // but without stopping the source (otherwise we cannot see the status of + // the + // consumers) + beforeStopSource.run(); + } - } finally { - mainProcessor.close(); + source.close(); + sink.close(); - if (beforeStopSource != null) { - // we want to perform validations after stopping the processors - // but without stopping the source (otherwise we cannot see the status of the - // consumers) - beforeStopSource.run(); + log.info("Agent fully stopped"); } - - source.close(); - sink.close(); - - log.info("Agent fully stopped"); } + } finally { + statsScheduler.shutdown(); } } private static final class PendingRecordsCounterSource implements AgentSource { private final AgentSource wrapped; private final Set pendingRecords = ConcurrentHashMap.newKeySet(); + private final AtomicLong totalSourceRecords = new AtomicLong(); + private final boolean sinkHandlesCommits; - public PendingRecordsCounterSource(AgentSource wrapped) { + public PendingRecordsCounterSource(AgentSource wrapped, boolean sinkHandlesCommits) { this.wrapped = wrapped; + this.sinkHandlesCommits = sinkHandlesCommits; } @Override @@ -476,7 +503,13 @@ public List getAgentStatus() { public List read() throws Exception { List read = wrapped.read(); if (read != null) { - pendingRecords.addAll(read); + totalSourceRecords.addAndGet(read.size()); + if (!sinkHandlesCommits) { + // is the Sink handles the commit (Kafka Connect case) + // then it doesn't notify the Source of the commit, + // so we cannot track this here, otherwise it is a memory leak + pendingRecords.addAll(read); + } } return read; } @@ -541,6 +574,30 @@ public void waitForNoPendingRecords() { Thread.currentThread().interrupt(); } } + + static final int MB = 1024 * 1024; + + public void dumpStats() { + Object currentRecords = sinkHandlesCommits ? "N/A" : pendingRecords.size(); + Runtime instance = Runtime.getRuntime(); + + BufferPoolMXBean directMemory = + ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class).stream() + .filter(b -> b.getName().contains("direct")) + .findFirst() + .orElse(null); + + log.info( + "Records: total {}, working {}, Memory stats: used {} MB, total {} MB, free {} MB, max {} MB " + + "Direct memory {} MB", + totalSourceRecords.get(), + currentRecords, + (instance.totalMemory() - instance.freeMemory()) / MB, + instance.totalMemory() / MB, + instance.freeMemory() / MB, + instance.maxMemory() / MB, + directMemory != null ? 
directMemory.getMemoryUsed() / MB : "?"); + } } private static BadRecordHandler getBadRecordHandler( diff --git a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/nar/NarFileHandler.java b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/nar/NarFileHandler.java index 1f0551bb5..c0b2a0e46 100644 --- a/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/nar/NarFileHandler.java +++ b/langstream-runtime/langstream-runtime-impl/src/main/java/ai/langstream/runtime/agent/nar/NarFileHandler.java @@ -190,7 +190,7 @@ public void handleNarFile(Path narFile) throws Exception { String string = new String(bytes, StandardCharsets.UTF_8); BufferedReader reader = new BufferedReader(new StringReader(string)); agents = reader.lines().filter(s -> !s.isBlank() && !s.startsWith("#")).toList(); - log.info( + log.debug( "The file {} contains a static agents index, skipping the unpacking. It is expected that handles these agents: {}", narFile, agents); @@ -204,7 +204,7 @@ public void handleNarFile(Path narFile) throws Exception { BufferedReader reader = new BufferedReader(new StringReader(string)); assetTypes = reader.lines().filter(s -> !s.isBlank() && !s.startsWith("#")).toList(); - log.info( + log.debug( "The file {} contains a static assetTypes index, skipping the unpacking. It is expected that handles these assetTypes: {}", narFile, assetTypes); @@ -219,7 +219,7 @@ public void handleNarFile(Path narFile) throws Exception { BufferedReader reader = new BufferedReader(new StringReader(string)); streamingClusterTypes = reader.lines().filter(s -> !s.isBlank() && !s.startsWith("#")).toList(); - log.info( + log.debug( "The file {} contains a static streamingClusters index, skipping the unpacking. 
It is expected that handles these streamingClusters: {}", narFile, assetTypes); @@ -249,14 +249,14 @@ public void handleNarFile(Path narFile) throws Exception { if (serviceProviderForAgents == null && serviceProviderForAssets == null && serviceProviderForStreamingClusters == null) { - log.info( + log.debug( "The file {} does not contain any AgentCodeProvider/AssetManagerProvider/TopicConnectionProvider, skipping the file", narFile); return; } } - log.info("The file {} does not contain any indexes, still adding the file", narFile); + log.debug("The file {} does not contain any indexes, still adding the file", narFile); PackageMetadata metadata = new PackageMetadata(narFile, filename, null, null, null); packages.put(filename, metadata); } @@ -271,7 +271,7 @@ public AssetManagerRegistry.AssetPackage loadPackageForAsset(String assetType) URLClassLoader classLoader = createClassloaderForPackage( customLibClasspath, packageForAssetType, parentClassloader); - log.info("For package {}, classloader {}", packageForAssetType.getName(), classLoader); + log.debug("For package {}, classloader {}", packageForAssetType.getName(), classLoader); return new AssetManagerRegistry.AssetPackage() { @Override public ClassLoader getClassloader() { @@ -295,7 +295,7 @@ public String getName() { URLClassLoader classLoader = createClassloaderForPackage( customLibClasspath, packageForStreamingCluster, parentClassloader); - log.info( + log.debug( "For package {}, classloader {}, parent {}", packageForStreamingCluster.getName(), classLoader, @@ -323,7 +323,7 @@ public AgentCodeRegistry.AgentPackage loadPackageForAgent(String agentType) { URLClassLoader classLoader = createClassloaderForPackage( customLibClasspath, packageForAgentType, parentClassloader); - log.info( + log.debug( "For package {}, classloader {}, parent {}", packageForAgentType.getName(), classLoader, @@ -396,10 +396,10 @@ private static URLClassLoader createClassloaderForPackage( metadata.unpack(); - log.info("Creating classloader for package {}", metadata.name); + log.debug("Creating classloader for package {}", metadata.name); List urls = new ArrayList<>(); - log.info("Adding agents code {}", metadata.directory); + log.debug("Adding agents code {}", metadata.directory); urls.add(metadata.directory.toFile().toURI().toURL()); Path metaInfDirectory = metadata.directory.resolve("META-INF"); diff --git a/langstream-runtime/pom.xml b/langstream-runtime/pom.xml index 0c5192b68..3edf02064 100644 --- a/langstream-runtime/pom.xml +++ b/langstream-runtime/pom.xml @@ -28,6 +28,7 @@ LangStream - Pod Runtime langstream-runtime-api + langstream-runtime-base-docker-image langstream-runtime-impl langstream-runtime-tester From 453da953630bb93013f4c2732fb7ea9e86e5b9c1 Mon Sep 17 00:00:00 2001 From: Enrico Olivelli Date: Wed, 13 Sep 2023 16:15:32 +0200 Subject: [PATCH 2/2] Fix CI --- .github/workflows/ci.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6f04db80b..4f3301ec3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -81,6 +81,7 @@ jobs: - name: 'Test: Runtime (Integration Tests)' if: ${{ matrix.name == 'Runtime IT' }} run: | + ./mvnw package -pl ":langstream-runtime-base-docker-image" -Pdocker -DskipTests -PskipPython ./mvnw package -pl ":langstream-runtime-impl" -Pdocker -DskipTests -PskipPython ./mvnw failsafe:integration-test failsafe:verify -pl ":langstream-runtime-impl" -PskipPython
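
Note for trying this locally (a sketch based only on the commands this patch adds to docker/build.sh and .github/workflows/ci.yml; the exact way build.sh receives the image name is assumed): because langstream-runtime-impl's Dockerfile now starts FROM langstream/langstream-runtime-base:latest-dev, the base image must be built (or pulled) before the runtime image, for example:

    # build the new shared base image first, then the runtime image on top of it
    ./mvnw package -pl ":langstream-runtime-base-docker-image" -Pdocker -DskipTests -PskipPython
    ./mvnw package -pl ":langstream-runtime-impl" -Pdocker -DskipTests -PskipPython

or, via the helper script (assuming the image name is passed as its argument, as the only_image checks suggest):

    ./docker/build.sh runtime-base-docker-image
    ./docker/build.sh runtime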