Bienvenido al proyecto final de "Apache Spark para el aprendizaje automático escalable en BigData". En esta tarea, analizará un conjunto de datos del mundo real y aplicará el aprendizaje automático con Apache Spark.

Para aprobar, debe implementar un código (como se describe en la sección de instrucciones en Coursera) y finalmente responder un cuestionario en la plataforma de Coursera.

Comencemos descargando el conjunto de datos y creando un marco de datos. Este conjunto de datos se puede encontrar en DAX, IBM Data Asset Exchange y se puede descargar de forma gratuita.
https://developer.ibm.com/exchanges/data/all/jfk-weather-data/


In [1]:
# eliminar archivos de ejecuciones anteriores
!rm -f jfk_weather*

# descargar el archivo que contiene los datos en formato CSV
!wget http://max-training-data.s3-api.us-geo.objectstorage.softlayer.net/noaa-weather/jfk_weather.tar.gz

# extraer los datos
!tar xvfz jfk_weather.tar.gz
    
# crear un marco de datos a partir de él utilizando la primera fila como nombres de campo e intentando inferir un esquema basado en el contenido
df = spark.read.option("header", "true").option("inferSchema","true").csv('jfk_weather.csv')

# registrar una tabla de consulta correspondiente
df.createOrReplaceTempView('df')

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20200219192531-0000
KERNEL_ID = 1193ebb4-1dbf-4ffe-b734-6337dbd631cf
--2020-02-19 19:25:34--  http://max-training-data.s3-api.us-geo.objectstorage.softlayer.net/noaa-weather/jfk_weather.tar.gz
Resolving max-training-data.s3-api.us-geo.objectstorage.softlayer.net (max-training-data.s3-api.us-geo.objectstorage.softlayer.net)... 67.228.254.196
Connecting to max-training-data.s3-api.us-geo.objectstorage.softlayer.net (max-training-data.s3-api.us-geo.objectstorage.softlayer.net)|67.228.254.196|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2575759 (2.5M) [application/x-tar]
Saving to: 'jfk_weather.tar.gz'


2020-02-19 19:25:34 (71.3 MB/s) - 'jfk_weather.tar.gz' saved [2575759/2575759]

./._jfk_weather.csv
jfk_weather.csv


El conjunto de datos contiene algunos valores nulos, por lo tanto, la inferencia de esquema no funcionó correctamente para todas las columnas, además, una columna contenía caracteres finales, por lo que primero debemos limpiar el conjunto de datos. Esta es una tarea normal en cualquier proyecto de ciencia de datos ya que sus datos nunca están limpios, no se preocupe si no comprende todo el código, no se le preguntará al respecto.

In [2]:
import random
random.seed(42)

from pyspark.sql.functions import translate, col

df_cleaned = df \
    .withColumn("HOURLYWindSpeed", df.HOURLYWindSpeed.cast('double')) \
    .withColumn("HOURLYWindDirection", df.HOURLYWindDirection.cast('double')) \
    .withColumn("HOURLYStationPressure", translate(col("HOURLYStationPressure"), "s,", "")) \
    .withColumn("HOURLYPrecip", translate(col("HOURLYPrecip"), "s,", "")) \
    .withColumn("HOURLYRelativeHumidity", translate(col("HOURLYRelativeHumidity"), "*", "")) \
    .withColumn("HOURLYDRYBULBTEMPC", translate(col("HOURLYDRYBULBTEMPC"), "*", "")) \

df_cleaned =   df_cleaned \
                    .withColumn("HOURLYStationPressure", df_cleaned.HOURLYStationPressure.cast('double')) \
                    .withColumn("HOURLYPrecip", df_cleaned.HOURLYPrecip.cast('double')) \
                    .withColumn("HOURLYRelativeHumidity", df_cleaned.HOURLYRelativeHumidity.cast('double')) \
                    .withColumn("HOURLYDRYBULBTEMPC", df_cleaned.HOURLYDRYBULBTEMPC.cast('double')) \

df_filtered = df_cleaned.filter("""
    HOURLYWindSpeed <> 0
    and HOURLYWindDirection <> 0
    and HOURLYStationPressure <> 0
    and HOURLYPressureTendency <> 0
    and HOURLYPressureTendency <> 0
    and HOURLYPrecip <> 0
    and HOURLYRelativeHumidity <> 0
    and HOURLYDRYBULBTEMPC <> 0
""")

Queremos predecir el valor de una columna en función de otras. A veces es útil imprimir una matriz de correlación.

In [3]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYWindDirection","HOURLYStationPressure"],
                                  outputCol="features")
df_pipeline = vectorAssembler.transform(df_filtered)
from pyspark.ml.stat import Correlation
Correlation.corr(df_pipeline,"features").head()[0].toArray()

array([[ 1.        ,  0.06306013, -0.4204518 ],
       [ 0.06306013,  1.        , -0.19199348],
       [-0.4204518 , -0.19199348,  1.        ]])

As we can see, HOURLYWindSpeed and HOURLYWindDirection correlate with 0.06306013 whereas HOURLYWindSpeed  and HOURLYStationPressure correlate with -0.4204518, this is a good sign if we want to predict HOURLYWindSpeed from HOURLYWindDirection and HOURLYStationPressure.
Since this is supervised learning, let’s split our data into train (80%) and test (20%) set.

In [4]:
splits = df_filtered.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]

Again, we can re-use our feature engineering pipeline

In [5]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer
from pyspark.ml import Pipeline

vectorAssembler = VectorAssembler(inputCols=[
                                    "HOURLYWindDirection",
                                    "ELEVATION",
                                    "HOURLYStationPressure"],
                                  outputCol="features")

normalizer = Normalizer(inputCol="features", outputCol="features_norm", p=1.0)

Ahora definimos una función para evaluar nuestro rendimiento de predicción de regresión. Estamos usando RMSE (Root Mean Squared Error) aquí, cuanto más pequeño, mejor ...


In [6]:
def regression_metrics(prediction):
    from pyspark.ml.evaluation import RegressionEvaluator
    evaluator = RegressionEvaluator(
    labelCol="HOURLYWindSpeed", predictionCol="prediction", metricName="rmse")
    rmse = evaluator.evaluate(prediction)
    print("RMSE on test data = %g" % rmse)

Primero, ejecutemos un modelo de regresión lineal para construir una línea base.


In [7]:
#LR1

from pyspark.ml.regression import LinearRegression


lr = LinearRegression(labelCol="HOURLYWindSpeed", featuresCol='features', maxIter=100, regParam=0.0, elasticNetParam=0.0)
pipeline = Pipeline(stages=[vectorAssembler, normalizer,lr])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
regression_metrics(prediction)

RMSE on test data = 6.33073


Ahora intentaremos un Gradient Boosted Tree Regressor

In [8]:
#GBT1

from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(labelCol="HOURLYWindSpeed", maxIter=100)
pipeline = Pipeline(stages=[vectorAssembler, normalizer,gbt])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
regression_metrics(prediction)

RMSE on test data = 7.01428


Now let’s switch gears. Previously, we tried to predict HOURLYWindSpeed, but now we predict HOURLYWindDirection. In order to turn this into a classification problem we discretize the value using the Bucketizer. The new feature is called HOURLYWindDirectionBucketized.

In [9]:
from pyspark.ml.feature import Bucketizer, OneHotEncoder
bucketizer = Bucketizer(splits=[ 0, 180, float('Inf') ],inputCol="HOURLYWindDirection", outputCol="HOURLYWindDirectionBucketized")
encoder = OneHotEncoder(inputCol="HOURLYWindDirectionBucketized", outputCol="HOURLYWindDirectionOHE")


Nuevamente, definimos una función para evaluar cómo nos desempeñamos. Aquí solo usamos la medida de precisión que nos da la fracción de ejemplos correctamente clasificados. De nuevo, 0 es malo, 1 es bueno.

In [10]:
def classification_metrics(prediction):
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    mcEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("HOURLYWindDirectionBucketized")
    accuracy = mcEval.evaluate(prediction)
    print("Accuracy on test data = %g" % accuracy)

Nuevamente, para la línea de base usamos LogisticRegression.

In [11]:
#LGReg1

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="HOURLYWindDirectionBucketized", maxIter=10)
#,"ELEVATION","HOURLYStationPressure","HOURLYPressureTendency","HOURLYPrecip"

vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYDRYBULBTEMPC"],
                                  outputCol="features")

pipeline = Pipeline(stages=[bucketizer,vectorAssembler,normalizer,lr])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
classification_metrics(prediction)

Accuracy on test data = 0.613208


Probemos otros algoritmos y veamos si aumenta el rendimiento del modelo. También es importante ajustar otros parámetros como parámetros de algoritmos individuales (por ejemplo, número de árboles para RandomForest) o parámetros Pipline de ingeniería de características, por ejemplo relación de división de tren / prueba, normalización, desbarbado, ...

In [12]:
#RF1

from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="HOURLYWindDirectionBucketized", numTrees=30)

vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYDRYBULBTEMPC","ELEVATION","HOURLYStationPressure","HOURLYPressureTendency","HOURLYPrecip"],
                                  outputCol="features")

pipeline = Pipeline(stages=[bucketizer,vectorAssembler,normalizer,rf])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
classification_metrics(prediction)

Accuracy on test data = 0.698113


In [13]:
#GBT2

from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(labelCol="HOURLYWindDirectionBucketized", maxIter=100)

vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYDRYBULBTEMPC","ELEVATION","HOURLYStationPressure","HOURLYPressureTendency","HOURLYPrecip"],
                                  outputCol="features")

pipeline = Pipeline(stages=[bucketizer,vectorAssembler,normalizer,gbt])
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
classification_metrics(prediction)

Accuracy on test data = 0.688679
