# Ejercicio Práctico_Predicción en Streaming con Spark ML y Spark Streaming

En este notebook vamos a cargar un pipeline que tiene un conjunto de fases de pre-procesamiento y un modelo de clasificacion predecir la probabilidad de un paciente de sufrir un ataque al corazón. La predicción se realizará sobre datos en streaming optenidos a partir del csv de heart.csv

In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.types import *

In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Aplicacion") \
    .getOrCreate()
## Inicia una sesion de Spark

In [9]:
## Carga y visualiza el csv de Ejercicios\data\heart.csv con el nombre de heart
heart = spark.read.csv("data/heart.csv", 
                    header=True, 
                    inferSchema=True)

In [10]:
schema = StructType( \
                     [StructField("age", LongType(),True), \
                      StructField("sex", LongType(), True), \
                      StructField("cp", LongType(), True), \
                      StructField('trestbps', LongType(), True), \
                      StructField("chol", LongType(), True), \
                      StructField("fbs", LongType(), True), \
                      StructField("restecg", LongType(), True), \
                      StructField("thalach", LongType(), True),\
                      StructField("exang", LongType(), True), \
                      StructField("oldpeak", DoubleType(), True), \
                      StructField("slope", LongType(),True), \
                      StructField("ca", LongType(), True), \
                      StructField("thal", LongType(), True), \
                      StructField("target", LongType(), True), \
                        ])

In [11]:
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType,StructField,LongType, StringType,DoubleType,TimestampType


df = heart.withColumnRenamed("target","label")
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- cp: integer (nullable = true)
 |-- trestbps: integer (nullable = true)
 |-- chol: integer (nullable = true)
 |-- fbs: integer (nullable = true)
 |-- restecg: integer (nullable = true)
 |-- thalach: integer (nullable = true)
 |-- exang: integer (nullable = true)
 |-- oldpeak: double (nullable = true)
 |-- slope: integer (nullable = true)
 |-- ca: integer (nullable = true)
 |-- thal: integer (nullable = true)
 |-- label: integer (nullable = true)



In [12]:
testDF, trainDF = heart.randomSplit([0.3, 0.7])

### Carga del Pipeline

In [14]:
from pyspark.ml import PipelineModel


pModel = PipelineModel.load("/pipelines")

Py4JJavaError: An error occurred while calling o57.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/pipelines/metadata
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:304)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:244)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:332)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:208)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:294)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:290)
	at org.apache.spark.api.java.JavaRDDLike.partitions(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.JavaRDDLike.partitions$(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.IOException: Input path does not exist: file:/pipelines/metadata
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:278)
	... 25 more


In [2]:
## Comprueba que el pipeline anterior funciona correctamente. Para ello realiza una prediccion sobre el conjunto de 
## datos de trainDF y muestra la prediccion


In [81]:
testData = testDF.repartition(10)

testData.write.format("CSV").option("header",False).save("/heart_streaming/")

## Creando predicciones en Streaming

In [82]:
## Utiliza los csv guardados en data/heart_streaming para simular un proceso de datos en streaming.
## Para ello, utiliza la funcion de spark.readStream 
## En la configuración pon: que se importe un archivo por ejecucion
## que se renombre la variable de "output"a "label"
## Llama a este proceso con el nombre sourceStream


In [83]:
## Utiliza el pipeline "pModel" para realizar predicciones utilizando los datos en streaming de "sourceStream"
## De la predicción selecciona las variables label, probability, prediction. 
## Llama a este proceso con el nombre de "prediction1"


In [84]:
display(prediction1)

DataFrame[label: bigint, probability: vector, prediction: double]

#### Mostrando las predicciones en consola

In [85]:
## Obten las predicciones sobre los datos en streaming, para ello utiliza prediction1.writeStream. En las opciones de
## configuracion pon: "format" igual a "console" 
## en .trigger igual (once=True),
## y permite que el proceso espere hasta que se complete con .awaitTermination()


#### Guardando las perdicciones en Memoria

In [3]:
## Obten las predicciones sobre los datos en streaming, para ello utiliza prediction1.writeStream. 
## En las opciones de configuracion pon: que los resultados se guarden en memoria, 
## que el .outputMode sea "append"
## que el nombre de la query "queryName" sea "prediction4"


In [88]:
for x in range(2):
    df = spark.sql(
        "SELECT * FROM prediction4")
    df.show(10)
df

+-----+--------------------+----------+
|label|         probability|prediction|
+-----+--------------------+----------+
|    1|[0.04086978924170...|       1.0|
|    0|[0.98184892212735...|       0.0|
|    1|[0.00474279761632...|       1.0|
|    1|[0.35775366097494...|       1.0|
|    1|[0.05755909903937...|       1.0|
|    0|[0.95305536703752...|       0.0|
|    0|[0.94079962605713...|       0.0|
|    0|[0.13017480179914...|       1.0|
|    0|[0.99807916786174...|       0.0|
|    1|[0.15541832735450...|       1.0|
+-----+--------------------+----------+
only showing top 10 rows

+-----+--------------------+----------+
|label|         probability|prediction|
+-----+--------------------+----------+
|    1|[0.04086978924170...|       1.0|
|    0|[0.98184892212735...|       0.0|
|    1|[0.00474279761632...|       1.0|
|    1|[0.35775366097494...|       1.0|
|    1|[0.05755909903937...|       1.0|
|    0|[0.95305536703752...|       0.0|
|    0|[0.94079962605713...|       0.0|
|    0|[0.1301

DataFrame[label: bigint, probability: vector, prediction: double]

In [None]:
## Valida que el proceso de streaming está activo y después muestra el estado