Skip to content

Commit

Permalink
Incorporate LocalnetManager and fix ip route parsing (#769)
Browse files Browse the repository at this point in the history
  • Loading branch information
grafnu committed Nov 18, 2023
1 parent 8772e26 commit 975ccf5
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 116 deletions.
24 changes: 15 additions & 9 deletions pubber/src/main/java/daq/pubber/DeviceManager.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package daq.pubber;

import java.util.Map;
import udmi.schema.Config;
import udmi.schema.DevicePersistent;
import udmi.schema.Entry;
import udmi.schema.FamilyDiscoveryEvent;
import udmi.schema.Level;
import udmi.schema.Metadata;
import udmi.schema.Operation.SystemMode;
Expand All @@ -13,8 +15,10 @@
*/
public class DeviceManager extends ManagerBase {

private PointsetManager pointsetManager;
private SystemManager systemManager;
private final PointsetManager pointsetManager;
private final SystemManager systemManager;
private final LocalnetManager localnetManager;


/**
* Create a new instance.
Expand All @@ -23,6 +27,7 @@ public DeviceManager(ManagerHost host, PubberConfiguration configuration) {
super(host, configuration);
systemManager = new SystemManager(host, configuration);
pointsetManager = new PointsetManager(host, configuration);
localnetManager = new LocalnetManager(host, configuration);
}

public void setPersistentData(DevicePersistent persistentData) {
Expand All @@ -34,13 +39,6 @@ public void setMetadata(Metadata metadata) {
systemManager.setMetadata(metadata);
}

@Override
public void cancelPeriodicSend() {
super.cancelPeriodicSend();
pointsetManager.cancelPeriodicSend();
systemManager.cancelPeriodicSend();
}

public void systemLifecycle(SystemMode mode) {
systemManager.systemLifecycle(mode);
}
Expand Down Expand Up @@ -74,8 +72,16 @@ public void cloudLog(String message, Level level, String detail) {
systemManager.cloudLog(message, level, detail);
}

/**
* Shutdown everything, including sub-managers.
*/
public void shutdown() {
systemManager.shutdown();
pointsetManager.shutdown();
localnetManager.shutdown();
}

public Map<String, FamilyDiscoveryEvent> enumerateFamilies() {
return localnetManager.enumerateFamilies();
}
}
141 changes: 85 additions & 56 deletions pubber/src/main/java/daq/pubber/LocalnetManager.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package daq.pubber;

import static com.google.udmi.util.GeneralUtils.friendlyStackTrace;
import static com.google.udmi.util.GeneralUtils.runtimeExec;
import static java.util.Optional.ofNullable;
import static java.util.stream.Collectors.toMap;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -17,11 +19,12 @@
import udmi.schema.FamilyDiscoveryEvent;
import udmi.schema.FamilyLocalnetState;
import udmi.schema.LocalnetState;
import udmi.schema.PubberConfiguration;

/**
* Container class for dealing with the localnet subblock of UDMI.
*/
public class LocalnetManager {
public class LocalnetManager extends ManagerBase {

public static final int DEFAULT_METRIC = 0;
private static final List<Pattern> familyPatterns = ImmutableList.of(
Expand All @@ -34,20 +37,68 @@ public class LocalnetManager {
"inet", "ipv4",
"inet6", "ipv6"
);
private final Pubber parent;
private final LocalnetState localnetState;

/**
* Create a new container with the given Pubber parent.
*
* @param parent parent pubber class
* Create a new container with the given host.
*/
public LocalnetManager(Pubber parent) {
this.parent = parent;
parent.deviceState.localnet = new LocalnetState();
public LocalnetManager(ManagerHost host, PubberConfiguration configuration) {
super(host, configuration);
localnetState = new LocalnetState();
populateInterfaceAddresses();
host.update(localnetState);
}

@VisibleForTesting
static String getDefaultInterfaceStatic(List<String> routeLines) {
AtomicReference<String> currentInterface = new AtomicReference<>();
AtomicInteger currentMaxMetric = new AtomicInteger(Integer.MAX_VALUE);
routeLines.forEach(line -> {
try {
String[] parts = line.split(" ", 13);
int baseIndex = parts[0].equals("none") ? 1 : 0;
if (parts[baseIndex].equals("default")) {
int metric = parts.length < (baseIndex + 11) ? DEFAULT_METRIC
: Integer.parseInt(parts[baseIndex + 10]);
if (metric < currentMaxMetric.get()) {
currentMaxMetric.set(metric);
currentInterface.set(parts[baseIndex + 4]);
}
}
} catch (Exception e) {
throw new RuntimeException("While processing ip route line: " + line, e);
}
});
return currentInterface.get();
}

@VisibleForTesting
static Map<String, String> getInterfaceAddressesStatic(List<String> strings) {
Map<String, String> interfaceMap = new HashMap<>();
strings.forEach(line -> {
for (Pattern pattern : familyPatterns) {
Matcher matcher = pattern.matcher(line);
if (matcher.matches()) {
interfaceMap.put(ifaceMap.get(matcher.group(1)), matcher.group(2));
}
}
});
return interfaceMap;
}

private String getDefaultInterface() {
final List<String> routeLines;
try {
populateInterfaceAddresses();
routeLines = runtimeExec("ip", "route");
} catch (Exception e) {
throw new RuntimeException("While populating interface addresses", e);
error("Could not execute ip route command: " + friendlyStackTrace(e));
return null;
}
try {
return getDefaultInterfaceStatic(routeLines);
} catch (Exception e) {
error("Could not infer default interface: " + friendlyStackTrace(e));
return null;
}
}

Expand All @@ -74,10 +125,11 @@ public LocalnetManager(Pubber parent) {
* </pre>
*/
private void populateInterfaceAddresses() {
parent.deviceState.localnet.families = new HashMap<>();
localnetState.families = new HashMap<>();
String defaultInterface = getDefaultInterface();
parent.info("Using addresses from default interface " + defaultInterface);
Map<String, String> interfaceAddresses = getInterfaceAddresses(defaultInterface);
info("Using addresses from default interface " + defaultInterface);
Map<String, String> interfaceAddresses = ofNullable(
getInterfaceAddresses(defaultInterface)).orElse(ImmutableMap.of());
interfaceAddresses.entrySet().forEach(this::addStateMapEntry);
HashMap<String, FamilyLocalnetState> stateMap = new HashMap<>();
}
Expand All @@ -86,61 +138,38 @@ private void addStateMapEntry(Entry<String, String> entry) {
FamilyLocalnetState stateEntry = new FamilyLocalnetState();
stateEntry.addr = entry.getValue();
String family = entry.getKey();
parent.info("Family " + family + " address is " + stateEntry.addr);
parent.deviceState.localnet.families.put(family, stateEntry);
}

private String getDefaultInterface() {
return getDefaultInterface(runtimeExec("ip", "route"));
}

@VisibleForTesting
static String getDefaultInterface(List<String> routeLines) {
AtomicReference<String> currentInterface = new AtomicReference<>();
AtomicInteger currentMaxMetric = new AtomicInteger(Integer.MAX_VALUE);
routeLines.forEach(line -> {
try {
String[] parts = line.split(" ", 12);
if (parts[0].equals("default")) {
int metric = parts.length < 11 ? DEFAULT_METRIC : Integer.parseInt(parts[10]);
if (metric < currentMaxMetric.get()) {
currentMaxMetric.set(metric);
currentInterface.set(parts[4]);
}
}
} catch (Exception e) {
throw new RuntimeException("While processing ip route line: " + line, e);
}
});
return currentInterface.get();
info("Family " + family + " address is " + stateEntry.addr);
localnetState.families.put(family, stateEntry);
}

private Map<String, String> getInterfaceAddresses(String defaultInterface) {
return getInterfaceAddresses(runtimeExec("ip", "addr", "show", "dev", defaultInterface));
}
if (defaultInterface == null) {
return null;
}
final List<String> strings;
try {
strings = runtimeExec("ip", "addr", "show", "dev", defaultInterface);
} catch (Exception e) {
error("Could not execute ip addr command: " + friendlyStackTrace(e));
return null;
}

@VisibleForTesting
static Map<String, String> getInterfaceAddresses(List<String> strings) {
Map<String, String> interfaceMap = new HashMap<>();
strings.forEach(line -> {
for (Pattern pattern : familyPatterns) {
Matcher matcher = pattern.matcher(line);
if (matcher.matches()) {
interfaceMap.put(ifaceMap.get(matcher.group(1)), matcher.group(2));
}
}
});
return interfaceMap;
try {
return getInterfaceAddressesStatic(strings);
} catch (Exception e) {
error("Could not infer interface addresses: " + friendlyStackTrace(e));
return null;
}
}

Map<String, FamilyDiscoveryEvent> enumerateFamilies() {
return parent.deviceState.localnet.families.keySet().stream()
return localnetState.families.keySet().stream()
.collect(toMap(key -> key, this::makeFamilyDiscoveryEvent));
}

private FamilyDiscoveryEvent makeFamilyDiscoveryEvent(String key) {
FamilyDiscoveryEvent familyDiscoveryEvent = new FamilyDiscoveryEvent();
familyDiscoveryEvent.addr = parent.deviceState.localnet.families.get(key).addr;
familyDiscoveryEvent.addr = localnetState.families.get(key).addr;
return familyDiscoveryEvent;
}
}
42 changes: 23 additions & 19 deletions pubber/src/main/java/daq/pubber/Pubber.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import udmi.schema.FamilyDiscoveryState;
import udmi.schema.FamilyLocalnetModel;
import udmi.schema.Level;
import udmi.schema.LocalnetState;
import udmi.schema.Metadata;
import udmi.schema.Operation.SystemMode;
import udmi.schema.PointEnumerationEvent;
Expand Down Expand Up @@ -173,7 +174,6 @@ public class Pubber extends ManagerBase implements ManagerHost {
private EndpointConfiguration extractedEndpoint;
private SiteModel siteModel;
private MqttDevice gatewayTarget;
private LocalnetManager localnetManager;
private SchemaVersion targetSchema;
private int deviceUpdateCount = -1;
private DeviceManager deviceManager;
Expand All @@ -194,16 +194,6 @@ public Pubber(String configPath) {
outDir = new File(PUBBER_OUT);
}

private static PubberConfiguration loadConfiguration(String configPath) {
File configFile = new File(configPath);
try {
configuration = sanitizeConfiguration(fromJsonFile(configFile, PubberConfiguration.class));
return configuration;
} catch (Exception e) {
throw new RuntimeException("While configuring from " + configFile.getAbsolutePath(), e);
}
}

/**
* Start an instance from explicit args.
*
Expand All @@ -221,6 +211,16 @@ public Pubber(String iotProject, String sitePath, String deviceId, String serial
}
}

private static PubberConfiguration loadConfiguration(String configPath) {
File configFile = new File(configPath);
try {
configuration = sanitizeConfiguration(fromJsonFile(configFile, PubberConfiguration.class));
return configuration;
} catch (Exception e) {
throw new RuntimeException("While configuring from " + configFile.getAbsolutePath(), e);
}
}

private static PubberConfiguration makeExplicitConfiguration(String iotProject, String sitePath,
String deviceId, String serialNo) {
configuration = new PubberConfiguration();
Expand Down Expand Up @@ -389,8 +389,6 @@ private void initializeDevice() {
configuration.deviceId, configuration.serialNo, configuration.macAddr,
configuration.gatewayId, optionsString(configuration.options)));

localnetManager = new LocalnetManager(this);

markStateDirty();
}

Expand Down Expand Up @@ -493,10 +491,12 @@ public void update(Object update) {
}
if (checkTarget == this) {
publishSynchronousState();
} else if (checkTarget instanceof PointsetState) {
deviceState.pointset = (PointsetState) checkValue;
} else if (checkTarget instanceof SystemState) {
deviceState.system = (SystemState) checkValue;
} else if (checkTarget instanceof PointsetState) {
deviceState.pointset = (PointsetState) checkValue;
} else if (checkTarget instanceof LocalnetState) {
deviceState.localnet = (LocalnetState) checkValue;
} else {
throw new RuntimeException(
"Unrecognized update type " + checkTarget.getClass().getSimpleName());
Expand Down Expand Up @@ -707,7 +707,7 @@ public void shutdown() {
}

super.shutdown();
captureExceptions("device manager shutdown", deviceManager::shutdown);
ifNotNullThen(deviceManager, dm -> captureExceptions("device manager shutdown", dm::shutdown));
captureExceptions("publishing shutdown state", this::publishSynchronousState);
captureExceptions("disconnecting mqtt", this::disconnectMqtt);
}
Expand Down Expand Up @@ -1100,7 +1100,7 @@ private void updateDiscoveryEnumeration(DiscoveryConfig config) {
Enumerate enumerate = config.enumerate;
discoveryEvent.uniqs = ifTrue(enumerate.uniqs, () -> enumeratePoints(configuration.deviceId));
discoveryEvent.features = ifTrue(enumerate.features, SupportedFeatures::getFeatures);
discoveryEvent.families = ifTrue(enumerate.families, () -> localnetManager.enumerateFamilies());
discoveryEvent.families = ifTrue(enumerate.families, () -> deviceManager.enumerateFamilies());
publishDeviceMessage(discoveryEvent);
}

Expand Down Expand Up @@ -1464,11 +1464,15 @@ private String traceTimestamp(String messageBase) {
}

private void cloudLog(String message, Level level) {
deviceManager.cloudLog(message, level, null);
cloudLog(message, level, null);
}

private void cloudLog(String message, Level level, String detail) {
deviceManager.cloudLog(message, level, detail);
if (deviceManager != null) {
deviceManager.cloudLog(message, level, detail);
} else {
SystemManager.localLog(message, level, getTimestamp(), detail);
}
}

private void trace(String message) {
Expand Down
Loading

0 comments on commit 975ccf5

Please sign in to comment.