In [0]:
## Import libraries
import mlflow

from pyspark.ml import Pipeline # Pipeline - similar to SKlearn
from pyspark.ml.classification import LogisticRegression # Logistic Regression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator # Evaluate our model
# Feature Engineering
from pyspark.ml.feature import VectorAssembler, StringIndexer, Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder # Cross-validation and parameter grid

In [0]:
pip install tensorflow-cpu==2.4.*

In [0]:
# Input path and format
file_location = "/FileStore/tables/Heart-5.csv"
file_type = "csv"

# Reader settings; these are CSV options and are ignored for other file types.
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# Read the file with the settings above, then discard the "Unnamed: 0" column.
df = (
    spark.read.format(file_type)
    .options(
        inferSchema=infer_schema,
        header=first_row_is_header,
        sep=delimiter,
    )
    .load(file_location)
    .drop("Unnamed: 0")
)

display(df)

Age,Sex,ChestPain,RestBP,Chol,Fbs,RestECG,MaxHR,ExAng,Oldpeak,Slope,Ca,Thal,AHD
63,1,typical,145,233,1,2,150,0,2.3,3,0.0,fixed,No
67,1,asymptomatic,160,286,0,2,108,1,1.5,2,3.0,normal,Yes
67,1,asymptomatic,120,229,0,2,129,1,2.6,2,2.0,reversable,Yes
37,1,nonanginal,130,250,0,0,187,0,3.5,3,0.0,normal,No
41,0,nontypical,130,204,0,2,172,0,1.4,1,0.0,normal,No
56,1,nontypical,120,236,0,0,178,0,0.8,1,0.0,normal,No
62,0,asymptomatic,140,268,0,2,160,0,3.6,3,2.0,normal,Yes
57,0,asymptomatic,120,354,0,0,163,1,0.6,1,0.0,normal,No
63,1,asymptomatic,130,254,0,2,147,0,1.4,2,1.0,reversable,Yes
53,1,asymptomatic,140,203,1,2,155,1,3.1,3,0.0,reversable,Yes


In [0]:
# Index every non-numeric column: each StringIndexer reads <column> and writes
# a "<column>_index" column.
numeric_columns = ['Age', 'Sex', 'RestBP', 'Chol', 'Fbs', 'RestECG',
                   'MaxHR', 'ExAng', 'Oldpeak', 'Slope', 'Ca']
# Filter df.columns directly instead of using set arithmetic: set iteration
# order is not guaranteed to be the same from one Python process to the next,
# which made the order of the appended *_index columns vary between runs.
categorical_columns = [column for column in df.columns if column not in numeric_columns]
indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index").fit(df)
    for column in categorical_columns
]
# The stages are already-fitted indexer models; the pipeline applies them in order.
pipeline = Pipeline(stages=indexers)
df_r = pipeline.fit(df).transform(df)

df_r.show()

In [0]:
# One-hot encode the indexed categorical columns. AHD_index is the target
# column and is intentionally not encoded.
encoder = OneHotEncoder(
    inputCols=["ChestPain_index", "Thal_index"],
    outputCols=["ChestPain_Vec", "Thal_Vec"],
)
# `target` is the fitted encoder model (despite the name, not the label column).
target = encoder.fit(df_r)
encoded = target.transform(df_r)
encoded.show()

In [0]:
# Assemble the model inputs into a single 'features' vector: the numeric
# columns first, then the one-hot encoded vectors.
# The label column 'AHD_index' is deliberately NOT an input: putting the
# target inside the feature vector leaks the answer into the model and makes
# the fitted coefficients and every downstream metric meaningless.
feature_columns = ['Age', 'Sex', 'RestBP', 'Chol', 'Fbs', 'RestECG', 'MaxHR',
                   'ExAng', 'Oldpeak', 'Slope', 'Ca',
                   'ChestPain_Vec', 'Thal_Vec']
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')
df_enc = assembler.transform(encoded)
df_enc.show(5)

In [0]:
# Keep only the assembled feature vector and the label column for modeling.
df_model = df_enc.select(["features", "AHD_index"])

In [0]:
# Preview the modeling DataFrame (first 20 rows, long values truncated).
df_model.show(n=20, truncate=True)

In [0]:
# Random 80/20 train/test split with a fixed seed (42).
train, test = df_model.randomSplit([0.8, 0.2], seed=42)

In [0]:
# Configure the estimator: linear regression of AHD_index on the feature vector.
# NOTE(review): AHD_index is produced by a StringIndexer, i.e. it is a category
# index rather than a continuous quantity; a classifier such as the imported
# LogisticRegression may be what was intended - TODO confirm.
lr = LinearRegression(
    featuresCol='features',
    labelCol='AHD_index',
)


In [0]:
# Train the configured estimator on the training split.
lr_model = lr.fit(dataset=train)

In [0]:
# Report the fitted model parameters.
print(f"Coefficients: {lr_model.coefficients}")
print(f"Intercept: {lr_model.intercept}")

In [0]:
# Metrics computed on the training data (not the held-out test split).
trainingSummary = lr_model.summary
print(f"RMSE: {trainingSummary.rootMeanSquaredError:f}")
print(f"r2: {trainingSummary.r2:f}")

In [0]:
# Score the held-out test split with the fitted model.
lr_predictions = lr_model.transform(dataset=test)

In [0]:
# Score the test split again and show predictions next to the true label.
# NOTE(review): this repeats the transform already stored in lr_predictions.
predictions = lr_model.transform(dataset=test)
predictions.select(["prediction", "AHD_index", "features"]).show()