# MLflow
# Capabilities of MLflow

There are four components to MLflow:

- MLflow Tracking
- MLflow Projects
- MLflow Models
- MLflow Model Registry
### MLflow Tracking
MLflow Tracking allows data scientists to work with experiments in which they process and analyze data or train machine learning models. For each run in an experiment, a data scientist can log parameter values, versions of libraries used, model evaluation metrics, and generated output files, including images of data visualizations and model files. This ability to log important details about experiment runs makes it possible to audit and compare the results of prior model training executions.

### MLflow Projects
An MLflow Project is a way of packaging up code for consistent deployment and reproducibility of results. MLflow supports several environments for projects, including the use of Conda and Docker to define consistent Python code execution environments.

### MLflow Models
MLflow offers a standardized format for packaging models for distribution. This standardized model format allows MLflow to work with models generated from several popular libraries, including Scikit-Learn, PyTorch, MLlib, and others.

### MLflow Model Registry
The MLflow Model Registry allows data scientists to register trained models. MLflow Models and MLflow Projects use the MLflow Model Registry to enable machine learning engineers to deploy and serve models for client applications to consume.

In [0]:
 %sh
 # Start from an empty lab directory, then download the training CSV into it.
 # -f / -p keep these steps quiet whether or not the directory already exists.
 rm -rf /dbfs/mlflow_lab
 mkdir -p /dbfs/mlflow_lab
 wget -O /dbfs/mlflow_lab/MMM.csv https://raw.githubusercontent.com/sruthidasrg/test_databrick/467d848ec975a4e07a1505800b7413e74bf2a516/MMM.csv

# Training pipeline / Model Evaluation pipeline

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import col

   
# Load the downloaded CSV. Only the header option is set (no schema is supplied),
# so the columns are cast to their working types explicitly below.
data = spark.read.format("csv").option("header", "true").load("dbfs:/mlflow_lab/MMM.csv")

# List of columns to convert
columns_to_convert = ["TV_Spend", "Digital_Spend", "Social_Media_Spend","Print_Spend","Discount_Applied","Units_Sold"]
# Apply casting to each column
for col_name in columns_to_convert:
    data = data.withColumn(col_name, col(col_name).cast("long"))
# The label column is cast to double and the date column to a date type.
data = data.withColumn("Sales", col("Sales").cast("double"))
data = data.withColumn("Date", col("Date").cast("date"))

# Preview a random sample of the rows (fraction 0.2).
display(data.sample(0.2))
   
# Split with weights 0.7 / 0.3 into train and test sets. No seed is passed,
# so the split is not pinned to a fixed seed.
splits = data.randomSplit([0.7, 0.3])
train = splits[0]
test = splits[1]
print ("Training Rows:", train.count(), " Testing Rows:", test.count())

In [0]:
# Print the column names and types of the training split.
train.printSchema()

In [0]:
def train_market_media_mix_model(training_data, test_data, maxIterations, regularization):
    """Train, evaluate and log a linear-regression media-mix model in one MLflow run.

    The pipeline string-indexes the categorical columns, min-max scales the
    numeric spend/discount columns, assembles everything into a single
    ``features`` vector and fits a ``LinearRegression`` on the ``Sales`` label.

    Args:
        training_data: Spark DataFrame the pipeline is fitted on.
        test_data: Spark DataFrame the fitted pipeline is evaluated on.
        maxIterations: Value passed to ``LinearRegression(maxIter=...)``.
        regularization: Value passed to ``LinearRegression(regParam=...)``.

    Returns:
        A ``(unique_model_name, run_id)`` tuple: the artifact name the model
        was logged under and the id of the MLflow run that produced it.
    """
    import time

    import mlflow
    import mlflow.spark
    from pyspark.ml import Pipeline
    from pyspark.ml.evaluation import RegressionEvaluator
    from pyspark.ml.feature import MinMaxScaler, StringIndexer, VectorAssembler
    from pyspark.ml.regression import LinearRegression

    # Start an MLflow run
    mlflow.set_experiment("/Users/sruthidas.g@einfochipsindia.onmicrosoft.com/my-experimentdb_train")
    with mlflow.start_run() as run:
        run_id = run.info.run_id

        # The hyperparameters come from the caller; they are deliberately not
        # reassigned here so the values passed in are the ones trained and logged.
        catFeatures = ['Campaign_Type', 'Product_Type', 'Region']
        numFeatures = ['TV_Spend', 'Digital_Spend', 'Social_Media_Spend', 'Print_Spend', 'Discount_Applied']

        # One indexer per categorical column, each writing to "<column>Idx".
        catIndexers = [
            StringIndexer(inputCol=name, outputCol=name + "Idx", handleInvalid="keep")
            for name in catFeatures
        ]

        numVector = VectorAssembler(inputCols=numFeatures, outputCol="numericFeatures")
        numScaler = MinMaxScaler(inputCol=numVector.getOutputCol(), outputCol="normalizedFeatures")
        allFeatures = [name + "Idx" for name in catFeatures] + ["normalizedFeatures"]
        featureVector = VectorAssembler(inputCols=allFeatures, outputCol="features")

        # Define the Linear Regression model with hyperparameters
        algo = LinearRegression(
            labelCol="Sales",  # Ensure this is the correct target variable
            featuresCol="features",
            maxIter=maxIterations,
            regParam=regularization
        )

        # Chain the steps as stages in a pipeline
        stages = catIndexers + [numVector, numScaler, featureVector, algo]
        pipeline = Pipeline(stages=stages)
        print([type(stage) for stage in pipeline.getStages()])

        # Log training parameter values
        print ("Training Linear Regression model...")
        mlflow.log_param('maxIter', algo.getMaxIter())
        mlflow.log_param('regParam', algo.getRegParam())
        # Fit on the DataFrame that was passed in, not on a notebook global.
        model = pipeline.fit(training_data)

        # Evaluate the model and log metrics. The scored DataFrame is built once
        # and shared by all three evaluators.
        prediction = model.transform(test_data)

        metrics = ["r2", "mse", "rmse"]
        for metric in metrics:
            evaluator = RegressionEvaluator(labelCol="Sales", predictionCol="prediction", metricName=metric)
            metric_value = evaluator.evaluate(prediction)
            print(f"{metric}: {metric_value}")
            mlflow.log_metric(metric, metric_value)

        # Log the model itself under a name made unique by the current timestamp.
        unique_model_name = "Regressor-" + str(time.time())
        mlflow.spark.log_model(model, unique_model_name, conda_env=mlflow.spark.get_default_conda_env())
        experiment_id = run.info.experiment_id
        dbfs_model_path = f"dbfs:/databricks/mlflow-tracking/{experiment_id}/{run_id}/artifacts/{unique_model_name}"

        # NOTE(review): this saves a second copy of the model to a hand-built
        # DBFS path, presumably the run's artifact location - confirm that this
        # path is correct and that the extra save is still needed.
        mlflow.spark.save_model(model, dbfs_model_path)

    print(dbfs_model_path)
    print("Experiment run complete.")
    return unique_model_name,run_id

In [0]:
# Run one training experiment, passing 10 and 0.2 as the maxIterations and
# regularization arguments, and keep the returned artifact name and run id
# for the registration step.
unique_model_name, run_id=train_market_media_mix_model(train, test, 10, 0.2)
print(unique_model_name)

In [0]:
# Show the part of the artifact name before the first "." .
print(unique_model_name.split(".")[0])

In [0]:
# from mlflow.tracking import MlflowClient
# import mlflow
# client = MlflowClient()

# # Register the model
# with mlflow.start_run() as run: model_uri = f"dbfs:/mlflow/{run.info.run_id}/{unique_model_name}"
# # registered_model = client.create_registered_model(unique_model_name.split(".")[0])

# # Create a model version
# model_version = client.create_model_version(
#     name=unique_model_name.split(".")[0],
#     source=model_uri,
#     run_id=run_id
# )

# print(f"Model registered as {unique_model_name} with version {model_version.version}")

# Model Registering

In [0]:
from mlflow.tracking import MlflowClient
import mlflow
def register_model(unique_model_name, run_id):
    """Register a logged model as a new version in the MLflow Model Registry.

    Args:
        unique_model_name: Artifact name the model was logged under in the run.
        run_id: Id of the MLflow run that logged the model.

    Returns:
        A ``(model_name, version)`` tuple for the created model version.

    Raises:
        ValueError: If the training experiment cannot be found by name.
    """
    # Imported locally so the function does not rely on extra cell-level imports.
    from mlflow.exceptions import MlflowException

    client = MlflowClient()

    # The registry name is the artifact name up to its first "." .
    model_name = unique_model_name.split(".")[0]

    experiment_name = "/Users/sruthidas.g@einfochipsindia.onmicrosoft.com/my-experimentdb_train"
    experiment = mlflow.get_experiment_by_name(experiment_name)
    if experiment is None:
        # Fail with a readable message instead of an AttributeError on None.
        raise ValueError(f"MLflow experiment '{experiment_name}' was not found.")
    experiment_id = experiment.experiment_id

    # Define DBFS model path
    dbfs_model_path = f"dbfs:/databricks/mlflow-tracking/{experiment_id}/{run_id}/artifacts/{unique_model_name}"
    print(dbfs_model_path)

    # Ensure the model exists in MLflow Model Registry. Only MLflow errors from
    # the lookup are treated as "not registered yet"; anything else propagates.
    try:
        client.get_registered_model(model_name)
    except MlflowException:
        print(f"Creating a new registered model '{model_name}'...")
        client.create_registered_model(model_name)
    else:
        print(f"Model '{model_name}' already exists in MLflow Registry.")

    # Create a new model version
    model_version = client.create_model_version(
        name=model_name,
        source=dbfs_model_path,
        run_id=run_id
    )

    print(f"Model registered as {model_name} with version {model_version.version}")
    return model_name, model_version.version

In [0]:
# Register the model from the training run; the returned name and version are
# used by the inference cells below.
model_name, model_version = register_model(unique_model_name, run_id)

In [0]:
# Bare expression: shows the registered model name as the cell output.
model_name

# Inference Pipeline

In [0]:
 %sh
 # Download the inference CSV next to the training CSV. The directory is NOT
 # wiped here: the training DataFrames are read from /dbfs/mlflow_lab/MMM.csv
 # and later cells still use them. wget -O overwrites any previous download.
 mkdir -p /dbfs/mlflow_lab
 wget -O /dbfs/mlflow_lab/MMM_test.csv https://raw.githubusercontent.com/sruthidasrg/test_databrick/refs/heads/main/MMM_test.csv

In [0]:
# Load the inference CSV. Only the header option is set (no schema is supplied),
# so the columns are cast below with the same casts the training cell applies.
infer_data = spark.read.format("csv").option("header", "true").load("dbfs:/mlflow_lab//MMM_test.csv")

# List of columns to convert
columns_to_convert = ["TV_Spend", "Digital_Spend", "Social_Media_Spend","Print_Spend","Discount_Applied","Units_Sold"]
# Apply casting to each column
for col_name in columns_to_convert:
    infer_data = infer_data.withColumn(col_name, col(col_name).cast("long"))
# The label column is cast to double and the date column to a date type.
infer_data = infer_data.withColumn("Sales", col("Sales").cast("double"))
infer_data = infer_data.withColumn("Date", col("Date").cast("date"))


In [0]:
# Preview a random sample of the inference rows (fraction 0.2).
display(infer_data.sample(0.2))

In [0]:
import mlflow
import mlflow.spark
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler, MinMaxScaler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col
from mlflow.tracking import MlflowClient


def load_model(model_name, model_version):
    """Load one version of a registered model as a Spark model.

    Args:
        model_name: Name of the registered model.
        model_version: Version of that model to load.

    Returns:
        The object returned by ``mlflow.spark.load_model`` for the
        ``models:/<name>/<version>`` URI.
    """
    # No MlflowClient is needed: the models:/ URI is resolved by load_model itself.
    model_uri = f"models:/{model_name}/{model_version}"
    print(f"Loading model from: {model_uri}")
    return mlflow.spark.load_model(model_uri)

# def preprocess_data(raw_data):
#     """Apply the same preprocessing steps as in training."""
#     catFeatures = ['Campaign_Type', 'Product_Type', 'Region']
#     numFeatures = ['TV_Spend', 'Digital_Spend', 'Social_Media_Spend', 'Print_Spend', 'Discount_Applied']
    
#     catIndexers = [StringIndexer(inputCol=col, outputCol=col + "Idx", handleInvalid="keep") for col in catFeatures]
#     numVector = VectorAssembler(inputCols=numFeatures, outputCol="numericFeatures")
#     numScaler = MinMaxScaler(inputCol="numericFeatures", outputCol="normalizedFeatures")
#     allFeatures = [col + "Idx" for col in catFeatures] + ["normalizedFeatures"]
#     featureVector = VectorAssembler(inputCols=allFeatures, outputCol="features")
    
#     stages = catIndexers + [numVector, numScaler, featureVector]
    
#     from pyspark.ml import Pipeline
#     pipeline = Pipeline(stages=stages)
#     processed_data = pipeline.fit(raw_data).transform(raw_data)
#     return processed_data

def make_predictions(model, inference_data):
    """Score ``inference_data`` with ``model`` and keep only the result columns.

    Returns the transformed data restricted to the feature vector, the
    prediction and the ``Sales`` label.
    """
    scored = model.transform(inference_data)
    output_columns = ("features", "prediction", "Sales")
    return scored.select(*output_columns)

def evaluate_rmse(predictions):
    """Compute, print and return the RMSE between ``Sales`` and ``prediction``."""
    rmse = RegressionEvaluator(
        labelCol="Sales",
        predictionCol="prediction",
        metricName="rmse",
    ).evaluate(predictions)
    print(f"RMSE: {rmse}")
    return rmse




# Inference driver: load the registered model, score `infer_data`, show the
# predictions and report RMSE.
# NOTE(review): presumably `__name__` is "__main__" when this runs as a notebook
# cell, so the guard always passes - confirm.
if __name__ == "__main__":
    # getOrCreate() reuses the session that is already running, if there is one.
    spark = SparkSession.builder.appName("MarketMediaMixInference").getOrCreate()
    
    # Define model details
    # model_name = "Regressor"
    # model_version = 1  # Update if needed
    
    # Load the model
    # `model_name` and `model_version` are not assigned in this cell; they come
    # from the earlier register_model(...) call.
    model = load_model(model_name, model_version)
    
  
    
    # Make predictions
    predictions = make_predictions(model, infer_data)
    
    # Show results
    predictions.show()

    # Evaluate RMSE
    evaluate_rmse(predictions)


    
    print("Inference complete.")


In [0]:
# Sanity check: the inference data must have exactly the same schema as the
# training split.
assert train.schema == infer_data.schema, "Schema mismatch between train and infer_data!"


In [0]:
# Print the training schema for visual comparison.
train.printSchema()

In [0]:
# Move a registered model version to the "Archived" stage.
client = mlflow.tracking.MlflowClient()
# Registry name: the artifact name up to its first "." .
model_name = unique_model_name.split(".")[0]
experiment_id = mlflow.get_experiment_by_name("/Users/sruthidas.g@einfochipsindia.onmicrosoft.com/my-experimentdb_train").experiment_id

# Define DBFS model path
# This path is only printed; it is not passed to the stage transition below.
dbfs_model_path = f"dbfs:/databricks/mlflow-tracking/{experiment_id}/{run_id}/artifacts/{unique_model_name}"
print(dbfs_model_path)
# NOTE(review): the version is hard-coded to 1 rather than taken from the
# `model_version` returned by register_model - confirm this is intended.
client.transition_model_version_stage(
    # name="my_ml_model",
    name=model_name,       
    version=1,
    stage="Archived"
)


In [0]:
# List the files that back the training DataFrame.
print(train.inputFiles())