# EV Charging Prediction using Amazon SageMaker

## 1. Setup and Data Preparation

In [49]:
import pandas as pd
import boto3
import sagemaker
from sagemaker.inputs import TrainingInput
from sagemaker.tuner import IntegerParameter, ContinuousParameter, HyperparameterTuner
from sagemaker import image_uris
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OrdinalEncoder, OneHotEncoder,RobustScaler
from sklearn.impute import SimpleImputer
from sklearn.model_selection import train_test_split

In [2]:
# Set up S3 client and bucket information.
# NOTE(review): `s3` is not referenced by any later cell in this notebook; kept
# in case other tooling relies on it.
s3 = boto3.client("s3")
bucket_name = "ev-charging-processed"
# Derive the raw-data URI from bucket_name so changing the bucket only needs one edit.
input_data_path = f"s3://{bucket_name}/data/part-00000-e0b3237a-add5-471c-ba5e-111b16d710db-c000.snappy.parquet"

## 2. Data Processing Function

In [9]:
def _build_preprocessor(numerical_features, ordinal_columns, ohe_columns):
    """Build the (unfitted) ColumnTransformer for the EV-charging features.

    Output columns are ordered: ``numerical_features`` (mean-imputed, robust
    scaled), then ``ordinal_columns`` (charger level encoded 0/1/2, unknown
    levels -> -1), then the one-hot columns for ``ohe_columns``.
    """
    numeric_transformer = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="mean")),
        ("scaler", RobustScaler()),
    ])
    ordinal_transformer = Pipeline(steps=[
        ("ordinal", OrdinalEncoder(
            handle_unknown="use_encoded_value",
            unknown_value=-1,
            categories=[["Level 1", "Level 2", "DC Fast Charger"]],
        )),
    ])
    ohe_transformer = Pipeline(steps=[
        ("ohe", OneHotEncoder(handle_unknown="ignore")),
    ])
    return ColumnTransformer(
        transformers=[
            ("numeric", numeric_transformer, numerical_features),
            ("ordinal", ordinal_transformer, ordinal_columns),
            ("ohe", ohe_transformer, ohe_columns),
        ],
        # Always return a dense array. With the default sparse_threshold=0.3 and
        # enough one-hot categories, the result becomes a scipy sparse matrix,
        # which pd.DataFrame(..., columns=...) cannot consume directly.
        sparse_threshold=0,
    )


def _to_xgboost_frame(features, target_values, feature_names, target_name):
    """Wrap transformed features in a DataFrame with the target as column 0.

    SageMaker's built-in XGBoost expects headerless CSV with the label first.
    """
    frame = pd.DataFrame(features, columns=feature_names)
    frame.insert(0, target_name, target_values.to_numpy())
    return frame


def process_data(input_data_path, training_data_folder = "training_data",
                 bucket=None, test_size=0.2, random_state=42):
    """Preprocess the EV-charging parquet data and upload XGBoost-ready CSVs to S3.

    The rows are split into train/validation sets. The preprocessor is fitted on
    the training split only, and both splits are written as headerless CSV with
    the target ``energy_consumed_(kwh)`` in the first column.

    Args:
        input_data_path: Path or URI readable by ``pd.read_parquet``.
        training_data_folder: S3 key prefix for the uploaded CSVs.
        bucket: S3 bucket to upload to; defaults to the notebook's ``bucket_name``.
        test_size: Fraction of rows held out for validation.
        random_state: Seed for the train/validation split.

    Returns:
        Tuple ``(train_uri, validation_uri)`` with the S3 URIs of the uploads.

    Side effects:
        Writes ``training_data.csv`` and ``validation_data.csv`` to the current
        working directory, then uploads them to S3.
    """
    if bucket is None:
        bucket = bucket_name  # notebook-level default from the setup cell

    df = pd.read_parquet(input_data_path)

    # Feature groups
    numerical_features = [
        "distance_driven_(since_last_charge)_(km)", "temperature_(°c)",
        "vehicle_age_(years)", "charging_start_hour", "charging_start_day",
        "charging_end_hour", "charging_end_day", "charging_rate_x_battery_capacity",
        "temperature_x_charging_duration", "effective_battery_capacity_(kwh)",
        "battery_percentage_charged", "charging_duration_(hours)", "charging_rate_(kw)"
    ]
    ohe_columns = ["vehicle_model", "charging_station_location", "time_of_day", "user_type"]
    ordinal_columns = ["charger_type"]
    target = "energy_consumed_(kwh)"

    # XGBoost rejects NaN labels, so unlabeled rows cannot be used for training.
    df_ = (
        df[[target] + numerical_features + ohe_columns + ordinal_columns]
        .dropna(subset=[target])
    )

    X = df_.drop(columns=[target])
    y = df_[target]
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=test_size, random_state=random_state
    )

    # Fit on the training split only so validation statistics do not leak in.
    preprocessor = _build_preprocessor(numerical_features, ordinal_columns, ohe_columns)
    X_train_processed = preprocessor.fit_transform(X_train)
    X_val_processed = preprocessor.transform(X_val)

    # Names follow the ColumnTransformer output order: numeric, ordinal, one-hot.
    feature_names = (
        numerical_features
        + ordinal_columns
        + preprocessor.named_transformers_["ohe"].get_feature_names_out(ohe_columns).tolist()
    )

    train_df = _to_xgboost_frame(X_train_processed, y_train, feature_names, target)
    val_df = _to_xgboost_frame(X_val_processed, y_val, feature_names, target)

    training_data_object = "training_data.csv"
    validation_data_object = "validation_data.csv"

    # Headerless CSV: the format SageMaker's built-in XGBoost reads.
    train_df.to_csv(training_data_object, index=False, header=False)
    val_df.to_csv(validation_data_object, index=False, header=False)

    session = sagemaker.Session()
    train_uri = session.upload_data(
        path=training_data_object, bucket=bucket, key_prefix=training_data_folder
    )
    validation_uri = session.upload_data(
        path=validation_data_object, bucket=bucket, key_prefix=training_data_folder
    )

    print(f"Training data uploaded to {train_uri}")
    print(f"Validation data uploaded to {validation_uri}")
    return train_uri, validation_uri

In [10]:
process_data(input_data_path=input_data_path, training_data_folder="training_data")

Training data uploaded to s3://ev-charging-processed/training_data/training_data.csv
Validation data uploaded to s3://ev-charging-processed/training_data/validation_data.csv


## 3. Load Preprocessed Data

In [11]:
training_data_folder = "training_data"
training_data_object = "training_data.csv"
validation_data_object = "validation_data.csv"

# S3 URIs of the preprocessed CSVs, under the same prefix process_data uploads to
s3_prefix = f"s3://{bucket_name}/{training_data_folder}"
training_data_path = f"{s3_prefix}/{training_data_object}"
validation_data_path = f"{s3_prefix}/{validation_data_object}"

for uri in (training_data_path, validation_data_path):
    print(uri)

s3://ev-charging-processed/training_data/training_data.csv
s3://ev-charging-processed/training_data/validation_data.csv


## 4. Set Up SageMaker Training

In [12]:
# Resolve the regional container image for SageMaker's XGBoost 1.7-1
region = boto3.Session().region_name
xgboost_image = image_uris.retrieve(framework="xgboost", region=region, version="1.7-1")

In [13]:
# Training and validation channels; both are headerless CSV files
training_data, validation_data = (
    TrainingInput(s3_data=uri, content_type="text/csv")
    for uri in (training_data_path, validation_data_path)
)

In [18]:
# Static hyperparameters: L1 (absolute-error) regression for 50 boosting rounds
base_hyperparameters = dict(objective="reg:absoluteerror", num_round=50)

In [19]:
# Key prefix inside the bucket where training jobs write model artifacts
artifacts_folder: str = "model-artifacts"

## 5. Create and Train XGBoost Estimator

In [20]:
# Estimator for the XGBoost container; artifacts go to s3://<bucket>/<artifacts_folder>
execution_role = sagemaker.get_execution_role()
training_session = sagemaker.Session()
estimator = sagemaker.estimator.Estimator(
    image_uri=xgboost_image,
    role=execution_role,
    instance_count=1,
    instance_type="ml.m4.xlarge",
    hyperparameters=base_hyperparameters,
    output_path=f"s3://{bucket_name}/{artifacts_folder}",
    sagemaker_session=training_session,
)

In [21]:
# Launch the training job with both data channels and wait for it to finish
channels = {"train": training_data, "validation": validation_data}
estimator.fit(inputs=channels)

INFO:sagemaker:Creating training-job with name: sagemaker-xgboost-2024-10-17-14-14-29-386


2024-10-17 14:14:32 Starting - Starting the training job...
2024-10-17 14:14:45 Starting - Preparing the instances for training...
2024-10-17 14:15:22 Downloading - Downloading input data...
2024-10-17 14:15:53 Downloading - Downloading the training image......
2024-10-17 14:16:59 Training - Training image download completed. Training in progress...[34m[2024-10-17 14:17:09.488 ip-10-0-241-221.ec2.internal:7 INFO utils.py:28] RULE_JOB_STOP_SIGNAL_FILENAME: None[0m
[34m[2024-10-17 14:17:09.512 ip-10-0-241-221.ec2.internal:7 INFO profiler_config_parser.py:111] User has disabled profiler.[0m
[34m[2024-10-17:14:17:09:INFO] Imported framework sagemaker_xgboost_container.training[0m
[34m[2024-10-17:14:17:09:INFO] Failed to parse hyperparameter objective value reg:absoluteerror to Json.[0m
[34mReturning the value itself[0m
[34m[2024-10-17:14:17:09:INFO] No GPUs detected (normal if no gpus installed)[0m
[34m[2024-10-17:14:17:09:INFO] Running XGBoost Sagemaker in algorithm mode[0m


## 6. Evaluate Training Results

In [22]:
# Create a SageMaker client
sagemaker_client = boto3.client("sagemaker")

# Describe the job launched by estimator.fit() rather than hard-coding its
# auto-generated name, so this cell keeps working after training is re-run.
training_job_name = estimator.latest_training_job.name
response = sagemaker_client.describe_training_job(TrainingJobName=training_job_name)

# Final metrics reported by the job; the key may be absent if none were emitted.
for metric in response.get("FinalMetricDataList", []):
    print(metric["MetricName"], metric["Value"])

train:mae 16.106800079345703
validation:mae 19.194759368896484


## 7. Hyperparameter Tuning

In [34]:
# Tuning search space: integer ranges for max_depth / min_child_weight,
# continuous ranges for learning rate, gamma and the sampling ratios.
# NOTE(review): if the best trial lands on a range boundary, consider widening it.
hyperparameter_ranges = dict(
    max_depth=IntegerParameter(3, 10),
    eta=ContinuousParameter(0.05, 0.7),
    gamma=ContinuousParameter(1, 5),
    min_child_weight=IntegerParameter(1, 10),
    subsample=ContinuousParameter(0.4, 1.0),
    colsample_bytree=ContinuousParameter(0.1, 1.0),
)

In [35]:
# Define metric definitions and objective metric.
# XGBoost logs one line per boosting round, e.g.
#   [49]#011train-mae:16.10680#011validation-mae:19.19476
# where "#011" is how the tab separator appears in the job's log stream.
# The capture group accepts signed and scientific-notation numbers.
metric_definitions = [{
    "Name": "validation:mae",
    "Regex": r".*\[[0-9]+\].*(?:#011|\t)validation-mae:([-+]?[0-9]*\.?[0-9]+(?:[eE][-+]?[0-9]+)?).*",
}]

objective_metric_name = "validation:mae"

In [36]:
# Tuner: minimise validation MAE over the search space, 50 trials, 3 at a time
tuner = HyperparameterTuner(
    estimator=estimator,
    hyperparameter_ranges=hyperparameter_ranges,
    objective_metric_name=objective_metric_name,
    objective_type="Minimize",
    metric_definitions=metric_definitions,
    max_parallel_jobs=3,
    max_jobs=50,
)

In [38]:
# Launch the tuning job under a fixed name and wait for the trials to finish.
# NOTE: tuning-job names must be unique per account and region, so re-running
# this cell requires choosing a new job_name.
tuner.fit(
    inputs={"train": training_data, "validation": validation_data},
    job_name="xgb-tuning-job-4",
)

INFO:sagemaker:Creating hyperparameter tuning job with name: xgb-tuning-job-4


...........................................................................................................................................................................................................!


## 8. Tuning Results

In [39]:
# Take the tuning-job name from the tuner instead of repeating the literal,
# so this cell cannot drift out of sync with the tuner.fit() call.
tuning_job_name = tuner.latest_tuning_job.name
response = sagemaker_client.describe_hyper_parameter_tuning_job(HyperParameterTuningJobName=tuning_job_name)

# Get the best training job. The field is optional in the API response,
# presumably missing until a trial reports the objective metric.
best_training_job = response.get("BestTrainingJob")
if best_training_job is None:
    raise RuntimeError(f"Tuning job {tuning_job_name!r} has no best training job yet")
print("Best Training Job:", best_training_job)

Best Training Job: {'TrainingJobName': 'xgb-tuning-job-4-047-bf5b00b3', 'TrainingJobArn': 'arn:aws:sagemaker:us-east-1:296062546796:training-job/xgb-tuning-job-4-047-bf5b00b3', 'CreationTime': datetime.datetime(2024, 10, 17, 16, 2, 53, tzinfo=tzlocal()), 'TrainingStartTime': datetime.datetime(2024, 10, 17, 16, 3, 1, tzinfo=tzlocal()), 'TrainingEndTime': datetime.datetime(2024, 10, 17, 16, 3, 30, tzinfo=tzlocal()), 'TrainingJobStatus': 'Completed', 'TunedHyperParameters': {'colsample_bytree': '0.1', 'eta': '0.23737753183860888', 'gamma': '3.583377359657563', 'max_depth': '10', 'min_child_weight': '1', 'subsample': '1.0'}, 'FinalHyperParameterTuningJobObjectiveMetric': {'MetricName': 'validation:mae', 'Value': 19.0451602935791}, 'ObjectiveStatus': 'Succeeded'}


In [42]:
# Fetch the full description of the winning trial (hyperparameters, final metrics, ...)
best_training_job_name = best_training_job["TrainingJobName"]
best_training_job_response = sagemaker_client.describe_training_job(
    TrainingJobName=best_training_job_name
)

In [44]:
# Hyperparameters the best trial ran with: tuned values plus the static base
# ones (and an internal `_tuning_objective_metric` entry), all as strings
best_hyperparameters = best_training_job_response["HyperParameters"]
print(f"Best Hyperparameters: {best_hyperparameters}")

Best Hyperparameters: {'_tuning_objective_metric': 'validation:mae', 'colsample_bytree': '0.1', 'eta': '0.23737753183860888', 'gamma': '3.583377359657563', 'max_depth': '10', 'min_child_weight': '1', 'num_round': '50', 'objective': 'reg:absoluteerror', 'subsample': '1.0'}


In [48]:
# Final metrics of the best trial (empty list if the job reported none).
# NOTE(review): validation:mae shows up twice, presumably once from the
# algorithm's built-in metric and once from the custom metric_definitions.
metrics = best_training_job_response.get("FinalMetricDataList", [])
for entry in metrics:
    print("Metric Name: {}, Value: {}".format(entry["MetricName"], entry["Value"]))

Metric Name: train:mae, Value: 17.688709259033203
Metric Name: validation:mae, Value: 19.0451602935791
Metric Name: validation:mae, Value: 19.0451602935791
Metric Name: ObjectiveMetric, Value: 19.0451602935791


## 9. Deploy the Model

In [50]:
# Re-create an Estimator bound to the best trial's completed job so it can be deployed
sagemaker_session = sagemaker.Session()
best_model = sagemaker.estimator.Estimator.attach(
    training_job_name=best_training_job_name,
    sagemaker_session=sagemaker_session,
)


2024-10-17 16:03:46 Starting - Found matching resource for reuse
2024-10-17 16:03:46 Downloading - Downloading the training image
2024-10-17 16:03:46 Training - Training image download completed. Training in progress.
2024-10-17 16:03:46 Uploading - Uploading generated training model
2024-10-17 16:03:46 Completed - Resource reused by training job: xgb-tuning-job-4-050-318522bb


In [52]:
# Host the best model on a single-instance real-time endpoint.
# NOTE(review): with a fixed endpoint_name, re-running this cell will likely fail
# while an endpoint or endpoint config of that name still exists.
predictor = best_model.deploy(
    endpoint_name="ev-charging-predictor-1",
    instance_type="ml.t2.medium",
    initial_instance_count=1,
)

INFO:sagemaker:Creating model with name: xgb-tuning-job-4-047-bf5b00b3-2024-10-17-17-03-19-414
INFO:sagemaker:Creating endpoint-config with name ev-charging-predictor-1
INFO:sagemaker:Creating endpoint with name ev-charging-predictor-1


--------------!

In [53]:
print("Model deployed. Endpoint name:", predictor.endpoint_name)

Model deployed. Endpoint name: ev-charging-predictor-1


In [54]:
# Tear down everything deploy() created: the model, the endpoint config and the
# endpoint. Deleting only the endpoint leaves the model and endpoint config behind,
# and a later deploy that reuses the same names would then fail.
# delete_model() looks the model up through the endpoint config, so it runs first.
predictor.delete_model()
predictor.delete_endpoint(delete_endpoint_config=True)
print(f"Endpoint {predictor.endpoint_name} has been scheduled for deletion")

Endpoint ev-charging-predictor-1 has been scheduled for deletion
