Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gracefully handle udmis pod shutdown #723

Merged
merged 17 commits into from
Aug 30, 2023
4 changes: 3 additions & 1 deletion .github/workflows/testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ jobs:
java-version: '17'
- name: Checkout source
uses: actions/checkout@v3
with:
fetch-depth: 0
fetch-tags: true
- name: bin/run_tests install_dependencies
run: bin/run_tests install_dependencies
- name: bin/start_pubsub
Expand All @@ -49,7 +52,6 @@ jobs:
- name: Setup udmis container build
if: ${{ github.event_name == 'push' }}
run: |
set -x
revhash=$(git rev-parse $GITHUB_REF)
IMAGE_TAG=g${revhash:0:9}
PUSH_REPO=$PUSH_REGISTRY/${{ github.repository }}
Expand Down
2 changes: 1 addition & 1 deletion udmis/bin/container
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ revparse=`git rev-parse HEAD`
udmi_ver=g${revparse:0:9}
udmi_ref=$REPOSITORY/udmi:$udmi_ver

version=`git describe || echo $udmi_ref`
version=`git describe`

RUNARGS="--rm -ti -v $PWD/var:/udmi -v $HOME/.config:/root/.config --tmpfs /tmp"
TEMPLATES=$(cd etc; ls k8s_*.yaml)
Expand Down
5 changes: 2 additions & 3 deletions udmis/bin/run
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,5 @@ else
false
fi

java -jar $POD_JAR $POD_CONFIG

echo $?
# Run with exec to replace shell so java receives SIGTERM signal.
exec java -jar $POD_JAR $POD_CONFIG
5 changes: 5 additions & 0 deletions udmis/etc/k8s_udmis.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ spec:
- name: udmis-core
image: @IMAGE-udmis@
imagePullPolicy: Always
readinessProbe:
exec:
command:
- cat
- /tmp/pod_ready.txt
resources:
requests:
cpu: 100m
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.google.bos.udmi.service.access;

import static com.google.common.base.Preconditions.checkState;
import static com.google.udmi.util.GeneralUtils.using;
import static java.lang.String.format;

import com.google.bos.udmi.service.core.ComponentName;
import com.google.common.collect.ImmutableSet;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
Expand All @@ -14,17 +16,20 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.jetbrains.annotations.TestOnly;
import udmi.schema.CloudModel;
import udmi.schema.Envelope.SubFolder;
import udmi.schema.IotAccess;

/**
* Iot Access Provider that runs locally (through filesystem).
*/
@ComponentName("iot-access")
public class LocalIotAccessProvider extends IotAccessBase {

private static final Map<String, Entry<Long, String>> DEVICE_CONFIGS = new HashMap<>();
BlockingQueue<String> sentCommands = new LinkedBlockingQueue<>();
private boolean failActivation;

/**
* Create a new instance for interfacing with multiple providers.
Expand Down Expand Up @@ -57,6 +62,7 @@ protected Set<String> getRegistriesForRegion(String region) {
/**
 * Logs the activation and then fails if a test has requested an activation failure.
 *
 * @throws IllegalStateException if the test-only failure flag has been set
 */
@Override
public void activate() {
  debug("activate");
  if (failActivation) {
    throw new IllegalStateException("failing activation for test");
  }
}

@Override
Expand Down Expand Up @@ -94,6 +100,11 @@ public void sendCommandBase(String registryId, String deviceId, SubFolder folder
sentCommands.add(format("%s/%s/%s:%s", registryId, deviceId, folder, message));
}

/**
 * Test hook that sets the flag causing {@link #activate()} to throw.
 */
@TestOnly
public void setFailureForTest() {
  this.failActivation = true;
}

@Override
public void shutdown() {
debug("shutdown");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.google.bos.udmi.service.core;

import static com.google.bos.udmi.service.access.IotAccessBase.MAX_CONFIG_LENGTH;
import static com.google.bos.udmi.service.core.ReflectProcessor.UDMI_VERSION;
import static com.google.bos.udmi.service.messaging.MessageDispatcher.messageHandlerFor;
import static com.google.bos.udmi.service.pod.UdmiServicePod.UDMI_VERSION;
import static com.google.common.base.Preconditions.checkState;
import static com.google.udmi.util.Common.DEVICE_ID_PROPERTY_KEY;
import static com.google.udmi.util.Common.REGISTRY_ID_PROPERTY_KEY;
Expand Down Expand Up @@ -159,7 +159,7 @@ protected void reflectError(SubType subType, BundleException bundleException) {
ErrorMessage errorMessage = new ErrorMessage();
errorMessage.error = (String) bundle.message;
errorMessage.data = encodeBase64(bundle.payload);
errorMessage.version = ReflectProcessor.DEPLOYED_CONFIG.udmi_version;
errorMessage.version = UdmiServicePod.getDeployedConfig().udmi_version;
errorMessage.timestamp = getTimestamp();
errorMap.put("payload", encodeBase64(stringify(errorMessage)));
error(format("Reflecting error %s/%s for %s", errorMap.get(SUBTYPE_PROPERTY_KEY),
Expand Down Expand Up @@ -318,8 +318,8 @@ public void shutdown() {
}

/**
* Simple exception indicator that a parse error occurred, so it wasn't something about the
* new config, but the previous config, so should essentially be retried.
* Simple exception indicator that a parse error occurred, so it wasn't something about the new
* config, but the previous config, so should essentially be retried.
*/
public static class PreviousParseException extends RuntimeException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.udmi.util.Common.ERROR_KEY;
import static com.google.udmi.util.Common.TIMESTAMP_KEY;
import static com.google.udmi.util.GeneralUtils.copyFields;
import static com.google.udmi.util.GeneralUtils.decodeBase64;
import static com.google.udmi.util.GeneralUtils.deepCopy;
import static com.google.udmi.util.GeneralUtils.encodeBase64;
Expand All @@ -26,8 +25,8 @@

import com.google.bos.udmi.service.messaging.MessageContinuation;
import com.google.bos.udmi.service.messaging.StateUpdate;
import com.google.bos.udmi.service.pod.UdmiServicePod;
import com.google.udmi.util.JsonUtil;
import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -36,7 +35,6 @@
import udmi.schema.Envelope;
import udmi.schema.Envelope.SubFolder;
import udmi.schema.Envelope.SubType;
import udmi.schema.SetupUdmiConfig;
import udmi.schema.UdmiConfig;
import udmi.schema.UdmiState;

Expand All @@ -47,11 +45,6 @@
public class ReflectProcessor extends ProcessorBase {

public static final String PAYLOAD_KEY = "payload";
public static final String HOSTNAME = System.getenv("HOSTNAME");
static final String DEPLOY_FILE = "var/deployed_version.json";
static final SetupUdmiConfig DEPLOYED_CONFIG =
loadFileStrictRequired(SetupUdmiConfig.class, new File(DEPLOY_FILE));
static final String UDMI_VERSION = requireNonNull(ReflectProcessor.DEPLOYED_CONFIG.udmi_version);

@Override
protected void defaultHandler(Object message) {
Expand Down Expand Up @@ -221,17 +214,11 @@ private void reflectStateHandler(Envelope envelope, UdmiState toolState) {
ifNotNullThen(distributor, d -> d.distribute(envelope, toolState));
updateAwareness(envelope, toolState);

UdmiConfig udmiConfig = new UdmiConfig();
udmiConfig.last_state = toolState.timestamp;
udmiConfig.setup = new SetupUdmiConfig();
copyFields(DEPLOYED_CONFIG, udmiConfig.setup, false);
udmiConfig.setup.hostname = HOSTNAME;
udmiConfig.setup.udmi_version = UDMI_VERSION;
udmiConfig.setup.functions_min = FUNCTIONS_VERSION_MIN;
udmiConfig.setup.functions_max = FUNCTIONS_VERSION_MAX;
UdmiConfig udmiConfig = UdmiServicePod.getUdmiConfig(toolState);

Map<String, Object> configMap = new HashMap<>();
configMap.put(SubFolder.UDMI.value(), udmiConfig);
configMap.put(TIMESTAMP_KEY, getTimestamp());
String contents = stringifyTerse(configMap);
debug("Setting reflector config %s %s: %s", registryId, deviceId, contents);
iotAccess.modifyConfig(registryId, deviceId, previous -> contents);
Expand Down Expand Up @@ -265,7 +252,7 @@ private void updateRegistryRegions(Map<String, String> regions) {

@Override
public void activate() {
debug("Deployment configuration: " + stringifyTerse(DEPLOYED_CONFIG));
debug("Deployment configuration: " + stringifyTerse(UdmiServicePod.getDeployedConfig()));
super.activate();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.google.bos.udmi.service.core;

import static com.google.bos.udmi.service.core.ReflectProcessor.UDMI_VERSION;
import static com.google.bos.udmi.service.pod.UdmiServicePod.UDMI_VERSION;
import static com.google.udmi.util.Common.TIMESTAMP_KEY;
import static com.google.udmi.util.Common.VERSION_KEY;
import static com.google.udmi.util.GeneralUtils.ifNotNullThen;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.google.bos.udmi.service.messaging.impl;

import static com.google.bos.udmi.service.pod.UdmiServicePod.HOSTNAME;
import static com.google.udmi.util.Common.SUBFOLDER_PROPERTY_KEY;
import static com.google.udmi.util.GeneralUtils.deepCopy;
import static com.google.udmi.util.GeneralUtils.friendlyStackTrace;
Expand All @@ -18,8 +19,10 @@

import com.google.bos.udmi.service.messaging.MessagePipe;
import com.google.bos.udmi.service.pod.ContainerBase;
import com.google.bos.udmi.service.pod.UdmiServicePod;
import java.time.Duration;
import java.time.Instant;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
Expand All @@ -37,6 +40,7 @@
import udmi.schema.Envelope;
import udmi.schema.Envelope.SubFolder;
import udmi.schema.Envelope.SubType;
import udmi.schema.UdmiConfig;

/**
* Base class for supporting a variety of messaging interfaces.
Expand Down Expand Up @@ -82,6 +86,15 @@ protected Bundle makeExceptionBundle(Envelope envelope, Exception exception) {
return bundle;
}

/**
 * Builds a bundle that wraps the pod's UDMI config (requested with no tool state) and marks its
 * envelope as a UDMI config message published at the current time.
 *
 * @return the newly created hello bundle
 */
protected Bundle makeHelloBundle() {
  Bundle hello = new Bundle(UdmiServicePod.getUdmiConfig(null));
  hello.envelope.publishTime = new Date();
  hello.envelope.subFolder = SubFolder.UDMI;
  hello.envelope.subType = SubType.CONFIG;
  return hello;
}

protected void pushQueueEntry(BlockingQueue<QueueEntry> queue, String stringBundle) {
try {
requireNonNull(stringBundle, "missing queue bundle");
Expand Down Expand Up @@ -191,7 +204,6 @@ private void messageLoop(String id) {
debug("Processing waited %ds on message loop %s", waiting, id);
if (bundle.message.equals(TERMINATE_MARKER)) {
info("Terminating message loop %s", id);
terminateHandlers();
return;
}
envelope = bundle.envelope;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

import static com.google.udmi.util.Common.SUBFOLDER_PROPERTY_KEY;
import static com.google.udmi.util.Common.SUBTYPE_PROPERTY_KEY;
import static com.google.udmi.util.GeneralUtils.friendlyStackTrace;
import static com.google.udmi.util.GeneralUtils.ifNotNullGet;
import static com.google.udmi.util.GeneralUtils.ifNotNullThen;
import static com.google.udmi.util.GeneralUtils.ifNullThen;
import static com.google.udmi.util.JsonUtil.stringify;
import static com.google.udmi.util.JsonUtil.toMap;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiService.Listener;
Expand All @@ -21,22 +22,23 @@
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PublisherGrpc;
import com.google.pubsub.v1.PubsubMessage;
import com.google.udmi.util.Common;
import com.google.udmi.util.GeneralUtils;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
Expand All @@ -63,6 +65,7 @@ public PubSubPipe(EndpointConfiguration configuration) {
projectId = variableSubstitution(configuration.hostname,
"no project id defined in configuration as 'hostname'");
publisher = ifNotNullGet(configuration.send_id, this::getPublisher);
ifNotNullThen(publisher, this::checkPublisher);
subscriber = ifNotNullGet(configuration.recv_id, this::getSubscriber);
String subscriptionName = ifNotNullGet(subscriber, Subscriber::getSubscriptionNameString);
String topicName = ifNotNullGet(publisher, Publisher::getTopicNameString);
Expand All @@ -72,6 +75,10 @@ public PubSubPipe(EndpointConfiguration configuration) {
}
}

/**
 * Publishes a hello bundle through this pipe. The constructor invokes this whenever a publisher
 * has been configured.
 */
private void checkPublisher() {
  Bundle helloBundle = makeHelloBundle();
  publish(helloBundle);
}

public static MessagePipe fromConfig(EndpointConfiguration configuration) {
return new PubSubPipe(configuration);
}
Expand Down Expand Up @@ -103,6 +110,12 @@ public void activate(Consumer<Bundle> bundleConsumer) {
subscriber.startAsync();
}

/**
 * Stops the subscriber, if there is one, waits for it to terminate, and then shuts down the
 * base pipe.
 *
 * <p>The subscriber is only created when the configuration supplies a {@code recv_id}, so it is
 * null for a send-only pipe. That case must not abort shutdown with a NullPointerException,
 * which would also skip the base-class shutdown.
 */
@Override
public void shutdown() {
  if (subscriber != null) {
    // Block until the subscriber has fully stopped so no messages arrive mid-shutdown.
    subscriber.stopAsync().awaitTerminated();
  }
  super.shutdown();
}

@Override
public void publish(Bundle bundle) {
try {
Expand Down Expand Up @@ -153,29 +166,38 @@ Publisher getPublisher(String topicName) {
info(format("Publisher %s:%s", Optional.ofNullable(emu).orElse(GCP_HOST), projectTopicName));
return builder.build();
} catch (Exception e) {
throw new RuntimeException("While creating emulator publisher", e);
throw new RuntimeException("While creating publisher", e);
}
}

Subscriber getSubscriber(String subName) {
try {
ProjectSubscriptionName subscription = ProjectSubscriptionName.of(projectId, subName);
Subscriber.Builder builder = Subscriber.newBuilder(subscription, this);
ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subName);
Subscriber.Builder builder = Subscriber.newBuilder(subscriptionName, this);
String emu = getEmulatorHost();
ifNullThen(emu, () -> checkSubscription(subscriptionName));
ifNotNullThen(emu, host -> builder.setChannelProvider(getTransportChannelProvider(host)));
ifNotNullThen(emu, host -> builder.setCredentialsProvider(NoCredentialsProvider.create()));
info(format("Subscriber %s:%s", Optional.ofNullable(emu).orElse(GCP_HOST), subscription));
builder.setParallelPullCount(EXECUTION_THREADS);
Subscriber built = builder.build();
info(format("Subscriber %s:%s", Optional.ofNullable(emu).orElse(GCP_HOST), subscriptionName));
built.addListener(new Listener() {
@Override
public void failed(State from, Throwable failure) {
debug("Subscriber state %s: %s", from, GeneralUtils.stackTraceString(failure));
debug("Subscriber state %s: %s", from, friendlyStackTrace(failure));
}
}, Executors.newSingleThreadExecutor());
return built;
} catch (Exception e) {
throw new RuntimeException("While creating emulator subscriber", e);
throw new RuntimeException("While creating subscriber", e);
}
}

/**
 * Looks up the given subscription through a short-lived admin client, which is closed again
 * before returning. The fetched value itself is discarded; only the success of the lookup
 * matters.
 *
 * @param subscriptionName subscription to look up
 * @throws RuntimeException wrapping any failure raised while creating the client or fetching
 *     the subscription
 */
private static void checkSubscription(ProjectSubscriptionName subscriptionName) {
  try (SubscriptionAdminClient adminClient = SubscriptionAdminClient.create()) {
    adminClient.getSubscription(subscriptionName).getAckDeadlineSeconds();
  } catch (Exception cause) {
    throw new RuntimeException("Checking subscription " + subscriptionName, cause);
  }
}

Expand Down
Loading