# Get started with SageMaker Processing





## Runtime

This notebook takes approximately 5 minutes to run.

## Contents

1. [Prepare resources](#Prepare-resources)
1. [Download data](#Download-data)
1. [Prepare Processing script](#Prepare-Processing-script)
1. [Run Processing job](#Run-Processing-job)
1. [Conclusion](#Conclusion)

## Prepare resources

First, let’s create an SKLearnProcessor object, passing the scikit-learn version we want to use, as well as our managed infrastructure requirements.

In [4]:
import boto3
import sagemaker
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor

region = sagemaker.Session().boto_region_name
role = get_execution_role()

sklearn_processor = SKLearnProcessor(
    framework_version="1.2-1",
    role=role,
    instance_type="ml.t3.medium",
    instance_count=2
)

## Download data

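Download the raw data from S3. The text of this example references the [Census-Income (KDD) Dataset](https://archive.ics.uci.edu/ml/datasets/Census-Income+%28KDD%29) from the UCI Machine Learning Repository. Note, however, that the cell below reads `sagemaker/input/data.csv` from the `fady-my-bucket` bucket, and the preview shows user records rather than census fields.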

> Dua, D. and Graff, C. (2019). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science.

In [5]:
import os

In [10]:
import pandas as pd

s3 = boto3.client("s3")
s3.download_file(
    "fady-my-bucket",
    "sagemaker/input/data.csv",
    "input_data.csv",
)
df = pd.read_csv("input_data.csv")
df.to_csv("dataset.csv", index=False)  # index=False avoids writing an extra unnamed index column
df.head()

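| Unnamed: 0 | id | name | lastname | phonenumber | age | delete_at | update_at | messages_count | domain | country_Afghanistan | ... | country_Togo | country_Turks and Caicos Islands | country_Ukraine | country_Vanuatu | country_Venezuela | is_male | age_group | create_at_day | create_at_month | create_at_year |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 1 | Megan | Merritt | +1-594-583-4845 | 40 |  |  | 13 | yahoo.com | 0 | ... | 0 | 0 | 0 | 0 | 0 | 1 | 30-50 | 21 | 7 | 2025 |
| 1 | 2 | John | Mason | 001-445-250-1527 | 95 |  |  | 17 | yahoo.com | 0 | ... | 0 | 0 | 0 | 0 | 0 | 1 | 50+ | 21 | 7 | 2025 |
| 2 | 3 | Andrea | Mcmillan | 001-968-598-1591x372 | 31 |  |  | 63 | yahoo.com | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 30-50 | 21 | 7 | 2025 |
| 3 | 4 | Morgan | Dougherty | (253)248-0847x34345 | 44 |  |  | 91 | gmail.com | 0 | ... | 0 | 0 | 0 | 0 | 0 | 1 | 30-50 | 21 | 7 | 2025 |
| 4 | 5 | Carl | Green | +1-914-358-0492x6972 | 47 |  |  | 77 | outlook.com | 0 | ... | 0 | 0 | 0 | 0 | 0 | 0 | 30-50 | 21 | 7 | 2025 |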
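The preprocessing script in the next section only runs a step when its raw column (`email`, `country`, `gender`, `age`, `created_at`) is present, and prints a "Skipping" message otherwise. As a rough sanity check before launching a job, the sketch below reports which of those raw columns are absent. The helper name `missing_raw_columns` and the shortened column list are illustrative assumptions, not part of the notebook.

```python
# Raw columns that preprocessing_fady.py checks for before each step.
EXPECTED_RAW_COLUMNS = ("email", "country", "gender", "age", "created_at")


def missing_raw_columns(columns):
    """Return the expected raw columns that are absent from `columns`."""
    present = set(columns)
    return [name for name in EXPECTED_RAW_COLUMNS if name not in present]


# A shortened, hand-copied subset of the column names shown in the preview above.
preview_columns = ["id", "name", "lastname", "phonenumber", "age", "domain", "is_male", "age_group"]
print(missing_raw_columns(preview_columns))
# → ['email', 'country', 'gender', 'created_at']
```

In the notebook you could pass `df.columns` instead of the hand-copied list. A non-empty result means the corresponding steps of the script will be skipped for this file.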


## Prepare Processing script

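Write the Python script that SageMaker Processing will run. The script reads the single input file that the job copies from S3 into `/opt/ml/processing/input`, engineers features (email domain, one-hot encoded country, an `is_male` flag, age groups, and date parts), and writes one file, `preprocessed_data.csv`, to `/opt/ml/processing/output`, which the job then uploads to S3.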

In [None]:
%%writefile preprocessing_fady.py

import pandas as pd
import numpy as np
import os

def preprocess_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Performs data preprocessing steps on the input DataFrame.

    Args:
        df (pd.DataFrame): The input DataFrame containing raw data.

    Returns:
        pd.DataFrame: The preprocessed DataFrame.
    """
    print("Starting data preprocessing...")

    # --- Step 1: Extract domain name from email and create 'domain' column ---
    if 'email' in df.columns:
        df['domain'] = df['email'].apply(lambda x: x.split('@')[1] if pd.notna(x) and '@' in x else np.nan)
        df = df.drop(columns=['email'])
        print("Extracted domain from email and removed original email column.")
    else:
        print("Email column not found. Skipping domain extraction.")

    # --- Step 2: Apply one-hot encoding to 'country' column ---
    if 'country' in df.columns:
        # Handle potential non-string values or NaNs before one-hot encoding
        df['country'] = df['country'].astype(str).replace('nan', 'Unknown')
        df = pd.get_dummies(df, columns=['country'], prefix='country', drop_first=False)
        print("Applied one-hot encoding to 'country' column and removed original country column.")
    else:
        print("Country column not found. Skipping one-hot encoding.")

    # --- Step 3: Create 'is_male' based on 'gender' and remove original 'gender' ---
    if 'gender' in df.columns:
        # Assuming 'Male' indicates male, and other values (e.g., 'Female', NaN) are not male
        df['is_male'] = df['gender'].apply(lambda x: 1 if str(x).lower() == 'male' else 0)
        df = df.drop(columns=['gender'])
        print("Created 'is_male' column and removed original gender column.")
    else:
        print("Gender column not found. Skipping 'is_male' creation.")

    # --- Step 4: Fill age nulls with the mean average ---
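    if 'age' in df.columns:
        # Coerce to numeric first so the mean is computed on numbers only
        df['age'] = pd.to_numeric(df['age'], errors='coerce')
        if df['age'].isnull().any():
            mean_age = df['age'].mean()
            # Assign the result back instead of using inplace=True on a column selection
            df['age'] = df['age'].fillna(mean_age)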
            print(f"Filled age nulls with mean average: {mean_age:.2f}")
        else:
            print("No nulls found in 'age' column.")
    else:
        print("Age column not found. Skipping age null filling.")

    # --- Step 5: Convert 'age' into 4 different groups ---
    if 'age' in df.columns:
        # Define age bins and labels
        bins = [0, 18, 30, 50, np.inf]
        labels = ['0-18', '18-30', '30-50', '50+']
        # Ensure age column is numeric before cutting
        df['age'] = pd.to_numeric(df['age'], errors='coerce')
        df['age_group'] = pd.cut(df['age'], bins=bins, labels=labels, right=False, include_lowest=True)
        # Convert age_group to string or integer if needed for downstream tasks,
        # or keep as categorical if preferred. For simplicity, we'll keep it as categorical
        # or convert to string if saving to CSV/Parquet.
        # df['age_group'] = df['age_group'].astype(str) # Uncomment if you want string representation
        df = df.drop(columns=['age']) # Remove original age column
        print("Converted 'age' into 4 different groups and removed original age column.")
    else:
        print("Age column not found. Skipping age grouping.")

    # --- Step 6: Convert 'created_at' into separate columns ---
    if 'created_at' in df.columns:
        df['created_at'] = pd.to_datetime(df['created_at'], errors='coerce')
        df['created_at_day'] = df['created_at'].dt.day
        df['created_at_month'] = df['created_at'].dt.month
        df['created_at_year'] = df['created_at'].dt.year
        df = df.drop(columns=['created_at'])
        print("Converted 'created_at' into day, month, year columns and removed original 'created_at'.")
    else:
        print("Created_at column not found. Skipping date conversion.")

    print("Data preprocessing complete.")
    return df

# --- Data loading, preprocessing, and saving ---

# Container paths used by the Processing job: the ProcessingInput is copied from S3
# into /opt/ml/processing/input, and files written to /opt/ml/processing/output
# are uploaded to S3 when the job ends.
input_data_path = "/opt/ml/processing/input/data.csv"
output_data_path = "/opt/ml/processing/output/preprocessed_data.csv"

# Fallback for local testing only: create dummy data if the input file doesn't exist.
# If this message appears in the job logs, the input was not found at input_data_path.
if not os.path.exists(input_data_path):
    print("Input file not found, creating dummy DataFrame for local testing.")
    data = {
        'email': ['user1@example.com', 'user2@domain.net', 'user3@sub.org', 'invalid-email', np.nan],
        'country': ['USA', 'Canada', 'USA', 'Mexico', np.nan],
        'gender': ['Male', 'Female', 'Male', 'Other', np.nan],
        'age': [25, 35, np.nan, 15, 55],
        'created_at': ['2023-01-15', '2022-11-01', '2023-05-20', '2024-02-29', np.nan],
        'value': [100, 200, 150, 300, 250]
    }
    df_raw = pd.DataFrame(data)
    os.makedirs(os.path.dirname(input_data_path), exist_ok=True)  # Make sure the folder exists
    df_raw.to_csv(input_data_path, index=False)
    print(f"Dummy data saved to {input_data_path}")

# Load the data
try:
    df_raw = pd.read_csv(input_data_path)
    print(f"Successfully loaded data from {input_data_path}")
except Exception as e:
    print(f"Error loading data from {input_data_path}: {e}")
    # Exit with a non-zero status so the Processing job is marked as failed.
    raise SystemExit(1)

# Preprocess the data
df_processed = preprocess_data(df_raw.copy())

# Save the preprocessed data
try:
    os.makedirs(os.path.dirname(output_data_path), exist_ok=True)  # Needed when running outside SageMaker
    df_processed.to_csv(output_data_path, index=False)
    print(f"Preprocessed data saved to {output_data_path}")
except Exception as e:
    print(f"Error saving preprocessed data to {output_data_path}: {e}")
    # Exit with a non-zero status so the Processing job is marked as failed.
    raise SystemExit(1)

print("Script finished.")


Overwriting preprocessing_fady.py
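Step 5 of the script calls `pd.cut` with `right=False`, so each age group includes its lower edge and excludes its upper edge. The sketch below restates that boundary rule in plain Python for finite ages, to make it easier to see where ages such as 18 or 50 land. It is an illustration only (the function name `age_group` is hypothetical) and is not the code the job runs.

```python
from bisect import bisect_right

# Same lower edges and labels as Step 5 of the script; the last group is open-ended.
BIN_EDGES = [0, 18, 30, 50]
LABELS = ["0-18", "18-30", "30-50", "50+"]


def age_group(age):
    """Map a finite age to its label using left-closed intervals:
    [0, 18), [18, 30), [30, 50), [50, inf)."""
    if age is None or age != age or age < 0:  # None, NaN, or negative: no group
        return None
    # bisect_right counts the edges <= age; subtract 1 to get the label index.
    return LABELS[bisect_right(BIN_EDGES, age) - 1]


for sample in (17, 18, 30, 49, 50, 95):
    print(sample, age_group(sample))
```

An age exactly on an edge falls into the higher group, which matches the preview rows earlier in the notebook (for example, age 95 is labeled `50+`).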


## Run Processing job

Run the Processing job, specifying the script name, input file, and output files.

In [16]:
%%capture output

from sagemaker.processing import ProcessingInput, ProcessingOutput

# Run the processing job using the sklearn_processor created in "Prepare resources"
sklearn_processor.run(
    code="preprocessing_fady.py",  # Path to the preprocessing script
    job_name="sagemaker-fady-1",   # Must be unique; change it before re-running
    # arguments=[],                 # No arguments needed for the current script
    inputs=[
        # SageMaker copies data.csv from S3 into the destination folder, so the
        # script finds it at /opt/ml/processing/input/data.csv.
        ProcessingInput(
            source="s3://fady-my-bucket/sagemaker/input/data.csv", # Example S3 path
            destination="/opt/ml/processing/input"
        )
    ],
    outputs=[
        # The script writes a single file, preprocessed_data.csv, to
        # /opt/ml/processing/output. Everything in that folder is uploaded
        # to the S3 destination below when the job completes.
        # Separate train/validation/test outputs would require modifying
        # the script to create those files.
        ProcessingOutput(
            source="/opt/ml/processing/output", # This is where your script writes 'preprocessed_data.csv'
            destination="s3://fady-my-bucket/sagemaker/output/" # S3 path for all output
        )
    ]
)
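The `ProcessingInput` above names an S3 object and a container folder, while the script opens one specific file path. The sketch below shows how the two are expected to line up, assuming the source is a single S3 object and that it keeps its base name inside the destination folder. The helper `container_input_path` is hypothetical and not part of the SageMaker SDK.

```python
import posixpath
from urllib.parse import urlparse


def container_input_path(s3_uri, destination):
    """Return the path at which a single S3 object is expected inside the container."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {s3_uri!r}")
    filename = posixpath.basename(parsed.path)
    if not filename:
        raise ValueError(f"URI does not point to a single object: {s3_uri!r}")
    return posixpath.join(destination, filename)


print(container_input_path(
    "s3://fady-my-bucket/sagemaker/input/data.csv",
    "/opt/ml/processing/input",
))
# → /opt/ml/processing/input/data.csv
```

If the printed path differs from `input_data_path` in the script, the script will not find its input and will fall back to the dummy data.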


Get the Processing job logs and retrieve the job name.

You get an error. What is this error, and where can you see the logs?

> Check the logs in CloudWatch and fix the issue.

In [17]:
output()

................Input file not found, creating dummy DataFrame for local testing.
Dummy data saved to input.csv
Successfully loaded data from input.csv
Starting data preprocessing...
Extracted domain from email and removed original email column.
Applied one-hot encoding to 'country' column and removed original country column.
Created 'is_male' column and removed original gender column.
Filled age nulls with mean average: 32.50
Converted 'age' into 4 different groups and removed original age column.
Converted 'created_at' into day, month, year columns and removed original 'created_at'.
Data preprocessing complete.
Preprocessed data saved to preprocessed_data.csv
Script finished.
Input file not found, creating dummy DataFrame for local testing.
Dummy data saved to input.csv
Successfully loaded data from input.csv
Starting data preprocessing.

INFO:sagemaker:Creating processing-job with name sagemaker-fady-1
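The prompt above asks where the job logs can be found. One way to read them outside the notebook is through CloudWatch Logs. The sketch below assumes that Processing job logs are in the `/aws/sagemaker/ProcessingJobs` log group and that stream names start with the job name. The helper `fetch_job_log_messages` is hypothetical, and it takes the client as a parameter so it can be tried without AWS access.

```python
LOG_GROUP = "/aws/sagemaker/ProcessingJobs"  # Assumed log group for Processing jobs


def fetch_job_log_messages(logs_client, job_name, log_group=LOG_GROUP):
    """Collect log messages from streams whose names start with `job_name`."""
    messages = []
    kwargs = {"logGroupName": log_group, "logStreamNamePrefix": job_name}
    while True:
        response = logs_client.filter_log_events(**kwargs)
        messages.extend(event["message"] for event in response.get("events", []))
        token = response.get("nextToken")
        if not token:
            return messages
        kwargs["nextToken"] = token  # Request the next page of events


# Usage in the notebook (requires AWS credentials):
# import boto3
# for line in fetch_job_log_messages(boto3.client("logs"), "sagemaker-fady-1"):
#     print(line)
```

Because the processor was created with `instance_count=2`, expect messages from more than one stream for the same job.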


Confirm that the output dataset files were written to S3.
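A minimal way to do this check is to list the objects under the output prefix. The sketch below assumes the bucket and prefix from the `ProcessingOutput` destination used earlier. The helper `list_output_keys` is hypothetical, and it accepts the S3 client as a parameter so the paging logic can be exercised without AWS access.

```python
def list_output_keys(s3_client, bucket, prefix):
    """Return (key, size) pairs for every object under `prefix`, following pagination."""
    found = []
    kwargs = {"Bucket": bucket, "Prefix": prefix}
    while True:
        response = s3_client.list_objects_v2(**kwargs)
        found.extend((obj["Key"], obj["Size"]) for obj in response.get("Contents", []))
        if not response.get("IsTruncated"):
            return found
        kwargs["ContinuationToken"] = response["NextContinuationToken"]


# Usage in the notebook (requires AWS credentials):
# import boto3
# for key, size in list_output_keys(boto3.client("s3"), "fady-my-bucket", "sagemaker/output/"):
#     print(key, size)
```

If the job succeeded, the listing should include a key ending in `preprocessed_data.csv`. An empty list suggests the job failed before the upload step.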

## Conclusion

In this notebook, we read a dataset from S3 and preprocessed it (domain extraction, one-hot encoding, and age and date features) using a SageMaker Processing job that wrote a single preprocessed file back to S3. You can extend this example to preprocess your own datasets in preparation for machine learning or other applications.