# Pipeline

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import ConfusionMatrixDisplay
from azure.ai.ml import MLClient
from azure.ai.ml.entities import Environment, Model
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import CommandComponent, Data
from azure.ai.ml import command, Input, Output
from azure.identity import InteractiveBrowserCredential
from azure.ai.ml.constants import AssetTypes
import os
import time

# Workspace handle shared by every helper in this cell.
# Authentication uses an interactive browser login, so the cell must run in a
# session where a browser window can be opened.
credential = InteractiveBrowserCredential()
ml_client = MLClient(
    credential,
    workspace_name="NLP6-2025",
    resource_group_name="buas-y2",
    subscription_id="0a94de80-6d3b-49f2-b3e9-ec5818862801",
)

def create_training_script():
    """Write the training entry point to ``pipeline_scripts/train_model.py``.

    The script is kept here as a string so the notebook is self-contained; the
    ``pipeline_scripts`` folder is created if it does not exist and the file is
    overwritten on every call.

    The generated script:
      * loads one CSV per input (the first ``*.csv`` in sorted order when the
        input is a folder),
      * converts ``HH:MM:SS,fff`` columns to seconds,
      * label-encodes the target and any non-numeric feature column using
        codes fitted on the training data only (test categories never seen in
        training become -1),
      * scales features, trains a random forest or logistic regression,
      * logs metrics, parameters and a confusion matrix through MLflow, and
      * registers the pickled model through the Azure ML run context.
    """
    os.makedirs("pipeline_scripts", exist_ok=True)

    training_script = '''
import argparse
import glob
import json
import os
import time

import joblib
import matplotlib.pyplot as plt
import mlflow
import numpy as np
import pandas as pd
from azureml.core import Run
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (
    ConfusionMatrixDisplay,
    accuracy_score,
    f1_score,
    precision_score,
    recall_score,
)
from sklearn.preprocessing import LabelEncoder, StandardScaler


def find_csv_file(path):
    """Return path itself, or the first CSV (sorted by name) inside a folder."""
    if os.path.isdir(path):
        # glob order is filesystem dependent; sort so the pick is reproducible.
        csv_files = sorted(glob.glob(os.path.join(path, "*.csv")))
        if not csv_files:
            raise FileNotFoundError(f"No CSV files found in directory: {path}")
        return csv_files[0]
    return path


def convert_time_to_seconds(time_str):
    """Convert time string (HH:MM:SS,fff) to total seconds"""
    try:
        hh_mm_ss, millis = time_str.split(',')
        h, m, s = hh_mm_ss.split(':')
        return float(h) * 3600 + float(m) * 60 + float(s) + (float(millis)/1000)
    except (AttributeError, ValueError):
        # Not a string, or not in the expected layout: treat as missing.
        return np.nan


def preprocess_data(train_path, test_path):
    """Load, encode and scale the train/test CSVs; persist scaler and encoder."""
    train_file = find_csv_file(train_path)
    test_file = find_csv_file(test_path)

    train_df = pd.read_csv(train_file)
    test_df = pd.read_csv(test_file)

    # Identify time columns and convert to seconds
    time_cols = [col for col in train_df.columns
                 if any(x in str(train_df[col].dtype) for x in ['time', 'object'])]

    for col in time_cols:
        if train_df[col].astype(str).str.match(r'\\d{2}:\\d{2}:\\d{2},\\d{3}').any():
            train_df[col] = train_df[col].astype(str).apply(convert_time_to_seconds)
            test_df[col] = test_df[col].astype(str).apply(convert_time_to_seconds)

    target_columns = ['target', 'Target', 'label', 'Label', 'emotion', 'Emotion']
    target_col = next((col for col in target_columns if col in train_df.columns), None)

    if target_col is None:
        raise ValueError(f"No target column found. Available columns: {train_df.columns.tolist()}")

    label_encoder = LabelEncoder()
    y_train = label_encoder.fit_transform(train_df[target_col])
    y_test = label_encoder.transform(test_df[target_col])

    X_train = train_df.drop(columns=[target_col])
    X_test = test_df.drop(columns=[target_col])

    # Convert remaining string columns to numeric or categorical
    for col in X_train.select_dtypes(include=['object']).columns:
        try:
            # Convert into temporaries so a failure on the test column does not
            # leave the train column already overwritten.
            train_numeric = pd.to_numeric(X_train[col], errors='raise')
            test_numeric = pd.to_numeric(X_test[col], errors='raise')
        except (TypeError, ValueError):
            # Categorical column: fit the codes on train only and reuse them
            # for test, so a given string maps to the same code in both frames.
            train_values = X_train[col].astype(str)
            encoder = LabelEncoder().fit(train_values)
            codes = {label: code for code, label in enumerate(encoder.classes_)}
            X_train[col] = encoder.transform(train_values)
            # Categories that never occurred in training get -1.
            X_test[col] = X_test[col].astype(str).map(codes).fillna(-1).astype(int)
        else:
            X_train[col] = train_numeric
            X_test[col] = test_numeric

    # Handle missing values
    X_train = X_train.fillna(0)
    X_test = X_test.fillna(0)

    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    os.makedirs('outputs', exist_ok=True)
    joblib.dump(scaler, 'outputs/scaler.pkl')
    joblib.dump(label_encoder, 'outputs/label_encoder.pkl')

    return X_train_scaled, X_test_scaled, y_train, y_test


def train_and_evaluate(args):
    """Train the requested model, log metrics/artifacts and register the model."""
    run = Run.get_context()
    mlflow.start_run()

    try:
        X_train, X_test, y_train, y_test = preprocess_data(args.train_data, args.test_data)

        if args.model_name == "random_forest":
            params = {
                "n_estimators": args.n_estimators,
                "max_depth": args.max_depth,
                "min_samples_split": args.min_samples_split,
                "random_state": 42
            }
            model = RandomForestClassifier(**params)
        elif args.model_name == "logistic_regression":
            params = {
                "C": args.C,
                "max_iter": 1000,
                "random_state": 42
            }
            model = LogisticRegression(**params)
        else:
            raise ValueError(f"Unsupported model: {args.model_name}")

        model.fit(X_train, y_train)
        preds = model.predict(X_test)
        acc = accuracy_score(y_test, preds)

        # Calculate additional metrics
        precision = precision_score(y_test, preds, average="weighted")
        recall = recall_score(y_test, preds, average="weighted")
        f1 = f1_score(y_test, preds, average="weighted")

        os.makedirs(args.model_output, exist_ok=True)
        model_path = os.path.join(args.model_output, 'model.pkl')
        joblib.dump(model, model_path)

        # Log all metrics to MLflow so they show up in AzureML "Metrics"
        mlflow.log_metric("accuracy", acc)
        mlflow.log_metric("precision", precision)
        mlflow.log_metric("recall", recall)
        mlflow.log_metric("f1_score", f1)
        mlflow.log_params(params)
        mlflow.log_param("model_name", args.model_name)

        # A "positive class" probability only exists for binary problems; with
        # one class column 1 is missing, with more it is just one arbitrary class.
        if hasattr(model, "predict_proba") and len(model.classes_) == 2:
            proba = model.predict_proba(X_test)[:, 1]
            for i, p in enumerate(proba):
                mlflow.log_metric("test_positive_proba", p, step=i)

        ConfusionMatrixDisplay.from_predictions(y_test, preds)
        plt.title("Confusion Matrix")
        conf_matrix_path = os.path.join(args.model_output, "confusion_matrix.png")
        plt.savefig(conf_matrix_path)
        plt.close()
        mlflow.log_artifact(conf_matrix_path)

        # Register the model with AzureML
        run.upload_file(name='model/model.pkl', path_or_stream=model_path)
        run.register_model(
            model_name=args.model_name,
            model_path='model/model.pkl',
            tags={"framework": "sklearn"},
            # Model properties are string-to-string pairs.
            properties={"accuracy": str(acc)}
        )

        return acc

    finally:
        mlflow.end_run()


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--train_data', type=str, required=True)
    parser.add_argument('--test_data', type=str, required=True)
    parser.add_argument('--model_output', type=str, required=True)
    parser.add_argument('--model_name', type=str, required=True)
    parser.add_argument('--n_estimators', type=int, default=100)
    parser.add_argument('--max_depth', type=int, default=20)
    parser.add_argument('--min_samples_split', type=int, default=2)
    parser.add_argument('--C', type=float, default=1.0)
    args = parser.parse_args()

    accuracy = train_and_evaluate(args)
    print(f"Model trained with accuracy: {accuracy:.4f}")


if __name__ == "__main__":
    main()
'''

    with open("pipeline_scripts/train_model.py", "w", encoding='utf-8') as f:
        f.write(training_script)

def create_training_component():
    """Build the Azure ML command component that runs ``train_model.py``.

    The training script is (re)written first, because ``./pipeline_scripts``
    is the folder handed to the component as its ``code``.

    Returns:
        The object produced by ``command(...)``. The pipeline definition calls
        it with concrete inputs to create one training step per model.
    """
    create_training_script()

    # Command line executed inside the job; ${{...}} placeholders are filled in
    # from the declared inputs/outputs below.
    cli = (
        "python train_model.py "
        "--train_data ${{inputs.train_data}} "
        "--test_data ${{inputs.test_data}} "
        "--model_output ${{outputs.model_output}} "
        "--model_name ${{inputs.model_name}} "
        "--n_estimators ${{inputs.n_estimators}} "
        "--max_depth ${{inputs.max_depth}} "
        "--min_samples_split ${{inputs.min_samples_split}} "
        "--C ${{inputs.C}}"
    )

    component_inputs = {
        "train_data": Input(type=AssetTypes.URI_FOLDER),
        "test_data": Input(type=AssetTypes.URI_FOLDER),
        "model_name": Input(type="string"),
        "n_estimators": Input(type="integer", default=100),
        "max_depth": Input(type="integer", default=20),
        "min_samples_split": Input(type="integer", default=2),
        "C": Input(type="number", default=1.0),
    }
    component_outputs = {
        "model_output": Output(type=AssetTypes.URI_FOLDER),
    }

    return command(
        name="emotion_model_training",
        display_name="Emotion Classification Training",
        description="Trains emotion classification models with time feature handling",
        code="./pipeline_scripts",
        command=cli,
        environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest",
        inputs=component_inputs,
        outputs=component_outputs,
    )

# Pipeline definition: trains a random forest and a logistic regression on the
# same train/test inputs as two independent steps and exposes both model
# folders as pipeline outputs.
#
# NOTE(review): this function is traced by the azure-ai-ml `@pipeline` DSL.
# The DSL is understood to derive step names from the local variable names
# (`rf_train`, `lr_train`) and may read a function docstring as the pipeline
# description -- confirm before renaming locals or adding a docstring. That is
# why the documentation lives in `#` comments here.
@pipeline()
def emotion_training_pipeline(train_data, test_data):
    # Also rewrites pipeline_scripts/train_model.py as a side effect.
    train_component = create_training_component()
    
    # Random Forest
    rf_train = train_component(
        train_data=train_data,
        test_data=test_data,
        model_name="random_forest",
        n_estimators=150,
        max_depth=15,
        min_samples_split=3
    )
    rf_train.compute = "adsai-lambda-0"
    
    # Logistic Regression
    # Only C is overridden; the tree-specific inputs keep their component defaults
    # and are ignored by the script for this model.
    lr_train = train_component(
        train_data=train_data,
        test_data=test_data,
        model_name="logistic_regression",
        C=1.0
    )
    lr_train.compute = "adsai-lambda-0"
    
    # Keys become the names of the pipeline-level outputs.
    return {
        "random_forest_output": rf_train.outputs.model_output,
        "logistic_regression_output": lr_train.outputs.model_output
    }

def submit_pipeline():
    """Resolve the raw data assets and submit the training pipeline.

    Returns:
        The job object returned by ``ml_client.jobs.create_or_update`` on
        success, or ``None`` when anything in the submission raises (the error
        text is printed, not re-raised).
    """
    try:
        # Latest versions of the registered raw train/test folders.
        train_asset = ml_client.data.get(name="emotion-raw-train", label="latest")
        test_asset = ml_client.data.get(name="emotion-raw-test", label="latest")

        job_inputs = {
            "train_data": Input(type=AssetTypes.URI_FOLDER, path=train_asset.path),
            "test_data": Input(type=AssetTypes.URI_FOLDER, path=test_asset.path),
        }
        job_definition = emotion_training_pipeline(**job_inputs)
        job_definition.settings.default_compute = "adsai-lambda-0"

        submitted_job = ml_client.jobs.create_or_update(
            job_definition,
            experiment_name="emotion-classification-v2",
        )

        print(f"Pipeline submitted: {submitted_job.studio_url}")
        return submitted_job

    except Exception as e:
        # Best-effort for notebook use: report and signal failure with None.
        print(f"Error submitting pipeline: {e}")
        return None

# Entry point: submit the pipeline when this cell/module is executed as __main__.
# The returned job (or None on failure) is discarded here.
if __name__ == "__main__":
    submit_pipeline()

Overriding of current TracerProvider is not allowed
Overriding of current LoggerProvider is not allowed
Overriding of current MeterProvider is not allowed
Attempting to instrument while already instrumented
Attempting to instrument while already instrumented
Attempting to instrument while already instrumented
Attempting to instrument while already instrumented
Attempting to instrument while already instrumented


Available datasets in workspace:


Overriding of current TracerProvider is not allowed
Overriding of current LoggerProvider is not allowed
Overriding of current MeterProvider is not allowed
Attempting to instrument while already instrumented
Attempting to instrument while already instrumented
Attempting to instrument while already instrumented
Attempting to instrument while already instrumented
Attempting to instrument while already instrumented


=== AZURE ML PIPELINE TRAINING SYSTEM ===

This script provides two approaches:
1. Pipeline-based training (recommended)
2. Parallel individual jobs (fallback)
=== PIPELINE-BASED TRAINING ===
Available datasets in workspace:
  - emotion-raw-train (version: None, type: uri_folder)
  - emotion-raw-test (version: None, type: uri_folder)
  - emotion-train-data-v2 (version: None, type: uri_file)
  - emotion-test-data-v2 (version: None, type: uri_file)
  - emotion-processed-train (version: None, type: uri_file)
  - emotion-processed-test (version: None, type: uri_file)
  - emotion-encoders (version: None, type: uri_folder)

Available datasets:
  - emotion-raw-train
  - emotion-raw-test
  - emotion-train-data-v2
  - emotion-test-data-v2
  - emotion-processed-train
  - emotion-processed-test
  - emotion-encoders
Train dataset: emotion-processed-train (version: 17)
Test dataset: emotion-processed-test (version: 17)
Creating new environment...
Error submitting pipeline: No such file or directory

In [1]:
from azure.ai.ml import MLClient
from azure.ai.ml.entities import Environment, Model
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import CommandComponent, Data
from azure.ai.ml import command, Input, Output
from azure.identity import InteractiveBrowserCredential
from azure.ai.ml.constants import AssetTypes
import os
import time
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.email import EmailOperator
from airflow.sensors.filesystem import FileSensor
from airflow.exceptions import AirflowException
from airflow.utils.task_group import TaskGroup
from airflow.models import Variable
import logging

# Workspace handle shared by the pipeline helpers and the Airflow task callables.
# Authentication uses an interactive browser login, so whichever process
# imports this module must be able to open a browser window.
credential = InteractiveBrowserCredential()
ml_client = MLClient(
    credential,
    workspace_name="NLP6-2025",
    resource_group_name="buas-y2",
    subscription_id="0a94de80-6d3b-49f2-b3e9-ec5818862801",
)

def create_training_script():
    """Write the training entry point to ``pipeline_scripts/train_model.py``.

    The script is kept here as a string so the module is self-contained; the
    ``pipeline_scripts`` folder is created if it does not exist and the file is
    overwritten on every call.

    The generated script loads one CSV per input (the first ``*.csv`` in sorted
    order when the input is a folder), converts ``HH:MM:SS,fff`` columns to
    seconds, label-encodes the target and any non-numeric feature column using
    codes fitted on the training data only (unseen test categories become -1),
    scales the features, trains a random forest or logistic regression, saves
    the model to the output folder and logs accuracy and parameters to MLflow.
    """
    os.makedirs("pipeline_scripts", exist_ok=True)

    training_script = '''
import argparse
import glob
import json
import os
import time

import joblib
import mlflow
import numpy as np
import pandas as pd
from azureml.core import Run
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelEncoder, StandardScaler


def find_csv_file(path):
    """Return path itself, or the first CSV (sorted by name) inside a folder."""
    if os.path.isdir(path):
        # glob order is filesystem dependent; sort so the pick is reproducible.
        csv_files = sorted(glob.glob(os.path.join(path, "*.csv")))
        if not csv_files:
            raise FileNotFoundError(f"No CSV files found in directory: {path}")
        return csv_files[0]
    return path


def convert_time_to_seconds(time_str):
    """Convert time string (HH:MM:SS,fff) to total seconds"""
    try:
        hh_mm_ss, millis = time_str.split(',')
        h, m, s = hh_mm_ss.split(':')
        return float(h) * 3600 + float(m) * 60 + float(s) + (float(millis)/1000)
    except (AttributeError, ValueError):
        # Not a string, or not in the expected layout: treat as missing.
        return np.nan


def preprocess_data(train_path, test_path):
    """Load, encode and scale the train/test CSVs; persist scaler and encoder."""
    train_file = find_csv_file(train_path)
    test_file = find_csv_file(test_path)

    train_df = pd.read_csv(train_file)
    test_df = pd.read_csv(test_file)

    # Identify time columns and convert to seconds
    time_cols = [col for col in train_df.columns
                 if any(x in str(train_df[col].dtype) for x in ['time', 'object'])]

    for col in time_cols:
        if train_df[col].astype(str).str.match(r'\\d{2}:\\d{2}:\\d{2},\\d{3}').any():
            train_df[col] = train_df[col].astype(str).apply(convert_time_to_seconds)
            test_df[col] = test_df[col].astype(str).apply(convert_time_to_seconds)

    target_columns = ['target', 'Target', 'label', 'Label', 'emotion', 'Emotion']
    target_col = next((col for col in target_columns if col in train_df.columns), None)

    if target_col is None:
        raise ValueError(f"No target column found. Available columns: {train_df.columns.tolist()}")

    label_encoder = LabelEncoder()
    y_train = label_encoder.fit_transform(train_df[target_col])
    y_test = label_encoder.transform(test_df[target_col])

    X_train = train_df.drop(columns=[target_col])
    X_test = test_df.drop(columns=[target_col])

    # Convert remaining string columns to numeric or categorical
    for col in X_train.select_dtypes(include=['object']).columns:
        try:
            # Convert into temporaries so a failure on the test column does not
            # leave the train column already overwritten.
            train_numeric = pd.to_numeric(X_train[col], errors='raise')
            test_numeric = pd.to_numeric(X_test[col], errors='raise')
        except (TypeError, ValueError):
            # Categorical column: fit the codes on train only and reuse them
            # for test, so a given string maps to the same code in both frames.
            train_values = X_train[col].astype(str)
            encoder = LabelEncoder().fit(train_values)
            codes = {label: code for code, label in enumerate(encoder.classes_)}
            X_train[col] = encoder.transform(train_values)
            # Categories that never occurred in training get -1.
            X_test[col] = X_test[col].astype(str).map(codes).fillna(-1).astype(int)
        else:
            X_train[col] = train_numeric
            X_test[col] = test_numeric

    # Handle missing values
    X_train = X_train.fillna(0)
    X_test = X_test.fillna(0)

    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    os.makedirs('outputs', exist_ok=True)
    joblib.dump(scaler, 'outputs/scaler.pkl')
    joblib.dump(label_encoder, 'outputs/label_encoder.pkl')

    return X_train_scaled, X_test_scaled, y_train, y_test


def train_and_evaluate(args):
    """Train the requested model, save it and log accuracy/params to MLflow."""
    mlflow.start_run()

    try:
        X_train, X_test, y_train, y_test = preprocess_data(args.train_data, args.test_data)

        if args.model_name == "random_forest":
            params = {
                "n_estimators": args.n_estimators,
                "max_depth": args.max_depth,
                "min_samples_split": args.min_samples_split,
                "random_state": 42
            }
            model = RandomForestClassifier(**params)
        elif args.model_name == "logistic_regression":
            params = {
                "C": args.C,
                "max_iter": 1000,
                "random_state": 42
            }
            model = LogisticRegression(**params)
        else:
            raise ValueError(f"Unsupported model: {args.model_name}")

        model.fit(X_train, y_train)
        preds = model.predict(X_test)
        acc = accuracy_score(y_test, preds)

        os.makedirs(args.model_output, exist_ok=True)
        joblib.dump(model, os.path.join(args.model_output, 'model.pkl'))

        mlflow.log_metric("accuracy", acc)
        mlflow.log_params(params)
        mlflow.log_param("model_name", args.model_name)

        return acc

    finally:
        mlflow.end_run()


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--train_data', type=str, required=True)
    parser.add_argument('--test_data', type=str, required=True)
    parser.add_argument('--model_output', type=str, required=True)
    parser.add_argument('--model_name', type=str, required=True)
    parser.add_argument('--n_estimators', type=int, default=100)
    parser.add_argument('--max_depth', type=int, default=20)
    parser.add_argument('--min_samples_split', type=int, default=2)
    parser.add_argument('--C', type=float, default=1.0)
    args = parser.parse_args()

    accuracy = train_and_evaluate(args)
    print(f"Model trained with accuracy: {accuracy:.4f}")


if __name__ == "__main__":
    main()
'''

    with open("pipeline_scripts/train_model.py", "w", encoding='utf-8') as f:
        f.write(training_script)

def create_training_component():
    """Build the Azure ML command component that runs ``train_model.py``.

    The training script is (re)written first, because ``./pipeline_scripts``
    is the folder handed to the component as its ``code``.

    Returns:
        The object produced by ``command(...)``. The pipeline definition calls
        it with concrete inputs to create one training step per model.
    """
    create_training_script()

    # Command line executed inside the job; ${{...}} placeholders are filled in
    # from the declared inputs/outputs below.
    cli = (
        "python train_model.py "
        "--train_data ${{inputs.train_data}} "
        "--test_data ${{inputs.test_data}} "
        "--model_output ${{outputs.model_output}} "
        "--model_name ${{inputs.model_name}} "
        "--n_estimators ${{inputs.n_estimators}} "
        "--max_depth ${{inputs.max_depth}} "
        "--min_samples_split ${{inputs.min_samples_split}} "
        "--C ${{inputs.C}}"
    )

    component_inputs = {
        "train_data": Input(type=AssetTypes.URI_FOLDER),
        "test_data": Input(type=AssetTypes.URI_FOLDER),
        "model_name": Input(type="string"),
        "n_estimators": Input(type="integer", default=100),
        "max_depth": Input(type="integer", default=20),
        "min_samples_split": Input(type="integer", default=2),
        "C": Input(type="number", default=1.0),
    }
    component_outputs = {
        "model_output": Output(type=AssetTypes.URI_FOLDER),
    }

    return command(
        name="emotion_model_training",
        display_name="Emotion Classification Training",
        description="Trains emotion classification models with time feature handling",
        code="./pipeline_scripts",
        command=cli,
        environment="AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest",
        inputs=component_inputs,
        outputs=component_outputs,
    )

# Pipeline definition: trains a random forest and a logistic regression on the
# same train/test inputs as two independent steps and exposes both model
# folders as pipeline outputs.
#
# NOTE(review): this function is traced by the azure-ai-ml `@pipeline` DSL.
# The DSL is understood to derive step names from the local variable names
# (`rf_train`, `lr_train`) and may read a function docstring as the pipeline
# description -- confirm before renaming locals or adding a docstring. That is
# why the documentation lives in `#` comments here.
@pipeline()
def emotion_training_pipeline(train_data, test_data):
    # Also rewrites pipeline_scripts/train_model.py as a side effect.
    train_component = create_training_component()
    
    # Random Forest
    rf_train = train_component(
        train_data=train_data,
        test_data=test_data,
        model_name="random_forest",
        n_estimators=150,
        max_depth=15,
        min_samples_split=3
    )
    rf_train.compute = "adsai-lambda-0"
    
    # Logistic Regression
    # Only C is overridden; the tree-specific inputs keep their component defaults
    # and are ignored by the script for this model.
    lr_train = train_component(
        train_data=train_data,
        test_data=test_data,
        model_name="logistic_regression",
        C=1.0
    )
    lr_train.compute = "adsai-lambda-0"
    
    # Keys become the names of the pipeline-level outputs.
    return {
        "random_forest_output": rf_train.outputs.model_output,
        "logistic_regression_output": lr_train.outputs.model_output
    }

# Airflow Task Functions
def validate_data_availability(**kwargs):
    """Verify that the raw train/test data assets resolve, and share their paths.

    Looks up the latest ``emotion-raw-train`` and ``emotion-raw-test`` assets,
    logs their name/version and pushes their paths to XCom under
    ``train_data_path`` / ``test_data_path``.

    Args:
        **kwargs: Airflow task context; ``ti`` (the task instance) is required.

    Returns:
        True when both assets were found and their paths were pushed.

    Raises:
        AirflowException: on any error raised inside the check, including a
            missing ``ti``.
    """
    try:
        train_data = ml_client.data.get(name="emotion-raw-train", label="latest")
        test_data = ml_client.data.get(name="emotion-raw-test", label="latest")

        logging.info(f"Train data found: {train_data.name} v{train_data.version}")
        logging.info(f"Test data found: {test_data.name} v{test_data.version}")

        # Hand the resolved paths to the submit task.
        task_instance = kwargs['ti']
        published = (
            ('train_data_path', train_data),
            ('test_data_path', test_data),
        )
        for xcom_key, asset in published:
            task_instance.xcom_push(key=xcom_key, value=asset.path)

    except Exception as e:
        logging.error(f"Data validation failed: {e}")
        raise AirflowException(f"Required data assets not found: {e}")

    else:
        return True

def submit_pipeline(**kwargs):
    """Submit the emotion training pipeline to Azure ML.

    Usable both as an Airflow task and as a plain function call: when no task
    instance (``ti``) is supplied, XCom is skipped and the data asset paths are
    resolved directly from the workspace.

    Args:
        **kwargs: Airflow task context. ``ti`` is optional.

    Returns:
        ``submitted_job.id`` of the created pipeline job.

    Raises:
        AirflowException: if resolving the data or submitting the job fails.

    XCom (only when ``ti`` is present):
        job_id          -- the job *name*, i.e. the value ``ml_client.jobs.get``
                           expects (``submitted_job.id`` is the full resource id)
        job_url         -- Studio URL of the job
        experiment_name -- experiment the job was submitted under
    """
    ti = kwargs.get('ti')

    try:
        train_data_path = None
        test_data_path = None

        if ti is not None:
            # Get data paths from previous task
            train_data_path = ti.xcom_pull(task_ids='validate_data', key='train_data_path')
            test_data_path = ti.xcom_pull(task_ids='validate_data', key='test_data_path')

        if not train_data_path or not test_data_path:
            # Fallback to direct data retrieval (no validate_data task ran, or
            # the function was called outside Airflow).
            train_data = ml_client.data.get(name="emotion-raw-train", label="latest")
            test_data = ml_client.data.get(name="emotion-raw-test", label="latest")
            train_data_path = train_data.path
            test_data_path = test_data.path

        pipeline_job = emotion_training_pipeline(
            train_data=Input(type=AssetTypes.URI_FOLDER, path=train_data_path),
            test_data=Input(type=AssetTypes.URI_FOLDER, path=test_data_path)
        )
        pipeline_job.settings.default_compute = "adsai-lambda-0"

        submitted_job = ml_client.jobs.create_or_update(
            pipeline_job,
            experiment_name="emotion-classification-v2"
        )

        logging.info(f"Pipeline submitted: {submitted_job.studio_url}")

        if ti is not None:
            # Push job details to XCom
            ti.xcom_push(key='job_id', value=submitted_job.name)
            ti.xcom_push(key='job_url', value=submitted_job.studio_url)
            ti.xcom_push(key='experiment_name', value="emotion-classification-v2")

        return submitted_job.id

    except Exception as e:
        logging.error(f"Error submitting pipeline: {e}")
        raise AirflowException(f"Pipeline submission failed: {e}") from e

def monitor_pipeline(**kwargs):
    """Check the submitted Azure ML job once and map its state to a task outcome.

    Polling is delegated to Airflow retries: the task only *succeeds* once the
    job has completed. While the job is still in progress the task raises, so
    the operator's ``retries`` / ``retry_delay`` schedule the next check.
    A plain ``return False`` would be recorded as success and release the
    downstream tasks while training is still running.

    Args:
        **kwargs: Airflow task context; ``ti`` (the task instance) is required.

    Returns:
        True when the job status is ``Completed`` (or ``Finished``).

    Raises:
        AirflowFailException: no job id is available, or the job ended as
            ``Failed`` / ``Canceled`` -- retrying cannot change either.
        AirflowException: the job is still running, or the status lookup
            itself failed; both are worth retrying.

    XCom:
        job_status  -- last observed status string
        job_details -- dict with id, status and creation time
    """
    # Function-scope import: only this task needs the non-retrying failure type
    # (a subclass of AirflowException).
    from airflow.exceptions import AirflowFailException

    ti = kwargs.get('ti')
    if ti is None:
        raise AirflowException("Pipeline monitoring failed: no task instance ('ti') in context")

    # The submit task has a different id in the scheduled and the manual DAG.
    job_id = None
    for upstream_task_id in ('ml_pipeline_group.submit_pipeline', 'manual_submit_pipeline'):
        job_id = ti.xcom_pull(task_ids=upstream_task_id, key='job_id')
        if job_id:
            break

    if not job_id:
        raise AirflowFailException("No job ID found from previous task")

    # jobs.get() looks a job up by name; a full resource id
    # (".../jobs/<name>") is reduced to its last segment so both forms work.
    job_name = str(job_id).rsplit('/', 1)[-1]

    try:
        job = ml_client.jobs.get(job_name)
    except Exception as e:
        logging.error(f"Error monitoring pipeline: {e}")
        raise AirflowException(f"Pipeline monitoring failed: {e}") from e

    logging.info(f"Pipeline status: {job.status}")

    # Push status to XCom for downstream tasks
    ti.xcom_push(key='job_status', value=job.status)
    ti.xcom_push(key='job_details', value={
        'id': job.id,
        'status': job.status,
        'creation_time': str(job.creation_context.created_at) if job.creation_context else None
    })

    if job.status in ("Completed", "Finished"):
        logging.info(f"Pipeline completed successfully: {job.studio_url}")
        return True

    if job.status in ("Failed", "Canceled"):
        # Terminal state: fail immediately instead of burning the retries.
        raise AirflowFailException(f"Pipeline failed with status: {job.status}")

    # Any other state means "not finished yet": fail this attempt so Airflow
    # re-runs the check after retry_delay.
    logging.info(f"Pipeline still running. Current status: {job.status}")
    raise AirflowException(f"Pipeline still running. Current status: {job.status}")

def collect_metrics(**kwargs):
    """Log summary information about the submitted pipeline job (best effort).

    Failures here are logged and reported through the return value; they never
    fail the Airflow task.

    Args:
        **kwargs: Airflow task context; ``ti`` (the task instance) is required.

    Returns:
        True on success, False if anything raised, None when no job id was
        published by ``ml_pipeline_group.submit_pipeline``.

    XCom:
        pipeline_metrics -- dict with job_id, status, experiment_name and the
                            local time at which the metrics were collected
    """
    try:
        ti = kwargs['ti']
        job_id = ti.xcom_pull(task_ids='ml_pipeline_group.submit_pipeline', key='job_id')

        if not job_id:
            logging.warning("No job ID found, skipping metrics collection")
            return

        # jobs.get() looks a job up by name; a full resource id
        # (".../jobs/<name>") is reduced to its last segment so both forms work.
        job_name = str(job_id).rsplit('/', 1)[-1]
        job = ml_client.jobs.get(job_name)

        # Log pipeline completion metrics
        logging.info(f"Pipeline {job_id} completed")
        logging.info(f"Experiment: {job.experiment_name}")
        # creation_context holds creation metadata, not a duration.
        logging.info(f"Creation context: {job.creation_context}")

        # Store metrics in XCom for potential downstream use
        ti.xcom_push(key='pipeline_metrics', value={
            'job_id': job_id,
            'status': job.status,
            'experiment_name': job.experiment_name,
            'completion_time': str(datetime.now())
        })

        return True

    except Exception as e:
        # Don't fail the pipeline for metrics collection issues, but keep the
        # traceback in the task log.
        logging.exception(f"Error collecting metrics: {e}")
        return False

def send_completion_notification(**kwargs):
    """Compose and log the completion message for the pipeline run.

    Nothing is actually sent: the message is written to the log and returned.

    Args:
        **kwargs: Airflow task context; ``ti`` (the task instance) is required.

    Returns:
        The message text, or None if building it raised (for example when
        ``ti`` is missing).
    """
    try:
        pull = kwargs['ti'].xcom_pull
        job_url = pull(task_ids='ml_pipeline_group.submit_pipeline', key='job_url')
        job_status = pull(task_ids='ml_pipeline_group.monitor_pipeline', key='job_status')

        message = f"""
        Azure ML Pipeline Completed Successfully!
        
        Status: {job_status}
        Studio URL: {job_url}
        Completion Time: {datetime.now()}
        
        The emotion classification models have been trained and are ready for use.
        """

        for line in ("Pipeline completion notification prepared", message):
            logging.info(line)

        # In a real environment, you might send this via email, Slack, etc.
        return message

    except Exception as e:
        logging.error(f"Error sending notification: {e}")
        return None

def cleanup_resources(**kwargs):
    """Log the cleanup step for the run.

    Note that nothing is removed: the function only logs whether the
    ``pipeline_scripts`` directory exists and that cleanup has finished.

    Args:
        **kwargs: Airflow task context (unused).

    Returns:
        True normally, False if the check or logging raised.
    """
    try:
        scripts_dir_present = os.path.exists("pipeline_scripts")
        if scripts_dir_present:
            # Clean up temporary files (currently log-only)
            logging.info("Cleaning up pipeline scripts directory")

        logging.info("Resource cleanup completed")
    except Exception as e:
        logging.error(f"Error during cleanup: {e}")
        return False

    return True

def create_airflow_dag():
    """Build the weekly DAG that orchestrates the Azure ML training pipeline.

    Task flow::

        validate_data
            >> ml_pipeline_group (submit_pipeline >> monitor_pipeline)
            >> [collect_metrics, send_notification]
            >> cleanup_resources

    Returns:
        The configured ``DAG`` (id ``azureml_emotion_classification_enhanced``).
    """
    default_args = {
        'owner': 'data-science-team',
        'depends_on_past': False,
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 2,
        'retry_delay': timedelta(minutes=5),
        'execution_timeout': timedelta(hours=2),
    }

    # Operators created inside the context manager are attached to this DAG,
    # including those declared inside the task group.
    with DAG(
        'azureml_emotion_classification_enhanced',
        default_args=default_args,
        description='Enhanced orchestration of Azure ML Emotion Classification Pipeline',
        schedule_interval='@weekly',  # Run weekly
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['azureml', 'emotion-classification', 'ml-pipeline'],
        max_active_runs=1,  # Prevent concurrent runs
    ) as dag:

        # Pre-pipeline validation task.
        # The task context is passed to callables that accept **kwargs, so no
        # provide_context flag is needed.
        validate_data_task = PythonOperator(
            task_id='validate_data',
            python_callable=validate_data_availability,
        )

        # Create a task group for ML pipeline tasks
        with TaskGroup("ml_pipeline_group") as ml_pipeline_group:

            submit_task = PythonOperator(
                task_id='submit_pipeline',
                python_callable=submit_pipeline,
            )

            # Polling is expressed through retries: a failed check is retried
            # after 5 minutes, up to 12 times (about 1 hour). This relies on
            # monitor_pipeline raising while the job is unfinished.
            # (poke_interval is a sensor argument and is not accepted by
            # PythonOperator, so it is not passed here.)
            monitor_task = PythonOperator(
                task_id='monitor_pipeline',
                python_callable=monitor_pipeline,
                retries=12,
                retry_delay=timedelta(minutes=5),
            )

            submit_task >> monitor_task

        # Post-pipeline tasks
        collect_metrics_task = PythonOperator(
            task_id='collect_metrics',
            python_callable=collect_metrics,
        )

        send_notification_task = PythonOperator(
            task_id='send_notification',
            python_callable=send_completion_notification,
        )

        cleanup_task = PythonOperator(
            task_id='cleanup_resources',
            python_callable=cleanup_resources,
            trigger_rule='all_done',  # Run regardless of upstream success/failure
        )

        # Define task dependencies
        validate_data_task >> ml_pipeline_group >> [collect_metrics_task, send_notification_task] >> cleanup_task

    return dag

# Create the enhanced DAG
# Built at import time and bound to a module-level name.
# NOTE(review): Airflow is understood to discover DAGs through module globals,
# so this name should stay at top level -- confirm against the deployment.
emotion_classification_dag = create_airflow_dag()

# Alternative DAG for manual/on-demand runs
def create_manual_dag():
    """Create a separate DAG for manual pipeline runs.

    The DAG has no schedule and contains only two tasks:
    ``manual_submit_pipeline >> manual_monitor_pipeline``. There is no
    validation task, so the submit callable has to resolve the data paths
    itself.

    Returns:
        The configured ``DAG`` (id ``azureml_emotion_classification_manual``).
    """
    default_args = {
        'owner': 'data-science-team',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=2),
    }

    # Operators created inside the context manager are attached to this DAG.
    with DAG(
        'azureml_emotion_classification_manual',
        default_args=default_args,
        description='Manual trigger for Azure ML Emotion Classification Pipeline',
        schedule_interval=None,  # Manual trigger only
        start_date=datetime(2024, 1, 1),
        catchup=False,
        tags=['azureml', 'emotion-classification', 'manual'],
    ) as manual_dag:

        # The task context is passed to callables that accept **kwargs, so no
        # provide_context flag is needed.
        manual_submit = PythonOperator(
            task_id='manual_submit_pipeline',
            python_callable=submit_pipeline,
        )

        # Up to 6 re-checks, 5 minutes apart (about 30 minutes).
        manual_monitor = PythonOperator(
            task_id='manual_monitor_pipeline',
            python_callable=monitor_pipeline,
            retries=6,
            retry_delay=timedelta(minutes=5),
        )

        manual_submit >> manual_monitor

    return manual_dag

# Create the manual DAG
# Built at import time and bound to a module-level name, alongside the
# scheduled DAG above.
emotion_classification_manual_dag = create_manual_dag()

# Local smoke test: runs only when the module is executed directly.
if __name__ == "__main__":
    # For local testing outside of Airflow
    print("Testing pipeline submission...")
    try:
        # Called with no Airflow context, i.e. without a 'ti' keyword argument.
        result = submit_pipeline()
        print(f"Pipeline submitted with ID: {result}")
    except Exception as e:
        # Report instead of crashing so the failure text shows up in the output.
        print(f"Error: {e}")

ModuleNotFoundError: No module named 'airflow'