In [1]:
import findspark
findspark.init('/home/ubuntu/spark-3.2.1-bin-hadoop2.7')
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('basics').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/23 05:27:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Read in dataset
data = spark.read.csv('Dataset/merged_data.csv', header=True, inferSchema=True)

##  Data reduction

In [3]:
from pyspark.sql.functions import when

# Encode 'Sex' column
data = data.withColumn("Sex", when(data["Sex"] == "Male", 0).when(data["Sex"] == "Female", 1))

In [4]:
from pyspark.sql.types import StringType

# Identify columns with string data type and store them in categorical_columns list
categorical_columns = [field.name for field in data.schema.fields if isinstance(field.dataType, StringType)]

# Show the list of categorical columns
print("Categorical columns:", categorical_columns)

Categorical columns: ['Diet', 'Country', 'Continent', 'Hemisphere', 'Age group']


In [5]:
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col


# Dictionary to store mappings
mappings = {}

# Apply StringIndexer to each categorical column and collect the mappings
for column in categorical_columns:
    indexer = StringIndexer(inputCol=column, outputCol=f"{column}_indexed")
    indexer_model = indexer.fit(data)
    data = indexer_model.transform(data)
    
    # Extract and print the mapping of original values to numerical values
    labels = indexer_model.labels
    mapping = {label: index for index, label in enumerate(labels)}
    mappings[column] = mapping
    print(f"Mapping for column '{column}': {mapping}")
    
    # Cast the indexed column to integer type
    data = data.withColumn(f"{column}_indexed", col(f"{column}_indexed").cast(IntegerType()))


                                                                                

Mapping for column 'Diet': {'Healthy': 0, 'Unhealthy': 1, 'Average': 2}
Mapping for column 'Country': {'Nigeria': 0, 'Germany': 1, 'United States': 2, 'Australia': 3, 'Italy': 4, 'United Kingdom': 5, 'South Africa': 6, 'Argentina': 7, 'Brazil': 8, 'France': 9, 'Colombia': 10, 'Spain': 11, 'Thailand': 12, 'Vietnam': 13, 'China': 14, 'India': 15, 'Japan': 16, 'Canada': 17, 'New Zealand': 18, 'South Korea': 19}
Mapping for column 'Continent': {'Asia': 0, 'Europe': 1, 'South America': 2, 'Africa': 3, 'North America': 4, 'Australia': 5}
Mapping for column 'Hemisphere': {'Northern Hemisphere': 0, 'Southern Hemisphere': 1}
Mapping for column 'Age group': {'Old': 0, 'Middle Age': 1, 'Young Adults': 2}


In [6]:
data.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Sex: integer (nullable = true)
 |-- Cholesterol: integer (nullable = true)
 |-- Systolic: integer (nullable = true)
 |-- Diastolic : integer (nullable = true)
 |-- Heart Rate: integer (nullable = true)
 |-- Diabetes: integer (nullable = true)
 |-- Family History (1: Yes): integer (nullable = true)
 |-- Smoking: integer (nullable = true)
 |-- Obesity: integer (nullable = true)
 |-- Alcohol Consumption: integer (nullable = true)
 |-- Exercise Hours Per Week: double (nullable = true)
 |-- Diet: string (nullable = true)
 |-- Previous Heart Problems (1 : Yes): integer (nullable = true)
 |-- Medication Use: integer (nullable = true)
 |-- Stress Level: integer (nullable = true)
 |-- Sedentary Hours Per Day: double (nullable = true)
 |-- Income: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- Triglycerides: integer (nullable = true)
 |-- Physical Activity Days Per Week: integer (nullable = true)
 |-- Sleep Hours Per Day: integer (

In [7]:
# Perform value counts for the variable "Heart Attack Risk (1: Yes)"
value_counts = data.groupBy("Age group_indexed").count()

# Show the value counts
value_counts.show()

+-----------------+-----+
|Age group_indexed|count|
+-----------------+-----+
|                1|  387|
|                2|  180|
|                0|  433|
+-----------------+-----+



In [8]:
# Drop the original categorical columns
data = data.drop(*categorical_columns)

In [9]:
data.printSchema()

root
 |-- Age: integer (nullable = true)
 |-- Sex: integer (nullable = true)
 |-- Cholesterol: integer (nullable = true)
 |-- Systolic: integer (nullable = true)
 |-- Diastolic : integer (nullable = true)
 |-- Heart Rate: integer (nullable = true)
 |-- Diabetes: integer (nullable = true)
 |-- Family History (1: Yes): integer (nullable = true)
 |-- Smoking: integer (nullable = true)
 |-- Obesity: integer (nullable = true)
 |-- Alcohol Consumption: integer (nullable = true)
 |-- Exercise Hours Per Week: double (nullable = true)
 |-- Previous Heart Problems (1 : Yes): integer (nullable = true)
 |-- Medication Use: integer (nullable = true)
 |-- Stress Level: integer (nullable = true)
 |-- Sedentary Hours Per Day: double (nullable = true)
 |-- Income: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- Triglycerides: integer (nullable = true)
 |-- Physical Activity Days Per Week: integer (nullable = true)
 |-- Sleep Hours Per Day: integer (nullable = true)
 |-- Heart Attack R

In [10]:
print(f"Number of columns: {len(data.columns)}")

Number of columns: 27


In [20]:
# Define the target column
target_column  = "Heart Attack Risk (1: Yes)"

# Define the feature columns (excluding the target column)
feature_columns  = [col for col in data.columns if col != target_column ]

# Split the DataFrame into X (features) and Y (target)
X = data.select(*feature_columns)
Y = data.select(target_column)

In [22]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.linalg import Vectors

# Combine feature columns into a single vector column
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data_assembled= assembler.transform(data)

In [32]:
# Train a RandomForest model
rf = RandomForestClassifier(labelCol=target_column, featuresCol="features", numTrees=10)
model = rf.fit(data_assembled)

# Extract feature importance
feature_importance = model.featureImportances

# Combine feature names and importance scores
feature_names = feature_columns
feature_importance_dict = dict(zip(feature_names, feature_importance))

# Sort features by importance
sorted_features = sorted(feature_importance_dict.items(), key=lambda x: x[1], reverse=True)

# Print selected feature names with importance scores
#top_n = 12  # Specify the number of top features you want to print
#for feature, importance in sorted_features[:top_n]:
#    print(f"Feature: {feature}, Importance: {importance}")

# Print selected feature names with importance scores
top_n = 12  # Specify the number of top features you want to print
top_features = sorted_features[:top_n]

# Print the table header
print("Feature                 | Importance")
print("-----------------------------------")

# Print selected feature names with importance scores
for feature, importance in top_features:
    print(f"{feature:<24} | {importance:.2f}")

Feature                 | Importance
-----------------------------------
Income                   | 0.10
Cholesterol              | 0.10
Country_indexed          | 0.09
Diastolic                | 0.09
Exercise Hours Per Week  | 0.08
Heart Rate               | 0.07
Triglycerides            | 0.07
BMI                      | 0.06
Sedentary Hours Per Day  | 0.06
Age                      | 0.05
Stress Level             | 0.05
Systolic                 | 0.03


In [45]:
# Specify the column names you want to keep
columns_to_keep = ["Heart Attack Risk (1: Yes)", "Income", "Cholesterol","Country_indexed", "Diastolic ",
                   "Exercise Hours Per Week", "Heart Rate", "Triglycerides", "BMI", "Sedentary Hours Per Day","Age",
                   "Stress Level"]

# Select only the columns to keep
new_data = data_assembled.select(*columns_to_keep)

In [46]:
new_data.printSchema()

root
 |-- Heart Attack Risk (1: Yes): integer (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Cholesterol: integer (nullable = true)
 |-- Country_indexed: integer (nullable = true)
 |-- Diastolic : integer (nullable = true)
 |-- Exercise Hours Per Week: double (nullable = true)
 |-- Heart Rate: integer (nullable = true)
 |-- Triglycerides: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- Sedentary Hours Per Day: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Stress Level: integer (nullable = true)



In [47]:
# Coalesce the DataFrame to a single partition
new_data_single_partition = new_data.coalesce(1)

# Save the DataFrame to a single CSV file
new_data_single_partition.write.csv("new_data.csv", header=True)

In [48]:
# Read in dataset
new_data = spark.read.csv('Dataset/new_data.csv', header=True, inferSchema=True)

In [49]:
new_data.printSchema()

root
 |-- Heart Attack Risk (1: Yes): integer (nullable = true)
 |-- Income: integer (nullable = true)
 |-- Cholesterol: integer (nullable = true)
 |-- Country_indexed: integer (nullable = true)
 |-- Diastolic: integer (nullable = true)
 |-- Exercise Hours Per Week: double (nullable = true)
 |-- Heart Rate: integer (nullable = true)
 |-- Triglycerides: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- Sedentary Hours Per Day: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Stress Level: integer (nullable = true)



In [50]:
print(f"Number of rows: {new_data.count()}")
print(f"Number of columns: {len(new_data.columns)}")

Number of rows: 1000
Number of columns: 12


In [52]:
# Perform value counts for the variable "Heart Attack Risk (1: Yes)"
value_counts = new_data.groupBy("Heart Attack Risk (1: Yes)").count()

# Show the value counts
value_counts.show()

+--------------------------+-----+
|Heart Attack Risk (1: Yes)|count|
+--------------------------+-----+
|                         1|  509|
|                         0|  491|
+--------------------------+-----+



## 4.2 Data Projection

In [58]:
from pyspark.sql.functions import stddev

# Print header
print("{:<30} {:<20}".format("Column", "Standard Deviation"))
print("-------------------------------------------------------")

# Calculate and print the standard deviation for each column
for column in new_data.columns:
    std_dev = new_data.agg(stddev(column)).collect()[0][0]
    print("{:<30} | {:<20}".format(column, std_dev))

Column                         Standard Deviation  
-------------------------------------------------------
Heart Attack Risk (1: Yes)     | 0.5001691405606395  
Income                         | 80973.01257467821   
Cholesterol                    | 79.82551652252607   
Country_indexed                | 5.749640803508454   
Diastolic                      | 14.649868588135842  
Exercise Hours Per Week        | 5.735343348205532   
Heart Rate                     | 20.165317156345836  
Triglycerides                  | 221.82688765586357  
BMI                            | 6.298476971781967   
Sedentary Hours Per Day        | 3.5290078215488867  
Age                            | 21.32262719344007   
Stress Level                   | 2.8425418280201344  


In [59]:
from pyspark.sql.functions import log

# Log transformation of 'income' variable
log_income_values = new_data.select(log(new_data["Income"] + 1).alias("log_income"))

# Calculate the standard deviation after log transformation
std_dev_log_income = log_income_values.agg(stddev("log_income")).collect()[0][0]

# Print the standard deviation after log transformation
print("Standard deviation of log transformed income:", std_dev_log_income)

Standard deviation of log transformed income: 0.6780097873473351
