Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add blocking/interrupt mechanism for sequence tests that timeout. #526

Merged
merged 1 commit into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/specs/sequences/generated.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ Push endpoint config message to device that results in a connection error.

Push endpoint config message to device that results in successful reconnect to the same endpoint.

1. Update config before blobset entry config status is success:
1. Update config before blobset phase is FINAL and stateStatus is null:
* Add `blobset` = { "blobs": { "_iot_endpoint_config": { "phase": `final`, "content_type": `application/json`, "base64": `endpoint_base64_payload`, "nonce": `endpoint_nonce` } } }
1. Wait for blobset entry config status is success
1. Wait for blobset phase is FINAL and stateStatus is null

## extra_config

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@

Push endpoint config message to device that results in successful reconnect to the same endpoint.

1. Update config before blobset entry config status is success:
1. Update config before blobset phase is FINAL and stateStatus is null:
* Add `blobset` = { "blobs": { "_iot_endpoint_config": { "phase": `final`, "content_type": `application/json`, "base64": `endpoint_base64_payload`, "nonce": `endpoint_nonce` } } }
1. Wait for blobset entry config status is success
1. Wait for blobset phase is FINAL and stateStatus is null
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ public class SequenceBase {
private static File deviceOutputDir;
private static File resultSummary;
private static MessagePublisher client;
private static SequenceBase activeInstance;
private static MessageBundle stashedBundle;
private static Date stateTimestamp;

private final Map<SubFolder, String> sentConfig = new HashMap<>();
Expand Down Expand Up @@ -780,11 +782,34 @@ private void processMessage() {
}
}

private MessageBundle nextMessageBundle() {
if (!client.isActive()) {
throw new RuntimeException("Trying to receive message from inactive client");
/**
* Thread-safe way to get a message. Tests are run in different threads, and if one blocks it
* might end up trying to take a message while another thread is still looping. This prevents that
* by checking that the calling test is still active, and then if not, saves the message for the
* next round and interrupts the current thread.
*
* @return message bundle
*/
MessageBundle nextMessageBundle() {
synchronized (SequenceBase.class) {
if (stashedBundle != null) {
debug("using stashed message bundle");
MessageBundle bundle = stashedBundle;
stashedBundle = null;
return bundle;
}
if (!client.isActive()) {
throw new RuntimeException("Trying to receive message from inactive client");
}
MessageBundle bundle = client.takeNextMessage();
if (activeInstance != this) {
debug("stashing interrupted message bundle");
assert stashedBundle == null;
stashedBundle = bundle;
throw new RuntimeException("Message loop no longer for active thread");
}
return bundle;
}
return client.takeNextMessage();
}

private void processConfig(Map<String, Object> message, Map<String, String> attributes) {
Expand Down Expand Up @@ -1039,6 +1064,7 @@ protected void starting(org.junit.runner.Description description) {
writeSequenceMdHeader();

notice("starting test " + testName);
activeInstance = SequenceBase.this;
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("While preparing " + deviceOutputDir.getAbsolutePath(), e);
Expand All @@ -1059,6 +1085,7 @@ protected void finished(org.junit.runner.Description description) {
systemLog.close();
sequencerLog.close();
sequenceMd.close();
activeInstance = null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.google.daq.mqtt.sequencer;

import static org.junit.Assert.assertEquals;

import com.google.bos.iot.core.proxy.MockPublisher;
import com.google.daq.mqtt.TestCommon;
import com.google.daq.mqtt.validator.Validator.MessageBundle;
import com.google.udmi.util.JsonUtil;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.Description;
import udmi.schema.State;

/**
* Unit tests for the SequenceBaseTest class.
*/
public class SequenceBaseTest {

private static final String TEST_TOPIC = "mock/topic";

/**
* Reset the state of the underlying infrastructure for each test.
*/
@Before
public void resetForTest() {
SequenceBase.resetState();
SequenceRunner.executionConfiguration = TestCommon.testConfiguration();
SequenceRunner.executionConfiguration.device_id = TestCommon.DEVICE_ID;
}

@Test
public void messageInterrupted() {
final SequenceBase base1 = new SequenceBase();
base1.testWatcher.starting(makeTestDescription("test_one"));

final MockPublisher mockClient = SequenceBase.getMockClient(false);

State stateMessage = new State();
stateMessage.version = "hello";
mockClient.publish(TestCommon.DEVICE_ID, TEST_TOPIC, JsonUtil.stringify(stateMessage));
stateMessage.version = "world";
mockClient.publish(TestCommon.DEVICE_ID, TEST_TOPIC, JsonUtil.stringify(stateMessage));

MessageBundle bundle1 = base1.nextMessageBundle();
assertEquals("first message contents", "hello", bundle1.message.get("version"));

final SequenceBase base2 = new SequenceBase();
base2.testWatcher.starting(makeTestDescription("test_two"));
try {
base1.nextMessageBundle();
assert false;
} catch (RuntimeException e) {
// This is expected, but then also preserve the message for the next call.
}
MessageBundle bundle2 = base2.nextMessageBundle();
assertEquals("second message contents", "world", bundle2.message.get("version"));
}

private Description makeTestDescription(String testName) {
return Description.createTestDescription(SequenceBase.class, testName);
}
}