Skip to content

Commit

Permalink
[operator] Fix app update and add test (#479)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicoloboschi authored Sep 25, 2023
1 parent 1100720 commit 505cf57
Show file tree
Hide file tree
Showing 12 changed files with 208 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -247,5 +247,8 @@ private void cleanupAsset(AssetDefinition asset, AssetManagerRegistry assetManag
@Override
public void close() {
registry.close();
if (deployContext != null) {
deployContext.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import ai.langstream.tests.util.BaseEndToEndTest;
import io.fabric8.kubernetes.api.model.Secret;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
Expand All @@ -35,30 +36,51 @@ public class PythonAgentsIT extends BaseEndToEndTest {

@ParameterizedTest
@ValueSource(strings = {"python-processor", "experimental-python-processor"})
public void testProcessor(String appDir) {
public void testProcessor(String appDir) throws Exception {
installLangStreamCluster(true);
final String tenant = "ten-" + System.currentTimeMillis();
setupTenant(tenant);
final String applicationId = "my-test-app";
deployLocalApplication(applicationId, appDir);
deployLocalApplication(applicationId, appDir, Map.of("SECRET1_VK", "super secret value"));
final String tenantNamespace = TENANT_NAMESPACE_PREFIX + tenant;
awaitApplicationReady(applicationId, 1);

executeCommandOnClient(
"bin/langstream gateway produce %s produce-input -v my-value --connect-timeout 30"
"bin/langstream gateway produce %s produce-input -v my-value --connect-timeout 30 -p sessionId=s1"
.formatted(applicationId)
.split(" "));

final String output =
String output =
executeCommandOnClient(
"bin/langstream gateway consume %s consume-output --position earliest -n 1 --connect-timeout 30"
"bin/langstream gateway consume %s consume-output --position earliest -n 1 --connect-timeout 30 -p sessionId=s1"
.formatted(applicationId)
.split(" "));
log.info("Output: {}", output);
Assertions.assertTrue(
output.contains(
"{\"record\":{\"key\":null,\"value\":\"my-value!!super secret value\","
+ "\"headers\":{}}"));
+ "\"headers\":{\"langstream-client-session-id\":\"s1\"}}"));

updateLocalApplication(
applicationId, appDir, Map.of("SECRET1_VK", "super secret value - changed"));
Thread.sleep(5000);
awaitApplicationReady(applicationId, 1);

executeCommandOnClient(
"bin/langstream gateway produce %s produce-input -v my-value --connect-timeout 30 -p sessionId=s2"
.formatted(applicationId)
.split(" "));

output =
executeCommandOnClient(
"bin/langstream gateway consume %s consume-output --position earliest -n 1 --connect-timeout 30 -p sessionId=s2"
.formatted(applicationId)
.split(" "));
log.info("Output2: {}", output);
Assertions.assertTrue(
output.contains(
"{\"record\":{\"key\":null,\"value\":\"my-value!!super secret value - changed\","
+ "\"headers\":{\"langstream-client-session-id\":\"s2\"}}"));

executeCommandOnClient("bin/langstream apps delete %s".formatted(applicationId).split(" "));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ private static void installLangStream(boolean authentication) {
memory: 256Mi
app:
config:
logging.level.org.apache.tomcat.websocket: debug
logging.level.ai.langstream.apigateway.websocket: debug
runtime:
image: %s/langstream-runtime
Expand Down Expand Up @@ -1026,6 +1026,18 @@ protected static boolean isApplicationReady(
@SneakyThrows
protected static void deployLocalApplication(
String applicationId, String appDirName, Map<String, String> env) {
deployLocalApplication(false, applicationId, appDirName, env);
}

@SneakyThrows
protected static void updateLocalApplication(
String applicationId, String appDirName, Map<String, String> env) {
deployLocalApplication(true, applicationId, appDirName, env);
}

@SneakyThrows
private static void deployLocalApplication(
boolean isUpdate, String applicationId, String appDirName, Map<String, String> env) {
String testAppsBaseDir = "src/test/resources/apps";
String testSecretBaseDir = "src/test/resources/secrets";

Expand All @@ -1048,8 +1060,8 @@ protected static void deployLocalApplication(

executeCommandOnClient(
(beforeCmd
+ "bin/langstream apps deploy %s -app /tmp/app -i /tmp/instance.yaml -s /tmp/secrets.yaml")
.formatted(applicationId)
+ "bin/langstream apps %s %s -app /tmp/app -i /tmp/instance.yaml -s /tmp/secrets.yaml")
.formatted(isUpdate ? "update" : "deploy", applicationId)
.split(" "));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,20 @@ gateways:
- id: produce-input
type: produce
topic: ls-test-topic0
parameters:
- sessionId
produce-options:
headers:
- key: langstream-client-session-id
value-from-parameters: sessionId

- id: consume-output
type: consume
topic: ls-test-topic1
topic: ls-test-topic1
parameters:
- sessionId
consume-options:
filters:
headers:
- key: langstream-client-session-id
value-from-parameters: sessionId
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
# limitations under the License.
#

from langstream import SingleRecordProcessor
from langstream import SimpleRecord, SingleRecordProcessor


class Exclamation(SingleRecordProcessor):
Expand All @@ -23,4 +23,4 @@ def init(self, config):
self.secret_value = config["secret_value"]

def process_record(self, record):
return [(record.value() + "!!" + self.secret_value,)]
return [SimpleRecord(record.value() + "!!" + self.secret_value, headers=record.headers())]
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,19 @@ gateways:
- id: produce-input
type: produce
topic: ls-test-topic0
parameters:
- sessionId
produce-options:
headers:
- key: langstream-client-session-id
value-from-parameters: sessionId
- id: consume-output
type: consume
topic: ls-test-topic1
topic: ls-test-topic1
parameters:
- sessionId
consume-options:
filters:
headers:
- key: langstream-client-session-id
value-from-parameters: sessionId
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,4 @@ def init(self, config):
self.secret_value = config["secret_value"]

def process_record(self, record):
return [SimpleRecord(record.value() + "!!" + self.secret_value)]
return [SimpleRecord(record.value() + "!!" + self.secret_value, headers=record.headers())]
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ secrets:
- name: secret1
id: secret1
data:
value-key: super secret value
value-key: "${SECRET1_VK}"
- name: cassandra
id: cassandra
data:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
import ai.langstream.deployer.k8s.ResolvedDeployerConfiguration;
import ai.langstream.deployer.k8s.TenantLimitsChecker;
import ai.langstream.deployer.k8s.api.crds.BaseStatus;
import ai.langstream.deployer.k8s.util.JSONComparator;
import ai.langstream.deployer.k8s.util.SerializationUtil;
import ai.langstream.deployer.k8s.util.SpecDiffer;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
Expand All @@ -30,6 +28,7 @@
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import jakarta.inject.Inject;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.jbosslog.JBossLog;

@JBossLog
Expand All @@ -42,7 +41,28 @@ public abstract class BaseController<T extends CustomResource<?, ? extends BaseS

@Inject protected TenantLimitsChecker appResourcesLimiter;

protected abstract UpdateControl<T> patchResources(T resource, Context<T> context);
@Getter
protected static class PatchResult {

public static PatchResult patch(UpdateControl<?> updateControl) {
final PatchResult patchResult = new PatchResult(updateControl);
return patchResult;
}

public PatchResult(UpdateControl<?> updateControl) {
this.updateControl = updateControl;
}

UpdateControl<?> updateControl;
Object lastApplied;

public PatchResult withLastApplied(Object lastApplied) {
this.lastApplied = lastApplied;
return this;
}
}

protected abstract PatchResult patchResources(T resource, Context<T> context);

protected abstract DeleteControl cleanupResources(T resource, Context<T> context);

Expand Down Expand Up @@ -71,11 +91,16 @@ public DeleteControl cleanup(T resource, Context<T> context) {
@Override
public UpdateControl<T> reconcile(T resource, Context<T> context) {
String lastApplied;
UpdateControl<T> result;
UpdateControl<?> result;
final BaseStatus baseStatus = resource.getStatus();
try {
result = patchResources(resource, context);
lastApplied = SerializationUtil.writeAsJson(resource.getSpec());
final PatchResult patchResult = patchResources(resource, context);
result = patchResult.getUpdateControl();
final Object lastAppliedObject =
patchResult.getLastApplied() == null
? resource.getSpec()
: patchResult.getLastApplied();
lastApplied = SerializationUtil.writeAsJson(lastAppliedObject);
baseStatus.setLastApplied(lastApplied);
log.infof(
"Reconciled application %s, reschedule: %s, status: %s",
Expand All @@ -91,20 +116,6 @@ public UpdateControl<T> reconcile(T resource, Context<T> context) {
throwable.getMessage());
result = UpdateControl.updateStatus(resource).rescheduleAfter(5, TimeUnit.SECONDS);
}
return result;
}

protected static boolean areSpecChanged(CustomResource<?, ? extends BaseStatus> cr) {
final String lastApplied = cr.getStatus().getLastApplied();
if (lastApplied == null) {
return true;
}
final JSONComparator.Result diff = SpecDiffer.generateDiff(lastApplied, cr.getSpec());
if (!diff.areEquals()) {
log.infof("Spec changed for %s", cr.getMetadata().getName());
SpecDiffer.logDetailedSpecDiff(diff);
return true;
}
return false;
return (UpdateControl<T>) result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import ai.langstream.deployer.k8s.api.crds.agents.AgentStatus;
import ai.langstream.deployer.k8s.controllers.BaseController;
import ai.langstream.deployer.k8s.controllers.InfiniteRetry;
import ai.langstream.deployer.k8s.util.JSONComparator;
import ai.langstream.deployer.k8s.util.KubeUtil;
import ai.langstream.deployer.k8s.util.SerializationUtil;
import ai.langstream.deployer.k8s.util.SpecDiffer;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.apps.StatefulSet;
import io.javaoperatorsdk.operator.api.reconciler.Constants;
Expand Down Expand Up @@ -66,7 +68,7 @@ public ErrorStatusUpdateControl<AgentCustomResource> updateErrorStatus(
}

@Override
protected UpdateControl<AgentCustomResource> patchResources(
protected PatchResult patchResources(
AgentCustomResource agent, Context<AgentCustomResource> context) {
final String targetNamespace = agent.getMetadata().getNamespace();
final String name =
Expand All @@ -80,10 +82,11 @@ protected UpdateControl<AgentCustomResource> patchResources(
setLastAppliedConfig(agent);
if (KubeUtil.isStatefulSetReady(current)) {
agent.getStatus().setStatus(AgentLifecycleStatus.DEPLOYED);
return UpdateControl.updateStatus(agent);
return PatchResult.patch(UpdateControl.updateStatus(agent));
} else {
agent.getStatus().setStatus(AgentLifecycleStatus.DEPLOYING);
return UpdateControl.updateStatus(agent).rescheduleAfter(5, TimeUnit.SECONDS);
return PatchResult.patch(
UpdateControl.updateStatus(agent).rescheduleAfter(5, TimeUnit.SECONDS));
}
}

Expand Down Expand Up @@ -136,7 +139,7 @@ protected StatefulSet desired(
final AgentStatus status = primary.getStatus();
if (status != null && existingStatefulset != null) {
// spec has not changed, do not touch the statefulset at all
if (!BaseController.areSpecChanged(primary)) {
if (!areSpecChanged(primary)) {
log.infof(
"Agent %s spec has not changed, skipping statefulset update",
primary.getMetadata().getName());
Expand Down Expand Up @@ -214,4 +217,18 @@ public static class LastAppliedConfigForStatefulset {
private String imagePullPolicy;
private PodTemplate podTemplate;
}

protected static boolean areSpecChanged(AgentCustomResource cr) {
final String lastApplied = cr.getStatus().getLastApplied();
if (lastApplied == null) {
return true;
}
final JSONComparator.Result diff = SpecDiffer.generateDiff(lastApplied, cr.getSpec());
if (!diff.areEquals()) {
log.infof("Spec changed for %s", cr.getMetadata().getName());
SpecDiffer.logDetailedSpecDiff(diff);
return true;
}
return false;
}
}
Loading

0 comments on commit 505cf57

Please sign in to comment.