From 7274a33d954bb7913bbe08319bd4e65f8bf07ba2 Mon Sep 17 00:00:00 2001 From: Hylke van der Schaaf Date: Sun, 12 May 2024 23:14:01 +0200 Subject: [PATCH] Added more EEA Importer fixes --- examples/EEA_LastDayNew.json | 249 ++++----- .../csv/DsMapperFilter.java | 2 +- .../eea/DataStreamGeneratorEea2.java | 472 ++++++++++++++++++ .../importers/eea/EeaConstants.java | 3 + .../sensorthingsimporter/utils/UrlUtils.java | 15 +- 5 files changed, 613 insertions(+), 128 deletions(-) create mode 100644 src/main/java/de/fraunhofer/iosb/ilt/sensorthingsimporter/importers/eea/DataStreamGeneratorEea2.java diff --git a/examples/EEA_LastDayNew.json b/examples/EEA_LastDayNew.json index 811dc8c..53e33e3 100644 --- a/examples/EEA_LastDayNew.json +++ b/examples/EEA_LastDayNew.json @@ -1,125 +1,126 @@ { - "importer": { - "className": "de.fraunhofer.iosb.ilt.sensorthingsimporter.csv.ImporterCsv", - "classConfig": { - "recordConvertors": [ - { - "className": "de.fraunhofer.iosb.ilt.sensorthingsimporter.csv.RecordConverterNames", - "classConfig": { - "colResult": "VALUE_NUMERIC", - "resultMissing": "-1", - "colUnit": "UNIT", - "converter": { - "conversions": [ - { - "from": "mg/m3", - "to": "µg/m3", - "factor": 1000 - }, - { - "from": "mg/m3", - "to": "ug/m3", - "factor": 1000 - }, - { - "from": "ugNO2/m3", - "to": "ug/m3", - "factor": 1 - }, - { - "from": "ug.m-3", - "to": "ug/m3", - "factor": 1 - }, - { - "from": "µg/m3", - "to": "ug/m3", - "factor": 1 - } - ] - }, - "colPhenTime": [ - "DATETIME_BEGIN", - "DATETIME_END" - ], - "dsm": { - "className": "de.fraunhofer.iosb.ilt.sensorthingsimporter.csv.DsMapperFilter", - "classConfig": { - "filterTemplate": "name eq \u0027{SAMPLINGPOINT_LOCALID}\u0027", - "dsGenerator": { - "className": "de.fraunhofer.iosb.ilt.sensorthingsimporter.importers.eea.DataStreamGeneratorEea", - "classConfig": { - "stationsUrl": "http://discomap.eea.europa.eu/map/fme/metadata/PanEuropean_metadata.csv" - } - } - } - }, - "timeParser": { - "format": "yyyyMMddHHmmss", - "zone": "+01:00" - }, - "resultParser": { - "className": "de.fraunhofer.iosb.ilt.sensorthingsimporter.utils.parsers.ParserBigdecimal", - "classConfig": { - "dropTailingZeroes": true, - "decimalSeparator": "DETECT" - } - } - } - } - ], - "inputUrl": { - "className": "de.fraunhofer.iosb.ilt.sensorthingsimporter.csv.UrlGeneratorCombinations", - "classConfig": { - "baseUrl": "https://discomap.eea.europa.eu/Map/UTDViewer/dataService/Hourly?polu\u003d{polu}\u0026dt\u003d{time}", - "replaceSets": [ - { - "replaceKey": "{polu}", - "replacementGen": { - "className": "de.fraunhofer.iosb.ilt.sensorthingsimporter.csv.StringListGenStatic", - "strings": [ - "PM10", - "PM25", - "NO2", - "O3", - "SO2", - "CO" - ] - } - }, - { - "replaceKey": "{time}", - "replacementGen": { - "className": "de.fraunhofer.iosb.ilt.sensorthingsimporter.csv.StringListGeneratorTimeStrings", - "minus": 24, - "count": 26, - "delta": 1, - "zone": "+01:00", - "format": "yyyyMMddHHmmss" - } - } - ] - } - }, - "hasHeader": true - } - }, - "validator": { - "className": "de.fraunhofer.iosb.ilt.sensorthingsimporter.validator.ValidatorByPhenTime", - "classConfig": { - "update": true, - "cacheObservations": true, - "deleteDuplicates": true - } - }, - "uploader": { - "serviceUrl": "http://localhost:8080/FROST-Server/v1.1", - "authMethod": { - "className": "de.fraunhofer.iosb.ilt.sensorthingsimporter.auth.AuthNone", - "classConfig": {} - }, - "useDataArrays": true, - "maxBatch": 1000 - }, - "name": "eeaIosbH" -} \ No newline at end of file + "importer": { + "className": "de.fraunhofer.iosb.ilt.sensorthingsimporter.csv.ImporterCsv", + "classConfig": { + "recordConvertors": [ + { + "className": "de.fraunhofer.iosb.ilt.sensorthingsimporter.csv.RecordConverterNames", + "classConfig": { + "colResult": "VALUE_NUMERIC", + "resultMissing": "-1", + "colUnit": "UNIT", + "converter": { + "conversions": [ + { + "from": "mg/m3", + "to": "µg/m3", + "factor": 1000 + }, + { + "from": "mg/m3", + "to": "ug/m3", + "factor": 1000 + }, + { + "from": "ugNO2/m3", + "to": "ug/m3", + "factor": 1 + }, + { + "from": "ug.m-3", + "to": "ug/m3", + "factor": 1 + }, + { + "from": "µg/m3", + "to": "ug/m3", + "factor": 1 + } + ] + }, + "colPhenTime": [ + "DATETIME_BEGIN", + "DATETIME_END" + ], + "dsm": { + "className": "de.fraunhofer.iosb.ilt.sensorthingsimporter.csv.DsMapperFilter", + "classConfig": { + "filterTemplate": "Thing/properties/localId eq \u0027{STATIONCODE}\u0027 and ObservedProperty/name eq \u0027{PROPERTY}\u0027", + "dsGenerator": { + "className": "de.fraunhofer.iosb.ilt.sensorthingsimporter.importers.eea.DataStreamGeneratorEea2", + "classConfig": { + "stationsUrl": "https://discomap.eea.europa.eu/App/AQViewer/data?fqn\u003dAirquality_Dissem.b2g.measurements", + "template": "{\n\t\"Page\": 0,\n\t\"SortBy\": null,\n\t\"SortAscending\": true,\n\t\"RequestFilter\": {\n\t\t\"AssessmentMethodId\": {\n\t\t\t\"FieldName\": \"AssessmentMethodId\",\n\t\t\t\"Values\": [\"$SAMPLING_POINT_ID\"]\n\t\t}\n\t}\n}" + } + } + } + }, + "timeParser": { + "format": "yyyyMMddHHmmss", + "zone": "+01:00" + }, + "resultParser": { + "className": "de.fraunhofer.iosb.ilt.sensorthingsimporter.utils.parsers.ParserBigdecimal", + "classConfig": { + "dropTailingZeroes": true, + "decimalSeparator": "DETECT" + } + } + } + } + ], + "inputUrl": { + "className": "de.fraunhofer.iosb.ilt.sensorthingsimporter.csv.UrlGeneratorCombinations", + "classConfig": { + "baseUrl": "https://discomap.eea.europa.eu/Map/UTDViewer/dataService/Hourly?polu\u003d{polu}\u0026dt\u003d{time}", + "replaceSets": [ + { + "replaceKey": "{polu}", + "replacementGen": { + "className": "de.fraunhofer.iosb.ilt.sensorthingsimporter.csv.StringListGenStatic", + "strings": [ + "PM10", + "PM25", + "NO2", + "O3", + "SO2", + "CO" + ] + } + }, + { + "replaceKey": "{time}", + "replacementGen": { + "className": "de.fraunhofer.iosb.ilt.sensorthingsimporter.csv.StringListGeneratorTimeStrings", + "minus": 24, + "count": 26, + "delta": 1, + "zone": "+01:00", + "format": "yyyyMMddHHmmss" + } + } + ] + } + }, + "hasHeader": true + } + }, + "validator": { + "className": "de.fraunhofer.iosb.ilt.sensorthingsimporter.validator.ValidatorByPhenTime", + "classConfig": { + "update": true, + "cacheObservations": true, + "deleteDuplicates": true + } + }, + "uploader": { + "serviceUrl": "http://localhost:8080/FROST-Server/v1.1", + "authMethod": { + "className": "de.fraunhofer.iosb.ilt.sensorthingsimporter.auth.AuthNone", + "classConfig": {} + }, + "useDataArrays": true, + "maxBatch": 1000 + }, + "name": "eeaIosbH" +} diff --git a/src/main/java/de/fraunhofer/iosb/ilt/sensorthingsimporter/csv/DsMapperFilter.java b/src/main/java/de/fraunhofer/iosb/ilt/sensorthingsimporter/csv/DsMapperFilter.java index b2fc376..4430c3a 100644 --- a/src/main/java/de/fraunhofer/iosb/ilt/sensorthingsimporter/csv/DsMapperFilter.java +++ b/src/main/java/de/fraunhofer/iosb/ilt/sensorthingsimporter/csv/DsMapperFilter.java @@ -111,7 +111,7 @@ private Datastream getDatastreamFor(String filter, CSVRecord record, ErrorLog er EntityList streams = query.list(); if (streams.size() > 1) { LOGGER.error("Found incorrect number of datastreams: {} for filter: {}", streams.size(), filter); - throw new ImportException("Found incorrect number of datastreams: " + streams.size() + " for filter: " + filter); + return null; } else if (streams.isEmpty()) { if (dsGenerator != null) { ds = dsGenerator.createDatastreamFor(record, errorLog); diff --git a/src/main/java/de/fraunhofer/iosb/ilt/sensorthingsimporter/importers/eea/DataStreamGeneratorEea2.java b/src/main/java/de/fraunhofer/iosb/ilt/sensorthingsimporter/importers/eea/DataStreamGeneratorEea2.java new file mode 100644 index 0000000..b05ccad --- /dev/null +++ b/src/main/java/de/fraunhofer/iosb/ilt/sensorthingsimporter/importers/eea/DataStreamGeneratorEea2.java @@ -0,0 +1,472 @@ +/* + * Copyright (C) 2020 Fraunhofer IOSB + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package de.fraunhofer.iosb.ilt.sensorthingsimporter.importers.eea; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.gson.JsonElement; +import de.fraunhofer.iosb.ilt.configurable.AnnotatedConfigurable; +import de.fraunhofer.iosb.ilt.configurable.ConfigEditor; +import de.fraunhofer.iosb.ilt.configurable.ConfigurationException; +import de.fraunhofer.iosb.ilt.configurable.annotations.ConfigurableField; +import de.fraunhofer.iosb.ilt.configurable.editor.EditorString; +import de.fraunhofer.iosb.ilt.sensorthingsimporter.ImportException; +import de.fraunhofer.iosb.ilt.sensorthingsimporter.csv.DatastreamGenerator; +import static de.fraunhofer.iosb.ilt.sensorthingsimporter.importers.eea.EeaConstants.TAG_AREA_TYPE; +import static de.fraunhofer.iosb.ilt.sensorthingsimporter.importers.eea.EeaConstants.TAG_BEGIN_TIME; +import static de.fraunhofer.iosb.ilt.sensorthingsimporter.importers.eea.EeaConstants.TAG_COUNTRY_CODE; +import static de.fraunhofer.iosb.ilt.sensorthingsimporter.importers.eea.EeaConstants.TAG_END_TIME; +import static de.fraunhofer.iosb.ilt.sensorthingsimporter.importers.eea.EeaConstants.TAG_LOCAL_ID; +import static de.fraunhofer.iosb.ilt.sensorthingsimporter.importers.eea.EeaConstants.TAG_MEDIA_MONITORED; +import static de.fraunhofer.iosb.ilt.sensorthingsimporter.importers.eea.EeaConstants.TAG_METADATA; +import static de.fraunhofer.iosb.ilt.sensorthingsimporter.importers.eea.EeaConstants.TAG_NAMESPACE; +import static de.fraunhofer.iosb.ilt.sensorthingsimporter.importers.eea.EeaConstants.TAG_NETWORK; +import static de.fraunhofer.iosb.ilt.sensorthingsimporter.importers.eea.EeaConstants.TAG_NETWORK_NAME; +import static de.fraunhofer.iosb.ilt.sensorthingsimporter.importers.eea.EeaConstants.TAG_OWNER; +import static de.fraunhofer.iosb.ilt.sensorthingsimporter.importers.eea.EeaConstants.VALUE_MEDIUM_AIR; +import static de.fraunhofer.iosb.ilt.sensorthingsimporter.importers.eea.EeaConstants.VALUE_OWNER_EEA; +import de.fraunhofer.iosb.ilt.sensorthingsimporter.utils.EntityCache; +import de.fraunhofer.iosb.ilt.sensorthingsimporter.utils.ErrorLog; +import de.fraunhofer.iosb.ilt.sensorthingsimporter.utils.FrostUtils; +import de.fraunhofer.iosb.ilt.sensorthingsimporter.utils.Translator; +import de.fraunhofer.iosb.ilt.sensorthingsimporter.utils.UrlUtils; +import de.fraunhofer.iosb.ilt.sta.ServiceFailureException; +import de.fraunhofer.iosb.ilt.sta.Utils; +import de.fraunhofer.iosb.ilt.sta.jackson.ObjectMapperFactory; +import de.fraunhofer.iosb.ilt.sta.model.Datastream; +import de.fraunhofer.iosb.ilt.sta.model.Location; +import de.fraunhofer.iosb.ilt.sta.model.Observation; +import de.fraunhofer.iosb.ilt.sta.model.ObservedProperty; +import de.fraunhofer.iosb.ilt.sta.model.Sensor; +import de.fraunhofer.iosb.ilt.sta.model.Thing; +import de.fraunhofer.iosb.ilt.sta.model.ext.UnitOfMeasurement; +import de.fraunhofer.iosb.ilt.sta.query.Query; +import de.fraunhofer.iosb.ilt.sta.service.SensorThingsService; +import java.io.IOException; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.commons.csv.CSVRecord; +import org.apache.commons.lang3.StringUtils; +import org.apache.http.ParseException; +import org.geojson.Point; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * @author hylke + */ +public class DataStreamGeneratorEea2 implements DatastreamGenerator, AnnotatedConfigurable { + + private static final Logger LOGGER = LoggerFactory.getLogger(DataStreamGeneratorEea2.class.getName()); + + private static final String OLD_DS_SEARCH_TEMPLATE = "endswith(Thing/properties/localId,\u0027{STATIONCODE}\u0027) and ObservedProperty/name eq \u0027{PROPERTY}\u0027"; + + private static final String TEMPLATE = """ + { + \t"Page": 0, + \t"SortBy": null, + \t"SortAscending": true, + \t"RequestFilter": { + \t\t"AssessmentMethodId": { + \t\t\t"FieldName": "AssessmentMethodId", + \t\t\t"Values": ["$SAMPLING_POINT_ID"] + \t\t} + \t} + }"""; + + @ConfigurableField(editor = EditorString.class, + label = "Stations Url", description = "The url to download the stations CSV file from.") + @EditorString.EdOptsString(dflt = "https://discomap.eea.europa.eu/App/AQViewer/data?fqn=Airquality_Dissem.b2g.measurements") + private String stationsUrl; + + @ConfigurableField(editor = EditorString.class, + label = "Query Template", description = "The POST template to use.") + @EditorString.EdOptsString(lines = 10, + dflt = TEMPLATE) + private String template; + + private FrostUtils frostUtils; + + /** + * StationLocalid/SamplingPointLocalId + */ + private static final Map SAMPLING_POINTS = new HashMap<>(); + private final EntityCache observedPropertyCache = EeaObservedProperty.createObservedPropertyCache(); + + @Override + public void configure(JsonElement config, SensorThingsService context, Object edtCtx, ConfigEditor configEditor) throws ConfigurationException { + frostUtils = new FrostUtils(context); + AnnotatedConfigurable.super.configure(config, context, edtCtx, configEditor); + } + + private List getDatastreamFor(String filterTemplate, CSVRecord record) throws ServiceFailureException, ImportException { + String filter = Translator.fillTemplate(filterTemplate, record, Translator.StringType.URL, true); + SensorThingsService service = frostUtils.getService(); + Query query = service.datastreams().query().orderBy("id asc").filter(filter); + return query.list().toList(); + } + + @Override + public Datastream createDatastreamFor(CSVRecord record, ErrorLog errorLog) throws ImportException { + EeaStationRecord stationRecord = findStation(record); + if (stationRecord == null) { + return null; + } + + Datastream ds = findAndFixOldData(record, stationRecord); + if (ds != null) { + return ds; + } + + return importEntities(stationRecord, record); + } + + private Datastream findAndFixOldData(CSVRecord record, EeaStationRecord stationRecord) throws ImportException { + try { + String stationLocalId = getFromRecord(record, "STATIONCODE"); + String obsProp = getFromRecord(record, "PROPERTY"); + String pointLocalId = getFromRecord(record, "SAMPLINGPOINT_LOCALID"); + List oldDsList = getDatastreamFor(OLD_DS_SEARCH_TEMPLATE, record); + if (oldDsList.isEmpty()) { + return null; + } + if (oldDsList.size() == 1) { + Datastream ds = oldDsList.get(0); + Thing thing = ds.getThing(); + LOGGER.info("Updating localId of Things({}) from {} to {} and name to {}", thing.getId().getUrl(), thing.getProperties().get("localId"), stationLocalId, stationRecord.airQualityStationName); + thing.getProperties().put(TAG_LOCAL_ID, stationLocalId); + thing.setName(stationRecord.airQualityStationName); + frostUtils.update(thing); + ds.setName(obsProp + " at " + stationRecord.airQualityStationName); + updateDsLocalId(ds, pointLocalId); + frostUtils.update(ds); + return ds; + } + if (oldDsList.size() > 1) { + Datastream mainDs = oldDsList.get(0); + LOGGER.info("Merging {} datastreams for {} into Datastreams({})", oldDsList.size(), pointLocalId, mainDs.getId().getUrl()); + Thing thing = mainDs.getThing(); + LOGGER.info("Updating localId of Things({}) from {} to {} and name to {}", thing.getId().getUrl(), thing.getProperties().get("localId"), stationLocalId, stationRecord.airQualityStationName); + thing.getProperties().put(TAG_LOCAL_ID, stationLocalId); + thing.setName(stationRecord.airQualityStationName); + frostUtils.update(thing); + mainDs.setName(obsProp + " at " + stationRecord.airQualityStationName); + updateDsLocalId(mainDs, pointLocalId); + frostUtils.update(mainDs); + Datastream mainDsOnlyId = mainDs.withOnlyId(); + for (int idx = 1; idx < oldDsList.size(); idx++) { + int obsCount = 0; + Datastream badDs = oldDsList.get(idx); + Iterator it = badDs.observations().query().orderBy("phenomenonTime asc").list().fullIterator(); + while (it.hasNext()) { + Observation badObs = it.next(); + badObs.setDatastream(mainDsOnlyId); + frostUtils.update(badObs); + obsCount++; + } + LOGGER.info("Moved {} Observations from Datastreams({})", obsCount, badDs.getId().getUrl()); + frostUtils.delete(Arrays.asList(badDs), 1); + } + return mainDs; + } + } catch (ServiceFailureException ex) { + LOGGER.error("Failed to update old data: {}", ex.getMessage()); + } + return null; + } + + private void updateDsLocalId(Datastream ds, String localId) { + Map properties = ds.getProperties(); + Object localIds = properties.get(TAG_LOCAL_ID); + if (localIds instanceof String lids) { + List localIdList = new ArrayList<>(); + localIdList.add(lids); + if (!localId.equals(lids)) { + localIdList.add(localId); + } + properties.put(TAG_LOCAL_ID, localIdList); + return; + } + if (localIds instanceof List localIdList) { + if (!localIdList.contains(localId)) { + localIdList.add(localId); + } + return; + } + LOGGER.error("Unknown type of LocalId property of Datastreams({})! {}", ds.getId().getUrl(), localIds); + } + + private Datastream importEntities(EeaStationRecord sr, CSVRecord record) throws ImportException { + String pointLocalId = getFromRecord(record, "SAMPLINGPOINT_LOCALID"); + Point point = new Point( + sr.longitude.setScale(6, RoundingMode.HALF_EVEN).doubleValue(), + sr.latitude.setScale(6, RoundingMode.HALF_EVEN).doubleValue()); + + Map stationProps = new HashMap<>(); + stationProps.put(TAG_LOCAL_ID, sr.airQualityStationEoICode); + stationProps.put(TAG_COUNTRY_CODE, sr.countrycode); + stationProps.put(TAG_NETWORK, sr.airQualityNetwork); + stationProps.put(TAG_NETWORK_NAME, sr.airQualityNetworkName); + stationProps.put(TAG_OWNER, VALUE_OWNER_EEA); + stationProps.put(TAG_NAMESPACE, sr.namespace); + stationProps.put(TAG_MEDIA_MONITORED, VALUE_MEDIUM_AIR); + stationProps.put(TAG_AREA_TYPE, sr.airQualityStationArea); + stationProps.put(TAG_BEGIN_TIME, sr.observationDateBegin); + if (!sr.observationDateEnd.isEmpty()) { + stationProps.put(TAG_END_TIME, sr.observationDateEnd); + } + stationProps.put(TAG_METADATA, stationsUrl); + + Map locationProps = new HashMap<>(); + locationProps.put(TAG_LOCAL_ID, sr.airQualityStationEoICode); + locationProps.put(TAG_COUNTRY_CODE, sr.countrycode); + locationProps.put(TAG_OWNER, VALUE_OWNER_EEA); + locationProps.put(TAG_NAMESPACE, sr.namespace); + locationProps.put(TAG_METADATA, stationsUrl); + locationProps.put(TAG_AREA_TYPE, sr.airQualityStationArea); + + Map sensorProps = new HashMap<>(); + sensorProps.put(TAG_LOCAL_ID, sr.assessmentMethodId); + sensorProps.put(EeaConstants.TAG_SAMPLING_METHOD, sr.samplingMethod); + sensorProps.put(TAG_COUNTRY_CODE, sr.countrycode); + sensorProps.put(TAG_OWNER, VALUE_OWNER_EEA); + sensorProps.put(TAG_NAMESPACE, sr.namespace); + sensorProps.put(TAG_METADATA, stationsUrl); + + Map dsProps = new HashMap<>(); + dsProps.put(TAG_LOCAL_ID, new String[]{pointLocalId}); + dsProps.put(TAG_COUNTRY_CODE, sr.countrycode); + dsProps.put(TAG_OWNER, VALUE_OWNER_EEA); + dsProps.put(TAG_NAMESPACE, sr.namespace); + dsProps.put(TAG_METADATA, stationsUrl); + + try { + String filter = "properties/" + TAG_LOCAL_ID + " eq " + Utils.quoteForUrl(sr.airQualityStationEoICode); + Location location = frostUtils.findOrCreateLocation( + filter, + sr.airQualityStationName, + "Location of station " + sr.airQualityStationEoICode, + locationProps, + point, + null); + Thing thing = frostUtils.findOrCreateThing( + filter, + sr.airQualityStationName, + "Measurement station " + sr.airQualityStationName, + stationProps, + location, + null); + + filter = "properties/" + TAG_LOCAL_ID + " eq " + Utils.quoteForUrl(sr.assessmentMethodId); + Sensor sensor = frostUtils.findOrCreateSensor( + filter, + sr.processId, + "Sensor " + sr.processId, + "text/html", + sr.measurementEquipment, + sensorProps, + null); + ObservedProperty observedProperty = observedPropertyCache.get(FrostUtils.afterLastSlash(sr.airPollutant).trim()); + + String valueUnit = getFromRecord(record, "UNIT", "value_unit", "UnitOfMeasurement"); + if (valueUnit == null) { + throw new ImportException("Could not find unit in record."); + } + UnitOfMeasurement uom = new UnitOfMeasurement(valueUnit, valueUnit, valueUnit); + filter = "properties/" + TAG_LOCAL_ID + " eq " + Utils.quoteForUrl(pointLocalId) + " or " + Utils.quoteForUrl(pointLocalId) + " in properties/" + TAG_LOCAL_ID; + Datastream ds = frostUtils.findOrCreateDatastream( + filter, + observedProperty.getName() + " at " + thing.getName(), + observedProperty.getName() + " at " + thing.getName(), + dsProps, + uom, thing, observedProperty, sensor, null); + return ds; + } catch (ServiceFailureException ex) { + LOGGER.debug("Exception: {}", ex.getMessage()); + throw new ImportException(ex); + } + } + + private String getFromRecord(CSVRecord record, String... names) { + for (String name : names) { + try { + return record.get(name); + } catch (IllegalArgumentException ex) { + // It's fine + } + } + return null; + } + + private EeaStationRecord findStation(CSVRecord record) throws ImportException { + loadObservedProperties(); + String pointLocalId = getFromRecord(record, "SAMPLINGPOINT_LOCALID", "samplingpoint_localid", "SamplingPoint"); + if (Utils.isNullOrEmpty(pointLocalId)) { + return null; + } + EeaStationRecord station = SAMPLING_POINTS.get(pointLocalId); + if (station != null) { + return station; + } + String postData = StringUtils.replace(template, "$SAMPLING_POINT_ID", pointLocalId); + + JsonNode tree; + try { + String data = UrlUtils.postToUrl(stationsUrl, postData, null, null).data; + tree = ObjectMapperFactory.get().readTree(data); + JsonNode rows = tree.get("Rows"); + if (rows == null || !rows.isArray() || rows.size() == 0) { + LOGGER.warn("No data received for sampling point {}", pointLocalId); + return null; + } + if (rows.size() > 1) { + LOGGER.warn("Multiple data received for sampling point {}", pointLocalId); + } + JsonNode samplingPoint = rows.get(0); + station = new EeaStationRecord(samplingPoint); + SAMPLING_POINTS.put(pointLocalId, station); + return station; + } catch (IOException | ParseException ex) { + LOGGER.debug("IOException parsing JSON response: {}", ex.getMessage()); + throw new ImportException("Failed to parse station JSON data", ex); + } + } + + private void loadObservedProperties() throws ImportException { + if (!observedPropertyCache.isEmpty()) { + return; + } + try { + observedPropertyCache.load( + frostUtils.getService().observedProperties(), + "", + "id,name,description,definition,properties", + ""); + EeaObservedProperty.importObservedProperties(frostUtils, observedPropertyCache); + } catch (ServiceFailureException ex) { + LOGGER.debug("Exception: {}", ex.getMessage()); + throw new ImportException("Failed to load observed properties", ex); + } + + } + + private static class EeaStationRecord { + + String countrycode; + String country; + String timezone; + String namespace; + String airQualityNetwork; + String airQualityNetworkName; + String airQualityStationEoICode; + String airQualityStationNatCode; + String processId; + String assessmentMethodId; + String sample; + String samplingMethod; + String airPollutant; + String observationDateBegin; + String observationDateEnd; + + BigDecimal longitude; + BigDecimal latitude; + BigDecimal altitude; + + String measurementType; + String airQualityStationType; + String airQualityStationArea; + String airQualityStationName; + String equivalenceDemonstrated; + String measurementEquipment; + BigDecimal inletHeight; + String inletHeightUnit; + BigDecimal buildingDistance; + String buildingDistanceUnit; + BigDecimal kerbDistance; + String kerbDistanceUnit; + + public EeaStationRecord(JsonNode samplingPoint) { + countrycode = deQuote(getString(samplingPoint, "AirQualityStationEoICode")).substring(0, 2); + country = deQuote(getString(samplingPoint, "Country")); + timezone = deQuote(getString(samplingPoint, "Timezone")); + namespace = deQuote(getString(samplingPoint, "B2G_Namespace")); + airQualityNetwork = deQuote(getString(samplingPoint, "AirQualityNetwork")); + airQualityNetworkName = deQuote(getString(samplingPoint, "AirQualityNetworkName")); + airQualityStationEoICode = deQuote(getString(samplingPoint, "AirQualityStationEoICode")); + airQualityStationNatCode = deQuote(getString(samplingPoint, "AirQualityStationNatCode")); + + processId = deQuote(getString(samplingPoint, "ProcessId")); + assessmentMethodId = deQuote(getString(samplingPoint, "AssessmentMethodId")); + airPollutant = deQuote(getString(samplingPoint, "AirPollutant")); + + observationDateBegin = deQuote(getString(samplingPoint, "OperationalActivityBegin")); + observationDateEnd = deQuote(getString(samplingPoint, "OperationalActivityEnd")); + + longitude = getNumber(samplingPoint, "Longitude"); + latitude = getNumber(samplingPoint, "Latitude"); + altitude = getNumber(samplingPoint, "Altitude"); + + measurementType = getString(samplingPoint, "MeasurementType"); + airQualityStationType = getString(samplingPoint, "AirQualityStationType"); + airQualityStationArea = getString(samplingPoint, "AirQualityStationArea"); + airQualityStationName = deQuote(getString(samplingPoint, "AQStationName")); + + equivalenceDemonstrated = getString(samplingPoint, "EquivalenceDemonstrated"); + measurementEquipment = getString(samplingPoint, "MeasurementEquipment"); + + inletHeight = getNumber(samplingPoint, "InletHeight"); + inletHeightUnit = getString(samplingPoint, "InletHeightUnit"); + buildingDistance = getNumber(samplingPoint, "BuildingDistance"); + buildingDistanceUnit = getString(samplingPoint, "BuilldingDistanceUnit"); + kerbDistance = getNumber(samplingPoint, "KerbDistance"); + kerbDistanceUnit = getString(samplingPoint, "KerbDistanceUnit"); + + samplingMethod = getString(samplingPoint, "SamplingMethod"); + + } + + private BigDecimal getNumber(JsonNode node, String property) { + JsonNode value = node.get(property); + return value.decimalValue(); + } + + private String getString(JsonNode node, String property) { + JsonNode value = node.get(property); + if (value == null) { + return null; + } + if (value.isTextual()) { + return value.textValue(); + } + LOGGER.error("No text value for {}: {}", property, value); + return ""; + } + + private String deQuote(String input) { + if (Utils.isNullOrEmpty(input)) { + return input; + } + return StringUtils.remove(input, '"'); + } + } +} diff --git a/src/main/java/de/fraunhofer/iosb/ilt/sensorthingsimporter/importers/eea/EeaConstants.java b/src/main/java/de/fraunhofer/iosb/ilt/sensorthingsimporter/importers/eea/EeaConstants.java index dfddabf..d7cf4d1 100644 --- a/src/main/java/de/fraunhofer/iosb/ilt/sensorthingsimporter/importers/eea/EeaConstants.java +++ b/src/main/java/de/fraunhofer/iosb/ilt/sensorthingsimporter/importers/eea/EeaConstants.java @@ -29,11 +29,14 @@ public class EeaConstants { public static final String TAG_LOCAL_ID = "localId"; public static final String TAG_MEASUREMENT_REGIME = "measurementRegime"; public static final String TAG_MEDIA_MONITORED = "mediaMonitored"; + public static final String TAG_NETWORK = "AirQualityNetwork"; + public static final String TAG_NETWORK_NAME = "AirQualityNetworkName"; public static final String TAG_METADATA = "metadata"; public static final String TAG_MOBILE = "mobile"; public static final String TAG_NAMESPACE = "namespace"; public static final String TAG_OWNER = "owner"; public static final String TAG_RECOMMENDED_UNIT = "recommendedUnit"; + public static final String TAG_SAMPLING_METHOD = "samplingMethod"; public static final String VALUE_OWNER_EEA = "http://dd.eionet.europa.eu"; public static final String VALUE_MEDIUM_AIR = "http://inspire.ec.europa.eu/codelist/MediaValue/air"; diff --git a/src/main/java/de/fraunhofer/iosb/ilt/sensorthingsimporter/utils/UrlUtils.java b/src/main/java/de/fraunhofer/iosb/ilt/sensorthingsimporter/utils/UrlUtils.java index fe8b499..f975b5c 100644 --- a/src/main/java/de/fraunhofer/iosb/ilt/sensorthingsimporter/utils/UrlUtils.java +++ b/src/main/java/de/fraunhofer/iosb/ilt/sensorthingsimporter/utils/UrlUtils.java @@ -176,10 +176,19 @@ public static HttpResponse postToUrl(String targetUrl, List
headers, Str final String authHeader = "Basic " + new String(encodedAuth); post.setHeader(HttpHeaders.AUTHORIZATION, authHeader); } - if (headers.isEmpty()) { + boolean hasAccept = false; + boolean hasContentType = false; + for (var h : headers) { + if ("accept".equalsIgnoreCase(h.getName())) { + hasAccept = true; + } + post.addHeader(h); + } + if (!hasAccept) { + post.addHeader("Accept", "*/*"); + } + if (!hasContentType) { post.addHeader("Content-Type", "application/json"); - } else { - headers.stream().forEach(h -> post.addHeader(h)); } post.setEntity(new StringEntity(queryBody)); LOGGER.debug("Posting to {}", targetUrl);