In [15]:
import requests, json
from datetime import datetime
import re
import pandas as pd
import hashlib
import sys

In [16]:
# function that takes two lists of same length and makes a dict of them, identifying element i with element i
def stitch_to_dict(a,b):
    r={}
    for i in range(len(a)):
        r[a[i]]=b[i]
    return r

# everything lowercase, replaces slashes and spaces with underscores, ...
def idstr(idinput):
    """returns strings in the conventional format for iot.ids"""
    return (str(idinput).lower().replace(" ", "_").replace("/", "_").replace(
        "ä", "ae").replace("ö", "oe").replace("ü", "ue").replace("ß", "ss"))

# returns the first 7 digits of the sha1 hash of the input string
def hashfunc(inputstring):
    return hashlib.sha1(bytes(str(inputstring), 'utf-8')).hexdigest()[0:7]

# returns the full 40 digits of the sha1 hash of the input string
def hashfuncfull(inputstring):
    return hashlib.sha1(bytes(str(inputstring), 'utf-8')).hexdigest()

# UBA has phenomenonTimes end with hour 24 instead of 0 the next day. datetime cant deal with that, have to replace
def todatetimeUTCstring(string):
    if(string[-8:-6] == '24'):
        res=pd.to_datetime(string.replace(" 24:"," 23:"))+pd.Timedelta('1 hour')
    else: 
        res=pd.to_datetime(string)
    return (res - pd.Timedelta('1 hour')).strftime("%Y-%m-%d" + "T" + "%H" + ":00:00.000Z")



url = "http://193.196.38.108:8080/FROST-Server/v1.0"
#url = "http://api.smartaq.net/v1.0"

In [17]:
file = pd.read_excel('metadata/Bericht_EU_Meta_Stationen.xlsx')
filemeta=pd.read_excel('metadata/Bericht_EU_Meta_Stationsparameter.xlsx')

# Metadata Info

In [18]:
# https://www.umweltbundesamt.de/daten/luft/luftdaten/doc

component_data=json.loads(requests.get("https://www.umweltbundesamt.de/api/air_data/v2/components/json").text)
component_data

{'count': 5,
 'indices': ['component id',
  'component code',
  'component symbol',
  'component unit',
  'component name'],
 '1': ['1', 'PM10', 'PM₁₀', 'µg/m³', 'Particulate matter'],
 '2': ['2', 'CO', 'CO', 'mg/m³', 'Carbon monoxide'],
 '3': ['3', 'O3', 'O₃', 'µg/m³', 'Ozone'],
 '4': ['4', 'SO2', 'SO₂', 'µg/m³', 'Sulphur dioxide'],
 '5': ['5', 'NO2', 'NO₂', 'µg/m³', 'Nitrogen dioxide']}

In [19]:
scope_data=json.loads(requests.get("https://www.umweltbundesamt.de/api/air_data/v2/scopes/json").text)
scope_data

{'count': 6,
 'indices': ['scope id',
  'scope code',
  'scope time base',
  'scope time scope',
  'scope time is max',
  'scope name'],
 '1': ['1', '1TMW', 'day', '86400', '0', 'Daily average'],
 '2': ['2', '1SMW', 'hour', '3600', '0', 'One hour average'],
 '3': ['3', '1SMW_MAX', 'hour', '3600', '1', 'Maximum one hour average'],
 '4': ['4', '8SMW', 'hour', '28800', '0', 'Eight hour average'],
 '5': ['5', '8SMW_MAX', 'hour', '28800', '1', 'Maximum eight hour average'],
 '6': ['6', '1TMWGL', 'hour', '3600', '0', 'floating hourly daily average']}

In [20]:
station_data=json.loads(requests.get("https://www.umweltbundesamt.de/api/air_data/v2/stations/json").text)

# Data

In [21]:
stations_to_parse = []
for thisstatnr in station_data["data"].keys():
    thisstatdict = stitch_to_dict(station_data["indices"],station_data["data"][thisstatnr])
    if(thisstatdict["station city"] == "Augsburg"):
        stations_to_parse.append(thisstatdict)

print(str(len(stations_to_parse)) + " Stations to parse.")
current_station=stations_to_parse[0] #nr 2 dbs augsburg hat keine informationen zum sensor im metadatensheet. bisher nicht geparsed. 
current_station

5 Stations to parse.


{'station id': '441',
 'station code': 'DEBY007',
 'station name': 'Augsburg/Bourges-Platz',
 'station city': 'Augsburg',
 'station synonym': '',
 'station active from': '1986-08-01',
 'station active to': None,
 'station longitude': '10.8884',
 'station latitude': '48.3766',
 'network id': '2',
 'station setting id': '1',
 'station type id': '1',
 'network code': 'BY',
 'network name': 'Bavaria',
 'station setting name': 'urban area',
 'station setting short name': 'urban',
 'station type name': 'background'}

## Observed Property

In [22]:
current_component = "1"
current_obsprop_meta_dict = stitch_to_dict(component_data["indices"],component_data[current_component])
current_obsprop_meta_dict

{'component id': '1',
 'component code': 'PM10',
 'component symbol': 'PM₁₀',
 'component unit': 'µg/m³',
 'component name': 'Particulate matter'}

In [23]:
current_obsprop = json.loads(requests.get(url + "/ObservedProperties('saqn:op:mcpm10')").text)

current_obsprop

{'name': 'PM10 Mass Concetration',
 'description': 'Mass concentration of Particulate Matter with a diameter of equal or less than 10 micrometers in air.',
 'properties': {'shortname': 'mcpm10'},
 'Datastreams@iot.navigationLink': "http://193.196.38.108:8080/FROST-Server/v1.0/ObservedProperties('saqn%3Aop%3Amcpm10')/Datastreams",
 'MultiDatastreams@iot.navigationLink': "http://193.196.38.108:8080/FROST-Server/v1.0/ObservedProperties('saqn%3Aop%3Amcpm10')/MultiDatastreams",
 'definition': 'http://cfconventions.org/Data/cf-standard-names/63/build/cf-standard-name-table.html#mass_concentration_of_pm10_ambient_aerosol_particles_in_air',
 '@iot.id': 'saqn:op:mcpm10',
 '@iot.selfLink': "http://193.196.38.108:8080/FROST-Server/v1.0/ObservedProperties('saqn%3Aop%3Amcpm10')"}

# Parser start

## Thing

In [24]:
current_station_thing = {
    "name": "Station " + current_station["station name"], 
    "description": "",
    "properties": {
        "operator.domain": idstr("umweltbundesamt.de"),
        "hardware.id": idstr(current_station["station code"]),
        "station_active_from": current_station["station active from"],
        "station_active_to": current_station["station active to"],
        'station_setting_name': current_station['station setting name'],
        'station type name': current_station['station type name'],
        'station_no': current_station['station id']
    }
}
current_station_thing["properties"]["shortname"] = idstr(current_station_thing["name"])

current_thing_idparts = current_station_thing["properties"]["operator.domain"] + ":" + current_station_thing["properties"]["shortname"] + ":" + current_station_thing["properties"]["hardware.id"]
current_station_thing ["@iot.id"] = "saqn:t:" + current_thing_idparts

print(current_station_thing)

requests.post(url + "/Things",json=current_station_thing)

{'name': 'Station Augsburg/Bourges-Platz', 'description': '', 'properties': {'operator.domain': 'umweltbundesamt.de', 'hardware.id': 'deby007', 'station_active_from': '1986-08-01', 'station_active_to': None, 'station_setting_name': 'urban area', 'station type name': 'background', 'station_no': '441', 'shortname': 'station_augsburg_bourges-platz'}, '@iot.id': 'saqn:t:umweltbundesamt.de:station_augsburg_bourges-platz:deby007'}


<Response [500]>

## (Historical) Location

In [25]:
current_station_historical_location = {
    "time": current_station["station active from"] + "T00:00:00.000Z",
    "Locations": [{
        "name": current_station["station name"],
        "description": current_station["network name"] + ", " + current_station["station setting name"] + ", " + current_station["station type name"],
        "encodingType": "application/vnd.geo+json",
        "location": {
            "type": "Point",
            "coordinates": [float(current_station["station longitude"]),float(current_station["station latitude"])]
        },
        "@iot.id": "geo:" + str(current_station["station latitude"]) + ":" + str(current_station["station longitude"])
    }]
}

current_station_historical_location["@iot.id"] = "saqn:hl:" + current_thing_idparts + ":" + current_station_historical_location["time"]

print(current_station_historical_location)

p = requests.post(url + "/Things('" + current_station_thing["@iot.id"] + "')/HistoricalLocations",json=current_station_historical_location)

if (p.status_code  == 201):
    print("Creation successful")
else:
    print("Error:", p.status_code)
    for chunk in p.iter_content(chunk_size=128):
        print(chunk)

{'time': '1986-08-01T00:00:00.000Z', 'Locations': [{'name': 'Augsburg/Bourges-Platz', 'description': 'Bavaria, urban area, background', 'encodingType': 'application/vnd.geo+json', 'location': {'type': 'Point', 'coordinates': [10.8884, 48.3766]}, '@iot.id': 'geo:48.3766:10.8884'}], '@iot.id': 'saqn:hl:umweltbundesamt.de:station_augsburg_bourges-platz:deby007:1986-08-01T00:00:00.000Z'}
Error: 500
b'{\n  "code" : 500,\n  "type" : "error",\n  "message" : "Failed to store data."\n}'


## Sensor

### Sensor not in API, need to parse file

In [26]:
# in file, component code 5.0 is PM10

mtp = filemeta[filemeta["station_code"]==current_station["station code"]][filemeta["component_code"]==5.0]["measurement_technique_principle"].values[0]

current_sensor = {
    "name": "Generic " + mtp + " Sensor",
    "description": "Sensor Instance for unknown Sensors that use " + mtp + " as measurement technique principle",
    "encodingType": "application/json",
    "properties": {
        "datasheet.url": "",
        "manufacturer.domain": ""
    }

}
current_sensor["properties"]["shortname"] = idstr(current_sensor["name"])

current_sensor_idparts = current_sensor["properties"]["manufacturer.domain"] + ":" + current_sensor["properties"]["shortname"]
current_sensor["@iot.id"] = "saqn:s:" + current_sensor_idparts
current_sensor["metadata"] = url + "/Sensors('" + current_sensor["@iot.id"] + "')/properties"

print(current_sensor)

p = requests.post(url + "/Sensors",json=current_sensor)

if (p.status_code  == 201):
    print("Creation successful")
else:
    print("Error:", p.status_code)
    for chunk in p.iter_content(chunk_size=128):
        print(chunk)

{'name': 'Generic nephelometry and beta attenuation Sensor', 'description': 'Sensor Instance for unknown Sensors that use nephelometry and beta attenuation as measurement technique principle', 'encodingType': 'application/json', 'properties': {'datasheet.url': '', 'manufacturer.domain': '', 'shortname': 'generic_nephelometry_and_beta_attenuation_sensor'}, '@iot.id': 'saqn:s::generic_nephelometry_and_beta_attenuation_sensor', 'metadata': "http://193.196.38.108:8080/FROST-Server/v1.0/Sensors('saqn:s::generic_nephelometry_and_beta_attenuation_sensor')/properties"}
Error: 500
b'{\n  "code" : 500,\n  "type" : "error",\n  "message" : "Failed to store data."\n}'


  This is separate from the ipykernel package so we can avoid doing imports until


## Datastream

In [27]:
current_datastream = {
    "name": current_obsprop["name"] + " Measurements of Umweltbundesamt's " + current_station_thing["name"],
    "description": "Datastream for " + current_obsprop_meta_dict["component name"],
    "unitOfMeasurement": {
        "name": "Microgram per Cubic Meter", 
        "symbol": "ug/m3",
        "definition": "http://unitsofmeasure.org/ucum.html"
    },
    "observationType": "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement",
    "properties": {
        "operator.domain": "umweltbundesamt.de",
        "hardware.serial_number": "",
        "license": {
            "name": "CC BY-NC-ND 4.0",
            "legal_notice": "https://www.umweltbundesamt.de/datenschutz-haftung",
            "url": "https://creativecommons.org/licenses/by-nc-nd/4.0/"
        }
    },
    "ObservedProperty": {"@iot.id": current_obsprop["@iot.id"]},
    "Sensor": {"@iot.id": current_sensor["@iot.id"]},
    "Thing": {"@iot.id": current_station_thing["@iot.id"]},
}

current_datastream_idparts = current_thing_idparts + ":" + current_sensor_idparts + ":" + current_datastream["properties"]["hardware.serial_number"] + ":" + current_obsprop["properties"]["shortname"]

current_datastream["@iot.id"] = "saqn:ds:" + hashfunc(current_datastream_idparts)

print(current_datastream)

p = requests.post(url + "/Things('" + current_station_thing["@iot.id"] + "')/Datastreams",json=current_datastream)

if (p.status_code  == 201):
    print("Creation successful")
else:
    print("Error:", p.status_code)
    for chunk in p.iter_content(chunk_size=128):
        print(chunk)

{'name': "PM10 Mass Concetration Measurements of Umweltbundesamt's Station Augsburg/Bourges-Platz", 'description': 'Datastream for Particulate matter', 'unitOfMeasurement': {'name': 'Microgram per Cubic Meter', 'symbol': 'ug/m3', 'definition': 'http://unitsofmeasure.org/ucum.html'}, 'observationType': 'http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement', 'properties': {'operator.domain': 'umweltbundesamt.de', 'hardware.serial_number': '', 'license': {'name': 'CC BY-NC-ND 4.0', 'legal_notice': 'https://www.umweltbundesamt.de/datenschutz-haftung', 'url': 'https://creativecommons.org/licenses/by-nc-nd/4.0/'}}, 'ObservedProperty': {'@iot.id': 'saqn:op:mcpm10'}, 'Sensor': {'@iot.id': 'saqn:s::generic_nephelometry_and_beta_attenuation_sensor'}, 'Thing': {'@iot.id': 'saqn:t:umweltbundesamt.de:station_augsburg_bourges-platz:deby007'}, '@iot.id': 'saqn:ds:d7fbdb6'}
Error: 500
b'{\n  "code" : 500,\n  "type" : "error",\n  "message" : "Failed to store data."\n}'


## Observations

In [28]:
current_scope = "2" #hourly averages

start_date=current_station["station active from"]
start_result_time="1"
end_date=datetime.strftime(datetime.now(),"%Y-%m-%d")
end_result_time=datetime.strftime(datetime.now(),"%H")

data=json.loads(requests.get("https://www.umweltbundesamt.de/api/air_data/v2/measures/json?date_from=" + start_date + "&time_from=" + start_result_time + "&date_to=" + end_date + "&time_to=" + end_result_time + "&station=" + current_station["station id"] + "&component=" + current_component + "&scope=" + current_scope).text)

val_index=data["indices"]["data"]["station id"]["date start"].index("value")
end_index=data["indices"]["data"]["station id"]["date start"].index("date end")

#function todatetimeUTCstring converts to pandas datetime, converts CET to UTC, then outputs ISO string
list_of_results=list(map(lambda x: {"result":data['data'][current_station["station id"]][x][val_index],"resultTime":todatetimeUTCstring(data['data'][current_station["station id"]][x][end_index]) ,"phenomenonTime": todatetimeUTCstring(x) + "/" + todatetimeUTCstring(data['data'][current_station["station id"]][x][end_index])},data['data'][current_station["station id"]].keys()))

In [29]:
fails = 0
totalobs = len(list_of_results)

for thisresult in list_of_results:
    thisobs = thisresult
    thisobs["@iot.id"] = "saqn:o:" + hashfuncfull(current_datastream_idparts + ":" + idstr(thisresult["phenomenonTime"]))
    p = requests.post(url + "/Datastreams('" + current_datastream["@iot.id"]  + "')/Observations",json.dumps(thisobs))
    if (p.status_code  != 201):
        fails += 1
    sys.stdout.write("Writing " + str(list_of_results.index(thisresult)) + " out of " + str(totalobs) + "\r")
print(str(fails) + " out of " + str(totalobs) + " could not be created.")
print("Datastream has now " + str(json.loads(requests.get(url + "/Datastreams('" + current_datastream["@iot.id"]  + "')/Observations?$count=true").text)["@iot.count"]) + " Observations.")

6278 out of 6710 could not be created.
Datastream has now 6710 Observations.
