In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, when
from pyspark.sql.types import IntegerType, DoubleType

# Kreiranje Spark sesije
spark = SparkSession.builder.appName("HeartDiseaseAnalysis").getOrCreate()

# Putanja do dataset-a
file_path = "heart.csv"
output_path = "output_parquet"

try:
    # Učitavanje podataka u PySpark DataFrame
    df = spark.read.csv(file_path, header=True, inferSchema=True)

    # Funkcija za provjeru i uklanjanje nedostajućih vrijednosti
    def check_and_remove_missing_values(df):
        try:
            # Brojanje nedostajućih vrijednosti po stupcima
            missing_counts = df.select(
                [sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]
            )
            print("Nedostajućih vrijednosti po stupcima:")
            missing_counts.show()

            # Ako postoje nedostajuće vrijednosti, ukloni ih
            if missing_counts.count() > 1:
                df_cleaned = df.na.drop()
                print("Redovi s nedostajućim vrijednostima uklonjeni.")
                return df_cleaned
            else:
                print("Nema nedostajućih vrijednosti.")
                return df
        except Exception as e:
            print(f"Došlo je do pogreške pri provjeri nedostajućih vrijednosti: {e}")
            return df

    # Provodi provjeru i uklanjanje nedostajućih vrijednosti
    df = check_and_remove_missing_values(df)

    # Implementacija konverzije tipova podataka
    def convert_column_types(df):
        try:
            # Definiraj tipove koje trebaš za svaku kolonu
            df = df.withColumn("age", col("age").cast(IntegerType())) \
                   .withColumn("sex", col("sex").cast(IntegerType())) \
                   .withColumn("cp", col("cp").cast(IntegerType())) \
                   .withColumn("trestbps", col("trestbps").cast(IntegerType())) \
                   .withColumn("chol", col("chol").cast(IntegerType())) \
                   .withColumn("fbs", col("fbs").cast(IntegerType())) \
                   .withColumn("restecg", col("restecg").cast(IntegerType())) \
                   .withColumn("thalach", col("thalach").cast(IntegerType())) \
                   .withColumn("exang", col("exang").cast(IntegerType())) \
                   .withColumn("oldpeak", col("oldpeak").cast(DoubleType())) \
                   .withColumn("slope", col("slope").cast(IntegerType())) \
                   .withColumn("ca", col("ca").cast(IntegerType())) \
                   .withColumn("thal", col("thal").cast(IntegerType())) \
                   .withColumn("target", col("target").cast(IntegerType()))
            return df
        except Exception as e:
            print(f"Došlo je do pogreške pri konverziji tipova: {e}")
            return df

    # Provjera svih jedinstvenih vrijednosti u svakoj koloni
    def check_column_values(df, column_name, min_value, max_value):
        """Provjerava vrijednosti u zadanoj koloni i filtrira one koje nisu unutar opsega"""
        try:
            print(f"Provjera vrijednosti u stupcu '{column_name}':")
            # Prikazuje jedinstvene vrijednosti u koloni
            df.select(column_name).distinct().show()

            # Filtrira redove gdje vrijednosti nisu unutar željenog opsega
            invalid_values = df.filter(~col(column_name).between(min_value, max_value))
            if invalid_values.count() > 0:
                print(f"Uklanjanje nekonzistentnih vrijednosti u stupcu '{column_name}'")
                df = df.subtract(invalid_values)  # Uklanja redove s nekonzistentnim vrijednostima
                invalid_values.show()
            else:
                print(f"Nema nekonzistentnih vrijednosti u stupcu '{column_name}'")
            return df

        except Exception as e:
            print(f"Došlo je do pogreške pri provjeri stupca '{column_name}': {e}")
            return df

    # Provjera za 'age' (opseg 0-120)
    df = check_column_values(df, 'age', 0, 120)

    # Provjera za 'sex' (validne vrijednosti su 0 i 1)
    df = check_column_values(df, 'sex', 0, 1)

    # Provjera za 'cp' (validne vrijednosti su 0, 1, 2, 3)
    df = check_column_values(df, 'cp', 0, 3)

    # Provjera za 'trestbps' (opseg 0-300)
    df = check_column_values(df, 'trestbps', 0, 300)

    # Provjera za 'chol' (opseg 50-1000)
    df = check_column_values(df, 'chol', 50, 1000)

    # Provjera za 'fbs' (validne vrijednosti su 0 i 1)
    df = check_column_values(df, 'fbs', 0, 1)

    # Provjera za 'restecg' (validne vrijednosti su 0, 1, 2)
    df = check_column_values(df, 'restecg', 0, 2)

    # Provjera za 'thalach' (opseg 20-230)
    df = check_column_values(df, 'thalach', 20, 230)

    # Provjera za 'exang' (validne vrijednosti su 0 i 1)
    df = check_column_values(df, 'exang', 0, 1)

    # Provjera za 'oldpeak' (opseg -3 do 8)
    df = check_column_values(df, 'oldpeak', -3, 8)

    # Provjera za 'slope' (validne vrijednosti su 0, 1, 2)
    df = check_column_values(df, 'slope', 0, 2)

    # Provjera za 'ca' (opseg 0-4)
    df = check_column_values(df, 'ca', 0, 4)

    # Provjera za 'thal' (opseg 0-3)
    df = check_column_values(df, 'thal', 0, 3)

    # Provjera za 'target' (validne vrijednosti su 0 i 1)
    df = check_column_values(df, 'target', 0, 1)

    # Provodi konverziju tipova podataka
    df = convert_column_types(df)

    # Kreiranje nove značajke 'category' na temelju vrijednosti risk_score
    df = df.withColumn(
        "category", 
        when(col("age") > 60, "High Risk")
        .when(col("chol") > 250, "High Risk")
        .when(col("trestbps") > 140, "High Risk")
        .when(col("thalach") < 100, "High Risk")
        .otherwise("Low Risk")
    )

    # Kreiranje nove značajke 'age_groups' na temelju dobi
    df = df.withColumn(
        "age_groups",
        when(col("age") < 30, "Under 30")
        .when((col("age") >= 30) & (col("age") < 40), "30-40")
        .when((col("age") >= 40) & (col("age") < 50), "40-50")
        .when((col("age") >= 50) & (col("age") < 60), "50-60")
        .when(col("age") >= 60, "60+")
        .otherwise("Unknown")
    )

    df.show()

    # Spremanje rezultata u Parquet format
    try:
        df.write.parquet(output_path)
        print(f"Podaci uspješno spremljeni u Parquet format na {output_path}")

    except Exception as e:
        print(f"Došlo je do pogreške prilikom spremanja u Parquet format: {e}")

except Exception as e:
    print(f"Došlo je do pogreške prilikom učitavanja podataka: {e}")


Nedostajućih vrijednosti po stupcima:
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|age|sex| cp|trestbps|chol|fbs|restecg|thalach|exang|oldpeak|slope| ca|thal|target|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+
|  0|  0|  0|       0|   0|  0|      0|      0|    0|      0|    0|  0|   0|     0|
+---+---+---+--------+----+---+-------+-------+-----+-------+-----+---+----+------+

Nema nedostajućih vrijednosti.
Provjera vrijednosti u stupcu 'age':
+---+
|age|
+---+
| 65|
| 53|
| 34|
| 76|
| 44|
| 47|
| 52|
| 40|
| 57|
| 54|
| 48|
| 64|
| 41|
| 43|
| 37|
| 61|
| 35|
| 55|
| 59|
| 39|
+---+
only showing top 20 rows

Nema nekonzistentnih vrijednosti u stupcu 'age'
Provjera vrijednosti u stupcu 'sex':
+---+
|sex|
+---+
|  1|
|  0|
+---+

Nema nekonzistentnih vrijednosti u stupcu 'sex'
Provjera vrijednosti u stupcu 'cp':
+---+
| cp|
+---+
|  1|
|  3|
|  2|
|  0|
+---+

Nema nekonzistentnih vrijednosti u stupcu 'cp'
P

In [4]:
from pyspark.sql import functions as F

# Funkcija za analizu distribucije bolesti srca po dobi i spolu
def analyze_heart_disease_distribution(df):
    try:
        # Grupiranje podataka po dobnoj skupini i spolu, te izračunavanje distribucije targeta
        result = df.groupBy("age_groups", "sex", "target") \
            .agg(F.count("*").alias("count")) \
            .orderBy("age_groups", "sex", "target")

        # Prikaz rezultata
        print("Distribucija bolesti srca po dobi i spolu:")
        result.show()

        # Spremanje rezultata u Parquet format
        result.write.parquet("heart_disease_distribution_by_age_and_sex.parquet")
        print("Distribucija bolesti srca uspješno spremljena u Parquet format.")
    
    except Exception as e:
        print(f"Došlo je do pogreške pri analizi distribucije bolesti srca: {e}")

# Poziv funkcije za analizu
analyze_heart_disease_distribution(df)


Distribucija bolesti srca po dobi i spolu:
+----------+---+------+-----+
|age_groups|sex|target|count|
+----------+---+------+-----+
|     30-40|  0|     1|   17|
|     30-40|  1|     0|   15|
|     30-40|  1|     1|   21|
|     40-50|  0|     0|    4|
|     40-50|  0|     1|   55|
|     40-50|  1|     0|   76|
|     40-50|  1|     1|  102|
|     50-60|  0|     0|   35|
|     50-60|  0|     1|   74|
|     50-60|  1|     0|  181|
|     50-60|  1|     1|  132|
|       60+|  0|     0|   47|
|       60+|  0|     1|   80|
|       60+|  1|     0|  141|
|       60+|  1|     1|   41|
|  Under 30|  1|     1|    4|
+----------+---+------+-----+

Došlo je do pogreške pri analizi distribucije bolesti srca: [PATH_ALREADY_EXISTS] Path file:/app/heart_disease_distribution_by_age_and_sex.parquet already exists. Set mode as "overwrite" to overwrite the existing path.


In [5]:
def calculate_all_correlations(df):
    try:
        # Dobivanje svih numeričkih kolona u DataFrame-u
        numerical_columns = [field.name for field in df.schema.fields if isinstance(field.dataType, (IntegerType, DoubleType))]

        # Kreiranje praznog rječnika za korelacije
        correlations = {}

        # Izračunavanje korelacije za svaku numeričku varijablu u odnosu na target
        for column in numerical_columns:
            correlation_value = df.stat.corr(column, "target")
            correlations[column] = abs(correlation_value)  # Apsolutna korelacija

        # Sortiranje korelacija od najveće prema najmanjoj
        sorted_correlations = sorted(correlations.items(), key=lambda item: item[1], reverse=True)

        # Ispis svih korelacija
        print("Korelacija svih numeričkih varijabli s bolesti srca (target):")
        for factor, correlation in sorted_correlations:
            print(f"{factor} - Korelacija: {correlation:.4f}")

        # Spremanje rezultata u Parquet format
        result_df = spark.createDataFrame(sorted_correlations, ["Factor", "Correlation"])
        result_df.write.parquet("all_correlations.parquet")
        print("Korelacije svih varijabli uspješno spremljene u Parquet format.")

    except Exception as e:
        print(f"Došlo je do pogreške pri analizi korelacija: {e}")

# Poziv funkcije za analizu svih korelacija
calculate_all_correlations(df)


Korelacija svih numeričkih varijabli s bolesti srca (target):
target - Korelacija: 1.0000
oldpeak - Korelacija: 0.4384
exang - Korelacija: 0.4380
cp - Korelacija: 0.4349
thalach - Korelacija: 0.4229
ca - Korelacija: 0.3821
slope - Korelacija: 0.3455
thal - Korelacija: 0.3378
sex - Korelacija: 0.2795
age - Korelacija: 0.2293
trestbps - Korelacija: 0.1388
restecg - Korelacija: 0.1345
chol - Korelacija: 0.1000
fbs - Korelacija: 0.0412


25/01/18 14:36:08 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

Korelacije svih varijabli uspješno spremljene u Parquet format.


In [11]:
def calculate_all_correlations(df):
    try:
        # Dobivanje svih numeričkih kolona u DataFrame-u
        numerical_columns = [field.name for field in df.schema.fields if isinstance(field.dataType, (IntegerType, DoubleType)) and field.name != 'target']

        # Kreiranje praznog rječnika za korelacije
        correlations = {}

        # Izračunavanje korelacije za svaku numeričku varijablu u odnosu na target
        for column in numerical_columns:
            correlation_value = df.stat.corr(column, "target")
            correlations[column] = abs(correlation_value)  # Apsolutna korelacija

        # Sortiranje korelacija od najveće prema najmanjoj
        sorted_correlations = sorted(correlations.items(), key=lambda item: item[1], reverse=True)

        # Ispis svih korelacija
        print("Korelacija svih numeričkih varijabli s bolesti srca (target):")
        for factor, correlation in sorted_correlations:
            print(f"{factor} - Korelacija: {correlation:.4f}")

        # Spremanje rezultata u Parquet format
        result_df = spark.createDataFrame(sorted_correlations, ["Factor", "Correlation"])
        print("Top 3 najvažnija faktora korelirana s bolesti srca:")
        for i in range(3):
            factor, correlation = sorted_correlations[i]
            print(f"{i+1}. {factor} - Korelacija: {correlation:.4f}")

        # Spremanje rezultata u Parquet format
        result_df = spark.createDataFrame(sorted_correlations[:3], ["Factor", "Correlation"])
        result_df.write.parquet("top_3_significant_factors.parquet")
        print("Top 3 najvažnija faktora uspješno spremljena u Parquet format.")

    except Exception as e:
        print(f"Došlo je do pogreške pri analizi korelacija: {e}")

# Poziv funkcije za analizu svih korelacija
calculate_all_correlations(df)


Korelacija svih numeričkih varijabli s bolesti srca (target):
oldpeak - Korelacija: 0.4384
exang - Korelacija: 0.4380
cp - Korelacija: 0.4349
thalach - Korelacija: 0.4229
ca - Korelacija: 0.3821
slope - Korelacija: 0.3455
thal - Korelacija: 0.3378
sex - Korelacija: 0.2795
age - Korelacija: 0.2293
trestbps - Korelacija: 0.1388
restecg - Korelacija: 0.1345
chol - Korelacija: 0.1000
fbs - Korelacija: 0.0412
Top 3 najvažnija faktora korelirana s bolesti srca:
1. oldpeak - Korelacija: 0.4384
2. exang - Korelacija: 0.4380
3. cp - Korelacija: 0.4349
Došlo je do pogreške pri analizi korelacija: [PATH_ALREADY_EXISTS] Path file:/app/top_3_significant_factors.parquet already exists. Set mode as "overwrite" to overwrite the existing path.
