### **Pipeline de aprendizaje automático con pyspark, gestión del ciclo de vida del proyecto de machine learning con MLflow y despliegue del modelo como servicio REST usando MLflow en Databricks**
### 


En función al informe anual sobre el comportamiento de cliente en [E-commerce](https://dianagarcesportilla.github.io/Flexdashboard/#correlaciones) de ropa mujer 2023. se encuentra una correlación lineal positiva fuerte entre los años de membresía del cliente (corr: 0.809) y el gasto anual en compras online. Adicionalmente, una correlación lineal media con la duración de la sesión en la app y el tiempo de exposición en general al sitio (0.499 y 0.355 respectivamente) en minutos.

Por lo tanto, se plantea un modelo de regresión múltiple con las tres variables anteriores como regresoras para predecir el gasto anual del cliente. 

Se define un pipeline con el vector de características (corresponde a las variables regresoras o independientes) y la línea de regresión. se crea la grilla de hiperparámetros y la validación cruzada k-fold para el entrenamiento del modelo.

Luego con MLflow: 1. se registra el experimento en Databricks, 2. se realiza el empaquetamiento del mejor modelo, 3. se predice el gasto con los datos de prueba haciendo uso del mejor modelo y se evalua con las métricas rsme y r2, 4. se consulta las ejecuciones del modelo, 5. se carga el modelo guardado y 6. se despliega como servicio REST para realizar predicciones con nuevos datos.

**I. Leer archivo desde Azure Blob Storage en Databricks**

Conectar databricks con la cuenta de almacenamiento. Se hace uso de [Key value de Azure](https://learn.microsoft.com/es-es/azure/key-vault/general/basic-concepts) para no ingresar la clave almacenamiento en el código.

In [0]:
spark.conf.set(
  "fs.azure.account.key.storagedc1.blob.core.windows.net",
  dbutils.secrets.get(scope = "kv-scope", key = "storage-key")
)

Leer desde el contenedor de Blob Storage los datos.

In [0]:
df = spark.read.format("csv")\
     .option("header", "true")\
     .option("inferSchema", "true") \
     .option("delimiter", ";") \
     .load("wasbs://dataempresarial@storagedc1.blob.core.windows.net/Ecommerce01.csv")
df.display()

In [0]:
df.printSchema()

root
 |-- Id_cliente: integer (nullable = true)
 |-- min_promedio_sesion: double (nullable = true)
 |-- min_en_app: double (nullable = true)
 |-- min_en_web: double (nullable = true)
 |-- anos_miembro: double (nullable = true)
 |-- valor_compras_anuales_cop: integer (nullable = true)



**II. Entrenamiento del modelo de regresión múltiple, evaluación y seguimiento (tracking) con MLflow**

In [0]:
#Dividir la data en datos de entrenamiento y de prueba
trainDF, testDF = df.randomSplit([0.8, 0.2], seed=42)

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

#Crear el vector de caracterísicas que contiene las variables independientes
featureCols = ["anos_miembro", "min_promedio_sesion", "min_en_app"]
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")

#Definir el modelo de regresión lineal
lr = LinearRegression(featuresCol="features", labelCol="valor_compras_anuales_cop")

#Pipeline
pipeline = Pipeline(stages = [assembler, lr])

In [0]:
from pyspark.ml.tuning import ParamGridBuilder

#Grid de hiperparámetros
paramGrid = (ParamGridBuilder()
  .addGrid(lr.maxIter, [1, 10, 100]) #numero de iteraciones
  .addGrid(lr.fitIntercept, [True, False]) #con o sin intercepto  
  .addGrid(lr.standardization, [True, False]) #con o sin estandariación de variables
  .build()
)

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator

#Evaluador
evaluator = RegressionEvaluator(
  labelCol = "valor_compras_anuales_cop",
  predictionCol = "prediction"
)

#Validación cruzada
cv = CrossValidator(
  estimator = pipeline,             
  estimatorParamMaps = paramGrid,   
  evaluator=evaluator,              
  numFolds = 3,                     
  seed = 42                         
)

In [0]:
import mlflow         #MLflow para el tracking de experimentos.
import mlflow.spark   #registrar modelo Spark directamente (como PipelineModel, LinearRegressionModel, etc.).
from mlflow.models.signature import infer_signature

# Finalizar ejecución anterior si existe
if mlflow.active_run():
    mlflow.end_run()

mlflow.set_experiment(f"/Users/diseno360.com@outlook.com/mrecommerce-mlflow") #en que experimento guardar ejecución.

with mlflow.start_run(run_name="LR-ecommerce") as run: #inicia una nueva ejecución de mlflow con nombre LR-ecommerce
    
    # Entrenar con CrossValidator
    cvModel = cv.fit(trainDF)

    # Obtener mejor modelo
    best_Model = cvModel.bestModel #mejor modelo completo (pipeline) que fue entrenado durante la validación cruzada.
    lrModel = best_Model.stages[-1]  #accede directamente a la última etapa del pipeline, en este caso el modelo  
                                     #LinearRegression. se usa para acceder a los coeficientes, intercepto o #hiperparámetros
    # Registro de parámetros
    mlflow.log_param("label", "valor_compras_anuales_cop")
    mlflow.log_param("features", "anos_miembro, min_promedio_sesion, min_en_app")
    mlflow.log_param("model_type", "LinearRegression")

    # Lista de hiperparámetros que se desean registrar
    params_interes = {"fitIntercept", "maxIter", "standardization"}

    for param, value in lrModel.extractParamMap().items():
        name = param.name
        if name in params_interes:
            mlflow.log_param(name, value)

    # Registrar intercepto
    mlflow.log_param("intercept", lrModel.intercept)
    
    # Registrar coeficientes como string
    coef_str = ", ".join([f"{v:.6f}" for v in lrModel.coefficients])
    mlflow.log_param("coefficients", coef_str)


    # Firma del modelo
    sample_input_pdf = trainDF.limit(20).toPandas()
    sample_output_pdf = best_Model.transform(trainDF.limit(20)).select("prediction").toPandas()
    signature = infer_signature(sample_input_pdf, sample_output_pdf)
    
   
    #Registrar el modelo con firma y ejemplo, para desplegarlo como API REST
    mlflow.spark.log_model(
        spark_model=best_Model,
        artifact_path="lrmodel_ecommerce",
        registered_model_name="modelo_regresion_ecommerce",
        signature=signature,
        input_example=sample_input_pdf
    )

    # Predecir y evaluar
    predictions = best_Model.transform(testDF) #Aplica el pipeline completo al conjunto de prueba.
    regressionEvaluator = RegressionEvaluator(predictionCol="prediction", labelCol="valor_compras_anuales_cop")
    mse = regressionEvaluator.setMetricName("mse").evaluate(predictions)
    rmse = regressionEvaluator.setMetricName("rmse").evaluate(predictions)
    r2 = regressionEvaluator.setMetricName("r2").evaluate(predictions)

    # Metricas
    mlflow.log_metric("mse", mse) #registra la métrica error cuadrático medio
    mlflow.log_metric("rmse", rmse) #registra la métrica la raíz del error cuadrático medio
    mlflow.log_metric("r2", r2) #registra la métrica R² (coeficiente de determinación)

2025/07/10 20:21:08 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().


Downloading artifacts:   0%|          | 0/20 [00:00<?, ?it/s]



Uploading artifacts:   0%|          | 0/5 [00:00<?, ?it/s]

Registered model 'modelo_regresion_ecommerce' already exists. Creating a new version of this model...


Downloading artifacts:   0%|          | 0/25 [00:00<?, ?it/s]

Uploading artifacts:   0%|          | 0/25 [00:00<?, ?it/s]

Created version '1' of model 'databricks_dc_diana.default.modelo_regresion_ecommerce'.


Consulta de ejecuciones del experimento

In [0]:
from mlflow.tracking import MlflowClient

client = MlflowClient()

In [0]:
#Datos generales del experimento como su localización, id, datos del propietario.
client.search_experiments()

[<Experiment: artifact_location='dbfs:/databricks/mlflow-tracking/1032021877304985', creation_time=1752017421539, experiment_id='1032021877304985', last_update_time=1752178838366, lifecycle_stage='active', name='/Users/diseno360.com@outlook.com/mrecommerce-mlflow', tags={'mlflow.experiment.sourceName': '/Users/diseno360.com@outlook.com/mrecommerce-mlflow',
  'mlflow.experimentKind': 'custom_model_development',
  'mlflow.experimentType': 'MLFLOW_EXPERIMENT',
  'mlflow.ownerEmail': 'diseno360.com@outlook.com',
  'mlflow.ownerId': '993783110248991'}>]

In [0]:
#información sobre las ejecuciones (runs) del experimento
experiment_id = run.info.experiment_id
runs_df = mlflow.search_runs(experiment_id)

display(runs_df)

run_id,experiment_id,status,artifact_uri,start_time,end_time,metrics.r2,metrics.rmse,metrics.mse,params.coefficients,params.fitIntercept,params.features,params.maxIter,params.model_type,params.label,params.intercept,params.standardization,tags.mlflow.databricks.cluster.info,tags.mlflow.source.name,tags.mlflow.user,tags.mlflow.runName,tags.mlflow.runColor,tags.mlflow.databricks.notebook.commandID,tags.mlflow.databricks.workspaceURL,tags.mlflow.databricks.notebookRevisionID,tags.sparkDatasourceInfo,tags.mlflow.log-model.history,tags.mlflow.databricks.cluster.libraries,tags.mlflow.databricks.cluster.id,tags.mlflow.databricks.notebookID,tags.mlflow.databricks.notebookPath,tags.mlflow.databricks.workspaceID,tags.mlflow.databricks.webappURL,tags.mlflow.source.type
9b38ab83863f4a97b1d0ecafdd2934fc,1032021877304985,FINISHED,dbfs:/databricks/mlflow-tracking/1032021877304985/9b38ab83863f4a97b1d0ecafdd2934fc/artifacts,2025-07-10T20:20:38.366Z,2025-07-10T20:21:39.986Z,0.9849821343285992,39415.907132078806,1553613735.0446608,"244869.514915, 103504.062769, 154945.782813",True,"anos_miembro, min_promedio_sesion, min_en_app",1,LinearRegression,valor_compras_anuales_cop,-4155482.040576044,True,"{""cluster_name"":""Cluster_ml"",""spark_version"":""15.4.x-scala2.12"",""node_type_id"":""Standard_D4ds_v5"",""driver_node_type_id"":""Standard_D4ds_v5"",""autotermination_minutes"":120,""disk_spec"":{},""num_workers"":0}",/Users/diseno360.com@outlook.com/ml_Ecommerce,diseno360.com@outlook.com,LR-ecommerce,#7d54b2,1752178229566_6027425010260375499_3958ccaaffdc431b9d2d9d1ee12ca45e,adb-747824624093415.15.azuredatabricks.net,1752178900186,"path=wasbs:REDACTED_LOCAL_PART@storagedc1.blob.core.windows.net/Ecommerce01.csv,format=csv","[{""artifact_path"":""lrmodel_ecommerce"",""flavors"":{""spark"":{""pyspark_version"":""3.5.0"",""model_data"":""sparkml"",""code"":null,""model_class"":""pyspark.ml.pipeline.PipelineModel""},""python_function"":{""loader_module"":""mlflow.spark"",""python_version"":""3.11.11"",""data"":""sparkml"",""env"":{""conda"":""conda.yaml"",""virtualenv"":""python_env.yaml""}}},""utc_time_created"":""2025-07-10 20:21:08.706675""}]","{""installable"":[],""redacted"":[]}",0707-151051-ckdive9m,4225194139691762,/Users/diseno360.com@outlook.com/ml_Ecommerce,747824624093415,https://centralus-c2.azuredatabricks.net,NOTEBOOK
a69f797e0e0448f5962eb22167edc5d1,1032021877304985,FINISHED,dbfs:/databricks/mlflow-tracking/1032021877304985/a69f797e0e0448f5962eb22167edc5d1/artifacts,2025-07-10T16:42:36.202Z,2025-07-10T16:43:39.066Z,0.9849821343285992,39415.907132078806,1553613735.0446608,"244869.514915, 103504.062769, 154945.782813",True,"anos_miembro, min_promedio_sesion, min_en_app",1,LinearRegression,valor_compras_anuales_cop,-4155482.040576044,True,"{""cluster_name"":""Cluster_ml"",""spark_version"":""15.4.x-scala2.12"",""node_type_id"":""Standard_D4ds_v5"",""driver_node_type_id"":""Standard_D4ds_v5"",""autotermination_minutes"":120,""disk_spec"":{},""num_workers"":0}",/Users/diseno360.com@outlook.com/ml_Ecommerce,diseno360.com@outlook.com,LR-ecommerce,#da4c4c,1752163226178_8777194128657631767_a2c7eaa6a0544b6fa296a539dac5a8ab,adb-747824624093415.15.azuredatabricks.net,1752165819287,"path=wasbs:REDACTED_LOCAL_PART@storagedc1.blob.core.windows.net/Ecommerce01.csv,format=csv","[{""artifact_path"":""lrmodel_ecommerce"",""flavors"":{""spark"":{""pyspark_version"":""3.5.0"",""model_data"":""sparkml"",""code"":null,""model_class"":""pyspark.ml.pipeline.PipelineModel""},""python_function"":{""loader_module"":""mlflow.spark"",""python_version"":""3.11.11"",""data"":""sparkml"",""env"":{""conda"":""conda.yaml"",""virtualenv"":""python_env.yaml""}}},""utc_time_created"":""2025-07-10 16:43:13.996082""}]","{""installable"":[],""redacted"":[]}",0707-151051-ckdive9m,4225194139691762,/Users/diseno360.com@outlook.com/ml_Ecommerce,747824624093415,https://centralus-c2.azuredatabricks.net,NOTEBOOK
294c8bb8f1bb43688ab440ae1c2fd26b,1032021877304985,FINISHED,dbfs:/databricks/mlflow-tracking/1032021877304985/294c8bb8f1bb43688ab440ae1c2fd26b/artifacts,2025-07-09T23:27:58.243Z,2025-07-09T23:28:45.904Z,0.9849821343285992,39415.907132078806,1553613735.0446608,"244869.514915, 103504.062769, 154945.782813",True,"anos_miembro, min_promedio_sesion, min_en_app",1,LinearRegression,valor_compras_anuales_cop,-4155482.040576044,True,"{""cluster_name"":""Cluster_ml"",""spark_version"":""15.4.x-scala2.12"",""node_type_id"":""Standard_D4ds_v5"",""driver_node_type_id"":""Standard_D4ds_v5"",""autotermination_minutes"":120,""disk_spec"":{},""num_workers"":0}",/Users/diseno360.com@outlook.com/ml_Ecommerce,diseno360.com@outlook.com,LR-ecommerce,#5387dd,1752093593291_9115720827235219962_6ee0d7eeffb34b40b6cd42a733d83e02,adb-747824624093415.15.azuredatabricks.net,1752103726102,"path=wasbs:REDACTED_LOCAL_PART@storagedc1.blob.core.windows.net/Ecommerce01.csv,format=csv","[{""artifact_path"":""lrmodel_ecommerce"",""flavors"":{""spark"":{""pyspark_version"":""3.5.0"",""model_data"":""sparkml"",""code"":null,""model_class"":""pyspark.ml.pipeline.PipelineModel""},""python_function"":{""loader_module"":""mlflow.spark"",""python_version"":""3.11.11"",""data"":""sparkml"",""env"":{""conda"":""conda.yaml"",""virtualenv"":""python_env.yaml""}}},""utc_time_created"":""2025-07-09 23:28:22.327064""}]","{""installable"":[],""redacted"":[]}",0707-151051-ckdive9m,4225194139691762,/Users/diseno360.com@outlook.com/ml_Ecommerce,747824624093415,https://centralus-c2.azuredatabricks.net,NOTEBOOK


In [0]:
#Obtener la última ejecución y ver las métricas.
runs = client.search_runs(experiment_id, order_by=["attributes.start_time desc"], max_results=1)
runs[0].data.metrics

{'r2': 0.9849821343285992,
 'mse': 1553613735.0446608,
 'rmse': 39415.907132078806}

La raíz del error cuadrático edio es de $39.415 pesos, por lo tanto, el error entre las predicciones y los valores reales del gasto anual del cliente en el e-commerce no es significativo, teniendo en cuenta que el gasto anual se en encuentra en el rango de 1 a 3 millones de pesos.

Adicionalmente,  alrededor  del 98.49 por ciento de  la  variabilidad del gasto anual del cliente en el ecommerce, es explicada por los variables independientes o regresoras años de mebresía, tiempo de navegación en la app y el tiempo de exposición en general al sitio. 

In [0]:
#run id de la ultima ejecución del experimento
run_id = runs[0].info.run_id
run_id 

'9b38ab83863f4a97b1d0ecafdd2934fc'

Cargar el modelo guardado

In [0]:
loaded_model = mlflow.spark.load_model(f"runs:/{run.info.run_uuid}/lrmodel_ecommerce")
display(loaded_model.transform(testDF))

**III. Despliegue del modelo como servicio REST usando MLflow en Databricks.**

3.1 Activar el endpoint de Model Serving en Databricks

-En el menú lateral dar clic en 'Modelos', en los modelos registrados, dar clic en 'modelo_regresion_ecommerce'.

![Imagen](https://raw.githubusercontent.com/DianaGarcesPortilla//Pipeline_pyspark-MLflow-Despliegue_API/main/image01.png)

-Seleccionar la versión del modelo si hay más de una, en este caso solo existe la versión 1, por lo tanto, dar clic directamente en el botón superior derecho 'Servir este modelo'. Al realizar esta acción se generará un endpoint REST automático, como el que se enmarca en el recuadro verde.

![Imagen](https://raw.githubusercontent.com/DianaGarcesPortilla//Pipeline_pyspark-MLflow-Despliegue_API/main/image02.png)

![Imagen](https://raw.githubusercontent.com/DianaGarcesPortilla//Pipeline_pyspark-MLflow-Despliegue_API/main/image03.png)

-Antes de utilizar el modelo desplegado, haciendo uso del endpoint creado anteriormente, para realizar predicciones con nuevos datos. Se Configura la tabla de inferencia para registrar automáticamente las entradas y salidas de un modelo en producción en Databricks Model Serving con MLflow.

In [0]:
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, TimestampType

schema = StructType([
    StructField("input", StructType([
        StructField("anos_miembro", DoubleType()),
        StructField("min_promedio_sesion", DoubleType()),
        StructField("min_en_app", DoubleType())
    ])),
    StructField("prediction", DoubleType()),
    StructField("model_version", StringType()),
    StructField("timestamp", TimestampType())
])


# Definir el nombre completo con catálogo, base de datos y tabla
full_table_name = "databricks_dc_diana.default.inferenciasmodelo_LRecommerce"

# Crear la tabla Delta en el catálogo y base de datos deseados
spark.createDataFrame([], schema).write.format("delta").mode("ignore").saveAsTable(full_table_name)



-Llamar el modelo desplegado en Databricks vía API REST usando Python, desde un notebook de Databricks con es este caso o también desdePython externo.

In [0]:
import requests
import json

#URL del endpoint de predicción
endpoint_url = "https://adb-747824624093415.15.azuredatabricks.net/serving-endpoints/LRecommerce/invocations"

#Token de acceso personal (PAT)
token = "dapi500ea75c3d5e94ab4" 

#Datos de entrada (deben coincidir con las columnas del modelo)
input_data = {
    "dataframe_split": {
        "columns": ["anos_miembro", "min_promedio_sesion", "min_en_app"],
        "data": [
            [5, 38.2, 14.0],
            [3, 50.1, 20.5]
        ]
    }
}

#Cabeceras HTTP con token de autenticación
headers = {
    "Authorization": f"Bearer {token}",
    "Content-Type": "application/json"
}

# Enviar solicitud POST
response = requests.post(endpoint_url, headers=headers, json=input_data)

#Mostrar la predicción
if response.status_code == 200:
    predictions = response.json()
    print("Predicciones:", predictions)
else:
    print("Error:", response.status_code, response.text)
