# Create New Scenario and Upload
To create a new scenario and upload it to the Payload server through the API, follow this example.


#### Change Log
- 2024-09-13 v2.2
    - Update tag creation function and rename to handle_tags_creation_and_get_ids,
    - add openDrives request to show available openDrives.
    - move description assignment to front,
    - modify scenario deletion with scenario uuid.
- 2024-09-12 v2.1
    - Modify scenario create request body `id` -> `scenarioId`,
    - call create_tags_if_not_exists in code,
    - add KPIs request to show available KPIs.
- 2024-09-12 v2.0
    - Modify Condition Definitions,
    - add scenario id section,
    - add KPI instructions,
    - add route request to show available routes,
    - modify record agents to use multiple text entries instead of an array of text,
    - add change log, prerequisites, configuration sections into this file,
    - add delete section.
- 2024-07-05 v1.0
    - Init Version

#### Prerequisites
- Python 3.x

#### Configuration
1. Add a `.env` file in the same folder as this notebook.
2. Find your user at http://172.30.1.139:3000/admin/collections/users?limit=10
3. Enable API Key if not already enabled and copy it.
4. In the `.env` file, add a line `PAYLOAD_API_KEY={YOUR_API_KEY}`.
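
The configuration steps above can be checked before any request is sent. The following is a minimal sketch, assuming the key is exposed as the environment variable `PAYLOAD_API_KEY` once `dotenv.load_dotenv()` has run; the helper name `require_api_key` is hypothetical and not part of the original notebook.

```python
import os


def require_api_key(env=None, name="PAYLOAD_API_KEY"):
    """Return the API key found in `env`, or raise a clear error if it is missing.

    `require_api_key` is a hypothetical helper used for illustration only.
    """
    env = os.environ if env is None else env
    key = (env.get(name) or "").strip()
    if not key:
        raise RuntimeError(f"{name} is not set; add it to the .env file first")
    return key
```

Calling `require_api_key()` right after loading the `.env` file fails early with a readable message instead of producing an authorization error later.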



In [None]:
%pip install python-dotenv

In [None]:
import os
import requests
import json
from typing import List
import dotenv
dotenv.load_dotenv()
base_url = "http://172.30.1.139:3000/api"

## Auth

In [None]:
user_api_key = os.getenv("PAYLOAD_API_KEY")
# Avoid printing the key itself; only confirm that it was loaded
print(f"API key loaded: {bool(user_api_key)}")
headers = {
    "Authorization": f"users API-Key {user_api_key}",
}
response = requests.get(f"{base_url}/users/me", headers=headers)
if response.status_code == 200:
    print(f"User: {response.json()['user']['name']}, role: {response.json()['user']['role']}")


## Scenario ID

In [None]:
scenario_id = "create_new_scenario_example_code"
description = "A new scenario created by example code"

## Create Tags If They Do Not Exist

In [None]:


tags_to_be_used_in_created_scenario = [
    "behavior:cut-in",
    "party:showay",
    "deliver:2024Aug",
    "src:cetran",
    "field:hct"
]

def handle_tags_creation_and_get_ids(tags: List[str]) -> List[str]:
    """Return the IDs of the given tags. If a tag does not exist, it will be created.
    Args:
        tags (List[str]): names of the tags to look up or create
    Returns:
        List[str]: tag IDs, in the order the tags were given
    Raises:
        RuntimeError: if the request to create a tag fails
        KeyError: if a tag is not successfully created
    """
    # Fetch all existing tags and map name -> id
    tags_doc = requests.get(
        f"{base_url}/tags?depth=1&limit=1000000",
        headers=headers).json()["docs"]
    all_tags = {tag["name"]: tag["id"] for tag in tags_doc}

    for tag in tags:
        if tag not in all_tags:
            try:
                response = requests.post(
                    f"{base_url}/tags",
                    headers=headers,
                    json={
                        "name": tag,
                    }
                )
                if response.status_code == 201 and response.json().get("message") == "Tag successfully created.":
                    print(f"Tag {tag} created successfully. ID: {response.json()['doc']['id']}")
                    all_tags[tag] = response.json()["doc"]["id"]
            except Exception as e:
                # A plain string cannot be raised, so wrap the cause in an exception
                raise RuntimeError(f"Failed to create tag {tag}") from e

    # A tag that could not be created raises KeyError here
    tag_name_to_id = {tag: all_tags[tag] for tag in tags}
    tag_ids = list(tag_name_to_id.values())
    print(f"tags: {tag_name_to_id}")
    print(f"tag_ids: {tag_ids}")
    return tag_ids

tag_ids_to_be_used_in_created_scenario = handle_tags_creation_and_get_ids(tags_to_be_used_in_created_scenario)

**NOTE**

`tag_ids_to_be_used_in_created_scenario` is now a list of UUIDs.

## Parameters
The parameters and ranges to be searched.

In [None]:
parameters = [  # Names must match the parameter names used in the OpenSCENARIO file
    {
        "name": "TargetSpeed",
        "unit": "m/s",
        "min": 20,
        "max": 40,
    },
    {
        "name": "TargetLateralOffset",
        "unit": "m",
        "min": -1,
        "max": 1,
    },
    {
        "name": "TargetS",
        "unit": "m",
        "min": -5,
        "max": 5,
    }
]
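
A reversed range is easy to introduce by hand, so a quick sanity check before uploading may help. This is a minimal sketch, assuming every parameter dict carries `name`, `min`, and `max` keys as in the list above; `find_invalid_ranges` is a hypothetical helper, and whether the server itself rejects reversed ranges is not stated in this notebook.

```python
from typing import List


def find_invalid_ranges(parameters: List[dict]) -> List[str]:
    """Return the names of parameters whose `min` is greater than `max`."""
    return [p["name"] for p in parameters if p["min"] > p["max"]]


# Illustrative data only; the second range is deliberately reversed.
example_parameters = [
    {"name": "TargetSpeed", "unit": "m/s", "min": 20, "max": 40},
    {"name": "TargetS", "unit": "m", "min": 5, "max": -5},
]
print(find_invalid_ranges(example_parameters))  # prints ['TargetS']
```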

## OpenDRIVE File
Assign the OpenDRIVE file to use here.

In [None]:
# Get all available OpenDRIVE files and assign one.

response = requests.get(f"{base_url}/openDrives/", headers=headers)
if response.status_code != 200:
    print("API is not working")
available_opendrives = dict( [ (opendrive["filename"], opendrive["id"]) for opendrive in response.json()["docs"] ] )
print(json.dumps(available_opendrives, indent=4))

opendrive_id = available_opendrives["hct_6.xodr"]


## OpenSCENARIO File
There are two methods to upload the OpenSCENARIO file.

#### 1. Upload as an XML string
The field should be
```json
openScenarioField = {
    "type": "String",
    "content": "<OpenSCENARIO> <!-- Paste the OpenSCENARIO XML string here. --> </OpenSCENARIO>"
}
```

#### 2. Upload with a file
The field should be
```json
openScenarioField = {
    "type": "File",
    "openScenario": "OpenScenarioID Here"
}
```
To get the openScenarioID, first upload the file to the endpoint `{base_url}/openScenarios` (note that `base_url` already ends with `/api`), get the ID from the response, then use it in the openScenarioField.
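
For the first method, it can help to confirm that the XML string is well-formed before sending it. The sketch below is one possible check, assuming the root element is named `OpenSCENARIO`; the helper name `scenario_string_field` is hypothetical, and the check does not validate the content against the OpenSCENARIO schema.

```python
import xml.etree.ElementTree as ET


def scenario_string_field(xml_text: str) -> dict:
    """Check that `xml_text` parses and has an OpenSCENARIO root, then wrap it.

    Hypothetical helper: returns the same shape as the "String" field above.
    """
    root = ET.fromstring(xml_text)  # raises ET.ParseError on malformed XML
    if root.tag != "OpenSCENARIO":
        raise ValueError(f"unexpected root element: {root.tag}")
    return {"type": "String", "content": xml_text}
```

A malformed comment or a mismatched closing tag then fails locally with `ET.ParseError` rather than surfacing later on the server side.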

In [None]:
def use_scenario_string(content):
    return {
        "type": "String",
        "content": content,
    }

In [None]:
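# Upload an OpenSCENARIO file and build the field that references it
def upload_openscenario(file_path):
    with open(file_path, "rb") as f:
        files = {
            # Use the actual file name rather than a hard-coded one
            "file": (os.path.basename(file_path), f, "application/octet-stream")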
        }
        r = requests.post(
            f"{base_url}/openScenarios",
            headers=headers,
            files=files,
        )
    open_scenario_id = r.json()["doc"]["id"]
    print(json.dumps(r.json(), indent=4))
    return {
        "type": "File",
        "openScenario": open_scenario_id
    }

## Routes
A route is the path on the semantic map that the ego vehicle should follow.

A route should be assigned to the scenario. It is used to tell the autonomous system which route to switch to.

To add a route, please use the GUI [here](http://172.30.1.139:3000/admin/collections/routes?limit=10) or use the default route.

Then assign the ID of the route.


In [None]:
# Here we list available routes.
response = requests.get(f"{base_url}/routes/", headers=headers)
if response.status_code != 200:
    print("API is not working")
available_routes = dict([(doc["name"], doc["id"]) for doc in response.json()["docs"]])
print(f"Available routes: {list(available_routes)}")

In [None]:
chosen_route = "hsinchu_eight3"  # choose from available routes above.
route = available_routes[chosen_route]
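
A mistyped route name raises a bare `KeyError` in the lookup above. The following sketch shows one way to make that failure easier to read, assuming a name-to-ID dictionary shaped like `available_routes`; `pick_by_name` is a hypothetical helper and the IDs in the example are made up.

```python
def pick_by_name(available: dict, name: str) -> str:
    """Return the ID stored under `name`, listing the valid names if it is missing."""
    try:
        return available[name]
    except KeyError:
        options = ", ".join(sorted(available)) or "<none>"
        raise KeyError(f"'{name}' not found; available: {options}") from None


# Illustrative data only; real IDs come from the routes request.
example_routes = {"hsinchu_eight3": "route-id-1", "default": "route-id-0"}
route_id = pick_by_name(example_routes, "hsinchu_eight3")
```

The same helper would work for the OpenDRIVE and KPI dictionaries, since they are built the same way.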

## Tag Tree
The Tag Tree follows the CETRAN format, with camelCase strings. Please refer to the example below.

In [None]:
tag_tree = {
    "ego": {
        "vehicleLongitudinalActivity": {
            "mode": "drivingForward",
            "drivingForwardMode": "cruising"
        },
        "vehicleLateralActivity": {
            "mode": "goingStraight"
        }
    },
    "actors": [
    {

        "vehicleLongitudinalActivity": {

            "mode": "drivingForward",
            "drivingForwardMode": "cruising"

        },
        "vehicleLateralActivity": {

            "mode": "goingStraight"

        },
        "initialState": {

            "direction": "oncoming",
            "dynamics": "standingStill",
            "lateralPosition": "leftOfEgo",
            "longitudinalPosition": "inFrontOfEgo"

        },
        "leadVehicle": {

            "mode": "appearing",
            "appearingMode": "gapClosing"

        },
    }
    ],
    "roadLayout": {
        "mode": "junction",
        "junctionMode": "noTrafficLight"
    }
}

## Test Objective
The test objective is the key to evaluating whether a scenario run is good enough.
All available evaluation targets are listed at [keyPerformanceIndicators](http://172.30.1.139:3000/admin/collections/keyPerformanceIndicators?limit=1000).
Alternatively, propose new evaluation targets and describe how they should be implemented.

#### Note

If you need other KPIs, please inform somebody from ITRI's team.

A new KPI requires extra implementation in the main runner.

##### Collision Less Than
##### Max Deceleration Less Than
##### Minimum Space Between Ego and Agents
##### Pass Time Less Than
- Execution Time:
    - Execution time must be less than the Pass Time for the trial to be considered a success.
    - The start of execution time depends on the scenario:
        - If "Start Observation Sampling Conditions" is defined in the scenario, execution time starts at this moment.
        - Otherwise, execution time starts when the ego begins moving.

- Scenario Timeout:
    - If the scenario doesn't end within (PassTimeLessThan + 2) seconds from the execution time start, the trial is considered failed and will be stopped.
    - This feature prevents scenarios where the ego might become stuck at a certain point.
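
The timing rules above can be summarized in a few lines. This is a sketch of the described logic only, not the runner's actual code: the function name, the three outcome labels, and the handling of exact boundary values are assumptions made for illustration.

```python
TIMEOUT_MARGIN_S = 2.0  # the "+ 2" seconds from the Scenario Timeout rule


def classify_trial(execution_time_s: float, pass_time_s: float) -> str:
    """Classify a trial by its execution time against the Pass Time rules.

    Returns "success" when the execution time is below the pass time,
    "timeout" when it exceeds pass_time_s + 2 s (the trial would be stopped),
    and "failed" for anything in between.
    """
    if execution_time_s < pass_time_s:
        return "success"
    if execution_time_s > pass_time_s + TIMEOUT_MARGIN_S:
        return "timeout"
    return "failed"
```

For a pass time of 15 s, an execution time of 10 s is a success, 16 s is a failure, and anything beyond 17 s corresponds to a trial that was stopped by the timeout.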


In [None]:
# Here we list available KPIs.
response = requests.get(f"{base_url}/keyPerformanceIndicators/", headers=headers)
if response.status_code != 200:
    print("API is not working")
available_kpis = dict([(doc["name"], doc["id"]) for doc in response.json()["docs"]])
print(f"Available KPIs: {list(available_kpis)}")

In [None]:
# Remove KPIs that are not needed

test_objective = {
    "criticalityMetrics": [
        {
            "keyPerformanceIndicator": available_kpis["Pass Time Less Than"],  # threshold should always be 0
            "threshold": 0,
        },
        {
            "keyPerformanceIndicator": available_kpis["Minimum Space Between Ego and Agents"],
            "threshold": 2,
        },
        {
            "keyPerformanceIndicator": available_kpis["Max Deceleration Less Than"],
            "threshold": 10,
        },
        {
            "keyPerformanceIndicator": available_kpis["Collision Less Than"],  # threshold should always be 0
            "threshold": 0,
        }
    ]
}

## Conditions
The Conditions section was modified to allow more flexibility when combining multiple conditions.

### Valid Conditions
Only if these conditions are triggered will the trial be considered valid.
Each entry should be a condition name used in the provided OpenSCENARIO file.

In [None]:
valid_conditions = {
    "conditionLogic": "And",  # Available options: "And", "Or"
    "conditions": [
        "Condition A"
    ]
}

### Fail Conditions
If these conditions are triggered, the trial will stop right away.

Note that this will not mark the trial as failed if the Key Performance Indicators do not contain a matching item.


In [None]:
fail_conditions = {
    "conditionLogic": "Or",  # Available options: "And", "Or"
    "conditions": [
        "Condition B", "Condition C"
    ]
}


### End Conditions
If these conditions are triggered, the trial will stop right away.

Note that this will not mark the trial as a success. It simply stops the trial.

Success or failure depends only on the `KPI`s.


In [None]:
end_conditions = {
    "conditionLogic": "Or",  # Available options: "And", "Or"
    "conditions": [
        "Condition D", "Condition E"
    ]
}


### Start Observation Sampling Conditions
Once these conditions are triggered, recording of the scenario starts.

If KPI `Pass Time Less Than` is specified, this condition will be used to start the timer.

Otherwise, the timer starts once the ego has a non-zero speed.

In [None]:
start_conditions = {
    "conditionLogic": "And",  # Available options: "And", "Or"
    "conditions": [
        "Condition F", "Condition G", "Condition H"
    ]
}


In [None]:
conditions = {
    "validConditions": valid_conditions,
    "failConditions": fail_conditions,
    "endConditions": end_conditions,
    "startObservationSamplingConditions": start_conditions
}
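
To make the `conditionLogic` options concrete, the sketch below evaluates one condition group against a set of triggered condition names. It is an interpretation of "And" and "Or" for illustration; how the runner actually evaluates the groups is not shown in this notebook, and `condition_group_met` is a hypothetical name.

```python
from typing import Iterable


def condition_group_met(group: dict, triggered: Iterable[str]) -> bool:
    """Evaluate one condition group against the names of triggered conditions."""
    fired = set(triggered)
    hits = [name in fired for name in group["conditions"]]
    if group["conditionLogic"] == "And":
        return all(hits)  # every listed condition must have triggered
    if group["conditionLogic"] == "Or":
        return any(hits)  # one triggered condition is enough
    raise ValueError(f"unknown conditionLogic: {group['conditionLogic']}")


# Illustrative groups mirroring the shapes used above.
or_group = {"conditionLogic": "Or", "conditions": ["Condition B", "Condition C"]}
and_group = {"conditionLogic": "And", "conditions": ["Condition F", "Condition G"]}
print(condition_group_met(or_group, {"Condition C"}))   # prints True
print(condition_group_met(and_group, {"Condition F"}))  # prints False
```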

## Observation Recording Agents
The agents that should be recorded.

Leave empty to record all agents.

In [None]:
observation_recording_agents = [
    "Agent a", "Agent b", "Agent C"
]

## Construct Request Body

In [None]:
def create_request_body(
            scenarioId: str,
            tags: List[str],
            description: str,
            parameters: List[dict],
            openDrive: str,
            openScenario: dict,  # the openScenarioField dict ("String" or "File")
            usedRoute: str,
            tagTree: dict,
            testObjectives: dict,
            conditions: dict,
            observationRecordingAgents: List[str],  # agent names
            egoTargetSpeed: float
        ):
    return {
        "scenarioId": scenarioId,
        "tags": tags,
        "description": description,
        "parameters": parameters,
        "openDrive": openDrive,
        "openScenarioField": openScenario,
        "usedRoute": usedRoute,
        "tagTree": tagTree,
        "testObjectives": testObjectives,
        "conditions": conditions,
        "observationRecordingAgents": observationRecordingAgents,
        "egoTargetSpeed": egoTargetSpeed
    }

In [None]:
# openScenarioField = upload_openscenario("/resources/xosc/itri/hct_04.xosc")
openScenarioField = use_scenario_string("<OpenSCENARIO> <!-- some scenario --> </OpenSCENARIO>")
import uuid
data = create_request_body(
    scenarioId = scenario_id,
    tags = tag_ids_to_be_used_in_created_scenario,
    description = description,
    parameters = parameters,
    openDrive = opendrive_id,
    openScenario = openScenarioField,
    usedRoute = route,
    tagTree = tag_tree,
    testObjectives = test_objective,
    conditions = conditions,
    observationRecordingAgents = observation_recording_agents,
    egoTargetSpeed = 40
)

In [None]:
r = requests.post(f"{base_url}/scenarios", headers=headers, json=data)
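try:
    print(json.dumps(r.json(), indent=4))
    scenario_uuid = r.json()["doc"]["id"]
except ValueError:
    # The response body is not valid JSON
    print(r.text)
except KeyError as e:
    # The response JSON does not contain the created document
    print(f"Missing key in response: {e}")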

## DANGER!!!

## TO DELETE A SCENARIO

In [None]:


import requests
# Construct the URL
print(f"Deleting scenario {scenario_uuid}")
url = f'{base_url}/scenarios/{scenario_uuid}'

try:
    # Send the DELETE request
    response = requests.delete(url, headers=headers)

    # Check if the request was successful
    if response.status_code == 200:
        data = response.json()
        print(f"Document deleted successfully:\ndata: {json.dumps(data, indent=4)}")
    else:
        print(f"Failed to delete document. Status code: {response.status_code}")
        print("Response:", response.text)

except requests.exceptions.RequestException as err:
    print("An error occurred:", err)