# Predicción en Streaming con Spark ML y Spark Streaming

En este notebook vamos a entrenar un modelo de clasificación para predecir la probabilidad de un paciente de sufrir un ataque al corazón

In [32]:
import findspark
findspark.init()

In [44]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.classification import LogisticRegression

In [33]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('UCI Heart disease').getOrCreate()

In [52]:
heart = spark.read.csv('heart.csv', 
                       inferSchema = True, 
                       header = True)
heart.show(3)

+---+---+---+------+----+---+-------+--------+----+-------+---+---+-----+------+
|age|sex| cp|trtbps|chol|fbs|restecg|thalachh|exng|oldpeak|slp|caa|thall|output|
+---+---+---+------+----+---+-------+--------+----+-------+---+---+-----+------+
| 63|  1|  3|   145| 233|  1|      0|     150|   0|    2.3|  0|  0|    1|     1|
| 37|  1|  2|   130| 250|  0|      1|     187|   0|    3.5|  0|  0|    2|     1|
| 41|  0|  1|   130| 204|  0|      0|     172|   0|    1.4|  2|  0|    2|     1|
+---+---+---+------+----+---+-------+--------+----+-------+---+---+-----+------+
only showing top 3 rows



In [39]:
schema = StructType( \
                     [StructField("age", LongType(),True), \
                      StructField("sex", LongType(), True), \
                      StructField("cp", LongType(), True), \
                      StructField('trtbps', LongType(), True), \
                      StructField("chol", LongType(), True), \
                      StructField("fbs", LongType(), True), \
                      StructField("restecg", LongType(), True), \
                      StructField("thalachh", LongType(), True),\
                      StructField("exng", LongType(), True), \
                      StructField("oldpeak", DoubleType(), True), \
                      StructField("slp", LongType(),True), \
                      StructField("caa", LongType(), True), \
                      StructField("thall", LongType(), True), \
                      StructField("output", LongType(), True), \
                        ])

In [35]:
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType,StructField,LongType, StringType,DoubleType,TimestampType

df = heart.withColumnRenamed("output","label")
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- cp: integer (nullable = true)
 |-- trtbps: integer (nullable = true)
 |-- chol: integer (nullable = true)
 |-- fbs: integer (nullable = true)
 |-- restecg: integer (nullable = true)
 |-- thalachh: integer (nullable = true)
 |-- exng: integer (nullable = true)
 |-- oldpeak: double (nullable = true)
 |-- slp: integer (nullable = true)
 |-- caa: integer (nullable = true)
 |-- thall: integer (nullable = true)
 |-- label: integer (nullable = true)



In [36]:
testDF, trainDF = df.randomSplit([0.3, 0.7])

In [45]:
# Create the logistic regression model
lr = LogisticRegression(maxIter=10, regParam= 0.01)

In [46]:
# We create a one hot encoder.
ohe = OneHotEncoder(inputCols = ['sex', 'cp', 'fbs', 'restecg', 'slp', 
                                 'exng', 'caa', 'thall'], 
                    outputCols=['sex_ohe', 'cp_ohe', 'fbs_ohe', 
                                'restecg_ohe', 'slp_ohe', 'exng_ohe', 
                                'caa_ohe', 'thall_ohe'])

# Input list for scaling
inputs = ['age','trtbps','chol','thalachh','oldpeak']

# We scale our inputs
assembler1 = VectorAssembler(inputCols=inputs, outputCol="features_scaled1")
scaler = MinMaxScaler(inputCol="features_scaled1", outputCol="features_scaled")

# We create a second assembler for the encoded columns.
assembler2 = VectorAssembler(inputCols=['sex_ohe', 'cp_ohe', 
                                        'fbs_ohe', 'restecg_ohe', 
                                        'slp_ohe', 'exng_ohe', 'caa_ohe', 
                                        'thall_ohe','features_scaled'], 
                             outputCol="features")


In [47]:
# Create stages list
myStages = [assembler1, scaler, ohe, assembler2,lr]

# Set up the pipeline
pipeline = Pipeline(stages= myStages)

In [48]:
# We fit the model using the training data.
pModel = pipeline.fit(trainDF)

# We transform the data.
trainingPred = pModel.transform(trainDF)

# # We select the actual label, probability and predictions
trainingPred.select('label','probability','prediction').show()

+-----+--------------------+----------+
|label|         probability|prediction|
+-----+--------------------+----------+
|    1|[0.04275732251570...|       1.0|
|    1|[0.00770994745547...|       1.0|
|    1|[0.02864864899819...|       1.0|
|    0|[0.84423307425181...|       0.0|
|    0|[0.63608517373059...|       0.0|
|    1|[0.03306749779557...|       1.0|
|    1|[0.06549242762356...|       1.0|
|    1|[0.06549242762356...|       1.0|
|    0|[0.64041426598065...|       0.0|
|    1|[0.02325737111340...|       1.0|
|    0|[0.65754290725687...|       0.0|
|    1|[0.06216801152256...|       1.0|
|    0|[0.91057051865391...|       0.0|
|    1|[0.28174410550987...|       1.0|
|    1|[0.01761146407831...|       1.0|
|    1|[0.03817274182269...|       1.0|
|    1|[0.01179641883045...|       1.0|
|    0|[0.25472715497876...|       1.0|
|    1|[0.02113471846720...|       1.0|
|    1|[0.30261067768299...|       1.0|
+-----+--------------------+----------+
only showing top 20 rows



In [38]:
testData = testDF.repartition(10)
testData

#Create a directory
testData.write.format("CSV").option("header",False).save("heart_streaming/")

## Creando predicciones en Streaming

In [40]:
sourceStream = (
    spark.readStream.schema(schema)
    .option("maxFilesPerTrigger", 1)
    .csv("heart_streaming/")
    .withColumnRenamed("output","label")
)

In [53]:
prediction1 = pModel.transform(sourceStream).select('label',
                                                   'probability',
                                                   'prediction')

In [55]:
display(prediction1)

DataFrame[label: bigint, probability: vector, prediction: double]

#### Mostrando las predicciones en consola

In [65]:
query1 = prediction1.writeStream.queryName("prediction1") \
            .format("console")\
            .trigger(once=True)\
            .start()\
            .awaitTermination()

#### Guardando las perdicciones en Memoria

In [66]:
query2 = (
            prediction1.writeStream.queryName("prediction3") 
            .format("memory")
            .outputMode("append")
            .start())

In [72]:
for x in range(2):
    df = spark.sql(
        "SELECT * FROM prediction3")
    df.show(10)
df

+-----+--------------------+----------+
|label|         probability|prediction|
+-----+--------------------+----------+
|    1|[0.04086978924170...|       1.0|
|    0|[0.98184892212735...|       0.0|
|    1|[0.00474279761632...|       1.0|
|    1|[0.35775366097494...|       1.0|
|    1|[0.05755909903937...|       1.0|
|    0|[0.95305536703752...|       0.0|
|    0|[0.94079962605713...|       0.0|
|    0|[0.13017480179914...|       1.0|
|    0|[0.99807916786174...|       0.0|
|    1|[0.15541832735450...|       1.0|
+-----+--------------------+----------+
only showing top 10 rows

+-----+--------------------+----------+
|label|         probability|prediction|
+-----+--------------------+----------+
|    1|[0.04086978924170...|       1.0|
|    0|[0.98184892212735...|       0.0|
|    1|[0.00474279761632...|       1.0|
|    1|[0.35775366097494...|       1.0|
|    1|[0.05755909903937...|       1.0|
|    0|[0.95305536703752...|       0.0|
|    0|[0.94079962605713...|       0.0|
|    0|[0.1301

DataFrame[label: bigint, probability: vector, prediction: double]

In [68]:
spark.streams.active[0].isActive

True

In [73]:
spark.streams

<pyspark.sql.streaming.StreamingQueryManager at 0x192daf1e1f0>

In [75]:
query2.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}