In [2]:
# %pip install -q awswrangler==3.* sagemaker==2.* boto3==1.* pyarrow==15.* scikit-learn==1.4.*

import os, json, math, boto3, awswrangler as wr, numpy as np, pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import (
    mean_squared_error, mean_absolute_error, r2_score,
    accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
)
import sagemaker
from sagemaker import image_uris

region = "us-east-1"
sess = sagemaker.Session()
sm_client = boto3.client("sagemaker", region_name=region)
s3 = boto3.client("s3", region_name=region)
athena = boto3.client("athena", region_name=region)

role = sagemaker.get_execution_role()

ATHENA_WORKGROUP = "taxi-wg"
ATHENA_DB = "nyc_taxi"
ATHENA_OUTPUT = "s3://genai-taxi-athena-results-poc/athena/"
S3_BUCKET = "genai-taxi-curated-poc"
ARTIFACTS_PREFIX = "sagemaker/xgb-taxi"

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/sagemaker-user/.config/sagemaker/config.yaml


In [None]:
sql = """
SELECT
  CAST(date_format(tpep_pickup_datetime, '%H') AS integer) AS pickup_hour,
  date_format(tpep_pickup_datetime, '%a') AS dow,
  COALESCE(passenger_count, 0) AS passenger_count,
  trip_distance,
  COALESCE(payment_type, 0) AS payment_type,
  fare_amount,
  tip_amount
FROM nyc_taxi.yellow_curated y
WHERE y.year = 2024
  AND y.month BETWEEN 7 AND 12
  AND fare_amount IS NOT NULL
  AND tip_amount IS NOT NULL
  AND trip_distance IS NOT NULL
"""

df = wr.athena.read_sql_query(
    sql=sql + " LIMIT 500000",
    database=ATHENA_DB,
    workgroup=ATHENA_WORKGROUP,
    s3_output=ATHENA_OUTPUT,
    ctas_approach=True
)


In [12]:
df.shape
df.head

<bound method NDFrame.head of         pickup_hour  dow  passenger_count  trip_distance  payment_type  \
0                 8  Sat                1           4.89             1   
1                 8  Sat                1           1.72             1   
2                 8  Sat                2           2.48             2   
3                 8  Sat                1           2.47             1   
4                 8  Sat                1           0.90             1   
...             ...  ...              ...            ...           ...   
499995           20  Thu                3           2.30             1   
499996           20  Thu                1           0.00             1   
499997           20  Thu                2          12.09             1   
499998           20  Thu                1           3.63             1   
499999           20  Thu                1           2.80             1   

        fare_amount  tip_amount  
0              24.7        5.74  
1            

In [13]:
import numpy as np, pandas as pd
from sklearn.model_selection import train_test_split

# Map day-of-week to 0..6
dow_map = {"Mon":0,"Tue":1,"Wed":2,"Thu":3,"Fri":4,"Sat":5,"Sun":6}
df["dow_idx"] = df["dow"].map(dow_map).fillna(0).astype(int)

# Light outlier caps (keeps training stable)
df = df[(df["fare_amount"]>=0)&(df["fare_amount"]<=200)
        &(df["trip_distance"]>=0)&(df["trip_distance"]<=50)
        &(df["tip_amount"]>=0)&(df["tip_amount"]<=100)]

FEATURES = ["pickup_hour","dow_idx","passenger_count","trip_distance","payment_type","fare_amount"]
X = df[FEATURES].astype(float)
y_reg = df["tip_amount"].astype(float)           # regression target
y_cls = (df["tip_amount"] > 0).astype(int)       # classification label

# Split once, reuse for both tasks (70/15/15)
X_train, X_temp, y_reg_train, y_reg_temp, y_cls_train, y_cls_temp = train_test_split(
    X, y_reg, y_cls, test_size=0.30, random_state=42
)
X_val, X_test, y_reg_val, y_reg_test, y_cls_val, y_cls_test = train_test_split(
    X_temp, y_reg_temp, y_cls_temp, test_size=0.50, random_state=42
)

len(X_train), len(X_val), len(X_test)

(343194, 73542, 73542)

In [14]:
import boto3
s3 = boto3.client("s3")
S3_BUCKET = "genai-taxi-curated-poc"
ARTIFACTS_PREFIX = "sagemaker/xgb-taxi"

def to_xgb_csv(X, y):
    arr = np.concatenate([y.values.reshape(-1,1), X.values], axis=1)
    return "\n".join([",".join(map(str,row)) for row in arr])

paths = {}
# Regression
for split, Xs, ys in [("train",X_train,y_reg_train), ("val",X_val,y_reg_val), ("test",X_test,y_reg_test)]:
    key = f"{ARTIFACTS_PREFIX}/regression/{split}.csv"
    s3.put_object(Bucket=S3_BUCKET, Key=key, Body=to_xgb_csv(Xs, ys).encode())
    paths.setdefault("reg", {})[split] = f"s3://{S3_BUCKET}/{key}"

# Classification
for split, Xs, ys in [("train",X_train,y_cls_train), ("val",X_val,y_cls_val), ("test",X_test,y_cls_test)]:
    key = f"{ARTIFACTS_PREFIX}/classification/{split}.csv"
    s3.put_object(Bucket=S3_BUCKET, Key=key, Body=to_xgb_csv(Xs, ys).encode())
    paths.setdefault("cls", {})[split] = f"s3://{S3_BUCKET}/{key}"

paths

{'reg': {'train': 's3://genai-taxi-curated-poc/sagemaker/xgb-taxi/regression/train.csv',
  'val': 's3://genai-taxi-curated-poc/sagemaker/xgb-taxi/regression/val.csv',
  'test': 's3://genai-taxi-curated-poc/sagemaker/xgb-taxi/regression/test.csv'},
 'cls': {'train': 's3://genai-taxi-curated-poc/sagemaker/xgb-taxi/classification/train.csv',
  'val': 's3://genai-taxi-curated-poc/sagemaker/xgb-taxi/classification/val.csv',
  'test': 's3://genai-taxi-curated-poc/sagemaker/xgb-taxi/classification/test.csv'}}

In [16]:
import sagemaker
from sagemaker import image_uris
from sagemaker.inputs import TrainingInput

sess = sagemaker.Session()
role = sagemaker.get_execution_role()
region = sess.boto_region_name
container = image_uris.retrieve(framework="xgboost", region=region, version="1.7-1")

# Regression
reg_est = sagemaker.estimator.Estimator(
    image_uri=container, role=role, instance_count=1, instance_type="ml.m5.large",
    output_path=f"s3://{S3_BUCKET}/{ARTIFACTS_PREFIX}/regression/output", sagemaker_session=sess
)
reg_est.set_hyperparameters(
    objective="reg:squarederror", num_round=1000, max_depth=6, eta=0.1,
    subsample=0.8, colsample_bytree=0.8, eval_metric="rmse", early_stopping_rounds=25
)
reg_est.fit(
    {
        "train": TrainingInput(
            s3_data=paths["reg"]["train"],
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=paths["reg"]["val"],
            content_type="text/csv"
        ),
    },
    logs=True,
)
# Classification
cls_est = sagemaker.estimator.Estimator(
    image_uri=container, role=role, instance_count=1, instance_type="ml.m5.large",
    output_path=f"s3://{S3_BUCKET}/{ARTIFACTS_PREFIX}/classification/output", sagemaker_session=sess
)
cls_est.set_hyperparameters(
    objective="binary:logistic", num_round=1000, max_depth=6, eta=0.1,
    subsample=0.8, colsample_bytree=0.8, eval_metric="auc", early_stopping_rounds=25
)
cls_est.fit(
    {
        "train": TrainingInput(
            s3_data=paths["cls"]["train"],
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=paths["cls"]["val"],
            content_type="text/csv"
        ),
    },
    logs=True,
)

INFO:sagemaker.image_uris:Ignoring unnecessary instance type: None.
INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.
INFO:sagemaker:Creating training-job with name: sagemaker-xgboost-2025-08-08-21-26-56-997


2025-08-08 21:26:58 Starting - Starting the training job...
2025-08-08 21:27:14 Starting - Preparing the instances for training...
2025-08-08 21:27:34 Downloading - Downloading input data...
2025-08-08 21:28:20 Downloading - Downloading the training image......
  import pkg_resources[0m
[34m[2025-08-08 21:29:29.594 ip-10-0-104-94.ec2.internal:7 INFO utils.py:28] RULE_JOB_STOP_SIGNAL_FILENAME: None[0m
[34m[2025-08-08 21:29:29.634 ip-10-0-104-94.ec2.internal:7 INFO profiler_config_parser.py:111] User has disabled profiler.[0m
[34m[2025-08-08:21:29:30:INFO] Imported framework sagemaker_xgboost_container.training[0m
[34m[2025-08-08:21:29:30:INFO] Failed to parse hyperparameter eval_metric value rmse to Json.[0m
[34mReturning the value itself[0m
[34m[2025-08-08:21:29:30:INFO] Failed to parse hyperparameter objective value reg:squarederror to Json.[0m
[34mReturning the value itself[0m
[34m[2025-08-08:21:29:30:INFO] No GPUs detected (normal if no gpus installed)[0m
[34m[2025

INFO:sagemaker.telemetry.telemetry_logging:SageMaker Python SDK will collect telemetry to help us better understand our user's needs, diagnose issues, and deliver additional features.
To opt out of telemetry, please disable via TelemetryOptOut parameter in SDK defaults config. For more information, refer to https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk.
INFO:sagemaker:Creating training-job with name: sagemaker-xgboost-2025-08-08-21-30-44-474


Training seconds: 160
Billable seconds: 160
2025-08-08 21:30:45 Starting - Starting the training job...
2025-08-08 21:30:59 Starting - Preparing the instances for training...
2025-08-08 21:31:22 Downloading - Downloading input data...
2025-08-08 21:32:08 Downloading - Downloading the training image......
  import pkg_resources[0m
[34m[2025-08-08 21:33:12.308 ip-10-0-106-105.ec2.internal:7 INFO utils.py:28] RULE_JOB_STOP_SIGNAL_FILENAME: None[0m
[34m[2025-08-08 21:33:12.338 ip-10-0-106-105.ec2.internal:7 INFO profiler_config_parser.py:111] User has disabled profiler.[0m
[34m[2025-08-08:21:33:12:INFO] Imported framework sagemaker_xgboost_container.training[0m
[34m[2025-08-08:21:33:12:INFO] Failed to parse hyperparameter eval_metric value auc to Json.[0m
[34mReturning the value itself[0m
[34m[2025-08-08:21:33:12:INFO] Failed to parse hyperparameter objective value binary:logistic to Json.[0m
[34mReturning the value itself[0m
[34m[2025-08-08:21:33:12:INFO] No GPUs detected 

In [17]:
from sagemaker.deserializers import JSONDeserializer
from sagemaker.serializers import CSVSerializer

reg_ep = "xgb-taxi-reg-poc"
cls_ep = "xgb-taxi-cls-poc"

reg_pred = reg_est.deploy(1, "ml.m5.large", endpoint_name=reg_ep)
cls_pred = cls_est.deploy(1, "ml.m5.large", endpoint_name=cls_ep)
for p in (reg_pred, cls_pred):
    p.serializer = CSVSerializer()
    p.deserializer = JSONDeserializer()

INFO:sagemaker:Creating model with name: sagemaker-xgboost-2025-08-08-21-36-32-111
INFO:sagemaker:Creating endpoint-config with name xgb-taxi-reg-poc
INFO:sagemaker:Creating endpoint with name xgb-taxi-reg-poc


------!

INFO:sagemaker:Creating model with name: sagemaker-xgboost-2025-08-08-21-40-04-320
INFO:sagemaker:Creating endpoint-config with name xgb-taxi-cls-poc
INFO:sagemaker:Creating endpoint with name xgb-taxi-cls-poc


------!

In [18]:
import math
from sklearn.metrics import (mean_squared_error, mean_absolute_error, r2_score,
                             accuracy_score, precision_score, recall_score, f1_score, roc_auc_score)

def predict_batches(predictor, X, bs=500):
    preds=[]
    for i in range(0, len(X), bs):
        payload = "\n".join([",".join(map(str,row)) for row in X.iloc[i:i+bs].values])
        out = predictor.predict(payload)
        preds.extend([r["score"] for r in out["predictions"]])
    return np.array(preds, float)

# Regression
y_reg_pred = predict_batches(reg_pred, X_test)
rmse = math.sqrt(mean_squared_error(y_reg_test, y_reg_pred))
mae  = mean_absolute_error(y_reg_test, y_reg_pred)
mape = float(np.mean(np.where(y_reg_test!=0, np.abs((y_reg_test-y_reg_pred)/y_reg_test), 0))) * 100.0
r2   = r2_score(y_reg_test, y_reg_pred)
baseline_rmse = math.sqrt(mean_squared_error(y_reg_test, np.full_like(y_reg_test.values, y_reg_train.mean())))
improve_pct = (1 - rmse / baseline_rmse) * 100

reg_metrics = {
    "n_test": int(len(y_reg_test)),
    "rmse": rmse, "mae": mae, "mape_pct": mape, "r2": r2,
    "baseline_rmse": baseline_rmse, "rmse_improvement_vs_baseline_pct": improve_pct
}

# Classification
y_cls_proba = predict_batches(cls_pred, X_test)
y_cls_pred  = (y_cls_proba >= 0.5).astype(int)
acc  = accuracy_score(y_cls_test, y_cls_pred)
prec = precision_score(y_cls_test, y_cls_pred, zero_division=0)
rec  = recall_score(y_cls_test, y_cls_pred, zero_division=0)
f1   = f1_score(y_cls_test, y_cls_pred, zero_division=0)
auc  = roc_auc_score(y_cls_test, y_cls_proba)

cls_metrics = {
    "n_test": int(len(y_cls_test)),
    "accuracy": acc, "precision": prec, "recall": rec, "f1": f1, "auc": auc
}

reg_metrics, cls_metrics

({'n_test': 73542,
  'rmse': 2.560447565535145,
  'mae': 1.297054596593154,
  'mape_pct': 79.31955552765936,
  'r2': 0.6541359333056419,
  'baseline_rmse': 4.3537700057090305,
  'rmse_improvement_vs_baseline_pct': 41.19010507726246},
 {'n_test': 73542,
  'accuracy': 0.9541758450953197,
  'precision': 0.9440852993031651,
  'recall': 0.9992890784679641,
  'f1': 0.9709031255396304,
  'auc': 0.9299152968185095})

In [19]:
import json
metrics = {"features_order": FEATURES, "regression": reg_metrics, "classification": cls_metrics}

metrics_key = f"{ARTIFACTS_PREFIX}/metrics/metrics.json"
s3.put_object(Bucket=S3_BUCKET, Key=metrics_key, Body=json.dumps(metrics, indent=2).encode())
print("Saved metrics:", f"s3://{S3_BUCKET}/{metrics_key}")

# Small peek files
sample_reg = X_test.head(10).copy()
sample_reg["actual_tip"] = y_reg_test.head(10).values
sample_reg["pred_tip"] = y_reg_pred[:10]
s3.put_object(Bucket=S3_BUCKET, Key=f"{ARTIFACTS_PREFIX}/predictions/reg_sample.csv",
              Body=sample_reg.to_csv(index=False).encode())

sample_cls = X_test.head(10).copy()
sample_cls["actual_is_tipped"] = y_cls_test.head(10).values
sample_cls["prob_is_tipped"]   = y_cls_proba[:10]
sample_cls["pred_is_tipped"]   = (y_cls_proba[:10] >= 0.5).astype(int)
s3.put_object(Bucket=S3_BUCKET, Key=f"{ARTIFACTS_PREFIX}/predictions/cls_sample.csv",
              Body=sample_cls.to_csv(index=False).encode())

# Clean up endpoints to stop costs
reg_pred.delete_endpoint()
cls_pred.delete_endpoint()

INFO:sagemaker:Deleting endpoint configuration with name: xgb-taxi-reg-poc


Saved metrics: s3://genai-taxi-curated-poc/sagemaker/xgb-taxi/metrics/metrics.json


INFO:sagemaker:Deleting endpoint with name: xgb-taxi-reg-poc
INFO:sagemaker:Deleting endpoint configuration with name: xgb-taxi-cls-poc
INFO:sagemaker:Deleting endpoint with name: xgb-taxi-cls-poc
