# Ejercicio Práctico_Predicción en Streaming con Spark ML y Spark Streaming

En este notebook vamos a cargar un pipeline que tiene un conjunto de fases de pre-procesamiento y un modelo de clasificacion predecir la probabilidad de un paciente de sufrir un ataque al corazón. La predicción se realizará sobre datos en streaming optenidos a partir del csv de heart.csv

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.types import *

In [3]:
from pyspark.sql import SparkSession
## Inicia una sesion de Spark
spark = SparkSession.builder\
        .master("local[16]")\
        .appName('Heart Disease')\
        .getOrCreate()

spark

In [4]:
schema = StructType( \
                     [StructField("age", LongType(),True), \
                      StructField("sex", LongType(), True), \
                      StructField("cp", LongType(), True), \
                      StructField('trestbps', LongType(), True), \
                      StructField("chol", LongType(), True), \
                      StructField("fbs", LongType(), True), \
                      StructField("restecg", LongType(), True), \
                      StructField("thalach", LongType(), True),\
                      StructField("exang", LongType(), True), \
                      StructField("oldpeak", DoubleType(), True), \
                      StructField("slope", LongType(),True), \
                      StructField("ca", LongType(), True), \
                      StructField("thal", LongType(), True), \
                      StructField("target", LongType(), True), \
                        ])

In [5]:
## Carga y visualiza el csv de Ejercicios\data\heart.csv con el nombre de heart
heart = spark.read.csv(
    "./data/heart.csv",
    sep = ',',
    header = True,
    schema = schema
    )
heart.show()

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
| 63|  1|  3|     145| 233|  1|      0|    150|    0|    2.3|    0|  0|   1|     1|
| 37|  1|  2|     130| 250|  0|      1|    187|    0|    3.5|    0|  0|   2|     1|
| 41|  0|  1|     130| 204|  0|      0|    172|    0|    1.4|    2|  0|   2|     1|
| 56|  1|  1|     120| 236|  0|      1|    178|    0|    0.8|    2|  0|   2|     1|
| 57|  0|  0|     120| 354|  0|      1|    163|    1|    0.6|    2|  0|   2|     1|
| 57|  1|  0|     140| 192|  0|      1|    148|    0|    0.4|    1|  0|   1|     1|
| 56|  0|  1|     140| 294|  0|      0|    153|    0|    1.3|    1|  0|   2|     1|
| 44|  1|  1|     120| 263|  0|      1|    173|    0|    0.0|    2|  0|   3|     1|
| 52|  1|  2|     172| 199|  1|      1|    162|    0|    0.5|    2|  0|   3|

In [6]:
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType,StructField,LongType, StringType,DoubleType,TimestampType


df = heart.withColumnRenamed("target","label")
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- sex: long (nullable = true)
 |-- cp: long (nullable = true)
 |-- trestbps: long (nullable = true)
 |-- chol: long (nullable = true)
 |-- fbs: long (nullable = true)
 |-- restecg: long (nullable = true)
 |-- thalach: long (nullable = true)
 |-- exang: long (nullable = true)
 |-- oldpeak: double (nullable = true)
 |-- slope: long (nullable = true)
 |-- ca: long (nullable = true)
 |-- thal: long (nullable = true)
 |-- label: long (nullable = true)



In [7]:
testDF, trainDF = df.randomSplit([0.3, 0.7])

### Crear el Pipeline

In [8]:
from pyspark.ml import PipelineModel
# Get the vectors, and make the stage 1, tranform
stage_1 = VectorAssembler(inputCols=[
    'age',
    'sex',
    'cp',
    'trestbps',
    'chol',
    'fbs',
    'restecg',
    'thalach',
    'exang',
    'oldpeak',
    'slope',
    'ca',
    'thal',
    ], 
    outputCol='features'
    )
# Stage 2 perform a logistic regression, estimator
stage_2 = LogisticRegression(featuresCol="features", labelCol="label")
# Regression pipeline, set up the pipeline
regression_pipeline = Pipeline(stages=[stage_1, stage_2])

In [9]:
regression_pipeline1 = PipelineModel(stages=[stage_1, stage_2])


In [16]:
regression_pipeline1.write().overwrite().save("./pipelines/sample2")
# regression_pipeline.write().overwrite().save("./pipelines/sample")

Py4JError: An error occurred while calling None.None. Trace:
py4j.Py4JException: Cannot convert org.apache.spark.ml.classification.LogisticRegression to org.apache.spark.ml.Transformer
	at py4j.commands.ArrayCommand.convertArgument(ArrayCommand.java:166)
	at py4j.commands.ArrayCommand.setArray(ArrayCommand.java:144)
	at py4j.commands.ArrayCommand.execute(ArrayCommand.java:97)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:1583)



### Correr el pipeline

In [11]:
# fit the pipeline for the trainning data
model = regression_pipeline.fit(trainDF)

In [12]:
# transform the data
sample_data_train = model.transform(trainDF)

In [13]:
# output
sample_data_train.select("*").show()

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+-----+--------------------+--------------------+--------------------+----------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|label|            features|       rawPrediction|         probability|prediction|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+-----+--------------------+--------------------+--------------------+----------+
| 29|  1|  1|     130| 204|  0|      0|    202|    0|    0.0|    2|  0|   2|    1|[29.0,1.0,1.0,130...|[-2.0716685397098...|[0.11188113879827...|       1.0|
| 34|  1|  3|     118| 182|  0|      0|    174|    0|    0.0|    2|  0|   2|    1|[34.0,1.0,3.0,118...|[-3.4284364850358...|[0.03141847728676...|       1.0|
| 35|  0|  0|     138| 183|  0|      1|    182|    0|    1.4|    2|  0|   2|    1|[35.0,0.0,0.0,138...|[-2.5265459825127...|[0.07401803472415...|       1.0|
| 35|  1|  0|     120| 198|  0|      1|    130|    1|    1

In [14]:
model.write.save("./pipelines/samplemodel")
# model_in = PipelineModel.load(outpath)


AttributeError: 'function' object has no attribute 'save'

### Carga del Pipeline

In [None]:
pModel = PipelineModel.load("\pipelines")
## Comprueba que el pipeline anterior funciona correctamente. Para ello realiza una prediccion sobre el conjunto de 
## datos de trainDF y muestra la prediccion


In [None]:
testData = testDF.repartition(10)

testData.write.format("CSV").option("header",False).save("/heart_streaming/")

## Creando predicciones en Streaming

In [None]:
## Utiliza los csv guardados en data/heart_streaming para simular un proceso de datos en streaming.
## Para ello, utiliza la funcion de spark.readStream 
## En la configuración pon: que se importe un archivo por ejecucion
## que se renombre la variable de "output"a "label"
## Llama a este proceso con el nombre sourceStream


In [None]:
## Utiliza el pipeline "pModel" para realizar predicciones utilizando los datos en streaming de "sourceStream"
## De la predicción selecciona las variables label, probability, prediction. 
## Llama a este proceso con el nombre de "prediction1"


In [None]:
display(prediction1)

#### Mostrando las predicciones en consola

In [None]:
## Obten las predicciones sobre los datos en streaming, para ello utiliza prediction1.writeStream. En las opciones de
## configuracion pon: "format" igual a "console" 
## en .trigger igual (once=True),
## y permite que el proceso espere hasta que se complete con .awaitTermination()


#### Guardando las perdicciones en Memoria

In [None]:
## Obten las predicciones sobre los datos en streaming, para ello utiliza prediction1.writeStream. 
## En las opciones de configuracion pon: que los resultados se guarden en memoria, 
## que el .outputMode sea "append"
## que el nombre de la query "queryName" sea "prediction4"


In [None]:
for x in range(2):
    df = spark.sql(
        "SELECT * FROM prediction4")
    df.show(10)
df

In [None]:
## Valida que el proceso de streaming está activo y después muestra el estado