# Data Transformation Pipeline for MLOps Using SageMaker Data Processing Jobs
**Author: Anvesh Muppeda**

This notebook demonstrates a data transformation pipeline built on Amazon SageMaker Processing Jobs. It generates a synthetic dataset, runs preprocessing and feature engineering in a managed Processing container, computes basic data quality metrics, and validates the outputs written to S3, following common MLOps practices. It is designed to run in a SageMaker Jupyter Notebook environment, where the notebook's execution role and default S3 bucket are available.

## 1. Prepare Environment
### 📦 Step 1: Setup Environment

In [14]:
# 📦 Step 1: Setup Environment
import sagemaker
import boto3
import pandas as pd
import os
from sagemaker.processing import ProcessingInput, ProcessingOutput, ScriptProcessor
import json
import numpy as np
from datetime import datetime

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sagemaker_session.default_bucket()
region = boto3.Session().region_name

print(f"SageMaker Role: {role}")
print(f"Default Bucket: {bucket}")

SageMaker Role: arn:aws:iam::910316760829:role/service-role/AmazonSageMaker-ExecutionRole-20250720T171468
Default Bucket: sagemaker-us-east-1-910316760829


### ⚙️ Step 2: Data Generation
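Generate a synthetic dataset of 20,000 employee records that deliberately includes common data quality problems: missing numeric and categorical values, and a nested JSON `profile` column that has to be flattened.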

In [15]:
import pandas as pd
import numpy as np
import json
import random
from datetime import datetime, timedelta
import os

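# Set random seeds for reproducibility; NumPy and Python's random module use separate generators
# (hire dates are still relative to datetime.now(), so they shift between runs)
np.random.seed(42)
random.seed(42)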

# Number of records
num_records = 20000

# Generate random data
data = {
    "id": np.arange(1, num_records + 1),
    "name": [f"Name_{i}" for i in np.random.randint(1, 1000, num_records)],
    "age": np.random.randint(18, 80, num_records),
    "salary": np.random.choice([50000, 60000, 70000, None], num_records),
    "hire_date": [
        (datetime.now() - timedelta(days=random.randint(0, 3650))).strftime("%Y-%m-%d")
        if random.random() > 0.1 else None
        for _ in range(num_records)
    ],
    "profile": [
        json.dumps({
            "address": f"Street {random.randint(1, 100)}, City {random.randint(1, 50)}",
            "phone": f"{random.randint(1000000000, 9999999999)}",
            "email": f"email_{random.randint(1, 1000)}@example.com"
        })
        if random.random() > 0.1 else None
        for _ in range(num_records)
    ],
    "department": np.random.choice(["HR", "IT", "Finance", "Marketing", None], num_records),
    "bonus": [None if random.random() > 0.9 else random.randint(1000, 10000) for _ in range(num_records)]
}

# Create DataFrame
df = pd.DataFrame(data)

# Introduce some NaN values randomly
df.loc[np.random.choice(df.index, size=int(num_records * 0.05), replace=False), "age"] = np.nan
df.loc[np.random.choice(df.index, size=int(num_records * 0.1), replace=False), "salary"] = np.nan

# Ensure 'data' folder exists
os.makedirs("data", exist_ok=True)

# Save to CSV
df.to_csv("data/mock_data.csv", index=False)
print("Dataset created and saved to data/mock_data.csv")

Dataset created and saved to data/mock_data.csv

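Before uploading, it can help to confirm that the generated file really contains the gaps it is meant to simulate. The sketch below uses a hypothetical helper, `missing_value_report` (not part of the original notebook), shown on a tiny illustrative frame; in the notebook you would call it on `df`.

```python
import numpy as np
import pandas as pd


def missing_value_report(frame: pd.DataFrame) -> pd.DataFrame:
    """Return the count and percentage of missing values per column, worst first."""
    counts = frame.isnull().sum()  # None and NaN are both counted as missing
    report = pd.DataFrame({
        "missing_count": counts,
        "missing_pct": (counts / len(frame) * 100).round(2),
    })
    return report.sort_values("missing_count", ascending=False)


# Small illustrative frame with the same kinds of gaps as mock_data.csv
sample = pd.DataFrame({
    "age": [25, np.nan, 40, 33],
    "salary": [50000, None, np.nan, 70000],
    "department": ["HR", None, "IT", "IT"],
})
print(missing_value_report(sample))
```

On the real dataset, `salary` should show the highest rate, because it combines the `None` option in `np.random.choice` with the extra 10% of values set to NaN afterwards.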

### ⚙️ Step 3: Upload Source Data to S3
Upload the source CSV dataset to the `input/` prefix of the session's default S3 bucket, where the processing job will read it.

In [16]:
s3 = boto3.resource('s3')
s3.meta.client.upload_file('data/mock_data.csv', bucket, 'input/mock_data.csv')
print(f"Dataset 'mock_data.csv' uploaded to: s3://{bucket}/input/mock_data.csv")

Dataset 'mock_data.csv' uploaded to: s3://sagemaker-us-east-1-910316760829/input/mock_data.csv
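S3 keys are not filesystem paths, so building URIs with `os.path.join` (as the processing-job cell does) ties the result to the local OS path separator. A minimal sketch of a hypothetical helper, `s3_uri`, that always joins with `/`; the bucket name below is illustrative only.

```python
def s3_uri(bucket: str, *parts: str) -> str:
    """Join a bucket name and key parts into an s3:// URI.

    Always uses '/' as the separator and strips leading/trailing
    slashes from each part so that 'input/' + 'mock_data.csv' joins cleanly.
    """
    key = "/".join(part.strip("/") for part in parts if part.strip("/"))
    return f"s3://{bucket}/{key}" if key else f"s3://{bucket}"


# Hypothetical bucket name used only for illustration
print(s3_uri("my-bucket", "input/", "mock_data.csv"))   # s3://my-bucket/input/mock_data.csv
print(s3_uri("my-bucket", "output", "data-processed"))  # s3://my-bucket/output/data-processed
```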


## 2. Data Processing Job
Here we will create a SageMaker Processing Job to execute the data transformation script. The job imputes missing values, flattens the JSON `profile` field, engineers new features, and computes data quality metrics.
### 🛠️ Step 1: Create Processing Script
This script contains all the data transformation logic that will be executed by SageMaker Processing.  
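One detail worth checking before running the script is how `pd.cut` assigns boundary values when it builds `salary_category`. This sketch reuses the script's bins and labels on the three salary levels generated in Step 2 plus a boundary value of 0.

```python
import pandas as pd

# Same bins and labels as the processing script
bins = [0, 50000, 70000, 100000]
labels = ["low", "medium", "high"]

# The three salary levels generated in Step 2, plus the boundary value 0
salaries = pd.Series([0, 50000, 60000, 70000])

# Intervals are right-closed: (0, 50000], (50000, 70000], (70000, 100000];
# include_lowest=True widens the first interval so that 0 is also "low"
categories = pd.cut(salaries, bins=bins, labels=labels, include_lowest=True)
print(list(categories))  # ['low', 'low', 'medium', 'medium']
```

Because no generated salary exceeds 70,000, the `high` bucket stays empty, and `unique_salary_categories` in the quality metrics counts only the categories actually observed.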

In [17]:
%%writefile preprocessing_script.py
import pandas as pd
import sys
import os
import json
import numpy as np
from datetime import datetime

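# Load the dataset that SageMaker copied from S3 into the container
try:
    df = pd.read_csv('/opt/ml/processing/input/mock_data.csv')
    print("✅ Dataset loaded successfully!")
    print(f"📏 Dataset shape: {df.shape}")
except FileNotFoundError:
    print("❌ Error: mock_data.csv not found in /opt/ml/processing/input. Check the ProcessingInput source and destination.")
    # Exit with a non-zero code so the processing job is marked as failed
    sys.exit(1)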

# Analyze missing patterns
print("\n📊 Missing Value Patterns:")
print("Missing Age values:")
print(df[df['age'].isnull()][['age', 'salary', 'department']])

print("\nMissing Salary values:")
print(df[df['salary'].isnull()][['age', 'salary', 'department']])

# Compute the medians used to impute age and salary
age_median = df['age'].median()
salary_median = df['salary'].median()
print("Age median:", age_median)
print("Salary median:", salary_median)

# Fill missing values of age with age_median
df['age'] = df['age'].fillna(age_median)
# Fill missing values of salary with salary_median
df['salary'] = df['salary'].fillna(salary_median)

# Verify that age and salary no longer contain missing values
# (bare expressions like df.head() display nothing in a script, so print explicitly)
print(df[['age', 'salary']].head())
print("Missing values in each column:")
print(df.isnull().sum())

print("\nRows with missing Department values:")
print(df[df['department'].isnull()][['age', 'salary', 'department']])

# Fill the missing values in department with 'Unknown'
df['department'] = df['department'].fillna('Unknown')

# Verify the Department data
print("Missing values in each column:")
print(df.isnull().sum())
# Check unique values in the department column
print(df['department'].unique())

print("Top rows from profile column \n")
print(df['profile'].head())

# Inspect the type of the first non-null profile value
profile_first_value = df['profile'].dropna().iloc[0]
print("\nProfile column values current data type")
print(type(profile_first_value))

# Values read from CSV are JSON strings (<class 'str'>), so parse them with json.loads().
# Missing profiles become empty dicts so the .get() calls below return None.
df['profile'] = df['profile'].apply(lambda x: json.loads(x) if pd.notnull(x) else {})

# Extract Address Field
print("Extract Address Field....\n")
# Create new 'address' column by extracting from 'profile' dictionaries
df['address'] = df['profile'].apply(lambda x: x.get('address', None))  # Returns None if no address key

print("Top rows from profile column \n")
print(df['profile'].head())
print("\nTop rows from newly created address column \n")
print(df['address'].head())

# Extract Phone Field
print("Extract Phone Field....\n")
# Create new 'phone' column by extracting from 'profile' dictionaries
df['phone'] = df['profile'].apply(lambda x: x.get('phone', None))  # Returns None if no phone key

print("Top rows from profile column \n")
print(df['profile'].head())
print("\nTop rows from newly created phone column \n")
print(df['phone'].head())

# Extract Email Field
print("Extract Email Field....\n")
# Create new 'email' column by extracting from 'profile' dictionaries
df['email'] = df['profile'].apply(lambda x: x.get('email', None))  # Returns None if no email key

print("Top rows from profile column \n")
print(df['profile'].head())
print("\nTop rows from newly created email column \n")
print(df['email'].head())

print("\n✅ Profile fields extracted: address, phone, email")


# Now drop the profile column
print("\nColumns before dropping profile:")
print(df.columns.tolist())

# drop() returns a new DataFrame; df itself keeps the profile column
cleaned_df = df.drop(columns=['profile'])

print("\nColumns in new DataFrame after dropping profile:")
print(cleaned_df.columns.tolist())

print("\n💾 Saving cleaned data to: '/opt/ml/processing/output/cleaned_data.csv' ...")
cleaned_df.to_csv("/opt/ml/processing/output/cleaned_data.csv", index=False)
print("✅ Cleaned data saved to: '/opt/ml/processing/output/cleaned_data.csv'")

# Reload the cleaned file so feature engineering starts from exactly what was saved
transform_df = pd.read_csv('/opt/ml/processing/output/cleaned_data.csv')
print(transform_df.head())

# Create a new column 'address_length'; missing addresses get 0 instead of len('nan') == 3
print("\n🔧 Creating Address Length Feature...")
transform_df['address_length'] = transform_df['address'].fillna('').str.len()
print("Address followed by Address Length columns")
print(transform_df[['address', 'address_length']].head())

print("\n🔧 Creating Salary Categories...")
# Define the bins and labels
bins = [0, 50000, 70000, 100000]
labels = ['low', 'medium', 'high']

# Create a new column 'salary_category' (right-closed intervals: (0, 50000], (50000, 70000], (70000, 100000])
transform_df['salary_category'] = pd.cut(transform_df['salary'], bins=bins, labels=labels, include_lowest=True)

# Print sample data after adding the 'salary_category' column
print("Sample data after adding the 'salary_category' column: \n")
print(transform_df[['salary', 'salary_category']].head())

print("\n🔧 Creating Age Groups...")
# Define age bins and labels
age_bins = [0, 25, 35, 45, 55, float('inf')]
age_labels = ['Young', 'Early Career', 'Mid Career', 'Senior', 'Experienced']

# Create a new column 'age_group'
transform_df['age_group'] = pd.cut(transform_df['age'], bins=age_bins, labels=age_labels, include_lowest=True)

# Age group distribution
print("Age group distribution:")
print(transform_df['age_group'].value_counts())

# Print sample data after adding the 'age_group' column
print("\nSample data after adding the 'age_group' column: \n")
print(transform_df[['age', 'age_group']].head())

print("\n🔧 Creating Department Statistics...")
# Group by 'department' and calculate average salary and age on the imputed data
department_summary_report = transform_df.groupby('department').agg({
    'salary': 'mean',
    'age': 'mean'
}).reset_index()

# rename columns of department_summary_report for clarity
department_summary_report.columns = ['Department', 'Average Salary', 'Average Age']

# Print the Summary Report
print("Summary report of average salary and age based on the department:\n")
print(department_summary_report)


print("\n📊 Data Quality Metrics...")

quality_metrics = {
    'total_rows': len(transform_df),
    'total_columns': len(transform_df.columns),
    'missing_values_count': transform_df.isnull().sum().sum(),
    'duplicate_rows': transform_df.duplicated().sum(),
    'numeric_columns': len(transform_df.select_dtypes(include=[np.number]).columns),
    'categorical_columns': len(transform_df.select_dtypes(include=['object', 'category']).columns),
    'unique_departments': transform_df['department'].nunique(),
    'unique_age_groups': transform_df['age_group'].nunique(),
    'unique_salary_categories': transform_df['salary_category'].nunique(),
    'processing_timestamp': datetime.now().isoformat()
}

print("Data Quality Metrics:")
for metric, value in quality_metrics.items():
    print(f"  {metric}: {value}")

print("Saving Transformed data csv to: '/opt/ml/processing/output/transformed_data.csv' ...")
transform_df.to_csv("/opt/ml/processing/output/transformed_data.csv", index=False)
print("\nTransformed data csv saved to: '/opt/ml/processing/output/transformed_data.csv'")

# Save department statistics
print("Saving department statistics...")
department_summary_report.to_csv("/opt/ml/processing/output/department_statistics.csv", index=False)
print("✅ Department statistics saved to: '/opt/ml/processing/output/department_statistics.csv'")

Overwriting preprocessing_script.py
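The script extracts `address`, `phone`, and `email` with three separate `apply(lambda x: x.get(...))` passes. A sketch of an alternative, using `pd.json_normalize` to expand every key of the parsed profiles in one call; the two sample rows are illustrative, not taken from the dataset.

```python
import json

import pandas as pd

# Two example rows shaped like the 'profile' column: a JSON string and a missing value
profile = pd.Series([
    json.dumps({"address": "Street 1, City 2", "phone": "1234567890", "email": "email_1@example.com"}),
    None,
])

# Parse JSON strings; missing profiles become empty dicts so every row yields a record
parsed = profile.apply(lambda s: json.loads(s) if pd.notnull(s) else {})

# One call expands all keys into columns; rows without a key get NaN
flat = pd.json_normalize(parsed.tolist())
flat.index = profile.index  # keep row alignment with the source DataFrame
print(flat)
```

In the script, the result could then be attached with `df.drop(columns=['profile']).join(flat)`. Unlike the per-field approach, any new key that appears in `profile` would also become a column, which may or may not be what you want downstream.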

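The script prints `quality_metrics` but does not save it, so the numbers are only visible in the job logs. A minimal sketch of persisting them as JSON: `json.dump` cannot encode the `numpy.int64` values returned by calls such as `transform_df.isnull().sum().sum()`, so a `default=` hook converts them. The helper names here are hypothetical.

```python
import json
from datetime import datetime


def to_json_safe(value):
    """json.dump fallback for values the json module cannot encode natively."""
    # NumPy scalars (e.g. numpy.int64 returned by DataFrame.sum()) expose .item()
    if hasattr(value, "item"):
        return value.item()
    if isinstance(value, datetime):
        return value.isoformat()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


def save_quality_metrics(metrics: dict, path: str) -> None:
    """Write a metrics dict to a JSON file (hypothetical helper)."""
    with open(path, "w") as f:
        json.dump(metrics, f, indent=2, default=to_json_safe)
```

Inside the script this could be called as `save_quality_metrics(quality_metrics, "/opt/ml/processing/output/quality_metrics.json")` (the file name is an assumption). Because the whole output directory is uploaded, the file would land next to the CSVs under `output/data-processed/`, giving later runs a baseline to compare against.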

### 🏃‍♂️ Step 2: Execute SageMaker Processing Job
Now we will execute the SageMaker Processing Job using the script created in the previous step. This job will process the data, apply transformations, and generate output files.
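The processing script hard-codes `/opt/ml/processing/input` and `/opt/ml/processing/output`, which exist only inside the Processing container: SageMaker copies each `ProcessingInput` there before the script starts and uploads the `ProcessingOutput` directory to S3 after it exits. A minimal sketch, assuming you also want to run the script locally, that reads the directories from command-line flags with the container paths as defaults. The `--input-dir`/`--output-dir` flag names are hypothetical; `ScriptProcessor.run` accepts an `arguments` list that is passed to the script, so the defaults could also be overridden from the notebook.

```python
import argparse
import os

# Container paths used by the processing script
DEFAULT_INPUT_DIR = "/opt/ml/processing/input"
DEFAULT_OUTPUT_DIR = "/opt/ml/processing/output"


def parse_args(argv=None):
    """Parse hypothetical --input-dir/--output-dir flags, defaulting to container paths."""
    parser = argparse.ArgumentParser(description="Preprocess mock_data.csv")
    parser.add_argument("--input-dir", default=DEFAULT_INPUT_DIR)
    parser.add_argument("--output-dir", default=DEFAULT_OUTPUT_DIR)
    return parser.parse_args(argv)


# Pass [] explicitly: inside Jupyter, sys.argv holds the kernel's own arguments
args = parse_args([])
input_path = os.path.join(args.input_dir, "mock_data.csv")
print(input_path)  # /opt/ml/processing/input/mock_data.csv
```

A local run could then look like `python preprocessing_script.py --input-dir data --output-dir local_output`, after creating `local_output` with `os.makedirs(..., exist_ok=True)`.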

In [None]:
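input_raw_data_prefix = "input"
output_preprocessed_data_prefix = "output"

# Build S3 URIs with f-strings; os.path.join is meant for local filesystem paths
input_s3_uri = f"s3://{bucket}/{input_raw_data_prefix}/mock_data.csv"
output_s3_uri = f"s3://{bucket}/{output_preprocessed_data_prefix}/data-processed"

processor = ScriptProcessor(
    # Use the session's region instead of hard-coding 'us-east-1'
    image_uri=sagemaker.image_uris.retrieve(framework='sklearn', region=region, version='1.2-1'),
    role=role,
    command=['python3'],
    instance_type='ml.t3.medium',
    instance_count=1
)

processor.run(
    code='preprocessing_script.py',
    # SageMaker downloads the input file into this directory before the script starts
    inputs=[ProcessingInput(source=input_s3_uri,
                            destination='/opt/ml/processing/input')],
    # Everything the script writes here is uploaded to output_s3_uri when the job finishes
    outputs=[ProcessingOutput(source='/opt/ml/processing/output',
                              destination=output_s3_uri)]
)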

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Defaulting to only supported image scope: cpu.
INFO:sagemaker:Creating processing-job with name sagemaker-scikit-learn-2025-07-20-23-44-22-210


....

## 3. Results Validation
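📊 Review Processed Data.  
Load the three files the job wrote to S3 and inspect them to confirm that the cleaning and feature engineering steps produced the expected columns and values. Reading `s3://` paths with `pd.read_csv` requires the `s3fs` package in the notebook kernel.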

In [None]:
df_cleaned = pd.read_csv(f's3://{bucket}/output/data-processed/cleaned_data.csv')
df_department_statistics = pd.read_csv(f's3://{bucket}/output/data-processed/department_statistics.csv')
df_transformed_data = pd.read_csv(f's3://{bucket}/output/data-processed/transformed_data.csv')

print("🧹 CLEANED DATA SAMPLE:")
print("-" * 40)
print("Cleaned Data..\n")
print(df_cleaned.head())

print("\n🔧 TRANSFORMED DATA SAMPLE:")
print("-" * 40)
print("Transformed Data..\n")
print(df_transformed_data.head())

print("\n📈 DEPARTMENT STATISTICS:")
print("-" * 40)
print("Department Statistics Data..\n")
print(df_department_statistics)
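Printing `head()` only shows five rows. A sketch of programmatic checks over the whole transformed file, using a hypothetical helper `validate_transformed` that encodes what the script is supposed to guarantee: no nulls left in the imputed columns, and only the declared labels in the binned features. The small frame below is illustrative; in the notebook you would pass `df_transformed_data`.

```python
import pandas as pd

REQUIRED_COLUMNS = {"age", "salary", "department", "salary_category", "age_group"}
SALARY_LABELS = ["low", "medium", "high"]
AGE_LABELS = ["Young", "Early Career", "Mid Career", "Senior", "Experienced"]


def validate_transformed(frame: pd.DataFrame) -> list:
    """Return human-readable problems; an empty list means all checks passed."""
    problems = []
    missing_cols = REQUIRED_COLUMNS - set(frame.columns)
    if missing_cols:
        problems.append(f"missing columns: {sorted(missing_cols)}")
        return problems  # later checks depend on these columns
    # The script imputes these three columns, so no nulls should remain
    for col in ["age", "salary", "department"]:
        n = int(frame[col].isnull().sum())
        if n:
            problems.append(f"{col} has {n} missing values after imputation")
    # Engineered categorical features must only contain their declared labels
    for col, allowed in {"salary_category": SALARY_LABELS, "age_group": AGE_LABELS}.items():
        unexpected = set(frame[col].dropna()) - set(allowed)
        if unexpected:
            problems.append(f"{col} has unexpected values: {sorted(unexpected)}")
    return problems


good = pd.DataFrame({
    "age": [30.0, 50.0],
    "salary": [60000.0, 50000.0],
    "department": ["IT", "Unknown"],
    "salary_category": ["medium", "low"],
    "age_group": ["Early Career", "Senior"],
})
print(validate_transformed(good))  # []
```

In the notebook, `problems = validate_transformed(df_transformed_data)` followed by `assert not problems, problems` would stop execution with a readable message if a check fails.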

## 4. Next Steps for MLOps

In [None]:
print("🎯 RECOMMENDED NEXT STEPS:")
print("=" * 50)
print("1. 🤖 Model Training:")
print("   - Use transformed features for ML model training")
print("   - Implement cross-validation with engineered features")
print("   - Track model performance metrics")

print("\n2. 🧪 Model Validation:")
print("   - A/B test model performance with/without new features")
print("   - Validate model predictions on hold-out dataset")
print("   - Monitor feature importance and model interpretability")

print("\n3. 🚀 Model Deployment:")
print("   - Deploy model using SageMaker endpoints")
print("   - Implement batch or real-time inference")
print("   - Set up model versioning and rollback capabilities")

print("\n4. 📊 Monitoring & Observability:")
print("   - Monitor data drift using quality metrics")
print("   - Set up alerts for data quality degradation")
print("   - Track model performance in production")

print("\n5. 🔄 Pipeline Automation:")
print("   - Integrate with SageMaker Pipelines for orchestration")
print("   - Schedule regular data processing jobs")
print("   - Implement CI/CD for model updates")

print("\n6. 📈 Advanced Analytics:")
print("   - Use department statistics for business insights")
print("   - Create dashboards for data quality monitoring")
print("   - Implement anomaly detection on incoming data")

print("\n" + "=" * 50)
print("🎉 DATA TRANSFORMATION PIPELINE COMPLETE!")
print("📁 Processed data ready for ML model training")
print("=" * 50)
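For the "Monitor data drift using quality metrics" step, one simple starting point is to compare each run's metrics against a stored baseline. This is a plain threshold check, not SageMaker Model Monitor; the helper name, the 10% tolerance, and all metric values below are illustrative assumptions. It expects plain Python numbers, such as values loaded back from a saved JSON file.

```python
def _is_number(value) -> bool:
    """True for int/float values, excluding bool (which is a subclass of int)."""
    return isinstance(value, (int, float)) and not isinstance(value, bool)


def compare_metrics(baseline: dict, current: dict, tolerance: float = 0.10) -> dict:
    """Return the numeric metrics whose relative change from baseline exceeds tolerance."""
    flagged = {}
    for name, base in baseline.items():
        cur = current.get(name)
        # Skip non-numeric entries such as processing_timestamp, and metrics absent from current
        if not (_is_number(base) and _is_number(cur)):
            continue
        if base == 0:
            change = 0.0 if cur == 0 else float("inf")
        else:
            change = abs(cur - base) / abs(base)
        if change > tolerance:
            flagged[name] = {"baseline": base, "current": cur, "relative_change": round(change, 4)}
    return flagged


# Illustrative values only, not results from the job above
baseline = {"total_rows": 20000, "missing_values_count": 1000, "unique_departments": 5,
            "processing_timestamp": "2025-07-20T00:00:00"}
current = {"total_rows": 19800, "missing_values_count": 1500, "unique_departments": 5,
           "processing_timestamp": "2025-07-21T00:00:00"}
print(compare_metrics(baseline, current))
# {'missing_values_count': {'baseline': 1000, 'current': 1500, 'relative_change': 0.5}}
```

A flagged metric could feed the alerting mentioned in the monitoring step; relative change is used so that the same tolerance works for counts of very different sizes.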