In [19]:
import findspark
# Runs before any pyspark import in the cells below. findspark.init() is
# documented to locate the local Spark installation and put it on sys.path so
# that `pyspark` becomes importable.
# NOTE(review): presumably relies on SPARK_HOME (or a default install
# location) being discoverable in this environment — confirm.
findspark.init()

In [20]:
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.sql.types import *
from pyspark.ml import Pipeline


In [21]:
from pyspark.sql import SparkSession

# Obtain the notebook-wide SparkSession: getOrCreate() hands back an already
# running session when there is one, otherwise it starts a new session with
# the application name given here.
spark = (
    SparkSession.builder
    .appName('UCI Heart prediction')
    .getOrCreate()
)

In [22]:
# Single configuration mapping for the notebook. Every cell that needs a path
# or an option looks it up here by its upper-case key.
PARAMETERS = dict(
    # Data locations (relative to the notebook's working directory).
    INPUT_DATA_PATH='data/heart.csv',
    TRAIN_DATA_PATH='data/train',
    TEST_DATA_PATH='data/test',
    # Options forwarded to spark.read.csv when loading the data.
    INFER_SCHEMA=True,
    HEADER=True,
    # Name of the label column.
    TARGET_COLUMN='target',
    # Split and model settings. They are not referenced by the cells visible
    # in this notebook — presumably consumed by the training notebook; verify.
    TEST_SIZE=0.2,
    RANDOM_SEED=42,
    MAX_ITER=10,
    REG_PARAM=0.3,
    # Directory the fitted pipeline model is loaded from.
    MODEL_PATH='models/logistic_regression',
)




In [23]:
# Read the CSV data stored under PARAMETERS['TEST_DATA_PATH']; the header and
# schema-inference switches come from the shared configuration mapping.
heart_predict = spark.read.csv(
    PARAMETERS['TEST_DATA_PATH'],
    header=PARAMETERS['HEADER'],
    inferSchema=PARAMETERS['INFER_SCHEMA'],
)

# Keep the inferred schema around: the streaming reader further down is given
# this schema explicitly instead of inferring one.
schema = heart_predict.schema

# Column bookkeeping: the full column list, the columns listed for scaling,
# and (order-preserving) everything that is not in that list.
ORIGINAL_COLUMNS = heart_predict.columns
COLUMNS_TO_SCALE = ['age', 'trestbps', 'chol', 'thalach', 'oldpeak']
NOT_TO_SCALE = [
    column
    for column in ORIGINAL_COLUMNS
    if column not in COLUMNS_TO_SCALE
]

# Quick look at the first three rows.
heart_predict.show(3)

+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
| 34|  0|  1|     118| 210|  0|      1|    192|    0|    0.7|    2|  0|   2|     1|
| 35|  0|  0|     138| 183|  0|      1|    182|    0|    1.4|    2|  0|   2|     1|
| 37|  0|  2|     120| 215|  0|      1|    170|    0|    0.0|    2|  0|   2|     1|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
only showing top 3 rows



In [24]:
# Print each column's name, type and nullability for the frame loaded above
# (the schema was inferred while reading the CSV).
heart_predict.printSchema()

root
 |-- age: integer (nullable = true)
 |-- sex: integer (nullable = true)
 |-- cp: integer (nullable = true)
 |-- trestbps: integer (nullable = true)
 |-- chol: integer (nullable = true)
 |-- fbs: integer (nullable = true)
 |-- restecg: integer (nullable = true)
 |-- thalach: integer (nullable = true)
 |-- exang: integer (nullable = true)
 |-- oldpeak: double (nullable = true)
 |-- slope: integer (nullable = true)
 |-- ca: integer (nullable = true)
 |-- thal: integer (nullable = true)
 |-- target: integer (nullable = true)



In [25]:
from pyspark.ml import PipelineModel


# Load the pipeline model stored at PARAMETERS['MODEL_PATH'].
# NOTE(review): this expects a fitted pipeline to already exist at that path —
# presumably written by a separate training notebook; confirm before running.
pModel = PipelineModel.load(PARAMETERS['MODEL_PATH'])

In [28]:
# Spread the rows over 10 partitions so that the write below produces several
# part files (up to one per partition) rather than a single file. The
# streaming reader later in the notebook consumes that directory one file per
# trigger.
repartitioned = heart_predict.repartition(10)

# Write the partitions as header-less CSV files into the directory watched by
# the streaming reader (which is given an explicit schema and no header
# option).
# mode("overwrite") makes this cell re-runnable: with the default save mode
# the write raises an error as soon as the target directory already exists,
# i.e. on every run after the first one.
(
    repartitioned.write
    .format("CSV")
    .option("header", False)
    .mode("overwrite")
    .save("data/streaming-prepared/")
)

In [30]:
# Streaming view over the prepared CSV directory. The schema captured from the
# batch read is supplied explicitly, and maxFilesPerTrigger=1 limits every
# micro-batch to a single new file.
sourceStream = (
    spark.readStream
    .format("csv")
    .schema(schema)
    .option("maxFilesPerTrigger", 1)
    .load("data/streaming-prepared")
)

In [32]:
# Run the loaded pipeline over the streaming frame and keep only the label
# column next to the model's probability vector and predicted class.
prediction1 = (
    pModel
    .transform(sourceStream)
    .select(['target', 'probability', 'prediction'])
)

In [33]:
# Show the streaming DataFrame object itself. As the captured output shows,
# this renders only its repr (column names and types); no rows are produced
# until a streaming query is started on it.
display(prediction1)

DataFrame[target: int, probability: vector, prediction: double]

In [35]:
#SAVE PREDICTIONS TO MEMORY
query = (
            prediction1.writeStream.queryName("prediction") 
            .format("memory")
            .outputMode("append")
            .start())

In [37]:
import time

# Number of snapshots to take of the in-memory "prediction" table and the
# pause between them. Without a pause both snapshots are taken back-to-back
# and show the same rows, which hides the fact that the table grows as the
# stream processes more files.
POLL_COUNT = 2
POLL_INTERVAL_SECONDS = 2

for poll_index in range(POLL_COUNT):
    if poll_index > 0:
        # Give the streaming query time to process further micro-batches
        # before looking at the table again.
        time.sleep(POLL_INTERVAL_SECONDS)
    df = spark.sql("SELECT * FROM prediction")
    df.show(10)
df

+------+--------------------+----------+
|target|         probability|prediction|
+------+--------------------+----------+
|     1|[0.25986077301635...|       1.0|
|     1|[0.21486082587260...|       1.0|
|     0|[0.69921885125176...|       0.0|
|     1|[0.12493890625893...|       1.0|
|     0|[0.78957905706201...|       0.0|
|     0|[0.84518413819083...|       0.0|
|     0|[0.78066419923193...|       0.0|
|     0|[0.81099168103656...|       0.0|
|     0|[0.81356022274645...|       0.0|
|     0|[0.58556439405414...|       0.0|
+------+--------------------+----------+
only showing top 10 rows

+------+--------------------+----------+
|target|         probability|prediction|
+------+--------------------+----------+
|     1|[0.25986077301635...|       1.0|
|     1|[0.21486082587260...|       1.0|
|     0|[0.69921885125176...|       0.0|
|     1|[0.12493890625893...|       1.0|
|     0|[0.78957905706201...|       0.0|
|     0|[0.84518413819083...|       0.0|
|     0|[0.78066419923193...|  

DataFrame[target: int, probability: vector, prediction: double]