In [26]:
import os 
from dotenv import load_dotenv

load_dotenv()

subscription_id = os.getenv("SUBSCRIPTION_ID")
resource_group = os.getenv("RESOURCE_GROUP")
workspace_name = os.getenv("WORKSPACE_NAME")

### PHASE 1.1 - CONNECT TO YOUR WORKSPACE VIA SDK

In [27]:
# establish connection to the workspace
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

ml_client = MLClient(
    DefaultAzureCredential(),
    subscription_id,
    resource_group,
    workspace_name
)

Overriding of current TracerProvider is not allowed
Overriding of current LoggerProvider is not allowed
Overriding of current MeterProvider is not allowed
Attempting to instrument while already instrumented
Attempting to instrument while already instrumented
Attempting to instrument while already instrumented


### CREATE AND REGISTER YOUR ENVRONMENT WITH YAML FILE

In [28]:
from azure.ai.ml.entities import Environment

# Create custom environment using Docker base image + Conda YAML
# Conda yaml can be found in the conda-yamls directory
custom_env = Environment(
    name="pipeline_env",
    description="Custom environment using Docker image and Conda YAML.",
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04",
    conda_file="conda-yamls/pipeline_env.yml"  
)

# Register the environment with Azure ML
ml_client.environments.create_or_update(custom_env)

Environment({'arm_type': 'environment_version', 'latest_version': None, 'image': 'mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04', 'intellectual_property': None, 'is_anonymous': False, 'auto_increment_version': False, 'auto_delete_setting': None, 'name': 'pipeline_env', 'description': 'Custom environment using Docker image and Conda YAML.', 'tags': {}, 'properties': {'azureml.labels': 'latest'}, 'print_as_yaml': False, 'id': '/subscriptions/9c16564d-8303-4672-9a52-2b4ba14a38d6/resourceGroups/azure_ml_pipeline/providers/Microsoft.MachineLearningServices/workspaces/azure_ml_workspace/environments/pipeline_env/versions/1', 'Resource__source_path': '', 'base_path': 'c:\\Users\\Administrator\\Desktop\\EDUCATION\\COURSES\\Projects\\ML Projects\\azure_ml_pipeline', 'creation_context': <azure.ai.ml.entities._system_data.SystemData object at 0x0000011AFCE65390>, 'serialize': <msrest.serialization.Serializer object at 0x0000011AF9E9BE90>, 'version': '1', 'conda_file': {'channels': ['defaults

### PHASE 1.2 - UPLOAD YOUR DATA AND REGISTER AS A DATA ASSET

In [29]:
# upload and register data as a data asset
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

my_data = Data(
    path="data/visa_application_gen.csv",  # path to your local file
    type=AssetTypes.URI_FILE,
    description="Visa risk assessment dataset",
    name="visa-data",
    version="1.0"
)

ml_client.data.create_or_update(my_data)

HttpResponseError: (UserError) A data version with this name and version already exists. If you are trying to create a new data version, use a different name or version. If you are trying to update an existing data version, the existing asset's data uri cannot be changed. Only tags, description, and isArchived can be updated.
Code: UserError
Message: A data version with this name and version already exists. If you are trying to create a new data version, use a different name or version. If you are trying to update an existing data version, the existing asset's data uri cannot be changed. Only tags, description, and isArchived can be updated.
Additional Information:Type: ComponentName
Info: {
    "value": "managementfrontend"
}Type: Correlation
Info: {
    "value": {
        "operation": "8a2eb1a0e06f740baa769129da6df3e6",
        "request": "99405f02655e952a"
    }
}Type: Environment
Info: {
    "value": "eastus"
}Type: Location
Info: {
    "value": "eastus"
}Type: Time
Info: {
    "value": "2025-06-16T16:03:43.5676075+00:00"
}Type: InnerError
Info: {
    "value": {
        "code": "Immutable",
        "innerError": {
            "code": "DataVersionPropertyImmutable",
            "innerError": null
        }
    }
}Type: MessageFormat
Info: {
    "value": "A data version with this name and version already exists. If you are trying to create a new data version, use a different name or version. If you are trying to update an existing data version, the existing asset's {property} cannot be changed. Only tags, description, and isArchived can be updated."
}Type: MessageParameters
Info: {
    "value": {
        "property": "data uri"
    }
}

### PHASE 1.3 - UPLOAD AND REGISTER YOUR COMPONENTS

In [79]:
%%writefile components/preprocess/preprocess.py
import argparse
import pandas as pd
import numpy as np
import joblib
import os
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder

# ---- Argument parsing ----
parser = argparse.ArgumentParser()
parser.add_argument("--input_data", type=str)
parser.add_argument("--output_dir", type=str)
args = parser.parse_args()

# country risk score dictionary ----
country_risk = {'Nigeria': 3, 'India': 2, 'Ghana': 2, 'Kenya': 2, 
                       'Germany': 0, 'USA': 0, 'UK': 0, 'Pakistan': 3, 
                       'South Africa': 1, 'Egypt': 2}

try:
    df = pd.read_csv(args.input_data)
    df.drop("risk_score", axis=1, inplace=True)

    df["country_risk_score"] = df["country"].map(country_risk)
    df.drop("country", axis=1, inplace=True)

    df["sponsorship"] = df["sponsorship"].map({"Yes": 1, "No": 0})
    
    label = df["risk_flag"].values  # (n_samples,) array

    # Drop label and encode the rest
    df = df.drop("risk_flag", axis=1)

    # Define features
    num_features = ['age', 'financial_support','travel_history', 'duration']
    cat_features = ['visa_type', 'employment_status', 'purpose']

    num_transformer = Pipeline([
        ("scaler", StandardScaler())
    ])

    cat_transformer = OneHotEncoder(handle_unknown="ignore", sparse_output=False)

    preprocessor = ColumnTransformer([
        ("num", num_transformer, num_features),
        ("cat", cat_transformer, cat_features),
    ])

    preprocessed_data = preprocessor.fit_transform(df)

    # ---- Save outputs ----
    os.makedirs(args.output_dir, exist_ok=True)
    np.save(os.path.join(args.output_dir, "data.npy"), preprocessed_data)
    np.save(os.path.join(args.output_dir, "y.npy"), label)
    joblib.dump(preprocessor, os.path.join(args.output_dir, "preprocessor.pkl"))

    print("Preprocessing complete.")

except Exception as e:
    print("Error during preprocessing:", e)
    raise

Overwriting components/preprocess/preprocess.py


In [None]:
%%writefile components/preprocess/preprocess.yml
name: preprocess
display_name: Preprocess Application Data
version: 3 # increment this if you have changed this file after submission
type: command
inputs:
  input_data:
    type: uri_file
outputs:
  output_dir:
    type: uri_folder
code: .
environment: azureml:pipeline_env:1
command: >-
  python preprocess.py 
  --input_data ${{inputs.input_data}} 
  --output_dir ${{outputs.output_dir}}

Overwriting components/preprocess/preprocess.yml


In [81]:
%%writefile components/split_and_train/split_and_train.py
import pandas as pd
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import joblib
import os
import argparse

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_data", type=str, help="Path to preprocessed folder")
    parser.add_argument("--model_output", type=str, help="Path to save model")
    args = parser.parse_args()

    output_dir = args.input_data
    # Load preprocessed data
    
    X = np.load(os.path.join(args.input_data, "data.npy"))
    y = np.load(os.path.join(args.input_data, "y.npy"))


    # Split the data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Train model
    model = LogisticRegression()
    model.fit(X_train, y_train)

    # Evaluate
    preds = model.predict(X_test)
    acc = accuracy_score(y_test, preds)
    print(f"Validation Accuracy: {acc:.4f}")

    # Save model
    os.makedirs(args.model_output, exist_ok=True)
    joblib.dump(model, os.path.join(args.model_output, "model.pkl"))

if __name__ == "__main__":
    main()


Overwriting components/split_and_train/split_and_train.py


In [None]:
%%writefile components/split_and_train/split_and_train.yml
name: split_and_train
display_name: Split and Train Model
version: 3 # increment this if you have changed this file after submission
type: command
inputs:
  input_data:
    type: uri_folder
outputs:
  model_output:
    type: uri_folder
code: .
environment: azureml:pipeline_env:1  # Replace with your environment
command: >-
  python split_and_train.py
  --input_data ${{inputs.input_data}}
  --model_output ${{outputs.model_output}}


Overwriting components/split_and_train/split_and_train.yml


In [83]:
from azure.ai.ml import load_component

In [84]:
preprocess_component = load_component(source="components/preprocess/preprocess.yml")
registered_preprocess_component = ml_client.components.create_or_update(preprocess_component)
print(f"Preprocessing component registered: {registered_preprocess_component.name} (v{registered_preprocess_component.version})")

Uploading preprocess (0.0 MBs): 100%|##########| 2714/2714 [00:01<00:00, 2215.58it/s]




Preprocessing component registered: preprocess (v3)


In [85]:
train_component = load_component(source="components/split_and_train/split_and_train.yml")
registered_train_component = ml_client.components.create_or_update(train_component)
print(f"Training component registered: {registered_train_component.name} (v{registered_train_component.version})")

Uploading split_and_train (0.0 MBs): 100%|##########| 1966/1966 [00:01<00:00, 1121.39it/s]




Training component registered: split_and_train (v3)


### PHASE 1.4 - CREATE AND RUN PIPELINE 

In [None]:
from azure.ai.ml import Input, dsl

# Load the components from registry
preprocess_component = ml_client.components.get(name="preprocess", version="3")
train_component = ml_client.components.get(name="split_and_train", version="3")

# Define the pipeline
@dsl.pipeline(
    compute="azure-ml-instance",
    description="ML pipeline for data preprocessing and model training"
)
def pipeline_func(input_data):
    preprocess_job = preprocess_component(
        input_data=input_data
    )

    train_job = train_component(
        input_data=preprocess_job.outputs.output_dir,
    )

    return {
        "model_output": train_job.outputs.model_output
    }

# Create a pipeline job from the function
pipeline_job = pipeline_func(
    input_data=Input(type="uri_file", path=my_data.path)
)

In [87]:
pipeline_job = ml_client.jobs.create_or_update(pipeline_job)
ml_client.jobs.stream(pipeline_job.name)

pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored


RunId: tender_square_3j782llsr7
Web View: https://ml.azure.com/runs/tender_square_3j782llsr7?wsid=/subscriptions/9c16564d-8303-4672-9a52-2b4ba14a38d6/resourcegroups/azure_ml_pipeline/workspaces/azure_ml_workspace

Streaming logs/azureml/executionlogs.txt

[2025-06-16 16:36:33Z] Submitting 1 runs, first five are: 12ed1735:15e0e4a0-8152-43be-85f3-80122ff1c546
[2025-06-16 16:36:57Z] Completing processing run id 15e0e4a0-8152-43be-85f3-80122ff1c546.
[2025-06-16 16:36:58Z] Submitting 1 runs, first five are: 12c61163:1635b683-4ab5-4c61-8179-4ab9c255da14
[2025-06-16 16:37:22Z] Completing processing run id 1635b683-4ab5-4c61-8179-4ab9c255da14.

Execution Summary
RunId: tender_square_3j782llsr7
Web View: https://ml.azure.com/runs/tender_square_3j782llsr7?wsid=/subscriptions/9c16564d-8303-4672-9a52-2b4ba14a38d6/resourcegroups/azure_ml_pipeline/workspaces/azure_ml_workspace

