# Predicción en Streaming con Spark ML y Spark Streaming

En este notebook vamos a entrenar un modelo de clasificación para predecir la probabilidad de un paciente de sufrir un ataque al corazón

In [2]:
import findspark
findspark.init()

In [3]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.types import *

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('UCI Heart disease').getOrCreate()

In [5]:
heart = spark.read.csv('/content/heart.csv',
                       inferSchema = True,
                       header = True)
heart.show(3)

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
| 63|  1|  3|     145| 233|  1|      0|    150|    0|    2.3|    0|  0|   1|     1|
| 37|  1|  2|     130| 250|  0|      1|    187|    0|    3.5|    0|  0|   2|     1|
| 41|  0|  1|     130| 204|  0|      0|    172|    0|    1.4|    2|  0|   2|     1|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
only showing top 3 rows



In [41]:
schema = StructType( \
                     [StructField("age", LongType(),True), \
                      StructField("sex", LongType(), True), \
                      StructField("cp", LongType(), True), \
                      StructField('trestbps', LongType(), True), \
                      StructField("chol", LongType(), True), \
                      StructField("fbs", LongType(), True), \
                      StructField("restecg", LongType(), True), \
                      StructField("thalach", LongType(), True),\
                      StructField("exang", LongType(), True), \
                      StructField("oldpeak", DoubleType(), True), \
                      StructField("slope", LongType(),True), \
                      StructField("ca", LongType(), True), \
                      StructField("thal", LongType(), True), \
                      StructField("target", LongType(), True), \
                        ])

In [7]:
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType,StructField,LongType, StringType,DoubleType,TimestampType

df = heart.withColumnRenamed("target","label")
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- cp: integer (nullable = true)
 |-- trestbps: integer (nullable = true)
 |-- chol: integer (nullable = true)
 |-- fbs: integer (nullable = true)
 |-- restecg: integer (nullable = true)
 |-- thalach: integer (nullable = true)
 |-- exang: integer (nullable = true)
 |-- oldpeak: double (nullable = true)
 |-- slope: integer (nullable = true)
 |-- ca: integer (nullable = true)
 |-- thal: integer (nullable = true)
 |-- label: integer (nullable = true)



In [8]:
testDF, trainDF = df.randomSplit([0.3, 0.7])

In [9]:
# Creamos el modelo de regresión lineal
lr = LogisticRegression(maxIter=10, regParam= 0.01)

In [10]:
# Creamos el hot enconder
ohe = OneHotEncoder(inputCols = ['sex', 'cp', 'fbs', 'restecg', 'slope',
                                 'exang', 'ca', 'thal'],
                    outputCols=['sex_ohe', 'cp_ohe', 'fbs_ohe',
                                'restecg_ohe', 'slp_ohe', 'exng_ohe',
                                'caa_ohe', 'thall_ohe'])

# Lista de Inputs list para escalación
inputs = ['age','trestbps','chol','thalach','oldpeak']

# Escalamos los Inputs
assembler1 = VectorAssembler(inputCols=inputs, outputCol="features_scaled1")
scaler = MinMaxScaler(inputCol="features_scaled1", outputCol="features_scaled")

# Creamos un segundo assambles para el encoder por columna.
assembler2 = VectorAssembler(inputCols=['sex_ohe', 'cp_ohe',
                                        'fbs_ohe', 'restecg_ohe',
                                        'slp_ohe', 'exng_ohe', 'caa_ohe',
                                        'thall_ohe','features_scaled'],
                             outputCol="features")


In [11]:
# Creamos la lista de stages
myStages = [assembler1, scaler, ohe, assembler2,lr]

# Set up de los pipelines
pipeline = Pipeline(stages= myStages)

In [12]:
# Hacemos el fit de los modelo usando los datos de entrenamiento.
pModel = pipeline.fit(trainDF)

# Transformamos los datos
trainingPred = pModel.transform(trainDF)

# # Seleccionamos label, probability and predictions
trainingPred.select('label','probability','prediction').show()

+-----+--------------------+----------+
|label|         probability|prediction|
+-----+--------------------+----------+
|    1|[0.02515687743517...|       1.0|
|    1|[0.05536930420429...|       1.0|
|    0|[0.88743407623801...|       0.0|
|    1|[0.04848075370175...|       1.0|
|    1|[0.00563778993093...|       1.0|
|    1|[0.05397991136446...|       1.0|
|    1|[0.05397991136446...|       1.0|
|    0|[0.66759380307695...|       0.0|
|    1|[0.03549035603854...|       1.0|
|    0|[0.72461695624972...|       0.0|
|    1|[0.02977025491517...|       1.0|
|    1|[0.07690074513393...|       1.0|
|    1|[0.02211384105073...|       1.0|
|    1|[0.03738110239609...|       1.0|
|    1|[0.01118905216260...|       1.0|
|    0|[0.24448631285092...|       1.0|
|    1|[0.05569670760886...|       1.0|
|    1|[0.03647372276783...|       1.0|
|    1|[0.28994791812000...|       1.0|
|    1|[0.17150808593248...|       1.0|
+-----+--------------------+----------+
only showing top 20 rows



In [13]:
#Guardamos los pipelines
pModel.save("/Pipelines")

In [14]:
from pyspark.ml import PipelineModel

#Cargamos los pipelines
pModel = PipelineModel.load("/Pipelines")

pModel

PipelineModel_b4ba52657328

In [15]:
## Comprueba que el pipeline anterior funciona correctamente. Para ello realiza una prediccion sobre el conjunto de
## datos de trainDF y muestra la prediccion

# Transformamos los datos.
trainingPred = pModel.transform(trainDF)

# # We select the actual label, probability and predictions
selected_predictions = trainingPred.select('label','probability','prediction')

selected_predictions.show()


+-----+--------------------+----------+
|label|         probability|prediction|
+-----+--------------------+----------+
|    1|[0.02515687743517...|       1.0|
|    1|[0.05536930420429...|       1.0|
|    0|[0.88743407623801...|       0.0|
|    1|[0.04848075370175...|       1.0|
|    1|[0.00563778993093...|       1.0|
|    1|[0.05397991136446...|       1.0|
|    1|[0.05397991136446...|       1.0|
|    0|[0.66759380307695...|       0.0|
|    1|[0.03549035603854...|       1.0|
|    0|[0.72461695624972...|       0.0|
|    1|[0.02977025491517...|       1.0|
|    1|[0.07690074513393...|       1.0|
|    1|[0.02211384105073...|       1.0|
|    1|[0.03738110239609...|       1.0|
|    1|[0.01118905216260...|       1.0|
|    0|[0.24448631285092...|       1.0|
|    1|[0.05569670760886...|       1.0|
|    1|[0.03647372276783...|       1.0|
|    1|[0.28994791812000...|       1.0|
|    1|[0.17150808593248...|       1.0|
+-----+--------------------+----------+
only showing top 20 rows



In [90]:
trainingPred.columns

['age',
 'sex',
 'cp',
 'trestbps',
 'chol',
 'fbs',
 'restecg',
 'thalach',
 'exang',
 'oldpeak',
 'slope',
 'ca',
 'thal',
 'label',
 'features_scaled1',
 'features_scaled',
 'sex_ohe',
 'cp_ohe',
 'fbs_ohe',
 'restecg_ohe',
 'slp_ohe',
 'exng_ohe',
 'caa_ohe',
 'thall_ohe',
 'features',
 'rawPrediction',
 'probability',
 'prediction']

In [43]:
testData = testDF.repartition(10)


testData.write.format("CSV").option("header",True).mode("overwrite").save("/heart_streaming/")



## Creando predicciones en Streaming

In [101]:
## Utiliza los csv guardados en data/heart_streaming para simular un proceso de datos en streaming.
## Para ello, utiliza la funcion de spark.readStream
## En la configuración pon: que se importe un archivo por ejecucion
## que se renombre la variable de "output"a "label"
## Llama a este proceso con el nombre sourceStream

from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

sourceStream = (
    spark.readStream.format("csv")
    .option("header","True")
    .schema(schema)
    .option("ignoreLeadingWhiteSpace",True)
    .option("mode","dropMalformed")
    .option("maxFilesPerTrigger",1)
    .load("/heart_streaming/")
    .withColumnRenamed("target","label")
)

sourceStream


DataFrame[age: bigint, sex: bigint, cp: bigint, trestbps: bigint, chol: bigint, fbs: bigint, restecg: bigint, thalach: bigint, exang: bigint, oldpeak: double, slope: bigint, ca: bigint, thal: bigint, label: bigint]

In [59]:
sourceStream.isStreaming

True

In [102]:
## Utiliza el pipeline "pModel" para realizar predicciones utilizando los datos en streaming de "sourceStream"
## De la predicción selecciona las variables label, probability, prediction.
## Llama a este proceso con el nombre de "prediction1"

prediction1 = pModel.transform(sourceStream)

prediction1.printSchema()

root
 |-- age: long (nullable = true)
 |-- sex: long (nullable = true)
 |-- cp: long (nullable = true)
 |-- trestbps: long (nullable = true)
 |-- chol: long (nullable = true)
 |-- fbs: long (nullable = true)
 |-- restecg: long (nullable = true)
 |-- thalach: long (nullable = true)
 |-- exang: long (nullable = true)
 |-- oldpeak: double (nullable = true)
 |-- slope: long (nullable = true)
 |-- ca: long (nullable = true)
 |-- thal: long (nullable = true)
 |-- label: long (nullable = true)
 |-- features_scaled1: vector (nullable = true)
 |-- features_scaled: vector (nullable = true)
 |-- sex_ohe: vector (nullable = true)
 |-- cp_ohe: vector (nullable = true)
 |-- fbs_ohe: vector (nullable = true)
 |-- restecg_ohe: vector (nullable = true)
 |-- slp_ohe: vector (nullable = true)
 |-- exng_ohe: vector (nullable = true)
 |-- caa_ohe: vector (nullable = true)
 |-- thall_ohe: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- proba

In [103]:
prediction1 = prediction1.select("label", "probability", "prediction")

In [104]:
display(prediction1)

DataFrame[label: bigint, probability: vector, prediction: double]

#### Mostrando las predicciones en consola

In [105]:
## Obten las predicciones sobre los datos en streaming, para ello utiliza prediction1.writeStream. En las opciones de
## configuracion pon: "format" igual a "console"
## en .trigger igual (once=True),
## y permite que el proceso espere hasta que se complete con .awaitTermination()

# Define the streaming query
query = (
    prediction1
    .writeStream
    .outputMode("append")
    .format("console")  # You can change the output format as needed
    .trigger(once = True)
    .queryName("prediction1")
    .start()
)

# Await termination of the streaming query
query.awaitTermination()

#### Guardando las perdicciones en Memoria

In [106]:
## Obten las predicciones sobre los datos en streaming, para ello utiliza prediction1.writeStream.
## En las opciones de configuracion pon: que los resultados se guarden en memoria,
## que el .outputMode sea "append"
## que el nombre de la query "queryName" sea "prediction4"

query = (
    prediction1
    .writeStream
    .outputMode("append")
    .format("memory")  # You can change the output format as needed
    .trigger(once = True)
    .queryName("prediction4")
    .start()
)

In [107]:
for x in range(2):
    df = spark.sql(
        "SELECT * FROM prediction4")
    df.show(10)
df

+-----+--------------------+----------+
|label|         probability|prediction|
+-----+--------------------+----------+
|    1|[0.46307504459163...|       1.0|
|    1|[0.04349641430887...|       1.0|
|    1|[0.07716164567849...|       1.0|
|    0|[0.90005706585809...|       0.0|
|    0|[0.75633699209214...|       0.0|
|    1|[0.03854801767868...|       1.0|
|    1|[0.08990835082386...|       1.0|
|    0|[0.83507859969069...|       0.0|
|    0|[0.9803939380043,...|       0.0|
|    0|[0.97798364164624...|       0.0|
+-----+--------------------+----------+
only showing top 10 rows

+-----+--------------------+----------+
|label|         probability|prediction|
+-----+--------------------+----------+
|    1|[0.46307504459163...|       1.0|
|    1|[0.04349641430887...|       1.0|
|    1|[0.07716164567849...|       1.0|
|    0|[0.90005706585809...|       0.0|
|    0|[0.75633699209214...|       0.0|
|    1|[0.03854801767868...|       1.0|
|    1|[0.08990835082386...|       1.0|
|    0|[0.8350

DataFrame[label: bigint, probability: vector, prediction: double]

In [108]:
## Valida que el proceso de streaming está activo y después muestra el estado
sourceStream.isStreaming


True