## Run Workflow using Step Decorators

The code and notebook in this directory show how to build a complete pipeline with step decorators (see `pipeline.py`).
Each step of the pipeline is logged under the same MLflow run.

Let's first install the dependencies required to run this code locally.

In [1]:
%pip install -r requirements.txt

Collecting sagemaker==2.219.0 (from -r requirements.txt (line 1))
  Downloading sagemaker-2.219.0-py3-none-any.whl.metadata (14 kB)
Collecting scikit-learn==1.3.2 (from -r requirements.txt (line 2))
  Downloading scikit_learn-1.3.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (11 kB)
Collecting xgboost==1.7.6 (from -r requirements.txt (line 3))
  Downloading xgboost-1.7.6-py3-none-manylinux2014_x86_64.whl.metadata (1.9 kB)
Collecting joblib==1.5.1 (from -r requirements.txt (line 4))
  Downloading joblib-1.5.1-py3-none-any.whl.metadata (5.6 kB)
Collecting mlflow==2.17.0 (from -r requirements.txt (line 6))
  Downloading mlflow-2.17.0-py3-none-any.whl.metadata (29 kB)
Collecting cloudpickle==2.2.1 (from sagemaker==2.219.0->-r requirements.txt (line 1))
  Downloading cloudpickle-2.2.1-py3-none-any.whl.metadata (6.9 kB)
Collecting protobuf<5.0,>=3.12 (from sagemaker==2.219.0->-r requirements.txt (line 1))
  Downloading protobuf-4.25.8-cp37-abi3-manylinux2014_x86_64.wh

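Next, upgrade `joblib` and `xgboost` in the local kernel, check the installed package versions, and restore the variables stored by the `00-start-here` notebook. These local versions differ from the pins in `requirements.txt` (`joblib==1.5.1`, `xgboost==1.7.6`), while the remote steps use the pinned `requirements.txt` and reinstall `xgboost==1.7.6`.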

In [2]:
%pip install joblib==1.5.3
%pip install xgboost==3.1.3

Collecting joblib==1.5.3
  Downloading joblib-1.5.3-py3-none-any.whl.metadata (5.5 kB)
Downloading joblib-1.5.3-py3-none-any.whl (309 kB)
Installing collected packages: joblib
  Attempting uninstall: joblib
    Found existing installation: joblib 1.5.1
    Uninstalling joblib-1.5.1:
      Successfully uninstalled joblib-1.5.1
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
autogluon-multimodal 1.4.0 requires nvidia-ml-py3<8.0,>=7.352.0, which is not installed.
autogluon-core 1.4.0 requires scikit-learn<1.8.0,>=1.4.0, but you have scikit-learn 1.3.2 which is incompatible.
autogluon-features 1.4.0 requires scikit-learn<1.8.0,>=1.4.0, but you have scikit-learn 1.3.2 which is incompatible.
autogluon-multimodal 1.4.0 requires scikit-learn<1.8.0,>=1.4.0, but you have scikit-learn 1.3.2 which is incompatible.
autogluon-multimodal 1.4.0 requires transformers[sente

In [3]:
import sys
import importlib

packages = [
    "sagemaker",
    "boto3",
    "mlflow",
    "xgboost",
    "numpy",
    "pandas",
    "sklearn",
    "scipy",
    "joblib",
    "sagemaker-mlflow",
    "s3fs",
]

print(f"Python: {sys.version}")

for pkg in packages:
    try:
        module_name = pkg.replace("-", "_")
        mod = importlib.import_module(module_name)
        version = getattr(mod, "__version__", "unknown")
        print(f"{pkg}: {version}")
    except Exception as e:
        print(f"{pkg}: not importable ({e})")


Python: 3.12.9 | packaged by conda-forge | (main, Feb 14 2025, 08:00:06) [GCC 13.3.0]
sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
sagemaker: 2.219.0
boto3: 1.37.3
mlflow: 2.17.0
xgboost: 3.1.3
numpy: 1.26.4
pandas: 2.3.3
sklearn: 1.3.2
scipy: 1.16.3
joblib: 1.5.3
sagemaker-mlflow: 0.2.0
s3fs: 2024.12.0
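The cell above imports each module to read `__version__`, which needs the import name (`sklearn`) rather than the distribution name (`scikit-learn`). A minimal sketch, assuming the entries in `requirements.txt` use the exact `name==version` form, compares installed distribution versions against those pins with the standard library's `importlib.metadata`. The helpers `parse_pins`, `installed_version`, and `find_mismatches` are hypothetical. In this kernel it would be expected to flag `xgboost` and `joblib`, since the previous cell replaced the pinned 1.7.6 and 1.5.1 with 3.1.3 and 1.5.3.

```python
from importlib import metadata


def parse_pins(lines):
    """Return {distribution_name: version} for exact `name==version` lines."""
    pins = {}
    for raw in lines:
        # Drop comments and environment markers such as `; python_version < "3.13"`
        line = raw.split("#", 1)[0].split(";", 1)[0].strip()
        if "==" not in line:
            continue  # skip blanks, ranges like >=, and -r/-e options
        name, version = line.split("==", 1)
        name = name.split("[", 1)[0].strip()  # drop extras like pkg[extra]
        pins[name] = version.strip()
    return pins


def installed_version(dist_name):
    """Installed version of a distribution, or None if it is not installed."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


def find_mismatches(pins, lookup=installed_version):
    """Map each drifted package to (pinned_version, installed_version)."""
    mismatches = {}
    for name, pinned in pins.items():
        actual = lookup(name)
        if actual != pinned:
            mismatches[name] = (pinned, actual)
    return mismatches
```

In the notebook, open `requirements.txt` in a `with` block, pass the file object to `parse_pins`, and print the result of `find_mismatches`. Lines without an exact `==` pin are skipped rather than checked.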


In [4]:
%store -r 

%store

try:
    initialized
except NameError:    
    print("[ERROR] YOU HAVE TO RUN 00-start-here notebook   ")

Stored variables and their in-db values:
bucket_prefix              -> 'sagemaker-us-east-1-632458605453/flights'
domain_id                  -> 'd-cyjixw7xaezf'
initialized                -> True
mlflow_arn                 -> 'arn:aws:sagemaker:us-east-1:632458605453:mlflow-t
mlflow_name                -> 'mlflow-d-cyjixw7xaezf'
project_prefix             -> 'flights'
region                     -> 'us-east-1'
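The `try`/`except NameError` guard only checks `initialized`. A slightly stricter sketch checks every restored variable that later cells read, using names from the `%store` listing above; `REQUIRED_STORED_VARS` and `require_stored_vars` are hypothetical names. In the notebook you would call `require_stored_vars(globals())` right after `%store -r`.

```python
from collections.abc import Iterable, Mapping

# Names taken from the %store listing; later cells read the last three
REQUIRED_STORED_VARS = ("initialized", "bucket_prefix", "mlflow_arn", "project_prefix")


def require_stored_vars(namespace: Mapping, required: Iterable[str] = REQUIRED_STORED_VARS) -> None:
    """Raise if any required variable was not restored into `namespace`."""
    missing = [name for name in required if name not in namespace]
    if missing:
        raise RuntimeError(
            "Run the 00-start-here notebook first; missing stored variables: "
            + ", ".join(missing)
        )
```

Raising instead of printing stops "Run All" at this cell, so later cells do not fail with less obvious `NameError`s.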


Let's create a config that the SageMaker Python SDK applies by default to every step.

Note that we set `S3RootUri` to choose the S3 location where the step artifacts are stored.
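`bucket_prefix` holds both a bucket name and a key prefix (see the `%store` output above), so `S3RootUri` points at the `flights/` prefix inside that bucket rather than at the bucket root. A small sketch using the standard library's `urllib.parse` shows the split; `split_s3_uri` is a hypothetical helper, not part of the SageMaker SDK.

```python
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> tuple[str, str]:
    """Split 's3://bucket/key/prefix' into ('bucket', 'key/prefix')."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an s3:// URI: {uri!r}")
    # urlparse keeps the leading slash on the path; S3 keys do not have one
    return parsed.netloc, parsed.path.lstrip("/")
```

For example, `split_s3_uri(f"s3://{bucket_prefix}")` returns the bucket name and `"flights"`.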

In [20]:
config_yaml = f"""
SchemaVersion: '1.0'
SageMaker:
  PythonSDK:
    Modules:
      RemoteFunction:
        S3RootUri: s3://{bucket_prefix}
        InstanceType: ml.m5.xlarge
        Dependencies: /home/sagemaker-user/flights_fare_timing_ml/workflow/requirements.txt
        IncludeLocalWorkDir: true
        PreExecutionCommands:
          - "conda install -y -c conda-forge libstdcxx-ng libgcc-ng"
          - "conda remove -y xgboost"
          - "pip install --force-reinstall xgboost==1.7.6"
          - "sudo bash -c 'echo /opt/conda/lib > /etc/ld.so.conf.d/conda.conf'"
          - "sudo ldconfig"
          - "sudo chmod -R 777 /opt/ml/model"
        CustomFileFilter:
          IgnoreNamePatterns:
            - "data/*"
            - "models/*"
            - "*.ipynb"
            - "__pycache__"

"""
with open("config.yaml", "w") as f:  # close the file so the config is flushed to disk
    f.write(config_yaml)
print(config_yaml)



SchemaVersion: '1.0'
SageMaker:
  PythonSDK:
    Modules:
      RemoteFunction:
        S3RootUri: s3://sagemaker-us-east-1-632458605453/flights
        InstanceType: ml.m5.xlarge
        Dependencies: /home/sagemaker-user/flights_fare_timing_ml/workflow/requirements.txt
        IncludeLocalWorkDir: true
        PreExecutionCommands:
          - "conda install -y -c conda-forge libstdcxx-ng libgcc-ng"
          - "conda remove -y xgboost"
          - "pip install --force-reinstall xgboost==1.7.6"
          - "sudo bash -c 'echo /opt/conda/lib > /etc/ld.so.conf.d/conda.conf'"
          - "sudo ldconfig"
          - "sudo chmod -R 777 /opt/ml/model"
        CustomFileFilter:
          IgnoreNamePatterns:
            - "data/*"
            - "models/*"
            - "*.ipynb"
            - "__pycache__"




In [21]:
import os
os.environ["MLFLOW_TRACKING_ARN"] = mlflow_arn
os.environ["PROJECT_PREFIX"] = project_prefix
os.environ["BUCKET_PREFIX"] = bucket_prefix
os.environ["INPUT_DATA_S3_URI"] = f"s3://{bucket_prefix}/data/flight_fares.csv"
os.environ["OUTPUT_DATA_S3_URI"] = f"s3://{bucket_prefix}/processed"
!python pipeline.py

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml
sagemaker.config INFO - Fetched defaults config from location: /home/sagemaker-user/flights_fare_timing_ml/workflow
INFO:sagemaker.image_uris:Defaulting to only available Python version: py3
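`!python pipeline.py` runs in a child process, so the notebook's Python variables reach it only through the inherited environment, which is why the cell copies them into `os.environ` first. `pipeline.py` is not shown here; the sketch below is a hypothetical way such a script could read those variables and fail early with a clear message. Only the variable names come from the cell above; `load_pipeline_env` is an assumption.

```python
import os
from collections.abc import Mapping
from typing import Optional

# Variable names copied from the notebook cell; the loader itself is hypothetical
REQUIRED_ENV_VARS = (
    "MLFLOW_TRACKING_ARN",
    "PROJECT_PREFIX",
    "BUCKET_PREFIX",
    "INPUT_DATA_S3_URI",
    "OUTPUT_DATA_S3_URI",
)


def load_pipeline_env(environ: Optional[Mapping[str, str]] = None) -> dict:
    """Read the pipeline settings, listing every missing name in one error."""
    environ = os.environ if environ is None else environ
    # Empty strings count as missing, since an empty S3 URI is never valid
    missing = [name for name in REQUIRED_ENV_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: environ[name] for name in REQUIRED_ENV_VARS}
```

Passing the mapping explicitly keeps the function testable without touching the real process environment.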


In [None]:
# Inference test after deployment

import boto3
import numpy as np
import io

endpoint_name = "flights-endpoint-1768191960-25f5"
runtime = boto3.client("sagemaker-runtime")

payload = "0,afternoon,1,0,1,3091900015,1,long"

response = runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType="application/octet-stream",
    Body=payload.encode("utf-8"),
)

raw = response["Body"].read()
preds = np.load(io.BytesIO(raw))

print(preds)
# Assuming the endpoint returns log1p-scale predictions, expm1 maps them back to the fare scale
print(np.expm1(preds))
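The endpoint expects one CSV line whose fields follow the featurizer's column order, which is easy to get wrong when the string is written by hand. A minimal sketch builds the payload from named features instead; it assumes the column order used by the local verification cells below (with `days_until_departure_bucket`), and `row_to_payload` is a hypothetical helper.

```python
from collections.abc import Mapping, Sequence

# Column order assumed from the local verification cells in this notebook
FEATURE_ORDER = [
    "purchase_day_of_week",
    "purchase_time_bucket",
    "days_until_departure_bucket",
    "is_weekend_departure",
    "is_holiday_season",
    "route_hash",
    "stops_count",
    "flight_duration_bucket",
]


def row_to_payload(row: Mapping, feature_order: Sequence[str] = FEATURE_ORDER) -> str:
    """Serialize one feature row to the CSV line sent to the endpoint."""
    missing = [name for name in feature_order if name not in row]
    if missing:
        raise KeyError(f"Missing features: {missing}")
    values = [str(row[name]) for name in feature_order]
    # A comma inside a value would shift every later field by one position
    bad = [v for v in values if "," in v]
    if bad:
        raise ValueError(f"Values must not contain commas: {bad}")
    return ",".join(values)
```

The result can be passed as `Body=row_to_payload(row).encode("utf-8")` in the `invoke_endpoint` call.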

In [2]:
import joblib
import numpy as np
import pandas as pd
import xgboost as xgb

FEATURIZER_PATH = "../sklearn_model.joblib"
BOOSTER_PATH = "../xgboost_model.bin"

feature_order = [
    "purchase_day_of_week",
    "purchase_time_bucket",
    "days_until_departure",
    "is_weekend_departure",
    "is_holiday_season",
    "route_hash",
    "stops_count",
    "flight_duration_bucket",
]

row = {
    "purchase_day_of_week": 5,
    "purchase_time_bucket": "dawn",
    "days_until_departure": 26,
    "is_weekend_departure": 0,
    "is_holiday_season": 1,
    "route_hash": 3091900015,
    "stops_count": 1,
    "flight_duration_bucket": "long",
}

featurizer = joblib.load(FEATURIZER_PATH)
booster = xgb.Booster()
booster.load_model(BOOSTER_PATH)

df = pd.DataFrame([row], columns=feature_order)
transformed = featurizer.transform(df)

pred_log = booster.predict(xgb.DMatrix(transformed))[0]
print("pred_log:", pred_log)
print("pred_expm1:", np.expm1(pred_log))


pred_log: 9.993353
pred_expm1: 21879.54
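Every prediction in these cells is passed through `np.expm1`, which suggests (an assumption, since the training code is not shown) that the model was trained on `log1p(fare)`. `np.expm1` is the exact inverse of `np.log1p`, and both stay accurate near zero, where `np.exp(x) - 1` loses precision. The sketch checks the inversion on illustrative values and applies the same inverse to the `pred_log` printed just above.

```python
import numpy as np

# Illustrative fares on the original scale (not taken from the dataset)
fares = np.array([0.0, 1.5, 250.0, 21879.54])

log_fares = np.log1p(fares)      # log(1 + fare): the scale the model is assumed to predict
recovered = np.expm1(log_fares)  # exp(x) - 1: back to the fare scale

print("round trip ok:", np.allclose(recovered, fares))

# Same inverse applied to the log-scale prediction printed above
pred_log = np.float32(9.993353)
print("pred_expm1:", np.expm1(pred_log))
```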


  booster.load_model(BOOSTER_PATH)


In [None]:
import io
import joblib
import numpy as np
import pandas as pd
import xgboost as xgb

FEATURIZER_PATH = "/home/sagemaker-user/flights_fare_timing_ml/serve_extracted/sklearn_model.joblib"
BOOSTER_PATH = "/home/sagemaker-user/flights_fare_timing_ml/serve_extracted2/xgboost_model.bin"

# Raw CSV sent to Lambda (unchanged)
RAW_PAYLOAD = "0,afternoon,1,0,1,3091900015,1,long"

# Log-scale prediction returned by Lambda (e.g. 0.7937496)
LAMBDA_PRED_LOG = 0.7937496

FEATURE_ORDER = [
    "purchase_day_of_week",
    "purchase_time_bucket",
    "days_until_departure_bucket",
    "is_weekend_departure",
    "is_holiday_season",
    "route_hash",
    "stops_count",
    "flight_duration_bucket",
]

NUMERIC_FIELDS = {
    "purchase_day_of_week",
    "days_until_departure_bucket",
    "is_weekend_departure",
    "is_holiday_season",
    "route_hash",
    "stops_count",
}

def payload_to_df(payload: str) -> pd.DataFrame:
    parts = [p.strip() for p in payload.split(",")]
    if len(parts) != len(FEATURE_ORDER):
        raise ValueError(f"Expected {len(FEATURE_ORDER)} fields, got {len(parts)}: {parts}")
    row = {}
    for name, value in zip(FEATURE_ORDER, parts):
        if name in NUMERIC_FIELDS:
            try:
                row[name] = int(value)
            except ValueError:
                row[name] = float(value)
        else:
            row[name] = value
    return pd.DataFrame([row], columns=FEATURE_ORDER)

featurizer = joblib.load(FEATURIZER_PATH)
booster = xgb.Booster()
booster.load_model(BOOSTER_PATH)

df = payload_to_df(RAW_PAYLOAD)
transformed = featurizer.transform(df)

pred_log_local = booster.predict(xgb.DMatrix(transformed))[0]
pred_expm1_local = np.expm1(pred_log_local)

print("LOCAL preds_log:", pred_log_local)
print("LOCAL preds_expm1:", pred_expm1_local)
print("LAMBDA preds_log:", LAMBDA_PRED_LOG)
print("LAMBDA preds_expm1:", np.expm1(LAMBDA_PRED_LOG))
print("log diff:", pred_log_local - LAMBDA_PRED_LOG)


KeyError: "['days_until_departure'] not in index"
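The `KeyError` means the loaded featurizer selects a `days_until_departure` column, while this cell builds the frame with `days_until_departure_bucket`. Because later cells load the same `FEATURIZER_PATH` successfully with `days_until_departure_bucket`, this output is likely stale from an earlier featurizer (an assumption). A stdlib sketch that reports missing and unexpected columns before calling `transform` makes this kind of mismatch explicit; `column_mismatch` is a hypothetical helper.

```python
from collections.abc import Iterable


def column_mismatch(columns: Iterable[str], expected: Iterable[str]) -> tuple[list, list]:
    """Return (missing, unexpected) column names, preserving input order."""
    columns = list(columns)
    expected = list(expected)
    present, wanted = set(columns), set(expected)
    missing = [name for name in expected if name not in present]
    unexpected = [name for name in columns if name not in wanted]
    return missing, unexpected
```

When a scikit-learn transformer is fitted on a DataFrame with string column names, it usually records them in `feature_names_in_`, so `getattr(featurizer, "feature_names_in_", None)` can supply `expected`, with `df.columns` as `columns`.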

In [4]:
import io
import joblib
import numpy as np
import pandas as pd
import xgboost as xgb

FEATURIZER_PATH = "/home/sagemaker-user/flights_fare_timing_ml/serve_extracted/sklearn_model.joblib"
BOOSTER_PATH = "/home/sagemaker-user/flights_fare_timing_ml/serve_extracted2/xgboost_model.bin"

feature_order = [
    "purchase_day_of_week",
    "purchase_time_bucket",
    "days_until_departure_bucket",
    "is_weekend_departure",
    "is_holiday_season",
    "route_hash",
    "stops_count",
    "flight_duration_bucket",
]

# Sample input (change values as needed)
row = {
    "purchase_day_of_week": 0,
    "purchase_time_bucket": "afternoon",
    "days_until_departure_bucket": 1,
    "is_weekend_departure": 0,
    "is_holiday_season": 1,
    "route_hash": 3091900015,
    "stops_count": 1,
    "flight_duration_bucket": "long",
}

featurizer = joblib.load(FEATURIZER_PATH)
booster = xgb.Booster()
booster.load_model(BOOSTER_PATH)

df = pd.DataFrame([row], columns=feature_order)

# 1) featurizer -> ndarray
transformed = featurizer.transform(df)
print("transformed shape:", transformed.shape)

# 2) NPY serialization/deserialization
buf = io.BytesIO()
np.save(buf, transformed)
buf.seek(0)

roundtrip = np.load(buf)
print("roundtrip equal:", np.allclose(transformed, roundtrip))
print("roundtrip shape:", roundtrip.shape)

# 3) Compare XGBoost predictions
pred1 = booster.predict(xgb.DMatrix(transformed))
pred2 = booster.predict(xgb.DMatrix(roundtrip))
print("preds_log:", pred1, pred2)
print("preds_expm1:", np.expm1(pred1), np.expm1(pred2))


transformed shape: (1, 10)
roundtrip equal: True
roundtrip shape: (1, 10)
preds_log: [0.7937496] [0.7937496]
preds_expm1: [1.2116737] [1.2116737]


https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
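The cell above simulates the serving path by pushing the featurizer output through the NPY format. Because NPY stores dtype and shape alongside the raw bytes, the round trip should be bit-exact, so `np.array_equal` plus a dtype check is a stricter test than `np.allclose`. A minimal sketch of the two conversions follows; the helper names are hypothetical, and `allow_pickle=False` is a deliberate choice so object arrays are rejected instead of pickled.

```python
import io

import numpy as np


def to_npy_bytes(array: np.ndarray) -> bytes:
    """Serialize an array to NPY bytes, refusing object arrays (no pickling)."""
    buf = io.BytesIO()
    np.save(buf, array, allow_pickle=False)
    return buf.getvalue()


def from_npy_bytes(data: bytes) -> np.ndarray:
    """Deserialize NPY bytes without allowing pickled payloads."""
    return np.load(io.BytesIO(data), allow_pickle=False)


# Stand-in for the (1, 10) featurizer output; values are illustrative only
features = np.arange(10, dtype=np.float32).reshape(1, 10)
restored = from_npy_bytes(to_npy_bytes(features))

print("bit-exact:", np.array_equal(features, restored))
print("dtype/shape:", restored.dtype, restored.shape)
```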
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


In [14]:
import io
import joblib
import numpy as np
import pandas as pd
import xgboost as xgb

FEATURIZER_PATH = "/home/sagemaker-user/flights_fare_timing_ml/serve_extracted/sklearn_model.joblib"
BOOSTER_PATH = "/home/sagemaker-user/flights_fare_timing_ml/serve_extracted2/xgboost_model.bin"

RAW_PAYLOAD = "0,afternoon,1,0,1,3091900015,1,long"

FEATURE_ORDER = [
    "purchase_day_of_week",
    "purchase_time_bucket",
    "days_until_departure_bucket",
    "is_weekend_departure",
    "is_holiday_season",
    "route_hash",
    "stops_count",
    "flight_duration_bucket",
]

NUMERIC_FIELDS = {
    "purchase_day_of_week",
    "days_until_departure_bucket",
    "is_weekend_departure",
    "is_holiday_season",
    "route_hash",
    "stops_count",
}

def payload_to_df(payload: str) -> pd.DataFrame:
    parts = [p.strip() for p in payload.split(",")]
    if len(parts) != len(FEATURE_ORDER):
        raise ValueError(f"Expected {len(FEATURE_ORDER)} fields, got {len(parts)}: {parts}")
    row = {}
    for name, value in zip(FEATURE_ORDER, parts):
        if name in NUMERIC_FIELDS:
            try:
                row[name] = int(value)
            except ValueError:
                row[name] = float(value)
        else:
            row[name] = value
    return pd.DataFrame([row], columns=FEATURE_ORDER)

featurizer = joblib.load(FEATURIZER_PATH)
booster = xgb.Booster()
booster.load_model(BOOSTER_PATH)

df = payload_to_df(RAW_PAYLOAD)

# 1) Featurizer output
transformed = featurizer.transform(df)
print("transformed shape:", transformed.shape)

# 2) NPY serialize → deserialize (simulating the serving flow)
buf = io.BytesIO()
np.save(buf, transformed)
buf.seek(0)
roundtrip = np.load(buf)

print("roundtrip equal:", np.allclose(transformed, roundtrip))
print("roundtrip shape:", roundtrip.shape)

# 3) Compare XGBoost predictions for both inputs
pred_direct = booster.predict(xgb.DMatrix(transformed))[0]
pred_roundtrip = booster.predict(xgb.DMatrix(roundtrip))[0]

print("pred_log direct:", pred_direct)
print("pred_log roundtrip:", pred_roundtrip)
print("pred_expm1 direct:", np.expm1(pred_direct))


transformed shape: (1, 10)
roundtrip equal: True
roundtrip shape: (1, 10)
pred_log direct: 0.7937496
pred_log roundtrip: 0.7937496
pred_expm1 direct: 1.2116737


https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


In [2]:
import hashlib

source = "Dubai"
destination = "Kolkata"

route_str = f"{source}_{destination}"
print(int(hashlib.md5(route_str.encode()).hexdigest()[:8], 16))

4040905874
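Wrapping the cell's logic in a function makes it easier to apply exactly the same hashing at training and inference time. The first 8 hex digits of the MD5 digest are 32 bits, so values fall in `[0, 2**32)`, which is consistent with the `route_hash` values in the payloads above. MD5 serves here as a stable bucketing function, not for security; Python's built-in `hash()` would not work because string hashing is randomized per process by default. The function name `route_hash` is our choice, and whether `pipeline.py` uses exactly this scheme is not shown in this notebook.

```python
import hashlib


def route_hash(source: str, destination: str) -> int:
    """Stable 32-bit integer for a route, from the first 8 hex digits of its MD5 digest."""
    route_str = f"{source}_{destination}"
    digest = hashlib.md5(route_str.encode("utf-8")).hexdigest()
    return int(digest[:8], 16)


print(route_hash("Dubai", "Kolkata"))
```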
