Skip to content

Commit

Permalink
Squash commit
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu committed Dec 9, 2022
1 parent e90e149 commit be35c2a
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 8 deletions.
4 changes: 2 additions & 2 deletions docs/specs/sequences/generated.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ Push endpoint config message to device that results in a connection error.

Push endpoint config message to device that results in successful reconnect to the same endpoint.

1. Update config before blobset entry config status is success:
1. Update config before blobset phase is FINAL and stateStatus is null:
* Add `blobset` = { "blobs": { "_iot_endpoint_config": { "phase": `final`, "content_type": `application/json`, "base64": `endpoint_base64_payload`, "nonce": `endpoint_nonce` } } }
1. Wait for blobset entry config status is success
1. Wait for blobset phase is FINAL and stateStatus is null

## extra_config

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@

Push endpoint config message to device that results in successful reconnect to the same endpoint.

1. Update config before blobset entry config status is success:
1. Update config before blobset phase is FINAL and stateStatus is null:
* Add `blobset` = { "blobs": { "_iot_endpoint_config": { "phase": `final`, "content_type": `application/json`, "base64": `endpoint_base64_payload`, "nonce": `endpoint_nonce` } } }
1. Wait for blobset entry config status is success
1. Wait for blobset phase is FINAL and stateStatus is null
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ public class SequenceBase {
private static File deviceOutputDir;
private static File resultSummary;
private static MessagePublisher client;
private static SequenceBase activeInstance;
private static MessageBundle stashedBundle;
private static Date stateTimestamp;

private final Map<SubFolder, String> sentConfig = new HashMap<>();
Expand Down Expand Up @@ -780,11 +782,34 @@ private void processMessage() {
}
}

private MessageBundle nextMessageBundle() {
if (!client.isActive()) {
throw new RuntimeException("Trying to receive message from inactive client");
/**
* Thread-safe way to get a message. Tests are run in different threads, and if one blocks it
* might end up trying to take a message while another thread is still looping. This prevents that
* by checking that the calling test is still active, and then if not, saves the message for the
* next round and interrupts the current thread.
*
* @return message bundle
*/
MessageBundle nextMessageBundle() {
synchronized (SequenceBase.class) {
if (stashedBundle != null) {
debug("using stashed message bundle");
MessageBundle bundle = stashedBundle;
stashedBundle = null;
return bundle;
}
if (!client.isActive()) {
throw new RuntimeException("Trying to receive message from inactive client");
}
MessageBundle bundle = client.takeNextMessage();
if (activeInstance != this) {
debug("stashing interrupted message bundle");
assert stashedBundle == null;
stashedBundle = bundle;
throw new RuntimeException("Message loop no longer for active thread");
}
return bundle;
}
return client.takeNextMessage();
}

private void processConfig(Map<String, Object> message, Map<String, String> attributes) {
Expand Down Expand Up @@ -1039,6 +1064,7 @@ protected void starting(org.junit.runner.Description description) {
writeSequenceMdHeader();

notice("starting test " + testName);
activeInstance = SequenceBase.this;
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("While preparing " + deviceOutputDir.getAbsolutePath(), e);
Expand All @@ -1059,6 +1085,7 @@ protected void finished(org.junit.runner.Description description) {
systemLog.close();
sequencerLog.close();
sequenceMd.close();
activeInstance = null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.google.daq.mqtt.sequencer;

import static org.junit.Assert.assertEquals;

import com.google.bos.iot.core.proxy.MockPublisher;
import com.google.daq.mqtt.TestCommon;
import com.google.daq.mqtt.validator.Validator.MessageBundle;
import com.google.udmi.util.JsonUtil;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.Description;
import udmi.schema.State;

/**
* Unit tests for the SequenceBaseTest class.
*/
public class SequenceBaseTest {

private static final String TEST_TOPIC = "mock/topic";

/**
* Reset the state of the underlying infrastructure for each test.
*/
@Before
public void resetForTest() {
SequenceBase.resetState();
SequenceRunner.executionConfiguration = TestCommon.testConfiguration();
SequenceRunner.executionConfiguration.device_id = TestCommon.DEVICE_ID;
}

@Test
public void messageInterrupted() {
final SequenceBase base1 = new SequenceBase();
base1.testWatcher.starting(makeTestDescription("test_one"));

final MockPublisher mockClient = SequenceBase.getMockClient(false);

State stateMessage = new State();
stateMessage.version = "hello";
mockClient.publish(TestCommon.DEVICE_ID, TEST_TOPIC, JsonUtil.stringify(stateMessage));
stateMessage.version = "world";
mockClient.publish(TestCommon.DEVICE_ID, TEST_TOPIC, JsonUtil.stringify(stateMessage));

MessageBundle bundle1 = base1.nextMessageBundle();
assertEquals("first message contents", "hello", bundle1.message.get("version"));

final SequenceBase base2 = new SequenceBase();
base2.testWatcher.starting(makeTestDescription("test_two"));
try {
base1.nextMessageBundle();
assert false;
} catch (RuntimeException e) {
// This is expected, but then also preserve the message for the next call.
}
MessageBundle bundle2 = base2.nextMessageBundle();
assertEquals("second message contents", "world", bundle2.message.get("version"));
}

private Description makeTestDescription(String testName) {
return Description.createTestDescription(SequenceBase.class, testName);
}
}

0 comments on commit be35c2a

Please sign in to comment.