# Experiment with Azure Digital Twins Service
[Location](https://explorer.digitaltwins.azure.net/?tid=76a2ae5a-9f00-4f6b-95ed-5d33d77c4d61&eid=digitaltwins.api.weu.digitaltwins.azure.net)

## Set up

### Create Service Instance
Create an instance as described [here](https://learn.microsoft.com/en-gb/azure/digital-twins/quickstart-azure-digital-twins-explorer).

### Define models
The initial set of models (the ontology) was drafted with Office 365 Copilot using this [prompt](prompts/Define%20models.txt). Some extra tuning was required.
The models are stored in the `ontology` folder.
The models were then uploaded to the service.

## Gen AI
Utilise GenAI to work with the topology:
add, update, and remove digital twins in the graph.

### Init

In [1]:
import os
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# Endpoint of the Azure Digital Twins instance; it is passed to every
# DigitalTwinsClient constructed in this notebook.
# os.getenv returns None when AZURE_DIGITAL_TWINS_ENDPOINT is not set, and
# nothing here validates the value.
digital_twin_endpoint = os.getenv("AZURE_DIGITAL_TWINS_ENDPOINT")

### Define Tools

In [2]:
from azure.digitaltwins.core import DigitalTwinsClient
from azure.identity import DefaultAzureCredential
from langchain.pydantic_v1 import BaseModel, Field
from langchain_core.tools import tool
import json

# Credential object shared by every DigitalTwinsClient constructed in this notebook.
credential = DefaultAzureCredential()

# Argument schema passed as args_schema to the "get-module-definition" tool.
# The Field description is runtime text and is kept verbatim.
class ModelDefinitionInput(BaseModel):
    # Name of the model whose definition is requested; the tool in this cell
    # only recognises "ControlHub" and answers "NA" for anything else.
    model_type: str = Field(description="Model type that definition is required. For example: ControlHub or TemperatureSensor")

@tool("get-module-definition", args_schema=ModelDefinitionInput)
def get_module_definition(model_type:str) -> str:
    """
        Get the model definition in OTDL format and some usefull instructuion after.
        If model name is not valid, the tool returns NA it means that execution is failed.
    """
    # NOTE: the docstring above is left verbatim on purpose. The @tool decorator
    # is expected to expose it as the tool description, so it is runtime prompt
    # text rather than ordinary documentation.
    #
    # Parameters:
    #   model_type - name of the model to describe.
    # Returns:
    #   The raw contents of ontology/ControlHub.json followed by usage notes,
    #   or the sentinel string "NA" for any model other than "ControlHub".
    # Raises:
    #   OSError (e.g. FileNotFoundError) if the ontology file cannot be read.

    # Guard clause: ControlHub is the only model with a definition wired in.
    if model_type != "ControlHub":
        return "NA"

    # JSON files are UTF-8; be explicit instead of relying on the platform's
    # default encoding.
    with open('ontology/ControlHub.json', 'r', encoding='utf-8') as file:
        definition = file.read()

    # Extra guidance appended after the definition for the model to follow.
    instructions = """
        IMPORTANT NOTES:
        1. The model id in $metadata section is dtmi:com:example:ControlHub;1
        2. For creation new digital twin it must be connected to CentralUnit twin and this information must be reflected into request to update topology.
            For this operation ID of CentalUnit twin must be queried through Azure Digital Twins Query API. 
            $relationshipName should be "connects"
        """
    return definition + instructions


# Argument schema passed as args_schema to the "update-topology-tool" tool.
# Both Field descriptions are runtime text and are kept verbatim.
class UpdateTopologyInput(BaseModel):
    # Identifier of the twin to create or change.
    twin_id: str = Field(description=
    """ This is mandatoty parameter!
    The unique identifier of the digital twin that is going to be changed or created.
    The identifier passed here must respect name convention for Azure Digital Twins Id
    (alphabetical, numbers, underscore, up to 64 symbols)
    """)
    # Twin body as JSON text; update_topology parses it with json.loads
    # before handing the resulting object to upsert_digital_twin.
    twin_data: str = Field(description=
    """ This is mandatoty parameter!
    The format has to be Dictionary[string, object] represented as type string
    The dictionary represent digital tween to be sent to the topology.
    The format should be in Azure Digital Twins Definition Language (DTDL).
    
    IMPORTANT! The metadata section should contain id of the model, for example if
    model id is dtmi:com:example:ControlHub;1 metadata section should looks like:
    "$metadata": {
        "$model": "dtmi:com:example:ControlHub;1"
    },

    The format must be valid to be sent as twin_data in following below example.
        
    from azure.digitaltwins.core import DigitalTwinsClient
    
    client = DigitalTwinsClient(digital_twin_endpoint, credential)
    client.upsert_digital_twin(twin_id, twin_data)
    """)

@tool("update-topology-tool", args_schema=UpdateTopologyInput)
def update_topology(twin_id: str, twin_data: str) -> str:
    """
        Update the topology of Azure Digital Twins Service.
        This function should be used when you want to:
            - create new digital twin
            - remove a digital twin
            - update parameters for existed digital twin.
    """
    # NOTE: the docstring above is left verbatim because the @tool decorator is
    # expected to expose it as the tool description (runtime prompt text).
    # NOTE(review): that description advertises removal, but this body only
    # calls upsert_digital_twin; there is no delete path here.
    #
    # Parameters:
    #   twin_id   - identifier of the twin to create or update.
    #   twin_data - twin body as JSON text.
    # Returns:
    #   "SUCCESS" when the upsert call completes, otherwise a string starting
    #   with "FAIL" that carries the reason, so the calling agent can see what
    #   went wrong instead of a bare failure marker.

    # Parse first so that malformed input is reported as such and never
    # reaches the service.
    try:
        twin_data_dict = json.loads(twin_data)
    except (TypeError, ValueError) as e:
        print("Error sending data to Azure Digital Twins service topology:", str(e))
        return f"FAIL: twin_data is not valid JSON: {e}"

    # Client construction sits inside the try so that configuration problems
    # (for example a missing endpoint) are reported the same way as service errors.
    try:
        client = DigitalTwinsClient(digital_twin_endpoint, credential)
        client.upsert_digital_twin(twin_id, twin_data_dict)
    except Exception as e:
        print("Error sending data to Azure Digital Twins service topology:", str(e))
        return f"FAIL: {e}"

    return "SUCCESS"

# Argument schema passed as args_schema to the "query-twin" tool.
# The Field description is runtime text and is kept verbatim.
class QueryTwinInput(BaseModel):
    # Query text that the tool forwards unchanged to client.query_twins.
    query: str = Field(description=
    """ This is required parameter!
        Query in Azure Digital Twins Query API format that is going to be executed on Azure Digital Twin Service.
         
        For example: You need to know the details of ControlHub twin.
        From topology you may get model definition for ControlHub iand from definition understand that id is dtmi:com:example:ControlHub;1
        Thaen you can format request as:
        SELECT * FROM digitaltwins T WHERE IS_OF_MODEL(T, 'dtmi:com:example:ControlHub;1')

        The request returns all description of all twins with type ControlHub.
    """)

@tool("query-twin", args_schema=QueryTwinInput)
def query_twin(query:str) -> str:
    """
        Request information about twins in Azure Digital Twins Service.
        This function should be used whan you need to get information about particular twin or several items.

        return: response from Azure Digital Twins Query API in JSON format.
    """
    # NOTE: the docstring above is left verbatim because the @tool decorator is
    # expected to expose it as the tool description (runtime prompt text).
    #
    # Parameters:
    #   query - query text forwarded unchanged to the service.
    # Returns:
    #   A JSON array (as text) with the query results, or a string starting
    #   with "FAIL" that describes why no result could be produced.

    # Reject a missing or blank query before contacting the service.
    if query is None or not query.strip():
        return "FAIL: Query is not defined!"

    # Mirror update_topology: report failures as a "FAIL" string instead of
    # letting the exception escape from the tool.
    try:
        client = DigitalTwinsClient(digital_twin_endpoint, credential)

        # list() consumes the result iterable inside the try, so errors raised
        # while fetching results are handled here as well.
        twins = list(client.query_twins(query))

        return json.dumps(twins)
    except Exception as e:
        print("Error querying Azure Digital Twins service:", str(e))
        return f"FAIL: {e}"


# The three tool objects later passed to the agent and to the AgentExecutor.
tools = [get_module_definition, update_topology, query_twin]

In [None]:
import json
def query_twin(
    query: str = "SELECT * FROM digitaltwins T WHERE IS_OF_MODEL(T, 'dtmi:com:example:CentralUnit;1')",
) -> str:
    """Run *query* against the Digital Twins instance and return pretty-printed JSON.

    Ad-hoc helper for checking the query API by hand. With no argument it
    selects every twin of model dtmi:com:example:CentralUnit;1.

    NOTE(review): this definition reuses the name of the "query-twin" tool
    function from the tools cell. The `tools` list was built before this cell,
    so it still holds the earlier object, but the module-level name now points
    here.

    Args:
        query: Query text passed unchanged to ``client.query_twins``.

    Returns:
        The query results as a JSON array, indented by 4 spaces.
    """
    client = DigitalTwinsClient(digital_twin_endpoint, credential)

    # Execute the query
    query_result = client.query_twins(query)

    # list() consumes the result iterable so that it can be serialised
    return json.dumps(list(query_result), indent=4)

# Run the ad-hoc CentralUnit query once and show the pretty-printed JSON.
r = query_twin()

print(r)

### Configure LLM
Configure Azure OpenAI.

In [28]:
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import AzureChatOpenAI
from langchain_core.messages import AIMessage
from langchain_core.runnables import Runnable

# Chat model handed to the agent. The API version and the deployment name are
# read from the environment; os.environ[...] raises KeyError when either
# variable is missing.
model = AzureChatOpenAI(
    openai_api_version=os.environ["AZURE_OPENAI_API_VERSION"], # e.g. 2024-02-15-preview
    azure_deployment=os.environ["AZURE_OPENAI_DEPLOYED_NAME"], # e.g. gpt-4o
    temperature=0.1
)

### Prompt engineering

In [29]:
from langchain_core.prompts import ChatPromptTemplate

# System prompt: restricts the assistant to topology work, describes the
# CentralUnit and ControlHub models, and fixes the exact refusal / completion
# messages the assistant must use.
# NOTE(review): the prompt text contains typos ("helpfull", "Modle", "OTDL",
# "exsted"). They are left untouched here because the string is sent to the
# model verbatim and changing it changes runtime behaviour.
system_message = SystemMessage(
    content="""
    You are helpfull bot that is going to help with creation topology in Azure Digital Twins Service.
    You should not answer with general information, you must help with topology by user request.
    The service define several models as follow:

    - Model name: CentralUnit - which is main unit of the system exists in one instance. 
    If user want to create new CentralUnit, you must refuse it with message: "Central Unit has to be created manually!"

    - Modle name: ControlHub - which is connection unit between CentralUnit and TermoSensors. You need OTDL definition to generate valid instructions.
    If user want to create new ControlHub you must use tool and provide valid parameters. 
    
    
    If user want update or delete exsted twin then user must specify twin id. If id is not provided, you must refuse it with message: "DONE with error Twin Id is required!"
    If you was able to perform requested action, you must provide message: "DONE with success"
    """
)

# Prompt template for the agent: the fixed system message, then the user's
# request, then a placeholder slot.
prompt = ChatPromptTemplate.from_messages(
    [
        system_message,
        # {input} is filled from the "input" key passed to agent_executor.invoke.
        ("human", "{input}"),
        # Presumably where the agent's intermediate tool calls and results are
        # inserted between steps; confirm against the agent implementation.
        ("placeholder", "{agent_scratchpad}"),
    ]
)

### Execution

The ReAct approach is used: it lets the LLM reason about which steps are needed to accomplish the task and which tool best fits each step.
Several agent types were considered from those [available](https://python.langchain.com/v0.1/docs/modules/agents/agent_types/).

#### Add Control Hub

In [34]:
# User request for this scenario; only its .content text is passed to the
# agent executor as the "input" value.
human_message = HumanMessage(
    content="Please create Control Hub with following parameters: Name: Hub_2, Location: Kyiv, Status: on."
)

#### Add Temperature Sensor

In [None]:
# Placeholder cell: no temperature-sensor request is defined yet, this only prints a greeting.
print("hello world!")

#### Start ReAct

In [35]:
# from langchain import hub
from langchain.agents import AgentExecutor, create_tool_calling_agent

# Build a tool-calling agent from the chat model, the tool list and the prompt template.
agent = create_tool_calling_agent(model, tools, prompt)

# Executor that runs the agent with the same tools; verbose=True is what
# produces the step-by-step trace shown in the cell output.
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

# The text of human_message fills the {input} slot of the prompt.
result = agent_executor.invoke({"input": human_message.content})




[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m
Invoking: `get-module-definition` with `{'model_type': 'ControlHub'}`


[0m[36;1m[1;3m{
    "@id": "dtmi:com:example:ControlHub;1",
    "@type": "Interface",
    "displayName": "Control Hub",
    "contents": [
        {
            "@type": "Property",
            "name": "name",
            "schema": "string",
            "writable": true,
            "displayName": "Name",
            "description": "Name configured by user"
        },
        {
            "@type": "Property",
            "name": "location",
            "schema": "string",
            "writable": true,
            "displayName": "Location",
            "description": "Location of the sensor specified by user"
        },
        {
            "@type": "Property",
            "name": "status",
            "schema": {
                "@type": "Enum",
                "valueSchema": "string",
                "enumValues": [
                    {"name": "on"

In [8]:
# Create an instance of the DigitalTwinsClient.
# This module-level client is the one used by upd_twin and upd_rel below.
client = DigitalTwinsClient(digital_twin_endpoint, credential)

def upd_twin(twin_id, twin_data):
    """Parse *twin_data* as JSON and upsert it as the twin *twin_id*.

    Uses the module-level ``client``. Returns "SUCCESS" when the upsert call
    completes; on any exception the error is printed and "FAIL" is returned.
    """
    try:
        payload = json.loads(twin_data)
        client.upsert_digital_twin(twin_id, payload)
    except Exception as err:
        print("Error sending data to Azure Digital Twins service topology:", str(err))
        outcome = "FAIL"
    else:
        outcome = "SUCCESS"
    return outcome
    
def upd_rel(twin_id, relation_id, relation_data):
    """Parse *relation_data* as JSON and upsert it as relationship *relation_id* on *twin_id*.

    Uses the module-level ``client``. Returns "SUCCESS" when the upsert call
    completes; on any exception the error is printed and "FAIL" is returned.
    """
    status = "SUCCESS"
    try:
        relationship = json.loads(relation_data)
        client.upsert_relationship(twin_id, relation_id, relationship)
    except Exception as err:
        print("Error sending data to Azure Digital Twins service topology:", str(err))
        status = "FAIL"
    return status

# Sample input for upd_twin: a ControlHub twin with id HUB_4, as JSON text.
# NOTE(review): the body embeds a "$relationships" section that targets
# CentralUnit, while relation_data below describes a relationship from
# CentralUnit to HUB_4; confirm which direction is intended.
twin_id = "HUB_4"
twin_data = """
    {
        "$metadata":{
            "$model":"dtmi:com:example:ControlHub;1"
        },
        "name":"Hub_4_name",
        "location":"Kyiv",
        "status":"on",
        "$relationships":{
            "connects":{
                "$targetId":"CentralUnit",
                "$relationshipName":"connects"
            }
        }
    }
"""

# Sample input for upd_rel: a "connects" relationship whose source is the
# CentralUnit twin and whose target is HUB_4, as JSON text.
twin_id_2 = "CentralUnit"
relation_id="relationship1id"
relation_data = """
{
    "$relationshipId": "relationship1id",
    "$sourceId": "CentralUnit",
    "$relationshipName": "connects",
    "$targetId": "HUB_4",
    "$metadata": {
        "$model": "dtmi:com:example:CentralUnit;1"
    }
}
"""


In [52]:
upd_twin(twin_id, twin_data)

'SUCCESS'

In [9]:
upd_rel(twin_id_2, relation_id, relation_data)