In [15]:
import os
from typing import Dict, List

import verta
from verta.registry.stage_change import Archived, Staging, Production

import mlflow
from mlflow import MlflowClient
from mlflow.entities.model_registry import RegisteredModel as MLflow_RegisteredModel
from mlflow.store.entities.paged_list import PagedList

In [35]:
os.environ['VERTA_EMAIL'] = ""
os.environ['VERTA_DEV_KEY'] = ""
os.environ['VERTA_HOST'] = ""

verta_client = verta.Client()

got VERTA_HOST from environment
got VERTA_EMAIL from environment
got VERTA_DEV_KEY from environment
connection successfully established


In [None]:
os.environ["MLFLOW_TRACKING_URI"] = ""
os.environ["DATABRICKS_HOST"] = ""
os.environ["DATABRICKS_TOKEN"] = ""
mlflow.set_registry_uri("databricks")

mlflow_client = MlflowClient()
model_version_infos = mlflow_client.search_registered_models()
model_version_infos

In [24]:
import mlflow.pyfunc

model_name = "Iris"
model_version = 1

model = mlflow.pyfunc.load_model(model_uri=f"models:/{model_name}/{model_version}")



In [None]:
from pprint import pprint

for rm in mlflow_client.search_registered_models():
    pprint(dict(rm), indent=4)

    with open("yourlogfile.txt", "w") as log_file:
        pprint(dict(rm), log_file)

In [32]:
import json
from mlflow.tracking import MlflowClient

mlflow_client = MlflowClient()

# Create a list to store the registered model details
registered_models = []

# Retrieve and store the details of all registered models
for rm in mlflow_client.search_registered_models():
    # Convert ModelVersion object to a dictionary before appending to the list
    model_info = {
        "name": rm.name,
        "latest_version": rm.latest_versions[0].version if rm.latest_versions else None,
        "description": rm.description,
        "tags": dict(rm.tags) if rm.tags else None,
    }
    registered_models.append(rm)

# Print the registered models as JSON
print(json.dumps(registered_models, indent=4))

# Save the JSON output to a log file
with open("yourlogfile.txt", "w") as log_file:
    json.dump(registered_models, log_file, indent=4)


TypeError: Object of type RegisteredModel is not JSON serializable

In [23]:
import json
import mlflow.store.entities as entities
model_version_infos

for model in model_version_infos:
    print(model.to_json())


# with open("model_version_infos.json", "w") as outfile:
#     outfile.write(json_string)

AttributeError: 'RegisteredModel' object has no attribute 'to_json'

In [37]:
mlflow_models: List[MLflow_RegisteredModel] = list()

result: PagedList = mlflow_client.search_registered_models()
mlflow_models += result.to_list()
while result.token:  # handle paginated results
    result: PagedList = mlflow_client.search_registered_models(page_token=result.token)
    mlflow_models += result.to_list()

mlflow_models.sort(key=lambda x: x.name)
for mv in mlflow_models:
    print(f"name={mv.name};")

name=California Housing;
name=Diabetes;
name=Iris;
name=rf_cal_housing;


In [40]:
models_for_import = [model for model in mlflow_models if "test" not in model.name]


In [42]:
models_for_import

[<RegisteredModel: aliases={}, creation_timestamp=1691087947650, description='', last_updated_timestamp=1691092205650, latest_versions=[<ModelVersion: aliases=[], creation_timestamp=1691092205650, current_stage='None', description='', last_updated_timestamp=1691092209967, name='California Housing', run_id='342b95cae90a4e6a8f6173ebde8489a5', run_link='', source='dbfs:/databricks/mlflow-tracking/1757007025729527/342b95cae90a4e6a8f6173ebde8489a5/artifacts/model', status='READY', status_message='', tags={}, user_id='gteicom@hotmail.com', version='29'>], name='California Housing', tags={}>,
 <RegisteredModel: aliases={}, creation_timestamp=1691084464692, description='', last_updated_timestamp=1691084574336, latest_versions=[<ModelVersion: aliases=[], creation_timestamp=1691084574336, current_stage='None', description='', last_updated_timestamp=1691084576805, name='Diabetes', run_id='8f42eae83d0047d4bb3276de0339c0ae', run_link='', source='dbfs:/databricks/mlflow/tmp-external-source/8f42eae83

In [46]:
failed_model_imports: List[MLflow_RegisteredModel] = list()
successful_model_imports: List[MLflow_RegisteredModel] = list()
model_messages: Dict[MLflow_RegisteredModel, List[str]] = dict()

for rm in models_for_import:
    model_messages[rm.name] = list()

    try:
        verta_rm = verta_client.create_registered_model(
            name=rm.name,
            desc=rm.description,
            labels=[
                "mlflow_import",
                "mlflow_creation_time:" + str(rm.creation_timestamp),
                "mlflow_last_updated_time:" + str(rm.last_updated_timestamp),
                "mlflow_tags:" + ",".join(rm.tags),
            ],
        )
        model_messages[rm.name].append(
            f"created new registered model in Verta for {rm.name}"
        )
    except ValueError:
        model_messages[rm.name].append(
            f'a registered model named "{rm.name}" already exists in Verta. Skipping import.'
        )
        failed_model_imports.append(rm)
        continue

    try:
        rm_versions = mlflow_client.search_model_versions(f"name='{rm.name}'")
        if not rm_versions:
            failed_model_imports.append(rm)
            model_messages[rm.name].append(
                f"unable to find any model versions for {rm.name}.  Skipping import."
            )
            continue
    except Exception as err:
        model_messages[rm.name].append(
            f'failed to fetch versions for registered model "{rm.name}". Skipping import. Error: {err}'
        )
        failed_model_imports.append(rm)
        continue

    successful_versions = list()
    for version in rm_versions:
        try:
            verta_version = verta_rm.create_version(
                name=str(version.version),
                attrs={
                    "er_id": version.run_id,
                    "mlflow_source": version.source,
                    "mlflow_user_id": version.user_id,
                    "mlflow_run_link": version.run_link,
                    "mlflow_creation_time": version.creation_timestamp,
                    "mlflow_last_updated_time": version.last_updated_timestamp,
                    "mlflow_status": version.status,
                    "mlflow_current_stage": version.current_stage,
                    "mlflow_tags": version.tags,
                },
                labels=["mlflow_import"],
            )
            model_messages[rm.name].append(f"successfully created version {version.version}")
        except Exception as err:
            model_messages[rm.name].append(
                f"failed to create model version in Verta for {rm.name} - version: {version.version} due to {err}"
            )
            continue

        # Import artifacts for the model version
        try:
            outpath = mlflow.artifacts.download_artifacts(run_id=version.run_id)
        except Exception as err:
            model_messages[rm.name].append(
                f"unable to download artifacts from {rm.name} - version run id; {version.run_id} due to {err}"
            )
        else:
            for file_name in os.listdir(outpath):
                try:
                    verta_version.log_artifact(
                        file_name, os.path.join(outpath, file_name)
                    )
                    model_messages[rm.name].append(f"artifact logged in Verta: {file_name}")
                except ValueError as err:
                    model_messages[rm.name].append(
                        f"cannot upload artifact {file_name} for {rm.name} due to {err}"
                    )

        # Set model version's current stage
        stage_error_message = f"unable to set stage in Verta for {rm.name} - version: {version}, current_stage: {version.current_stage}"
        try:
            if version.current_stage != "None":
                if version.current_stage == "Staging":
                    verta_version.change_stage(Staging())
                if version.current_stage == "Production":
                    verta_version.change_stage(Production())
                if version.current_stage == "Archived":
                    verta_version.change_stage(Archived())
                else:
                    model_messages[rm.name].append(stage_error_message)
            else:
                model_messages[rm.name].append(stage_error_message)
        except Exception as err:
            model_messages[rm.name].append(f"{stage_error_message}, due to: {str(err)}")

        successful_model_imports.append(rm)


created new RegisteredModel: California Housing in workspace: Default
created new ModelVersion: 29
created new ModelVersion: 28
created new ModelVersion: 27
created new ModelVersion: 26
created new ModelVersion: 25
created new ModelVersion: 24
created new ModelVersion: 23
created new ModelVersion: 22
created new ModelVersion: 21
created new ModelVersion: 20
created new ModelVersion: 19
created new ModelVersion: 18
created new ModelVersion: 17
created new ModelVersion: 16
created new ModelVersion: 15
created new ModelVersion: 14
created new ModelVersion: 13
created new ModelVersion: 12
created new ModelVersion: 11
created new ModelVersion: 10
created new ModelVersion: 9
created new ModelVersion: 8
created new ModelVersion: 7
created new ModelVersion: 6
created new ModelVersion: 5
created new ModelVersion: 4
created new ModelVersion: 3
created new ModelVersion: 2
created new ModelVersion: 1
created new RegisteredModel: Diabetes in workspace: Default
created new ModelVersion: 2
created ne