Skip to content

Commit

Permalink
Gracefully handle udmis pod shutdown (#723)
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu committed Aug 30, 2023
1 parent a2243f9 commit 21efe5f
Show file tree
Hide file tree
Showing 19 changed files with 217 additions and 97 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ jobs:
java-version: '17'
- name: Checkout source
uses: actions/checkout@v3
with:
fetch-depth: 0
fetch-tags: true
- name: bin/run_tests install_dependencies
run: bin/run_tests install_dependencies
- name: bin/start_pubsub
Expand All @@ -49,7 +52,6 @@ jobs:
- name: Setup udmis container build
if: ${{ github.event_name == 'push' }}
run: |
set -x
revhash=$(git rev-parse $GITHUB_REF)
IMAGE_TAG=g${revhash:0:9}
PUSH_REPO=$PUSH_REGISTRY/${{ github.repository }}
Expand Down
2 changes: 1 addition & 1 deletion udmis/bin/container
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ revparse=`git rev-parse HEAD`
udmi_ver=g${revparse:0:9}
udmi_ref=$REPOSITORY/udmi:$udmi_ver

version=`git describe || echo $udmi_ref`
version=`git describe`

RUNARGS="--rm -ti -v $PWD/var:/udmi -v $HOME/.config:/root/.config --tmpfs /tmp"
TEMPLATES=$(cd etc; ls k8s_*.yaml)
Expand Down
5 changes: 2 additions & 3 deletions udmis/bin/run
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,5 @@ else
false
fi

java -jar $POD_JAR $POD_CONFIG

echo $?
# Run with exec to replace shell so java receives SIGTERM signal.
exec java -jar $POD_JAR $POD_CONFIG
5 changes: 5 additions & 0 deletions udmis/etc/k8s_udmis.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ spec:
- name: udmis-core
image: @IMAGE-udmis@
imagePullPolicy: Always
readinessProbe:
exec:
command:
- cat
- /tmp/pod_ready.txt
resources:
requests:
cpu: 100m
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.google.bos.udmi.service.access;

import static com.google.common.base.Preconditions.checkState;
import static com.google.udmi.util.GeneralUtils.using;
import static java.lang.String.format;

import com.google.bos.udmi.service.core.ComponentName;
import com.google.common.collect.ImmutableSet;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
Expand All @@ -14,17 +16,20 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.jetbrains.annotations.TestOnly;
import udmi.schema.CloudModel;
import udmi.schema.Envelope.SubFolder;
import udmi.schema.IotAccess;

/**
* Iot Access Provider that runs locally (through filesystem).
*/
@ComponentName("iot-access")
public class LocalIotAccessProvider extends IotAccessBase {

private static final Map<String, Entry<Long, String>> DEVICE_CONFIGS = new HashMap<>();
BlockingQueue<String> sentCommands = new LinkedBlockingQueue<>();
private boolean failActivation;

/**
* Create a new instance for interfacing with multiple providers.
Expand Down Expand Up @@ -57,6 +62,7 @@ protected Set<String> getRegistriesForRegion(String region) {
@Override
public void activate() {
debug("activate");
checkState(!failActivation, "failing activation for test");
}

@Override
Expand Down Expand Up @@ -94,6 +100,11 @@ public void sendCommandBase(String registryId, String deviceId, SubFolder folder
sentCommands.add(format("%s/%s/%s:%s", registryId, deviceId, folder, message));
}

@TestOnly
public void setFailureForTest() {
failActivation = true;
}

@Override
public void shutdown() {
debug("shutdown");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.google.bos.udmi.service.core;

import static com.google.bos.udmi.service.access.IotAccessBase.MAX_CONFIG_LENGTH;
import static com.google.bos.udmi.service.core.ReflectProcessor.UDMI_VERSION;
import static com.google.bos.udmi.service.messaging.MessageDispatcher.messageHandlerFor;
import static com.google.bos.udmi.service.pod.UdmiServicePod.UDMI_VERSION;
import static com.google.common.base.Preconditions.checkState;
import static com.google.udmi.util.Common.DEVICE_ID_PROPERTY_KEY;
import static com.google.udmi.util.Common.REGISTRY_ID_PROPERTY_KEY;
Expand Down Expand Up @@ -159,7 +159,7 @@ protected void reflectError(SubType subType, BundleException bundleException) {
ErrorMessage errorMessage = new ErrorMessage();
errorMessage.error = (String) bundle.message;
errorMessage.data = encodeBase64(bundle.payload);
errorMessage.version = ReflectProcessor.DEPLOYED_CONFIG.udmi_version;
errorMessage.version = UdmiServicePod.getDeployedConfig().udmi_version;
errorMessage.timestamp = getTimestamp();
errorMap.put("payload", encodeBase64(stringify(errorMessage)));
error(format("Reflecting error %s/%s for %s", errorMap.get(SUBTYPE_PROPERTY_KEY),
Expand Down Expand Up @@ -318,8 +318,8 @@ public void shutdown() {
}

/**
* Simple exception indicator that a parse error occurred, so it wasn't something about the
* new config, but the previous config, so should essentially be retried.
* Simple exception indicator that a parse error occurred, so it wasn't something about the new
* config, but the previous config, so should essentially be retried.
*/
public static class PreviousParseException extends RuntimeException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.udmi.util.Common.ERROR_KEY;
import static com.google.udmi.util.Common.TIMESTAMP_KEY;
import static com.google.udmi.util.GeneralUtils.copyFields;
import static com.google.udmi.util.GeneralUtils.decodeBase64;
import static com.google.udmi.util.GeneralUtils.deepCopy;
import static com.google.udmi.util.GeneralUtils.encodeBase64;
Expand All @@ -26,8 +25,8 @@

import com.google.bos.udmi.service.messaging.MessageContinuation;
import com.google.bos.udmi.service.messaging.StateUpdate;
import com.google.bos.udmi.service.pod.UdmiServicePod;
import com.google.udmi.util.JsonUtil;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -36,7 +35,6 @@
import udmi.schema.Envelope;
import udmi.schema.Envelope.SubFolder;
import udmi.schema.Envelope.SubType;
import udmi.schema.SetupUdmiConfig;
import udmi.schema.UdmiConfig;
import udmi.schema.UdmiState;

Expand All @@ -47,11 +45,6 @@
public class ReflectProcessor extends ProcessorBase {

public static final String PAYLOAD_KEY = "payload";
public static final String HOSTNAME = System.getenv("HOSTNAME");
static final String DEPLOY_FILE = "var/deployed_version.json";
static final SetupUdmiConfig DEPLOYED_CONFIG =
loadFileStrictRequired(SetupUdmiConfig.class, new File(DEPLOY_FILE));
static final String UDMI_VERSION = requireNonNull(ReflectProcessor.DEPLOYED_CONFIG.udmi_version);

@Override
protected void defaultHandler(Object message) {
Expand Down Expand Up @@ -221,17 +214,11 @@ private void reflectStateHandler(Envelope envelope, UdmiState toolState) {
ifNotNullThen(distributor, d -> d.distribute(envelope, toolState));
updateAwareness(envelope, toolState);

UdmiConfig udmiConfig = new UdmiConfig();
udmiConfig.last_state = toolState.timestamp;
udmiConfig.setup = new SetupUdmiConfig();
copyFields(DEPLOYED_CONFIG, udmiConfig.setup, false);
udmiConfig.setup.hostname = HOSTNAME;
udmiConfig.setup.udmi_version = UDMI_VERSION;
udmiConfig.setup.functions_min = FUNCTIONS_VERSION_MIN;
udmiConfig.setup.functions_max = FUNCTIONS_VERSION_MAX;
UdmiConfig udmiConfig = UdmiServicePod.getUdmiConfig(toolState);

Map<String, Object> configMap = new HashMap<>();
configMap.put(SubFolder.UDMI.value(), udmiConfig);
configMap.put(TIMESTAMP_KEY, getTimestamp());
String contents = stringifyTerse(configMap);
debug("Setting reflector config %s %s: %s", registryId, deviceId, contents);
iotAccess.modifyConfig(registryId, deviceId, previous -> contents);
Expand Down Expand Up @@ -265,7 +252,7 @@ private void updateRegistryRegions(Map<String, String> regions) {

@Override
public void activate() {
debug("Deployment configuration: " + stringifyTerse(DEPLOYED_CONFIG));
debug("Deployment configuration: " + stringifyTerse(UdmiServicePod.getDeployedConfig()));
super.activate();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.google.bos.udmi.service.core;

import static com.google.bos.udmi.service.core.ReflectProcessor.UDMI_VERSION;
import static com.google.bos.udmi.service.pod.UdmiServicePod.UDMI_VERSION;
import static com.google.udmi.util.Common.TIMESTAMP_KEY;
import static com.google.udmi.util.Common.VERSION_KEY;
import static com.google.udmi.util.GeneralUtils.ifNotNullThen;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.google.bos.udmi.service.messaging.impl;

import static com.google.bos.udmi.service.pod.UdmiServicePod.HOSTNAME;
import static com.google.udmi.util.Common.SUBFOLDER_PROPERTY_KEY;
import static com.google.udmi.util.GeneralUtils.deepCopy;
import static com.google.udmi.util.GeneralUtils.friendlyStackTrace;
Expand All @@ -18,8 +19,10 @@

import com.google.bos.udmi.service.messaging.MessagePipe;
import com.google.bos.udmi.service.pod.ContainerBase;
import com.google.bos.udmi.service.pod.UdmiServicePod;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -37,6 +40,7 @@
import udmi.schema.Envelope;
import udmi.schema.Envelope.SubFolder;
import udmi.schema.Envelope.SubType;
import udmi.schema.UdmiConfig;

/**
* Base class for supporting a variety of messaging interfaces.
Expand Down Expand Up @@ -82,6 +86,15 @@ protected Bundle makeExceptionBundle(Envelope envelope, Exception exception) {
return bundle;
}

protected Bundle makeHelloBundle() {
UdmiConfig udmiConfig = UdmiServicePod.getUdmiConfig(null);
Bundle bundle = new Bundle(udmiConfig);
bundle.envelope.subType = SubType.CONFIG;
bundle.envelope.subFolder = SubFolder.UDMI;
bundle.envelope.publishTime = new Date();
return bundle;
}

protected void pushQueueEntry(BlockingQueue<QueueEntry> queue, String stringBundle) {
try {
requireNonNull(stringBundle, "missing queue bundle");
Expand Down Expand Up @@ -191,7 +204,6 @@ private void messageLoop(String id) {
debug("Processing waited %ds on message loop %s", waiting, id);
if (bundle.message.equals(TERMINATE_MARKER)) {
info("Terminating message loop %s", id);
terminateHandlers();
return;
}
envelope = bundle.envelope;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

import static com.google.udmi.util.Common.SUBFOLDER_PROPERTY_KEY;
import static com.google.udmi.util.Common.SUBTYPE_PROPERTY_KEY;
import static com.google.udmi.util.GeneralUtils.friendlyStackTrace;
import static com.google.udmi.util.GeneralUtils.ifNotNullGet;
import static com.google.udmi.util.GeneralUtils.ifNotNullThen;
import static com.google.udmi.util.GeneralUtils.ifNullThen;
import static com.google.udmi.util.JsonUtil.stringify;
import static com.google.udmi.util.JsonUtil.toMap;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiService.Listener;
Expand All @@ -21,22 +22,23 @@
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PublisherGrpc;
import com.google.pubsub.v1.PubsubMessage;
import com.google.udmi.util.Common;
import com.google.udmi.util.GeneralUtils;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
Expand All @@ -63,6 +65,7 @@ public PubSubPipe(EndpointConfiguration configuration) {
projectId = variableSubstitution(configuration.hostname,
"no project id defined in configuration as 'hostname'");
publisher = ifNotNullGet(configuration.send_id, this::getPublisher);
ifNotNullThen(publisher, this::checkPublisher);
subscriber = ifNotNullGet(configuration.recv_id, this::getSubscriber);
String subscriptionName = ifNotNullGet(subscriber, Subscriber::getSubscriptionNameString);
String topicName = ifNotNullGet(publisher, Publisher::getTopicNameString);
Expand All @@ -72,6 +75,10 @@ public PubSubPipe(EndpointConfiguration configuration) {
}
}

private void checkPublisher() {
publish(makeHelloBundle());
}

public static MessagePipe fromConfig(EndpointConfiguration configuration) {
return new PubSubPipe(configuration);
}
Expand Down Expand Up @@ -103,6 +110,12 @@ public void activate(Consumer<Bundle> bundleConsumer) {
subscriber.startAsync();
}

@Override
public void shutdown() {
subscriber.stopAsync().awaitTerminated();
super.shutdown();
}

@Override
public void publish(Bundle bundle) {
try {
Expand Down Expand Up @@ -153,29 +166,38 @@ Publisher getPublisher(String topicName) {
info(format("Publisher %s:%s", Optional.ofNullable(emu).orElse(GCP_HOST), projectTopicName));
return builder.build();
} catch (Exception e) {
throw new RuntimeException("While creating emulator publisher", e);
throw new RuntimeException("While creating publisher", e);
}
}

Subscriber getSubscriber(String subName) {
try {
ProjectSubscriptionName subscription = ProjectSubscriptionName.of(projectId, subName);
Subscriber.Builder builder = Subscriber.newBuilder(subscription, this);
ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subName);
Subscriber.Builder builder = Subscriber.newBuilder(subscriptionName, this);
String emu = getEmulatorHost();
ifNullThen(emu, () -> checkSubscription(subscriptionName));
ifNotNullThen(emu, host -> builder.setChannelProvider(getTransportChannelProvider(host)));
ifNotNullThen(emu, host -> builder.setCredentialsProvider(NoCredentialsProvider.create()));
info(format("Subscriber %s:%s", Optional.ofNullable(emu).orElse(GCP_HOST), subscription));
builder.setParallelPullCount(EXECUTION_THREADS);
Subscriber built = builder.build();
info(format("Subscriber %s:%s", Optional.ofNullable(emu).orElse(GCP_HOST), subscriptionName));
built.addListener(new Listener() {
@Override
public void failed(State from, Throwable failure) {
debug("Subscriber state %s: %s", from, GeneralUtils.stackTraceString(failure));
debug("Subscriber state %s: %s", from, friendlyStackTrace(failure));
}
}, Executors.newSingleThreadExecutor());
return built;
} catch (Exception e) {
throw new RuntimeException("While creating emulator subscriber", e);
throw new RuntimeException("While creating subscriber", e);
}
}

private static void checkSubscription(ProjectSubscriptionName subscriptionName) {
try (SubscriptionAdminClient client = SubscriptionAdminClient.create()) {
client.getSubscription(subscriptionName).getAckDeadlineSeconds();
} catch (Exception e) {
throw new RuntimeException("Checking subscription " + subscriptionName, e);
}
}

Expand Down
Loading

0 comments on commit 21efe5f

Please sign in to comment.