In [0]:
# Install a pinned MLflow release so the registry calls in this notebook run against a known version.
%pip install --quiet mlflow==2.22.0
# Restart the Python process so the freshly installed package is the one that gets imported.
dbutils.library.restartPython()

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
import mlflow
from mlflow import MlflowClient

# Use the Databricks Unity Catalog model registry for all registry operations.
mlflow.set_registry_uri(uri="databricks-uc")

# Single client instance shared by the lookup, tagging and alias cells below.
client = MlflowClient()

In [0]:
# Notebook parameters: widget name -> default value shown in the widget.
widget_defaults = {
    "catalog": "mlops",
    "schema": "mlops_zoomcamp_prj",
    "model_name": "software_defects",
}
for widget_name, default_value in widget_defaults.items():
    dbutils.widgets.text(widget_name, default_value)

# Read the current widget values; these three names are used by later cells.
catalog = dbutils.widgets.get("catalog")
db = dbutils.widgets.get("schema")
model = dbutils.widgets.get("model_name")

In [0]:
from mlflow.store.artifact.models_artifact_repo import ModelsArtifactRepository

# Only the requirements.txt artifact of the version behind the "candidate"
# alias is downloaded here, not the model itself.
candidate_uri = f"models:/{catalog}.{db}.{model}@candidate"
candidate_repo = ModelsArtifactRepository(candidate_uri)
requirements_path = candidate_repo.download_artifacts(artifact_path="requirements.txt")

In [0]:
# %pip install --quiet -r $requirements_path
# dbutils.library.restartPython()



In [0]:

# Three-level registered model name: <catalog>.<schema>.<model>
model_name = ".".join((catalog, db, model))
# Alias of the model version this notebook validates
model_alias = "candidate"

# Resolve the alias to a concrete model version
model_details = client.get_model_version_by_alias(name=model_name, alias=model_alias)
model_version = int(model_details.version)

print(f"Validating {model_alias} model for {model_name} on model version {model_version}")

Validating candidate model for mlops.mlops_zoomcamp_prj.software_defects on model version 1


## Check candidate model attributes

In [0]:
# Description check: the candidate must carry a description longer than
# MIN_DESCRIPTION_LENGTH characters. The result is stored as the
# "has_description" tag on the model version and read by the promotion cell.
#
# The user-facing message is built from the same constant the check enforces,
# so the advertised minimum and the actual threshold cannot drift apart.
MIN_DESCRIPTION_LENGTH = 20

# Treat a missing description (None) the same as an empty one
description = model_details.description or ""

if not description:
  has_description = False
  print("Please add model description")
elif len(description) <= MIN_DESCRIPTION_LENGTH:
  has_description = False
  print(f"Please add detailed model description (more than {MIN_DESCRIPTION_LENGTH} characters).")
else:
  has_description = True

print(f'Model {model_name} version {model_details.version} has description: {has_description}')
# Tag values are stored as strings; the promotion cell compares against "True"
client.set_model_version_tag(name=model_name, version=str(model_details.version), key="has_description", value=str(has_description))

Model mlops.mlops_zoomcamp_prj.software_defects version 1 has description: True


## Check candidate model against the Production model

In [0]:
# Accuracy check: the candidate passes if its accuracy is at least that of the
# model version currently behind the "production" alias, or if no such alias
# exists yet. The result is stored as the "metric_accuracy_passed" tag.

# Same metric key for both runs: a promoted candidate becomes the next
# production model, so both sides must be read from the metric its run logged.
ACCURACY_METRIC = "accuracy"

model_run_id = model_details.run_id
accuracy_score = mlflow.get_run(model_run_id).data.metrics[ACCURACY_METRIC]

# Only the alias lookup is allowed to fail softly. Any other problem (for
# example a missing metric on the production run) must surface as an error
# instead of silently approving the candidate.
try:
    production_model = client.get_model_version_by_alias(model_name, "production")
except mlflow.exceptions.MlflowException as registry_error:
    production_model = None
    print("No production found. Accept the model as it's the first one.")
    # Show the registry's own message so an unexpected failure stays visible
    print(f"Registry response: {registry_error}")

if production_model is None:
    metric_accuracy_passed = True
else:
    production_accuracy = mlflow.get_run(production_model.run_id).data.metrics[ACCURACY_METRIC]
    print(f'production accuracy score: {production_accuracy}. candidate accuracy score: {accuracy_score}.')
    metric_accuracy_passed = accuracy_score >= production_accuracy

print(f'Model {model_name} version {model_details.version} metric_accuracy_passed: {metric_accuracy_passed}')
# Tag values are stored as strings; the promotion cell compares against "True"
client.set_model_version_tag(name=model_name, version=str(model_details.version), key="metric_accuracy_passed", value=str(metric_accuracy_passed))

No production found. Accept the model as it's the first one.
Model mlops.mlops_zoomcamp_prj.software_defects version 1 metric_accuracy_passed: True


## Promote candidate model to Production

In [0]:
# Fetch the model version again and display its current tags
results = client.get_model_version(name=model_name, version=model_version)
results.tags

{'accuracy': '0.8967',
 'has_description': 'True',
 'metric_accuracy_passed': 'True'}

In [0]:
# Promotion gate: every required check must be tagged "True" on the model
# version. A tag that is absent counts as a failed check, so a skipped
# validation cell produces the promotion error rather than a bare KeyError.
required_checks = ("has_description", "metric_accuracy_passed")
failed_checks = [check for check in required_checks if results.tags.get(check) != "True"]

if failed_checks:
  raise Exception(f"Model not ready for promotion: failed or missing checks {failed_checks}")

print('register model as production!')
# Point the "production" alias at the validated version first ...
client.set_registered_model_alias(
  name=model_name,
  alias="production",
  version=model_version
)

# ... then drop the "candidate" alias
client.delete_registered_model_alias(
  name=model_name,
  alias="candidate"
)

register model as production!


In [0]:
# import pyspark.sql.functions as F
# #get our validation dataset:
# validation_df = spark.table('mlops.mlops_zoomcamp_prj.soft_quality_features').filter("split='validate'")

# #Call the model with the given alias and return the prediction
# def predict_defects(validation_df, model_alias):
#     model = mlflow.pyfunc.spark_udf(spark, model_uri=f"models:/{catalog}.{db}.{model}@{model_alias}") #Use env_manager="virtualenv" to recreate a venv with the same python version if needed
#     return validation_df.withColumn('predictions', model(*model.metadata.get_input_schema().input_names()))



In [0]:
# # Load customer features to be scored
# inference_df = spark.read.table(f"mlops_churn_inference")
# # Load champion model as a Spark UDF. You can use virtual env manager for the demo to avoid version conflict (you can remove the pip install above with virtual env)
# champion_model = mlflow.pyfunc.spark_udf(spark, model_uri=f"models:/{catalog}.{db}.mlops_churn@Champion") #Use env_manager="virtualenv" to recreate a venv with the same python version if needed

# # Batch score
# preds_df = inference_df.withColumn('predictions', champion_model(*champion_model.metadata.get_input_schema().input_names()))

# display(preds_df)

2025-07-12 10:55:04,821 14567 ERROR _handle_rpc_error GRPC Error received
Traceback (most recent call last):
  File "/databricks/python/lib/python3.11/site-packages/pyspark/sql/connect/client/core.py", line 1535, in _analyze
    resp = self._stub.AnalyzePlan(req, metadata=self.metadata())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.11/site-packages/grpc/_interceptor.py", line 277, in __call__
    response, ignored_call = self._with_call(
                             ^^^^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.11/site-packages/grpc/_interceptor.py", line 332, in _with_call
    return call.result(), call
           ^^^^^^^^^^^^^
  File "/databricks/python/lib/python3.11/site-packages/grpc/_channel.py", line 440, in result
    raise self
  File "/databricks/python/lib/python3.11/site-packages/grpc/_interceptor.py", line 315, in continuation
    response, call = self._thunk(new_method).with_call(
                     ^

[0;31m---------------------------------------------------------------------------[0m
[0;31mRestException[0m                             Traceback (most recent call last)
File [0;32m<command-7112813776620545>, line 4[0m
[1;32m      2[0m inference_df [38;5;241m=[39m spark[38;5;241m.[39mread[38;5;241m.[39mtable([38;5;124mf[39m[38;5;124m"[39m[38;5;124mmlops_churn_inference[39m[38;5;124m"[39m)
[1;32m      3[0m [38;5;66;03m# Load champion model as a Spark UDF. You can use virtual env manager for the demo to avoid version conflict (you can remove the pip install above with virtual env)[39;00m
[0;32m----> 4[0m champion_model [38;5;241m=[39m mlflow[38;5;241m.[39mpyfunc[38;5;241m.[39mspark_udf(spark, model_uri[38;5;241m=[39m[38;5;124mf[39m[38;5;124m"[39m[38;5;124mmodels:/[39m[38;5;132;01m{[39;00mcatalog[38;5;132;01m}[39;00m[38;5;124m.[39m[38;5;132;01m{[39;00mdb[38;5;132;01m}[39;00m[38;5;124m.mlops_churn@Champion[39m[38;5;124m"[39m) [38;5;6