In [7]:
import os
os.chdir("../")
%pwd

'd:\\AI\\NLP\\HandsOn\\sentiment-analysis'

In [2]:
from dataclasses import dataclass
from typing import Optional, Dict
from pathlib import Path

@dataclass(frozen=True)
class MLflowConfig:
    """Immutable bundle of the settings used for MLflow experiment tracking.

    Instances are frozen: assigning to a field after construction raises
    ``dataclasses.FrozenInstanceError``. In this notebook the values are
    filled in by ``ConfigurationManager.get_mlflow_config`` from the
    ``mlflow`` section of the loaded configuration.
    """

    # Directory that get_mlflow_config creates before building this object
    root_dir: Path

    # --- experiment identity (read from the `experiment` sub-section) ---
    experiment_name: str
    run_name: str
    tracking_uri: str
    artifact_location: Optional[str]  # may be None

    # --- tagging / logging switches, passed through unchanged ---
    default_tags: Dict[str, str]
    dynamic_tags: Dict[str, bool]
    logging: Dict[str, bool]

    # --- per-model settings, passed through unchanged ---
    basemodel: Dict[str, str]
    advancedmodel: Dict[str, str]

In [3]:
from SentiScope.constants import (CONFIG_FILE_PATH,
                                  PARAMS_FILE_PATH)
from SentiScope.utils.file_utils import (create_directories,
                                            get_size)
from SentiScope.utils.config_utils import (read_yaml,
                                           Settings,
                                           get_settings)

In [4]:
class ConfigurationManager:
    """Loads the project YAML files and builds typed config objects from them."""

    def __init__(
        self,
        config_filepath=CONFIG_FILE_PATH,
        params_filepath=PARAMS_FILE_PATH,
    ):
        """Read both YAML files and make sure the artifacts root exists.

        Args:
            config_filepath: Location of the main configuration YAML.
            params_filepath: Location of the parameters YAML.
        """
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)

        artifacts_root = self.config.artifacts_root
        create_directories([artifacts_root])

    def get_mlflow_config(self) -> MLflowConfig:
        """Build an ``MLflowConfig`` from the ``mlflow`` section of the config.

        The section's ``root_dir`` is created as a side effect before the
        config object is assembled.

        Returns:
            MLflowConfig populated from the ``mlflow`` section and its
            ``experiment`` sub-section.
        """
        mlflow_section = self.config.mlflow
        create_directories([mlflow_section.root_dir])

        # Name/run/URI/artifact location all live under `experiment`
        experiment = mlflow_section.experiment

        return MLflowConfig(
            root_dir=mlflow_section.root_dir,
            experiment_name=experiment.name,
            run_name=experiment.run,
            tracking_uri=experiment.tracking_uri,
            artifact_location=experiment.artifact_location,
            default_tags=mlflow_section.default_tags,
            dynamic_tags=mlflow_section.dynamic_tags,
            logging=mlflow_section.logging,
            basemodel=mlflow_section.basemodel,
            advancedmodel=mlflow_section.advancedmodel,
        )

In [5]:
import mlflow
from SentiScope.logging import logger
from typing import Any, Dict, Optional
from functools import wraps
from SentiScope.entity import MLflowConfig

class MLflowTracker:
    """
    A modular MLflow tracking component that can be imported and used across different modules
    without interfering with their core logic.

    Runs started through this tracker are kept on an internal stack, so a
    nested run can be ended with ``end_run()`` and the tracker then points
    back at its parent run (which can still be logged to and ended in turn).
    ``self.run`` always refers to the innermost run that is still open, or
    ``None`` when every run started here has been ended.
    """
    def __init__(self, config: "MLflowConfig"):
        """Configure the tracking URI and resolve (or create) the experiment.

        Args:
            config: Object exposing ``experiment_name``, ``run_name`` and
                ``tracking_uri`` attributes.

        Raises:
            ConnectionError: If a tracking URI is set and the server cannot
                be reached.
            Exception: Any other error raised by MLflow while looking up or
                creating the experiment is logged and re-raised.
        """
        self.config = config
        self.experiment_name = self.config.experiment_name
        self.run_name = self.config.run_name
        self.tracking_uri = self.config.tracking_uri
        self.run = None
        # Open runs, outermost first; self.run mirrors the last element.
        self._run_stack = []

        try:
            if self.tracking_uri:
                mlflow.set_tracking_uri(self.tracking_uri)
                logger.info(f"Setting MLflow tracking URI to: {self.tracking_uri}")

                # Test connection before proceeding
                self._test_connection()

            # Get or create experiment
            experiment = mlflow.get_experiment_by_name(self.experiment_name)
            if experiment is not None:
                self.experiment_id = experiment.experiment_id
                logger.info(f"Found existing experiment: {self.experiment_name} with ID: {self.experiment_id}")
            else:
                self.experiment_id = mlflow.create_experiment(self.experiment_name)
                logger.info(f"Created new experiment: {self.experiment_name} with ID: {self.experiment_id}")

        except ConnectionError as e:
            logger.error(f"Failed to connect to MLflow tracking server at {self.tracking_uri}: {str(e)}")
            raise
        except Exception as e:
            logger.error(f"Error initializing MLflow tracker: {str(e)}")
            raise

    def _test_connection(self) -> bool:
        """Test connection to MLflow server.

        Returns:
            True when listing experiments succeeds.

        Raises:
            ConnectionError: If the request fails; the underlying exception
                is attached as ``__cause__``.
        """
        try:
            # Try to list experiments as a connection test
            mlflow.search_experiments()
            logger.info("Successfully connected to MLflow server")
            return True
        except Exception as e:
            raise ConnectionError(f"Cannot connect to MLflow server: {str(e)}") from e

    def _require_active_run(self) -> None:
        """Raise RuntimeError unless this tracker currently holds an open run."""
        if self.run is None:
            raise RuntimeError("No active MLflow run. Call start_run() first.")

    def start_run(self, run_name: Optional[str] = None, nested: bool = False):
        """Start a new MLflow run and make it the tracker's current run.

        A run is always started; pass ``nested=True`` when another run is
        already open, otherwise MLflow itself rejects the call.

        Args:
            run_name: Name for the run. When given it also replaces the
                tracker's stored ``run_name``; when omitted or empty the
                stored name is used.
            nested: Forwarded to ``mlflow.start_run``.

        Returns:
            The ID of the newly started run.

        Raises:
            Exception: Whatever ``mlflow.start_run`` raises, after logging.
        """
        if run_name:
            self.run_name = run_name
        try:
            new_run = mlflow.start_run(
                experiment_id=self.experiment_id,
                run_name=self.run_name,
                nested=nested
            )
            # Only record the run once MLflow has actually opened it, so a
            # failed start leaves the previous current run untouched.
            self._run_stack.append(new_run)
            self.run = new_run
            logger.info(f"Started new MLflow run with ID: {self.run.info.run_id}")
            return self.run.info.run_id
        except Exception as e:
            logger.error(f"Error starting MLflow run: {str(e)}")
            raise

    def log_params(self, params: dict):
        """Log parameters to the current MLflow run.

        Raises:
            RuntimeError: If no run has been started through this tracker.
        """
        try:
            self._require_active_run()
            mlflow.log_params(params)
            logger.debug(f"Logged parameters: {params}")
        except Exception as e:
            logger.error(f"Error logging parameters: {str(e)}")
            raise

    def log_metrics(self, metrics: dict):
        """Log metrics to the current MLflow run.

        Raises:
            RuntimeError: If no run has been started through this tracker.
        """
        try:
            self._require_active_run()
            mlflow.log_metrics(metrics)
            logger.debug(f"Logged metrics: {metrics}")
        except Exception as e:
            logger.error(f"Error logging metrics: {str(e)}")
            raise

    def log_model(self, model: Any, artifact_path: str):
        """Log a model to the current MLflow run via ``mlflow.sklearn.log_model``.

        Raises:
            RuntimeError: If no run has been started through this tracker.
        """
        try:
            self._require_active_run()
            mlflow.sklearn.log_model(sk_model=model, artifact_path=artifact_path)
            logger.info(f"Logged model to artifact path: {artifact_path}")
        except Exception as e:
            logger.error(f"Error logging model: {str(e)}")
            raise

    def log_artifact(self, artifact_path: str, destination_path: Optional[str] = None):
        """Log a local file as an artifact of the current MLflow run.

        Args:
            artifact_path: Local path of the file to upload.
            destination_path: Optional second argument forwarded to
                ``mlflow.log_artifact``.

        Raises:
            RuntimeError: If no run has been started through this tracker.
        """
        try:
            self._require_active_run()
            mlflow.log_artifact(artifact_path, destination_path)
            logger.info(f"Logged artifact from {artifact_path} to {destination_path or 'default path'}")
        except Exception as e:
            logger.error(f"Error logging artifact: {str(e)}")
            raise

    def end_run(self):
        """End the tracker's current (innermost) MLflow run.

        After the call ``self.run`` points at the enclosing run that was
        started earlier through this tracker, or is ``None`` if there is
        none. Calling it with no open run is a no-op.
        """
        try:
            if self.run:
                mlflow.end_run()
                logger.info("Ended MLflow run")
                if self._run_stack:
                    self._run_stack.pop()
                # Fall back to the parent run instead of forgetting it, so it
                # can still be logged to and closed later.
                self.run = self._run_stack[-1] if self._run_stack else None
        except Exception as e:
            logger.error(f"Error ending MLflow run: {str(e)}")
            raise


In [6]:
class DataIngestion:
    """Demo ingestion step that reports its metrics through an MLflowTracker."""

    def __init__(self, mlflow_tracker: MLflowTracker):
        """Keep the tracker and immediately open a nested "Data_Ingestion" run.

        Note that merely constructing this object starts a run on the tracker.
        """
        self.mlflow_tracker = mlflow_tracker
        mlflow_tracker.start_run(run_name="Data_Ingestion", nested=True)

    def ingest_data(self, source: str) -> dict:
        """Log the ingestion metrics and return them with the ingested data.

        Args:
            source: Location of the data to ingest. The current placeholder
                body does not read it.

        Returns:
            A dict with the logged ``metrics`` and a ``data`` entry.
        """
        # Placeholder figures standing in for the real ingestion logic
        ingestion_metrics = {
            'rows_ingested': 1000,
            'ingestion_time': 10.5,
        }

        # Numeric results are recorded as metrics rather than params
        self.mlflow_tracker.log_metrics(ingestion_metrics)

        return dict(metrics=ingestion_metrics, data='ingested_data')


In [7]:
# Main execution
# Load the configuration, connect the tracker, open the parent "MainPipeline"
# run, then let DataIngestion open its nested run and log its metrics.
# Errors are left to propagate unwrapped so the notebook shows the original
# traceback.
config = ConfigurationManager()
mlflow_config = config.get_mlflow_config()
mlflow_tracker = MLflowTracker(config=mlflow_config)
mlflow_tracker.start_run("MainPipeline")
data_ingestion = DataIngestion(mlflow_tracker)
# NOTE: despite its name, `dataframe` holds the dict returned by ingest_data
dataframe = data_ingestion.ingest_data(
    source=r"D:\AI\NLP\HandsOn\sentiment-analysis\artifacts\feature_transformation\20250117_082011\test_split.csv"
)

[2025-01-19 16:06:45,289: INFO: config_utils: yaml file: config\config.yaml loaded successfully]
[2025-01-19 16:06:45,291: INFO: config_utils: yaml file: params.yaml loaded successfully]
[2025-01-19 16:06:45,293: INFO: file_utils: created directory at: artifacts]
[2025-01-19 16:06:45,294: INFO: file_utils: created directory at: artifacts/mlflow_tracking]
[2025-01-19 16:06:45,295: INFO: 23541907: Setting MLflow tracking URI to: http://localhost:5000]
[2025-01-19 16:06:47,352: INFO: 23541907: Successfully connected to MLflow server]
[2025-01-19 16:06:47,361: INFO: 23541907: Found existing experiment: sentiment_analysis_pipeline with ID: 854231226268693672]
[2025-01-19 16:06:47,619: INFO: 23541907: Started new MLflow run with ID: efe8a705ae624f2893f6504a3e8b907f]
[2025-01-19 16:06:47,674: INFO: 23541907: Started new MLflow run with ID: 4631619513f64ff18ab7ea92e7f3a3c4]


In [11]:
# Build a fresh ingestion step on the existing tracker; constructing
# DataIngestion starts another nested "Data_Ingestion" run.
data_ingestion = DataIngestion(mlflow_tracker=mlflow_tracker)

# `dataframe` receives the dict returned by ingest_data
dataframe = data_ingestion.ingest_data(
    source=r"D:\AI\NLP\HandsOn\sentiment-analysis\artifacts\feature_transformation\20250117_082011\test_split.csv",
)

[2025-01-19 16:07:44,528: INFO: 23541907: Started new MLflow run with ID: 43a19df81f2d461eb68b81b10fb5d105]


In [12]:
# End the run the tracker currently holds, i.e. the one it started most recently.
mlflow_tracker.end_run()

🏃 View run Data_Ingestion at: http://localhost:5000/#/experiments/854231226268693672/runs/43a19df81f2d461eb68b81b10fb5d105
🧪 View experiment at: http://localhost:5000/#/experiments/854231226268693672
[2025-01-19 16:08:53,826: INFO: 23541907: Ended MLflow run]


--------

### Running Commands

`mlflow server --host 127.0.0.1 --port 5000`
`mlflow.set_tracking_uri("file:///path/to/your/mlruns")`

#### https://mlflow.org/docs/latest/getting-started/registering-first-model/step1-register-model.html