Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix app update and add test #479

Merged
merged 5 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -247,5 +247,8 @@ private void cleanupAsset(AssetDefinition asset, AssetManagerRegistry assetManag
@Override
public void close() {
registry.close();
if (deployContext != null) {
deployContext.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import ai.langstream.tests.util.BaseEndToEndTest;
import io.fabric8.kubernetes.api.model.Secret;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
Expand All @@ -35,30 +36,51 @@ public class PythonAgentsIT extends BaseEndToEndTest {

@ParameterizedTest
@ValueSource(strings = {"python-processor", "experimental-python-processor"})
public void testProcessor(String appDir) {
public void testProcessor(String appDir) throws Exception {
installLangStreamCluster(true);
final String tenant = "ten-" + System.currentTimeMillis();
setupTenant(tenant);
final String applicationId = "my-test-app";
deployLocalApplication(applicationId, appDir);
deployLocalApplication(applicationId, appDir, Map.of("SECRET1_VK", "super secret value"));
final String tenantNamespace = TENANT_NAMESPACE_PREFIX + tenant;
awaitApplicationReady(applicationId, 1);

executeCommandOnClient(
"bin/langstream gateway produce %s produce-input -v my-value --connect-timeout 30"
"bin/langstream gateway produce %s produce-input -v my-value --connect-timeout 30 -p sessionId=s1"
.formatted(applicationId)
.split(" "));

final String output =
String output =
executeCommandOnClient(
"bin/langstream gateway consume %s consume-output --position earliest -n 1 --connect-timeout 30"
"bin/langstream gateway consume %s consume-output --position earliest -n 1 --connect-timeout 30 -p sessionId=s1"
.formatted(applicationId)
.split(" "));
log.info("Output: {}", output);
Assertions.assertTrue(
output.contains(
"{\"record\":{\"key\":null,\"value\":\"my-value!!super secret value\","
+ "\"headers\":{}}"));
+ "\"headers\":{\"langstream-client-session-id\":\"s1\"}}"));

updateLocalApplication(
applicationId, appDir, Map.of("SECRET1_VK", "super secret value - changed"));
Thread.sleep(5000);
awaitApplicationReady(applicationId, 1);

executeCommandOnClient(
"bin/langstream gateway produce %s produce-input -v my-value --connect-timeout 30 -p sessionId=s2"
.formatted(applicationId)
.split(" "));

output =
executeCommandOnClient(
"bin/langstream gateway consume %s consume-output --position earliest -n 1 --connect-timeout 30 -p sessionId=s2"
.formatted(applicationId)
.split(" "));
log.info("Output2: {}", output);
Assertions.assertTrue(
output.contains(
"{\"record\":{\"key\":null,\"value\":\"my-value!!super secret value - changed\","
+ "\"headers\":{\"langstream-client-session-id\":\"s2\"}}"));

executeCommandOnClient("bin/langstream apps delete %s".formatted(applicationId).split(" "));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ private static void installLangStream(boolean authentication) {
memory: 256Mi
app:
config:
logging.level.org.apache.tomcat.websocket: debug
logging.level.ai.langstream.apigateway.websocket: debug

runtime:
image: %s/langstream-runtime
Expand Down Expand Up @@ -1026,6 +1026,18 @@ protected static boolean isApplicationReady(
@SneakyThrows
protected static void deployLocalApplication(
String applicationId, String appDirName, Map<String, String> env) {
deployLocalApplication(false, applicationId, appDirName, env);
}

@SneakyThrows
protected static void updateLocalApplication(
String applicationId, String appDirName, Map<String, String> env) {
deployLocalApplication(true, applicationId, appDirName, env);
}

@SneakyThrows
private static void deployLocalApplication(
boolean isUpdate, String applicationId, String appDirName, Map<String, String> env) {
String testAppsBaseDir = "src/test/resources/apps";
String testSecretBaseDir = "src/test/resources/secrets";

Expand All @@ -1048,8 +1060,8 @@ protected static void deployLocalApplication(

executeCommandOnClient(
(beforeCmd
+ "bin/langstream apps deploy %s -app /tmp/app -i /tmp/instance.yaml -s /tmp/secrets.yaml")
.formatted(applicationId)
+ "bin/langstream apps %s %s -app /tmp/app -i /tmp/instance.yaml -s /tmp/secrets.yaml")
.formatted(isUpdate ? "update" : "deploy", applicationId)
.split(" "));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,20 @@ gateways:
- id: produce-input
type: produce
topic: ls-test-topic0
parameters:
- sessionId
produce-options:
headers:
- key: langstream-client-session-id
value-from-parameters: sessionId

- id: consume-output
type: consume
topic: ls-test-topic1
topic: ls-test-topic1
parameters:
- sessionId
consume-options:
filters:
headers:
- key: langstream-client-session-id
value-from-parameters: sessionId
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
#

from langstream import SingleRecordProcessor
from langstream import SimpleRecord, SingleRecordProcessor


class Exclamation(SingleRecordProcessor):
Expand All @@ -23,4 +23,4 @@ def init(self, config):
self.secret_value = config["secret_value"]

def process_record(self, record):
return [(record.value() + "!!" + self.secret_value,)]
return [SimpleRecord(record.value() + "!!" + self.secret_value, headers=record.headers())]
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@ gateways:
- id: produce-input
type: produce
topic: ls-test-topic0
parameters:
- sessionId
produce-options:
headers:
- key: langstream-client-session-id
value-from-parameters: sessionId
- id: consume-output
type: consume
topic: ls-test-topic1
topic: ls-test-topic1
parameters:
- sessionId
consume-options:
filters:
headers:
- key: langstream-client-session-id
value-from-parameters: sessionId
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ def init(self, config):
self.secret_value = config["secret_value"]

def process_record(self, record):
return [SimpleRecord(record.value() + "!!" + self.secret_value)]
return [SimpleRecord(record.value() + "!!" + self.secret_value, headers=record.headers())]
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ secrets:
- name: secret1
id: secret1
data:
value-key: super secret value
value-key: "${SECRET1_VK}"
- name: cassandra
id: cassandra
data:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
import ai.langstream.deployer.k8s.ResolvedDeployerConfiguration;
import ai.langstream.deployer.k8s.TenantLimitsChecker;
import ai.langstream.deployer.k8s.api.crds.BaseStatus;
import ai.langstream.deployer.k8s.util.JSONComparator;
import ai.langstream.deployer.k8s.util.SerializationUtil;
import ai.langstream.deployer.k8s.util.SpecDiffer;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
Expand All @@ -30,6 +28,7 @@
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import jakarta.inject.Inject;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.jbosslog.JBossLog;

@JBossLog
Expand All @@ -42,7 +41,28 @@ public abstract class BaseController<T extends CustomResource<?, ? extends BaseS

@Inject protected TenantLimitsChecker appResourcesLimiter;

protected abstract UpdateControl<T> patchResources(T resource, Context<T> context);
@Getter
protected static class PatchResult {

public static PatchResult patch(UpdateControl<?> updateControl) {
final PatchResult patchResult = new PatchResult(updateControl);
return patchResult;
}

public PatchResult(UpdateControl<?> updateControl) {
this.updateControl = updateControl;
}

UpdateControl<?> updateControl;
Object lastApplied;

public PatchResult withLastApplied(Object lastApplied) {
this.lastApplied = lastApplied;
return this;
}
}

protected abstract PatchResult patchResources(T resource, Context<T> context);

protected abstract DeleteControl cleanupResources(T resource, Context<T> context);

Expand Down Expand Up @@ -71,11 +91,16 @@ public DeleteControl cleanup(T resource, Context<T> context) {
@Override
public UpdateControl<T> reconcile(T resource, Context<T> context) {
String lastApplied;
UpdateControl<T> result;
UpdateControl<?> result;
final BaseStatus baseStatus = resource.getStatus();
try {
result = patchResources(resource, context);
lastApplied = SerializationUtil.writeAsJson(resource.getSpec());
final PatchResult patchResult = patchResources(resource, context);
result = patchResult.getUpdateControl();
final Object lastAppliedObject =
patchResult.getLastApplied() == null
? resource.getSpec()
: patchResult.getLastApplied();
lastApplied = SerializationUtil.writeAsJson(lastAppliedObject);
baseStatus.setLastApplied(lastApplied);
log.infof(
"Reconciled application %s, reschedule: %s, status: %s",
Expand All @@ -91,20 +116,6 @@ public UpdateControl<T> reconcile(T resource, Context<T> context) {
throwable.getMessage());
result = UpdateControl.updateStatus(resource).rescheduleAfter(5, TimeUnit.SECONDS);
}
return result;
}

protected static boolean areSpecChanged(CustomResource<?, ? extends BaseStatus> cr) {
final String lastApplied = cr.getStatus().getLastApplied();
if (lastApplied == null) {
return true;
}
final JSONComparator.Result diff = SpecDiffer.generateDiff(lastApplied, cr.getSpec());
if (!diff.areEquals()) {
log.infof("Spec changed for %s", cr.getMetadata().getName());
SpecDiffer.logDetailedSpecDiff(diff);
return true;
}
return false;
return (UpdateControl<T>) result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import ai.langstream.deployer.k8s.api.crds.agents.AgentStatus;
import ai.langstream.deployer.k8s.controllers.BaseController;
import ai.langstream.deployer.k8s.controllers.InfiniteRetry;
import ai.langstream.deployer.k8s.util.JSONComparator;
import ai.langstream.deployer.k8s.util.KubeUtil;
import ai.langstream.deployer.k8s.util.SerializationUtil;
import ai.langstream.deployer.k8s.util.SpecDiffer;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.javaoperatorsdk.operator.api.reconciler.Constants;
Expand Down Expand Up @@ -66,7 +68,7 @@ public ErrorStatusUpdateControl<AgentCustomResource> updateErrorStatus(
}

@Override
protected UpdateControl<AgentCustomResource> patchResources(
protected PatchResult patchResources(
AgentCustomResource agent, Context<AgentCustomResource> context) {
final String targetNamespace = agent.getMetadata().getNamespace();
final String name =
Expand All @@ -80,10 +82,11 @@ protected UpdateControl<AgentCustomResource> patchResources(
setLastAppliedConfig(agent);
if (KubeUtil.isStatefulSetReady(current)) {
agent.getStatus().setStatus(AgentLifecycleStatus.DEPLOYED);
return UpdateControl.updateStatus(agent);
return PatchResult.patch(UpdateControl.updateStatus(agent));
} else {
agent.getStatus().setStatus(AgentLifecycleStatus.DEPLOYING);
return UpdateControl.updateStatus(agent).rescheduleAfter(5, TimeUnit.SECONDS);
return PatchResult.patch(
UpdateControl.updateStatus(agent).rescheduleAfter(5, TimeUnit.SECONDS));
}
}

Expand Down Expand Up @@ -136,7 +139,7 @@ protected StatefulSet desired(
final AgentStatus status = primary.getStatus();
if (status != null && existingStatefulset != null) {
// spec has not changed, do not touch the statefulset at all
if (!BaseController.areSpecChanged(primary)) {
if (!areSpecChanged(primary)) {
log.infof(
"Agent %s spec has not changed, skipping statefulset update",
primary.getMetadata().getName());
Expand Down Expand Up @@ -214,4 +217,18 @@ public static class LastAppliedConfigForStatefulset {
private String imagePullPolicy;
private PodTemplate podTemplate;
}

protected static boolean areSpecChanged(AgentCustomResource cr) {
final String lastApplied = cr.getStatus().getLastApplied();
if (lastApplied == null) {
return true;
}
final JSONComparator.Result diff = SpecDiffer.generateDiff(lastApplied, cr.getSpec());
if (!diff.areEquals()) {
log.infof("Spec changed for %s", cr.getMetadata().getName());
SpecDiffer.logDetailedSpecDiff(diff);
return true;
}
return false;
}
}
Loading
Loading