In [261]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StandardScaler, OneHotEncoder, StringIndexer
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [262]:
import os

# Location of the heart-disease CSV as seen from the machine running this kernel.
file_path = "/Users/vivek/Desktop/school/BIGDATA/heart.csv"

# Sanity check before handing the path to Spark: the file should exist and
# report a non-zero size in bytes.
file_found = os.path.exists(file_path)
print("File exists:", file_found)
size_in_bytes = os.stat(file_path).st_size
print("File size:", size_in_bytes)

File exists: True
File size: 56674


In [263]:
# Connect to the standalone cluster. The notebook kernel itself is the driver,
# so the session always runs in client deploy mode. The former
# "spark.submit.deployMode=cluster" setting was dropped: standalone masters do
# not support cluster mode for Python applications, and spark-submit rejects
# cluster mode for interactive shells, so the option could only be ignored or
# make a fresh-kernel start-up fail.
spark = (
    SparkSession.builder
    .appName("Heart Disease Analysis")
    .master("spark://10.100.165.4:7077")
    .config("spark.executor.memory", "2g")
    .config("spark.driver.memory", "1g")
    .config("spark.executor.cores", "2")
    .config("spark.cores.max", "6")  # cap on total cores taken from the cluster
    .getOrCreate()
)

In [264]:
# Confirm which master the active session is attached to.
master_url = spark.conf.get("spark.master")
print("Spark session initialized:", master_url)

Spark session initialized: spark://10.100.165.4:7077


In [265]:
# Load the CSV through the path that was validated above instead of repeating
# the literal, so the location only has to be changed in one place.
# header=True takes column names from the first line; inferSchema=True lets
# Spark derive the column types from the data.
df = spark.read.csv(file_path, header=True, inferSchema=True)
df.show()

+---+---+---+------+----+---+-------+--------+----+-------+---+---+-----+------+
|age|sex| cp|trtbps|chol|fbs|restecg|thalachh|exng|oldpeak|slp|caa|thall|output|
+---+---+---+------+----+---+-------+--------+----+-------+---+---+-----+------+
| 63|  1|  3|   145| 233|  1|      0|     150|   0|    2.3|  0|  0|    1|     1|
| 37|  1|  2|   130| 250|  0|      1|     187|   0|    3.5|  0|  0|    2|     1|
| 41|  0|  1|   130| 204|  0|      0|     172|   0|    1.4|  2|  0|    2|     1|
| 56|  1|  1|   120| 236|  0|      1|     178|   0|    0.8|  2|  0|    2|     1|
| 57|  0|  0|   120| 354|  0|      1|     163|   1|    0.6|  2|  0|    2|     1|
| 57|  1|  0|   140| 192|  0|      1|     148|   0|    0.4|  1|  0|    1|     1|
| 56|  0|  1|   140| 294|  0|      0|     153|   0|    1.3|  1|  0|    2|     1|
| 44|  1|  1|   120| 263|  0|      1|     173|   0|    0.0|  2|  0|    3|     1|
| 52|  1|  2|   172| 199|  1|      1|     162|   0|    0.5|  2|  0|    3|     1|
| 57|  1|  2|   150| 168|  0

In [266]:
# Read the same file as plain text to confirm it is reachable through Spark.
# Uses the shared file_path variable rather than a second copy of the literal.
rdd = spark.sparkContext.textFile(file_path)
print(rdd.take(5))  # This will print the first 5 lines if the file is accessible

['age,sex,cp,trtbps,chol,fbs,restecg,thalachh,exng,oldpeak,slp,caa,thall,output', '63,1,3,145,233,1,0,150,0,2.3,0,0,1,1', '37,1,2,130,250,0,1,187,0,3.5,0,0,2,1', '41,0,1,130,204,0,0,172,0,1.4,2,0,2,1', '56,1,1,120,236,0,1,178,0,0.8,2,0,2,1']


In [267]:
# Reload the raw CSV so the cells below start from the original column names.
# Uses the shared file_path variable rather than another copy of the literal.
df = spark.read.csv(file_path, header=True, inferSchema=True)

In [268]:
# Rename every column positionally to the names used by the rest of the notebook.
new_cols = ["age", "sex", "cp", "trtbps", "chol", "fbs", "rest_ecg", "thalach", "exang", "oldpeak", "slope", "ca", "thal", "target"]
# toDF applies all names in a single projection and raises if the number of
# names differs from the number of columns; the previous zip()-based loop would
# silently leave trailing columns unrenamed on a mismatch.
df = df.toDF(*new_cols)

In [269]:
# Discard rows whose "age" value is null. Only this column is checked here.
df = df.dropna(subset=["age"])

In [270]:
# Feature selection for the preprocessing pipeline.
# Columns treated as numeric inputs:
numeric_cols = [
    "age",
    "trtbps",
    "thalach",
    "oldpeak",
]
# Columns treated as categorical inputs:
categorical_cols = [
    "sex",
    "cp",
    "exang",
    "slope",
    "ca",
    "thal",
]


In [271]:
# Ordered list of pipeline stages; the cells below append to it.
stages = list()

In [272]:
# Index and one-hot encode each categorical column, appending both stages in order.
for cat_col in categorical_cols:
    # handleInvalid="keep" sends nulls and category values that were not present
    # when the indexer was fitted to an extra index instead of raising at
    # transform time (the default is "error").
    string_indexer = StringIndexer(
        inputCol=cat_col,
        outputCol=f"{cat_col}_index",
        handleInvalid="keep",
    )
    # The encoder consumes the index column produced by the stage above.
    onehot_encoder = OneHotEncoder(
        inputCols=[f"{cat_col}_index"],
        outputCols=[f"{cat_col}_encoded"],
    )
    stages.extend([string_indexer, onehot_encoder])


In [273]:
# Combine the numeric columns and the one-hot encoded categorical columns
# into a single "features" vector column.
encoded_cols = [f"{name}_encoded" for name in categorical_cols]
assembler_inputs = numeric_cols + encoded_cols
vector_assembler = VectorAssembler(
    inputCols=assembler_inputs,
    outputCol="features",
)
stages.append(vector_assembler)

In [274]:
# Rescale the assembled vector (estimator defaults) and write the result
# to "scaled_features".
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaled_features",
)
stages += [scaler]


In [275]:
# Final stage: logistic regression trained on the scaled vector,
# with "target" as the label column.
lr = LogisticRegression(
    labelCol="target",
    featuresCol="scaled_features",
)
stages += [lr]


In [276]:
# Create and fit the pipeline
pipeline = Pipeline(stages=stages)
model = pipeline.fit(df)

In [277]:
# Split the data
train_df, test_df = df.randomSplit([0.8, 0.2], seed=42)

In [278]:
# Run the fitted pipeline over the held-out split to obtain predictions.
predictions = model.transform(
    test_df
)

In [279]:
# Area under the ROC curve on the predictions; this is the evaluator's
# default metric, spelled out here for readability.
evaluator = BinaryClassificationEvaluator(
    labelCol="target",
    metricName="areaUnderROC",
)
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")


AUC: 0.931465517241378


24/11/28 13:10:38 WARN SparkConf: The configuration key 'spark.executor.port' has been deprecated as of Spark 2.0.0 and may be removed in the future. Not used anymore
24/11/28 13:10:38 WARN SparkConf: The configuration key 'spark.executor.port' has been deprecated as of Spark 2.0.0 and may be removed in the future. Not used anymore
24/11/28 13:10:38 WARN SparkConf: The configuration key 'spark.executor.port' has been deprecated as of Spark 2.0.0 and may be removed in the future. Not used anymore
24/11/28 13:10:38 WARN SparkConf: The configuration key 'spark.executor.port' has been deprecated as of Spark 2.0.0 and may be removed in the future. Not used anymore
24/11/28 13:10:38 WARN SparkConf: The configuration key 'spark.executor.port' has been deprecated as of Spark 2.0.0 and may be removed in the future. Not used anymore


In [280]:
# Fraction of rows whose predicted class matches the "target" label.
multi_evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="target",
)
accuracy = multi_evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

Accuracy: 0.8832684824902723


24/11/28 13:10:38 WARN SparkConf: The configuration key 'spark.executor.port' has been deprecated as of Spark 2.0.0 and may be removed in the future. Not used anymore
24/11/28 13:10:39 WARN SparkConf: The configuration key 'spark.executor.port' has been deprecated as of Spark 2.0.0 and may be removed in the future. Not used anymore
