Based on

In [None]:
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient


# Prefer the default credential chain. If it cannot obtain a token for the
# Azure management endpoint in this environment, fall back to an interactive
# browser login instead of failing the cell.
try:
    credential = DefaultAzureCredential()
    # Check if given credential can get token successfully.
    credential.get_token("https://management.azure.com/.default")
except Exception as ex:
    # Broad on purpose: any failure here means the default chain is unusable.
    print(f"DefaultAzureCredential failed ({ex}); falling back to InteractiveBrowserCredential")
    credential = InteractiveBrowserCredential()


# Build the workspace client from the local workspace config file.
ml_client = MLClient.from_config(credential=credential)

In [None]:
%%writefile prep-data.py

import argparse
import csv
import glob
import os
import shutil
import tempfile
from pathlib import Path

import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler


def get_data(path):
    """Load a delimited text file into a DataFrame.

    ``path`` may be a single file or a directory. For a directory, the
    alphabetically first ``*.csv`` file inside it is used. The file is copied
    to a local temporary file before parsing, the delimiter is sniffed from
    the first 20000 characters (falling back to a comma), and the temporary
    copy is always removed before returning.

    Args:
        path: File or directory path of the input data.

    Returns:
        A pandas DataFrame with the parsed rows.

    Raises:
        FileNotFoundError: If ``path`` is a directory without ``*.csv`` files.
    """
    src = Path(path)
    if src.is_dir():
        candidates = sorted(src.glob("*.csv"))
        if not candidates:
            raise FileNotFoundError(f"No CSV files in: {src}")
        src = candidates[0]

    # copy to local temp to avoid mount read(nbytes) issues
    fd, tmp = tempfile.mkstemp(suffix=src.suffix or ".csv")
    os.close(fd)
    try:
        shutil.copyfile(str(src), tmp)

        # detect delimiter (fallback to comma when the sniffer cannot decide)
        try:
            with open(tmp, "r", encoding="utf-8", errors="ignore") as f:
                sample = f.read(20000)
            sep = csv.Sniffer().sniff(sample, delimiters=[",", ";", "\t", "|"]).delimiter
        except csv.Error:
            sep = ","

        # robust read with python engine; retry as latin-1 if utf-8 decoding fails
        try:
            df = pd.read_csv(tmp, engine="python", sep=sep)
        except UnicodeDecodeError:
            df = pd.read_csv(tmp, engine="python", sep=sep, encoding="latin-1")
    finally:
        # mkstemp leaves deletion to the caller; cleanup is best-effort so a
        # failed removal never masks the result or the original error.
        try:
            os.remove(tmp)
        except OSError:
            pass

    print(f"Preparing {len(df)} rows of data")
    return df


def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """Return ``df`` without any row that contains a missing value.

    The number of removed rows is printed as a debug line. Extend this with
    domain-specific cleaning if needed.
    """
    rows_in = len(df)
    cleaned = df.dropna()
    dropped = rows_in - len(cleaned)
    print(f"[DEBUG] clean_data: dropped {dropped} rows with NA")
    return cleaned


def normalize_data(df: pd.DataFrame) -> pd.DataFrame:
    """Min-max scale the known numeric feature columns of ``df``.

    For each feature, the first column-name variant present in ``df`` is
    selected. The selected columns are coerced to numeric, rows where the
    coercion produced NaN are dropped, and the remaining values are scaled
    with ``MinMaxScaler``. The input frame is left untouched; a new frame is
    returned.

    Args:
        df: Input data containing at least one of the known feature columns.

    Returns:
        A new DataFrame with the selected columns scaled.

    Raises:
        ValueError: If none of the expected columns exist, or if no rows are
            left after dropping non-numeric values.
    """
    # Handle common column name variants (Pima Indians dataset vs names)
    alias_groups = {
        "Pregnancies": ["Pregnancies"],
        "Glucose": ["Glucose", "PlasmaGlucose"],
        "BloodPressure": ["BloodPressure", "DiastolicBloodPressure"],
        "SkinThickness": ["SkinThickness", "TricepsThickness"],
        "Insulin": ["Insulin", "SerumInsulin"],
        "BMI": ["BMI"],
        "DiabetesPedigreeFunction": ["DiabetesPedigreeFunction", "DiabetesPedigree"],
        # Add "Age" if you want to scale it too
    }

    # Pick whichever alias exists in df (first match per feature)
    cols_to_scale = []
    for aliases in alias_groups.values():
        present = next((a for a in aliases if a in df.columns), None)
        if present is not None:
            cols_to_scale.append(present)

    if not cols_to_scale:
        raise ValueError("None of the expected numeric columns were found to scale.")

    # Work on a copy so the caller's frame is not modified and the column
    # assignments below never target a view of another frame.
    df = df.copy()

    # Coerce to numeric
    for c in cols_to_scale:
        df[c] = pd.to_numeric(df[c], errors="coerce")

    # Drop rows that became NaN in any scaled column
    before = df.shape[0]
    df = df.dropna(subset=cols_to_scale).copy()
    print(f"[DEBUG] normalize_data: coerced to numeric; dropped {before - df.shape[0]} rows with non-numeric values in {cols_to_scale}")

    # Fail with a clear message instead of passing an empty frame to the scaler.
    if df.empty:
        raise ValueError(f"No rows left to scale after dropping non-numeric values in {cols_to_scale}.")

    scaler = MinMaxScaler()
    df[cols_to_scale] = scaler.fit_transform(df[cols_to_scale])
    print(f"[DEBUG] normalized columns: {cols_to_scale}")
    return df


def main(args):
    """Run the preparation steps end to end.

    Reads the data from ``args.input_data``, cleans and normalizes it, and
    writes the result as ``diabetes.csv`` inside ``args.output_data`` (the
    folder is created when missing).
    """
    raw = get_data(args.input_data)
    print(f"Preparing {len(raw)} rows of data")

    prepared = normalize_data(clean_data(raw))

    target_dir = Path(args.output_data)
    target_dir.mkdir(parents=True, exist_ok=True)
    out_path = target_dir / "diabetes.csv"
    prepared.to_csv(out_path, index=False)
    print(f"[INFO] wrote: {out_path} rows={len(prepared)}")


def parse_args():
    """Parse the command line; both ``--input_data`` and ``--output_data`` are required strings."""
    cli = argparse.ArgumentParser()
    for flag in ("--input_data", "--output_data"):
        cli.add_argument(flag, required=True, type=str)
    return cli.parse_args()


# Script entry point: parse the CLI arguments and run the preparation,
# framed by banner lines of 60 asterisks in the output.
if __name__ == "__main__":
    print("start main")
    print("\n\n" + "*" * 60)
    args = parse_args()
    main(args)
    print("*" * 60 + "\n\n")


In [None]:
%%writefile prep-data.yml
# Command component spec: runs prep-data.py with one input file and one
# output folder.
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: prep_data
display_name: Prepare training data
version: 1
type: command
inputs:
  # Passed to the script as --input_data.
  input_data: 
    type: uri_file
outputs:
  # Passed to the script as --output_data.
  output_data:
    type: uri_folder
# Source for the component is the current directory (where prep-data.py is written).
code: ./
environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu@latest
# ">-" folds the following line(s) into a single command string.
command: >-
  python prep-data.py --input_data ${{inputs.input_data}} --output_data ${{outputs.output_data}}

In [None]:
%%writefile train-model.yml
# Command component spec: runs train-model-mlflow.py on a training-data
# folder with a regularization rate and produces an MLflow model output.
# NOTE(review): train-model-mlflow.py is not created in the visible cells;
# it is expected to exist in the current directory - confirm.
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: train_model
display_name: Train a logistic regression model
version: 1
type: command
inputs:
  # Passed to the script as --training_data.
  training_data: 
    type: uri_folder
  # Passed to the script as --reg_rate; 0.01 is used when no value is supplied.
  reg_rate:
    type: number
    default: 0.01
outputs:
  # Passed to the script as --model_output.
  model_output:
    type: mlflow_model
# Source for the component is the current directory.
code: ./
environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu@latest
# ">-" folds the following line(s) into a single command string.
command: >-
  python train-model-mlflow.py --training_data ${{inputs.training_data}} --reg_rate ${{inputs.reg_rate}} --model_output ${{outputs.model_output}} 

In [None]:
from azure.ai.ml import load_component
# Prefix prepended to the component YAML paths; empty means the paths below
# are resolved relative to the current working directory.
parent_dir = ""

# Load the two component definitions from the YAML files written above.
prep_data = load_component(source=parent_dir + "./prep-data.yml")
train_logistic_regression = load_component(source=parent_dir + "./train-model.yml")

# Print the loaded training component for inspection.
print(train_logistic_regression)

In [57]:
from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.dsl import pipeline

# Two-step pipeline: prepare the data, then train on the prepared output.
# NOTE(review): documented with comments rather than a docstring or type
# annotations because the pipeline DSL may read those (and the local variable
# names below) when building the job - confirm before changing them.
@pipeline()
def diabetes_classification(pipeline_job_input):
    # Step 1: run the prep_data component on the pipeline input.
    clean_data = prep_data(input_data=pipeline_job_input)
    
    # Step 2: train on the folder produced by step 1 (reg_rate keeps its component default).
    train_model = train_logistic_regression(training_data=clean_data.outputs.output_data)

    # Expose both step outputs as pipeline-level outputs.
    return {
        "pipeline_job_transformed_data": clean_data.outputs.output_data,
        "pipeline_job_trained_model": train_model.outputs.model_output,
    }

# Instantiate the pipeline with version 1 of the "diabetes" asset as a file input.
pipeline_job = diabetes_classification(Input(type=AssetTypes.URI_FILE, path="azureml:diabetes:1"))

In [None]:
# Set the mode of both pipeline-level outputs to "upload".
pipeline_job.outputs.pipeline_job_transformed_data.mode = "upload"
pipeline_job.outputs.pipeline_job_trained_model.mode = "upload"
# Set the pipeline-level default compute target.
# NOTE(review): "ml-compute-ntb" must match a compute name in the target workspace - confirm.
pipeline_job.settings.default_compute = "ml-compute-ntb"
# Set the pipeline-level default datastore.
pipeline_job.settings.default_datastore = "workspaceblobstore"

# Print the configured pipeline job for inspection before submitting.
print(pipeline_job)

In [None]:
# Create (or update) the pipeline job through the workspace client under the
# "diabetes_pipeline" experiment.
ml_client.create_or_update(pipeline_job, experiment_name="diabetes_pipeline")