# Streaming Prediction with Spark ML and Spark Streaming

In this notebook we are going to train a classification model to predict a patient's probability of suffering a heart attack.

**Creating pipeline(a numbers of preprocess involved-assembler1, scaler, ohe, assembler2,lr ) and save it  for streaming data**

In [None]:
!pip install findspark
!pip install pyspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=02c71cf9d47bb122abae3a1f6da76f3f3637be9382f6ca0560c7251fd81a9750
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [None]:
import findspark
findspark.init()

In [None]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.types import *

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('UCI Heart disease').getOrCreate()

In [None]:
heart = spark.read.csv("/content/heart.csv",
                       inferSchema = True,
                       header = True)
heart.show(3)

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
| 63|  1|  3|     145| 233|  1|      0|    150|    0|    2.3|    0|  0|   1|     1|
| 37|  1|  2|     130| 250|  0|      1|    187|    0|    3.5|    0|  0|   2|     1|
| 41|  0|  1|     130| 204|  0|      0|    172|    0|    1.4|    2|  0|   2|     1|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
only showing top 3 rows



In [None]:
schema = StructType( \
                     [StructField("age", LongType(),True), \
                      StructField("sex", LongType(), True), \
                      StructField("cp", LongType(), True), \
                      StructField('trestbps', LongType(), True), \
                      StructField("chol", LongType(), True), \
                      StructField("fbs", LongType(), True), \
                      StructField("restecg", LongType(), True), \
                      StructField("thalach", LongType(), True),\
                      StructField("exang", LongType(), True), \
                      StructField("oldpeak", DoubleType(), True), \
                      StructField("slope", LongType(),True), \
                      StructField("ca", LongType(), True), \
                      StructField("thal", LongType(), True), \
                      StructField("target", LongType(), True), \
                        ])

In [None]:
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType,StructField,LongType, StringType,DoubleType,TimestampType

df = heart.withColumnRenamed("target","label")
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- cp: integer (nullable = true)
 |-- trestbps: integer (nullable = true)
 |-- chol: integer (nullable = true)
 |-- fbs: integer (nullable = true)
 |-- restecg: integer (nullable = true)
 |-- thalach: integer (nullable = true)
 |-- exang: integer (nullable = true)
 |-- oldpeak: double (nullable = true)
 |-- slope: integer (nullable = true)
 |-- ca: integer (nullable = true)
 |-- thal: integer (nullable = true)
 |-- label: integer (nullable = true)



In [None]:
testDF, trainDF = df.randomSplit([0.3, 0.7])

In [None]:
# Create the logistic regression model
lr = LogisticRegression(maxIter=10, regParam= 0.01)

In [None]:
# We create a one hot encoder.
ohe = OneHotEncoder(inputCols = ['sex', 'cp', 'fbs', 'restecg', 'slope',
                                 'exang', 'ca', 'thal'],
                    outputCols=['sex_ohe', 'cp_ohe', 'fbs_ohe',
                                'restecg_ohe', 'slp_ohe', 'exng_ohe',
                                'caa_ohe', 'thall_ohe'])

# Input list for scaling
inputs = ['age','trestbps','chol','thalach','oldpeak']

# We scale our inputs
assembler1 = VectorAssembler(inputCols=inputs, outputCol="features_scaled1")
scaler = MinMaxScaler(inputCol="features_scaled1", outputCol="features_scaled")

# We create a second assembler for the encoded columns.
assembler2 = VectorAssembler(inputCols=['sex_ohe', 'cp_ohe',
                                        'fbs_ohe', 'restecg_ohe',
                                        'slp_ohe', 'exng_ohe', 'caa_ohe',
                                        'thall_ohe','features_scaled'],
                             outputCol="features")


In [None]:
# Create stages list
myStages = [assembler1, scaler, ohe, assembler2,lr]

# Set up the pipeline
pipeline = Pipeline(stages= myStages)

In [None]:
# We fit the model using the training data.
pModel = pipeline.fit(trainDF)

# We transform the data.
trainingPred = pModel.transform(trainDF)

# # We select the actual label, probability and predictions
trainingPred.select('label','probability','prediction').show()

+-----+--------------------+----------+
|label|         probability|prediction|
+-----+--------------------+----------+
|    1|[0.02082049276438...|       1.0|
|    1|[0.00349694524882...|       1.0|
|    1|[0.02497413140479...|       1.0|
|    0|[0.68654443009022...|       0.0|
|    1|[0.01359450880502...|       1.0|
|    1|[0.00329226868052...|       1.0|
|    1|[0.04984709511694...|       1.0|
|    0|[0.67075368688499...|       0.0|
|    1|[0.01929114560670...|       1.0|
|    0|[0.69930120117003...|       0.0|
|    1|[0.02657089565818...|       1.0|
|    0|[0.94457287311715...|       0.0|
|    1|[0.01897009099017...|       1.0|
|    1|[0.00392892014022...|       1.0|
|    1|[0.01147819212974...|       1.0|
|    0|[0.33364414059559...|       1.0|
|    1|[0.01173781003523...|       1.0|
|    1|[0.01131678038432...|       1.0|
|    1|[0.09699872384288...|       1.0|
|    1|[0.00868910471995...|       1.0|
+-----+--------------------+----------+
only showing top 20 rows



In [None]:
pModel.save("/pipelines")

In [16]:
import os
from google.colab import files

# Specify the directory path
directory_path = "/content/pipelines_alternative"

# Create the directory if it doesn't exist
os.makedirs(directory_path, exist_ok=True)

# List the contents of the directory
directory_contents = os.listdir(directory_path)
print("Directory Contents:", directory_contents)

# Download the entire directory as a zip file
zip_file_path = "/content/pipelines_alternative.zip"
!zip -r $zip_file_path $directory_path

# Download the zip file
files.download(zip_file_path)


Directory Contents: []
  adding: content/pipelines_alternative/ (stored 0%)


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [18]:
from pyspark.ml import PipelineModel


pModel = PipelineModel.load("/pipelines")

Transformation: pModel.transform(trainDF) applies the previously trained pipeline model pModel to the training DataFrame trainDF. The transformation involves applying each stage of the pipeline, including feature transformations and the final classification model, to the input data.

Selection of Columns: trainingPred.select('label','probability','prediction') selects specific columns from the resulting DataFrame (trainingPred). The selected columns are:

'label': The actual label or target variable from the training data.
'probability': The probability distribution of the predicted classes. This is often used in classification problems to see the likelihood of each class.
'prediction': The predicted class label based on the model.

In [20]:
# We transform the data.
trainingPred = pModel.transform(trainDF)

# # We select the actual label, probability and predictions
trainingPred.select('label','probability','prediction').show()

+-----+--------------------+----------+
|label|         probability|prediction|
+-----+--------------------+----------+
|    1|[0.02082049276438...|       1.0|
|    1|[0.00349694524882...|       1.0|
|    1|[0.02497413140479...|       1.0|
|    0|[0.68654443009022...|       0.0|
|    1|[0.01359450880502...|       1.0|
|    1|[0.00329226868052...|       1.0|
|    1|[0.04984709511694...|       1.0|
|    0|[0.67075368688499...|       0.0|
|    1|[0.01929114560670...|       1.0|
|    0|[0.69930120117003...|       0.0|
|    1|[0.02657089565818...|       1.0|
|    0|[0.94457287311715...|       0.0|
|    1|[0.01897009099017...|       1.0|
|    1|[0.00392892014022...|       1.0|
|    1|[0.01147819212974...|       1.0|
|    0|[0.33364414059559...|       1.0|
|    1|[0.01173781003523...|       1.0|
|    1|[0.01131678038432...|       1.0|
|    1|[0.09699872384288...|       1.0|
|    1|[0.00868910471995...|       1.0|
+-----+--------------------+----------+
only showing top 20 rows



Partioning test data into 10 csv and maxfiles trigger 1

In [21]:
testData = testDF.repartition(10)
testData

#Create a directory
testData.write.format("CSV").option("header",False).save("/heart_streaming/")

In [25]:
sourceStream = (
    spark.readStream.schema(schema)
    .option("maxFilesPerTrigger", 1)
    .csv("/heart_streaming")
)

In [26]:

prediction1 = pModel.transform(sourceStream).select('target',
                                                   'probability',
                                                   'prediction')


In [27]:
display(prediction1)

DataFrame[target: bigint, probability: vector, prediction: double]

Displaying the prediction of console

In [28]:
query1 = prediction1.writeStream.queryName("prediction1") \
            .format("console")\
            .trigger(once=True)\
            .start()\
            .awaitTermination()

In [29]:
query2 = (
            prediction1.writeStream.queryName("prediction4")
            .format("memory")
            .outputMode("append")
            .start())

In [30]:
for x in range(2):
    df = spark.sql(
        "SELECT * FROM prediction4")
    df.show(10)
df

+------+--------------------+----------+
|target|         probability|prediction|
+------+--------------------+----------+
|     0|[0.97397927826711...|       0.0|
|     0|[0.94116965839550...|       0.0|
|     1|[0.09057169258915...|       1.0|
|     1|[0.00895484866126...|       1.0|
|     0|[0.48344847711679...|       1.0|
|     0|[0.87733369531352...|       0.0|
|     0|[0.38316072658009...|       1.0|
|     0|[0.96788920228249...|       0.0|
|     0|[0.90723095682524...|       0.0|
|     0|[0.55792532929343...|       0.0|
+------+--------------------+----------+
only showing top 10 rows

+------+--------------------+----------+
|target|         probability|prediction|
+------+--------------------+----------+
|     0|[0.97397927826711...|       0.0|
|     0|[0.94116965839550...|       0.0|
|     1|[0.09057169258915...|       1.0|
|     1|[0.00895484866126...|       1.0|
|     0|[0.48344847711679...|       1.0|
|     0|[0.87733369531352...|       0.0|
|     0|[0.38316072658009...|  

DataFrame[target: bigint, probability: vector, prediction: double]

In [31]:
spark.streams.active[0].isActive

True

In [32]:
spark.streams

<pyspark.sql.streaming.query.StreamingQueryManager at 0x78bedbc0a020>

In [33]:
query2.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}