# Data Preprocessing Pipeline with SageMaker Local Mode

This notebook implements a data preprocessing pipeline for our tourism recommendation system using SageMaker local mode. We'll create a container that handles data validation, cleaning, and feature engineering.

## Data Source


[Cultural Tourism Dataset in Kaggle](https://www.kaggle.com/datasets/ziya07/cultural-tourism-dataset)

Please download the data as a CSV file and save it into the `data` directory (the notebook reads `data/tourism_dataset_5000.csv`).

## Pipeline Overview
1. Data Validation
2. Data Cleaning
3. Feature Engineering
4. Data Splitting
5. Quality Checks

## Setup and Imports

In [1]:
import sagemaker
import boto3
import pandas as pd
import numpy as np
import json
from sagemaker.processing import ScriptProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

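# Initialize SageMaker session (local mode) in the intended region
boto_session = boto3.Session(region_name='ap-southeast-2')
sagemaker_session = sagemaker.LocalSession(boto_session=boto_session)
# Placeholder execution role ARN; replace it with a role from your own account
role = 'arn:aws:iam::111111111111:role/service-role/AmazonSageMaker-ExecutionRole-20200101T000001'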



sagemaker.config INFO - Not applying SDK defaults from location: C:\ProgramData\sagemaker\sagemaker\config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: C:\Users\hohoy\AppData\Local\sagemaker\sagemaker\config.yaml


## Load and Examine Data

In [2]:
# Load tourism data
tourism_data = pd.read_csv('data/tourism_dataset_5000.csv')

# Display basic information about the dataset
print("Dataset Info:")
tourism_data.info()

print("\nSample Data:")
print(tourism_data.head())

Dataset Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Data columns (total 14 columns):
 #   Column                   Non-Null Count  Dtype  
---  ------                   --------------  -----  
 0   Tourist ID               5000 non-null   int64  
 1   Age                      5000 non-null   int64  
 2   Interests                5000 non-null   object 
 3   Preferred Tour Duration  5000 non-null   int64  
 4   Accessibility            5000 non-null   bool   
 5   Site Name                5000 non-null   object 
 6   Sites Visited            5000 non-null   object 
 7   Tour Duration            5000 non-null   int64  
 8   Route ID                 5000 non-null   int64  
 9   Tourist Rating           5000 non-null   float64
 10  System Response Time     5000 non-null   float64
 11  Recommendation Accuracy  5000 non-null   int64  
 12  VR Experience Quality    5000 non-null   float64
 13  Satisfaction             5000 non-null   int64  
dtypes: bool(1)

In [3]:
tourism_data.head()

| | Tourist ID | Age | Interests | Preferred Tour Duration | Accessibility | Site Name | Sites Visited | Tour Duration | Route ID | Tourist Rating | System Response Time | Recommendation Accuracy | VR Experience Quality | Satisfaction |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 48 | ['Architecture', 'Art', 'History'] | 5 | False | Eiffel Tower | ['Eiffel Tower', 'Great Wall of China', 'Taj M... | 7 | 1000 | 1.6 | 3.73 | 97 | 4.5 | 3 |
| 1 | 2 | 37 | ['Cultural', 'Nature'] | 6 | False | Colosseum | ['Great Wall of China'] | 1 | 2000 | 2.6 | 2.89 | 90 | 4.5 | 3 |
| 2 | 3 | 43 | ['History', 'Art', 'Architecture'] | 6 | True | Machu Picchu | ['Eiffel Tower'] | 2 | 3000 | 1.7 | 2.22 | 94 | 4.7 | 3 |
| 3 | 4 | 46 | ['Cultural', 'Art', 'Architecture'] | 8 | False | Colosseum | ['Machu Picchu', 'Taj Mahal'] | 5 | 4000 | 2.0 | 2.34 | 92 | 4.7 | 3 |
| 4 | 5 | 53 | ['Architecture', 'Art'] | 5 | True | Colosseum | ['Machu Picchu', 'Taj Mahal', 'Great Wall of C... | 7 | 5000 | 3.7 | 2.0 | 96 | 4.8 | 4 |
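The `Interests` and `Sites Visited` columns hold strings that look like Python lists (their dtype is `object` in the `info()` output above). A minimal sketch, assuming the same column names, of parsing them while loading with the `converters` argument of `pd.read_csv`, as an alternative to a separate `apply(ast.literal_eval)` step; the two-row CSV is made up for illustration.

```python
import ast
import io

import pandas as pd

# Hypothetical two-row sample shaped like the Kaggle CSV
csv_text = (
    "Tourist ID,Interests,Sites Visited\n"
    "1,\"['Architecture', 'Art']\",\"['Eiffel Tower']\"\n"
    "2,\"['Nature']\",\"['Machu Picchu', 'Taj Mahal']\"\n"
)

# converters runs ast.literal_eval on each cell during parsing,
# so these columns contain real Python lists instead of strings
df = pd.read_csv(
    io.StringIO(csv_text),
    converters={"Interests": ast.literal_eval, "Sites Visited": ast.literal_eval},
)

print(df["Interests"].tolist())
print(df["Sites Visited"].map(len).tolist())
```

`ast.literal_eval` only evaluates literals, so it is safer than `eval` for this kind of column.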


## Create Preprocessing Scripts

Let's create the necessary preprocessing scripts that will run in our container.

For those new to SageMaker: each stage (preprocessing, training, and inference) runs in its own container *outside* this notebook. The notebook only starts those containers, and the results of each stage, such as processed data and model artifacts, are saved to S3 or to the local filesystem.
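A rough local imitation of that contract may help: inside the container, inputs appear under `/opt/ml/processing/input`, the script writes to output directories such as `/opt/ml/processing/train`, and those outputs are copied to the configured destination. The sketch below mimics this with a temporary directory standing in for `/opt/ml/processing` and plain file copies standing in for `ProcessingInput`/`ProcessingOutput`. `run_job` and `toy_script` are hypothetical names, and this is not how SageMaker itself is implemented.

```python
import shutil
import tempfile
from pathlib import Path

import pandas as pd


def toy_script(base_dir: Path) -> None:
    """Hypothetical processing script: read from input/, write to train/."""
    df = pd.read_csv(base_dir / "input" / "data.csv")
    (base_dir / "train").mkdir(parents=True, exist_ok=True)
    df[df["value"] > 1].to_csv(base_dir / "train" / "train.csv", index=False)


def run_job(source_dir: Path, destination_dir: Path) -> None:
    """Imitate a processing job: copy inputs in, run the script, copy outputs out."""
    with tempfile.TemporaryDirectory() as tmp:
        base_dir = Path(tmp)  # stand-in for /opt/ml/processing
        shutil.copytree(source_dir, base_dir / "input")  # like ProcessingInput
        toy_script(base_dir)
        # like ProcessingOutput: copy the container's output dir to the destination
        shutil.copytree(base_dir / "train", destination_dir, dirs_exist_ok=True)


work = Path(tempfile.mkdtemp())
(work / "data").mkdir()
pd.DataFrame({"value": [1, 2, 3]}).to_csv(work / "data" / "data.csv", index=False)
run_job(work / "data", work / "output" / "train")
print(pd.read_csv(work / "output" / "train" / "train.csv"))
```

The key point is that the script only ever sees container paths; the mapping to local or S3 locations lives entirely in the job definition.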

In [4]:

!mkdir preprocessing

A subdirectory or file preprocessing already exists.


In [29]:
%%writefile preprocessing/preprocess.py

import pandas as pd
import numpy as np
import json
import os
import ast
from sklearn.model_selection import train_test_split

def validate_data(df):
    """Validate input data structure and types."""
    required_columns = [
        'Tourist ID', 'Age', 'Interests', 'Preferred Tour Duration',
        'Accessibility', 'Site Name', 'Sites Visited', 'Tour Duration',
        'Tourist Rating', 'Satisfaction'
    ]
    
    # Check for required columns
    missing_cols = [col for col in required_columns if col not in df.columns]
    if missing_cols:
        raise ValueError(f"Missing required columns: {missing_cols}")
    
    # Validate data types
    assert df['Tourist ID'].dtype in ['int64', 'int32'], "Tourist ID must be integer"
    assert df['Age'].dtype in ['int64', 'int32', 'float64'], "Age must be numeric"
    
    return True

def clean_data(df):
    """Clean and preprocess the data."""
    # Create a copy to avoid modifying the original
    df_clean = df.copy()
    
    # Convert string lists to actual lists
    df_clean['Interests'] = df_clean['Interests'].apply(ast.literal_eval)
    df_clean['Sites Visited'] = df_clean['Sites Visited'].apply(ast.literal_eval)
    
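    # Handle missing values (assign the result back; chained inplace calls
    # on a column stop updating df_clean under pandas 3.0 copy-on-write)
    df_clean['Age'] = df_clean['Age'].fillna(df_clean['Age'].median())
    df_clean['Tourist Rating'] = df_clean['Tourist Rating'].fillna(df_clean['Tourist Rating'].mean())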
    
    # Remove duplicates
    df_clean.drop_duplicates(subset=['Tourist ID', 'Site Name'], inplace=True)
    
    return df_clean

def engineer_features(df):
    """Create new features for the model."""
    df_featured = df.copy()
    
    # Create interest count feature
    df_featured['Interest_Count'] = df_featured['Interests'].apply(len)
    
    # Create previous sites count
    df_featured['Previous_Sites_Count'] = df_featured['Sites Visited'].apply(len)
    
    # Calculate tour duration difference
    df_featured['Duration_Difference'] = \
        df_featured['Tour Duration'] - df_featured['Preferred Tour Duration']
    
    # One-hot encode interests; sort the names so the column order is
    # identical on every run (iteration order of a set of strings is not)
    all_interests = set()
    for interests in df_featured['Interests']:
        all_interests.update(interests)
    
    for interest in sorted(all_interests):
        df_featured[f'Interest_{interest}'] = \
            df_featured['Interests'].apply(lambda x: 1 if interest in x else 0)
    
    return df_featured

def main():
    # Read input data
    base_dir = "/opt/ml/processing"
    input_data_path =  f"{base_dir}/input/tourism_dataset_5000.csv"
    df = pd.read_csv(input_data_path)
    
    # Validate data
    validate_data(df)
    
    # Clean data
    df_clean = clean_data(df)
    
    # Engineer features
    df_featured = engineer_features(df_clean)
    
    # Split data
    train_data, test_data = train_test_split(
        df_featured, test_size=0.2, random_state=42
    )
    os.makedirs(f'{base_dir}/train', exist_ok=True)
    os.makedirs(f'{base_dir}/test', exist_ok=True)
    # Save processed datasets with a header row so the column names survive
    train_data.to_csv(f'{base_dir}/train/train.csv', index=False)
    test_data.to_csv(f'{base_dir}/test/test.csv', index=False)
    print('Saving Done')

if __name__ == "__main__":
    # Debug: list the files mounted into the input directory
    for file in os.listdir("/opt/ml/processing/input"):
        print(file)
    main()

Overwriting preprocessing/preprocess.py
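Because the processed data is stored as CSV, the one-hot interest columns must come out in the same order on every run. A compact alternative to the explicit loop in `engineer_features`, sketched here on made-up rows, joins each list into a `|`-separated string and lets `Series.str.get_dummies` build one 0/1 column per interest; pandas sorts the resulting column names. This assumes no interest name itself contains `|`.

```python
import pandas as pd

# Made-up rows with list-valued interests (already parsed from strings)
df = pd.DataFrame({
    "Tourist ID": [1, 2, 3],
    "Interests": [["History", "Art"], ["Nature"], ["Art", "Cultural"]],
})

# "History|Art" -> one indicator column per tag, tag names sorted
one_hot = (
    df["Interests"].str.join("|")
    .str.get_dummies(sep="|")
    .add_prefix("Interest_")
)
df_featured = pd.concat([df, one_hot], axis=1)
print(df_featured.columns.tolist())
```

The vectorized form avoids one `apply` pass per interest, which matters more as the number of distinct interests grows.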


## Create Dockerfile for Preprocessing Container

In [30]:
%%writefile preprocessing/Dockerfile

FROM python:3.10-slim

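# Install required packages (skipping the pip cache keeps the image smaller)
COPY requirements.txt /opt/ml/requirements.txt
RUN pip install --no-cache-dir -r /opt/ml/requirements.txt

# Copy preprocessing script
COPY preprocess.py /opt/ml/processing/preprocess.py

# Used by a plain `docker run`; ScriptProcessor overrides the entry point with
# `command` plus the script passed as `code=` (mounted under /opt/ml/processing/input/code)
ENTRYPOINT ["python", "/opt/ml/processing/preprocess.py"]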

Overwriting preprocessing/Dockerfile


In [31]:
%%writefile preprocessing/requirements.txt
pandas>=2.0.0
numpy>=1.24.0
scikit-learn>=1.3.0

Overwriting preprocessing/requirements.txt


In [32]:
import os
import sys
from pathlib import Path

def create_sagemaker_config():
    try:
        # Use user's home directory instead of C:\
        home_dir = Path.home()
        config_path = home_dir / '.sagemaker-code-config'
        
        # Create an empty config file
        config_path.touch(exist_ok=True)
        
        print(f"Successfully created SageMaker config file at: {config_path}")
        return True
    except Exception as e:
        print(f"Error creating config file: {str(e)}")
        print("\nAlternative options:")
        print("1. Run this script with administrative privileges")
        print("2. Specify a different directory with write permissions")
        return False

create_sagemaker_config()

Successfully created SageMaker config file at: C:\Users\hohoy\.sagemaker-code-config


True

## Build and Test Preprocessing Container Locally

In [46]:
# Build preprocessing container
!docker build -t tourism-preprocessing ./preprocessing/

from sagemaker.local import LocalSession
import os

# Create local output directories
os.makedirs('output/train', exist_ok=True)
os.makedirs('output/test', exist_ok=True)

sagemaker_session = LocalSession()
sagemaker_session.config = {'local': {'local_code': False}}

# Create processor (pass the local session explicitly so the config above is used)
processor = ScriptProcessor(
    command=['python3'],
    image_uri='tourism-preprocessing:latest',
    role=role,
    instance_count=1,
    instance_type='local',
    sagemaker_session=sagemaker_session,
)


# Run preprocessing job
processor.run(
    code='preprocessing/preprocess.py',
    inputs=[
        ProcessingInput(
            source='data',
            destination="/opt/ml/processing/input"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name='train',
            source='/opt/ml/processing/train',
            destination='output/train'
        ),
        ProcessingOutput(
            output_name='test',
            source='/opt/ml/processing/test',
            destination='output/test'
        )
    ]
)

#0 building with "desktop-linux" instance using docker driver

#1 [internal] load build definition from Dockerfile
#1 transferring dockerfile: 333B done
#1 DONE 0.0s

#2 [internal] load metadata for docker.io/library/python:3.10-slim
#2 DONE 1.1s

#3 [internal] load .dockerignore
#3 transferring context: 2B done
#3 DONE 0.0s

#4 [1/4] FROM docker.io/library/python:3.10-slim@sha256:66aad90b231f011cb80e1966e03526a7175f0586724981969b23903abac19081
#4 DONE 0.0s

#5 [internal] load build context
#5 transferring context: 70B done
#5 DONE 0.0s

#6 [2/4] COPY requirements.txt /opt/ml/requirements.txt
#6 CACHED

#7 [3/4] RUN pip install -r /opt/ml/requirements.txt
#7 CACHED

#8 [4/4] COPY preprocess.py /opt/ml/processing/preprocess.py
#8 CACHED

#9 exporting to image
#9 exporting layers done
#9 writing image sha256:04929f2ebbc6ad53f4a70cb1b9d11440deeb30bfda99509dfdc2b55d926914b0 done
#9 naming to docker.io/library/tourism-preprocessing done
#9 DONE 0.0s

View build details: docker-desktop://das

 Container bxv04ux6ve-algo-1-3a417  Creating
 Container bxv04ux6ve-algo-1-3a417  Created
Attaching to bxv04ux6ve-algo-1-3a417
bxv04ux6ve-algo-1-3a417  | code
bxv04ux6ve-algo-1-3a417  | placeholder.py
bxv04ux6ve-algo-1-3a417  | tourism_dataset_5000.csv
bxv04ux6ve-algo-1-3a417  | The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.
bxv04ux6ve-algo-1-3a417  | 
bxv04ux6ve-algo-1-3a417  | For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.
bxv04ux6ve-algo-1-3a417  | 
bxv04ux6ve-algo-1-3a417  | 
bxv04ux6ve-algo-1-3a417  |   df_clean['Age'].fillna(df_clean['Age'].median(), inplace=True)
bxv04ux6ve-algo-1-3a417  | The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on w
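The warning in the log above comes from chained `inplace` calls such as `df_clean['Age'].fillna(..., inplace=True)`: the method runs on an intermediate Series, and from pandas 3.0 on it will no longer update `df_clean`. A minimal sketch of the assignment form the warning itself suggests, on made-up values:

```python
import numpy as np
import pandas as pd

df_clean = pd.DataFrame({
    "Age": [30, np.nan, 50],
    "Tourist Rating": [4.0, 2.0, np.nan],
})

# Assign the filled column back instead of calling fillna(..., inplace=True)
# on df_clean[col]; this updates df_clean with or without copy-on-write.
df_clean["Age"] = df_clean["Age"].fillna(df_clean["Age"].median())
df_clean["Tourist Rating"] = df_clean["Tourist Rating"].fillna(
    df_clean["Tourist Rating"].mean()
)
print(df_clean)
```

The dictionary form mentioned in the warning, `df_clean.fillna({'Age': ..., 'Tourist Rating': ...})`, is an equivalent option when several columns are filled at once.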

## Verify Processed Data

In [45]:
# Load and examine processed data
train_data = pd.read_csv('output/train/train.csv')
test_data = pd.read_csv('output/test/test.csv')

print("Training data shape:", train_data.shape)
print("Test data shape:", test_data.shape)

print("\nFeatures in processed data:")
print(train_data.columns.tolist())

print("\nSample of processed data:")
print(train_data.head())
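Whether the processed CSVs carry a header row matters when reading them back: `pd.read_csv` treats the first line as column names by default, so a file written with `header=False` silently loses its first data row into the header. A small sketch on made-up data showing the symptom and a header-less read:

```python
import io

import pandas as pd

df = pd.DataFrame({"Age": [48, 37, 43], "Satisfaction": [3, 3, 4]})

# Write without a header row
buffer = io.StringIO()
df.to_csv(buffer, header=False, index=False)

# Default read: the first data row (48, 3) becomes the column names
wrong = pd.read_csv(io.StringIO(buffer.getvalue()))
# header=None keeps every row; names= restores the column names
right = pd.read_csv(
    io.StringIO(buffer.getvalue()), header=None, names=df.columns.tolist()
)

print(wrong.shape, right.shape)
```

If the files are written without a header, the column names have to be stored elsewhere (for example in a config file) to be restored on read.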

## Data Quality Checks

In [None]:
def run_quality_checks(train_df, test_df):
    """Run data quality checks on processed datasets."""
    # Check for missing values
    print("Missing values in training data:")
    print(train_df.isnull().sum())
    
    # Check feature distributions
    print("\nFeature statistics in training data:")
    print(train_df.describe())
    
    # Verify train/test split ratio
    total_records = len(train_df) + len(test_df)
    train_ratio = len(train_df) / total_records
    print(f"\nTrain/Test split ratio: {train_ratio:.2f}/{1-train_ratio:.2f}")

run_quality_checks(train_data, test_data)
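The checks above only print values for someone to inspect. A minimal sketch of turning them into pass/fail results that could stop a pipeline run; `find_quality_issues`, its expected ratio of 0.8 (matching `test_size=0.2`) and the 0.01 tolerance are illustrative choices, and the DataFrames are made up.

```python
import pandas as pd


def find_quality_issues(train_df, test_df, expected_train_ratio=0.8, tolerance=0.01):
    """Return a list of problems; an empty list means every check passed."""
    issues = []
    # Both splits should share the same columns in the same order
    if train_df.columns.tolist() != test_df.columns.tolist():
        issues.append("train and test columns differ")
    # No missing values in either split
    for name, df in (("train", train_df), ("test", test_df)):
        missing = int(df.isnull().sum().sum())
        if missing:
            issues.append(f"{name} has {missing} missing values")
    # Split ratio close to the configured value
    train_ratio = len(train_df) / (len(train_df) + len(test_df))
    if abs(train_ratio - expected_train_ratio) > tolerance:
        issues.append(f"train ratio {train_ratio:.2f} differs from {expected_train_ratio}")
    return issues


train = pd.DataFrame({"Age": [30, 40, 50, 60], "Satisfaction": [3, 4, 5, 2]})
test = pd.DataFrame({"Age": [35], "Satisfaction": [None]})
print(find_quality_issues(train, test))
```

A caller could then `raise` when the list is non-empty instead of relying on someone reading the printed statistics.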

## Save Pipeline Configuration

Save the preprocessing configuration for future reference and reproducibility.

In [None]:
pipeline_config = {
    'data_version': '1.0',
    'preprocessing_steps': [
        'data_validation',
        'data_cleaning',
        'feature_engineering',
        'train_test_split'
    ],
    'train_test_ratio': 0.8,
    'random_seed': 42,
    'output_features': train_data.columns.tolist()
}

with open('preprocessing_config.json', 'w') as f:
    json.dump(pipeline_config, f, indent=2)

print("Pipeline configuration saved to 'preprocessing_config.json'")

# Display saved configuration
with open('preprocessing_config.json', 'r') as f:
    print(json.dumps(json.load(f), indent=2))
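Storing `output_features` makes it possible to catch schema drift: if a later run of the preprocessing produces different columns, a comparison against the saved list fails early. A sketch, assuming a config dictionary shaped like `pipeline_config` above; `check_schema` is a hypothetical helper and the columns are made up.

```python
import json

import pandas as pd


def check_schema(config: dict, df: pd.DataFrame) -> dict:
    """Compare a DataFrame's columns with the feature list stored in a config."""
    expected = config["output_features"]
    actual = df.columns.tolist()
    return {
        "missing": [c for c in expected if c not in actual],
        "unexpected": [c for c in actual if c not in expected],
        "same_order": expected == actual,
    }


# Round-trip through JSON, as happens with preprocessing_config.json
config = json.loads(json.dumps({"output_features": ["Age", "Interest_Art", "Interest_Count"]}))
new_df = pd.DataFrame(columns=["Age", "Interest_Count", "Interest_History"])
print(check_schema(config, new_df))
```

Checking order as well as membership matters here because the processed data is stored positionally in CSV files.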

## Performance Analysis

Let's analyze the processing performance and memory usage.

In [None]:
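import sys
import time
import psutil
import matplotlib.pyplot as plt

# Reuse the functions written to preprocessing/preprocess.py
sys.path.append('preprocessing')
from preprocess import validate_data, clean_data, engineer_features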

def measure_performance(func, *args):
    """Measure execution time and memory usage of a function."""
    # Get initial memory usage
    process = psutil.Process()
    initial_memory = process.memory_info().rss / 1024 / 1024  # MB
    
    # Time execution
    start_time = time.time()
    result = func(*args)
    execution_time = time.time() - start_time
    
    # Get final memory usage
    final_memory = process.memory_info().rss / 1024 / 1024  # MB
    memory_used = final_memory - initial_memory
    
    return {
        'execution_time': execution_time,
        'memory_used': memory_used,
        'result': result
    }

# Measure performance of each preprocessing step on the raw CSV loaded earlier
df = pd.read_csv('data/tourism_dataset_5000.csv')

performance_metrics = {
    'Data Validation': measure_performance(validate_data, df),
    'Data Cleaning': measure_performance(clean_data, df),
    'Feature Engineering': measure_performance(engineer_features, clean_data(df))
}

# Plot performance metrics
plt.figure(figsize=(12, 6))

# Execution time plot
plt.subplot(1, 2, 1)
times = [metrics['execution_time'] for metrics in performance_metrics.values()]
plt.bar(performance_metrics.keys(), times)
plt.title('Execution Time by Processing Step')
plt.ylabel('Time (seconds)')
plt.xticks(rotation=45)

# Memory usage plot
plt.subplot(1, 2, 2)
memory = [metrics['memory_used'] for metrics in performance_metrics.values()]
plt.bar(performance_metrics.keys(), memory)
plt.title('Memory Usage by Processing Step')
plt.ylabel('Memory (MB)')
plt.xticks(rotation=45)

plt.tight_layout()
plt.show()
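The RSS difference measured above can read as zero or even negative, because the process may reuse memory it already holds or release some between the two readings. An alternative sketch uses the standard-library `tracemalloc` module to record the *peak* traced allocation during each call. Recent NumPy versions report their array buffers to `tracemalloc`, so pandas data is largely covered, but memory allocated outside Python's tracking is not. `measure_peak` is a hypothetical helper.

```python
import time
import tracemalloc


def measure_peak(func, *args):
    """Return wall time (s), peak traced allocation (MB) and the function result."""
    tracemalloc.start()
    start = time.perf_counter()
    try:
        result = func(*args)
    finally:
        elapsed = time.perf_counter() - start
        _, peak = tracemalloc.get_traced_memory()  # (current, peak) in bytes
        tracemalloc.stop()
    return {"execution_time": elapsed, "peak_memory_mb": peak / 1024 / 1024, "result": result}


# Usage on a stand-in step that builds a list of one million ints
metrics = measure_peak(lambda n: list(range(n)), 1_000_000)
print(f"{metrics['execution_time']:.3f}s, {metrics['peak_memory_mb']:.1f} MB peak")
```

Tracing adds overhead, so the timings from this helper will be somewhat higher than untraced runs.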

## Error Handling Implementation

Let's implement comprehensive error handling for the preprocessing pipeline.

In [None]:
from sklearn.model_selection import train_test_split

class PreprocessingError(Exception):
    """Custom exception for preprocessing errors."""
    pass

def process_with_error_handling(input_data):
    """Run preprocessing pipeline with error handling."""
    try:
        # Validate input type
        if not isinstance(input_data, pd.DataFrame):
            raise PreprocessingError("Input must be a pandas DataFrame")
            
        # Data validation
        logger.info("Starting data validation...")
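        try:
            validate_data(input_data)
        except (AssertionError, ValueError) as e:
            # validate_data raises ValueError for missing columns, AssertionError for bad dtypes
            raise PreprocessingError(f"Data validation failed: {str(e)}") from e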
            
        # Data cleaning
        logger.info("Starting data cleaning...")
        try:
            cleaned_data = clean_data(input_data)
        except Exception as e:
            raise PreprocessingError(f"Data cleaning failed: {str(e)}")
            
        # Feature engineering
        logger.info("Starting feature engineering...")
        try:
            featured_data = engineer_features(cleaned_data)
        except Exception as e:
            raise PreprocessingError(f"Feature engineering failed: {str(e)}")
            
        # Data splitting
        logger.info("Splitting data...")
        try:
            train_data, test_data = train_test_split(
                featured_data, test_size=0.2, random_state=42
            )
        except Exception as e:
            raise PreprocessingError(f"Data splitting failed: {str(e)}")
            
        return train_data, test_data
        
    except PreprocessingError as e:
        logger.error(f"Preprocessing error: {str(e)}")
        raise
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        raise PreprocessingError(f"Unexpected error during preprocessing: {str(e)}")

# Test error handling
try:
    # Test with valid data
    train_data, test_data = process_with_error_handling(pd.read_csv('data/tourism_dataset_5000.csv'))
    print("Processing completed successfully")
    
    # Test with invalid data
    invalid_data = pd.DataFrame({'Invalid': [1, 2, 3]})
    train_data, test_data = process_with_error_handling(invalid_data)
except PreprocessingError as e:
    print(f"Caught preprocessing error: {str(e)}")
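The nested `try`/`except` blocks above repeat the same pattern for every stage. One way to keep the stage name in the error without that repetition is to drive the stages from a list. A minimal sketch with hypothetical toy stages standing in for the notebook's functions; each stage must return the DataFrame, so `validate_data` (which returns `True`) would need a small wrapper. `raise ... from e` keeps the original exception attached as `__cause__`.

```python
import logging

import pandas as pd

logger = logging.getLogger(__name__)


class PreprocessingError(Exception):
    """Raised when any preprocessing stage fails."""


def run_stages(df, stages):
    """Apply (name, func) stages in order, wrapping failures in PreprocessingError."""
    for name, func in stages:
        logger.info("Starting %s...", name)
        try:
            df = func(df)
        except Exception as e:
            raise PreprocessingError(f"{name} failed: {e}") from e
    return df


def require_age(df):
    # Toy validation stage: fail if a required column is missing
    if "Age" not in df.columns:
        raise ValueError("Missing required columns: ['Age']")
    return df


stages = [
    ("data validation", require_age),
    ("data cleaning", lambda d: d.dropna()),
    ("feature engineering", lambda d: d.assign(Age_Squared=d["Age"] ** 2)),
]

print(run_stages(pd.DataFrame({"Age": [20, None, 40]}), stages))
try:
    run_stages(pd.DataFrame({"Invalid": [1, 2, 3]}), stages)
except PreprocessingError as err:
    print(f"Caught preprocessing error: {err}")
```

Adding or reordering a stage then only touches the `stages` list, not the error-handling code.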

## MLflow Integration

Let's track our preprocessing pipeline with MLflow.

In [None]:
import time
import mlflow

# Set the MLflow experiment
mlflow.set_experiment("tourism-preprocessing")

# Start MLflow run
with mlflow.start_run(run_name="preprocessing-pipeline") as run:
    # Log preprocessing parameters
    mlflow.log_params({
        "train_test_ratio": 0.8,
        "random_seed": 42,
        "data_version": "1.0"
    })
    
    # Process data and measure performance
    start_time = time.time()
    train_data, test_data = process_with_error_handling(
        pd.read_csv('data/tourism_dataset_5000.csv')
    )
    processing_time = time.time() - start_time
    
    # Log metrics
    mlflow.log_metrics({
        "processing_time": processing_time,
        "train_samples": len(train_data),
        "test_samples": len(test_data),
        "num_features": len(train_data.columns)
    })
    
    # Log preprocessing configuration
    mlflow.log_artifact("preprocessing_config.json")
    
    # Log sample data
    train_sample = train_data.head(100).to_json(orient='records')
    with open("train_sample.json", "w") as f:
        f.write(train_sample)
    mlflow.log_artifact("train_sample.json")

print(f"MLflow run: {run.info.run_id}")

## Save Processed Data

Finally, let's save our processed datasets for the next step in our pipeline.

In [None]:
# Create output directory if it doesn't exist
import os
os.makedirs('processed_data', exist_ok=True)

# Save processed datasets
train_data.to_json('processed_data/train.json')
test_data.to_json('processed_data/test.json')

# Save feature names
feature_names = {
    'features': train_data.columns.tolist(),
    'categorical_features': [
        col for col in train_data.columns
        if col.startswith('Interest_') and col != 'Interest_Count'
    ],
    'numerical_features': [
        'Age', 'Tour Duration', 'Tourist Rating',
        'Interest_Count', 'Previous_Sites_Count',
        'Duration_Difference'
    ]
}

with open('processed_data/feature_names.json', 'w') as f:
    json.dump(feature_names, f, indent=2)

print("Processed data saved to 'processed_data' directory")
print(f"Number of training samples: {len(train_data)}")
print(f"Number of test samples: {len(test_data)}")
print(f"Number of features: {len(feature_names['features'])}")
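A plain prefix test such as `startswith('Interest_')` also matches the engineered `Interest_Count` column, which is meant to be numerical. A sketch of deriving the groups so they cannot overlap, assuming the one-hot columns are exactly the `Interest_<name>` columns not listed as numerical; `split_feature_groups` is a hypothetical helper and the column list is made up.

```python
def split_feature_groups(columns, numerical):
    """Separate one-hot Interest_<name> columns from the listed numerical ones."""
    # Keep only numerical names that actually exist in the data
    numerical = [c for c in numerical if c in columns]
    categorical = [
        c for c in columns
        if c.startswith("Interest_") and c not in numerical
    ]
    return {"categorical_features": categorical, "numerical_features": numerical}


columns = ["Age", "Interest_Count", "Interest_Art", "Interest_Nature"]
groups = split_feature_groups(columns, ["Age", "Interest_Count", "Tourist Rating"])
print(groups)
```

Deriving the categorical list from the numerical one keeps the two definitions from drifting apart when new engineered features are added.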

## Next Steps

1. Proceed to Lesson 3: NCF Model Development
2. Use the processed data in the `processed_data` directory
3. Reference the feature configuration in `feature_names.json`

The preprocessing pipeline is now complete and ready for model development.