In [16]:
import pandas as pd, pyarrow as pa, pyarrow.dataset as ds
print("pandas:", pd.__version__, "| pyarrow:", pa.__version__)

pandas: 2.2.3 | pyarrow: 19.0.1


In [17]:
# ====================== EDIT THESE ======================
S3_PARQUET       = "s3://nyc-taxi-poc-101199/raw/yellow_tripdata_2025-01.parquet"
S3_OUTPUT_PREFIX = "s3://nyc-taxi-poc-101199/derived/eta_poc/"
ROWS_TARGET      = 200_000   # how many rows to load into RAM (keep modest)
# =======================================================

import os, uuid
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds

from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error

import boto3, sagemaker
from sagemaker import image_uris
from sagemaker.inputs import TrainingInput

print("pandas:", pd.__version__, "| pyarrow:", pa.__version__)

# -------------------------
# 1) LOAD A BOUNDED CHUNK
# -------------------------
dataset = ds.dataset(S3_PARQUET, format="parquet")

need_cols = [
    "tpep_pickup_datetime", "tpep_dropoff_datetime",
    "trip_distance", "passenger_count",
    "payment_type", "ratecodeid", "vendorid",
    "pulocationid", "dolocationid",
    "total_amount",               # for cleaning
    "airport_fee", "cbd_congestion_fee"  # may be absent in some months
]
cols = [c for c in need_cols if c in dataset.schema.names]

scanner = dataset.scanner(columns=cols, batch_size=50_000)
batches, rows_loaded = [], 0
for batch in scanner.to_batches():
    batches.append(batch)
    rows_loaded += len(batch)
    if rows_loaded >= ROWS_TARGET:
        break

table = pa.Table.from_batches(batches)
df = table.to_pandas()
print(f"Loaded rows: {len(df):,}")

# -------------------------
# 2) FEATURE ENGINEERING
# -------------------------
# Timestamps
df["tpep_pickup_datetime"]  = pd.to_datetime(df["tpep_pickup_datetime"],  errors="coerce")
df["tpep_dropoff_datetime"] = pd.to_datetime(df["tpep_dropoff_datetime"], errors="coerce")
df = df.dropna(subset=["tpep_pickup_datetime","tpep_dropoff_datetime"])

# Label (duration in minutes)
df["duration_min"] = (df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]).dt.total_seconds() / 60

# Clean: keep reasonable trips
df = df[
    (df["duration_min"].between(1, 120)) &
    (df["trip_distance"] >= 0) &
    (df["total_amount"]  >= 0)
].copy()

# Time features
df["hour"]    = df["tpep_pickup_datetime"].dt.hour
df["weekday"] = df["tpep_pickup_datetime"].dt.weekday

# Optional flags (safe even if columns missing)
df["is_airport"] = 0
if "airport_fee" in df.columns:
    df["is_airport"] = (df["airport_fee"].fillna(0) > 0).astype(int)

df["has_congestion_fee"] = 0
if "cbd_congestion_fee" in df.columns:
    df["has_congestion_fee"] = (df["cbd_congestion_fee"].fillna(0) > 0).astype(int)

# -------------------------
# 3) BUILD TRAINING FRAME
# -------------------------
feature_cols = [
    "trip_distance","hour","weekday","passenger_count",
    "payment_type","ratecodeid","vendorid","pulocationid","dolocationid",
    "is_airport","has_congestion_fee"
]
target_col = "duration_min"

# Fill any missing feature columns with safe numeric defaults
defaults = {
    "trip_distance": 0.0, "hour": 12, "weekday": 3, "passenger_count": 1.0,
    "payment_type": 1, "ratecodeid": 1, "vendorid": 1,
    "pulocationid": 0, "dolocationid": 0,
    "is_airport": 0, "has_congestion_fee": 0
}
for col in feature_cols:
    if col not in df.columns:
        df[col] = defaults[col]

# Final guardrail
df = df[
    (df["duration_min"].between(1, 120)) &
    (df["trip_distance"] >= 0)
].copy()

data = df[feature_cols + [target_col]].dropna().reset_index(drop=True)
print("Training frame shape:", data.shape)
display(data.head())

# -------------------------
# 4) TRAIN / VALIDATION SPLIT
# -------------------------
# XGBoost expects: label first, no header
cols_order = [target_col] + feature_cols
trainable  = data[cols_order]

# Keep it snappy
sample_size = min(len(trainable), 120_000)
sample = trainable.sample(sample_size, random_state=42)

train_df, val_df = train_test_split(sample, test_size=0.2, random_state=42)
train_path = "train.csv"
val_path   = "val.csv"
train_df.to_csv(train_path, index=False, header=False)
val_df.to_csv(val_path,   index=False, header=False)
print("Train/Val sizes:", len(train_df), len(val_df))

# -------------------------
# 5) MANAGED TRAINING JOB (BUILT-IN XGBOOST)
# -------------------------
sm_sess = sagemaker.Session()
role    = sagemaker.get_execution_role()
region  = sm_sess.boto_region_name
xgb_img = image_uris.retrieve("xgboost", region=region, version="1.7-1")

# Upload train/val to S3 under your prefix
def parse_s3(s3url):
    assert s3url.startswith("s3://")
    bkt_key = s3url[5:]
    bucket, _, key = bkt_key.partition("/")
    return bucket, key if key.endswith("/") else (key + "/" if key else "")

bucket, prefix = parse_s3(S3_OUTPUT_PREFIX)
train_s3_uri = sm_sess.upload_data(path=train_path, bucket=bucket, key_prefix=prefix + "train")
val_s3_uri   = sm_sess.upload_data(path=val_path,   bucket=bucket, key_prefix=prefix + "val")
print("Train:", train_s3_uri)
print("Val:  ", val_s3_uri)

# Use a small instance class (eligible for free-trial hours if your account qualifies)
estimator = sagemaker.estimator.Estimator(
    image_uri=xgb_img,
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    sagemaker_session=sm_sess,
    hyperparameters={
        "objective": "reg:squarederror",
        "eval_metric": "mae",
        "max_depth": 6,
        "eta": 0.2,
        "subsample": 0.8,
        "min_child_weight": 1,
        "colsample_bytree": 0.8,
        "num_round": 120,
        "verbosity": 1
    },
)

estimator.fit({
    "train": TrainingInput(train_s3_uri, content_type="text/csv"),
    "validation": TrainingInput(val_s3_uri, content_type="text/csv"),
})
print("Training complete (managed job).")

# === Local predictions from the trained model artifact (no Batch Transform) ===
import os, tarfile, boto3, numpy as np, pandas as pd

s3 = boto3.client("s3")
model_s3 = estimator.model_data  # s3://.../output/model.tar.gz
print("Model artifact:", model_s3)

def _parse_s3(url):
    rest = url[5:]; b, _, k = rest.partition("/")
    return b, k
bucket_model, key_model = _parse_s3(model_s3)

local_tar = "/tmp/model.tar.gz"
s3.download_file(bucket_model, key_model, local_tar)

extract_dir = "/tmp/xgb_model"
os.makedirs(extract_dir, exist_ok=True)
with tarfile.open(local_tar) as tar:
    tar.extractall(extract_dir)

def _find_model_file(root):
    picks = []
    for dp, _, files in os.walk(root):
        for f in files:
            if f in ("xgboost-model", "model.xgb") or f.endswith((".json", ".bin")):
                picks.append(os.path.join(dp, f))
    for f in picks:
        if os.path.basename(f) in ("xgboost-model", "model.xgb") or f.endswith(".bin"):
            return f
    return picks[0] if picks else None

model_path = _find_model_file(extract_dir)
assert model_path, f"No model file found under {extract_dir}"
print("Using model file:", model_path)

# Load booster and predict on validation set
try:
    import xgboost as xgb
except ImportError:
    import sys, subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "xgboost==1.7.6"])
    import xgboost as xgb

# val_df is label-first (duration_min then features) from your code above
y_true = val_df.iloc[:, 0].to_numpy()
X_val  = val_df.iloc[:, 1:].to_numpy()

dm = xgb.DMatrix(X_val)
booster = xgb.Booster()
booster.load_model(model_path)
y_pred = booster.predict(dm)

from sklearn.metrics import mean_absolute_error
mae_model = mean_absolute_error(y_true, y_pred)
print(f"XGBoost model MAE (min): {mae_model:0.3f}")

# Build a compact Tableau-friendly CSV with predictions and errors
feature_cols = [
    "trip_distance","hour","weekday","passenger_count",
    "payment_type","ratecodeid","vendorid","pulocationid","dolocationid",
    "is_airport","has_congestion_fee"
]
val_named = val_df.copy()
val_named.columns = ["duration_min"] + feature_cols

out = pd.DataFrame({
    "duration_min": y_true,
    "pred_eta_min": y_pred,
    "abs_err": np.abs(y_true - y_pred),
    "hour": val_named["hour"].to_numpy(),
    "trip_distance": val_named["trip_distance"].to_numpy(),
    "weekday": val_named["weekday"].to_numpy(),
    "passenger_count": val_named["passenger_count"].to_numpy(),
    "pulocationid": val_named["pulocationid"].to_numpy(),
    "dolocationid": val_named["dolocationid"].to_numpy(),
})

# Keep it light for Tableau; sample if huge
out_sample = out.sample(min(50_000, len(out)), random_state=7)
local_pred_csv = "eta_predictions_for_tableau.csv"
out_sample.to_csv(local_pred_csv, index=False)

# Upload alongside your derived outputs so you can download easily
def _parse_prefix(s3url):
    rest = s3url[5:]; b, _, k = rest.partition("/")
    if k and not k.endswith("/"): k += "/"
    return b, k

bucket_out, prefix_out = _parse_prefix(S3_OUTPUT_PREFIX)
dst_key = f"{prefix_out}eta_predictions_for_tableau.csv"
s3.upload_file(local_pred_csv, bucket_out, dst_key)

print("Tableau CSV written to:", f"s3://{bucket_out}/{dst_key}")
print("HTTP download (if public or via signed URL):", f"https://{bucket_out}.s3.amazonaws.com/{dst_key}")


# -------------------------
# 6) QUICK BASELINE MAE
# -------------------------
bin_edges = [0, 0.5, 1, 2, 5, 10, 20, np.inf]

train_tmp = train_df.copy()
train_tmp.columns = [target_col] + feature_cols
train_tmp["dist_bin"] = pd.cut(train_tmp["trip_distance"], bins=bin_edges)
med = train_tmp.groupby(["hour","dist_bin"])[target_col].median()

def naive_predict(row):
    b = pd.cut([row["trip_distance"]], bins=bin_edges)[0]
    return med.get((row["hour"], b), med.median())

val_tmp = val_df.copy()
val_tmp.columns = [target_col] + feature_cols
y_true = val_tmp[target_col].to_numpy()
y_pred_naive = val_tmp.apply(naive_predict, axis=1).to_numpy()

mae_naive = mean_absolute_error(y_true, y_pred_naive)
print(f"Naive baseline MAE (min): {mae_naive:0.3f}")

# -------------------------
# 7) SAVE SMALL ERROR SAMPLE TO S3
# -------------------------
err = val_tmp[[target_col, "hour", "trip_distance"]].copy()
err["naive_eta"] = y_pred_naive
err["abs_err"]   = (err[target_col] - err["naive_eta"]).abs()

err_out = err.sample(min(5000, len(err)), random_state=7)
local_csv = "eta_errors_sample.csv"
os.makedirs("/tmp/eta_poc", exist_ok=True)
err_out.to_csv(local_csv, index=False)

s3 = boto3.client("s3")
dst_key = f"{prefix}eta_errors_sample.csv"
s3.upload_file(local_csv, bucket, dst_key)
print("Wrote errors sample to:", f"s3://{bucket}/{dst_key}")
print("✅ All done")


pandas: 2.2.3 | pyarrow: 19.0.1
Loaded rows: 200,000
Training frame shape: (191517, 12)


Unnamed: 0,trip_distance,hour,weekday,passenger_count,payment_type,ratecodeid,vendorid,pulocationid,dolocationid,is_airport,has_congestion_fee,duration_min
0,1.6,0,2,1,1,1,1,0,0,0,0,8.35
1,0.5,0,2,1,1,1,1,0,0,0,0,2.55
2,0.6,0,2,1,1,1,1,0,0,0,0,1.95
3,0.52,0,2,3,2,1,1,0,0,0,0,5.566667
4,0.66,0,2,3,2,1,1,0,0,0,0,3.533333


Train/Val sizes: 96000 24000
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3Bucket
sagemaker.config INFO - Applied value from config key = SageMaker.PythonSDK.Modules.Session.DefaultS3ObjectKeyPrefix
Train: s3://nyc-taxi-poc-101199/derived/eta_poc/train/train.csv
Val:   s3://nyc-taxi-poc-101199/derived/eta_poc/val/val.csv
sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.VpcConfig.Subnets
sagemaker.config INFO - Applied value from config key = SageMaker.TrainingJob.VpcConfig.SecurityGroupIds
2025-09-17 18:43:37 Starting - Starting the training job...
2025-09-17 18:43:54 Starting - Preparing the instances for training...
2025-09-17 18:44:16 Downloading - Downloading input data...
2025-

In [18]:
# === Local predictions from the trained model artifact (no Batch Transform) ===
import os, tarfile, tempfile, boto3, io
import numpy as np
import pandas as pd

# 1) Download and extract the model artifact produced by the managed training job
s3 = boto3.client("s3")
model_s3 = estimator.model_data  # e.g., s3://.../output/model.tar.gz
print("Model artifact:", model_s3)

def parse_s3(url):
    assert url.startswith("s3://")
    rest = url[5:]
    bkt, _, key = rest.partition("/")
    return bkt, key

bkt, key = parse_s3(model_s3)
local_tar = "/tmp/model.tar.gz"
s3.download_file(bkt, key, local_tar)

extract_dir = "/tmp/xgb_model"
os.makedirs(extract_dir, exist_ok=True)
with tarfile.open(local_tar) as tar:
    tar.extractall(extract_dir)

# Find the model file (name varies by container/version)
def find_model_file(root):
    candidates = []
    for dp, dn, fn in os.walk(root):
        for f in fn:
            if f in ("xgboost-model", "model.xgb") or f.endswith((".json", ".bin")):
                candidates.append(os.path.join(dp, f))
    # prefer non-json binary first, else json
    for f in candidates:
        if os.path.basename(f) in ("xgboost-model", "model.xgb") or f.endswith(".bin"):
            return f
    return candidates[0] if candidates else None

model_path = find_model_file(extract_dir)
assert model_path, f"No model file found under {extract_dir}"
print("Using model file:", model_path)

# 2) Load booster and predict on your validation features
try:
    import xgboost as xgb
except ImportError:
    # lightweight install if missing
    import sys, subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "xgboost==1.7.6"])
    import xgboost as xgb

# val_df is label-first (duration_min), then features – created earlier in your script
val_tmp = val_df.copy()
y_true = val_tmp.iloc[:, 0].to_numpy()
X_val  = val_tmp.iloc[:, 1:].to_numpy()

dm = xgb.DMatrix(X_val)
booster = xgb.Booster()
booster.load_model(model_path)
y_pred = booster.predict(dm)

from sklearn.metrics import mean_absolute_error
mae_model = mean_absolute_error(y_true, y_pred)
print(f"XGBoost model MAE (min): {mae_model:0.3f}")

# 3) Save a compact errors file for Superset
#    (keep a couple of useful features for slicing)
feature_cols = [
    "trip_distance","hour","weekday","passenger_count",
    "payment_type","ratecodeid","vendorid","pulocationid","dolocationid",
    "is_airport","has_congestion_fee"
]
val_named = val_tmp.copy()
val_named.columns = ["duration_min"] + feature_cols

err = pd.DataFrame({
    "duration_min": y_true,
    "pred_eta_min": y_pred,
    "abs_err": np.abs(y_true - y_pred),
    "hour": val_named["hour"].to_numpy(),
    "trip_distance": val_named["trip_distance"].to_numpy(),
})

err_sample = err.sample(min(5000, len(err)), random_state=7)
local_csv = "eta_errors_model_sample.csv"
err_sample.to_csv(local_csv, index=False)

# Upload next to your other derived files
def parse_prefix(s3url):
    assert s3url.startswith("s3://")
    rest = s3url[5:]
    bucket, _, key = rest.partition("/")
    if key and not key.endswith("/"): key += "/"
    return bucket, key

bucket, prefix = parse_prefix(S3_OUTPUT_PREFIX)
dst_key = f"{prefix}eta_errors_model_sample.csv"
s3.upload_file(local_csv, bucket, dst_key)
print("Wrote model errors sample to:", f"s3://{bucket}/{dst_key}")
# === Local predictions from the trained model artifact (no Batch Transform) ===
import os, tarfile, tempfile, boto3, io
import numpy as np
import pandas as pd

# 1) Download and extract the model artifact produced by the managed training job
s3 = boto3.client("s3")
model_s3 = estimator.model_data  # e.g., s3://.../output/model.tar.gz
print("Model artifact:", model_s3)

def parse_s3(url):
    assert url.startswith("s3://")
    rest = url[5:]
    bkt, _, key = rest.partition("/")
    return bkt, key

bkt, key = parse_s3(model_s3)
local_tar = "/tmp/model.tar.gz"
s3.download_file(bkt, key, local_tar)

extract_dir = "/tmp/xgb_model"
os.makedirs(extract_dir, exist_ok=True)
with tarfile.open(local_tar) as tar:
    tar.extractall(extract_dir)

# Find the model file (name varies by container/version)
def find_model_file(root):
    candidates = []
    for dp, dn, fn in os.walk(root):
        for f in fn:
            if f in ("xgboost-model", "model.xgb") or f.endswith((".json", ".bin")):
                candidates.append(os.path.join(dp, f))
    # prefer non-json binary first, else json
    for f in candidates:
        if os.path.basename(f) in ("xgboost-model", "model.xgb") or f.endswith(".bin"):
            return f
    return candidates[0] if candidates else None

model_path = find_model_file(extract_dir)
assert model_path, f"No model file found under {extract_dir}"
print("Using model file:", model_path)

# 2) Load booster and predict on your validation features
try:
    import xgboost as xgb
except ImportError:
    # lightweight install if missing
    import sys, subprocess
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", "xgboost==1.7.6"])
    import xgboost as xgb

# val_df is label-first (duration_min), then features – created earlier in your script
val_tmp = val_df.copy()
y_true = val_tmp.iloc[:, 0].to_numpy()
X_val  = val_tmp.iloc[:, 1:].to_numpy()

dm = xgb.DMatrix(X_val)
booster = xgb.Booster()
booster.load_model(model_path)
y_pred = booster.predict(dm)

from sklearn.metrics import mean_absolute_error
mae_model = mean_absolute_error(y_true, y_pred)
print(f"XGBoost model MAE (min): {mae_model:0.3f}")

# 3) Save a compact errors file for Superset
#    (keep a couple of useful features for slicing)
feature_cols = [
    "trip_distance","hour","weekday","passenger_count",
    "payment_type","ratecodeid","vendorid","pulocationid","dolocationid",
    "is_airport","has_congestion_fee"
]
val_named = val_tmp.copy()
val_named.columns = ["duration_min"] + feature_cols

err = pd.DataFrame({
    "duration_min": y_true,
    "pred_eta_min": y_pred,
    "abs_err": np.abs(y_true - y_pred),
    "hour": val_named["hour"].to_numpy(),
    "trip_distance": val_named["trip_distance"].to_numpy(),
})

err_sample = err.sample(min(5000, len(err)), random_state=7)
local_csv = "eta_errors_model_sample.csv"
err_sample.to_csv(local_csv, index=False)

# Upload next to your other derived files
def parse_prefix(s3url):
    assert s3url.startswith("s3://")
    rest = s3url[5:]
    bucket, _, key = rest.partition("/")
    if key and not key.endswith("/"): key += "/"
    return bucket, key

bucket, prefix = parse_prefix(S3_OUTPUT_PREFIX)
dst_key = f"{prefix}eta_errors_model_sample.csv"
s3.upload_file(local_csv, bucket, dst_key)
print("Wrote model errors sample to:", f"s3://{bucket}/{dst_key}")


Model artifact: s3://amazon-sagemaker-166725664013-us-east-1-40b919aa80ef/dzd_4a968141po04fb/aq0kpzrmretbtz/dev/sagemaker-xgboost-2025-09-17-18-43-32-715/output/model.tar.gz
Using model file: /tmp/xgb_model/xgboost-model
XGBoost model MAE (min): 3.792
Wrote model errors sample to: s3://nyc-taxi-poc-101199/derived/eta_poc/eta_errors_model_sample.csv
Model artifact: s3://amazon-sagemaker-166725664013-us-east-1-40b919aa80ef/dzd_4a968141po04fb/aq0kpzrmretbtz/dev/sagemaker-xgboost-2025-09-17-18-43-32-715/output/model.tar.gz
Using model file: /tmp/xgb_model/xgboost-model
XGBoost model MAE (min): 3.792
Wrote model errors sample to: s3://nyc-taxi-poc-101199/derived/eta_poc/eta_errors_model_sample.csv
