In [None]:
# Code to upload DMI data to scorpio_broker
# Description: the code takes the required information (API key, station id, features etc.) for downloading data from DMI and saves it in JSON file format
############## generate an access token to access scorpio_broker and update the data

# By Akanksha Upadhyay (akankshaupadhyayiitkgp@gmail.com)
# date: 22/march/2025


In [None]:
import json
import os
import time
from datetime import datetime

import pandas as pd
import requests
from dotenv import load_dotenv

load_dotenv(override=True)

In [None]:
# DMI climate-data endpoint plus the run settings read from the environment.
# Any variable that is not set in the environment comes back as None.

url = "https://dmigw.govcloud.dk/v2/climateData/collections/stationValue/items?"
# param = "min_temp"          ## check the param list from DMI, add more if wanted

# DMI credentials and station selection (check the station list from DMI)
application_key = os.environ.get("application_key")
station_ID = os.environ.get("station_ID")

# Time window requested from DMI
start_time = os.environ.get("start_time")
end_time = os.environ.get("end_time")

# Broker endpoint, token-request body and tenant used for the upload
broker_url = os.environ.get("broker_url")
payload_token = os.environ.get("payload_token")
ngsild_tenant = os.environ.get("ngsild_tenant")


In [None]:
## Request the station values from DMI for the configured station and time window
res = requests.get(url, params={"api-key": application_key, "stationId": station_ID, "datetime": f"{start_time}/{end_time}"})
print(res)
# Fail here on an HTTP error status (bad API key, malformed datetime range, ...)
# rather than later with a less obvious error while parsing the body.
res.raise_for_status()

In [None]:
# Decode the response body and flatten its feature list into a DataFrame;
# nested keys become dotted column names.
response_json = res.json()
features = response_json['features']

df = pd.json_normalize(features)

In [None]:
# Keep only the rows whose parameterId is one of the required features

selected_features = ['bright_sunshine', 'max_temp_w_date', 'min_temp', 'acc_precip']
# .copy() makes the filtered frame independent of the original, so the column
# assignments in later cells do not trigger pandas' SettingWithCopyWarning.
df = df[df['properties.parameterId'].isin(selected_features)].copy()

In [None]:
# Ensure proper GeoJSON format for geometry: wrap each coordinate list in a
# GeoJSON Point; anything that is not a list becomes None.
if 'geometry.coordinates' in df.columns:
    # Map over the single column instead of applying row-wise over the frame:
    # Series.map always yields a Series (also when no rows are left after
    # filtering), whereas DataFrame.apply(axis=1) on an empty frame returns a
    # DataFrame that cannot be assigned to one column.
    df['geometry'] = df['geometry.coordinates'].map(
        lambda coords: {
            "type": "Point",
            "coordinates": coords
        } if isinstance(coords, list) else None
    )

In [None]:
# Fix the observedAt format to ISO 8601 without microseconds
def fix_datetime_format(date_str):
    """Return ``date_str`` as an ISO 8601 timestamp truncated to whole seconds.

    A trailing ``Z`` is accepted on input, and a UTC offset of ``+00:00`` is
    written back as ``Z`` on output. Other offsets are kept as they are.

    Parameters
    ----------
    date_str : str
        ISO 8601 date-time string.

    Returns
    -------
    str or None
        The reformatted timestamp, or None when ``date_str`` is not a string
        (e.g. None or NaN for a missing value) or cannot be parsed.
    """
    # Non-string input has no .replace(); treat it as "no timestamp" instead of
    # raising. This also keeps the function safe to re-apply to its own output,
    # which may contain None.
    if not isinstance(date_str, str):
        return None
    try:
        # fromisoformat does not accept 'Z' on every Python version, so spell out UTC
        dt = datetime.fromisoformat(date_str.replace("Z", "+00:00"))
    except ValueError:
        return None  # Return None if parsing fails
    # Drop sub-second precision and convert the UTC offset back to 'Z'
    return dt.isoformat(timespec='seconds').replace("+00:00", "Z")

# Reformat every calculatedAt value in place using fix_datetime_format
calculated_at = df['properties.calculatedAt']
df['properties.calculatedAt'] = calculated_at.apply(fix_datetime_format)


In [None]:
# Assemble one entity dict per DataFrame row, in the format sent to the broker.
# NOTE(review): observedAt is written as a top-level member of the entity here;
# NGSI-LD usually carries observedAt inside the Property it describes - confirm
# that the broker accepts this shape.
payload = []
for _, obs in df.iterrows():
    entity = {}
    entity["id"] = f"urn:ngsi-ld:WeatherStation:{obs['properties.stationId']}"
    entity["type"] = "WeatherObservation"
    # The attribute name is the DMI parameterId of this row
    entity[obs['properties.parameterId']] = {
        "type": "Property",
        "value": obs["properties.value"],
    }
    entity["geometry"] = {
        "type": "GeoProperty",
        "value": obs["geometry"],
    }
    # Timestamp already reformatted by fix_datetime_format
    entity["observedAt"] = obs["properties.calculatedAt"]
    entity["@context"] = ["https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context.jsonld"]
    payload.append(entity)

In [None]:
# Write the payload to disk as indented JSON so it can be reviewed before upload
output_file_path = r"output_new.json"
with open(output_file_path, "w") as out_handle:
    json.dump(payload, fp=out_handle, indent=4)
print(f"Filtered JSON file saved at {output_file_path}")


In [None]:
# Obtain the access token from Keycloak; payload_token is sent as the
# form-encoded request body.
token_url = "https://keycloak.prod.os2iot.kmd.dk/realms/master/protocol/openid-connect/token"

headers = {
    'Content-Type': 'application/x-www-form-urlencoded'
}
token_response = requests.post(token_url, headers=headers, data=payload_token)
if token_response.status_code != 200:
    # Raise instead of calling exit(): exit() is an interactive-shell helper and
    # is not a reliable way to stop a notebook cell, while an exception stops
    # the run here and keeps the kernel state available for inspection.
    raise RuntimeError(f"Error fetching token: {token_response.status_code}, {token_response.text}")

access_token = token_response.json()["access_token"]
# The token is a credential - avoid printing it into the notebook output.


In [None]:
# Prepare the update request: read the saved JSON back from disk as the payload
# and build the request headers.

with open(output_file_path) as saved_payload:
    payload = json.load(saved_payload)

# JSON-LD content type, target tenant and bearer token for the broker
headers = {
    "Content-Type": "application/ld+json",
    "ngsild-tenant": ngsild_tenant,
    "Authorization": f"Bearer {access_token}",
}



In [None]:
# Split the payload into smaller batches
# Run in batches because the server shuts down the connection after some time

batch_size = 10  # Number of entities per batch
retries = 5  # Number of attempts for each batch

for i in range(0, len(payload), batch_size):
    payload_batch = payload[i:i + batch_size]  # Create a batch
    batch_number = i // batch_size + 1  # 1-based, used only in the log messages
    for attempt in range(retries):  # Retry logic
        response = requests.request("POST", broker_url, headers=headers, json=payload_batch)
        if response.status_code == 204:  # Success with no content
            print(f"Batch {batch_number} processed successfully (204 No Content).")
            break
        elif response.status_code == 504:  # Gateway timeout
            if attempt + 1 < retries:
                print(f"504 error for batch {batch_number} on attempt {attempt + 1}. Retrying...")
                time.sleep(2 ** attempt)  # Exponential backoff: 1, 2, 4, ... seconds
            else:
                # Last attempt: do not sleep for nothing, and report that the
                # batch was not uploaded instead of silently moving on.
                print(f"504 error for batch {batch_number} on attempt {attempt + 1}. Giving up after {retries} attempts.")
        else:  # Any other status is not retried
            print(f"Error for batch {batch_number}: {response.status_code}, {response.text}")
            break