# AWS End-to-End ML Pipeline Demo

# Data Lake Formation & Data Imputation (Part I)
### Author: Louis Wong

## Environment Setup

We set up the environment using the AWS Glue Docker image and install the packages we need: Boto3 for AWS APIs, PySpark with Delta Lake (Delta-Spark) for ETL, SageMaker for machine learning, and Matplotlib to visualize the results. MinIO serves as a local, S3-compatible object store.

In [1]:
from dotenv import load_dotenv
import os

load_dotenv()
minio_user = os.getenv("MINIO_ROOT_USER")
minio_pass = os.getenv("MINIO_ROOT_PASSWORD")

print(f"MinIO user: {minio_user} \nMinIO password: {'*' * len(minio_pass)}")

MinIO user: minio 
MinIO password: ********


We keep the credentials in a .env file and load them as environment variables with load_dotenv(), so they are never hardcoded in the notebook. The password is masked when printed.
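One caveat: os.getenv returns None when a variable is missing from .env, so the masking line above would fail with a confusing TypeError from len(None). Here is a minimal sketch of failing fast with a clearer message instead. The helper name require_env is hypothetical and is not part of python-dotenv.

```python
import os


def require_env(name: str) -> str:
    """Return an environment variable's value, or raise a clear error if it is unset or empty.

    Hypothetical helper: os.getenv returns None for unset variables, which would otherwise
    surface later as an unrelated-looking TypeError (e.g. len(None)).
    """
    value = os.getenv(name)
    if value is None or value == "":
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


# Usage after load_dotenv():
# minio_user = require_env("MINIO_ROOT_USER")
# minio_pass = require_env("MINIO_ROOT_PASSWORD")
```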

In [2]:
%%writefile .gitignore
__pycache__/
.ipynb_checkpoints/
.env
.DS_Store

Writing .gitignore


We create a .gitignore file so that Python cache files, Jupyter Notebook checkpoints, macOS .DS_Store files, and the .env file holding our credentials are never committed.

# ETL (Extract, Transform, Load)
Now that our environment is set up, let's move on to ETL. We extract the data, transform it, and load it into S3-compatible buckets hosted locally by MinIO.

In [3]:
import boto3
import botocore

# Establish connection to S3
s3 = boto3.client(
    "s3",
    endpoint_url="http://host.docker.internal:9000", # MinIO on the host, reached from inside the Docker container
    aws_access_key_id=minio_user,
    aws_secret_access_key=minio_pass,
    region_name="us-east-1"
)

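# Reusable helper: create the bucket only if it does not already exist
def check_and_create_bucket(s3, bucket):
    try:
        s3.head_bucket(Bucket=bucket)
        print(f"'{bucket}' already exists.")
    except botocore.exceptions.ClientError as e:
        # head_bucket returns 404 for a missing bucket; re-raise anything else (e.g. 403 access denied)
        if e.response["Error"]["Code"] not in ("404", "NoSuchBucket"):
            raise
        s3.create_bucket(Bucket=bucket)
        print(f"'{bucket}' created.")

# Medallion architecture: one bucket each for the raw (bronze), silver, and gold layers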
buckets = ["raw-data", "silver-data", "gold-data"]

for bucket in buckets:
    check_and_create_bucket(s3, bucket)

'raw-data' created.
'silver-data' created.
'gold-data' created.


Next, we load the dataset. For more information about it, see: https://gist.github.com/aishwarya8615/d2107f828d3f904839cbcb7eaa85bd04

The dataset is used to predict whether a patient is likely to have a stroke based on input parameters such as gender, age, various diseases, and smoking status. A subset of the original training data was selected by filtering, for machine learning and data visualization purposes.

About the data: each row provides relevant information about a person, such as age, gender, smoking status, and whether they had a stroke, along with other attributes. "Unknown" in Smoking Status means the information is unavailable, and N/A in other input fields means the value is not applicable.

In [4]:
import pandas as pd

# Loading dataset locally
df = pd.read_csv("./data/healthcare-dataset-stroke-data.csv")

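df.info()      # column dtypes and non-null counts (printed directly)
df.head()      # not rendered here: only a cell's last expression is displayed
df.describe()  # summary statistics for the numeric columns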

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5109 entries, 0 to 5108
Data columns (total 15 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   Admission Date     5109 non-null   object 
 1   Full Name          5109 non-null   object 
 2   Phone Number       5109 non-null   object 
 3   Address            5109 non-null   object 
 4   Gender             5109 non-null   object 
 5   Age                5109 non-null   float64
 6   Hypertension       5109 non-null   int64  
 7   Heart Disease      5109 non-null   int64  
 8   Ever Married       5109 non-null   object 
 9   Work Type          5109 non-null   object 
 10  Residence Type     5109 non-null   object 
 11  Avg Glucose Level  5109 non-null   float64
 12  BMI                4908 non-null   float64
 13  Smoking Status     5109 non-null   object 
 14  Stroke             5109 non-null   int64  
dtypes: float64(3), int64(3), object(9)
memory usage: 598.8+ KB


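|       | Age | Hypertension | Heart Disease | Avg Glucose Level | BMI | Stroke |
|-------|-----|--------------|---------------|-------------------|-----|--------|
| count | 5109.0 | 5109.0 | 5109.0 | 5109.0 | 4908.0 | 5109.0 |
| mean | 43.229986 | 0.097475 | 0.054022 | 106.140399 | 28.89456 | 0.048738 |
| std | 22.613575 | 0.296633 | 0.226084 | 45.285004 | 7.85432 | 0.21534 |
| min | 0.08 | 0.0 | 0.0 | 55.12 | 10.3 | 0.0 |
| 25% | 25.0 | 0.0 | 0.0 | 77.24 | 23.5 | 0.0 |
| 50% | 45.0 | 0.0 | 0.0 | 91.88 | 28.1 | 0.0 |
| 75% | 61.0 | 0.0 | 0.0 | 114.09 | 33.1 | 0.0 |
| max | 82.0 | 1.0 | 1.0 | 271.74 | 97.6 | 1.0 |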


Before loading the dataset into S3, we drop all sensitive PII (Personally Identifiable Information), such as names, phone numbers, addresses, billing details, Social Security numbers (SSNs), and credit card numbers.

In this case, we drop the patient's full name, phone number, and address. If the data contained a date of birth (DOB), we would convert it to age in whole years and drop the actual date.
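This dataset has no DOB column, so the following is only a sketch of that conversion, assuming "round by year" means completed years (rounded down). It anchors age to the admission date rather than today's date so the result stays reproducible. The date_of_birth field, the records, and the helper age_in_years are all hypothetical.

```python
from datetime import date


def age_in_years(dob: date, as_of: date) -> int:
    """Completed years between a date of birth and a reference date (rounded down)."""
    # Subtract one year if the birthday has not yet occurred in the as_of year
    had_birthday = (as_of.month, as_of.day) >= (dob.month, dob.day)
    return as_of.year - dob.year - (0 if had_birthday else 1)


# Hypothetical records: replace the raw "date_of_birth" with "age" as of the admission date
records = [
    {"admission_date": date(2025, 3, 3), "date_of_birth": date(1958, 5, 20)},
    {"admission_date": date(2025, 6, 27), "date_of_birth": date(1945, 1, 1)},
]
for r in records:
    r["age"] = age_in_years(r.pop("date_of_birth"), r["admission_date"])
```

Popping the field while computing the age means the exact DOB never survives into the stored record.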

In [5]:
# Drop PII columns
df = df.drop(columns=["Full Name", "Phone Number", "Address"])

# Upload the de-identified CSV (a string) to the raw bucket
csv_body = df.to_csv(index=False)
s3.put_object(Bucket="raw-data", Key="healthcare-dataset-stroke-data.csv", Body=csv_body)

{'ResponseMetadata': {'RequestId': '1868EDC64A20E16A',
  'HostId': 'dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'accept-ranges': 'bytes',
   'content-length': '0',
   'etag': '"a1fd5b75152c7874e1e58f267b3e13ca"',
   'server': 'MinIO',
   'strict-transport-security': 'max-age=31536000; includeSubDomains',
   'vary': 'Origin, Accept-Encoding',
   'x-amz-checksum-crc32': 'XpKlPw==',
   'x-amz-checksum-type': 'FULL_OBJECT',
   'x-amz-id-2': 'dd9025bab4ad464b049177c95eb6ebf374d3b3fd1af9251148b658df7ac2e3e8',
   'x-amz-request-id': '1868EDC64A20E16A',
   'x-content-type-options': 'nosniff',
   'x-ratelimit-limit': '2143',
   'x-ratelimit-remaining': '2143',
   'x-xss-protection': '1; mode=block',
   'date': 'Fri, 26 Sep 2025 20:04:50 GMT'},
  'RetryAttempts': 0},
 'ETag': '"a1fd5b75152c7874e1e58f267b3e13ca"',
 'ChecksumCRC32': 'XpKlPw==',
 'ChecksumType': 'FULL_OBJECT'}

For real-world big data with millions of records, a distributed framework such as PySpark is more efficient than single-machine pandas. Converting the dataset to Delta Lake (Parquet data files plus a transaction log) adds ACID transactions, faster queries, more reliable storage, and broad tool integration, and it enables scalable analytics and time travel (reading earlier versions of a table) for audits and reproducibility.
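Time travel works because every write adds a numbered commit file to the table's _delta_log directory. An overwrite records "remove" actions for the old Parquet files instead of deleting them, and those files stay on disk until VACUUM cleans them up. The set of files that makes up version N is what you get by replaying commits 0 through N. The plain-Python sketch below models only that replay step. The function files_at_version and the commit contents are hypothetical and heavily simplified: real "add" actions also carry sizes, partition values, and statistics, and long logs use checkpoint files. In Spark itself, an older version is read with spark.read.format("delta").option("versionAsOf", 0).load(path).

```python
import json


def files_at_version(commits: list[str], version: int) -> set[str]:
    """Replay simplified Delta-style commits 0..version and return the live data files.

    Each commit is newline-delimited JSON. Only "add" and "remove" actions are
    applied; other actions (commitInfo, metaData, protocol, ...) are ignored here.
    """
    live: set[str] = set()
    for commit in commits[: version + 1]:
        for line in commit.splitlines():
            if not line.strip():
                continue
            action = json.loads(line)
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return live


# Two hypothetical commits: version 0 writes a file, version 1 overwrites it with a new one
commits = [
    '{"commitInfo": {"operation": "WRITE"}}\n{"add": {"path": "part-a.parquet"}}',
    '{"commitInfo": {"operation": "WRITE"}}\n{"remove": {"path": "part-a.parquet"}}\n'
    '{"add": {"path": "part-b.parquet"}}',
]
```

Because part-a.parquet is only logically removed in version 1, replaying up to version 0 still finds it, which is exactly what versionAsOf relies on.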

## PySpark Job

In [12]:
from pyspark.sql import SparkSession

s3_endpoint = "http://host.docker.internal:9000"

# Create SparkSession with MinIO S3 config
spark = (
    SparkSession.builder.appName("SparkJob")
    .config("spark.hadoop.fs.s3a.endpoint", s3_endpoint)
    .config("spark.hadoop.fs.s3a.access.key", minio_user)
    .config("spark.hadoop.fs.s3a.secret.key", minio_pass)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") # MinIO does not use SSL by default
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.2.0")  # Delta 3.x targets Spark 3.5; delta-core 2.4.0 was built for Spark 3.4
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

# MinIO requires setting credentials provider explicitly
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

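# Show versions and confirm the session is alive
print("Spark Version:", spark.version)
# Note: spark._jsc.version() returns the Spark version again, not Hadoop's. The Hadoop version
# is available via spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion()
print("Spark Version (JVM):", spark._jsc.version())
print("Alive:", not spark.sparkContext._jsc.sc().isStopped())

Spark Version: 3.5.4-amzn-0
Spark Version (JVM): 3.5.4-amzn-0
Alive: True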


In [13]:
# Load data from S3 to Spark DataFrame
bucket = "raw-data"
key = "healthcare-dataset-stroke-data.csv"
s3_path = f"s3a://{bucket}/{key}"

df = spark.read.csv(s3_path, header=True, inferSchema=True)

df.show(5)

+--------------+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
|Admission Date|Gender| Age|Hypertension|Heart Disease|Ever Married|    Work Type|Residence Type|Avg Glucose Level| BMI| Smoking Status|Stroke|
+--------------+------+----+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
|    2025-03-03|  Male|67.0|           0|            1|         Yes|      Private|         Urban|           228.69|36.6|formerly smoked|     1|
|    2025-04-03|Female|61.0|           0|            0|         Yes|Self-employed|         Rural|           202.21|NULL|   never smoked|     1|
|    2025-06-27|  Male|80.0|           0|            1|         Yes|      Private|         Rural|           105.92|32.5|   never smoked|     1|
|    2025-06-19|Female|49.0|           0|            0|         Yes|      Private|         Urban|           171.23|34.4|         smokes|

In [14]:
from pyspark.sql import functions as F

# Clean up column names (strip whitespace, lowercase, replace spaces with underscores)
df = df.toDF(*[c.strip().lower().replace(" ", "_") for c in df.columns])

# Fix data types where needed
df = (df
    .withColumn("admission_date", F.to_date("admission_date", "yyyy-MM-dd"))
    .withColumn("age", F.col("age").cast("integer"))  # truncates fractional ages (min is 0.08) toward 0
    .withColumn("bmi", F.col("bmi").cast("float"))
    .withColumn("ever_married", F.when(F.col("ever_married") == "Yes", 1).otherwise(0))  # Yes/No -> 1/0
)

df.show(5)

+--------------+------+---+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
|admission_date|gender|age|hypertension|heart_disease|ever_married|    work_type|residence_type|avg_glucose_level| bmi| smoking_status|stroke|
+--------------+------+---+------------+-------------+------------+-------------+--------------+-----------------+----+---------------+------+
|    2025-03-03|  Male| 67|           0|            1|           1|      Private|         Urban|           228.69|36.6|formerly smoked|     1|
|    2025-04-03|Female| 61|           0|            0|           1|Self-employed|         Rural|           202.21|NULL|   never smoked|     1|
|    2025-06-27|  Male| 80|           0|            1|           1|      Private|         Rural|           105.92|32.5|   never smoked|     1|
|    2025-06-19|Female| 49|           0|            0|           1|      Private|         Urban|           171.23|34.4|         smokes|     1|

In [15]:
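from pyspark.sql import functions as F

# Count nulls in every column in a single pass (one Spark job instead of one per column)
null_counts = df.select(
    [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]
).first()
for c in df.columns:
    print(f"{c}: {null_counts[c]} nulls")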

admission_date: 0 nulls
gender: 0 nulls
age: 0 nulls
hypertension: 0 nulls
heart_disease: 0 nulls
ever_married: 0 nulls
work_type: 0 nulls
residence_type: 0 nulls
avg_glucose_level: 0 nulls
bmi: 201 nulls
smoking_status: 0 nulls
stroke: 0 nulls


In [16]:
# Check distinct values in categorical columns
for col_name in ['work_type','residence_type','smoking_status','gender']:
    df.groupBy(col_name).count().orderBy("count", ascending=False).show(truncate=False)

+-------------+-----+
|work_type    |count|
+-------------+-----+
|Private      |2924 |
|Self-employed|819  |
|children     |687  |
|Govt_job     |657  |
|Never_worked |22   |
+-------------+-----+

+--------------+-----+
|residence_type|count|
+--------------+-----+
|Urban         |2596 |
|Rural         |2513 |
+--------------+-----+

+---------------+-----+
|smoking_status |count|
+---------------+-----+
|never smoked   |1892 |
|Unknown        |1544 |
|formerly smoked|884  |
|smokes         |789  |
+---------------+-----+

+------+-----+
|gender|count|
+------+-----+
|Female|2994 |
|Male  |2115 |
+------+-----+



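## Impute Missing BMI Values
We impute the 201 missing BMI values using MICE (Multiple Imputation by Chained Equations), with gender, age, and average glucose level as predictors. We use scikit-learn's IterativeImputer, which is modeled on MICE but by default returns a single imputation rather than multiple imputed datasets.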
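Because BMI is the only column with missing values here, the chained equations in effect reduce to a single model. That model regresses BMI on the other columns using only the rows where BMI is observed, then predicts the missing rows. The following numpy sketch shows that step. It swaps in ordinary least squares for IterativeImputer's default BayesianRidge estimator, and both the helper regression_impute and the data are hypothetical.

```python
import numpy as np


def regression_impute(X: np.ndarray, target_col: int) -> np.ndarray:
    """Fill NaNs in one column by least-squares regression on the other (complete) columns."""
    X = np.array(X, dtype=float)  # work on a float copy
    predictors = np.delete(X, target_col, axis=1)
    if np.isnan(predictors).any():
        raise ValueError("this sketch assumes only the target column has missing values")
    # Design matrix with an intercept column
    A = np.column_stack([np.ones(len(X)), predictors])
    observed = ~np.isnan(X[:, target_col])
    # Fit only on rows where the target is observed
    coef, *_ = np.linalg.lstsq(A[observed], X[observed, target_col], rcond=None)
    # Predict the missing rows; observed values are left untouched
    X[~observed, target_col] = A[~observed] @ coef
    return X


# Synthetic columns: gender (encoded 0/1), age, avg_glucose_level, bmi (one value missing)
data = np.array([
    [0, 30, 90.0, 24.0],
    [1, 45, 110.0, 27.0],
    [0, 60, 130.0, 30.0],
    [1, 50, 100.0, np.nan],
    [1, 70, 150.0, 32.0],
])
filled = regression_impute(data, target_col=3)
```

The synthetic BMI values follow 18 + 0.2 × age exactly, so the fit recovers that relationship and fills the missing row with 28.0.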

In [17]:
from sklearn.experimental import enable_iterative_imputer  # noqa: F401 (enables IterativeImputer)
from sklearn.impute import IterativeImputer
from sklearn.preprocessing import LabelEncoder
import numpy as np
from pyspark.sql import functions as F

# Attach a row id so imputed values can be joined back to the right rows;
# cache() helps keep the ids from being regenerated when df is reused below
df = df.withColumn("row_id", F.monotonically_increasing_id()).cache()

# Convert the imputation inputs (plus the row id) to pandas
pdf = df.select("row_id", "gender", "age", "avg_glucose_level", "bmi").toPandas()

# Convert categorical to numeric
pdf["gender"] = LabelEncoder().fit_transform(pdf["gender"])

# Impute missing bmi values with MICE-style IterativeImputer (row_id is not used as a feature)
features = ["gender", "age", "avg_glucose_level", "bmi"]
imputed = IterativeImputer().fit_transform(pdf[features])
imputed_bmi = imputed[:, features.index("bmi")]
pdf["bmi"] = np.where(pdf["bmi"].isnull(), imputed_bmi, pdf["bmi"]).round(2) # Round to 2 decimal places to keep it consistent

# Join the imputed values back on row_id. monotonically_increasing_id() is unique but not
# consecutive across partitions, so it cannot safely be used as a list index.
bmi_sdf = spark.createDataFrame(pdf[["row_id", "bmi"]])
df = df.drop("bmi").join(bmi_sdf, on="row_id", how="left").drop("row_id")

df.show(5)

+--------------+------+---+------------+-------------+------------+-------------+--------------+-----------------+---------------+------+-----+
|admission_date|gender|age|hypertension|heart_disease|ever_married|    work_type|residence_type|avg_glucose_level| smoking_status|stroke|  bmi|
+--------------+------+---+------------+-------------+------------+-------------+--------------+-----------------+---------------+------+-----+
|    2025-03-03|  Male| 67|           0|            1|           1|      Private|         Urban|           228.69|formerly smoked|     1| 36.6|
|    2025-04-03|Female| 61|           0|            0|           1|Self-employed|         Rural|           202.21|   never smoked|     1|32.62|
|    2025-06-27|  Male| 80|           0|            1|           1|      Private|         Rural|           105.92|   never smoked|     1| 32.5|
|    2025-06-19|Female| 49|           0|            0|           1|      Private|         Urban|           171.23|         smokes|     1

In [18]:
# Define the silver layer location
silver_bucket = "silver-data"
silver_key    = "healthcare-dataset-stroke-data"
silver_path   = f"s3a://{silver_bucket}/{silver_key}"

# Create a year-month column for partitioning
df = df.withColumn("admission_year_month", F.date_format("admission_date", "yyyy-MM"))

# Write to Delta Lake partitioned by admission_year_month
(df.write
   .format("delta")
   .mode("overwrite")
   .partitionBy("admission_year_month")
   .save(silver_path)
)

# Check files in silver bucket
response = s3.list_objects_v2(Bucket=silver_bucket, Prefix=silver_key)
for obj in response.get('Contents', []):
    print(obj['Key'])


healthcare-dataset-stroke-data/_delta_log/00000000000000000000.crc
healthcare-dataset-stroke-data/_delta_log/00000000000000000000.json
healthcare-dataset-stroke-data/_delta_log/00000000000000000001.crc
healthcare-dataset-stroke-data/_delta_log/00000000000000000001.json
healthcare-dataset-stroke-data/_delta_log/_commits/
healthcare-dataset-stroke-data/admission_year_month=2025-01/part-00000-37af71e4-bf4d-4da4-a844-5f8fe92d56b3.c000.snappy.parquet
healthcare-dataset-stroke-data/admission_year_month=2025-01/part-00000-b976cb9e-1fa1-45ed-873c-a7e8e00d2cac.c000.snappy.parquet
healthcare-dataset-stroke-data/admission_year_month=2025-02/part-00000-4ebf5fe5-8fab-4e93-9d6e-652cd8dbb226.c000.snappy.parquet
healthcare-dataset-stroke-data/admission_year_month=2025-02/part-00000-f1784a6e-53be-436e-a6bc-08ac0b48cb2a.c000.snappy.parquet
healthcare-dataset-stroke-data/admission_year_month=2025-03/part-00000-79727071-80ba-46fb-9c19-7293ba8f5f1d.c000.snappy.parquet
healthcare-dataset-stroke-data/admissi

In a real-world scenario where the dataset is too large to load into memory at once, we can perform imputation in chunks or partitions and write each processed batch to Delta Lake in append mode before moving on to the next one.
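One detail matters when batching: the imputation model should be fitted once, for example on a representative sample, and then applied to every chunk. Refitting per chunk would impute the same kind of row differently depending on which batch it landed in. The sketch below assumes pandas' chunked CSV reader. A plain mean fill stands in for the fitted IterativeImputer only to keep it short, and a Python list stands in for the Delta table. In Spark, each batch would instead be written with .write.format("delta").mode("append").save(path). The names impute_in_chunks and fill_bmi and the CSV contents are hypothetical.

```python
import io
from typing import Callable, Iterator

import pandas as pd


def impute_in_chunks(
    csv_source, chunksize: int, impute: Callable[[pd.DataFrame], pd.DataFrame]
) -> Iterator[pd.DataFrame]:
    """Yield imputed batches from a CSV (path or buffer) without loading it all at once."""
    for chunk in pd.read_csv(csv_source, chunksize=chunksize):
        yield impute(chunk)


csv_text = "gender,age,bmi\nMale,67,36.6\nFemale,61,\nMale,80,32.5\nFemale,49,34.4\nFemale,79,\n"

# Fit the stand-in "model" ONCE on a sample so every batch is imputed consistently
sample = pd.read_csv(io.StringIO(csv_text), nrows=4)
bmi_fill = round(sample["bmi"].mean(), 2)


def fill_bmi(chunk: pd.DataFrame) -> pd.DataFrame:
    # Apply the already-fitted fill value; never refit inside a chunk
    return chunk.assign(bmi=chunk["bmi"].fillna(bmi_fill))


written = []  # stands in for a Delta table written with mode("append")
for batch in impute_in_chunks(io.StringIO(csv_text), chunksize=2, impute=fill_bmi):
    written.append(batch)
```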

Next, we explore the dataset and engineer features, transforming the data from the silver layer into the gold layer.

[**Part II: Exploration and Feature Engineering**](https://github.com/GiggleSamurai/aws-end-to-end-ml-pipeline/blob/main/exploratory_data_analysis.ipynb) 👈