Skip to content

Commit

Permalink
Rewrite to use dynamic data models
Browse files Browse the repository at this point in the history
  • Loading branch information
hylkevds committed Jul 25, 2023
1 parent 49e9a7a commit 29d3e3b
Show file tree
Hide file tree
Showing 26 changed files with 648 additions and 1,914 deletions.
8 changes: 4 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<mainClass>de.fraunhofer.iosb.ilt.sensorthingsmanager.MainApp</mainClass>

<maven-compiler-plugin.version>3.11.0</maven-compiler-plugin.version>
Expand All @@ -32,7 +32,7 @@
<configurable.version>0.34</configurable.version>
<logback.version>1.4.6</logback.version>
<openjfx.version>19.0.2.1</openjfx.version>
<FROST-Client.version>0.44</FROST-Client.version>
<FROST-Client.version>2.2-SNAPSHOT</FROST-Client.version>
</properties>

<licenses>
Expand Down Expand Up @@ -200,7 +200,7 @@
<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>FROST-Client</artifactId>
<artifactId>FROST-Client-Dynamic</artifactId>
<version>${FROST-Client.version}</version>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package de.fraunhofer.iosb.ilt.sensorthingsmanager;

import de.fraunhofer.iosb.ilt.sta.ServiceFailureException;
import de.fraunhofer.iosb.ilt.frostclient.exception.ServiceFailureException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,35 @@
*/
package de.fraunhofer.iosb.ilt.sensorthingsmanager.aggregation;

import de.fraunhofer.iosb.ilt.sta.ServiceFailureException;
import de.fraunhofer.iosb.ilt.sta.dao.BaseDao;
import de.fraunhofer.iosb.ilt.sta.model.Datastream;
import de.fraunhofer.iosb.ilt.sta.model.EntityType;
import de.fraunhofer.iosb.ilt.sta.model.Id;
import de.fraunhofer.iosb.ilt.sta.model.MultiDatastream;
import de.fraunhofer.iosb.ilt.sta.model.Observation;
import de.fraunhofer.iosb.ilt.sta.model.Thing;
import de.fraunhofer.iosb.ilt.sta.model.TimeObject;
import de.fraunhofer.iosb.ilt.sta.model.ext.EntityList;
import de.fraunhofer.iosb.ilt.frostclient.SensorThingsService;
import de.fraunhofer.iosb.ilt.frostclient.dao.Dao;
import de.fraunhofer.iosb.ilt.frostclient.exception.ServiceFailureException;
import de.fraunhofer.iosb.ilt.frostclient.model.Entity;
import de.fraunhofer.iosb.ilt.frostclient.model.EntitySet;
import de.fraunhofer.iosb.ilt.frostclient.model.EntityType;
import de.fraunhofer.iosb.ilt.frostclient.model.ext.TimeInterval;
import de.fraunhofer.iosb.ilt.frostclient.model.ext.TimeValue;
import de.fraunhofer.iosb.ilt.frostclient.model.property.NavigationPropertyEntitySet;
import de.fraunhofer.iosb.ilt.frostclient.models.SensorThingsSensingV11;
import de.fraunhofer.iosb.ilt.frostclient.models.ext.MapValue;
import static de.fraunhofer.iosb.ilt.frostclient.utils.ParserUtils.formatKeyValuesForUrl;
import java.time.DateTimeException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import net.time4j.ClockUnit;
import net.time4j.Moment;
import net.time4j.ZonalDateTime;
import net.time4j.tz.TZID;
import net.time4j.tz.Timezone;
import org.apache.commons.lang3.builder.CompareToBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.threeten.extra.Interval;

/**
*
Expand All @@ -51,23 +57,25 @@ public class AggregateCombo implements Comparable<AggregateCombo> {
*/
private static final Logger LOGGER = LoggerFactory.getLogger(AggregateCombo.class);

public final Thing targetThing;
public final MultiDatastream target;
public Datastream sourceDs;
public MultiDatastream sourceMds;
private final SensorThingsService service;
public final Entity targetThing;
public final Entity targetMds;
public Entity sourceDs;
public Entity sourceMds;
public boolean sourceIsAggregate;
/**
* Indicates that observations in the source contain a list of values.
*/
public boolean sourceIsCollection = false;
public AggregationLevel level;
public String baseName;
private ZoneId zoneId;
private Interval currentInterval;
private TZID zoneId;
private TimeInterval currentInterval;

public AggregateCombo(Thing targetThing, MultiDatastream target) {
public AggregateCombo(SensorThingsService service, Entity targetThing, Entity target) {
this.service = service;
this.targetThing = targetThing;
this.target = target;
this.targetMds = target;
}

public boolean hasSource() {
Expand All @@ -76,44 +84,50 @@ public boolean hasSource() {

public EntityType getSourceType() {
if (sourceDs != null) {
return EntityType.DATASTREAM;
return sourceDs.getEntityType();
}
if (sourceMds != null) {
return EntityType.MULTIDATASTREAM;
return sourceMds.getEntityType();
}
return null;
}

public Id getSourceId() {
public Object[] getSourceId() {
if (sourceDs != null) {
return sourceDs.getId();
return sourceDs.getPrimaryKeyValues();
}
if (sourceMds != null) {
return sourceMds.getId();
return sourceMds.getPrimaryKeyValues();
}
return null;
}

public BaseDao<Observation> getObsDaoForSource() {
public Dao getObsDaoForSource() {
final EntityType sourceType = getSourceType();
if (sourceType == null) {
return null;
}
NavigationPropertyEntitySet npes = sourceType.getNavigationPropertySet("Observations");
if (sourceDs != null) {
return sourceDs.observations();
return sourceDs.dao(npes);
}
if (sourceMds != null) {
return sourceMds.observations();
return sourceMds.dao(npes);
}
return null;
}

public Observation getLastForTarget() {
public Entity getLastForTarget() {
try {
return target.observations().query().select("id", "phenomenonTime").orderBy("phenomenonTime desc").first();
NavigationPropertyEntitySet npObs = targetMds.getEntityType().getNavigationPropertySet("Observations");
return targetMds.query(npObs).select("id", "phenomenonTime").orderBy("phenomenonTime desc").first();
} catch (ServiceFailureException ex) {
LOGGER.error("Error fetching last observation.", ex);
return null;
}
}

public Observation getFirstForSource() {
public Entity getFirstForSource() {
try {
if (hasSource()) {
return getObsDaoForSource().query().select("id", "phenomenonTime").orderBy("phenomenonTime asc").first();
Expand All @@ -125,7 +139,7 @@ public Observation getFirstForSource() {
}
}

public Observation getLastForSource() {
public Entity getLastForSource() {
try {
if (hasSource()) {
return getObsDaoForSource().query().select("id", "phenomenonTime").orderBy("phenomenonTime desc").first();
Expand All @@ -137,15 +151,14 @@ public Observation getLastForSource() {
}
}

public List<Observation> getObservationsForSource(Instant start, Instant end) {
List<Observation> result = new ArrayList<>();
public List<Entity> getObservationsForSource(Instant start, Instant end) {
List<Entity> result = new ArrayList<>();
if (hasSource()) {
try {
StringBuilder filter = new StringBuilder();
filter.append("overlaps(phenomenonTime,").append(start.toString()).append("/").append(end.toString()).append(")");
EntityList<Observation> entityList = getObsDaoForSource().query().filter(filter.toString()).orderBy("phenomenonTime asc").top(1000).list();
for (Iterator<Observation> it = entityList.fullIterator(); it.hasNext();) {
Observation entity = it.next();
EntitySet entityList = getObsDaoForSource().query().filter(filter.toString()).orderBy("phenomenonTime asc").top(1000).list();
for (Entity entity : entityList) {
result.add(entity);
}
} catch (ServiceFailureException ex) {
Expand All @@ -155,15 +168,15 @@ public List<Observation> getObservationsForSource(Instant start, Instant end) {
return result;
}

public void resolveZoneId(ZoneId dflt) {
public void resolveZoneId(TZID dflt) {
if (zoneId == null) {
Map<String, Object> properties = targetThing.getProperties();
MapValue properties = targetThing.getProperty(SensorThingsSensingV11.EP_PROPERTIES);
Object zoneName = properties.get("timeZone");
if (zoneName == null || zoneName.toString().isEmpty()) {
zoneId = dflt;
} else {
try {
zoneId = ZoneId.of(zoneName.toString());
zoneId = Timezone.normalize(zoneName.toString());
} catch (DateTimeException ex) {
LOGGER.warn("Invalid zone: " + zoneName, ex);
zoneId = dflt;
Expand All @@ -172,32 +185,32 @@ public void resolveZoneId(ZoneId dflt) {
}
}

public ZoneId getZoneId() {
public TZID getZoneId() {
return zoneId;
}

public String getSourceObsMqttPath() {
if (sourceDs != null) {
return "v1.0/Datastreams(" + sourceDs.getId() + ")/Observations?$select=id,phenomenonTime";
return "v1.0/Datastreams(" + formatKeyValuesForUrl(sourceDs) + ")/Observations?$select=id,phenomenonTime";
}
if (sourceMds != null) {
return "v1.0/MultiDatastreams(" + sourceMds.getId() + ")/Observations?$select=id,phenomenonTime";
return "v1.0/MultiDatastreams(" + formatKeyValuesForUrl(sourceMds) + ")/Observations?$select=id,phenomenonTime";
}
return "";
}

public List<Interval> calculateIntervalsForTime(TimeObject phenTime) {
List<Interval> retval = new ArrayList<>();
Instant phenTimeStart = Utils.getPhenTimeStart(phenTime);
Instant phenTimeEnd = Utils.getPhenTimeEnd(phenTime);
ZonedDateTime atZone = phenTimeStart.atZone(getZoneId());
ZonedDateTime intStart = level.toIntervalStart(atZone);
ZonedDateTime intEnd = intStart.plus(level.amount, level.unit);
retval.add(Interval.of(intStart.toInstant(), intEnd.toInstant()));
while (intEnd.toInstant().isBefore(phenTimeEnd)) {
public List<TimeInterval> calculateIntervalsForTime(TimeValue phenTime) {
List<TimeInterval> retval = new ArrayList<>();
Moment phenTimeStart = Utils.getPhenTimeStart(phenTime);
Moment phenTimeEnd = Utils.getPhenTimeEnd(phenTime);
ZonalDateTime atZone = phenTimeStart.inZonalView(getZoneId());
Moment intStart = ZonalDateTime.from(level.toIntervalStart(atZone.toTemporalAccessor())).toMoment();
Moment intEnd = Moment.from(intStart.toTemporalAccessor().plus(level.amount, level.unit));
retval.add(TimeInterval.create(intStart, intEnd));
while (intEnd.isBefore(phenTimeEnd)) {
intStart = intEnd;
intEnd = intStart.plus(level.amount, level.unit);
retval.add(Interval.of(intStart.toInstant(), intEnd.toInstant()));
intEnd = Moment.from(intStart.toTemporalAccessor().plus(level.amount, level.unit));
retval.add(TimeInterval.create(intStart, intEnd));
}
return retval;
}
Expand All @@ -211,7 +224,7 @@ public List<Interval> calculateIntervalsForTime(TimeObject phenTime) {
* @return null if the given interval is the same as the current interval,
* otherwise the current interval.
*/
public Interval replaceIfNotCurrent(Interval other) {
public TimeInterval replaceIfNotCurrent(TimeInterval other) {
if (currentInterval == null) {
// There is no interval yet. This happens the first time at startup.
currentInterval = other;
Expand All @@ -222,7 +235,7 @@ public Interval replaceIfNotCurrent(Interval other) {
return null;
} else {
// The interval changed. Recalculate the old interval.
Interval old = currentInterval;
TimeInterval old = currentInterval;
currentInterval = other;
return old;
}
Expand All @@ -236,7 +249,7 @@ public Interval replaceIfNotCurrent(Interval other) {
* @param other The interval to check against the current interval.
* @return null if the given interval is the same as the current interval.
*/
public Interval unsetCurrent(Interval other) {
public TimeInterval unsetCurrent(TimeInterval other) {
if (currentInterval == null) {
// There is no interval.
return null;
Expand All @@ -247,7 +260,7 @@ public Interval unsetCurrent(Interval other) {
return null;
} else {
// The interval is different. Recalculate the old interval.
Interval old = currentInterval;
TimeInterval old = currentInterval;
currentInterval = null;
return old;
}
Expand All @@ -268,12 +281,12 @@ public boolean equals(Object obj) {
if (!baseName.equals(otherCombo.baseName)) {
return false;
}
return target.getId().equals(otherCombo.target.getId());
return Arrays.equals(targetMds.getPrimaryKeyValues(), otherCombo.targetMds.getPrimaryKeyValues());
}

@Override
public int hashCode() {
return Objects.hash(target, level, baseName);
return Objects.hash(targetMds, level, baseName);
}

@Override
Expand All @@ -287,12 +300,12 @@ public int compareTo(AggregateCombo o) {
@Override
public String toString() {
if (sourceDs != null) {
return baseName + " " + level + ". (d " + sourceDs.getId() + " -> md " + target.getId() + ")";
return baseName + " " + level + ". (d " + Arrays.toString(sourceDs.getPrimaryKeyValues()) + " -> md " + Arrays.toString(targetMds.getPrimaryKeyValues()) + ")";
}
if (sourceMds != null) {
return baseName + " " + level + ". (md " + sourceMds.getId() + " -> md " + target.getId() + ")";
return baseName + " " + level + ". (md " + Arrays.toString(sourceMds.getPrimaryKeyValues()) + " -> md " + Arrays.toString(targetMds.getPrimaryKeyValues()) + ")";
}
return baseName + " " + level + ". (? -> md " + target.getId() + ")";
return baseName + " " + level + ". (? -> md " + Arrays.toString(targetMds.getPrimaryKeyValues()) + ")";
}

public String getBaseName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
*/
package de.fraunhofer.iosb.ilt.sensorthingsmanager.aggregation;

import de.fraunhofer.iosb.ilt.sta.model.Datastream;
import de.fraunhofer.iosb.ilt.sta.model.MultiDatastream;
import de.fraunhofer.iosb.ilt.frostclient.model.Entity;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
Expand All @@ -37,8 +36,8 @@ public class AggregationBase {
private static final Logger LOGGER = LoggerFactory.getLogger(AggregationBase.class);

private final String baseName;
private Datastream baseDatastream;
private MultiDatastream baseMultiDatastream;
private Entity baseDatastream;
private Entity baseMultiDatastream;

private final Set<AggregateCombo> combos = new TreeSet<>();
private final Map<AggregationLevel, AggregateCombo> combosByLevel = new HashMap<>();
Expand All @@ -50,7 +49,7 @@ public AggregationBase(String baseName) {
this.baseName = baseName;
}

public AggregationBase(String baseName, Datastream baseDatastream, MultiDatastream baseMultiDatastream) {
public AggregationBase(String baseName, Entity baseDatastream, Entity baseMultiDatastream) {
this.baseName = baseName;
this.baseDatastream = baseDatastream;
this.baseMultiDatastream = baseMultiDatastream;
Expand Down Expand Up @@ -103,19 +102,19 @@ public Map<AggregationLevel, Boolean> getWantedLevels() {
return wantedLevels;
}

public Datastream getBaseDatastream() {
public Entity getBaseDatastream() {
return baseDatastream;
}

public void setBaseDatastream(Datastream baseDatastream) {
public void setBaseDatastream(Entity baseDatastream) {
this.baseDatastream = baseDatastream;
}

public MultiDatastream getBaseMultiDatastream() {
public Entity getBaseMultiDatastream() {
return baseMultiDatastream;
}

public void setBaseMultiDatastream(MultiDatastream baseMultiDatastream) {
public void setBaseMultiDatastream(Entity baseMultiDatastream) {
this.baseMultiDatastream = baseMultiDatastream;
}

Expand Down
Loading

0 comments on commit 29d3e3b

Please sign in to comment.