In [2]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.shell import spark
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

ds = spark.read.csv("heart_disease_dataset.csv", header=True)

# Фильтрация строк с NULL значениями в столбце "Oldpeak"
ds = ds.filter(ds["Oldpeak"].isNull() == False)

# Просмотрим мета-данные датасета
ds.printSchema()

2024-06-18 02:03:16,567 WARN util.Utils: Your hostname, localhost.localdomain resolves to a loopback address: 127.0.0.1; using 10.0.2.15 instead (on interface enp0s3)
2024-06-18 02:03:16,568 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
2024-06-18 02:03:16,882 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

Using Python version 3.8.11 (default, Aug 19 2021 15:49:35)
Spark context Web UI available at http://10.0.2.15:4040
Spark context available as 'sc' (master = local[*], app id = local-1718643798040).
SparkSession available as 'spark'.
root
 |-- Age: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- ChestPainType: string (nullable = true)
 |-- RestingBP: string (nullable = true)
 |-- Cholesterol: string (nullable = true)
 |-- FastingBS: string (nullable = true)
 |-- RestingECG: string (nullable = true)
 |-- MaxHR: string (nullable = true)
 |-- ExerciseAngina: string (nullable = true)
 |-- Oldpeak: string (nullable = true)
 |-- HeartDisease: string (nullable = true)



In [3]:
# Схема определилась нерпавильно, поэтому нужно задать ее вручную в соотвествии с тимами данных в таблице
schema = StructType([
    StructField("Age", IntegerType(), True),
    StructField("Sex", IntegerType(), True),
    StructField("ChestPainType", IntegerType(), True),
    StructField("RestingBP", IntegerType(), True),
    StructField("Cholesterol", IntegerType(), True),
    StructField("FastingBS", IntegerType(), True),
    StructField("RestingECG", IntegerType(), True),
    StructField("MaxHR", IntegerType(), True),
    StructField("ExerciseAngina", IntegerType(), True),
    StructField("Oldpeak", DoubleType(), True),
    StructField("HeartDisease", IntegerType(), True),

])

# Еще раз просматриваем csv файл с указанием схемы и убеждаемся, что все изменилось
df = spark.read.csv("heart_disease_dataset.csv", header=True, schema=schema)
df.printSchema() # Все вывелось корректно

root
 |-- Age: integer (nullable = true)
 |-- Sex: integer (nullable = true)
 |-- ChestPainType: integer (nullable = true)
 |-- RestingBP: integer (nullable = true)
 |-- Cholesterol: integer (nullable = true)
 |-- FastingBS: integer (nullable = true)
 |-- RestingECG: integer (nullable = true)
 |-- MaxHR: integer (nullable = true)
 |-- ExerciseAngina: integer (nullable = true)
 |-- Oldpeak: double (nullable = true)
 |-- HeartDisease: integer (nullable = true)



In [4]:
# Подготовка данных
# Создание трансформера и его применение
assembler = VectorAssembler(inputCols=["Age", "Sex", "ChestPainType", "RestingBP", "Cholesterol", "FastingBS", "RestingECG", "MaxHR", "ExerciseAngina", "Oldpeak"], outputCol="features")
df = assembler.transform(df)


# Разделяем на тестовую и обучающую выборки
train, test = df.randomSplit([0.7, 0.3])

# Проверка количества строк тренировки и теста
print("Кол-во записей для тренировки", train.count())
print("Кол-во записей для теста", test.count())

                                                                                

Кол-во записей для тренировки 210270




Кол-во записей для теста 89730


                                                                                

In [5]:
# Создаем линейную регрессию
lr = LinearRegression(featuresCol="features", labelCol="HeartDisease")

# Обучаем модель
lr_model = lr.fit(train)

# применяем модель к тестовому набору данных
predictions = lr_model.transform(test)

predictions.show()

# Вызываем оценщика
res = BinaryClassificationEvaluator(rawPredictionCol='prediction', labelCol='HeartDisease')


# представляет степень или показатель отделимости. ROC показывает, насколько модель способна различать классы
ROC_AUC = res.evaluate(predictions)
print("Точность линейной регрессии: ", ROC_AUC)

2024-06-18 02:03:44,267 WARN util.Instrumentation: [b250f38d] regParam is zero, which might cause numerical instability and overfitting.
2024-06-18 02:03:44,665 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
2024-06-18 02:03:44,666 WARN netlib.BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
2024-06-18 02:03:45,192 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
2024-06-18 02:03:45,192 WARN netlib.LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
                                                                                

+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------------------+------------+--------------------+-------------------+
|Age|Sex|ChestPainType|RestingBP|Cholesterol|FastingBS|RestingECG|MaxHR|ExerciseAngina|            Oldpeak|HeartDisease|            features|         prediction|
+---+---+-------------+---------+-----------+---------+----------+-----+--------------+-------------------+------------+--------------------+-------------------+
| 20|  0|            1|       81|        208|        0|         1|   63|             0|0.27034656931119816|           0|[20.0,0.0,1.0,81....|0.41327363095489655|
| 20|  0|            1|       81|        355|        0|         2|  120|             1| 2.4659171576119694|           0|[20.0,0.0,1.0,81....|  0.632776183039798|
| 20|  0|            1|       82|        581|        0|         2|   97|             0| 0.3523562588174398|           0|[20.0,0.0,1.0,82....| 0.5523433793443275|
| 20|  0|            1|     

                                                                                

Точность линейной регрессии:  0.9995771543197074


In [6]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, OneHotEncoder
from pyspark.ml.classification import RandomForestClassifier, LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

# Загрузка данных из файла (предполагается, что данные находятся в файле 'heart_data.csv')
data = spark.read.csv("heart_disease_dataset.csv", header=True, inferSchema=True)

# Преобразование категориальных признаков в числовые
categoricalCols = ['Sex', 'ChestPainType', 'RestingECG', 'ExerciseAngina']
stages = []
for categoricalCol in categoricalCols:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "Encoded"])
    stages += [stringIndexer, encoder]

# Создание вектора признаков
numericCols = ["Age", "Sex", "ChestPainType", "RestingBP", "Cholesterol", "FastingBS", "RestingECG", "MaxHR", "ExerciseAngina", "Oldpeak"]
assemblerInputs = [c + "Encoded" for c in categoricalCols] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

# Создание конвейера
pipeline = Pipeline(stages=stages)

# Разделение данных на обучающую и тестовую выборки
(trainingData, testData) = data.randomSplit([0.8, 0.2], seed=42)

# --- Метод SVM ---
best_accuracy_svm = 0
best_max_iter = 0

maxIter_list = [5, 10, 15, 20, 25]

print("--SVM--")
for maxIter in maxIter_list:
    # Создание модели SVM с текущим значением maxIter
    svm = LinearSVC(featuresCol="features", labelCol="HeartDisease", maxIter=maxIter)

    # Обучение конвейера с моделью SVM
    pipeline_svm = Pipeline(stages=stages + [svm])
    svm_model = pipeline_svm.fit(trainingData)

    # Предсказание на тестовой выборке с помощью SVM
    predictions_svm = svm_model.transform(testData)

    # Вычисление точности SVM
    evaluator = BinaryClassificationEvaluator(labelCol="HeartDisease", rawPredictionCol="prediction")
    accuracy_svm = evaluator.evaluate(predictions_svm)
    print(f"Точность SVM с maxIter={maxIter}: {accuracy_svm:.2f}")

    if accuracy_svm > best_accuracy_svm:
        best_accuracy_svm = accuracy_svm
        best_max_iter = maxIter
print("\n")

# --- Метод случайных лесов ---
best_accuracy_rf = 0
best_num_trees = 0

numTrees_list = [5, 10, 15, 20, 25]

print("--Random Trees--")
for numTrees in numTrees_list:
    # Создание модели случайных лесов
    rf = RandomForestClassifier(featuresCol="features", labelCol="HeartDisease", numTrees=numTrees)

    # Создание конвейера для случайных лесов
    pipeline_rf = Pipeline(stages=stages + [rf])

    # Обучение модели случайных лесов
    rf_model = pipeline_rf.fit(trainingData)

    # Предсказание на тестовой выборке с помощью случайных лесов
    predictions_rf = rf_model.transform(testData)

    # Вычисление точности случайных лесов
    accuracy_rf = evaluator.evaluate(predictions_rf)
    print(f"Точность случайных лесов с numTrees={numTrees}: {accuracy_rf:.2f}")

    if accuracy_rf > best_accuracy_rf:
        best_accuracy_rf = accuracy_rf
        best_num_trees = numTrees
print("\n")

print("Вывод лучших результатов")
# Вывод результатов
print(f"Лучшая точность SVM: {best_accuracy_svm:.2f} с maxIter={best_max_iter}")
print(f"Лучшая точность случайных лесов: {best_accuracy_rf:.2f} с numTrees={best_num_trees}")

--SVM--


                                                                                

Точность SVM с maxIter=5: 0.50


                                                                                

Точность SVM с maxIter=10: 0.50


                                                                                

Точность SVM с maxIter=15: 0.51


                                                                                

Точность SVM с maxIter=20: 0.52


                                                                                

Точность SVM с maxIter=25: 0.71


--Random Trees--


                                                                                

Точность случайных лесов с numTrees=5: 0.74


                                                                                

Точность случайных лесов с numTrees=10: 0.76


                                                                                

Точность случайных лесов с numTrees=15: 0.77


                                                                                

Точность случайных лесов с numTrees=20: 0.74


                                                                                

Точность случайных лесов с numTrees=25: 0.74


Вывод лучших результатов
Лучшая точность SVM: 0.71 с maxIter=25
Лучшая точность случайных лесов: 0.77 с numTrees=15


In [7]:
from pyspark.sql.functions import col, count, when

# Загружаем данные
df = spark.read.csv("disease_dataset.csv", header=True, inferSchema=True)

# Подсчитываем количество мужчин и женщин с сердечными заболеваниями
heart_disease_counts = df.filter(col("HeartDisease") == 1) \
    .groupBy("Sex") \
    .agg(count("HeartDisease").alias("Count")) \
    .withColumn("Sex", when(col("Sex") == 1, "Male").otherwise("Female"))

print("Количество мужчин и женщин с сердечными заболеваниями")
# Выводим результаты
heart_disease_counts.show()

# Дополнительные виды анализа

# 1. Средний возраст мужчин и женщин с сердечными заболеваниями
avg_age = df.filter(col("HeartDisease") == 1) \
    .groupBy("Sex") \
    .agg({"Age": "avg"}) \
    .withColumn("Sex", when(col("Sex") == 1, "Male").otherwise("Female"))

print("Средний возраст мужчин и женщин с сердечными заболеваниями")
avg_age.show()

# 2. Средний уровень холестерина у мужчин и женщин с сердечными заболеваниями
avg_cholesterol = df.filter(col("HeartDisease") == 1) \
    .groupBy("Sex") \
    .agg({"Cholesterol": "avg"}) \
    .withColumn("Sex", when(col("Sex") == 1, "Male").otherwise("Female"))

print("Средний уровень холестерина у мужчин и женщин с сердечными заболеваниями")
avg_cholesterol.show()

# 3. Количество людей с типичной ангиной среди мужчин и женщин с сердечными заболеваниями
chest_pain_counts = df.filter((col("HeartDisease") == 1) & (col("ChestPainType") == 1)) \
    .groupBy("Sex") \
    .agg(count("ChestPainType").alias("Count")) \
    .withColumn("Sex", when(col("Sex") == 1, "Male").otherwise("Female"))

print("Количество людей с ангиной среди мужчин и женщин с сердечными заболеваниями")
chest_pain_counts.show()

Количество мужчин и женщин с сердечными заболеваниями
+------+-----+
|   Sex|Count|
+------+-----+
|  Male|75971|
|Female|75578|
+------+-----+

Средний возраст мужчин и женщин с сердечными заболеваниями
+------+------------------+
|   Sex|          avg(Age)|
+------+------------------+
|  Male| 50.09166655697569|
|Female|49.925785281431104|
+------+------------------+

Средний уровень холестерина у мужчин и женщин с сердечными заболеваниями
+------+------------------+
|   Sex|  avg(Cholesterol)|
+------+------------------+
|  Male|347.80468863118824|
|Female| 349.0907539230993|
+------+------------------+

Количество людей с ангиной среди мужчин и женщин с сердечными заболеваниями
+------+-----+
|   Sex|Count|
+------+-----+
|  Male|19054|
|Female|18893|
+------+-----+

