In [57]:
import boto3
import sagemaker
import pandas as pd
import numpy as np
from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.inputs import TrainingInput
from sagemaker.xgboost.estimator import XGBoost
from sagemaker.tensorflow import TensorFlow
from sagemaker.workflow.parameters import ParameterInteger, ParameterString

# Initialize SageMaker
session = sagemaker.Session()
role = get_execution_role()
region = boto3.Session().region_name

# Configuration
bucket = 'transactly-data-lake'
prefix = 'Fraud_data/raw'
s3_data_path = f's3://{bucket}/{prefix}/synthetic_fraud_dataset.csv'
s3_output_path = f's3://{bucket}/{prefix}/output'

# Define pipeline parameters
processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1)
processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.xlarge")
training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.2xlarge")
gpu_instance_type = ParameterString(name="GPUInstanceType", default_value="ml.c5.2xlarge")

# Step 1: Data Processing
processor = SKLearnProcessor(
    framework_version="1.2-1",
    role=role,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count,
    base_job_name="fraud-preprocessing",
    sagemaker_session=session,
)

process_step = ProcessingStep(
    name="PreprocessData",
    processor=processor,
    inputs=[
        ProcessingInput(
            source=s3_data_path,
            destination="/opt/ml/processing/input",
            s3_data_distribution_type="ShardedByS3Key"
        )
    ],
    outputs=[
        ProcessingOutput(
            output_name="xgb_data",
            source="/opt/ml/processing/output/xgb",
            destination=f"{s3_output_path}/xgb"
        ),
        ProcessingOutput(
            output_name="wd_data",
            source="/opt/ml/processing/output/wd",
            destination=f"{s3_output_path}/wd"
        ),
        ProcessingOutput(
            output_name="preprocessor",
            source="/opt/ml/processing/preprocessor",
            destination=f"{s3_output_path}/preprocessor"
        )
    ],
    code="preprocess.py",
    job_arguments=[
        "--input-data", "/opt/ml/processing/input",
        "--output-data", "/opt/ml/processing/output",
        "--preprocessor-output", "/opt/ml/processing/preprocessor"
    ]
)

# Step 2: XGBoost Model Training
xgb_estimator = XGBoost(
    entry_point="xgb_train.py",
    framework_version="1.7-1",
    instance_type=training_instance_type,
    instance_count=1,
    output_path=f"{s3_output_path}/models/xgb",
    role=role,
    base_job_name="xgb-training",
    sagemaker_session=session,
    hyperparameters={
        "max_depth": 5,
        "subsample": 0.8,
        "scale_pos_weight": 30,
        "objective": "binary:logistic",
        "eval_metric": "aucpr",
        "num_round": 300
    }
)

xgb_step = TrainingStep(
    name="TrainXGBoost",
    estimator=xgb_estimator,
    inputs={
        "train": TrainingInput(
            s3_data=process_step.properties.ProcessingOutputConfig.Outputs["xgb_data"].S3Output.S3Uri,
            content_type="text/csv"
        )
    }
)


# Step 3: Wide & Deep Model Training
# from sagemaker.tensorflow import TensorFlow
# from sagemaker.inputs import TrainingInput
# from sagemaker.workflow.steps import TrainingStep

# tf_estimator = TensorFlow(
#     entry_point="wd_train.py",
#     role=role,
#     instance_count=1,
#     instance_type=gpu_instance_type,  # e.g., "ml.g4dn.xlarge"
#     framework_version="2.13",         # ✅ TF 2.13 supports Python 3.9
#     py_version="py310",                # ✅ Upgrade to Python 3.9
#     output_path=f"{s3_output_path}/models/wd",
#     sagemaker_session=session,
#     dependencies=["requirements.txt"]
# )

# wd_step = TrainingStep(
#     name="TrainWideDeep",
#     estimator=tf_estimator,
#     inputs={
#         "train": TrainingInput(
#             s3_data=process_step.properties.ProcessingOutputConfig.Outputs["wd_data"].S3Output.S3Uri,
#             content_type="text/csv"
#         ),
#         "xgb_model": TrainingInput(
#             s3_data=xgb_step.properties.ModelArtifacts.S3ModelArtifacts,
#             content_type="application/x-tar"
#         ),
#         "preprocessor": TrainingInput(
#             s3_data=process_step.properties.ProcessingOutputConfig.Outputs["preprocessor"].S3Output.S3Uri,
#             content_type="application/x-joblib"
#         )
#     }
# )

from sagemaker.pytorch import PyTorch
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

pytorch_estimator = PyTorch(
    entry_point="wd_train.py",
    role=role,
    instance_count=1,
    instance_type=gpu_instance_type,     # e.g., "ml.g4dn.xlarge"
    framework_version="1.13",            # Or the latest available
    py_version="py39",                   # Compatible with your local dev env
    output_path=f"{s3_output_path}/models/wd",
    sagemaker_session=session,
    dependencies=["requirements.txt"],
    hyperparameters={
        "epochs": 10,
        "batch_size": 32,
        "lr": 1e-3
    }
)

wd_step = TrainingStep(
    name="TrainWideDeep",
    estimator=pytorch_estimator,
    inputs={
        "train": TrainingInput(
            s3_data=process_step.properties.ProcessingOutputConfig.Outputs["wd_data"].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "xgb_model": TrainingInput(
            s3_data=xgb_step.properties.ModelArtifacts.S3ModelArtifacts,
            content_type="application/x-tar"
        ),
        "preprocessor": TrainingInput(
            s3_data=process_step.properties.ProcessingOutputConfig.Outputs["preprocessor"].S3Output.S3Uri,
            content_type="application/x-joblib"
        )
    }
)
# Define pipeline
pipeline = Pipeline(
    name="AmazonFraudDetectionPipeline",
    parameters=[
        processing_instance_count,
        processing_instance_type,
        training_instance_type,
        gpu_instance_type
    ],
    steps=[process_step, xgb_step, wd_step],
    sagemaker_session=session
    
)

# Upload and execute pipeline
pipeline.upsert(role_arn=role)
execution = pipeline.start()

# Wait for the pipeline to finish
execution.wait()

# Print pipeline execution status
print(execution.describe())

INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
INFO:sagemaker.image_uris:Ignoring unnecessary Python version: py3.
INFO:sagemaker.image_uris:Ignoring unnecessary instance type: ml.m5.2xlarge.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.
INFO:sagemaker.image_uris:image_uri is not presented, retrieving image_uri based on instance_type, framework etc.


{'PipelineArn': 'arn:aws:sagemaker:eu-north-1:703761849224:pipeline/AmazonFraudDetectionPipeline', 'PipelineExecutionArn': 'arn:aws:sagemaker:eu-north-1:703761849224:pipeline/AmazonFraudDetectionPipeline/execution/tszv4knlzn8h', 'PipelineExecutionDisplayName': 'execution-1750282218907', 'PipelineExecutionStatus': 'Succeeded', 'PipelineExperimentConfig': {'ExperimentName': 'amazonfrauddetectionpipeline', 'TrialName': 'tszv4knlzn8h'}, 'CreationTime': datetime.datetime(2025, 6, 18, 21, 30, 18, 815000, tzinfo=tzlocal()), 'LastModifiedTime': datetime.datetime(2025, 6, 18, 21, 42, 26, 778000, tzinfo=tzlocal()), 'CreatedBy': {'UserProfileArn': 'arn:aws:sagemaker:eu-north-1:703761849224:user-profile/d-g807byqfsrje/default-1750009708586', 'UserProfileName': 'default-1750009708586', 'DomainId': 'd-g807byqfsrje', 'IamIdentity': {'Arn': 'arn:aws:sts::703761849224:assumed-role/AmazonSageMaker-ExecutionRole-20250615T225694/SageMaker', 'PrincipalId': 'AROA2HW3ZTOEOBDXBHKWK:SageMaker'}}, 'LastModified

In [33]:
# Get all steps and their execution status
for step in execution.list_steps():
    print(f"Step: {step['StepName']}, Status: {step['StepStatus']}")
    if 'FailureReason' in step:
        print(f"FailureReason: {step['FailureReason']}\n")

Step: TrainWideDeep, Status: Failed
FailureReason: ClientError: AlgorithmError: InstallRequirementsError:
ExitCode 1
ErrorMessage ""
Command "/usr/local/bin/python3.10 -m pip install -r requirements.txt", exit code: 1

Step: TrainXGBoost, Status: Succeeded
Step: PreprocessData, Status: Succeeded


In [51]:
execution.list_steps()


[{'StepName': 'TrainWideDeep',
  'StartTime': datetime.datetime(2025, 6, 18, 17, 35, 0, 228000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2025, 6, 18, 17, 39, 38, 559000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:eu-north-1:703761849224:training-job/pipelines-59g49bgqtquu-TrainWideDeep-xga9kdsya7'}},
  'AttemptCount': 1},
 {'StepName': 'TrainXGBoost',
  'StartTime': datetime.datetime(2025, 6, 18, 17, 32, 36, 956000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2025, 6, 18, 17, 34, 59, 833000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:eu-north-1:703761849224:training-job/pipelines-59g49bgqtquu-TrainXGBoost-IBVJnoJB52'}},
  'AttemptCount': 1},
 {'StepName': 'PreprocessData',
  'StartTime': datetime.datetime(2025, 6, 18, 17, 30, 3, 905000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2025, 6, 18, 17, 32, 36, 199000, tzinfo=tzlocal()),
  'StepStatu

In [50]:
execution.list_steps()


[{'StepName': 'TrainWideDeep',
  'StartTime': datetime.datetime(2025, 6, 18, 17, 35, 0, 228000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2025, 6, 18, 17, 39, 38, 559000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:eu-north-1:703761849224:training-job/pipelines-59g49bgqtquu-TrainWideDeep-xga9kdsya7'}},
  'AttemptCount': 1},
 {'StepName': 'TrainXGBoost',
  'StartTime': datetime.datetime(2025, 6, 18, 17, 32, 36, 956000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2025, 6, 18, 17, 34, 59, 833000, tzinfo=tzlocal()),
  'StepStatus': 'Succeeded',
  'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:eu-north-1:703761849224:training-job/pipelines-59g49bgqtquu-TrainXGBoost-IBVJnoJB52'}},
  'AttemptCount': 1},
 {'StepName': 'PreprocessData',
  'StartTime': datetime.datetime(2025, 6, 18, 17, 30, 3, 905000, tzinfo=tzlocal()),
  'EndTime': datetime.datetime(2025, 6, 18, 17, 32, 36, 199000, tzinfo=tzlocal()),
  'StepStatu

In [29]:
!python --version


Python 3.12.9
