# Drift

Drift is an automation framework for Azure Machine Learning that streamlines the process of registering training datasets and retraining models in response to data updates. Built on top of PyDataIO, Drift provides a robust solution for managing ML pipelines in Databricks and Azure ML environments.
## Table of Contents
- Overview
- Design Architecture
- Features
- Installation
- Getting Started
- Usage
- Running Jobs in Databricks
- Configuration
- Contributing
- License
## Overview

Drift automates two critical ML operations workflows:

1. **Dataset Registration**: Automatically registers new versions of training datasets from Azure Data Lake Storage Gen2 to Azure ML as both MLTable and URI folder assets.
2. **Model Retraining**: Triggers retraining of ML models when new data becomes available, keeping your models up to date with the latest data.
The framework is built on PyDataIO, leveraging its pipeline orchestration capabilities to create robust, configurable ML workflows.
## Design Architecture

The architecture consists of two main components:
- Dataset Registrator: Monitors data sources and registers new dataset versions
- Model Retrainer: Detects new dataset versions and triggers model retraining jobs
## Features

- Automated Dataset Registration: Register Delta Lake tables as Azure ML data assets
- Intelligent Model Retraining: Automatically retrain models based on data asset updates
- Version Management: Track dataset and model versions with timestamp-based versioning
- Flexible Configuration: YAML-based configuration for easy customization
- Status Monitoring: Built-in job status tracking and timeout management
- Group-based Training: Support for training multiple model groups with selective retraining
- Azure Integration: Seamless integration with Azure ML, Databricks, and Azure Key Vault
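The timestamp-based versioning mentioned above can be sketched in a few lines of Python. The exact format Drift generates is not documented here, so the second-resolution UTC layout below is an assumption for illustration:

```python
from datetime import datetime, timezone

def make_data_asset_version(now=None):
    """Hypothetical helper: derive a sortable, timestamp-based version string.

    The %Y%m%d%H%M%S layout is an illustrative assumption; Drift's
    actual format may differ.
    """
    now = now or datetime.now(timezone.utc)
    return now.strftime("%Y%m%d%H%M%S")
```

Because such a string sorts lexicographically in chronological order, the latest version can be selected with a plain `max()` over the registered versions.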
## Installation

### Prerequisites

- Python 3.11 or higher
- Access to Azure ML workspace
- Databricks workspace (for job execution)
- Azure Data Lake Storage Gen2
```bash
git clone https://github.com/yourusername/Drift.git
cd Drift
pip install -e .
pip install -r requirements.txt
```

Or, if using uv:

```bash
uv sync
```

## Getting Started

Drift uses an Azure Service Principal for authentication. Store your credentials in Azure Key Vault:

- `ApplicationID`: your Azure AD application ID (base64 encoded)
- `ApplicationPassword`: your Azure AD application secret (base64 encoded)
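Since both Key Vault entries are stored base64 encoded, a consumer has to decode them before use. A minimal sketch (the helper name is illustrative, not part of Drift's API):

```python
import base64

def decode_secret(b64_value: str) -> str:
    # Key Vault entries such as ApplicationID / ApplicationPassword are
    # stored base64 encoded, so decode them before passing to Azure auth.
    return base64.b64decode(b64_value).decode("utf-8")
```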
Create a configuration file based on the examples in the docs/ directory.
## Usage

Both components run through the `spark_job` entry point:

```bash
spark_job --config path/to/config.conf \
    --tenant <tenant-id> \
    --vault_name <key-vault-name> \
    --data_asset_version <version>   # optional
```

### Dataset Registrator

The Dataset Registrator creates new versions of training data assets in Azure ML from Delta Lake tables stored in Azure Data Lake Storage Gen2.
**Configuration Example** (`example-training-dataset-registrator.conf`):

```yaml
parameters:
  azml:
    subscriptionId: <azure-ml-subscription-id>
    resourceGroup: <azure-ml-resource-group>
    mlWorkspaceName: <azure-ml-workspace-name>
  storageAccountName: <azure-storage-account-name>
  containerName: <azure-container-name>
  containerDataPath: <path-into-container>
```

**What it does:**
- Connects to Azure Data Lake Storage Gen2
- Creates/updates a datastore in Azure ML
- Registers the Delta Lake table as an MLTable asset
- Registers the data as a URI folder asset
- Generates a timestamp-based version number
- Publishes the new version to Databricks job task values
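For reference, the location described by `storageAccountName`, `containerName`, and `containerDataPath` resolves to a standard ADLS Gen2 `abfss://` URI. A small sketch (the helper function is hypothetical; only the URI scheme is standard):

```python
def datastore_uri(storage_account: str, container: str, data_path: str) -> str:
    # Standard ADLS Gen2 addressing:
    # abfss://<container>@<account>.dfs.core.windows.net/<path>
    return (
        f"abfss://{container}@{storage_account}.dfs.core.windows.net/"
        f"{data_path.strip('/')}"
    )
```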
### Model Retrainer

The Model Retrainer automatically triggers retraining jobs for models that use updated data assets.

**Configuration Example** (`example-model-retrainer.conf`):
```yaml
parameters:
  azml:
    subscriptionId: <azure-ml-subscription-id>
    resourceGroup: <azure-ml-resource-group>
    mlWorkspaceName: <azure-ml-workspace-name>
  dataAssets:
    - name: mltable_seat_spinning_features
      value: <ml-table-name>
    - name: raw_data
      value: <raw-dataset-name>
  refreshTimeout: "2700"
  refreshDelay: "10"
```

**What it does:**
- Retrieves all Azure ML pipeline jobs matching the naming pattern
- Identifies the most recent job for each model group
- Creates new retraining jobs with updated data asset versions
- Monitors job completion with configurable timeout and refresh intervals
- Reports success or failure status
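The completion monitoring driven by `refreshTimeout` and `refreshDelay` can be sketched as a simple polling loop. Function and status names below are illustrative, not Drift's actual API:

```python
import time

def wait_for_job(get_status, refresh_timeout=2700.0, refresh_delay=10.0,
                 sleep=time.sleep):
    """Poll a retraining job until it reaches a terminal state or times out.

    `get_status` is a caller-supplied function returning the current job
    status; the status names here are placeholders.
    """
    deadline = time.monotonic() + refresh_timeout
    while time.monotonic() < deadline:
        status = get_status()
        if status in ("Completed", "Failed"):
            return status
        sleep(refresh_delay)  # wait refreshDelay seconds between checks
    return "TimedOut"
```

Injecting `sleep` keeps the loop testable without real waiting.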
**Job Naming Convention:**

Jobs should follow the pattern `{model_name_prefix}_{timestamp}_{random_string}`.
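A job name following this convention can be decomposed with a regular expression. The sketch below assumes a 14-digit timestamp and an alphanumeric random suffix, which is an illustrative guess at the exact shape:

```python
import re

# Assumed shape of {model_name_prefix}_{timestamp}_{random_string};
# the 14-digit timestamp and alphanumeric suffix are illustrative.
JOB_NAME_RE = re.compile(r"^(?P<prefix>.+)_(?P<timestamp>\d{14})_(?P<rand>[A-Za-z0-9]+)$")

def parse_job_name(name: str):
    """Return (prefix, timestamp, random_string), or None if no match."""
    m = JOB_NAME_RE.match(name)
    return m and (m.group("prefix"), m.group("timestamp"), m.group("rand"))
```

Grouping parsed names by prefix and taking the maximum timestamp per group yields the "most recent job for each model group" described above.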
## Running Jobs in Databricks

Drift is designed to run as Databricks jobs. The recommended way to deploy is using Databricks Asset Bundles (DABs).
Create a `databricks.yml` file to define your workflow. Here's a complete example:
```yaml
resources:
  jobs:
    retraining_models:
      name: retraining-models
      email_notifications:
        no_alert_for_skipped_runs: true
      schedule:
        quartz_cron_expression: 0 0 8 ? * MON *
        timezone_id: UTC
        pause_status: PAUSED
      tasks:
        - task_key: training-dataset-registrator
          python_wheel_task:
            package_name: drift
            entry_point: spark_job
            parameters:
              - --tenant
              - <tenant-id>
              - --vault_name
              - <vault-name>
              - --config
              - <path-to-configuration-file>
          job_cluster_key: retraining-models-cluster
          libraries:
            - whl: <path-to-drift-python-wheel>
        - task_key: model-retrainer
          depends_on:
            - task_key: training-dataset-registrator
          python_wheel_task:
            package_name: drift
            entry_point: spark_job
            parameters:
              - --tenant
              - <tenant-id>
              - --vault_name
              - <vault-name>
              - --data_asset_version
              - "{{tasks.`training-dataset-registrator`.values.data_asset_version}}"
              - --config
              - <path-to-configuration-file>
          job_cluster_key: retraining-models-cluster
          libraries:
            - whl: <path-to-drift-python-wheel>
      job_clusters:
        - job_cluster_key: retraining-models-cluster
          new_cluster:
            cluster_name: ""
            spark_version: 15.4.x-scala2.12
            node_type_id: Standard_D4s_v5
            enable_elastic_disk: true
            data_security_mode: DATA_SECURITY_MODE_DEDICATED
            kind: CLASSIC_PREVIEW
            is_single_node: true
      queue:
        enabled: false
```

See the complete example in `docs/example-databricks-workflow.yaml`.
**Deployment Steps:**

1. Build the Drift wheel package:

   ```bash
   uv build
   ```

2. Upload the wheel to Databricks:

   ```bash
   databricks fs cp dist/drift-*.whl dbfs:/path/to/drift-wheel/
   ```

3. Deploy the workflow:

   ```bash
   databricks bundle deploy
   ```

4. Run the workflow:

   ```bash
   databricks bundle run retraining_models
   ```
**Task 1: Dataset Registration**

- Registers new training dataset versions
- Outputs the new version via the `data_asset_version` task value

**Task 2: Model Retraining**

- Depends on Task 1 completion
- Retrieves the dataset version using `{{tasks.training-dataset-registrator.values.data_asset_version}}`
- Triggers retraining jobs with the new data version
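The hand-off between the two tasks uses Databricks task values (`dbutils.jobs.taskValues`). Here is a sketch of the producing side, with `dbutils` passed in so the logic can run outside a Databricks notebook; the function name is illustrative:

```python
def publish_data_asset_version(dbutils, version: str) -> None:
    # Task 1 publishes the new dataset version; downstream tasks read it
    # via the {{tasks.<task_key>.values.data_asset_version}} reference.
    dbutils.jobs.taskValues.set(key="data_asset_version", value=version)
```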
**Scheduling:**

- The example runs weekly on Mondays at 8 AM UTC
- Modify the `quartz_cron_expression` to adjust the schedule
- Set `pause_status: UNPAUSED` to activate the schedule
**Prerequisites:**

1. Databricks CLI installed:

   ```bash
   pip install databricks-cli
   ```

2. Databricks secrets configured:
   - Set up a secret scope linked to Azure Key Vault
   - Ensure it contains `ApplicationID` and `ApplicationPassword` (base64 encoded)

3. Configuration files uploaded:
   - Upload your configuration files to DBFS
   - Update the `--config` parameter paths in the workflow

4. Drift wheel package:
   - Build and upload the Drift wheel to DBFS
   - Update the `whl` path in the `libraries` section
### Manual Job Creation

If you prefer to create jobs manually through the Databricks UI:
1. Create a new job with two tasks

2. **Task 1 Configuration:**
   - Task type: Python wheel
   - Package name: `drift`
   - Entry point: `spark_job`
   - Parameters: `--config <config-path> --tenant <tenant-id> --vault_name <vault-name>`

3. **Task 2 Configuration:**
   - Depends on: Task 1
   - Task type: Python wheel
   - Package name: `drift`
   - Entry point: `spark_job`
   - Parameters: `--config <config-path> --tenant <tenant-id> --vault_name <vault-name> --data_asset_version {{tasks.training-dataset-registrator.values.data_asset_version}}`

4. **Attach libraries:** add the Drift wheel to both tasks

5. **Configure cluster:** use Spark 15.4.x with an appropriate node type
## Configuration

**For Dataset Registration:**

- `azml.subscriptionId`: Azure subscription ID
- `azml.resourceGroup`: Azure resource group name
- `azml.mlWorkspaceName`: Azure ML workspace name
- `storageAccountName`: Azure Storage account name
- `containerName`: Storage container name
- `containerDataPath`: Path within the container

**For Model Retraining:**

- `azml.subscriptionId`: Azure subscription ID
- `azml.resourceGroup`: Azure resource group name
- `azml.mlWorkspaceName`: Azure ML workspace name
- `dataAssets`: List of data assets to monitor (name and value)
- `refreshTimeout`: Maximum time to wait for job completion (seconds)
- `refreshDelay`: Interval between status checks (seconds)

**Optional Arguments:**

- `data_asset_version`: Specific version to use for retraining (defaults to the latest if not provided)
- `model_name_prefix`: Comma-separated list of model prefixes to retrain (if not provided, retrains all matching training jobs)
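Handling of the optional comma-separated `model_name_prefix` argument can be sketched as follows (hypothetical helper; Drift's actual parsing may differ):

```python
def select_prefixes(model_name_prefix=None):
    """Split the comma-separated model_name_prefix argument.

    Returns None when no prefix is given, meaning "retrain all matching
    training jobs".
    """
    if not model_name_prefix:
        return None
    return [p.strip() for p in model_name_prefix.split(",") if p.strip()]
```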
## Contributing

We welcome contributions! Please see our Contributing Guide for details on:
- How to report issues
- Development setup and workflow
- Code quality standards and testing
- Pull request process
Please also read our Code of Conduct before contributing.
## License

This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
## Acknowledgments

- Built on top of PyDataIO
- Developed by Simone DE SANTIS and Guillaume LECLERC at Amadeus
## Support

For issues and questions:

- Open an issue on GitHub
- Check the existing documentation in the `docs/` directory
> **Note:** This project is designed for Azure ML and Databricks environments. Ensure you have the necessary permissions and resources configured before running jobs.
