# Escenario

Eres un/a ingeniero/a de datos en una empresa de consultoría de análisis de datos. Tu compañía se enorgullece de poder manejar eficientemente conjuntos de datos enormes (grandes volúmenes de datos). Los científicos de datos en tu oficina necesitan trabajar con diferentes algoritmos y datos en diversos formatos. Aunque son expertos en Aprendizaje Automático (Machine Learning), cuentan contigo para realizar trabajos de ETL (Extracción, Transformación y Carga) y construir pipelines (canalizaciones) de Machine Learning.

# Objetivos

En esta tarea de 4 partes, deberás:

# Parte 1: ETL

Cargar un conjunto de datos en formato CSV.
Eliminar duplicados, si existen.
Eliminar filas con valores nulos, si existen.
Realizar transformaciones necesarias.
Almacenar los datos limpios en formato Parquet.

# Parte 2: Creación de Pipeline de Machine Learning

Crear un pipeline de Machine Learning para realizar predicciones.

# Parte 3: Evaluación del Modelo

Evaluar el modelo utilizando métricas adecuadas.
Imprimir el intercepto (término independiente) y los coeficientes del modelo.

# Parte 4: Persistencia del Modelo

Guardar (preservar) el modelo para su uso futuro en producción.
Cargar y verificar el modelo almacenado.
Conjuntos de Datos (Datasets)

En este laboratorio utilizarás el/los siguiente(s) conjunto(s) de datos:

Una versión modificada del conjunto de datos sobre el millaje de automóviles (car mileage dataset). El conjunto de datos original está disponible en: https://archive.ics.uci.edu/ml/datasets/auto+mpg

In [None]:
#Instalación de PySpark
!pip install pyspark==3.5 -q

In [None]:
#Importaciones y Configuración Inicial
import warnings
# Suprimir advertencias
def warn(*args, **kwargs):
    pass
warnings.warn = warn
warnings.filterwarnings('ignore')

# Importar librerías de PySpark
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import PipelineModel
from pyspark.ml.feature import VectorAssembler, StringIndexer, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import os

# Crear la Spark Session para el entorno local de Colab
# No se necesita findspark.init()
spark = SparkSession.builder.appName("MPG Prediction Colab").getOrCreate()

print("Spark Session creada exitosamente!")

In [None]:
#Descargar los Datos
# Descargar el archivo de datos
!wget -q https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMSkillsNetwork-BD0231EN-Coursera/datasets/mpg-raw.csv

# Verificar si el archivo se descargó
if os.path.exists("mpg-raw.csv"):
    print("Archivo mpg-raw.csv descargado correctamente.")
else:
    print("Error: El archivo mpg-raw.csv no se pudo descargar.")

In [None]:
# Parte 1- ETL
print("--- Iniciando Parte 1: ETL ---")

# Task 3: Cargar el CSV en un DataFrame de Spark
df = spark.read.csv("mpg-raw.csv", header=True, inferSchema=True)
print("DataFrame cargado desde CSV.")
df.show(5) # Mostrar algunas filas

# Task 4 & 5 (Implícito): Ver schema y conteo por Origen (opcional mostrarlo)
df.printSchema()
print("Conteo inicial por Origen:")
df.groupBy('Origin').count().orderBy('count').show()

# Task 6: Contar filas iniciales
rowcount1 = df.count()
print(f"Número total de filas iniciales (rowcount1): {rowcount1}")

# Task 7: Eliminar filas duplicadas
df = df.dropDuplicates()
print("Filas duplicadas eliminadas.")

# Task 8: Contar filas después de eliminar duplicados
rowcount2 = df.count()
print(f"Número total de filas tras dropDuplicates (rowcount2): {rowcount2}")

# Task 9: Eliminar filas con valores nulos
df = df.dropna()
print("Filas con valores nulos eliminadas (dropna).")

# Task 10: Contar filas después de eliminar nulos
rowcount3 = df.count()
print(f"Número total de filas tras dropna (rowcount3): {rowcount3}")

# Task 11: Renombrar columna "Engine Disp" a "Engine_Disp"
# Asegúrate de que el nombre original tenga el espacio
df = df.withColumnRenamed("Engine Disp", "Engine_Disp")
print("Columna 'Engine Disp' renombrada a 'Engine_Disp'.")
print("Schema después de renombrar:")
df.printSchema() # Verificar cambio de nombre

# Task 12: Guardar el DataFrame limpio en formato Parquet
parquet_path = "mpg-cleaned.parquet"
df.write.mode("overwrite").parquet(parquet_path)
print(f"DataFrame limpio guardado en: {parquet_path}")

# Evaluación Parte 1 (Adaptada)
print("\n--- Evaluación Parte 1 ---")
print(f"Total rows = {rowcount1}")
print(f"Total rows after dropping duplicate rows = {rowcount2}")
print(f"Total rows after dropping duplicate rows and rows with null values = {rowcount3}")
# Verificar el nombre de columna directamente de las columnas actuales
renamed_col_name = df.columns[2] # Asumiendo que sigue siendo la tercera columna
print(f"Renamed column name (columna 3) = {renamed_col_name}")
print(f"{parquet_path} exists : {os.path.isdir(parquet_path)}")
print("--- Fin Parte 1 ---")

In [None]:
#Parte 2
print("\n--- Iniciando Parte 2: Creación del Pipeline ---")

# Task 1: Cargar datos desde Parquet
df = spark.read.parquet(parquet_path)
rowcount4 = df.count()
print(f"DataFrame cargado desde {parquet_path}, filas: {rowcount4}")
print("Mostrando 5 filas y schema del DataFrame limpio:")
df.show(5)
df.printSchema()

# --- Definir etapas del Pipeline ---

# Task 2: StringIndexer para la columna 'Origin'
indexer = StringIndexer(inputCol="Origin", outputCol="OriginIndex")
print("Etapa 1: StringIndexer definida.")

# Task 3: VectorAssembler para las características numéricas
feature_cols = ['Cylinders', 'Engine_Disp', 'Horsepower', 'Weight', 'Accelerate', 'Year']
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
print("Etapa 2: VectorAssembler definida.")

# Task 4: StandardScaler para escalar características
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
print("Etapa 3: StandardScaler definida.")

# Task 5: LinearRegression para predecir 'MPG'
lr = LinearRegression(featuresCol="scaledFeatures", labelCol="MPG")
print("Etapa 4: LinearRegression definida.")

# Task 6: Construir el Pipeline
pipeline = Pipeline(stages=[indexer, assembler, scaler, lr])
print("Pipeline construido con 4 etapas.")

# Task 7: Dividir datos en entrenamiento y prueba (70/30)
(trainingData, testingData) = df.randomSplit([0.7, 0.3], seed=42)
print(f"Datos divididos: {trainingData.count()} para entrenamiento, {testingData.count()} para prueba.")

# Task 8: Entrenar (ajustar) el pipeline con los datos de entrenamiento
print("Entrenando el pipeline...")
pipelineModel = pipeline.fit(trainingData)
print("Pipeline entrenado exitosamente!")

# Evaluación Parte 2 (Adaptada)
print("\n--- Evaluación Parte 2 ---")
print(f"Total rows loaded = {rowcount4}")
ps = [str(x).split("_")[0] for x in pipeline.getStages()]
print(f"Pipeline Stage 1 = {ps[0]}")
print(f"Pipeline Stage 2 = {ps[1]}")
print(f"Pipeline Stage 3 = {ps[2]}")
print(f"Pipeline Stage 4 = {lr.__class__.__name__}") # Obtener nombre de la clase
print(f"Label column = {lr.getLabelCol()}")
print("--- Fin Parte 2 ---")


In [None]:
#Parte 3- Evaluación del Modelo

print("\n--- Iniciando Parte 3: Evaluación del Modelo ---")

# Task 1: Hacer predicciones en los datos de prueba
predictions = pipelineModel.transform(testingData)
print("Predicciones realizadas en el conjunto de prueba.")
print("Mostrando algunas predicciones (MPG real vs. Predicción):")
predictions.select("MPG", "prediction").show(5)

# --- Calcular Métricas ---

# Task 2: Calcular MSE (Mean Squared Error)
evaluator_mse = RegressionEvaluator(predictionCol="prediction", labelCol="MPG", metricName="mse")
mse = evaluator_mse.evaluate(predictions)
print(f"Mean Squared Error (MSE) = {mse}")

# Task 3: Calcular MAE (Mean Absolute Error)
evaluator_mae = RegressionEvaluator(predictionCol="prediction", labelCol="MPG", metricName="mae")
mae = evaluator_mae.evaluate(predictions)
print(f"Mean Absolute Error (MAE) = {mae}")

# Task 4: Calcular R-squared (R2)
evaluator_r2 = RegressionEvaluator(predictionCol="prediction", labelCol="MPG", metricName="r2")
r2 = evaluator_r2.evaluate(predictions)
print(f"R-squared (R2) = {r2}")

# Evaluación Parte 3 (Adaptada)
print("\n--- Evaluación Parte 3 ---")
print(f"Mean Squared Error = {round(mse, 2)}")
print(f"Mean Absolute Error = {round(mae, 2)}")
print(f"R Squared = {round(r2, 2)}")

# Obtener el modelo de regresión lineal entrenado del pipeline
lrModel = pipelineModel.stages[-1] # La última etapa es el modelo LR
print(f"Intercept = {round(lrModel.intercept, 2)}")
print("--- Fin Parte 3 ---")

In [None]:
#parte 4

print("\n--- Iniciando Parte 4: Persistencia del Modelo ---")

# Task 1: Guardar el modelo del pipeline entrenado
model_path = "Practice_Project_MPG_Model"
pipelineModel.write().overwrite().save(model_path) # Usamos overwrite por si se ejecuta varias veces
print(f"Modelo del Pipeline guardado en: {model_path}")

# Task 2: Cargar el modelo del pipeline desde el disco
loadedPipelineModel = PipelineModel.load(model_path)
print(f"Modelo del Pipeline cargado desde: {model_path}")

# Task 3: Usar el modelo cargado para hacer predicciones (verificación)
print("Verificando predicciones con el modelo cargado...")
loaded_predictions = loadedPipelineModel.transform(testingData)

# Task 4: Mostrar algunas predicciones del modelo cargado
print("Mostrando predicciones (MPG real vs. Predicción) con modelo cargado:")
loaded_predictions.select("MPG", "prediction").show(5)


# Evaluación Parte 4 (Adaptada)
print("\n--- Evaluación Parte 4 ---")
loaded_lr_model = loadedPipelineModel.stages[-1] # Modelo LR del pipeline cargado
totalstages = len(loadedPipelineModel.stages)
# Obtenemos los nombres de las columnas de entrada del ensamblador (etapa 1 o índice 1)
inputcolumns = loadedPipelineModel.stages[1].getInputCols() # VectorAssembler es stages[1]

print(f"Number of stages in the loaded pipeline = {totalstages}")
print("Coefficients from loaded model:")
for feature, coef in zip(inputcolumns, loaded_lr_model.coefficients):
    print(f"  Coefficient for {feature} is {round(coef, 4)}")

print("--- Fin Parte 4 ---")

In [None]:

# Detener la sesión de Spark para liberar recursos
spark.stop()
print("Spark Session detenida.")