# BIG DATA ANALYSIS - FINAL COURSEWORK

# 1. Project Proposal 

## 1.1 Introduction

This project analyzes a large dataset that records all childbirths throughout the United States for the year 2018. The 532.6 MB dataset captures a wide range of variables, including demographic factors, health indicators, and medical procedures related to childbirth. The goal is to predict newborns' birth weight, an essential marker of neonatal health with substantial implications for immediate medical interventions and long-term health outcomes.

Accurate prediction of birth weight is vital, as it is integral in anticipating complications such as shoulder dystocia and managing conditions like preterm labor or growth restrictions effectively. This analysis not only investigates how maternal behaviors and demographic characteristics impact birth weight but also strives to identify patterns that could lead to more personalized prenatal care and more strategic healthcare resource allocation.

Machine learning models are trained on millions of records to uncover patterns and identify the factors that best predict birth weight. These insights are expected to support incremental refinements to medical practice and more effective health planning.

This project aims to enhance health management strategies by providing data-driven insights. By leveraging big data and advanced analytics, the study seeks a nuanced understanding of the factors influencing birth weight, which can support targeted interventions to prevent potential health issues in newborns. The approach is methodical and data-centric, emphasizing precise data handling and sophisticated analytical techniques, and it aims to inform healthcare practice incrementally rather than overhaul major medical protocols or public health policies.

In addition to the challenges of managing and analyzing a large-scale dataset, using PySpark on a cluster posed significant operational challenges. Uploading the dataset to HDFS took about 20 minutes, and calculating correlation coefficients took approximately 80 minutes. Running machine learning models, especially tree-based algorithms, required up to 4-5 hours per model, with frequent system interruptions prolonging the process. To circumvent these issues, PySpark was installed on a local machine equipped with 16GB of RAM, configured to use 8GB for the Spark session. This adjustment made execution more efficient: the entire notebook now runs in about 4-5 hours.

Therefore, the project is submitted with all cells already run. This is particularly important because each run, whether calculating correlation coefficients or conducting hyperparameter tuning, yields slightly different but closely related results. Running the project locally not only streamlined the process but also provided a consistent environment in which repeated analyses and model training could be completed within a reasonable timeframe.

## 1.2 Dataset

Initially, I tried to use the PySpark installation on the DSM10 cluster and followed the steps below to upload my dataset to HDFS and run it in PySpark on the cluster. However, due to the problems mentioned above, I decided to continue the project on my local PySpark installation. I wrote and ran all of the project code in my local PySpark environment.

Source: https://www.kaggle.com/datasets/des137/us-births-2018/data

User Guide to the 2018 Fetal Death Public Use File: https://ftp.cdc.gov/pub/Health_Statistics/NCHS/Dataset_Documentation/DVS/fetaldeath/2018FetalUserGuide.pdf

### 1.2.1 Variables

### 1.2.2 Libraries

In [5]:
from pyspark.sql import SparkSession
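# Note: importing min and max from pyspark.sql.functions shadows Python's built-in min() and max() in this notebook
from pyspark.sql.functions import col, count, when, min, max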
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression, GeneralizedLinearRegression
from pyspark.ml.regression import DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

### 1.2.3 Importing the Dataset 

This cell initializes a Spark session with specific configurations, including a memory setting for executors and an increased limit for debug string fields. It then loads a CSV file into a DataFrame, treating the first row as the header and inferring the schema, and displays the first 20 rows of the DataFrame.

In [6]:
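# Starting Spark Session
# Note: with master "local[*]" the executors run inside the driver JVM, so spark.driver.memory is usually the setting that governs the available heap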
spark = SparkSession \
    .builder \
    .appName("My Spark Application") \
    .config("spark.executor.memory", "8g") \
    .config("spark.sql.debug.maxToStringFields", "100") \
    .master("local[*]") \
    .getOrCreate()


# Loading CSV file as DataFrame
birth_df = spark.read.format("csv") \
           .option("header", "true") \
           .option("inferSchema", "true") \
           .load("/Users/alperenunal/Desktop/Birth.csv")

# Showing DataFrame
birth_df.show()



+------+------+----+-----+----+-------+-------+----+------+------+------+------+------+--------+-----+------+-------+-------+------+------+------+-----+-------+------+-------+-----+-----------+-------+-----------+-----+------+-------+-------+-------+--------+------+-----+-------+--------+--------+--------+---+-------+-------+------+---------+---------+---------+------+----------+--------+--------+---------+---+------+
|ATTEND|BFACIL| BMI|CIG_0|DBWT|DLMP_MM|DLMP_YY|DMAR|DOB_MM|DOB_TT|DOB_WK|DOB_YY|DWgt_R|FAGECOMB|FEDUC|FHISPX|FRACE15|FRACE31|FRACE6|ILLB_R|ILOP_R|ILP_R|IMP_SEX|IP_GON|LD_INDL|MAGER|MAGE_IMPFLG|MAR_IMP|MBSTATE_REC|MEDUC|MHISPX|MM_AICU|MRACE15|MRACE31|MRACEIMP|MRAVE6|MTRAN|M_Ht_In|NO_INFEC|NO_MMORB|NO_RISKS|PAY|PAY_REC|PRECARE|PREVIS|PRIORDEAD|PRIORLIVE|PRIORTERM|PWgt_R|RDMETH_REC|RESTATUS|RF_CESAR|RF_CESARN|SEX|WTGAIN|
+------+------+----+-----+----+-------+-------+----+------+------+------+------+------+--------+-----+------+-------+-------+------+------+------+-----+----


In [7]:
spark.sparkContext.setLogLevel("ERROR")

The Spark log level is set to "ERROR" so that only error messages are shown and less severe log messages such as info and debug are suppressed. This keeps the focus on critical issues and declutters the output during runtime.

In [8]:
type(birth_df)

pyspark.sql.dataframe.DataFrame

In [9]:
birth_df.show(10, truncate=False)

+------+------+----+-----+----+-------+-------+----+------+------+------+------+------+--------+-----+------+-------+-------+------+------+------+-----+-------+------+-------+-----+-----------+-------+-----------+-----+------+-------+-------+-------+--------+------+-----+-------+--------+--------+--------+---+-------+-------+------+---------+---------+---------+------+----------+--------+--------+---------+---+------+
|ATTEND|BFACIL|BMI |CIG_0|DBWT|DLMP_MM|DLMP_YY|DMAR|DOB_MM|DOB_TT|DOB_WK|DOB_YY|DWgt_R|FAGECOMB|FEDUC|FHISPX|FRACE15|FRACE31|FRACE6|ILLB_R|ILOP_R|ILP_R|IMP_SEX|IP_GON|LD_INDL|MAGER|MAGE_IMPFLG|MAR_IMP|MBSTATE_REC|MEDUC|MHISPX|MM_AICU|MRACE15|MRACE31|MRACEIMP|MRAVE6|MTRAN|M_Ht_In|NO_INFEC|NO_MMORB|NO_RISKS|PAY|PAY_REC|PRECARE|PREVIS|PRIORDEAD|PRIORLIVE|PRIORTERM|PWgt_R|RDMETH_REC|RESTATUS|RF_CESAR|RF_CESARN|SEX|WTGAIN|
+------+------+----+-----+----+-------+-------+----+------+------+------+------+------+--------+-----+------+-------+-------+------+------+------+-----+----

In [10]:
birth_df.printSchema()

root
 |-- ATTEND: integer (nullable = true)
 |-- BFACIL: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- CIG_0: integer (nullable = true)
 |-- DBWT: integer (nullable = true)
 |-- DLMP_MM: integer (nullable = true)
 |-- DLMP_YY: integer (nullable = true)
 |-- DMAR: string (nullable = true)
 |-- DOB_MM: integer (nullable = true)
 |-- DOB_TT: integer (nullable = true)
 |-- DOB_WK: integer (nullable = true)
 |-- DOB_YY: integer (nullable = true)
 |-- DWgt_R: integer (nullable = true)
 |-- FAGECOMB: integer (nullable = true)
 |-- FEDUC: integer (nullable = true)
 |-- FHISPX: integer (nullable = true)
 |-- FRACE15: integer (nullable = true)
 |-- FRACE31: integer (nullable = true)
 |-- FRACE6: integer (nullable = true)
 |-- ILLB_R: integer (nullable = true)
 |-- ILOP_R: integer (nullable = true)
 |-- ILP_R: integer (nullable = true)
 |-- IMP_SEX: string (nullable = true)
 |-- IP_GON: string (nullable = true)
 |-- LD_INDL: string (nullable = true)
 |-- MAGER: integer (nullab

# 2. Implementation of the proposed project

#### Data Preparation and Initial Setup
* Data Upload to PySpark: I will start by loading the dataset into PySpark. This initial step is crucial for ensuring all subsequent data manipulations are grounded on a stable and accessible data foundation.
* Data Cleaning: After uploading, I'll conduct thorough checks for missing values and duplicate records. Although initial assessments show no missing values, the removal of duplicates will clean the dataset for more reliable analysis.
* Outlier Identification and Removal: I plan to use statistical methods to identify and remove outliers in the target variable 'DBWT'. This will help in reducing noise and improving the accuracy of the predictive models.

#### Feature Engineering and Optimization
* Categorical Feature Encoding: All categorical variables will be converted to numerical format using StringIndexer, ensuring they are appropriately formatted for machine learning models.
* Feature Integration: Post-encoding, all features will be merged into a comprehensive dataset ready for further analysis.
* Correlation Analysis and Feature Reduction: Correlation coefficients will be calculated to identify features with minimal impact on the target variable, which will then be removed to streamline the model.
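
The correlation-based feature reduction described above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the notebook's implementation: the helper names `pearson` and `weakly_correlated`, the toy values, and the threshold of 0.05 are all hypothetical. A constant column is reported as 0.0 here by choice, so that it is flagged as uninformative.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length numeric sequences."""
    n = len(xs)
    if n != len(ys) or n < 2:
        raise ValueError("need two sequences of equal length >= 2")
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        # A constant column carries no linear signal; this sketch reports 0.0.
        return 0.0
    return cov / math.sqrt(var_x * var_y)


def weakly_correlated(features, target, threshold):
    """Names of features whose absolute correlation with the target is below threshold."""
    return sorted(
        name
        for name, values in features.items()
        if abs(pearson(values, target)) < threshold
    )


# Hypothetical toy values, not taken from the dataset.
toy_target = [2500, 3000, 3500, 4000]
toy_features = {
    "feature_a": [30, 35, 40, 45],          # moves linearly with the target
    "feature_b": [2018, 2018, 2018, 2018],  # constant column
}

print(weakly_correlated(toy_features, toy_target, threshold=0.05))
```

Only the constant column is flagged in this toy case. The threshold is a judgment call, and a low linear correlation does not rule out a non-linear relationship that tree-based models could still exploit.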

#### Model Development and Refinement
* Subsampling for Efficiency: Given the computational limits of the local PySpark setup when handling large datasets, I'll reduce the dataset size to 750,000 records. This size strikes a balance between maintaining the predictive power of the model and managing computational resources effectively.
* Standardization of Features: I will standardize features using the StandardScaler. This normalization is essential for removing bias in model training due to scale differences among features.
* Advanced Model Training Techniques: Due to the time-intensive nature of operations in PySpark, I will forgo cross-validation and exhaustive hyperparameter tuning techniques like grid search and random search. Instead, simpler methods will be employed to find a suitable model configuration.
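
As a rough worked example of the subsampling step, the fraction needed to reach about 750,000 records can be computed from the row count. The sketch assumes the 3,798,539 rows that this notebook reports after removing unknown birth weights; the helper name `sample_fraction` is hypothetical.

```python
def sample_fraction(target_rows, total_rows):
    """Fraction of rows to keep so that roughly target_rows remain."""
    if total_rows <= 0:
        raise ValueError("total_rows must be positive")
    return min(1.0, target_rows / total_rows)


# Row count reported in this notebook after dropping DBWT == 9999.
fraction = sample_fraction(750_000, 3_798_539)
print(round(fraction, 4))

# In PySpark this fraction could be passed to DataFrame.sample, e.g.
#   birth_df.sample(fraction=fraction, seed=42)
# where the seed value is an arbitrary assumption chosen for repeatability.
```

`DataFrame.sample` draws rows probabilistically, so the resulting size is only approximately 750,000. Fixing the seed keeps the subsample the same across runs on the same data.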

#### Testing and Validation
* Splitting Data: The final dataset will be split into training, validation, and test sets. This will facilitate the unbiased evaluation of the models.
* Machine Learning Model Implementation: I will implement several machine learning models, beginning with simpler ones like linear regression and progressing to more complex models such as decision trees and random forests.
* Model Evaluation and Selection: Each model's performance will be evaluated based on Mean Absolute Error (MAE). The best-performing model under set computational constraints will be selected for final testing.
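
The evaluation criterion above can be made concrete with a small plain-Python sketch of Mean Absolute Error and model selection. The model names and all values are hypothetical. In PySpark the same metric is available through `RegressionEvaluator` with `metricName="mae"`.

```python
def mean_absolute_error(actual, predicted):
    """Average of the absolute differences between actual and predicted values."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("need two non-empty sequences of equal length")
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)


# Hypothetical birth weights in grams and predictions from two hypothetical models.
toy_actual = [3200, 2900, 3600, 4100]
toy_predictions = {
    "model_a": [3100, 3000, 3500, 4000],  # off by 100 g on every record
    "model_b": [3200, 2900, 3600, 3300],  # exact three times, off by 800 g once
}

scores = {
    name: mean_absolute_error(toy_actual, preds)
    for name, preds in toy_predictions.items()
}
best_model = min(scores, key=scores.get)

print(scores)
print(best_model)
```

Because MAE is expressed in the same unit as the target, a score can be read directly as an average error in grams.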

#### Final Adjustments and Documentation
* Final Model Testing: The selected model will be tested on the test dataset to assess its performance and the accuracy of its predictions against actual values.
* Comprehensive Documentation: All steps from data preparation to final model testing will be meticulously documented in a Jupyter Notebook. This will include markdown cells that detail every decision and its rationale, ensuring clarity and reproducibility for examiners.


## 2.1. Data Cleaning

### 2.1.1 Missing Value Detection

In [11]:
# Counting missing values
missing_values = birth_df.select([count(when(col(c).isNull(), c)).alias(c) for c in birth_df.columns])

# Displaying the counts of missing values
missing_values.show(vertical=True, truncate=False)



-RECORD 0----------
 ATTEND      | 0   
 BFACIL      | 0   
 BMI         | 0   
 CIG_0       | 0   
 DBWT        | 0   
 DLMP_MM     | 0   
 DLMP_YY     | 0   
 DMAR        | 0   
 DOB_MM      | 0   
 DOB_TT      | 0   
 DOB_WK      | 0   
 DOB_YY      | 0   
 DWgt_R      | 0   
 FAGECOMB    | 0   
 FEDUC       | 0   
 FHISPX      | 0   
 FRACE15     | 0   
 FRACE31     | 0   
 FRACE6      | 0   
 ILLB_R      | 0   
 ILOP_R      | 0   
 ILP_R       | 0   
 IMP_SEX     | 0   
 IP_GON      | 0   
 LD_INDL     | 0   
 MAGER       | 0   
 MAGE_IMPFLG | 0   
 MAR_IMP     | 0   
 MBSTATE_REC | 0   
 MEDUC       | 0   
 MHISPX      | 0   
 MM_AICU     | 0   
 MRACE15     | 0   
 MRACE31     | 0   
 MRACEIMP    | 0   
 MRAVE6      | 0   
 MTRAN       | 0   
 M_Ht_In     | 0   
 NO_INFEC    | 0   
 NO_MMORB    | 0   
 NO_RISKS    | 0   
 PAY         | 0   
 PAY_REC     | 0   
 PRECARE     | 0   
 PREVIS      | 0   
 PRIORDEAD   | 0   
 PRIORLIVE   | 0   
 PRIORTERM   | 0   
 PWgt_R      | 0   
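
A null check alone does not flag blank strings. The category counts later in this notebook show blank entries in several string columns (for example, 455,960 blank values in DMAR), so a zero null count does not necessarily mean that every field is populated. The sketch below illustrates the difference in plain Python; the helper name `count_missing` and the toy column are hypothetical.

```python
def count_missing(values, blank_is_missing=True):
    """Count None entries and, optionally, empty or whitespace-only strings."""
    missing = 0
    for value in values:
        if value is None:
            missing += 1
        elif blank_is_missing and isinstance(value, str) and value.strip() == "":
            missing += 1
    return missing


# Hypothetical toy column shaped like a coded string field.
toy_column = ["1", "2", " ", "1", "", None, "2"]

null_only = count_missing(toy_column, blank_is_missing=False)
nulls_and_blanks = count_missing(toy_column)

print(null_only)
print(nulls_and_blanks)
```

In PySpark, one way to apply the same idea would be to extend the condition to `col(c).isNull() | (trim(col(c)) == "")`, with `trim` imported from `pyspark.sql.functions`. Whether a blank means "missing" or "not applicable" depends on the field and should be checked against the user guide.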

### 2.1.2 Duplicated Value Detection

In [12]:
# Grouping duplicate records using all columns
duplicates = birth_df.groupBy(birth_df.columns)\
                     .count()\
                     .filter("count > 1")

# Displaying duplicate records
duplicates.show(truncate=False)



+------+------+----+-----+----+-------+-------+----+------+------+------+------+------+--------+-----+------+-------+-------+------+------+------+-----+-------+------+-------+-----+-----------+-------+-----------+-----+------+-------+-------+-------+--------+------+-----+-------+--------+--------+--------+---+-------+-------+------+---------+---------+---------+------+----------+--------+--------+---------+---+------+-----+
|ATTEND|BFACIL|BMI |CIG_0|DBWT|DLMP_MM|DLMP_YY|DMAR|DOB_MM|DOB_TT|DOB_WK|DOB_YY|DWgt_R|FAGECOMB|FEDUC|FHISPX|FRACE15|FRACE31|FRACE6|ILLB_R|ILOP_R|ILP_R|IMP_SEX|IP_GON|LD_INDL|MAGER|MAGE_IMPFLG|MAR_IMP|MBSTATE_REC|MEDUC|MHISPX|MM_AICU|MRACE15|MRACE31|MRACEIMP|MRAVE6|MTRAN|M_Ht_In|NO_INFEC|NO_MMORB|NO_RISKS|PAY|PAY_REC|PRECARE|PREVIS|PRIORDEAD|PRIORLIVE|PRIORTERM|PWgt_R|RDMETH_REC|RESTATUS|RF_CESAR|RF_CESARN|SEX|WTGAIN|count|
+------+------+----+-----+----+-------+-------+----+------+------+------+------+------+--------+-----+------+-------+-------+------+------+-----


In [13]:
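# Counting the distinct rows that appear more than once
# (each duplicated row is counted once, however many copies exist;
# groupBy over all columns already yields one row per distinct record)
duplicate_count = duplicates.count()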
print(f"Total number of duplicate records: {duplicate_count}")




Total number of duplicate records: 36
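
The count reported above is the number of distinct rows that occur more than once, which is not necessarily the number of rows that `dropDuplicates()` removes. The difference can be illustrated with a small plain-Python sketch; the toy rows are hypothetical.

```python
from collections import Counter

# Hypothetical toy records; each tuple stands for one full row.
toy_rows = [
    ("a", 1), ("a", 1), ("a", 1),  # one pattern, three copies
    ("b", 2), ("b", 2),            # one pattern, two copies
    ("c", 3),                      # unique row
]

copies = Counter(toy_rows)

# Distinct rows that appear more than once (what the grouped count reports).
duplicated_patterns = sum(1 for n in copies.values() if n > 1)

# Surplus copies that de-duplication would actually remove.
surplus_rows = sum(n - 1 for n in copies.values() if n > 1)

# Rows remaining after de-duplication.
rows_after_dedup = len(copies)

print(duplicated_patterns, surplus_rows, rows_after_dedup)
```

If each duplicated row in the dataset appears exactly twice, the two numbers coincide; comparing the row counts before and after `dropDuplicates()` would confirm this.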



In [14]:
# Removing duplicate records
birth_df = birth_df.dropDuplicates()

# Checking the size of the cleaned DataFrame
print("Number of records after removing duplicates:", birth_df.count())




Number of records after removing duplicates: 3801498



In [16]:
# Relocating the 'DBWT' column
# Obtaining all current column names
columns = birth_df.columns

# Finding and removing the 'DBWT' column from the list
columns.remove('DBWT')

# Appending the 'DBWT' column to the end of the list
columns.append('DBWT')

# Creating a new DataFrame by reordering the columns
birth_df = birth_df.select(columns)

# Printing the columns of the new DataFrame to check the new order
print("New column order in birth_df_reordered:")
print(birth_df.columns)


New column order in birth_df_reordered:
['ATTEND', 'BFACIL', 'BMI', 'CIG_0', 'DLMP_MM', 'DLMP_YY', 'DMAR', 'DOB_MM', 'DOB_TT', 'DOB_WK', 'DOB_YY', 'DWgt_R', 'FAGECOMB', 'FEDUC', 'FHISPX', 'FRACE15', 'FRACE31', 'FRACE6', 'ILLB_R', 'ILOP_R', 'ILP_R', 'IMP_SEX', 'IP_GON', 'LD_INDL', 'MAGER', 'MAGE_IMPFLG', 'MAR_IMP', 'MBSTATE_REC', 'MEDUC', 'MHISPX', 'MM_AICU', 'MRACE15', 'MRACE31', 'MRACEIMP', 'MRAVE6', 'MTRAN', 'M_Ht_In', 'NO_INFEC', 'NO_MMORB', 'NO_RISKS', 'PAY', 'PAY_REC', 'PRECARE', 'PREVIS', 'PRIORDEAD', 'PRIORLIVE', 'PRIORTERM', 'PWgt_R', 'RDMETH_REC', 'RESTATUS', 'RF_CESAR', 'RF_CESARN', 'SEX', 'WTGAIN', 'DBWT']


### 2.1.3 Dealing with the Outlier Values 

In [14]:
# Performing describe operation for the 'DBWT' column
describe_dbwt = birth_df.describe("DBWT")

# Approximate calculation of 1st Quartile, 2nd Quartile (Median), and 3rd Quartile for the 'DBWT' column
quartiles = birth_df.approxQuantile("DBWT", [0.25, 0.5, 0.75], 0.01)

# Displaying describe results and quartile values
print("Describe results for DBWT column:")
describe_dbwt.show()

# Printing quartile values
print("Quartile values for DBWT column:")
print(f"1st Quartile (25%): {quartiles[0]}")
print(f"Median (50%): {quartiles[1]}")
print(f"3rd Quartile (75%): {quartiles[2]}")



Describe results for DBWT column:




+-------+------------------+
|summary|              DBWT|
+-------+------------------+
|  count|           3801498|
|   mean|3266.9015419710863|
| stddev| 619.4212455351602|
|    min|               227|
|    max|              9999|
+-------+------------------+

Quartile values for DBWT column:
1st Quartile (25%): 2960.0
Median (50%): 3300.0
3rd Quartile (75%): 3629.0



The maximum value, 9999, is the code that the metadata describes as 'unknown', so it is a placeholder rather than a real measurement. The other features are also checked below to detect similar outlier values.
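
For comparison with the placeholder-based cleaning used in this notebook, a common statistical rule for outliers is Tukey's interquartile-range fences. The sketch below applies it to the approximate quartiles reported above. It is an illustration only and not a step the notebook performs; the helper name `tukey_fences` and the multiplier `k=1.5` are assumptions.

```python
def tukey_fences(q1, q3, k=1.5):
    """Lower and upper outlier fences based on the interquartile range."""
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr


# Approximate quartiles for DBWT as printed above (grams).
lower, upper = tukey_fences(2960.0, 3629.0)

print(lower, upper)  # 1956.5 4632.5
```

The code 9999 lies far above the upper fence, but so would some genuinely heavy newborns, and genuinely low birth weights would fall below the lower fence. Applying such fences would therefore remove real, clinically relevant cases, which is one reason to treat placeholder codes separately from statistical outliers.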


In [15]:
# Calculating and printing min and max values for all columns
for c in birth_df.columns:
    # Calculating min and max values for the column
    min_max = birth_df.agg(
        min(col(c)).alias("min_value"), 
        max(col(c)).alias("max_value")
    ).collect()[0]  # Collect the results
    
    # Printing the results
    print(f"Column: {c}")
    print(f"  Min Value: {min_max['min_value']}")
    print(f"  Max Value: {min_max['max_value']}")
    print("\n")  # Space between columns



Column: ATTEND
  Min Value: 1
  Max Value: 9

Column: BFACIL
  Min Value: 1
  Max Value: 9

Column: BMI
  Min Value: 13.0
  Max Value: 99.9

Column: CIG_0
  Min Value: 0
  Max Value: 99

Column: DLMP_MM
  Min Value: 1
  Max Value: 99

Column: DLMP_YY
  Min Value: 2016
  Max Value: 9999

Column: DMAR
  Min Value:  
  Max Value: 2

Column: DOB_MM
  Min Value: 1
  Max Value: 12

Column: DOB_TT
  Min Value: 0
  Max Value: 9999

Column: DOB_WK
  Min Value: 1
  Max Value: 7

Column: DOB_YY
  Min Value: 2018
  Max Value: 2018

Column: DWgt_R
  Min Value: 100
  Max Value: 999

Column: FAGECOMB
  Min Value: 11
  Max Value: 99

Column: FEDUC
  Min Value: 1
  Max Value: 9

Column: FHISPX
  Min Value: 0
  Max Value: 9

Column: FRACE15
  Min Value: 1
  Max Value: 99

Column: FRACE31
  Min Value: 1
  Max Value: 99

Column: FRACE6
  Min Value: 1
  Max Value: 9

Column: ILLB_R
  Min Value: 3
  Max Value: 999

Column: ILOP_R
  Min Value: 3
  Max Value: 999

Column: ILP_R
  Min Value: 3
  Max Value: 999

Column: IMP_SEX
  Min Value:  
  Max Value: 1

Column: IP_GON
  Min Value: N
  Max Value: Y

Column: LD_INDL
  Min Value: N
  Max Value: Y

Column: MAGER
  Min Value: 12
  Max Value: 50

Column: MAGE_IMPFLG
  Min Value:  
  Max Value: 1

Column: MAR_IMP
  Min Value:  
  Max Value: 1

Column: MBSTATE_REC
  Min Value: 1
  Max Value: 3

Column: MEDUC
  Min Value: 1
  Max Value: 9

Column: MHISPX
  Min Value: 0
  Max Value: 9

Column: MM_AICU
  Min Value: N
  Max Value: Y

Column: MRACE15
  Min Value: 1
  Max Value: 15

Column: MRACE31
  Min Value: 1
  Max Value: 31

Column: MRACEIMP
  Min Value:  
  Max Value: 1

Column: MRAVE6
  Min Value: 1
  Max Value: 6

Column: MTRAN
  Min Value: N
  Max Value: Y

Column: M_Ht_In
  Min Value: 30
  Max Value: 99

Column: NO_INFEC
  Min Value: 0
  Max Value: 9

Column: NO_MMORB
  Min Value: 0
  Max Value: 9

Column: NO_RISKS
  Min Value: 0
  Max Value: 9

Column: PAY
  Min Value: 1
  Max Value: 9

Column: PAY_REC
  Min Value: 1
  Max Value: 9

Column: PRECARE
  Min Value: 0
  Max Value: 99

Column: PREVIS
  Min Value: 0
  Max Value: 99

Column: PRIORDEAD
  Min Value: 0
  Max Value: 99

Column: PRIORLIVE
  Min Value: 0
  Max Value: 99

Column: PRIORTERM
  Min Value: 0
  Max Value: 99

Column: PWgt_R
  Min Value: 75
  Max Value: 999

Column: RDMETH_REC
  Min Value: 1
  Max Value: 9

Column: RESTATUS
  Min Value: 1
  Max Value: 4

Column: RF_CESAR
  Min Value: N
  Max Value: Y

Column: RF_CESARN
  Min Value: 0
  Max Value: 99

Column: SEX
  Min Value: F
  Max Value: M

Column: WTGAIN
  Min Value: 0
  Max Value: 99

Column: DBWT
  Min Value: 227
  Max Value: 9999

Many of the features contain all-nines values ('9', '99', '999', and '9999') that appear to act as placeholder codes rather than real measurements, as the maximum values above show. These values will be removed to allow a cleaner analysis, starting with the target variable 'DBWT'.
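
The all-nines pattern noted above can be flagged programmatically. The sketch below is a heuristic in plain Python, applied to a few of the maximum values printed earlier; the helper name `is_all_nines` is hypothetical. A flagged column is only a candidate: whether 9 or 99 is a placeholder or a legitimate code has to be confirmed against the user guide for each field.

```python
def is_all_nines(value):
    """True if the value consists only of the digit 9 (ignoring a decimal point)."""
    digits = str(value).replace(".", "")
    return digits != "" and set(digits) == {"9"}


# A subset of the maximum values reported in the output above.
observed_max = {
    "DBWT": 9999,
    "BMI": 99.9,
    "PWgt_R": 999,
    "MAGER": 50,
    "MRACE15": 15,
    "DOB_MM": 12,
}

suspect_columns = [name for name, top in observed_max.items() if is_all_nines(top)]

print(suspect_columns)
```

Columns such as MAGER and DOB_MM have plausible maxima and are not flagged, while DBWT, BMI, and PWgt_R are marked for review.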

In [17]:
# Dropping rows where 'DBWT' equals 9999 (the code for an unknown birth weight)
birth_df = birth_df.filter(col('DBWT') != 9999)

# Verifying that no 9999 values remain in the 'DBWT' column
count_9999 = birth_df.filter(col('DBWT') == 9999).count()

# Printing the count of 9999 in the 'DBWT' column
print("Number of occurrences of 9999 in 'DBWT' column:", count_9999)

Number of occurrences of 9999 in 'DBWT' column: 0


In [18]:
# Checking count after removing outlier from the target feature
birth_df.count()


3798539

## 2.2 Feature Engineering 

### 2.2.1 Detecting Categorical Variables

In [18]:
# DataFrame columns and their data types
column_types = birth_df.dtypes

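# Find non-numeric columns (prefix match, so that types such as 'bigint' or 'decimal(10,2)' also count as numeric)
numeric_prefixes = ('tinyint', 'smallint', 'int', 'bigint', 'float', 'double', 'decimal')
non_numeric_columns = [column_name for column_name, dtype in column_types if not dtype.startswith(numeric_prefixes)]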

# Print non-numeric columns
print("Non-numeric columns:")
for column in non_numeric_columns:
    print(column)


Non-numeric columns:
DMAR
IMP_SEX
IP_GON
LD_INDL
MAGE_IMPFLG
MAR_IMP
MM_AICU
MRACEIMP
MTRAN
RF_CESAR
SEX


In [19]:
# Non-numeric features
non_numeric_features = [
    'DMAR', 'IMP_SEX', 'IP_GON', 'LD_INDL', 'MAGE_IMPFLG',
    'MAR_IMP', 'MM_AICU', 'MRACEIMP', 'MTRAN', 'RF_CESAR', 'SEX'
]

# Count the non-null values of each non-numeric feature
feature_counts = {}
for feature in non_numeric_features:
    count_value = birth_df.filter(col(feature).isNotNull()).count()
    feature_counts[feature] = count_value

# Print the counts of non-numeric features
print("Counts of non-numeric features:")
for feature, count_value in feature_counts.items():
    print(f"{feature}: {count_value}")



Counts of non-numeric features:
DMAR: 3798539
IMP_SEX: 3798539
IP_GON: 3798539
LD_INDL: 3798539
MAGE_IMPFLG: 3798539
MAR_IMP: 3798539
MM_AICU: 3798539
MRACEIMP: 3798539
MTRAN: 3798539
RF_CESAR: 3798539
SEX: 3798539



### 2.2.2 Checking Category Counts within Categorical Features

In [20]:
# Non-numeric features
non_numeric_features = [
    'DMAR', 'IMP_SEX', 'IP_GON', 'LD_INDL', 'MAGE_IMPFLG',
    'MAR_IMP', 'MM_AICU', 'MRACEIMP', 'MTRAN', 'RF_CESAR', 'SEX'
]

# Calculating and printing the counts of categories within each non-numeric feature
print("Counts of categories within non-numeric features:")
for feature in non_numeric_features:
    # Grouping by each category and count the occurrences
    category_counts = birth_df.groupBy(feature).count().orderBy("count", ascending=False)
    
    # Printing the results
    print(f"\nCounts for feature '{feature}':")
    category_counts.show()


Counts of categories within non-numeric features:

Counts for feature 'DMAR':

+----+-------+
|DMAR|  count|
+----+-------+
|   1|2006308|
|   2|1336271|
|    | 455960|
+----+-------+

Counts for feature 'IMP_SEX':

+-------+-------+
|IMP_SEX|  count|
+-------+-------+
|       |3798473|
|      1|     66|
+-------+-------+

Counts for feature 'IP_GON':

+------+-------+
|IP_GON|  count|
+------+-------+
|     N|3778726|
|     Y|  11733|
|     U|   8080|
+------+-------+

Counts for feature 'LD_INDL':

+-------+-------+
|LD_INDL|  count|
+-------+-------+
|      N|2767455|
|      Y|1027082|
|      U|   4002|
+-------+-------+

Counts for feature 'MAGE_IMPFLG':

+-----------+-------+
|MAGE_IMPFLG|  count|
+-----------+-------+
|           |3798157|
|          1|    382|
+-----------+-------+

Counts for feature 'MAR_IMP':

+-------+-------+
|MAR_IMP|  count|
+-------+-------+
|       |3796670|
|      1|   1869|
+-------+-------+

Counts for feature 'MM_AICU':

+-------+-------+
|MM_AICU|  count|
+-------+-------+
|      N|3789205|
|      Y|   6287|
|      U|   3047|
+-------+-------+

Counts for feature 'MRACEIMP':

+--------+-------+
|MRACEIMP|  count|
+--------+-------+
|        |3558995|
|       1| 239544|
+--------+-------+

Counts for feature 'MTRAN':

+-----+-------+
|MTRAN|  count|
+-----+-------+
|    N|3777712|
|    Y|  19251|
|    U|   1576|
+-----+-------+

Counts for feature 'RF_CESAR':

+--------+-------+
|RF_CESAR|  count|
+--------+-------+
|       N|3201377|
|       Y| 594654|
|       U|   2508|
+--------+-------+

Counts for feature 'SEX':

+---+-------+
|SEX|  count|
+---+-------+
|  M|1941660|
|  F|1856879|
+---+-------+

### 2.2.3 Converting Categorical Features to Numeric Indices

In [19]:
# Converting the 'DMAR' column to numerical indices using StringIndexer
string_indexer = StringIndexer(inputCol='DMAR', outputCol='DMAR_Indexed', handleInvalid='skip')

# Fitting and transforming the model on the DataFrame
birth_df_indexed = string_indexer.fit(birth_df).transform(birth_df)

# Showing the indexed column
birth_df_indexed.select('DMAR', 'DMAR_Indexed').show()



+----+------------+
|DMAR|DMAR_Indexed|
+----+------------+
|   1|         0.0|
|   2|         1.0|
|   1|         0.0|
|   1|         0.0|
|   1|         0.0|
|   1|         0.0|
|   1|         0.0|
|   2|         1.0|
|   1|         0.0|
|   1|         0.0|
|   1|         0.0|
|   2|         1.0|
|   1|         0.0|
|   1|         0.0|
|   1|         0.0|
|   1|         0.0|
|   1|         0.0|
|    |         2.0|
|    |         2.0|
|   2|         1.0|
+----+------------+
only showing top 20 rows



                                                                                

In [22]:
# Grouping the DataFrame by 'DMAR_Indexed' and counting the occurrences of each index
indexed_counts = birth_df_indexed.groupBy('DMAR_Indexed').count().orderBy('DMAR_Indexed')

# Showing the counts of each index (for checking)
indexed_counts.show()




+------------+-------+
|DMAR_Indexed|  count|
+------------+-------+
|         0.0|2006308|
|         1.0|1336271|
|         2.0| 455960|
+------------+-------+
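
The counts above line up with how `StringIndexer` assigns indices by default: labels are ordered by descending frequency, so the most common value receives 0.0. The blank `DMAR` value is itself a label (index 2.0), and the three counts add up to the full 3,798,539 rows, so `handleInvalid='skip'` has not dropped it. The following is a minimal pure-Python sketch of that ordering, not Spark's implementation. The function name `frequency_desc_index` is hypothetical, the per-label counts are read off the outputs above, and the alphabetical tie-break is an assumption based on Spark 3.x behaviour.

```python
def frequency_desc_index(label_counts):
    """Map each label to an index, most frequent label first."""
    # Sort by descending count; break ties alphabetically (assumed tie-break).
    ordered = sorted(label_counts.items(), key=lambda item: (-item[1], item[0]))
    return {label: float(index) for index, (label, _) in enumerate(ordered)}


# Counts taken from the DMAR output above (" " stands for the blank category).
dmar_counts = {"2": 1336271, " ": 455960, "1": 2006308}
dmar_index = frequency_desc_index(dmar_counts)
print(dmar_index)
```

Because the indices reflect frequency rather than any natural order, a correlation computed on an indexed column with more than two categories should be read with some caution.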



                                                                                

In [20]:
string_indexer_imp_sex = StringIndexer(inputCol='IMP_SEX', outputCol='IMP_SEX_indexed', handleInvalid='skip')
birth_df_indexed_imp_sex = string_indexer_imp_sex.fit(birth_df).transform(birth_df)

                                                                                

In [24]:
index_counts = birth_df_indexed_imp_sex.groupBy('IMP_SEX_indexed').count()
index_counts.show() # For checking



+---------------+-------+
|IMP_SEX_indexed|  count|
+---------------+-------+
|            0.0|3798473|
|            1.0|     66|
+---------------+-------+



                                                                                

In [21]:
string_indexer_IP_GON = StringIndexer(inputCol='IP_GON', outputCol='IP_GON_indexed', handleInvalid='skip')
birth_df_indexed_IP_GON = string_indexer_IP_GON.fit(birth_df).transform(birth_df)

                                                                                

In [22]:
string_indexer_LD_INDL = StringIndexer(inputCol='LD_INDL', outputCol='LD_INDL_indexed', handleInvalid='skip')
birth_df_indexed_LD_INDL = string_indexer_LD_INDL.fit(birth_df).transform(birth_df)

                                                                                

In [23]:
string_indexer_MAGE_IMPFLG = StringIndexer(inputCol='MAGE_IMPFLG', outputCol='MAGE_IMPFLG_indexed', handleInvalid='skip')
birth_df_indexed_MAGE_IMPFLG = string_indexer_MAGE_IMPFLG.fit(birth_df).transform(birth_df)

                                                                                

In [24]:
string_indexer_MAR_IMP = StringIndexer(inputCol='MAR_IMP', outputCol='MAR_IMP_indexed', handleInvalid='skip')
birth_df_indexed_MAR_IMP = string_indexer_MAR_IMP.fit(birth_df).transform(birth_df)


                                                                                

In [25]:
string_indexer_MM_AICU = StringIndexer(inputCol='MM_AICU', outputCol='MM_AICU_indexed', handleInvalid='skip')
birth_df_indexed_MM_AICU = string_indexer_MM_AICU.fit(birth_df).transform(birth_df)


                                                                                

In [26]:
string_indexer_MRACEIMP = StringIndexer(inputCol='MRACEIMP', outputCol='MRACEIMP_indexed', handleInvalid='skip')
birth_df_indexed_MRACEIMP = string_indexer_MRACEIMP.fit(birth_df).transform(birth_df)

                                                                                

In [27]:
string_indexer_MTRAN = StringIndexer(inputCol='MTRAN', outputCol='MTRAN_indexed', handleInvalid='skip')
birth_df_indexed_MTRAN = string_indexer_MTRAN.fit(birth_df).transform(birth_df)

                                                                                

In [28]:
string_indexer_RF_CESAR = StringIndexer(inputCol='RF_CESAR', outputCol='RF_CESAR_indexed', handleInvalid='skip')
birth_df_indexed_RF_CESAR = string_indexer_RF_CESAR.fit(birth_df).transform(birth_df)

                                                                                

In [29]:
string_indexer_SEX = StringIndexer(inputCol='SEX', outputCol='SEX_indexed', handleInvalid='skip')
birth_df_indexed_SEX = string_indexer_SEX.fit(birth_df).transform(birth_df)

                                                                                

### 2.2.4 Combining all the indexed features in the same dataframe

In [30]:
# Creating a new DataFrame to prevent any confusion with the earlier per-column results
# List of (input column, output column) pairs
columns_to_index = [
    ('DMAR', 'DMAR_Indexed'),
    ('IMP_SEX', 'IMP_SEX_indexed'),
    ('IP_GON', 'IP_GON_indexed'),
    ('LD_INDL', 'LD_INDL_indexed'),
    ('MAGE_IMPFLG', 'MAGE_IMPFLG_indexed'),
    ('MAR_IMP', 'MAR_IMP_indexed'),
    ('MM_AICU', 'MM_AICU_indexed'),
    ('MRACEIMP', 'MRACEIMP_indexed'),
    ('MTRAN', 'MTRAN_indexed'),
    ('RF_CESAR', 'RF_CESAR_indexed'),
    ('SEX', 'SEX_indexed')
]

# Creating StringIndexer objects
indexers = [StringIndexer(inputCol=column, outputCol=indexed_name, handleInvalid='skip') for column, indexed_name in columns_to_index]

# Creating a Pipeline and adding all indexers
pipeline = Pipeline(stages=indexers)

# Fitting the Pipeline and performing the transformation
model = pipeline.fit(birth_df)
birth_df_indexed2 = model.transform(birth_df)

# Showing the results
birth_df_indexed2.show()




+------+------+----+-----+-------+-------+----+------+------+------+------+------+--------+-----+------+-------+-------+------+------+------+-----+-------+------+-------+-----+-----------+-------+-----------+-----+------+-------+-------+-------+--------+------+-----+-------+--------+--------+--------+---+-------+-------+------+---------+---------+---------+------+----------+--------+--------+---------+---+------+----+------------+---------------+--------------+---------------+-------------------+---------------+---------------+----------------+-------------+----------------+-----------+
|ATTEND|BFACIL| BMI|CIG_0|DLMP_MM|DLMP_YY|DMAR|DOB_MM|DOB_TT|DOB_WK|DOB_YY|DWgt_R|FAGECOMB|FEDUC|FHISPX|FRACE15|FRACE31|FRACE6|ILLB_R|ILOP_R|ILP_R|IMP_SEX|IP_GON|LD_INDL|MAGER|MAGE_IMPFLG|MAR_IMP|MBSTATE_REC|MEDUC|MHISPX|MM_AICU|MRACE15|MRACE31|MRACEIMP|MRAVE6|MTRAN|M_Ht_In|NO_INFEC|NO_MMORB|NO_RISKS|PAY|PAY_REC|PRECARE|PREVIS|PRIORDEAD|PRIORLIVE|PRIORTERM|PWgt_R|RDMETH_REC|RESTATUS|RF_CESAR|RF_CESARN|S

                                                                                

In [31]:
birth_df_indexed2.printSchema()

root
 |-- ATTEND: integer (nullable = true)
 |-- BFACIL: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- CIG_0: integer (nullable = true)
 |-- DLMP_MM: integer (nullable = true)
 |-- DLMP_YY: integer (nullable = true)
 |-- DMAR: string (nullable = true)
 |-- DOB_MM: integer (nullable = true)
 |-- DOB_TT: integer (nullable = true)
 |-- DOB_WK: integer (nullable = true)
 |-- DOB_YY: integer (nullable = true)
 |-- DWgt_R: integer (nullable = true)
 |-- FAGECOMB: integer (nullable = true)
 |-- FEDUC: integer (nullable = true)
 |-- FHISPX: integer (nullable = true)
 |-- FRACE15: integer (nullable = true)
 |-- FRACE31: integer (nullable = true)
 |-- FRACE6: integer (nullable = true)
 |-- ILLB_R: integer (nullable = true)
 |-- ILOP_R: integer (nullable = true)
 |-- ILP_R: integer (nullable = true)
 |-- IMP_SEX: string (nullable = true)
 |-- IP_GON: string (nullable = true)
 |-- LD_INDL: string (nullable = true)
 |-- MAGER: integer (nullable = true)
 |-- MAGE_IMPFLG: string (

After combining the indexed features into one DataFrame, both the original string columns and their indexed counterparts are present.
Let's remove the original string columns so that only the numeric versions remain.

In [32]:
columns_to_remove = [
    'DMAR', 'IMP_SEX', 'IP_GON', 'LD_INDL', 'MAGE_IMPFLG',
    'MAR_IMP', 'MM_AICU', 'MRACEIMP', 'MTRAN', 'RF_CESAR', 'SEX'
]

# Dropping the original string columns
birth_df_indexed2 = birth_df_indexed2.drop(*columns_to_remove)

# Checking updated dataframe
birth_df_indexed2.printSchema()

root
 |-- ATTEND: integer (nullable = true)
 |-- BFACIL: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- CIG_0: integer (nullable = true)
 |-- DLMP_MM: integer (nullable = true)
 |-- DLMP_YY: integer (nullable = true)
 |-- DOB_MM: integer (nullable = true)
 |-- DOB_TT: integer (nullable = true)
 |-- DOB_WK: integer (nullable = true)
 |-- DOB_YY: integer (nullable = true)
 |-- DWgt_R: integer (nullable = true)
 |-- FAGECOMB: integer (nullable = true)
 |-- FEDUC: integer (nullable = true)
 |-- FHISPX: integer (nullable = true)
 |-- FRACE15: integer (nullable = true)
 |-- FRACE31: integer (nullable = true)
 |-- FRACE6: integer (nullable = true)
 |-- ILLB_R: integer (nullable = true)
 |-- ILOP_R: integer (nullable = true)
 |-- ILP_R: integer (nullable = true)
 |-- MAGER: integer (nullable = true)
 |-- MBSTATE_REC: integer (nullable = true)
 |-- MEDUC: integer (nullable = true)
 |-- MHISPX: integer (nullable = true)
 |-- MRACE15: integer (nullable = true)
 |-- MRACE31: int

In [33]:
# From here on, the indexed DataFrame is used as the working DataFrame.
birth_df = birth_df_indexed2

# Checking birth_df
birth_df.show()



+------+------+----+-----+-------+-------+------+------+------+------+------+--------+-----+------+-------+-------+------+------+------+-----+-----+-----------+-----+------+-------+-------+------+-------+--------+--------+--------+---+-------+-------+------+---------+---------+---------+------+----------+--------+---------+------+----+------------+---------------+--------------+---------------+-------------------+---------------+---------------+----------------+-------------+----------------+-----------+
|ATTEND|BFACIL| BMI|CIG_0|DLMP_MM|DLMP_YY|DOB_MM|DOB_TT|DOB_WK|DOB_YY|DWgt_R|FAGECOMB|FEDUC|FHISPX|FRACE15|FRACE31|FRACE6|ILLB_R|ILOP_R|ILP_R|MAGER|MBSTATE_REC|MEDUC|MHISPX|MRACE15|MRACE31|MRAVE6|M_Ht_In|NO_INFEC|NO_MMORB|NO_RISKS|PAY|PAY_REC|PRECARE|PREVIS|PRIORDEAD|PRIORLIVE|PRIORTERM|PWgt_R|RDMETH_REC|RESTATUS|RF_CESARN|WTGAIN|DBWT|DMAR_Indexed|IMP_SEX_indexed|IP_GON_indexed|LD_INDL_indexed|MAGE_IMPFLG_indexed|MAR_IMP_indexed|MM_AICU_indexed|MRACEIMP_indexed|MTRAN_indexed|RF_CESAR_i

                                                                                

## 2.3 Feature Removal

### 2.3.1 Using Correlation Coefficients

In [38]:
numeric_columns = [col_name for col_name in birth_df.columns if col_name != 'DBWT']

# Calculating correlations between 'DBWT' and other numeric columns
for column in numeric_columns:
    try:
        # Correlation calculation, we use try-except in case some columns are non-numeric
        correlation = birth_df.stat.corr('DBWT', column)
        print(f"Correlation (DBWT and {column}): {correlation}")
    except Exception as e:
        print(f"Correlation cannot be calculated (DBWT and {column}): {str(e)}")


                                                                                

Correlation (DBWT and ATTEND): 0.0660329770633437
Correlation (DBWT and BFACIL): 0.03120878300362332
Correlation (DBWT and BMI): 0.0071328572690093454
Correlation (DBWT and CIG_0): -0.048750976492003065
Correlation (DBWT and DLMP_MM): -0.0323062242770345
Correlation (DBWT and DLMP_YY): -0.033061624253114326
Correlation (DBWT and DOB_MM): -0.0042733679044357105
Correlation (DBWT and DOB_TT): -0.0018932687452942242
Correlation (DBWT and DOB_WK): -0.006829815057485014
Correlation (DBWT and DOB_YY): nan
Correlation (DBWT and DWgt_R): 0.0460267044238054
Correlation (DBWT and FAGECOMB): -0.08568928670926604
Correlation (DBWT and FEDUC): -0.029638674588220133
Correlation (DBWT and FHISPX): -0.09072464099949766
Correlation (DBWT and FRACE15): -0.08587819216034948
Correlation (DBWT and FRACE31): -0.08522003841307921
Correlation (DBWT and FRACE6): -0.10241051603247536
Correlation (DBWT and ILLB_R): -0.059775552103827526
Correlation (DBWT and ILOP_R): -0.0036739743943222858
Correlation (DBWT and ILP_R): -0.05413116901850391
Correlation (DBWT and MAGER): 0.04358959198978101
Correlation (DBWT and MBSTATE_REC): -0.010984354417201605
Correlation (DBWT and MEDUC): 0.0716610191588086
Correlation (DBWT and MHISPX): -0.01101865583384784
Correlation (DBWT and MRACE15): -0.043436842553733465
Correlation (DBWT and MRACE31): -0.046920797710985666
Correlation (DBWT and MRAVE6): -0.07706181143924574
Correlation (DBWT and M_Ht_In): 0.09980814487475127
Correlation (DBWT and NO_INFEC): -0.0034626024776994247
Correlation (DBWT and NO_MMORB): -0.003659172086732437
Correlation (DBWT and NO_RISKS): 0.06550881590644084
Correlation (DBWT and PAY): 0.036609483590699994
Correlation (DBWT and PAY_REC): 0.052176204232438565
Correlation (DBWT and PRECARE): -0.04131400552202143
Correlation (DBWT and PREVIS): 0.005310742100463932
Correlation (DBWT and PRIORDEAD): -0.005891309250678859
Correlation (DBWT and PRIORLIVE): -3.662841113028343e-05
Correlation (DBWT and PRIORTERM): -0.009220157645208392
Correlation (DBWT and PWgt_R): 0.006195557466352074
Correlation (DBWT and RDMETH_REC): -0.05105177913348051
Correlation (DBWT and RESTATUS): -0.014704503402084006
Correlation (DBWT and RF_CESARN): -0.0061281239357344405
Correlation (DBWT and WTGAIN): 0.10533617104254864
Correlation (DBWT and DMAR_Indexed): -0.06100427750777923
Correlation (DBWT and IMP_SEX_indexed): -0.00664996737774643
Correlation (DBWT and IP_GON_indexed): -0.028030904985367602
Correlation (DBWT and LD_INDL_indexed): 0.06050996951441731
Correlation (DBWT and MAGE_IMPFLG_indexed): -0.0035099104496965176
Correlation (DBWT and MAR_IMP_indexed): -0.011799609272491423
Correlation (DBWT and MM_AICU_indexed): -0.023301753292826924
Correlation (DBWT and MRACEIMP_indexed): -0.007238610970448987
Correlation (DBWT and MTRAN_indexed): -0.10036424106809785
Correlation (DBWT and RF_CESAR_indexed): 0.015320283669879706
Correlation (DBWT and SEX_indexed): -0.09645641914661501
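
Each `birth_df.stat.corr` call in the loop above is a separate Spark action, so the data is scanned once per column. A possible alternative, sketched here under the assumption that every column is numeric, is to build one `corr(...)` SQL expression per column and evaluate them all in a single `selectExpr`. The helper name `corr_expressions` is hypothetical, and the column list is only a small sample of the schema.

```python
def corr_expressions(target, columns):
    """Build one Spark SQL corr() expression per feature column."""
    return [
        f"corr(`{target}`, `{name}`) AS `{name}`"
        for name in columns
        if name != target  # skip the target itself
    ]


# Column names are a small sample from the schema above.
exprs = corr_expressions("DBWT", ["ATTEND", "BMI", "DBWT", "WTGAIN"])
for expr in exprs:
    print(expr)

# With a live DataFrame this could be evaluated in one job, for example:
#   correlations = birth_df.selectExpr(*exprs).first().asDict()
```

The trade-off is that a single non-numeric column would make the whole query fail, whereas the loop with `try`/`except` reports the problem per column.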


                                                                                

Let's count the distinct values of 'DOB_YY' to see why its correlation coefficient came out as NaN.

In [34]:
index_counts = birth_df.groupBy('DOB_YY').count()
index_counts.show()



+------+-------+
|DOB_YY|  count|
+------+-------+
|  2018|3798539|
+------+-------+



                                                                                

As can be seen, the 'DOB_YY' column contains only the value 2018. A constant column has zero variance, so its Pearson correlation with 'DBWT' is undefined, which is why the result is NaN. We can therefore drop 'DOB_YY'. We will also drop the features with the weakest linear relationship to the target: in the output above, every feature in the drop list below has an absolute correlation coefficient below roughly 0.03.
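
The NaN for `DOB_YY` follows from the Pearson formula, which divides the covariance by the product of the two standard deviations; a constant column makes that denominator zero. Below is a small pure-Python sketch of this, where the function `pearson` and the toy numbers are illustrative and not taken from the dataset.

```python
import math


def pearson(xs, ys):
    """Pearson correlation; returns NaN when either input has zero variance."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    denominator = math.sqrt(var_x * var_y)
    if denominator == 0:
        return math.nan  # 0 / 0: the correlation is undefined
    return cov / denominator


weights = [3100, 3400, 2900, 3600]     # toy birth weights in grams
birth_year = [2018, 2018, 2018, 2018]  # constant, like DOB_YY
weight_gain = [25, 32, 20, 38]         # toy values that vary

print(pearson(weights, birth_year))   # nan
print(pearson(weights, weight_gain))
```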

In [35]:
columns_to_drop = [
    "PRIORLIVE", "DOB_TT", "NO_INFEC", "MAGE_IMPFLG_indexed",
    "NO_MMORB", "ILOP_R", "DOB_MM", "DOB_YY", "PREVIS", "PRIORDEAD",
    "RF_CESARN", "PWgt_R", "IMP_SEX_indexed", "DOB_WK",
    "BMI", "MRACEIMP_indexed", "PRIORTERM", "MBSTATE_REC",
    "MHISPX", "MAR_IMP_indexed", "RESTATUS", "RF_CESAR_indexed",
    "MM_AICU_indexed", "IP_GON_indexed", "FEDUC"
]

# Removing the selected features
birth_df_cleaned = birth_df.drop(*columns_to_drop)

# Checking dataset
birth_df_cleaned.printSchema()


root
 |-- ATTEND: integer (nullable = true)
 |-- BFACIL: integer (nullable = true)
 |-- CIG_0: integer (nullable = true)
 |-- DLMP_MM: integer (nullable = true)
 |-- DLMP_YY: integer (nullable = true)
 |-- DWgt_R: integer (nullable = true)
 |-- FAGECOMB: integer (nullable = true)
 |-- FHISPX: integer (nullable = true)
 |-- FRACE15: integer (nullable = true)
 |-- FRACE31: integer (nullable = true)
 |-- FRACE6: integer (nullable = true)
 |-- ILLB_R: integer (nullable = true)
 |-- ILP_R: integer (nullable = true)
 |-- MAGER: integer (nullable = true)
 |-- MEDUC: integer (nullable = true)
 |-- MRACE15: integer (nullable = true)
 |-- MRACE31: integer (nullable = true)
 |-- MRAVE6: integer (nullable = true)
 |-- M_Ht_In: integer (nullable = true)
 |-- NO_RISKS: integer (nullable = true)
 |-- PAY: integer (nullable = true)
 |-- PAY_REC: integer (nullable = true)
 |-- PRECARE: integer (nullable = true)
 |-- RDMETH_REC: integer (nullable = true)
 |-- WTGAIN: integer (nullable = true)
 |-- DBWT:
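
The drop list above was typed by hand. As a hedged sketch, the same list could be derived from the printed coefficients with a cut-off on the absolute value. The 0.03 threshold is an assumption inferred from which features were dropped, not a value stated in the analysis, and `weak_features` is a hypothetical helper; only a few of the printed coefficients are reproduced here.

```python
import math


def weak_features(correlations, threshold=0.03):
    """Return features whose |r| is below the threshold or undefined (NaN)."""
    return [
        name
        for name, r in correlations.items()
        if math.isnan(r) or abs(r) < threshold
    ]


# A few coefficients copied from the correlation output above.
sample = {
    "WTGAIN": 0.10533617104254864,
    "BFACIL": 0.03120878300362332,
    "FEDUC": -0.029638674588220133,
    "PRIORLIVE": -3.662841113028343e-05,
    "DOB_YY": math.nan,
}

print(weak_features(sample))  # ['FEDUC', 'PRIORLIVE', 'DOB_YY']
```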

After removing those features, we will recheck the remaining columns for outlier values, using the same min/max calculation as before.

### 2.3.2 Removing Outliers From Updated Features

In [42]:
# Calculating and printing min and max values for all columns
for c in birth_df_cleaned.columns:
    # Calculating min and max values for the column
    min_max = birth_df_cleaned.agg(
        min(col(c)).alias("min_value"), 
        max(col(c)).alias("max_value")
    ).collect()[0]  # Collect the results
    
    # Printing the results
    print(f"Column: {c}")
    print(f"  Min Value: {min_max['min_value']}")
    print(f"  Max Value: {min_max['max_value']}")
    print("\n")  # Space between columns


                                                                                

Column: ATTEND
  Min Value: 1
  Max Value: 9

Column: BFACIL
  Min Value: 1
  Max Value: 9

Column: CIG_0
  Min Value: 0
  Max Value: 99

Column: DLMP_MM
  Min Value: 1
  Max Value: 99

Column: DLMP_YY
  Min Value: 2016
  Max Value: 9999

Column: DWgt_R
  Min Value: 100
  Max Value: 999

Column: FAGECOMB
  Min Value: 11
  Max Value: 99

Column: FHISPX
  Min Value: 0
  Max Value: 9

Column: FRACE15
  Min Value: 1
  Max Value: 99

Column: FRACE31
  Min Value: 1
  Max Value: 99

Column: FRACE6
  Min Value: 1
  Max Value: 9

Column: ILLB_R
  Min Value: 3
  Max Value: 999

Column: ILP_R
  Min Value: 3
  Max Value: 999

Column: MAGER
  Min Value: 12
  Max Value: 50

Column: MEDUC
  Min Value: 1
  Max Value: 9

Column: MRACE15
  Min Value: 1
  Max Value: 15

Column: MRACE31
  Min Value: 1
  Max Value: 31

Column: MRAVE6
  Min Value: 1
  Max Value: 6

Column: M_Ht_In
  Min Value: 30
  Max Value: 99

Column: NO_RISKS
  Min Value: 0
  Max Value: 9

Column: PAY
  Min Value: 1
  Max Value: 9

Column: PAY_REC
  Min Value: 1
  Max Value: 9

Column: PRECARE
  Min Value: 0
  Max Value: 99

Column: RDMETH_REC
  Min Value: 1
  Max Value: 9

Column: WTGAIN
  Min Value: 0
  Max Value: 99

Column: DBWT
  Min Value: 227
  Max Value: 8165

Column: DMAR_Indexed
  Min Value: 0.0
  Max Value: 2.0

Column: LD_INDL_indexed
  Min Value: 0.0
  Max Value: 2.0

Column: MTRAN_indexed
  Min Value: 0.0
  Max Value: 2.0

Column: SEX_indexed
  Min Value: 0.0
  Max Value: 1.0
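
Several maxima above consist only of the digit 9 (9, 99, 999, 9999), which is how this dataset appears to encode unknown values. The sketch below flags such columns from the printed maxima. The helper `all_nines` is hypothetical, and the dictionary holds only a sample of the values above.

```python
def all_nines(value):
    """True if the integer is written using only the digit 9 (9, 99, 999, ...)."""
    return set(str(value)) == {"9"}


# A sample of the maxima printed above.
max_values = {
    "ATTEND": 9,
    "CIG_0": 99,
    "DLMP_YY": 9999,
    "DWgt_R": 999,
    "MAGER": 50,
    "MRACE15": 15,
    "DBWT": 8165,
}

suspects = [name for name, value in max_values.items() if all_nines(value)]
print(suspects)  # ['ATTEND', 'CIG_0', 'DLMP_YY', 'DWgt_R']
```

A flagged maximum is only a hint: whether 9 or 99 is a placeholder code or a real value has to be checked against the dataset's documentation for each column.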




                                                                                

We will clean the outlier values step by step, storing the intermediate results in separately named data frames, so that any problem we encounter is easier to trace and fix.

In [36]:
# Filtering out values of 99 and 9999 in the columns 'WTGAIN', 'DLMP_MM', and 'DLMP_YY'
birth_df_cleaned = birth_df_cleaned.filter((col('WTGAIN') != 99) & (col('DLMP_MM') != 99) & (col('DLMP_YY') != 9999))


birth_df_cleaned.count()


                                                                                

3528678

In [37]:
# Filtering out values of 9
birth_df_cleaned2 = birth_df_cleaned.filter(
    (col('ATTEND') != 9) & (col('BFACIL') != 9) & (col('FHISPX') != 9) & (col('FRACE6') != 9)
    & (col('MEDUC') != 9) & (col('NO_RISKS') != 9) & (col('PAY') != 9) & (col('PAY_REC') != 9)
    & (col('RDMETH_REC') != 9)
)
birth_df_cleaned2.count()

                                                                                

2870165

In [38]:
# Filtering out values of 99
birth_df_cleaned99 = birth_df_cleaned2.filter(
    (col('CIG_0') != 99) & (col('DLMP_MM') != 99) & (col('FAGECOMB') != 99) & (col('FRACE15') != 99)
    & (col('FRACE31') != 99) & (col('M_Ht_In') != 99) & (col('PRECARE') != 99) & (col('WTGAIN') != 99)
)
birth_df_cleaned99.count()

                                                                                

2817742

In [39]:
# Filtering out values of 999
birth_df_cleaned999 = birth_df_cleaned99.filter((col('DWgt_R') != 999) & (col('ILLB_R') != 999) & (col('ILP_R') != 999))
birth_df_cleaned999.count()

                                                                                

2522662

In [40]:
# Filtering out values of 9999
birth_df_cleaned9999 = birth_df_cleaned999.filter(col('DLMP_YY') != 9999)
birth_df_cleaned9999.count()

                                                                                

2522662
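
The filter cells above repeat one pattern: for each placeholder code, keep only the rows where a group of columns differs from it. As a sketch, the same condition can be generated from a mapping of code to columns and passed to `DataFrame.filter` as a SQL string. The helper `build_sentinel_filter` is hypothetical, and the mapping below lists only part of the columns.

```python
def build_sentinel_filter(sentinels):
    """Join `column != code` checks for every (code, columns) pair with AND."""
    checks = [
        f"`{column}` != {code}"
        for code, columns in sentinels.items()
        for column in columns
    ]
    return " AND ".join(checks)


# Partial mapping, following the filter cells above.
sentinels = {
    9: ["ATTEND", "BFACIL"],
    99: ["CIG_0", "M_Ht_In"],
    999: ["DWgt_R", "ILLB_R", "ILP_R"],
    9999: ["DLMP_YY"],
}

condition = build_sentinel_filter(sentinels)
print(condition)

# With a live DataFrame, the whole clean-up could then be one call, for example:
#   birth_df_filtered = birth_df_cleaned.filter(condition)
```

A single combined filter gives up the intermediate row counts that the step-by-step version reports, so the two approaches serve different purposes.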

After removing all the outliers, we will check the minimum and maximum values again.

In [41]:
# Calculating and printing min and max values for all columns
for c in birth_df_cleaned9999.columns:
    # Calculating min and max values for the column
    min_max = birth_df_cleaned9999.agg(
        min(c).alias("min_value"), 
        max(c).alias("max_value")
    ).collect()[0]  # Collect the results
    
    # Printing the results
    print(f"Column: {c}")
    print(f"  Min Value: {min_max['min_value']}")
    print(f"  Max Value: {min_max['max_value']}")
    print("\n")  # Space between columns


                                                                                

Column: ATTEND
  Min Value: 1
  Max Value: 5

Column: BFACIL
  Min Value: 1
  Max Value: 7

Column: CIG_0
  Min Value: 0
  Max Value: 98

Column: DLMP_MM
  Min Value: 1
  Max Value: 12

Column: DLMP_YY
  Min Value: 2016
  Max Value: 2018

Column: DWgt_R
  Min Value: 100
  Max Value: 400

Column: FAGECOMB
  Min Value: 13
  Max Value: 95

Column: FHISPX
  Min Value: 0
  Max Value: 6

Column: FRACE15
  Min Value: 1
  Max Value: 15

Column: FRACE31
  Min Value: 1
  Max Value: 31

Column: FRACE6
  Min Value: 1
  Max Value: 6

Column: ILLB_R
  Min Value: 3
  Max Value: 888

Column: ILP_R
  Min Value: 3
  Max Value: 888

Column: MAGER
  Min Value: 13
  Max Value: 50

Column: MEDUC
  Min Value: 1
  Max Value: 8

Column: MRACE15
  Min Value: 1
  Max Value: 15

Column: MRACE31
  Min Value: 1
  Max Value: 31

Column: MRAVE6
  Min Value: 1
  Max Value: 6

Column: M_Ht_In
  Min Value: 30
  Max Value: 78

Column: NO_RISKS
  Min Value: 0
  Max Value: 1

Column: PAY
  Min Value: 1
  Max Value: 8

Column: PAY_REC
  Min Value: 1
  Max Value: 4

Column: PRECARE
  Min Value: 0
  Max Value: 10

Column: RDMETH_REC
  Min Value: 1
  Max Value: 4

Column: WTGAIN
  Min Value: 0
  Max Value: 98

Column: DBWT
  Min Value: 227
  Max Value: 8165

Column: DMAR_Indexed
  Min Value: 0.0
  Max Value: 2.0

Column: LD_INDL_indexed
  Min Value: 0.0
  Max Value: 2.0

Column: MTRAN_indexed
  Min Value: 0.0
  Max Value: 2.0

Column: SEX_indexed
  Min Value: 0.0
  Max Value: 1.0




                                                                                

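As can be seen, the placeholder values 9, 99, 999 and 9999 are no longer present in the filtered columns. Note that 'ILLB_R' and 'ILP_R' still have a maximum of 888, which may be another reserved code rather than a real interval; it is left in place here. After this cleaning process, we should check the correlation coefficients again so that we keep working with the features most strongly correlated with the target.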
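
The row counts reported by the filter cells show how much data each cleaning step removes. The short calculation below uses only the counts printed above, together with the 3,798,539 rows of the full dataset; the step labels are descriptive names added for this sketch.

```python
# Row counts printed after each cleaning step (the first is the full dataset).
counts = [
    ("original", 3798539),
    ("WTGAIN / DLMP codes removed", 3528678),
    ("code 9 removed", 2870165),
    ("code 99 removed", 2817742),
    ("code 999 removed", 2522662),
    ("code 9999 removed", 2522662),
]

total = counts[0][1]
previous = total
for step, rows in counts:
    dropped = previous - rows
    print(f"{step:<30} rows={rows:>9,}  dropped={dropped:>8,}  kept={rows / total:6.1%}")
    previous = rows

retained_share = counts[-1][1] / total
```

About 66.4% of the original rows remain after cleaning, and the last step removes nothing because the 9999 codes in 'DLMP_YY' were already filtered in the first step.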

### 2.3.3 Checking Updated Features' Correlations

In [50]:
final_features = [col_name for col_name in birth_df_cleaned9999.columns if col_name != 'DBWT']

# Calculating correlations between 'DBWT' and other numeric columns
for column in final_features:
    try:
        correlation = birth_df_cleaned9999.stat.corr('DBWT', column)
        print(f"Correlation (DBWT and {column}): {correlation}")
    except Exception as e:
        print(f"Correlation cannot be calculated (DBWT and {column}): {str(e)}")


                                                                                

Correlation (DBWT and ATTEND): 0.07066990487057992


                                                                                

Correlation (DBWT and BFACIL): 0.044346545871795796


                                                                                

Correlation (DBWT and CIG_0): -0.052011493990799976


                                                                                

Correlation (DBWT and DLMP_MM): 0.009392019740550036


                                                                                

Correlation (DBWT and DLMP_YY): -0.062315232907290304


                                                                                

Correlation (DBWT and DWgt_R): 0.17489805628027008


                                                                                

Correlation (DBWT and FAGECOMB): 0.023302882468250016


                                                                                

Correlation (DBWT and FHISPX): -0.01858438793333863


                                                                                

Correlation (DBWT and FRACE15): -0.05004145710715298


                                                                                

Correlation (DBWT and FRACE31): -0.05332330585403293


                                                                                

Correlation (DBWT and FRACE6): -0.08274550250097432


                                                                                

Correlation (DBWT and ILLB_R): -0.06967029673904312


                                                                                

Correlation (DBWT and ILP_R): -0.057462304771920825


                                                                                

Correlation (DBWT and MAGER): 0.03814412126263935


                                                                                

Correlation (DBWT and MEDUC): 0.061565998047461705


                                                                                

Correlation (DBWT and MRACE15): -0.051311084283976544


                                                                                

Correlation (DBWT and MRACE31): -0.05210714492406016


                                                                                

Correlation (DBWT and MRAVE6): -0.08321829799405411


                                                                                

Correlation (DBWT and M_Ht_In): 0.15523970779096785


                                                                                

Correlation (DBWT and NO_RISKS): 0.07385529184202033


                                                                                

Correlation (DBWT and PAY): 0.03570230737122791


                                                                                

Correlation (DBWT and PAY_REC): 0.05820072206630511


                                                                                

Correlation (DBWT and PRECARE): 0.0048387047356537585


                                                                                

Correlation (DBWT and RDMETH_REC): -0.045989733321320676


                                                                                

Correlation (DBWT and WTGAIN): 0.16375883439288283


                                                                                

Correlation (DBWT and DMAR_Indexed): -0.04775230288927009


                                                                                

Correlation (DBWT and LD_INDL_indexed): 0.05543428934300409


                                                                                

Correlation (DBWT and MTRAN_indexed): -0.08784871498769718




Correlation (DBWT and SEX_indexed): -0.10207051384557955


                                                                                

#### Removing Features with the Least Correlation Coefficients

In [42]:
columns_to_drop = ["DLMP_MM", "PRECARE", "FHISPX", "FAGECOMB"]

# Dropping the selected features
birth_df_cleaned9999 = birth_df_cleaned9999.drop(*columns_to_drop)

# Checking dataset
birth_df_cleaned9999.printSchema()


root
 |-- ATTEND: integer (nullable = true)
 |-- BFACIL: integer (nullable = true)
 |-- CIG_0: integer (nullable = true)
 |-- DLMP_YY: integer (nullable = true)
 |-- DWgt_R: integer (nullable = true)
 |-- FRACE15: integer (nullable = true)
 |-- FRACE31: integer (nullable = true)
 |-- FRACE6: integer (nullable = true)
 |-- ILLB_R: integer (nullable = true)
 |-- ILP_R: integer (nullable = true)
 |-- MAGER: integer (nullable = true)
 |-- MEDUC: integer (nullable = true)
 |-- MRACE15: integer (nullable = true)
 |-- MRACE31: integer (nullable = true)
 |-- MRAVE6: integer (nullable = true)
 |-- M_Ht_In: integer (nullable = true)
 |-- NO_RISKS: integer (nullable = true)
 |-- PAY: integer (nullable = true)
 |-- PAY_REC: integer (nullable = true)
 |-- RDMETH_REC: integer (nullable = true)
 |-- WTGAIN: integer (nullable = true)
 |-- DBWT: integer (nullable = true)
 |-- DMAR_Indexed: double (nullable = false)
 |-- LD_INDL_indexed: double (nullable = false)
 |-- MTRAN_indexed: double (nullable = f

In [43]:
birth_df_cleaned9999.count()

                                                                                

2522662
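
The drop list can be checked against the printed coefficients by ranking features on their absolute correlation with 'DBWT'. The sketch below is plain Python (no Spark session needed) and copies a handful of the coefficients from the output above, rounded to four decimals. The dictionary name `correlations` and the cut-off of four features are illustrative assumptions, not part of the original notebook.

```python
# Rank features by |Pearson r| with DBWT and list the weakest ones.
# Values are copied (rounded) from the correlation output above; only a
# subset of the features is included to keep the example short.
correlations = {
    "DLMP_MM": 0.0094,
    "PRECARE": 0.0048,
    "FHISPX": -0.0186,
    "FAGECOMB": 0.0233,
    "PAY": 0.0357,
    "MAGER": 0.0381,
    "DWgt_R": 0.1749,
    "WTGAIN": 0.1638,
    "SEX_indexed": -0.1021,
}

n_to_drop = 4  # illustrative cut-off (assumption)

# Sort ascending by absolute value so the weakest relationships come first.
ranked = sorted(correlations, key=lambda name: abs(correlations[name]))
weakest = ranked[:n_to_drop]

print("Weakest features:", weakest)
```

Note that Pearson correlation on coded categorical columns (for example 'ATTEND' or 'PAY') only measures linear association with the numeric code, so a small coefficient does not necessarily mean the feature is uninformative for tree-based models.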

The remaining features are those with the strongest linear relationship to 'DBWT'. However, the dataset still contains about 2.5 million rows, so a random subsample is taken to keep PySpark responsive; on the full dataset the local Spark session becomes severely slow or fails.

### 2.3.4 Subsampling / Reducing Dataset Size

In [44]:
fraction = 750000 / 2522662  # Approximate ratio calculation
sampled_df = birth_df_cleaned9999.sample(withReplacement=False, fraction=fraction)

# Checking the size of the new DataFrame
print(f"Sampled DataFrame count: {sampled_df.count()}")



Sampled DataFrame count: 750073
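
The sampled count (750,073) differs slightly from the 750,000 target because sampling by `fraction` does not return an exact number of rows. Assuming each row is kept independently with probability `fraction` (Bernoulli sampling, which is how sampling without replacement is generally understood to work in Spark), the row count follows a binomial distribution. A small plain-Python check of the expected spread, using only the numbers printed above:

```python
import math

total_rows = 2_522_662      # row count reported above
target_rows = 750_000       # desired sample size
observed_rows = 750_073     # count printed by the notebook

fraction = target_rows / total_rows

# Binomial(n, p): mean = n * p, standard deviation = sqrt(n * p * (1 - p))
expected_rows = total_rows * fraction
std_rows = math.sqrt(total_rows * fraction * (1 - fraction))

# How far the observed count sits from the expectation, in standard deviations
deviation_in_std = (observed_rows - expected_rows) / std_rows

print(f"fraction           = {fraction:.6f}")
print(f"expected rows      = {expected_rows:.0f}")
print(f"std of row count   = {std_rows:.0f}")
print(f"observed deviation = {deviation_in_std:.2f} standard deviations")
```

Under that assumption the observed count is well within the expected variation. Passing a `seed` to `sample` would make the draw repeatable across runs.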


                                                                                

In [45]:
sampled_df.printSchema()

root
 |-- ATTEND: integer (nullable = true)
 |-- BFACIL: integer (nullable = true)
 |-- CIG_0: integer (nullable = true)
 |-- DLMP_YY: integer (nullable = true)
 |-- DWgt_R: integer (nullable = true)
 |-- FRACE15: integer (nullable = true)
 |-- FRACE31: integer (nullable = true)
 |-- FRACE6: integer (nullable = true)
 |-- ILLB_R: integer (nullable = true)
 |-- ILP_R: integer (nullable = true)
 |-- MAGER: integer (nullable = true)
 |-- MEDUC: integer (nullable = true)
 |-- MRACE15: integer (nullable = true)
 |-- MRACE31: integer (nullable = true)
 |-- MRAVE6: integer (nullable = true)
 |-- M_Ht_In: integer (nullable = true)
 |-- NO_RISKS: integer (nullable = true)
 |-- PAY: integer (nullable = true)
 |-- PAY_REC: integer (nullable = true)
 |-- RDMETH_REC: integer (nullable = true)
 |-- WTGAIN: integer (nullable = true)
 |-- DBWT: integer (nullable = true)
 |-- DMAR_Indexed: double (nullable = false)
 |-- LD_INDL_indexed: double (nullable = false)
 |-- MTRAN_indexed: double (nullable = f

### 2.3.5 Scaling Features with 'StandardScaler'

In [46]:
# Select all numeric columns except 'DBWT'
features = [c for c in sampled_df.columns if c != 'DBWT']

# StandardScaler operates on vector columns, so each feature is first wrapped in a
# one-element vector (<feature>_vec) and then scaled into a new column (<feature>_scaled)
stages = []
for feature in features:
    # Create VectorAssembler
    assembler = VectorAssembler(inputCols=[feature], outputCol=feature + "_vec")
    
    # Create StandardScaler
    scaler = StandardScaler(inputCol=feature + "_vec", outputCol=feature + "_scaled", withStd=True, withMean=True)
    
    # Add these operations to the Pipeline
    stages.append(assembler)
    stages.append(scaler)

# Create a Pipeline
pipeline = Pipeline(stages=stages)

# Apply the Pipeline
model = pipeline.fit(sampled_df)
sampled_df_scaled = model.transform(sampled_df)

# Show the newly created columns
sampled_df_scaled.select(['DBWT'] + [c + "_scaled" for c in features]).show()



+----+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+--------------------+--------------------+
|DBWT|       ATTEND_scaled|       BFACIL_scaled|        CIG_0_scaled|      DLMP_YY_scaled|       DWgt_R_scaled|      FRACE15_scaled|      FRACE31_scaled|       FRACE6_scaled|       ILLB_R_scaled|        ILP_R_scaled|        MAGER_scaled|        MEDUC_scaled|      MRACE15_scaled|      MRACE31_scaled|       MRAVE6_scaled|      M_Ht_In_scaled|     NO_RISKS_scaled|          PAY_scaled|      PAY_REC_scaled|   RDMETH_REC_scaled|       WTGAIN_scaled| DMAR_Indexed_scale

                                                                                

The transformed DataFrame should now contain the original, vectorised, and scaled columns side by side. Let's check them:

In [47]:
sampled_df_scaled.columns

['ATTEND',
 'BFACIL',
 'CIG_0',
 'DLMP_YY',
 'DWgt_R',
 'FRACE15',
 'FRACE31',
 'FRACE6',
 'ILLB_R',
 'ILP_R',
 'MAGER',
 'MEDUC',
 'MRACE15',
 'MRACE31',
 'MRAVE6',
 'M_Ht_In',
 'NO_RISKS',
 'PAY',
 'PAY_REC',
 'RDMETH_REC',
 'WTGAIN',
 'DBWT',
 'DMAR_Indexed',
 'LD_INDL_indexed',
 'MTRAN_indexed',
 'SEX_indexed',
 'ATTEND_vec',
 'ATTEND_scaled',
 'BFACIL_vec',
 'BFACIL_scaled',
 'CIG_0_vec',
 'CIG_0_scaled',
 'DLMP_YY_vec',
 'DLMP_YY_scaled',
 'DWgt_R_vec',
 'DWgt_R_scaled',
 'FRACE15_vec',
 'FRACE15_scaled',
 'FRACE31_vec',
 'FRACE31_scaled',
 'FRACE6_vec',
 'FRACE6_scaled',
 'ILLB_R_vec',
 'ILLB_R_scaled',
 'ILP_R_vec',
 'ILP_R_scaled',
 'MAGER_vec',
 'MAGER_scaled',
 'MEDUC_vec',
 'MEDUC_scaled',
 'MRACE15_vec',
 'MRACE15_scaled',
 'MRACE31_vec',
 'MRACE31_scaled',
 'MRAVE6_vec',
 'MRAVE6_scaled',
 'M_Ht_In_vec',
 'M_Ht_In_scaled',
 'NO_RISKS_vec',
 'NO_RISKS_scaled',
 'PAY_vec',
 'PAY_scaled',
 'PAY_REC_vec',
 'PAY_REC_scaled',
 'RDMETH_REC_vec',
 'RDMETH_REC_scaled',
 'WTGAIN_ve

Next, we build the final DataFrame for the model creation process, keeping only the scaled features and the target 'DBWT'.

In [48]:
scaled_features = [col_name for col_name in sampled_df_scaled.columns if '_scaled' in col_name] + ['DBWT']
final_df = sampled_df_scaled.select(scaled_features)

final_df.show()



+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+--------------------+--------------------+----+
|       ATTEND_scaled|       BFACIL_scaled|        CIG_0_scaled|      DLMP_YY_scaled|       DWgt_R_scaled|      FRACE15_scaled|      FRACE31_scaled|       FRACE6_scaled|       ILLB_R_scaled|        ILP_R_scaled|        MAGER_scaled|        MEDUC_scaled|      MRACE15_scaled|      MRACE31_scaled|       MRAVE6_scaled|      M_Ht_In_scaled|     NO_RISKS_scaled|          PAY_scaled|      PAY_REC_scaled|   RDMETH_REC_scaled|       WTGAIN_scaled| DMAR_Indexed_scaled|LD_

                                                                                

In [49]:
final_df.printSchema()

root
 |-- ATTEND_scaled: vector (nullable = true)
 |-- BFACIL_scaled: vector (nullable = true)
 |-- CIG_0_scaled: vector (nullable = true)
 |-- DLMP_YY_scaled: vector (nullable = true)
 |-- DWgt_R_scaled: vector (nullable = true)
 |-- FRACE15_scaled: vector (nullable = true)
 |-- FRACE31_scaled: vector (nullable = true)
 |-- FRACE6_scaled: vector (nullable = true)
 |-- ILLB_R_scaled: vector (nullable = true)
 |-- ILP_R_scaled: vector (nullable = true)
 |-- MAGER_scaled: vector (nullable = true)
 |-- MEDUC_scaled: vector (nullable = true)
 |-- MRACE15_scaled: vector (nullable = true)
 |-- MRACE31_scaled: vector (nullable = true)
 |-- MRAVE6_scaled: vector (nullable = true)
 |-- M_Ht_In_scaled: vector (nullable = true)
 |-- NO_RISKS_scaled: vector (nullable = true)
 |-- PAY_scaled: vector (nullable = true)
 |-- PAY_REC_scaled: vector (nullable = true)
 |-- RDMETH_REC_scaled: vector (nullable = true)
 |-- WTGAIN_scaled: vector (nullable = true)
 |-- DMAR_Indexed_scaled: vector (nullable =
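
For reference, with `withMean=True` and `withStd=True` the scaler maps each value to z = (x - mean) / std. The sketch below reproduces that arithmetic in plain Python on made-up heights (the numbers and names are hypothetical, not taken from the dataset). It uses the sample standard deviation (n - 1 denominator), which is what Spark's documentation describes for `StandardScaler`. The statistics are computed on a training portion only and then reused for held-out values, a common way to keep information from validation rows out of preprocessing; the notebook itself fits the scaler on the full sample before splitting.

```python
import statistics

# Hypothetical maternal heights in inches (not taken from the dataset)
train_heights = [60, 62, 64, 66, 68]
holdout_heights = [61, 70]

# Statistics are estimated from the training values only
mean = statistics.mean(train_heights)
std = statistics.stdev(train_heights)  # sample std (n - 1 denominator)

def standardize(values, mean, std):
    """Return z-scores using the supplied mean and standard deviation."""
    return [(v - mean) / std for v in values]

train_scaled = standardize(train_heights, mean, std)
holdout_scaled = standardize(holdout_heights, mean, std)

print(train_scaled)
print(holdout_scaled)
```

After scaling, the training values have mean 0 and sample standard deviation 1, while held-out values are expressed on the same scale without influencing it.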

## 2.4 Model Creation

In [50]:
# Separating the data set into training, validation and test sets
train_df, val_df, test_df = final_df.randomSplit([0.6, 0.2, 0.2], seed=42)

### 2.4.1 Model 1: Linear Regression

In [53]:
# Combining features into a vector
assembler = VectorAssembler(inputCols=[col for col in train_df.columns if col != 'DBWT'], outputCol='features')
train_vector = assembler.transform(train_df)
val_vector = assembler.transform(val_df)

# Creating a Linear Regression Model
lr = LinearRegression(featuresCol='features', labelCol='DBWT')

# Training the Model
model = lr.fit(train_vector)

# Making Predictions on the Validation Dataset
val_predictions = model.transform(val_vector)

# Evaluating Model Performance using MAE
evaluator = RegressionEvaluator(labelCol='DBWT', predictionCol='prediction', metricName='mae')
val_mae = evaluator.evaluate(val_predictions)
print(f'Validation MAE: {val_mae}')




Validation MAE: 396.2107096071563


                                                                                

#### L1 Regularisation with different 'regParams' values:

In [56]:
# L1
regParams = [0.001, 0.01, 0.1, 1.0]  
best_mae = float('inf')
best_param = None

for param in regParams:
    lr = LinearRegression(featuresCol='features', labelCol='DBWT', elasticNetParam=1.0, regParam=param)
    model = lr.fit(train_vector)
    val_predictions = model.transform(val_vector)
    val_mae = evaluator.evaluate(val_predictions)
    print(f'regParam: {param} -> Validation MAE: {val_mae}')
    
    # Find the best MAE
    if val_mae < best_mae:
        best_mae = val_mae
        best_param = param

print(f'Best regParam: {best_param} with MAE: {best_mae}')


                                                                                

regParam: 0.001 -> Validation MAE: 396.2103679901393


                                                                                

regParam: 0.01 -> Validation MAE: 395.71766738693265


                                                                                

regParam: 0.1 -> Validation MAE: 396.2025458722118




regParam: 1.0 -> Validation MAE: 396.20127345974277
Best regParam: 0.01 with MAE: 395.71766738693265


                                                                                

#### L2 Regularisation with different 'regParams' values:

In [57]:
# L2
regParams = [0.001, 0.01, 0.1, 1.0]  
best_mae = float('inf')
best_param = None

for param in regParams:
    lr = LinearRegression(featuresCol='features', labelCol='DBWT', elasticNetParam=0, regParam=param)
    model = lr.fit(train_vector)
    val_predictions = model.transform(val_vector)
    val_mae = evaluator.evaluate(val_predictions)
    print(f'regParam: {param} -> Validation MAE: {val_mae}')
    
    # Finding the best MAE
    if val_mae < best_mae:
        best_mae = val_mae
        best_param = param

print(f'Best regParam: {best_param} with MAE: {best_mae}')


                                                                                

regParam: 0.001 -> Validation MAE: 396.2107011505841


                                                                                

regParam: 0.01 -> Validation MAE: 396.21062515768983


                                                                                

regParam: 0.1 -> Validation MAE: 396.1939958977408




regParam: 1.0 -> Validation MAE: 396.2032009342583
Best regParam: 0.1 with MAE: 396.1939958977408
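
For context, Spark's documentation describes the `LinearRegression` objective as a squared-error loss plus an elastic-net penalty, where λ corresponds to `regParam` and α to `elasticNetParam`. Written out (the symbols n, w, x_i and y_i are introduced here for illustration):

$$
\min_{w} \; \frac{1}{2n} \sum_{i=1}^{n} \left( w^{\top} x_i - y_i \right)^2 + \lambda \left( \alpha \lVert w \rVert_1 + \frac{1 - \alpha}{2} \lVert w \rVert_2^2 \right)
$$

With α = 1 only the L1 term remains (lasso), and with α = 0 only the L2 term remains (ridge), which matches the two cells above. Across all tested values of λ the validation MAE moves by less than 0.5 g, so regularisation appears to have little effect on this model at this sample size.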


                                                                                

I tried using cross-validation to tune the models more reliably, but PySpark stopped working immediately. For this reason, I couldn't use Cross-Validation, Random Search, or Grid Search utilities; the hyperparameters are instead compared with manual loops on a single validation split.

### 2.4.2 Model 2: Generalized Linear Regression

In [69]:
# Create the Generalized Linear Regression model
glr = GeneralizedLinearRegression(featuresCol='features', labelCol='DBWT', family='gaussian', link='identity')

# Train the model
glr_model = glr.fit(train_vector)

# Make predictions on the validation dataset
val_predictions = glr_model.transform(val_vector)

# Evaluate the model's performance using MAE
evaluator = RegressionEvaluator(labelCol='DBWT', predictionCol='prediction', metricName='mae')
val_mae = evaluator.evaluate(val_predictions)
print(f'Validation MAE: {val_mae}')




Validation MAE: 396.2107096071563


                                                                                

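In PySpark, the GeneralizedLinearRegression model does not accept the elasticNetParam parameter, so L1 and elastic net regularization cannot be applied to this model; only L2 regularization is available, through regParam. With family='gaussian' and link='identity' the model is equivalent to ordinary linear regression, which is why the validation MAE is identical to that of Model 1.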

### 2.4.3 Decision Tree

In [58]:
# Creating a Decision Tree Model
dt = DecisionTreeRegressor(featuresCol="features", labelCol="DBWT")
dt_model = dt.fit(train_vector)

# Making Predictions on the Validation Dataset
val_predictions = dt_model.transform(val_vector)

# Evaluating Model Performance using MAE
evaluator = RegressionEvaluator(labelCol="DBWT", predictionCol="prediction", metricName="mae")
val_mae = evaluator.evaluate(val_predictions)
print(f"Validation MAE: {val_mae}")





Validation MAE: 398.14859950510896


                                                                                

In [59]:
maxDepthValues = [3, 5, 10]  
maxBinsValues = [32, 64, 128]  

In [60]:
bestMAE = float('inf')
bestParams = {'maxDepth': None, 'maxBins': None}

for depth in maxDepthValues:
    for bins in maxBinsValues:
      
        dt = DecisionTreeRegressor(featuresCol="features", labelCol="DBWT", maxDepth=depth, maxBins=bins)

        dt_model = dt.fit(train_vector)
        
        val_predictions = dt_model.transform(val_vector)

        evaluator = RegressionEvaluator(labelCol="DBWT", predictionCol="prediction", metricName="mae")
        val_mae = evaluator.evaluate(val_predictions)
        
        # Updating the best parameters
        if val_mae < bestMAE:
            bestMAE = val_mae
            bestParams['maxDepth'] = depth
            bestParams['maxBins'] = bins
            
        print(f"Tested maxDepth: {depth}, maxBins: {bins}, Validation MAE: {val_mae}")

print(f"Best Params -> maxDepth: {bestParams['maxDepth']}, maxBins: {bestParams['maxBins']}, Lowest MAE: {bestMAE}")


                                                                                

Tested maxDepth: 3, maxBins: 32, Validation MAE: 404.18244213119783


                                                                                

Tested maxDepth: 3, maxBins: 64, Validation MAE: 403.09364132911907


                                                                                

Tested maxDepth: 3, maxBins: 128, Validation MAE: 402.95061523869424


                                                                                

Tested maxDepth: 5, maxBins: 32, Validation MAE: 398.14859950510896


                                                                                

Tested maxDepth: 5, maxBins: 64, Validation MAE: 396.8871618286929


                                                                                

Tested maxDepth: 5, maxBins: 128, Validation MAE: 397.60257146329553


                                                                                

Tested maxDepth: 10, maxBins: 32, Validation MAE: 389.6925653618021


                                                                                

Tested maxDepth: 10, maxBins: 64, Validation MAE: 388.9748301829355




Tested maxDepth: 10, maxBins: 128, Validation MAE: 388.850842680379
Best Params -> maxDepth: 10, maxBins: 128, Lowest MAE: 388.850842680379
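
The nested loops above are a small manual grid search. The same pattern can be sketched once with `itertools.product` and reused for any estimator. The helper below is plain Python; `grid_search` and `evaluate` are hypothetical names, and the lookup table simply replays the decision-tree validation MAEs printed above (rounded) in place of actually fitting models.

```python
from itertools import product

def grid_search(param_grid, evaluate):
    """Try every parameter combination and return (best_params, best_score).

    param_grid: dict mapping parameter name -> list of candidate values
    evaluate:   callable taking a dict of parameters and returning a score
                where lower is better (e.g. validation MAE)
    """
    names = list(param_grid)
    best_params, best_score = None, float("inf")
    for values in product(*(param_grid[n] for n in names)):
        params = dict(zip(names, values))
        score = evaluate(params)
        if score < best_score:
            best_params, best_score = params, score
    return best_params, best_score

# Validation MAEs copied (rounded) from the decision-tree output above.
recorded_mae = {
    (3, 32): 404.18, (3, 64): 403.09, (3, 128): 402.95,
    (5, 32): 398.15, (5, 64): 396.89, (5, 128): 397.60,
    (10, 32): 389.69, (10, 64): 388.97, (10, 128): 388.85,
}

def evaluate(params):
    # Stand-in for "fit on train, score on validation" (assumption).
    return recorded_mae[(params["maxDepth"], params["maxBins"])]

grid = {"maxDepth": [3, 5, 10], "maxBins": [32, 64, 128]}
best_params, best_score = grid_search(grid, evaluate)
print(best_params, best_score)
```

In the notebook, `evaluate` would fit the estimator on `train_vector` and return `evaluator.evaluate(...)` on `val_vector`. Because the best depth is the largest value tested, the optimum may lie beyond this grid.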


                                                                                

### 2.4.4 Random Forest

In [62]:
# Creating an evaluator 
evaluator = RegressionEvaluator(labelCol="DBWT", predictionCol="prediction", metricName="mae")

maxDepthValues = [3, 5, 10]  
maxBinsValues = [32, 64, 128]  


In [63]:
# Initializing bestMAE to infinity and bestParams to store the optimal parameters
bestMAE = float('inf')
bestParams = {'maxDepth': None, 'maxBins': None}

# Iterating over combinations of maxDepth and maxBins values
for depth in maxDepthValues:
    for bins in maxBinsValues:
        # Creating a RandomForestRegressor model with specified depth and bins
        rf = RandomForestRegressor(featuresCol="features", labelCol="DBWT", maxDepth=depth, maxBins=bins)
        
        # Training the model on the training dataset
        rf_model = rf.fit(train_vector)
        
        # Making predictions on the validation dataset
        val_predictions = rf_model.transform(val_vector)
        
        # Evaluating the model's performance using MAE
        val_mae = evaluator.evaluate(val_predictions)
        
        # Updating the best parameters if the current MAE is lower than the best MAE found so far
        if val_mae < bestMAE:
            bestMAE = val_mae
            bestParams['maxDepth'] = depth
            bestParams['maxBins'] = bins
            
        # Printing the tested parameters and the corresponding MAE
        print(f"Tested maxDepth: {depth}, maxBins: {bins}, Validation MAE: {val_mae}")

# Printing the best parameters and the lowest MAE found
print(f"Best Params -> maxDepth: {bestParams['maxDepth']}, maxBins: {bestParams['maxBins']}, Lowest MAE: {bestMAE}")


                                                                                

Tested maxDepth: 3, maxBins: 32, Validation MAE: 402.409082466909


                                                                                

Tested maxDepth: 3, maxBins: 64, Validation MAE: 400.8780674216066


                                                                                

Tested maxDepth: 3, maxBins: 128, Validation MAE: 400.77948811213895


                                                                                

Tested maxDepth: 5, maxBins: 32, Validation MAE: 394.0736970557233


                                                                                

Tested maxDepth: 5, maxBins: 64, Validation MAE: 393.40857213558166


                                                                                

Tested maxDepth: 5, maxBins: 128, Validation MAE: 393.50847430220097


                                                                                

Tested maxDepth: 10, maxBins: 32, Validation MAE: 383.8915583158499


                                                                                

Tested maxDepth: 10, maxBins: 64, Validation MAE: 383.6025065505291




Tested maxDepth: 10, maxBins: 128, Validation MAE: 383.3875761563849
Best Params -> maxDepth: 10, maxBins: 128, Lowest MAE: 383.3875761563849


                                                                                

### 2.4.5 Gradient Boosting

In [64]:
maxBinsValues = [32, 64, 128]

# Evaluation setup
evaluator = RegressionEvaluator(labelCol="DBWT", predictionCol="prediction", metricName="mae")

# Finding the best model
bestMAE = float('inf')
bestBins = None

for bins in maxBinsValues:
    # Create a GBTRegressor model, set maxDepth to 10
    gbt = GBTRegressor(featuresCol="features", labelCol="DBWT", maxDepth=10, maxBins=bins)
    
    # Training the model
    gbt_model = gbt.fit(train_vector)
    
    # Making predictions on the validation dataset
    val_predictions = gbt_model.transform(val_vector)
    
    # Evaluating the model's performance using MAE
    val_mae = evaluator.evaluate(val_predictions)
    print(f"Tested maxBins: {bins}, Validation MAE: {val_mae}")
    
    # Updating the best parameters if the current MAE is lower than the best found so far
    if val_mae < bestMAE:
        bestMAE = val_mae
        bestBins = bins

# Printing the best result
print(f"Best maxBins: {bestBins} with MAE: {bestMAE}")


                                                                                

Tested maxBins: 32, Validation MAE: 385.06982308519895


                                                                                

Tested maxBins: 64, Validation MAE: 383.8943803893117




Tested maxBins: 128, Validation MAE: 384.2438798883027
Best maxBins: 64 with MAE: 383.8943803893117
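
To put the validation results side by side, the short plain-Python sketch below collects the best validation MAE reported for each model above (rounded to four decimals) and expresses it relative to the unregularised linear regression. The dictionary and variable names are illustrative.

```python
# Best validation MAE (grams) per model, copied from the outputs above.
validation_mae = {
    "Linear Regression": 396.2107,
    "Linear Regression (L1, regParam=0.01)": 395.7177,
    "Linear Regression (L2, regParam=0.1)": 396.1940,
    "Generalized Linear Regression": 396.2107,
    "Decision Tree (maxDepth=10, maxBins=128)": 388.8508,
    "Random Forest (maxDepth=10, maxBins=128)": 383.3876,
    "Gradient Boosting (maxDepth=10, maxBins=64)": 383.8944,
}

baseline = validation_mae["Linear Regression"]

# Sort from lowest (best) to highest MAE and report the relative gain.
for name, mae in sorted(validation_mae.items(), key=lambda kv: kv[1]):
    gain = (baseline - mae) / baseline
    print(f"{name:<45} MAE={mae:8.2f}  improvement={gain:6.2%}")

best_model = min(validation_mae, key=validation_mae.get)
best_gain = (baseline - validation_mae[best_model]) / baseline
```

The two ensemble models are within about 0.5 g of each other on the validation set, so both are carried forward to the test set.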


                                                                                

## 2.5 Best Models

In [80]:
assembler = VectorAssembler(inputCols=[col for col in train_df.columns if col != 'DBWT'], outputCol='features')
train_vector = assembler.transform(train_df)
val_vector = assembler.transform(val_df)
test_vector = assembler.transform(test_df)

### 2.5.1 Random Forest 

In [81]:
# Creating the RandomForestRegressor model
rf = RandomForestRegressor(featuresCol="features", labelCol="DBWT", maxDepth=10, maxBins=128)

# Training the model on the training dataset
rf_model = rf.fit(train_vector)

# Making predictions on the test dataset
test_predictions = rf_model.transform(test_vector)

# Evaluating the model's performance using MAE
evaluator = RegressionEvaluator(labelCol="DBWT", predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(test_predictions)
print(f"Test set MAE: {mae}")




Test set MAE: 382.62556724421137


                                                                                

### 2.5.2 Gradient Boosting

In [82]:
# Creating the GBTRegressor model
gbt = GBTRegressor(featuresCol="features", labelCol="DBWT", maxDepth=10, maxBins=64)

# Training the model on the training dataset
gbt_model = gbt.fit(train_vector)

# Making predictions on the test dataset
test_predictions = gbt_model.transform(test_vector)

# Evaluating the model's performance using MAE
evaluator = RegressionEvaluator(labelCol="DBWT", predictionCol="prediction", metricName="mae")
mae = evaluator.evaluate(test_predictions)
print(f"Test set MAE: {mae}")




Test set MAE: 383.6532524765132


                                                                                

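## 2.6 Comparison between Predicted and Actual 'DBWT' Values (First 50 Rows)

In [83]:
# 'prediction' vs 'DBWT'
# Note: test_predictions was last assigned in the Gradient Boosting cell (2.5.2),
# so the rows shown here are Gradient Boosting predictions.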
comparison = test_predictions.select("prediction", "DBWT")
comparison.show(50)



+------------------+----+
|        prediction|DBWT|
+------------------+----+
| 2991.750173896204|3487|
|2511.1197047997257|3260|
| 3388.296428860248|3260|
|3233.6423960988377|3629|
|3573.9013112521857|3855|
| 3376.618598988486|1710|
| 3573.133218867692|3997|
|2773.0599720421633|2741|
| 2857.238212288263|1928|
| 2804.476632322405|2201|
|2981.1461526535304|2145|
|2513.8091270108057|3240|
|450.63179208438874|1160|
| 2743.253455571965|2863|
|2988.3932268727926|2481|
| 2614.585472973234|2055|
| 2849.670899811198|3495|
|2782.6104069091366|2480|
| 2870.761886531882|3045|
|2910.5157955078557|2312|
| 2992.655150799916|3035|
|2938.2353289252233|3544|
|3008.2344776871987|3760|
|3177.0455116985786|3750|
|2837.9706106170343|2810|
| 2502.726412275498| 630|
|3100.3753645620964|3030|
|3152.8361646406424|2920|
|3089.4136124321476|2552|
|2774.8814786133225|1304|
|2554.3464214246706|2610|
|2795.0795076005043|1890|
|  2828.01950428967|2660|
|3012.4037247817214|2626|
|2874.0973249733443|3335|
|2815.895294
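
MAE is the mean of the absolute differences between predicted and actual values. As a worked example, the plain-Python sketch below recomputes it for the first five rows of the table above (predictions rounded to two decimals). Five rows are far too few to judge model quality; the point is only to show how individual errors feed into the metric.

```python
# First five (prediction, DBWT) pairs from the table above, in grams.
rows = [
    (2991.75, 3487),
    (2511.12, 3260),
    (3388.30, 3260),
    (3233.64, 3629),
    (3573.90, 3855),
]

# Absolute error per row, then their mean
abs_errors = [abs(pred - actual) for pred, actual in rows]
mae = sum(abs_errors) / len(abs_errors)

for (pred, actual), err in zip(rows, abs_errors):
    print(f"predicted={pred:8.2f}  actual={actual:5d}  |error|={err:7.2f}")
print(f"MAE over these rows: {mae:.2f} g")
```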

                                                                                

In [None]:
spark.stop()

# 3. Summary and Conclusion

### Challenges and Solutions
Throughout the project, one major challenge was the extensive computational load, particularly the demanding nature of operations like calculating correlation coefficients and running machine learning models on PySpark. To address this, the machine learning pipeline was simplified by avoiding computationally intensive techniques such as cross-validation and exhaustive hyperparameter tuning like grid search and random search. These methods proved impractical for a dataset of this size on the available hardware because of their heavy processing demands.

### Results and Impact
The machine learning models developed were linear regression, generalized linear regression, decision trees, random forests, and gradient boosting, each tuned through basic hyperparameter searches on a validation split. The random forest achieved the lowest Mean Absolute Error (MAE) on the test set, about 382.6 g, with gradient boosting close behind at about 383.7 g; both improve on the linear regression validation MAE of about 396.2 g by roughly 3%. The first test predictions shown in Section 2.6 include many estimates within a few hundred grams of the actual birth weight, but also large misses for very low birth weights (for example, about 3,377 g predicted against 1,710 g actual), so the models capture the typical range better than the extremes.

### Discussion and Technical Merit
The project involved dealing with a huge amount of data and complex models which presented significant technical challenges. I had to adapt to resource constraints and optimize computational efficiency by using PySpark in a local environment instead of a cluster.

### Analytic Merit
This project addressed complex data-driven questions regarding the factors influencing newborn birth weight. It provided insights that could potentially enhance prenatal care and health management strategies, and illustrated how big data analytics can be pivotal in healthcare.

### Further Model Enhancement and Observations
Although the results are reasonable, there remains room for improvement in the model's accuracy. The project's infrastructure limited further regularisation tuning and the use of advanced validation techniques like cross-validation and grid search, which could potentially lower the MAE further. In addition, PySpark's MLlib provides a Multi-Layer Perceptron (MLP) only for classification (MultilayerPerceptronClassifier), not for regression, so a neural-network regressor could not be tried within the same framework.

The best model performance was achieved with a random forest configuration set at maxDepth=10 and maxBins=128, reaching a validation MAE of 383.3875761563849 and a test MAE of 382.62556724421137. Gradient boosting with maxDepth=10 and maxBins=64 followed closely with a test MAE of 383.6532524765132. The exploration of more robust machine learning infrastructure and techniques could potentially yield better results.


### Future Enhancements in Feature Engineering
To further advance this project, dedicating additional time to feature engineering could improve the outcomes. Exploring interactions between variables and alternative data transformations might uncover new insights and lead to better predictive performance.

### Conclusion
This project has demonstrated the potential of using advanced analytics to address important health-related challenges. While the achievements are notable, there remains room for further refinement and improvement. 

# 4. References

[1] Aurélien Géron, Hands-on machine learning with Scikit-Learn, Keras, and TensorFlow, 3rd ed. O’Reilly Media, Inc., 2022.

[2] D. Lee and Tomasz Drabas, PySpark Cookbook. Packt Publishing Ltd, 2018.

[3] R. Bandi, J. Amudhavel, and R. Karthik, “Machine Learning with PySpark - Review,” Indonesian Journal of Electrical Engineering and Computer Science, vol. 12, no. 1, p. 102, Oct. 2018, doi: https://doi.org/10.11591/ijeecs.v12.i1.pp102-106.

[4] “Classification and regression - Spark 3.4.0 Documentation,” spark.apache.org. https://spark.apache.org/docs/latest/ml-classification-regression.html#regression

[5] K. Batko and A. Ślęzak, “The Use of Big Data Analytics in Healthcare,” Journal of Big Data, vol. 9, no. 1, Jan. 2022, doi: https://doi.org/10.1186/s40537-021-00553-4.