Skip to content

Commit

Permalink
Merge pull request #1268 from apache/1040-change-event-runtime-name-o…
Browse files Browse the repository at this point in the history
…f-geometry-field

[#1040] rework schema definitions in geo processors
  • Loading branch information
flomickl committed Feb 18, 2023
2 parents 0c51f2e + c1317a2 commit 8f0a350
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class LatLngToJtsPointProcessor extends StreamPipesDataProcessor {
private static final String LAT_KEY = "latitude-key";
private static final String LNG_KEY = "longitude-key";
private static final String EPSG_KEY = "epsg-key";
private static final String WKT_RUNTIME = "geomWKT";
private static final String GEOMETRY_RUNTIME = "geometry";
private String latitudeMapper;
private String longitudeMapper;
private String epsgMapper;
Expand Down Expand Up @@ -76,7 +76,7 @@ public DataProcessorDescription declareModel() {
.outputStrategy(
OutputStrategies.append(
PrimitivePropertyBuilder
.create(Datatypes.String, WKT_RUNTIME)
.create(Datatypes.String, GEOMETRY_RUNTIME)
.domainProperty("http://www.opengis.net/ont/geosparql#Geometry")
.build()
)
Expand All @@ -101,7 +101,8 @@ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeEx
Point geom = SpGeometryBuilder.createSPGeom(lng, lat, epsg);

if (!geom.isEmpty()) {
event.addField(WKT_RUNTIME, geom.toString());
event.addField(GEOMETRY_RUNTIME, geom.toString());

LOG.debug("Created Geometry: " + geom.toString());
collector.collect(event);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.helpers.SupportedFormats;
import org.apache.streampipes.sdk.helpers.SupportedProtocols;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
Expand All @@ -49,7 +47,7 @@ public class ReprojectionProcessor extends StreamPipesDataProcessor {
public static final String GEOM_KEY = "geom-key";
public static final String SOURCE_EPSG_KEY = "source-epsg-key";
public static final String TARGET_EPSG_KEY = "target-epsg-key";
public static final String GEOM_RUNTIME = "geomWKT";
public static final String GEOMETRY_RUNTIME = "geometry";
public static final String EPSG_RUNTIME = "epsg";
private String geometryMapper;
private String sourceEpsgMapper;
Expand All @@ -75,8 +73,6 @@ public DataProcessorDescription declareModel() {
.build())
.outputStrategy(OutputStrategies.keep())
.requiredIntegerParameter(Labels.withId(TARGET_EPSG_KEY), 32632)
.supportedFormats(SupportedFormats.jsonFormat())
.supportedProtocols(SupportedProtocols.kafka())
.build();
}

Expand Down Expand Up @@ -134,7 +130,7 @@ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeEx

if (!reprojected.isEmpty()) {
event.updateFieldBySelector("s0::" + EPSG_RUNTIME, targetEpsg);
event.updateFieldBySelector("s0::" + GEOM_RUNTIME, reprojected.toText());
event.updateFieldBySelector("s0::" + GEOMETRY_RUNTIME, reprojected.toText());

collector.collect(event);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ public class TrajectoryFromPointsProcessor extends StreamPipesDataProcessor {
private static final String SUBPOINTS_KEY = "subpoints-key";
private static final String DESCRIPTION_KEY = "description-key";
private static final String TRAJECTORY_KEY = "trajectory-key";
private static final String TRAJECTORY_RUNTIME = "trajectoryWKT";
private static final String DESCRIPTION_RUNTIME = "trajectoryDescription";
private static final String TRAJECTORY_GEOMETRY_RUNTIME = "trajectory-geometry";
private static final String TRAJECTORY_EPSG_RUNTIME = "trajectory-epsg";
private static final String DESCRIPTION_RUNTIME = "trajectory-description";
private String pointMapper;
private String epsgMapper;
private String mValueMapper;
Expand Down Expand Up @@ -96,8 +97,12 @@ public DataProcessorDescription declareModel() {
SO.TEXT),
EpProperties.stringEp(
Labels.withId(TRAJECTORY_KEY),
TRAJECTORY_RUNTIME,
"http://www.opengis.net/ont/geosparql#Geometry")
TRAJECTORY_GEOMETRY_RUNTIME,
"http://www.opengis.net/ont/geosparql#Geometry"),
EpProperties.integerEp(
Labels.withId(EPSG_KEY),
TRAJECTORY_EPSG_RUNTIME,
"http://data.ign.fr/def/ignf#CartesianCS")
)
)
.build();
Expand Down Expand Up @@ -131,7 +136,8 @@ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeEx
LineString geom = trajectory.returnAsLineString(eventGeom.getFactory());
// adds to stream
event.addField(DESCRIPTION_RUNTIME, trajectory.getDescription());
event.addField(TRAJECTORY_RUNTIME, geom.toString());
event.addField(TRAJECTORY_GEOMETRY_RUNTIME, geom.toString());
event.addField(TRAJECTORY_EPSG_RUNTIME, epsg);
collector.collect(event);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,30 @@
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.schema.PropertyScope;
import org.apache.streampipes.processors.geo.jvm.latlong.helper.HaversineDistanceUtil;
import org.apache.streampipes.sdk.builder.PrimitivePropertyBuilder;
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
import org.apache.streampipes.sdk.helpers.EpProperties;
import org.apache.streampipes.sdk.helpers.EpRequirements;
import org.apache.streampipes.sdk.helpers.Labels;
import org.apache.streampipes.sdk.helpers.Locales;
import org.apache.streampipes.sdk.helpers.OutputStrategies;
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.sdk.utils.Datatypes;
import org.apache.streampipes.vocabulary.Geo;
import org.apache.streampipes.vocabulary.SO;
import org.apache.streampipes.wrapper.context.EventProcessorRuntimeContext;
import org.apache.streampipes.wrapper.routing.SpOutputCollector;
import org.apache.streampipes.wrapper.standalone.ProcessorParams;
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;

import java.net.URI;

public class HaversineDistanceCalculatorProcessor extends StreamPipesDataProcessor {
private static final String LAT_1_KEY = "lat1";
private static final String LONG_1_KEY = "long1";
private static final String LAT_2_KEY = "lat2";
private static final String LONG_2_KEY = "long2";
private static final String CALCULATED_DISTANCE_KEY = "calculatedDistance";
private static final String DISTANCE_RUNTIME_NAME = "distance";
String lat1FieldMapper;
String long1FieldMapper;
String lat2FieldMapper;
Expand All @@ -69,12 +72,12 @@ public DataProcessorDescription declareModel() {
Labels.withId(LONG_2_KEY), PropertyScope.MEASUREMENT_PROPERTY)
.build()
)
.outputStrategy(OutputStrategies
.append(EpProperties.numberEp(
Labels.withId(CALCULATED_DISTANCE_KEY),
"distance",
SO.NUMBER))
)
.outputStrategy(OutputStrategies.append(PrimitivePropertyBuilder
.create(Datatypes.Float, DISTANCE_RUNTIME_NAME)
.domainProperty(SO.NUMBER)
.measurementUnit(URI.create("http://qudt.org/vocab/unit#Kilometer"))
.build())
)
.build();
}

Expand All @@ -99,7 +102,7 @@ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeEx

double resultDist = HaversineDistanceUtil.dist(lat1, long1, lat2, long2);

event.addField("distance", resultDist);
event.addField(DISTANCE_RUNTIME_NAME, resultDist);

collector.collect(event);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class SpeedCalculatorProcessor extends StreamPipesDataProcessor {
private static final String LATITUDE_KEY = "latitude-key";
private static final String LONGITUDE_KEY = "longitude-key";
private static final String COUNT_WINDOW_KEY = "count-window-key";
private static final String SPEED_KEY = "speed-key";
private static final String SPEED_RUNTIME_NAME = "speed";
private String latitudeFieldMapper;
private String longitudeFieldMapper;
private String timestampFieldMapper;
Expand All @@ -75,7 +75,7 @@ public DataProcessorDescription declareModel() {
.requiredIntegerParameter(Labels.withId(COUNT_WINDOW_KEY))
.outputStrategy(
OutputStrategies.append(PrimitivePropertyBuilder
.create(Datatypes.Float, SPEED_KEY)
.create(Datatypes.Float, SPEED_RUNTIME_NAME)
.domainProperty(SO.NUMBER)
.measurementUnit(URI.create("http://qudt.org/vocab/unit#KilometerPerHour"))
.build())
Expand All @@ -98,7 +98,7 @@ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeEx
if (this.buffer.isFull()) {
Event firstEvent = (Event) buffer.get();
double speed = calculateSpeed(firstEvent, event);
event.addField(SPEED_KEY, speed);
event.addField(SPEED_RUNTIME_NAME, speed);
collector.collect(event);
}
this.buffer.add(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection.title=Geo CRS Reprojection
org.apache.streampipes.processors.geo.jvm.jts.processor.reprojection.description=Coordinate reprojection from source to target CRS

wkt-key=WKT.title=WKT
wkt-key=WKT.description=Geometry
geom-key.title=WKT
geom-key.description=Geometry

source-epsg-key.title=CRS of Input Geometry
source-epsg-key.description=EPSG-Code of input point

target_epsg-key.title=Target CRS
target_epsg-key.description=EPSG-Code of target CRS
target-epsg-key.title=Target CRS
target-epsg-key.description=EPSG-Code of target CRS

0 comments on commit 8f0a350

Please sign in to comment.