In [None]:
%%writefile processing_script.py
import os
import argparse
import glob
import pandas as pd
import numpy as np



def get_cutoff_timestamp(file_paths, train_fraction=0.8):
    """Find the timestamp that splits all sessions chronologically into train/test.

    Every readable file contributes its ``session_start`` values; the values are
    pooled, sorted, and the one sitting at the ``train_fraction`` position is
    returned. Sessions strictly earlier than that timestamp are meant for the
    training split.

    Args:
        file_paths: Iterable of parquet file paths to scan.
        train_fraction: Share of sessions (by time order) that should fall
            before the cutoff. Must be strictly between 0 and 1.

    Returns:
        The cutoff as a pandas ``Timestamp``.

    Raises:
        ValueError: If ``train_fraction`` is out of range, or if no file yields
            at least one valid ``session_start`` value.
    """
    if not 0 < train_fraction < 1:
        raise ValueError(f"train_fraction must be between 0 and 1, got {train_fraction}")

    print("PASS 1: Calculating global time cutoff...")
    dates = []
    for path in file_paths:
        try:
            # Only the timestamp column is needed in this pass; skip loading the rest.
            df_chunk = pd.read_parquet(path, columns=["session_start"])
            if 'session_start' not in df_chunk.columns:
                continue
            # Missing timestamps (NaT) sort last and would otherwise shift the
            # percentile position, or even become the cutoff themselves.
            valid_dates = pd.to_datetime(df_chunk['session_start']).dropna()
            del df_chunk
            if not valid_dates.empty:
                dates.append(valid_dates)
        except Exception as e:
            # Best effort: an unreadable or malformed file must not abort the job.
            print(f"Skipping file {path}: {e}")

    if not dates:
        raise ValueError("No valid data found.")

    all_dates = pd.concat(dates, ignore_index=True).sort_values()
    # min() guards against float rounding pushing the index to len(all_dates).
    cutoff_index = min(int(len(all_dates) * train_fraction), len(all_dates) - 1)
    cutoff_date = all_dates.iloc[cutoff_index]

    print(f"Global Time Cutoff found: {cutoff_date}")
    return cutoff_date

def process_file(path, output_dir, cutoff_date):
    """Engineer features for one parquet file and write its train/test CSV parts.

    Rows whose ``session_start`` is strictly before ``cutoff_date`` go to
    ``<output_dir>/train/train_<filename>.csv``; the remaining rows go to
    ``<output_dir>/test/test_<filename>.csv``. CSVs are written without header
    or index, feature columns first and the label (``did_purchase``) last.

    Args:
        path: Path of the input parquet file.
        output_dir: Directory that already contains ``train`` and ``test``
            sub-directories.
        cutoff_date: Timestamp separating the training rows from the test rows.

    Returns:
        None. Any error is logged and the file is skipped, so one bad file
        does not stop the job.
    """
    label_col = "did_purchase"

    # --- FINAL CLEAN FEATURE LIST (NO MATH LEAKS) ---
    # Removed: n_events (Direct Math Leak)
    # Removed: session_duration_seconds (Checkout Time Leak)
    # Removed: log_session_duration (Derived from duration)
    feature_cols = [
        "n_views",
        "n_cart",
        "n_unique_product",
        "n_unique_category",
        "session_hour",
        "session_weekday",
        "is_weekend",
        "cart_to_view_ratio",
    ]

    try:
        filename = os.path.basename(path)
        df = pd.read_parquet(path)

        missing = [col for col in ("session_start", label_col) if col not in df.columns]
        if missing:
            # Say why the file produced no output instead of returning silently.
            print(f"Skipping file {path}: missing column(s) {missing}")
            return

        timestamps = pd.to_datetime(df["session_start"])
        has_time = timestamps.notna()
        n_dropped = int((~has_time).sum())
        if n_dropped:
            # A missing timestamp compares False against the cutoff, which would
            # silently route the row into the test split; drop it instead.
            print(f"{filename}: dropping {n_dropped} row(s) with no session_start")

        sessions = df.loc[has_time].copy()
        sessions["session_start"] = timestamps[has_time]

        # Calendar features
        sessions["session_hour"] = sessions["session_start"].dt.hour
        sessions["session_weekday"] = sessions["session_start"].dt.weekday
        sessions["is_weekend"] = sessions["session_weekday"].isin([5, 6]).astype(int)

        # Ratios (a zero view count is treated as 1 to avoid division by zero)
        sessions["cart_to_view_ratio"] = sessions["n_cart"] / sessions["n_views"].replace(0, 1)

        # A boolean label would be written as True/False text, which a numeric
        # CSV reader cannot parse; store it as 0/1.
        if pd.api.types.is_bool_dtype(sessions[label_col]):
            sessions[label_col] = sessions[label_col].astype("int8")

        # Split chronologically
        train_mask = sessions["session_start"] < cutoff_date
        selected = sessions[feature_cols + [label_col]]
        train_chunk = selected[train_mask]
        test_chunk = selected[~train_mask]

        # Write (empty parts are not written at all)
        if not train_chunk.empty:
            train_chunk.to_csv(
                os.path.join(output_dir, "train", f"train_{filename}.csv"),
                index=False,
                header=False,
            )
        if not test_chunk.empty:
            test_chunk.to_csv(
                os.path.join(output_dir, "test", f"test_{filename}.csv"),
                index=False,
                header=False,
            )

    except Exception as e:
        print(f"Error processing {path}: {e}")



if __name__ == "__main__":
    # Command line: where the raw files live and where the splits are written.
    # Defaults are the container paths used by the processing job.
    cli = argparse.ArgumentParser()
    cli.add_argument("--input-data", type=str, default="/opt/ml/processing/input")
    cli.add_argument("--output-data", type=str, default="/opt/ml/processing/output")
    options = cli.parse_args()

    # Every regular file in the input directory is a candidate, except Python sources.
    input_files = []
    for candidate in glob.glob(os.path.join(options.input_data, "*")):
        if os.path.isfile(candidate) and not candidate.endswith(".py"):
            input_files.append(candidate)

    if len(input_files) == 0:
        raise RuntimeError("No files found")

    # process_file expects both split directories to exist already.
    for split_name in ("train", "test"):
        os.makedirs(os.path.join(options.output_data, split_name), exist_ok=True)

    # Pass 1: one cutoff shared by all files. Pass 2: split each file with it.
    cutoff_date = get_cutoff_timestamp(input_files)

    print("Starting processing...")
    for input_path in input_files:
        process_file(input_path, options.output_data, cutoff_date)

    print("Job Complete.")

In [None]:
import sagemaker
from sagemaker import get_execution_role
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput

# 1. Configuration: account context plus the S3 locations of raw and processed data
region = sagemaker.Session().boto_region_name
role = get_execution_role()
bucket = "your-s3-bucket"
input_prefix = "features/session_features"
output_prefix = "processed/data"

# 2. Define the Processor: a single instance running the script with python3
#    inside the scikit-learn container image
sklearn_image_uri = sagemaker.image_uris.retrieve("sklearn", region=region, version="1.2-1")
processor = ScriptProcessor(
    command=['python3'],
    image_uri=sklearn_image_uri,
    role=role,
    instance_count=1,
    instance_type='ml.m5.2xlarge',
    base_job_name='clickstream-feature-eng',
)

# Raw session features are copied into the container's input directory.
# NOTE(review): with ShardedByS3Key and more than one instance, each instance
# would presumably see only part of the files, so the script's "global" cutoff
# would be computed per shard. Confirm before raising instance_count.
raw_sessions_input = ProcessingInput(
    source=f"s3://{bucket}/{input_prefix}",
    destination='/opt/ml/processing/input',
    s3_data_distribution_type='ShardedByS3Key',
)

# One output channel per split, each uploaded under its own S3 prefix.
split_outputs = [
    ProcessingOutput(
        output_name=split_name,
        source=f'/opt/ml/processing/output/{split_name}',
        destination=f's3://{bucket}/{output_prefix}/{split_name}',
    )
    for split_name in ('train', 'test')
]

# 3. Launch and block until the job finishes, streaming its logs here
print("Launching Processing Job")
processor.run(
    code='processing_script.py',
    inputs=[raw_sessions_input],
    outputs=split_outputs,
    wait=True,
    logs=True,
)

In [None]:
import s3fs

# Sanity check: list the CSV parts the processing job wrote for each split.
fs = s3fs.S3FileSystem()
bucket = "your-s3-bucket"
output_prefix = "processed/data"

processed_root = f"s3://{bucket}/{output_prefix}"

print("TRAINING DATA")
train_files = fs.glob(f"{processed_root}/train/*.csv")
print(f"Found {len(train_files)} training parts.")
# Show one example key, or the text 'None' when nothing was written.
first_train_part = train_files[0] if train_files else 'None'
print(f"Sample: {first_train_part}")

print("\nTEST DATA")
test_files = fs.glob(f"{processed_root}/test/*.csv")
print(f"Found {len(test_files)} testing parts.")

In [None]:
# Writes 'train.py', the entry point run by the training job. It loads the
# header-less CSV parts with pandas (label in the last column), trains an
# XGBoost model and pickles it into the model directory.
script_content = """
import argparse
import os
import glob
import pandas as pd
import xgboost as xgb
import pickle
import gc

def load_dataset(path):
    # Read every CSV part under `path` and return (features, label).
    print(f"Loading CSV files from {path}...")
    files = glob.glob(os.path.join(path, "*.csv"))
    if not files:
        raise ValueError(f"No CSV files found in {path}")
    
    dfs = []
    for f in files:
        # Optimization: Use float32 to save 50% RAM
        dfs.append(pd.read_csv(f, header=None, dtype='float32'))
    
    df = pd.concat(dfs, ignore_index=True)
    
    # Label is the LAST column
    y = df.iloc[:, -1]
    X = df.iloc[:, :-1]
    
    return X, y

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--train', type=str, default=os.environ.get('SM_CHANNEL_TRAIN'))
    parser.add_argument('--test', type=str, default=os.environ.get('SM_CHANNEL_TEST'))
    parser.add_argument('--model-dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
    parser.add_argument('--max_depth', type=int, default=5)
    parser.add_argument('--eta', type=float, default=0.2)
    parser.add_argument('--num_round', type=int, default=50)
    parser.add_argument('--objective', type=str, default='binary:logistic')
    
    # Estimator hyperparameters arrive as command-line flags. Tolerate flags
    # this script does not declare instead of aborting the whole training job.
    args, unknown_args = parser.parse_known_args()
    if unknown_args:
        print(f"Ignoring unrecognized arguments: {unknown_args}")

    # 1. Load Data
    print("Loading Train Data...")
    X_train, y_train = load_dataset(args.train)
    
    print("Loading Test Data...")
    X_test, y_test = load_dataset(args.test)
    
    # 2. Convert to DMatrix (Memory Intensive Step)
    print("Converting to DMatrix...")
    dtrain = xgb.DMatrix(X_train, label=y_train)
    dtest = xgb.DMatrix(X_test, label=y_test)
    
    # Free RAM immediately
    del X_train, y_train, X_test, y_test
    gc.collect()
    
    # 3. Train
    print("Starting Training...")
    params = {
        'max_depth': args.max_depth,
        'eta': args.eta,
        'objective': args.objective,
        'eval_metric': 'auc',
        'verbosity': 1
    }
    
    model = xgb.train(
        params, 
        dtrain, 
        num_boost_round=args.num_round,
        evals=[(dtest, "test")]
    )
    
    # 4. Save
    model_path = os.path.join(args.model_dir, "xgboost-model")
    with open(model_path, 'wb') as f:
        pickle.dump(model, f)
    print("Training Complete. Model saved.")
"""

with open("train.py", "w") as f:
    f.write(script_content)

print("train.py created.")

In [None]:
import sagemaker
from sagemaker.xgboost.estimator import XGBoost


# Must be the same bucket/prefix the processing job wrote its train/test parts
# to, otherwise the training channels point at empty locations.
bucket = "your-s3-bucket"
prefix = "processed/data"

role = sagemaker.get_execution_role()


# Script-mode estimator: runs train.py inside the XGBoost framework container.
xgb_estimator = XGBoost(
    entry_point='train.py',
    role=role,
    instance_count=1,
    instance_type='ml.m5.2xlarge',
    framework_version='1.5-1',
    py_version='py3',
    # Only hyperparameters that train.py declares on its command line are
    # listed here. 'verbosity' is set inside train.py itself, and passing it as
    # an undeclared flag would make the script's argument parser abort.
    hyperparameters={
        'max_depth': 6,
        'eta': 0.2,
        'objective': 'binary:logistic',
        'num_round': 50,
    },
    base_job_name='clickstream-xgb-train'
)


print(f"Launching Training Job using bucket: {bucket}...")
print(f"Training Data: s3://{bucket}/{prefix}/train/")
print(f"Testing Data:  s3://{bucket}/{prefix}/test/")

# The channel names become the SM_CHANNEL_TRAIN / SM_CHANNEL_TEST defaults
# that train.py reads.
xgb_estimator.fit({
    'train': f's3://{bucket}/{prefix}/train/',
    'test':  f's3://{bucket}/{prefix}/test/'
})

In [None]:
from sagemaker.xgboost.model import XGBoostModel
from sagemaker.serializers import CSVSerializer


# Reuse the artifact produced by the training job launched earlier.
model_s3_uri = xgb_estimator.model_data
print(f"Found model artifact: {model_s3_uri}")

# NOTE(review): 'inference.py' is not created anywhere in the visible notebook;
# presumably it already exists next to this notebook. Confirm before a fresh run.
model_settings = dict(
    model_data=model_s3_uri,
    role=role,
    entry_point='inference.py',
    framework_version='1.5-1',
    py_version='py3',
)
xgb_model = XGBoostModel(**model_settings)

# Requests are sent to the endpoint as CSV text.
print("Deploying to live endpoint...")
endpoint_settings = dict(
    initial_instance_count=1,
    instance_type='ml.m5.xlarge',
    serializer=CSVSerializer(),
)
predictor = xgb_model.deploy(**endpoint_settings)

print(f"Deployment Complete! Endpoint Name: {predictor.endpoint_name}")

In [22]:

# Each payload is one CSV row in the feature order written by the processing
# script: n_views, n_cart, n_unique_product, n_unique_category, session_hour,
# session_weekday, is_weekend, cart_to_view_ratio.

# Scenario A: Window Shopper (0 Cart)
shopper_no_cart = "5,0,4,2,14,2,0,0.0"

# Scenario B: Serious Buyer (2 Cart)
shopper_with_cart = "5,2,4,2,14,2,0,0.4"

print("\n--- LIVE PREDICTIONS ---")
# Score both scenarios against the live endpoint, in the same order as above.
for scenario_label, payload in (
    ("Window Shopper: ", shopper_no_cart),
    ("Serious Buyer:  ", shopper_with_cart),
):
    print(f"{scenario_label}{predictor.predict(payload)}")


--- LIVE PREDICTIONS ---
Window Shopper: [['0.0052060504']]
Serious Buyer:  [['0.60220134']]


In [23]:

# Clean-up: tear down the live endpoint created by the deploy cell so it stops
# running. The captured log below shows the endpoint configuration is deleted
# along with the endpoint.
print("Deleting endpoint...")
predictor.delete_endpoint()
print("✅ Endpoint deleted. No further charges.")

INFO:sagemaker:Deleting endpoint configuration with name: sagemaker-xgboost-2025-12-10-20-44-15-138


Deleting endpoint...


INFO:sagemaker:Deleting endpoint with name: sagemaker-xgboost-2025-12-10-20-44-15-138


✅ Endpoint deleted. No further charges.
