# Linear Regression

In [1]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.sql import SparkSession
import pyspark.pandas as ps
import numpy as np
import os
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pandas.plotting import scatter_matrix



conexión al servidor MLFLOW

In [2]:
import mlflow

# Conectandose a MLFlow colocar el nombre asignado por mlflow
mlflow.set_tracking_uri("http://ibnodo3:25319")

# Generando el experimento o cargandolo si existe
experiment_name = "regresion-pyspark"
mlflow.set_experiment(experiment_name)

# Cargando la información
client = mlflow.tracking.MlflowClient()
experiment_id = client.get_experiment_by_name(experiment_name).experiment_id

# Vamos a ver si es cierto
print(f"MLflow Version: {mlflow.__version__}")
print(f"Tracking URI: {mlflow.tracking.get_tracking_uri()}")
print(f"Nombre del experimento: {experiment_name}")
print(f"ID del experimento: {experiment_id}")

MLflow Version: 1.25.1
Tracking URI: http://ibnodo3:25319
Nombre del experimento: regresion-pyspark
ID del experimento: 1


Preparación de los datos

In [3]:
spark = SparkSession \
    .builder \
    .config('spark.sql.debug.maxToStringFields', 2000) \
    .getOrCreate()

dataset = spark.read.parquet('/LUSTRE/home/mcd-01/dataset.parquet')
dataset = dataset.drop('TOPOSLPX', 'TOPOSLPY', 'SEAICE', 'UOCE', 'VOCE', 'FRC_URB2D', 'SR', 'PCB', 'PC', 'CANWAT')

label_column = "TEMP"
stages = []

# Preparing the independent variables (Features)
# Defining the variables to be used
x_df = dataset.drop('TEMP')
variables = x_df.columns
vectorAssembler = VectorAssembler(inputCols = variables, outputCol = 'features')
va_df = vectorAssembler.transform(dataset)
feature_vector = va_df.select('features', 'TEMP')

# Initialize the `standardScaler`
scaler = StandardScaler(inputCol = 'features',
                        outputCol = 'scaledFeatures',
                        withMean = True, withStd = True
                        ).fit(feature_vector)

preppedDataDF = scaler.transform(feature_vector)

# Split the featurized training data for training and validating the model
(train_data, test_data) = preppedDataDF.randomSplit([0.7, 0.3], seed=123)

print('Data preparation work completed.')

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/17 23:36:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/05/17 23:36:14 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/05/17 23:36:14 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
                                                                                

Data preparation work completed.


Gráfica de desempeño

In [4]:
def plot_regression_quality(predictions):
  p_df = predictions.select(["TEMP",  "PRED_TEMP"]).toPandas()
  true_value = p_df.TEMP
  predicted_value = p_df.PRED_TEMP

  fig = plt.figure(figsize=(15,15))
  plt.scatter(true_value, predicted_value, c='crimson')
 
  p1 = max(max(predicted_value), max(true_value))
  p2 = min(min(predicted_value), min(true_value))
  plt.plot([p1, p2], [p1, p2], 'b-')
  plt.xlabel('True Values', fontsize=15)
  plt.ylabel('Predictions', fontsize=15)
  plt.axis('equal')
  
  global image

  image = fig
  fig.savefig("LinearRegressionPrediction.png")
  plt.close(fig)
  return image

print('Created regression quality plot function')

Created regression quality plot function


In [5]:
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import matplotlib.pyplot as plt

def train_linear_regression(train_data, test_data, label_column, features_column, elastic_net_param,
                            reg_param, max_iter, model_name = None):
  # Evaluate metrics
  def eval_metrics(predictions):
      evaluator = RegressionEvaluator(
          labelCol=label_column, predictionCol="PRED_TEMP", metricName="rmse")
      rmse = evaluator.evaluate(predictions)
      evaluator = RegressionEvaluator(
          labelCol=label_column, predictionCol="PRED_TEMP", metricName="mae")
      mae = evaluator.evaluate(predictions)
      evaluator = RegressionEvaluator(
          labelCol=label_column, predictionCol="PRED_TEMP", metricName="r2")
      r2 = evaluator.evaluate(predictions)
      return rmse, mae, r2

  # Start an MLflow run; the "with" keyword ensures we'll close the run even if this cell crashes
  with mlflow.start_run():
    lr = LinearRegression(featuresCol="scaledFeatures", labelCol="TEMP", predictionCol='PRED_TEMP')
    lrModel = lr.fit(train_data)
    predictions = lrModel.transform(test_data)
    (rmse, mae, r2) = eval_metrics(predictions)

    # Print out model metrics
    print("Linear regression model (elasticNetParam=%f, regParam=%f, maxIter=%f):" % (elastic_net_param,
                                                                                      reg_param, max_iter))
    print("  RMSE: %s" % rmse)
    print("  MAE: %s" % mae)
    print("  R2: %s" % r2)

    # Log hyperparameters for mlflow UI
    mlflow.log_param("elastic_net_param", elastic_net_param)
    mlflow.log_param("reg_param", reg_param)
    mlflow.log_param("max_iter", max_iter)
    # Log evaluation metrics
    mlflow.log_metric("rmse", rmse)
    mlflow.log_metric("r2", r2)
    mlflow.log_metric("mae", mae)
    # Log the model itself
    if model_name is None:
      mlflow.spark.log_model(lrModel, "model")
    else:
      mlflow.spark.log_model(lrModel, artifact_path="model", registered_model_name=model_name)
    modelpath = "/LUSTRE/home/mcd-01/artifucks_regresion/model-%f-%f-%f" % (elastic_net_param, reg_param, max_iter)
    mlflow.spark.save_model(lrModel, modelpath)
    
    # Generate a plot
    image = plot_regression_quality(predictions)
    
    # Log artifacts (in this case, the regression quality image)
    mlflow.log_artifact("LinearRegressionPrediction.png")

print('Created training and evaluation method')

Created training and evaluation method


se ejecuta el modelo

In [6]:
train_linear_regression(train_data, test_data, label_column='TEMP', features_column= 'scaledFeatures', 
                        elastic_net_param=0.7, reg_param=0.1, max_iter=100, model_name = None)

22/05/17 23:39:26 WARN Instrumentation: [32baf784] regParam is zero, which might cause numerical instability and overfitting.
22/05/17 23:39:27 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/05/17 23:39:27 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/05/17 23:39:28 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
22/05/17 23:39:28 WARN Instrumentation: [32baf784] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
                                                                                

Linear regression model (elasticNetParam=0.700000, regParam=0.100000, maxIter=100.000000):
  RMSE: 0.29032002404570356
  MAE: 0.25066571508317476
  R2: 0.9989015750582972


[Stage 24:====>                                                   (1 + 12) / 13]                                                                                