In [23]:
import os
%pwd

'/home/amit/python/Industrial_AI_project/IITM_MLops_titanic_dataset_github_clone/IITM-MLProject-kaggle-Titanic-dataset'

In [2]:
os.chdir('../')
%pwd

'/home/amit/python/Industrial_AI_project/IITM_MLops_titanic_dataset_github_clone/IITM-MLProject-kaggle-Titanic-dataset'

In [24]:
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class MLFlowModelManagementConfig:
    """Immutable settings for the MLflow model-management stage.

    The path-like fields are filled from the ``mlflow_model_management``
    section of the config file and the ``params_*`` fields from the params
    file (see ``ConfigurationManager.mlflow_model_management_config``).

    NOTE(review): the ``Path`` annotations are not enforced; the configuration
    manager passes the values through as read from YAML without converting
    them, so they are presumably plain strings at runtime -- confirm.
    """

    # Output directory of this stage; created if it does not exist.
    root_dir: Path
    # Directory containing the saved Spark PipelineModel to load.
    input_model_folder: Path
    # CSV file (with header row) the loaded model is evaluated on.
    test_data_file: Path
    # MLflow experiment name; also used as the registered model name.
    params_experiment_name: str
    # MLflow tracking server URI.
    params_mlflow_uri: str
    # MLflow run name; also used as the artifact path of the logged model.
    params_mlflow_run_name: str
    # Application name given to the Spark session.
    params_sparkSessionTitle: str

In [25]:
# Locations of the YAML files, relative to the current working directory
# (the notebook must therefore run from the repository root).
# NOTE(review): presumably these duplicate the values exported by
# Titanic_dataset_analysis.constants (see the commented-out imports below) --
# confirm, and prefer importing them so notebook and package cannot drift apart.
# from Titanic_dataset_analysis import constants as c
# from Titanic_dataset_analysis.constants import CONFIG_FILE_PATH, PARAMS_FILE_PATH
CONFIG_FILE_PATH = Path("config/config.yaml")
PARAMS_FILE_PATH = Path("params.yaml")
from Titanic_dataset_analysis.utils.common import read_yaml, create_directories

In [26]:
class ConfigurationManager:
    """Loads the project's YAML config and params and builds stage configs."""

    def __init__(
        self,
        config_filepath=CONFIG_FILE_PATH,
        params_filepath=PARAMS_FILE_PATH,
    ):
        """Read both YAML files and create the artifacts root directory.

        Args:
            config_filepath: Path of the config YAML file.
            params_filepath: Path of the params YAML file.
        """
        # Both default paths are relative, so show which directory they resolve against.
        print(os.getcwd())
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)
        create_directories([self.config.artifacts_root])

    def mlflow_model_management_config(self) -> MLFlowModelManagementConfig:
        """Build the settings object for the MLflow model-management stage.

        Creates the stage's ``root_dir`` as a side effect.

        Returns:
            MLFlowModelManagementConfig: paths taken from the
            ``mlflow_model_management`` config section, MLflow/Spark settings
            taken from the params file. Values are passed through as read.
        """
        stage_section = self.config.mlflow_model_management
        run_params = self.params

        create_directories([stage_section.root_dir])

        return MLFlowModelManagementConfig(
            root_dir=stage_section.root_dir,
            input_model_folder=stage_section.input_model_folder,
            test_data_file=stage_section.test_data_file,
            params_experiment_name=run_params.experiment_name,
            params_mlflow_uri=run_params.mlflow_uri,
            params_mlflow_run_name=run_params.mlflow_run_name,
            params_sparkSessionTitle=run_params.sparkSessionTitle,
        )

In [None]:
import os
import mlflow
from pyspark.sql import SparkSession
from pathlib import Path
from pyspark.ml import PipelineModel
from Titanic_dataset_analysis import logger
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from Titanic_dataset_analysis import constants as const

In [None]:
class MLFlowModelManagement:
    """Evaluate a saved Spark ``PipelineModel`` and record the result in MLflow.

    Intended call order: ``load_model()`` then ``mlflow_model_tracking()``.
    """

    # (MLflow metric key, Spark metricName) pairs evaluated with
    # MulticlassClassificationEvaluator, in the order they are logged.
    _MULTICLASS_METRICS = (
        ("Accuracy", "accuracy"),
        ("Precision", "weightedPrecision"),
        ("Recall", "weightedRecall"),
        ("F1", "f1"),
    )

    def __init__(self, config: MLFlowModelManagementConfig):
        """Store the stage configuration; no model is loaded yet."""
        self.config = config
        self.model = None

    def load_model(self):
        """Load the pipeline model from ``config.input_model_folder`` into ``self.model``.

        Raises:
            FileNotFoundError: if the model folder does not exist on the local
                filesystem.
        """
        # PipelineModel.load is given a plain string so that a Path-typed
        # config value is also accepted.
        model_dir = str(self.config.input_model_folder)

        if not os.path.exists(model_dir):
            message = (
                "Model download failed in previous step! "
                f"Please check the location mentioned : {model_dir}"
            )
            logger.error(message)
            raise FileNotFoundError(message)

        logger.info(f"Model already exists at: {Path(model_dir)}")
        self.model = PipelineModel.load(model_dir)
        logger.info(f"Loaded model from {model_dir}")

    def mlflow_model_tracking(self):
        """Evaluate the loaded model on the test CSV and track it in MLflow.

        Steps:
            1. Read ``config.test_data_file`` as a CSV with header and inferred schema.
            2. Point MLflow at ``config.params_mlflow_uri`` and select (creating
               if needed) the experiment ``config.params_experiment_name``.
            3. Inside one run: log ``regParam``/``elasticNetParam`` of the last
               pipeline stage, log AUC, accuracy, weighted precision, weighted
               recall and F1 against the ``Survived`` label, then log and
               register the model under the experiment name.
            4. Store the run and experiment ids on the ``constants`` module.
            5. Move the newest unstaged registered version to ``Staging``.

        The Spark session is stopped when the method exits, also on failure.

        Returns:
            None

        Raises:
            RuntimeError: if ``load_model()`` has not been called first.
        """
        if getattr(self, "model", None) is None:
            raise RuntimeError(
                "No model loaded; call load_model() before mlflow_model_tracking()."
            )

        spark = SparkSession.builder.appName(self.config.params_sparkSessionTitle).getOrCreate()
        try:
            spark.sparkContext.setLogLevel("WARN")
            df = spark.read.csv(str(self.config.test_data_file), header=True, inferSchema=True)

            experiment_name = self.config.params_experiment_name
            logger.info(f"Experiment Name: {experiment_name}")
            logger.info(f"Setting Tracking URI to : {self.config.params_mlflow_uri}")
            mlflow.set_tracking_uri(self.config.params_mlflow_uri)
            # NOTE(review): MLFLOW_ARTIFACT_URI does not look like a variable
            # MLflow itself reads -- confirm it has any effect.
            os.environ["MLFLOW_ARTIFACT_URI"] = f"file:{os.getcwd()}/mlruns"

            experiment_id = self._get_or_create_experiment_id(experiment_name)
            mlflow.set_experiment(experiment_name)
            logger.info(f"Experiment ID: {experiment_id}")

            with mlflow.start_run(run_name=self.config.params_mlflow_run_name) as run:
                self._log_estimator_params()
                self._log_evaluation_metrics(self.model.transform(df))

                mlflow.spark.log_model(
                    self.model,
                    artifact_path=self.config.params_mlflow_run_name,
                    registered_model_name=experiment_name,
                )

                # Published on the constants module for later use.
                const.RUN_ID = run.info.run_id
                const.EXPERIMENT_ID = run.info.experiment_id
                logger.info(f"Run ID from constants: {run.info.run_id}")
                logger.info(f"Experiment ID from constants: {run.info.experiment_id}")

            os.makedirs(self.config.root_dir, exist_ok=True)
            logger.info("MLFlow Model Tracking done successfully.")
            self._promote_latest_version_to_staging()
        finally:
            # Release the session even when evaluation or logging fails.
            spark.stop()

    @staticmethod
    def _get_or_create_experiment_id(experiment_name):
        """Return the id of the named experiment, creating it when absent."""
        # Looking up first avoids treating every MlflowException from
        # create_experiment as "already exists" and then dereferencing None.
        experiment = mlflow.get_experiment_by_name(experiment_name)
        if experiment is not None:
            return experiment.experiment_id
        return mlflow.create_experiment(experiment_name)

    def _log_estimator_params(self):
        """Log regParam and elasticNetParam of the pipeline's final stage."""
        # The original author notes the logistic regression is the last stage.
        best_lr = self.model.stages[-1]
        reg_param = best_lr.getOrDefault("regParam")
        elastic_net_param = best_lr.getOrDefault("elasticNetParam")
        logger.info(f"regparam: {reg_param}")
        logger.info(f"ElasticNetParam: {elastic_net_param}")
        mlflow.log_param("regParam", reg_param)
        mlflow.log_param("elasticNetParam", elastic_net_param)

    def _log_evaluation_metrics(self, predictions):
        """Compute AUC and the multiclass metrics on ``predictions`` and log them."""
        auc_test = BinaryClassificationEvaluator(
            labelCol="Survived",
            rawPredictionCol="rawPrediction",
            metricName="areaUnderROC",
        ).evaluate(predictions)
        mlflow.log_metric("AUC", auc_test)
        logger.info(f"AUC: {auc_test}")

        scores = {
            key: MulticlassClassificationEvaluator(
                labelCol="Survived",
                predictionCol="prediction",
                metricName=metric_name,
            ).evaluate(predictions)
            for key, metric_name in self._MULTICLASS_METRICS
        }
        for key, value in scores.items():
            mlflow.log_metric(key, value)
        for key, value in scores.items():
            logger.info(f"{key}: {value}")

    def _promote_latest_version_to_staging(self):
        """Move the newest registered version that has no stage to ``Staging``."""
        model_name = self.config.params_experiment_name
        client = mlflow.tracking.MlflowClient()
        # NOTE(review): registry stages are reported as deprecated in recent
        # MLflow releases -- confirm against the installed version.
        latest_versions = client.get_latest_versions(model_name, stages=["None"])
        if not latest_versions:
            logger.warning(f"No unstaged version of model '{model_name}' found; nothing moved to Staging.")
            return

        latest_version = latest_versions[0].version
        client.transition_model_version_stage(
            name=model_name,
            version=latest_version,
            stage="Staging",
        )
        logger.info(f"Model '{model_name}' version {latest_version} moved to Staging!")

In [48]:
# Run the stage end to end: build the stage config, load the trained pipeline,
# then evaluate it and register it in MLflow. Exceptions are left to propagate
# so the notebook shows the full traceback.
config_manager = ConfigurationManager()
mlflow_model_management_config = config_manager.mlflow_model_management_config()
mlflow_model_management = MLFlowModelManagement(config=mlflow_model_management_config)
mlflow_model_management.load_model()
mlflow_model_management.mlflow_model_tracking()

/home/amit/python/Industrial_AI_project/IITM_MLops_titanic_dataset_github_clone/IITM-MLProject-kaggle-Titanic-dataset
[2025-08-28 13:13:06,225: INFO: common: yaml file: config/config.yaml loaded successfully]
[2025-08-28 13:13:06,230: INFO: common: yaml file: params.yaml loaded successfully]
[2025-08-28 13:13:06,234: INFO: common: created directory at: artifacts]
[2025-08-28 13:13:06,236: INFO: common: created directory at: artifacts/mlflow_model_management]
[2025-08-28 13:13:06,239: INFO: 104836984: Model already exists at: artifacts/model_training/best_model]


                                                                                

[2025-08-28 13:13:10,353: INFO: 104836984: Loaded model from artifacts/model_training/best_model]
[2025-08-28 13:13:10,675: INFO: 104836984: Experiment Name: Titanic_Pipeline_Exp1]
[2025-08-28 13:13:10,678: INFO: 104836984: Setting Tracking URI to : http://localhost:5000]
[2025-08-28 13:13:10,797: INFO: 104836984: Experiment ID: 285969590943178826]
[2025-08-28 13:13:10,862: INFO: 104836984: regparam: 0.01]
[2025-08-28 13:13:10,863: INFO: 104836984: ElasticNetParam: 0.0]
[2025-08-28 13:13:13,023: INFO: 104836984: AUC: 0.8857634902411028]
[2025-08-28 13:13:14,158: INFO: 104836984: Accuracy: 0.7862068965517242]
[2025-08-28 13:13:14,159: INFO: 104836984: Precision: 0.7887039239001188]
[2025-08-28 13:13:14,162: INFO: 104836984: Recall: 0.7862068965517242]
[2025-08-28 13:13:14,167: INFO: 104836984: F1: 0.784341065830721]


25/08/28 13:13:14 ERROR Instrumentation: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "mlflow-artifacts"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:673)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:167)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.super$save(Pipeline.scala:344)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.$anonfun$save$4(Pipeline.scala:344)
	at org.apache.spark.ml.MLEvents.withSaveInst

[2025-08-28 13:13:51,340: INFO: 104836984: Run ID from constants: 7ce974ca0d754096a1fe7b209fa0656f]
[2025-08-28 13:13:51,342: INFO: 104836984: Experiment ID from constants: 285969590943178826]


Created version '5' of model 'Titanic_Pipeline_Exp1'.
2025/08/28 13:13:51 INFO mlflow.tracking._tracking_service.client: 🏃 View run LR_Pipeline at: http://localhost:5000/#/experiments/285969590943178826/runs/7ce974ca0d754096a1fe7b209fa0656f.
2025/08/28 13:13:51 INFO mlflow.tracking._tracking_service.client: 🧪 View experiment at: http://localhost:5000/#/experiments/285969590943178826.


[2025-08-28 13:13:51,424: INFO: 104836984: MLFlow Model Tracking done successfully.]


In [37]:
%pwd

'/home/amit/python/Industrial_AI_project/IITM_MLops_titanic_dataset_github_clone/IITM-MLProject-kaggle-Titanic-dataset'