In [2]:
import mlflow
import requests
import json
import time
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential, AzureCliCredential
from datetime import datetime

from mlflow.tracking import MlflowClient
from mlflow.utils.rest_utils import http_request
import uuid

subscription_id = '96aede12-2f73-41cb-b983-6d11a904839b'
resource_group = 'promptflow'
workspace_name = 'promptflow-eastus'

# subscription_id = '96aede12-2f73-41cb-b983-6d11a904839b'
# resource_group = 'hod-rg'
# workspace_name = 'hod-pflow'

ml_client = MLClient(credential=AzureCliCredential(),
                        subscription_id=subscription_id,
                        resource_group_name=resource_group,
                        workspace_name=workspace_name)

base_endpoint = ml_client.workspaces.get(ml_client.workspace_name).discovery_url.replace("discovery", "")
url = (
    f"history/v1.0"
    f"/subscriptions/{ml_client.subscription_id}"
    f"/resourceGroups/{ml_client.resource_group_name}"
    f"/providers/Microsoft.MachineLearningServices"
    f"/workspaces/{ml_client.workspace_name}"
)
endpoint_url = base_endpoint + url
print(endpoint_url)

https://eastus.api.azureml.ms/history/v1.0/subscriptions/96aede12-2f73-41cb-b983-6d11a904839b/resourceGroups/promptflow/providers/Microsoft.MachineLearningServices/workspaces/promptflow-eastus


In [11]:
credential=AzureCliCredential()
token = credential.get_token("https://management.azure.com/.default").token

headers = {  
    'Authorization': f'Bearer {token}',
    'Content-Type': 'application/json',
} 

### Get run from run history

In [None]:

run_id = "web_classification_default_20230815_103252_547529"    # eastus
# run_id = "web_classification_default_20230809_163800_975156"    # hod-pflow
url = f"{endpoint_url}/runs/{run_id}"

response = requests.get(url, headers=headers)

if response.status_code == 200:
    print(f"Successfully get run from run history")
    with open("./update/rh_batch_run.json", "w") as f:
        json.dump(response.json(), f, indent=4)
else:
    print(f"Code: {response.status_code}, Reason: {response.text}")

### Modify run in run history

In [23]:
class MyClass:
    a = 5

# /experimentids/42caf8b8-562b-47f6-b838-0ae2aabb1dc1
run_id = "web_classification_default_20230815_103013_448837"    # eastus
url = f"{endpoint_url}/runs/{run_id}/modify"

payload = {
    # "runId": run_id,
    "hidden": False,
    "tags": {
        "test_tag": {"a": 123}
    },
    "description": "",
    "displayName": "hod-run",
}
response = requests.patch(url, headers=headers, json=payload)

if response.status_code == 200:
    print(f"Successfully updated run.")
    with open("./update/rh_batch_run_resp.json", "w") as f:
        json.dump(response.json(), f, indent=4)
else:
    print(f"Code: {response.status_code}, Reason: {response.text}")

Code: 400, Reason: {
  "error": {
    "code": "UserError",
    "severity": null,
    "message": "Error when parsing request; unable to deserialize request body",
    "messageFormat": null,
    "messageParameters": null,
    "referenceCode": null,
    "detailsUri": null,
    "target": null,
    "details": [],
    "innerError": null,
    "debugInfo": null,
    "additionalInfo": null
  },
  "correlation": {
    "operation": "a88437fcb71e324b151da7326f117435",
    "request": "9c0e3e97793206bd"
  },
  "environment": "eastus",
  "location": "eastus",
  "time": "2023-09-26T05:38:15.6232097+00:00",
  "componentName": "run-history",
  "statusCode": 400
}


### Get run data from run history

In [16]:

run_id = "d360affb-c01f-460f-beca-db9a8b88b625"    # eastus
# run_id = "web_classification_default_20230809_163800_975156"    # hod-pflow
url = f"{endpoint_url}/rundata"

payload = {
    "runId": run_id,
    "selectRunMetadata": True,
    "selectRunDefinition": True,
    "selectJobSpecification": True,
}

response = requests.post(url, headers=headers, json=payload)

if response.status_code == 200:
    print(f"Successfully get run from run history")
    with open("./rh_simple_llm.json", "w") as f:
        json.dump(response.json(), f, indent=4)
else:
    print(f"Code: {response.status_code}, Reason: {response.text}")

Successfully get run from run history


### Get run from PFS

In [4]:
run_id = "Web-Classification-Test-Bulk-Run-With-Data-Store-No-Credential_Submitted_638308852434071359"
pfs_endpoint_url = endpoint_url.replace("history/v1.0", "flow/api")
url = f"{pfs_endpoint_url}/BulkRuns/{run_id}/childRuns"

response = requests.get(url, headers=headers)

if response.status_code == 200:
    print(f"Successfully get child runs from PFS")
    with open("./pfs_run_info_child_runs.json", "w") as f:
        json.dump(response.json(), f, indent=4)
else:
    print(f"Code: {response.status_code}, Reason: {response.text}")

Successfully get child runs from PFS


### Get run from index service

In [None]:
run_id = "web_classification_default_20230815_103013_448837"
index_endpoint_url = endpoint_url.replace("/history", "/index")
url = f"{index_endpoint_url}/entities"

payload = {
    "filters": [{
        "field": "type",
        "operator": "eq",
        "values": ["runs"]
    }, {
        "field": "annotations/archived",
        "operator": "eq",
        "values": ["false"]
    },
        {
            "field": "properties/runId",
            "operator": "eq",
            "values": [run_id]
        }
    ],
    "order": [{
        "direction": "Desc",
        "field": "properties/startTime"
    }
    ],
    "pageSize": 50,
}

response = requests.post(url, json=payload, headers=headers)

if response.status_code == 200:
    print(f"Successfully get run info from index service")
    with open("./index_run_archived.json", "w") as f:
        json.dump(response.json(), f, indent=4)
else:
    print(f"Code: {response.status_code}, Reason: {response.text}")

### List runs from index service

In [7]:
index_endpoint_url = endpoint_url.replace("/history", "/index")
url = f"{index_endpoint_url}/entities"

payload = {
    "filters": [
        {
            "field": "type",
            "operator": "eq",
            "values": ["runs"]
        },
        {
            "field": "annotations/archived",
            "operator": "eq",
            "values": ["false"]
        },
        {
            "field": "properties/runType",
            "operator": "contains",
            "values": [
                "azureml.promptflow.FlowRun",
                "azureml.promptflow.EvaluationRun",
            ]
        }
    ],
    "freeTextSearch": "",
    "order": [
        {
            "direction": "Desc",
            "field": "properties/creationContext/createdTime"
        }
    ],
    # index service can return 100 results at most
    "pageSize": 50,
    "skip": 0,
    "includeTotalResultCount": True,
    "searchBuilder": "AppendPrefix"
}

response = requests.post(url, json=payload, headers=headers)

if response.status_code == 200:
    print(f"Successfully listed runs from index service")
    with open("./list_entities_50.json", "w") as f:
        json.dump(response.json(), f, indent=4)
else:
    print(f"Code: {response.status_code}, Reason: {response.text}")

Successfully listed runs from index service


### Get metrics from metric service

In [None]:
run_id = "groundedness_eval_default_20230821_191944_818788"
index_endpoint_url = endpoint_url.replace("/history/v1.0", "/metric/v2.0")
url = f"{index_endpoint_url}/runs/{run_id}/lastvalues"

response = requests.post(url, json={}, headers=headers)

if response.status_code == 200:
    print(f"Successfully got metrics from metric service")
    with open("./metrics_cle.json", "w") as f:
        json.dump(response.json(), f, indent=4)
else:
    print(f"Code: {response.status_code}, Reason: {response.text}")

### List flows from index service

In [13]:
index_endpoint_url = endpoint_url.replace("/history", "/index")
url = f"{index_endpoint_url}/entities"

payload = {
    "filters": [
        {
            "field": "type",
            "operator": "eq",
            "values": ["flows"]
        },
        # {
        #     "field": "annotations/archived",
        #     "operator": "eq",
        #     "values": ["true"]
        # },
        {
            "field": "properties/creationContext/createdBy/userTenantId",
            "operator": "eq",
            "values": ["72f988bf-86f1-41af-91ab-2d7cd011db47"]
        },
        {
            "field": "properties/creationContext/createdBy/userObjectId",
            "operator": "eq",
            "values": ["c05e0746-e125-4cb3-9213-a8b535eacd79"]
        }
    ],
    "freeTextSearch": "",
    "order": [
        {
            "direction": "Desc",
            "field": "properties/creationContext/createdTime"
        }
    ],
    # index service can return 100 results at most
    "pageSize": 10,
    "skip": 0,
    "includeTotalResultCount": True,
    "searchBuilder": "AppendPrefix"
}

response = requests.post(url, json=payload, headers=headers)

if response.status_code == 200:
    print(f"Successfully listed runs from index service")
    with open("./flow_list_archived.json", "w") as f:
        json.dump(response.json(), f, indent=4)
else:
    print(f"Code: {response.status_code}, Reason: {response.text}")

Successfully listed runs from index service


In [None]:
import re

pattern = re.compile(r"azureml://(?P<part1>[\w/]+)/paths/(?P<part2>.*)$")
target = "azureml://datastores/workspaceblobstore/paths/LocalUpload/312cca2af474e5f895013392b6b38f45/data.jsonl"
# target = "fake"
match = pattern.match(target)
if match:
    print(match.group('part1'))
    print(match.group('part2'))
print(match.groups())

In [None]:
import re

OUTPUT_PORTAL_PATTERN = re.compile(r"azureml://.*?/data/(?P<name>.*?)/versions/(?P<version>.*?)$")
target = 'azureml://locations/eastus/workspaces/3e123da1-f9a5-4c91-9234-8d9ffbb39ff5/data/azureml_web_classification_default_20230809_120434_491077_output_data_flow_outputs/versions/1'

match = OUTPUT_PORTAL_PATTERN.match(target)
if match:
    print(match.group('name'))
    print(match.group('version'))
print(match.groups())

In [2]:
from pathlib import Path
a = Path(".").resolve().absolute()
print(str(a))
print(a.as_posix())

D:\programs\azureml-examples\sdk\python\jobs\pipelines\hod_test_promptflow
D:/programs/azureml-examples/sdk/python/jobs/pipelines/hod_test_promptflow


: 

## Test promptflow schema

In [None]:
import json
from marshmallow_jsonschema import JSONSchema  
from promptflow._sdk.schemas._flow_dag import FlowDagSchema

json_schema = JSONSchema().dump(FlowDagSchema(context={"base_path": "."}))
with open("./flow_dag_schema.json", "w") as f:
    json.dump(json_schema, f, indent=4)

In [18]:
a = {"a": 123, "b": 456}
print(f"{a}")


{'a': 123, 'b': 456}


In [9]:
print(str(None).lower())

none
