In [1]:
"""
Description: This module contains the configuration for the data points of the example service
Author: Martin Altenburger
"""
import os
from loguru import logger
from filip.models.base import FiwareHeader
from filip.clients.ngsi_v2 import IoTAClient, ContextBrokerClient
from filip.models.ngsi_v2.iot import ServiceGroup
from filip.models.ngsi_v2.subscriptions import Subscription
from filip.models.ngsi_v2.context import ContextEntity, NamedContextAttribute, NamedCommand
from filip.models.ngsi_v2.base import NamedMetadata
from filip.models.base import DataType
from dotenv import load_dotenv

The following environment variables are required. Please provide them in the [.env](.env) file:
```
FIWARE_IOTA= ["http://localhost:4041"]      # URL of the IoT Agent
FIWARE_CB= ["http://localhost:1026"]        # URL of the Context Broker
FIWARE_SERVICE= ["example_service"]         # Name of the FIWARE Service
FIWARE_SERVICE_PATH= ["/"]                  # FIWARE Service Path, usually "/"
```

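You must have access to the required FIWARE platform. If you also want to create the subscription with `create_subscription()`, additionally set `FIWARE_QL` to the base URL of the service that receives the notifications (the function appends `/v2/notify` to it).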

In [2]:
# Load the variables from the .env file into the process environment so that
# the os.getenv() calls in the following cells can read them.
load_dotenv()

True

In [3]:



# Header built from the configured FIWARE service and service path;
# the same header object is handed to both clients below.
fiware_header = FiwareHeader(
    service=os.getenv("FIWARE_SERVICE"),
    service_path=os.getenv("FIWARE_SERVICE_PATH"),
)

# Client for the IoT Agent (used for the service group).
iota_client = IoTAClient(
    url=os.getenv("FIWARE_IOTA"),
    fiware_header=fiware_header,
)

# Client for the Context Broker (used for entities and subscriptions).
cb_client = ContextBrokerClient(
    url=os.getenv("FIWARE_CB"),
    fiware_header=fiware_header,
)


def create_service():
    """
    Make sure a service group for the example service is registered.

    The service and the service path are read from the environment variables
    ``FIWARE_SERVICE`` and ``FIWARE_SERVICE_PATH``. The groups known to the
    IoT Agent client are searched for a group with the same service and
    subservice before anything is posted.

    Returns:
        bool: ``False`` if a matching group was found (nothing is posted),
        ``True`` after the service group has been posted.
    """
    service = os.getenv("FIWARE_SERVICE")
    service_path = os.getenv("FIWARE_SERVICE_PATH")

    already_registered = any(
        group.service == service and group.subservice == service_path
        for group in iota_client.get_group_list()
    )
    if already_registered:
        logger.info("Service group already exists")
        return False

    iota_client.post_group(
        service_group=ServiceGroup(
            service=service,
            subservice=service_path,
            apikey=service,  # the service name is reused as API key
            entity_type="example_service",
            resource="/iot/json",
            explicitAttrs=True,
            autoprovision=False,
        ),
        update=True,
    )

    return True

def create_subscription():
    """
    Function to create a subscription for the example service -- if needed

    The subscription is used to notify at all changes of all components.
    Notifications are sent to ``<FIWARE_QL>/v2/notify``, where ``FIWARE_QL``
    is read from the environment.

    Raises:
        ValueError: If the environment variable ``FIWARE_QL`` is not set or
            empty.
    """
    notify_base_url = os.getenv("FIWARE_QL")
    if not notify_base_url:
        # Without this check the string concatenation below fails with an
        # opaque "NoneType + str" TypeError when the variable is missing.
        raise ValueError(
            "Environment variable FIWARE_QL is not set; it must contain the "
            "base URL of the service that receives the notifications"
        )

    # Avoid a double slash if the configured URL already ends with "/".
    notify_url = notify_base_url.rstrip("/") + "/v2/notify"

    # NOTE(review): create_entities() in this file creates entities of type
    # "thermal_storage" and "heater"; confirm that "example_service" is the
    # entity type this subscription is supposed to match.
    subscription = Subscription(
        description="Notification of all changes",
        subject={
            "entities": [
                    {
                        "idPattern": ".*",
                        "type": "example_service"
                    }
                ],
                # Empty attribute list: no restriction to specific attributes.
                "condition": {
                    "attrs": []
                }
        },
        notification={
            "http": {
                "url": notify_url
            },
            "attrs": [],
            "metadata": ["TimeInstant"],
            "onlyChangedAttrs": False
        }
    )

    cb_client.post_subscription(subscription=subscription)
    
def create_entities():
    """
    Post the entities of the example service to the Context Broker.

    Two entities are described inline: a thermal storage with a measured and
    a setpoint temperature, and a heater with a rated power and one command.
    For each description a ``ContextEntity`` is built, its attributes (with
    their metadata) and commands are attached, and the entity is posted with
    ``update=True``.
    """
    entity_specs = [
        {
            "id": "urn:thermal_storage:01",
            "type": "thermal_storage",
            "attributes": [
                {
                    "id": "temperature__measured",
                    "type": "Number",
                    "value": 49,
                    "metadata": {"unitCode": "CEL"},
                },
                {
                    "id": "temperature__setpoint",
                    "type": "Number",
                    "value": 60,
                    "metadata": {"unitCode": "CEL"},
                },
            ],
            "cmds": [],
        },
        {
            "id": "urn:heater:01",
            "type": "heater",
            "attributes": [
                {
                    "id": "power__rated",
                    "type": "Number",
                    "value": 15,
                    "metadata": {"unitCode": "KWT"},
                },
            ],
            "cmds": [
                {"id": "operation__setpoint", "value": 0},
            ],
        },
    ]

    for spec in entity_specs:
        entity = ContextEntity(id=spec["id"], type=spec["type"])

        # Regular attributes; every metadata entry is stored with type "Text".
        attributes = [
            NamedContextAttribute(
                name=attribute["id"],
                type=attribute["type"],
                value=attribute["value"],
                metadata=[
                    NamedMetadata(name=key, value=value, type="Text")
                    for key, value in attribute["metadata"].items()
                ],
            )
            for attribute in spec.get("attributes", [])
        ]

        # Commands are appended after the regular attributes.
        commands = [
            NamedCommand(
                name=cmd["id"],
                type=DataType.COMMAND,
                value=cmd["value"],
            )
            for cmd in spec.get("cmds", [])
        ]

        entity.add_attributes(attributes + commands)

        cb_client.post_entity(entity=entity, update=True)


def prepare_service():
    """
    Prepare the example service: service group first, then the entities.

    No subscription is created here; call ``create_subscription`` separately
    if one is needed.
    """
    logger.info("Start creating service and subscription")

    # create_service() returns False when a matching group was already found.
    if create_service() is False:
        logger.info("Service is already available")

    create_entities()


# Run the preparation: create the service group (if missing) and the entities.
prepare_service()

2025-03-26 12:33:29.141 | INFO     | __main__:prepare_service:145 - Start creating service and subscription


In [4]:
# Read the heater entity back via the Context Broker client to inspect what was stored.
cb_client.get_entity(entity_id="urn:heater:01")

ContextEntity(id='urn:heater:01', type='heater', operation__setpoint=ContextAttribute(type='command', value=0, metadata={}), power__rated=ContextAttribute(type='Number', value=15.0, metadata={'unitCode': Metadata(type='Text', value='KWT')}))