Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send out model and config messages through registrar #293

Merged
merged 5 commits into from
Apr 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions validator/.idea/runConfigurations/Registrar.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion validator/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ sourceSets {

checkstyle {
ignoreFailures = false
maxWarnings = 61
maxWarnings = 48
}
checkstyleMain.source = 'src/main/java'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.FileInputStream;
import java.io.FilenameFilter;
import java.io.InputStream;
import java.lang.reflect.Field;
import java.math.BigInteger;
import java.net.URI;
import java.time.Duration;
Expand All @@ -49,6 +50,7 @@
import java.util.stream.Collectors;
import udmi.schema.Config;
import udmi.schema.Envelope.SubFolder;
import udmi.schema.Metadata;

public class Registrar {

Expand All @@ -75,12 +77,14 @@ public class Registrar {
private static final String SCHEMA_NAME = "UDMI";
private static final String SWARM_SUBFOLDER = "swarm";
private static final long PROCESSING_TIMEOUT_MIN = 60;
private static final String CONFIG_SUB_TYPE = "config";
private static final String MODEL_SUB_TYPE = "model";
private final Map<String, JsonSchema> schemas = new HashMap<>();
private final String generation = getGenerationString();
private CloudIotManager cloudIotManager;
private File siteDir;
private File schemaBase;
private PubSubPusher configPusher;
private PubSubPusher updatePusher;
private PubSubPusher feedPusher;
private Map<String, LocalDevice> localDevices;
private File summaryFile;
Expand Down Expand Up @@ -130,9 +134,6 @@ private static boolean processArgs(List<String> argList, Registrar registrar) {
case "-s":
registrar.setSitePath(argList.remove(0));
break;
case "-c":
registrar.setConfigTopic(argList.remove(0));
break;
case "-f":
registrar.setFeedTopic(argList.remove(0));
break;
Expand Down Expand Up @@ -173,11 +174,6 @@ private void setFeedTopic(String feedTopic) {
}
}

private void setConfigTopic(String configTopic) {
System.err.println("Sending updates to config topic " + configTopic);
configPusher = new PubSubPusher(projectId, configTopic);
}

private void writeErrors() throws Exception {
Map<String, Map<String, String>> errorSummary = new TreeMap<>();
DeviceExceptionManager dem = new DeviceExceptionManager(siteDir);
Expand Down Expand Up @@ -234,6 +230,10 @@ private void initializeCloudProject() {
cloudIotManager.getProjectId(),
cloudIotManager.getCloudRegion(),
cloudIotManager.getRegistryId()));

if (cloudIotManager.getUpdateTopic() != null) {
updatePusher = new PubSubPusher(projectId, cloudIotManager.getUpdateTopic());
}
}

private String getGenerationString() {
Expand Down Expand Up @@ -312,7 +312,7 @@ private void processLocalDevice(String localName, AtomicInteger processedDeviceC
localDevice.writeConfigFile();
if (cloudDevices != null) {
updateCloudIoT(localName, localDevice);
sendConfigMessages(localDevice);
sendUpdateMessages(localDevice);
if (cloudDevices.contains(localName)) {
sendFeedMessage(localDevice);
}
Expand Down Expand Up @@ -434,31 +434,47 @@ private Device fetchDevice(String localName) {
}
}

private void sendConfigMessages(LocalDevice localDevice) {
if (configPusher == null) {
return;
private void sendUpdateMessages(LocalDevice localDevice) {
if (updatePusher != null) {
System.err.println("Sending model/config update for " + localDevice.getDeviceId());
sendUpdateMessage(localDevice, SubFolder.SYSTEM);
sendUpdateMessage(localDevice, SubFolder.POINTSET);
sendUpdateMessage(localDevice, SubFolder.GATEWAY);
sendUpdateMessage(localDevice, SubFolder.LOCALNET);
}
System.err.println("Sending config messages for " + localDevice.getDeviceId());
}

Config deviceConfig = localDevice.deviceConfigObject();
sendConfigMessage(localDevice, SubFolder.SYSTEM, deviceConfig.system);
sendConfigMessage(localDevice, SubFolder.POINTSET, deviceConfig.pointset);
sendConfigMessage(localDevice, SubFolder.GATEWAY, deviceConfig.gateway);
sendConfigMessage(localDevice, SubFolder.LOCALNET, deviceConfig.localnet);
private void sendUpdateMessage(LocalDevice localDevice, SubFolder subFolder) {
sendUpdateMessage(localDevice, MODEL_SUB_TYPE, subFolder, localDevice.getMetadata());
sendUpdateMessage(localDevice, CONFIG_SUB_TYPE, subFolder, localDevice.deviceConfigObject());
}

private void sendConfigMessage(LocalDevice localDevice, SubFolder subFolder, Object subConfig) {
private void sendUpdateMessage(LocalDevice localDevice, String subType, SubFolder subfolder, Object target) {
String fieldName = subfolder.toString().toLowerCase();
try {
Field declaredField = target.getClass().getDeclaredField(fieldName);
sendSubMessage(localDevice, subType, subfolder, declaredField.get(target));
} catch (Exception e) {
throw new RuntimeException(String.format("Getting field %s from target %s", fieldName, target.getClass().getSimpleName()));
}
}

private void sendSubMessage(LocalDevice localDevice, String subType, SubFolder subFolder,
Object subConfig) {
try {
Map<String, String> attributes = new HashMap<>();
attributes.put("deviceId", localDevice.getDeviceId());
attributes.put("deviceNumId", localDevice.getDeviceNumId());
attributes.put("deviceRegistryId", cloudIotManager.getRegistryId());
attributes.put("projectId", cloudIotManager.getProjectId());
attributes.put("subType", subType);
attributes.put("subFolder", subFolder.value());
String messageString = OBJECT_MAPPER.writeValueAsString(subConfig);
configPusher.sendMessage(attributes, messageString);
} catch (JsonProcessingException e) {
throw new RuntimeException("While sending config " + subFolder, e);
updatePusher.sendMessage(attributes, messageString);
} catch (Exception e) {
throw new RuntimeException(
String.format("Sending %s/%s messages for %s%n", subType, subFolder,
localDevice.getDeviceId()), e);
}
}

Expand All @@ -483,8 +499,8 @@ private void bindGatewayDevices(Map<String, LocalDevice> localDevices, Set<Strin
}

private void shutdown() {
if (configPusher != null) {
configPusher.shutdown();
if (updatePusher != null) {
updatePusher.shutdown();
}
if (feedPusher != null) {
feedPusher.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import com.google.api.services.cloudiot.v1.model.DeviceCredential;
import java.util.List;

/**
* Bucket of settings to use for a cloud device entry.
*/
public class CloudDeviceSettings {

public List<DeviceCredential> credentials;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public class CloudIotConfig {
public String cloud_region;
public String reflect_region;
public String site_name;
public String update_topic;
public String alt_project;
public String alt_registry;
public boolean block_unknown;
Expand Down
Loading