In [73]:
import sklearn
import pandas as pd
import boto3
import os
import numpy as np
from sagemaker import get_execution_role
import sagemaker
import json
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.parameters import (
    ParameterInteger, 
    ParameterFloat, 
    ParameterString, 
    ParameterBoolean
)

from sagemaker.processing import (
    ProcessingInput, 
    ProcessingOutput, 
    ScriptProcessor
)

from sagemaker.workflow.steps import (
    ProcessingStep, 
    TrainingStep, 
    CreateModelStep
)
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.parameters import (
    ParameterInteger, 
    ParameterFloat, 
    ParameterString, 
    ParameterBoolean
)
from sagemaker.workflow.clarify_check_step import (
    ModelBiasCheckConfig, 
    ClarifyCheckStep, 
    ModelExplainabilityCheckConfig
)
from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.conditions import (
    ConditionGreaterThan,
    ConditionGreaterThanOrEqualTo
)

from sagemaker.inputs import TrainingInput
from sagemaker.estimator import Estimator
from sagemaker.workflow.steps import TrainingStep

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

## Set constants

In [74]:
# Get some variables you need to interact with SageMaker service
boto_session = boto3.Session()
region = boto_session.region_name
bucket_name = sagemaker.Session().default_bucket()
sm_session = sagemaker.Session()
sm_client = boto_session.client("sagemaker")
sm_role = sagemaker.get_execution_role()

initialized = True

In [75]:
sm_role

'arn:aws:iam::531485126105:role/service-role/AmazonSageMaker-ExecutionRole-20230614T171444'

## check bucket name : S3

In [76]:
bucket_name

'sagemaker-us-east-1-531485126105'

In [77]:
# comment this because we not use metadata from idea-production

# domain_id = None
# NOTEBOOK_METADATA_FILE = "/opt/ml/metadata/resource-metadata.json"

# if os.path.exists(NOTEBOOK_METADATA_FILE):
#     with open(NOTEBOOK_METADATA_FILE, "r") as f:
#         data = json.load(f)
#         print(json.dumps(data, indent=4))
# else:
#     print("There is no metadata file.")

In [78]:
# Set names of pipeline objects
project = "Group-project"
bucket_prefix = "Solar1"
input_url = f"{project}/{bucket_prefix}/processing/input"
output_url = f"{project}/{bucket_prefix}/processing/output"

pipeline_name = f"{project}-pipeline"
pipeline_model_name = f"{project}-model-reg"
model_package_group_name = f"{project}-model-group"
endpoint_config_name = f"{project}-endpoint-config"
endpoint_name = f"{project}-endpoint"

# Set instance types and counts
process_instance_type = "ml.c5.xlarge"
train_instance_count = 1
train_instance_type = "ml.m5.xlarge"

# Set S3 urls for processed data
train_s3_url = f"s3://{output_url}/train"
validation_s3_url = f"s3://{output_url}/validation"
test_s3_url = f"s3://{output_url}/test"
baseline_s3_url = f"s3://{output_url}/baseline"
# train_s3_url = f"s3://{bucket_name}/{bucket_prefix}/train"
# validation_s3_url = f"s3://{bucket_name}/{bucket_prefix}/validation"
# test_s3_url = f"s3://{bucket_name}/{bucket_prefix}/test"
# baseline_s3_url = f"s3://{bucket_name}/{bucket_prefix}/baseline"




evaluation_s3_url = f"s3://{bucket_name}/{bucket_prefix}/evaluation"
prediction_baseline_s3_url = f"s3://{bucket_name}/{bucket_prefix}/prediction_baseline"

output_s3_url = f"s3://{bucket_name}/{bucket_prefix}/output"

In [79]:
# # Store some variables to keep the value between the notebooks
# %store bucket_name
# %store bucket_prefix
# %store sm_role
# %store region
# %store initialized

In [80]:
train_s3_url

's3://Group-project/Solar1/processing/output/train'

In [81]:
#store the variable
%store train_s3_url
%store validation_s3_url
%store test_s3_url
%store baseline_s3_url
%store model_package_group_name
%store evaluation_s3_url
%store prediction_baseline_s3_url
%store output_s3_url

Stored 'train_s3_url' (str)
Stored 'validation_s3_url' (str)
Stored 'test_s3_url' (str)
Stored 'baseline_s3_url' (str)
Stored 'model_package_group_name' (str)
Stored 'evaluation_s3_url' (str)
Stored 'prediction_baseline_s3_url' (str)
Stored 'output_s3_url' (str)


In [82]:
# Set instance types and counts
process_instance_type = "ml.c5.xlarge"
train_instance_count = 1
train_instance_type = "ml.m5.xlarge"

In [83]:
# check what variable we store
# so you can see some metadata still come from sagemaker_workshop_e2e_churn_1686747619
# some is our value
%store

Stored variables and their in-db values:
athena_table_name                      -> 'sagemaker_workshop_e2e_churn_1686747619'
baseline_s3_url                        -> 's3://Group-project/Solar1/processing/output/basel
bucket                                 -> 'sagemaker-studio-us-east-1-531485126105'
bucket_name                            -> 'sagemaker-us-east-1-531485126105'
bucket_prefix                          -> 'Solar1'
churn_feature_group_name               -> 'sagemaker-workshop-e2e-churn'
docker_image_name                      -> '683313688378.dkr.ecr.us-east-1.amazonaws.com/sage
domain_id                              -> 'd-1qvmpqvqiuve'
evaluation_s3_url                      -> 's3://sagemaker-us-east-1-531485126105/Solar1/eval
experiment_name                        -> 'Solar-Power-experiment-25-10-00-09'
framework_version                      -> '1.3-1'
initialized                            -> True
input_s3_url                           -> 's3://sagemaker-us-east-1-53148512

## Read data and upload it 

In [84]:
# read data and save it in pandas dataframe
# if this all file in our local
df_gen1 = pd.read_csv("processing/input/Plant_1_Generation_Data.csv")
df_gen2 = pd.read_csv("processing/input/Plant_2_Generation_Data.csv")

df_weather1 = pd.read_csv("processing/input/Plant_1_Weather_Sensor_Data.csv")
df_weather2 = pd.read_csv("processing/input/Plant_2_Weather_Sensor_Data.csv")

In [85]:
# check head
df_gen1.head()

          DATE_TIME  PLANT_ID       SOURCE_KEY  DC_POWER  AC_POWER  \
0  15-05-2020 00:00   4135001  1BY6WEcLGh8j5v7       0.0       0.0   
1  15-05-2020 00:00   4135001  1IF53ai7Xc0U56Y       0.0       0.0   
2  15-05-2020 00:00   4135001  3PZuoBAID5Wc2HD       0.0       0.0   
3  15-05-2020 00:00   4135001  7JYdWkrLSPkdwr4       0.0       0.0   
4  15-05-2020 00:00   4135001  McdE0feGgRqW7Ca       0.0       0.0   

   DAILY_YIELD  TOTAL_YIELD  
0          0.0    6259559.0  
1          0.0    6183645.0  
2          0.0    6987759.0  
3          0.0    7602960.0  
4          0.0    7158964.0  

In [86]:
# Run this cell to import or install the Data Wrangler widget to show automatic visualization and generate code to fix data quality issues
try:
    import sagemaker_datawrangler
except ImportError:
    !pip install --upgrade sagemaker-datawrangler
    import sagemaker_datawrangler

# Display Pandas DataFrame to view the widget: df, display(df), df.sample()... 

In [87]:
# #Upload data to S3 bucket
# s3 = boto3.client('s3')

# # Upload Plant_1_Generation_Data.csv
# a = s3.upload_file("processing/input/Plant_1_Generation_Data.csv", bucket_name, f"{input_url}/Plant_1_Generation_Data.csv")
# plant_1_url = f"{input_url}/Plant_1_Generation_Data.csv"

# # Upload Plant_2_Generation_Data.csv
# s3.upload_file("processing/input/Plant_2_Generation_Data.csv", bucket_name, f"{input_url}/Plant_2_Generation_Data.csv")
# plant_2_url = f"{input_url}/Plant_2_Generation_Data.csv"

# # Upload Plant_1_Weather_Sensor_Data.csv
# s3.upload_file("processing/input/Plant_1_Weather_Sensor_Data.csv", bucket_name, f"{input_url}/Plant_1_Weather_Sensor_Data.csv")
# weather_1_url = f"{input_url}/Plant_1_Weather_Sensor_Data.csv"

# # Upload Plant_2_Weather_Sensor_Data.csv
# s3.upload_file("processing/input/Plant_2_Weather_Sensor_Data.csv", bucket_name, f"{input_url}/Plant_2_Weather_Sensor_Data.csv")
# weather_2_url = f"{input_url}/Plant_2_Weather_Sensor_Data.csv"

# print("Upload complete.")

## Create pipeline
### Setup pipeline parameters

In [88]:
bucket_prefix

'Solar1'

In [128]:
# upload file to s3

try:
    input_s3_url
except NameError:      
    # If input_s3_url is not defined, upload the datasets to S3 and store the paths
    input_s3_url_gen1 = sagemaker.Session().upload_data(
        path="processing/input/Plant_1_Generation_Data.csv",
        bucket=bucket_name,
        key_prefix=f"{project}/{bucket_prefix}/processing/input"
    )
    input_s3_url_gen2 = sagemaker.Session().upload_data(
        path="processing/input/Plant_2_Generation_Data.csv",
        bucket=bucket_name,
        key_prefix=f"{project}/{bucket_prefix}/processing/input"
    )
    input_s3_url_weather1 = sagemaker.Session().upload_data(
        path="processing/input/Plant_1_Weather_Sensor_Data.csv",
        bucket=bucket_name,
        key_prefix=f"{project}/{bucket_prefix}/processing/input"
    )
    input_s3_url_weather2 = sagemaker.Session().upload_data(
        path="processing/input/Plant_2_Weather_Sensor_Data.csv",
        bucket=bucket_name,
        key_prefix=f"{project}/{bucket_prefix}/processing/input"
    )
    print("Upload complete.")

    # %store input_s3_url_gen1
    # %store input_s3_url_gen2
    # %store input_s3_url_weather1
    # %store input_s3_url_weather2

#### merge to one file and upload to s3

In [133]:
# Adjust datetime format
df_gen1['DATE_TIME'] = pd.to_datetime(df_gen1['DATE_TIME'])
df_weather1['DATE_TIME'] = pd.to_datetime(df_weather1['DATE_TIME'])
df_gen2['DATE_TIME'] = pd.to_datetime(df_gen2['DATE_TIME'])
df_weather2['DATE_TIME'] = pd.to_datetime(df_weather2['DATE_TIME'])

# Drop unnecessary columns and merge dataframes
df_plant1 = pd.merge(
    df_gen1.drop(columns=['PLANT_ID','AC_POWER','DAILY_YIELD']),
    df_weather1.drop(columns=['PLANT_ID', 'SOURCE_KEY']),
    on='DATE_TIME'
)

df_plant2 = pd.merge(
    df_gen2.drop(columns=['PLANT_ID','AC_POWER','DAILY_YIELD']),
    df_weather2.drop(columns=['PLANT_ID', 'SOURCE_KEY']),
    on='DATE_TIME'
)

combined_plant = pd.concat([df_plant1, df_plant2])

# adding separate time and date columns
combined_plant["DATE"] = pd.to_datetime(combined_plant["DATE_TIME"]).dt.date # add new column with date
combined_plant["TIME"] = pd.to_datetime(combined_plant["DATE_TIME"]).dt.time # add new column with time
combined_plant["MONTH"] = pd.to_datetime(combined_plant["DATE_TIME"]).dt.month # add new column with month

# add hours and minutes for ml models
combined_plant['HOURS'] = pd.to_datetime(combined_plant['TIME'],format='%H:%M:%S').dt.hour
combined_plant['MINUTES'] = pd.to_datetime(combined_plant['TIME'],format='%H:%M:%S').dt.minute
combined_plant['MINUTES_PASS'] = combined_plant['MINUTES'] + combined_plant['HOURS']*60


input_s3_url = sagemaker.Session().upload_data(
        path="processing/input/combined_plant.csv",
        bucket=bucket_name,
        key_prefix=f"{project}/{bucket_prefix}/processing/input"
    )

print("Upload complete.")


Upload complete.


In [134]:
input_s3_url

's3://sagemaker-us-east-1-531485126105/Group-project/Solar1/processing/input/combined_plant.csv'

In [141]:
# Set processing instance type
process_instance_type_param = ParameterString(
    name="ProcessingInstanceType",
    default_value=process_instance_type,
)

# Set training instance type
train_instance_type_param = ParameterString(
    name="TrainingInstanceType",
    default_value=train_instance_type,
)

# Set training instance count
train_instance_count_param = ParameterInteger(
    name="TrainingInstanceCount",
    default_value=train_instance_count
)

# Set model approval param
model_approval_status_param = ParameterString(
    name="ModelApprovalStatus",
    default_value="PendingManualApproval"
)

# Minimal threshold for model performance on the test dataset
test_score_threshold_param = ParameterFloat(
    name="TestScoreThreshold", 
    default_value=0.5
)

# Set S3 url for input dataset
input_s3_url_param = ParameterString(
    name="InputDataUrgen1",
    default_value=input_s3_url,
)

In [142]:
input_s3_url_param

ParameterString(name='InputDataUrgen1', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://sagemaker-us-east-1-531485126105/Group-project/Solar1/processing/input/combined_plant.csv')

### Build the pipeline steps


In [93]:
session = PipelineSession()

#### Processing step

In [94]:
input_url, output_url

('Group-project/Solar1/processing/input',
 'Group-project/Solar1/processing/output')

In [95]:
bucket_name

'sagemaker-us-east-1-531485126105'

In [138]:
%%writefile preprocessing.py

import pandas as pd
import numpy as np
import argparse
import os

bucket_name = 'sagemaker-us-east-1-531485126105' # my s3

# if have no input this will be default
input_url = f"s3://{bucket_name}/Group-project/Solar1/processing/input/"
output_url = f"s3://{bucket_name}/Group-project/Solar1/processing/output/"

def _parse_args():
    
    parser = argparse.ArgumentParser()
    # Data, model, and output directories
    # model_dir is always passed in from SageMaker. By default, this is an S3 path under the default bucket.
    parser.add_argument('--filepath', type=str, default='/Group-project/Solar1/processing/input/')
    parser.add_argument('--filename', type=str, default='combined_plant')
    parser.add_argument('--outputpath', type=str, default='Group-project/Solar1/processing/output/')
    
    return parser.parse_known_args()


if __name__=="__main__":
    # Process arguments
    args, _ = _parse_args()
    
    target_col = "DC_POWER"
    input_rul = input_url
    
    # # Load data
    combined_plant = pd.read_csv(os.path.join(args.filepath, args.filename), sep=";")
    
    # Shuffle and split the dataset
    train_data, validation_data, test_data = np.split(
        combined_plant.sample(frac=1, random_state=1729),
        [int(0.7 * len(combined_plant)), int(0.9 * len(combined_plant))],
    )

    print(f"Data split > train:{train_data.shape} | validation:{validation_data.shape} | test:{test_data.shape}")
    
    # Save datasets locally
    train_data.to_csv(os.path.join(args.outputpath, 'train/train.csv'), index=False, header=False)
    validation_data.to_csv(os.path.join(args.outputpath, 'validation/validation.csv'), index=False, header=False)
    test_data[target_col].to_csv(os.path.join(args.outputpath, 'test/test_y.csv'), index=False, header=False)
    test_data.drop([target_col], axis=1).to_csv(os.path.join(args.outputpath, 'test/test_x.csv'), index=False, header=False)


    # Save the baseline dataset for model monitoring
    combined_plant.drop([target_col], axis=1).to_csv(os.path.join(args.outputpath, 'baseline/baseline.csv'), index=False, header=False)
    
    print("## Processing complete. Exiting.")

    
    # # Shuffle the dataset
# shuffled_plant = combined_plant.sample(frac=1).reset_index(drop=True)

# # Split the shuffled dataset
# train_data, validation_data, test_data = np.split(shuffled_plant, [int(0.7 * len(shuffled_plant)), int(0.9 * len(shuffled_plant))])

# print(f"Data split > train: {train_data.shape} | validation: {validation_data.shape} | test: {test_data.shape}")

# print("## Processing complete. Exiting.")

# # Create a folder to save the files
# data_folder = "cleaned_data"  # Specify the data folder path
# os.makedirs(data_folder, exist_ok=True)

# # Define the file paths
# train_x_file = os.path.join(data_folder, "train_x.csv")
# train_y_file = os.path.join(data_folder, "train_y.csv")
# validation_x_file = os.path.join(data_folder, "validation_x.csv")
# validation_y_file = os.path.join(data_folder, "validation_y.csv")
# test_x_file = os.path.join(data_folder, "test_x.csv")
# test_y_file = os.path.join(data_folder, "test_y.csv")
# baseline_file = os.path.join(data_folder, "baseline.csv")
# file_path = os.path.join(data_folder, "combined_plant.csv")

# # Save datasets locally
# train_data.drop([target_col], axis=1).to_csv(train_x_file, index=False)
# train_data[target_col].to_csv(train_y_file, index=False)
# validation_data.drop([target_col], axis=1).to_csv(validation_x_file, index=False)
# validation_data[target_col].to_csv(validation_y_file, index=False)
# test_data.drop([target_col], axis=1).to_csv(test_x_file, index=False)
# test_data[target_col].to_csv(test_y_file, index=False)
# combined_plant.drop([target_col], axis=1).to_csv(baseline_file, index=False)
# combined_plant.to_csv(file_path, index=False)

# print("All files saved successfully to the local folder:", data_folder)

Overwriting preprocessing.py


In [140]:
# run preprocessing.py manually to test it work or not
# %run preprocessing.py

In [99]:
train_data.head()

                DATE_TIME       SOURCE_KEY    DC_POWER  DAILY_YIELD  \
38339 2020-06-04 02:15:00  Quc1TzYxW2pYoWX    0.000000  3721.000000   
7786  2020-05-18 20:15:00  McdE0feGgRqW7Ca    0.000000  5464.000000   
52358 2020-06-10 17:30:00  oZZkBaNadn6DNKz   16.927273  5451.818182   
25695 2020-05-28 22:45:00  LlT2YUhhzqhg5Sw    0.000000  8881.000000   
36636 2020-06-03 06:45:00  9kRcWv60rDACzjR  108.013333    32.733333   

       AMBIENT_TEMPERATURE  MODULE_TEMPERATURE  IRRADIATION        DATE  \
38339            23.482579           22.529118     0.000000  2020-06-04   
7786             24.169771           21.926884     0.000000  2020-05-18   
52358            28.161790           27.141686     0.009882  2020-06-10   
25695            27.772995           26.534715     0.000000  2020-05-28   
36636            24.083965           25.116690     0.072946  2020-06-03   

           TIME  MONTH  HOURS  MINUTES  MINUTES_PASS  
38339  02:15:00      6      2       15           135  
7786   20:15

In [100]:
combined_plant.head()

   DATE_TIME       SOURCE_KEY  DC_POWER  DAILY_YIELD  AMBIENT_TEMPERATURE  \
0 2020-05-15  1BY6WEcLGh8j5v7       0.0          0.0            25.184316   
1 2020-05-15  1IF53ai7Xc0U56Y       0.0          0.0            25.184316   
2 2020-05-15  3PZuoBAID5Wc2HD       0.0          0.0            25.184316   
3 2020-05-15  7JYdWkrLSPkdwr4       0.0          0.0            25.184316   
4 2020-05-15  McdE0feGgRqW7Ca       0.0          0.0            25.184316   

   MODULE_TEMPERATURE  IRRADIATION        DATE      TIME  MONTH  HOURS  \
0           22.857507          0.0  2020-05-15  00:00:00      5      0   
1           22.857507          0.0  2020-05-15  00:00:00      5      0   
2           22.857507          0.0  2020-05-15  00:00:00      5      0   
3           22.857507          0.0  2020-05-15  00:00:00      5      0   
4           22.857507          0.0  2020-05-15  00:00:00      5      0   

   MINUTES  MINUTES_PASS  
0        0             0  
1        0             0  
2        0 

#### Upload merge data to S3

In [101]:
# import boto3

# s3 = boto3.client('s3')

# # Upload train_x.csv
# s3.upload_file(train_x_file, bucket_name, "input/cleaned_data/train_x.csv")
# print(f'Train X file uploaded successfully to {bucket_name}.')

# # Upload train_y.csv
# s3.upload_file(train_y_file, bucket_name, "input/cleaned_data/train_y.csv")
# print(f'Train Y file uploaded successfully to {bucket_name}.')

# # Upload validation_x.csv
# s3.upload_file(validation_x_file, bucket_name, "input/cleaned_data/validation_x.csv")
# print(f'Validation X file uploaded successfully to {bucket_name}.')

# # Upload validation_y.csv
# s3.upload_file(validation_y_file, bucket_name, "input/cleaned_data/validation_y.csv")
# print(f'Validation Y file uploaded successfully to {bucket_name}.')

# # Upload test_x.csv
# s3.upload_file(test_x_file, bucket_name, "input/cleaned_data/test_x.csv")
# print(f'Test X file uploaded successfully to {bucket_name}.')

# # Upload test_y.csv
# s3.upload_file(test_y_file, bucket_name, "input/cleaned_data/test_y.csv")
# print(f'Test Y file uploaded successfully to {bucket_name}.')

# # Upload baseline.csv
# s3.upload_file(baseline_file, bucket_name, "input/cleaned_data/baseline.csv")
# print(f'Baseline file uploaded successfully to {bucket_name}.')

In [102]:
# check how many file in s3
s3 = boto3.client('s3')
response = s3.list_objects_v2(Bucket=bucket_name)

if 'Contents' in response:
    file_count = len(response['Contents'])
    print(f"Number of files in {bucket_name}: {file_count}")
else:
    print(f"No files found in {bucket_name}")

Number of files in sagemaker-us-east-1-531485126105: 287


In [103]:
# check file in the s3 or not
# if pre processing work it should have files 

s3 = boto3.client('s3')
prefix = "Group-project/Solar1"  # Specify the prefix to filter the files

response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)

if 'Contents' in response:
    print(f"Files in {bucket_name} with prefix '{prefix}':")
    for obj in response['Contents']:
        file_name = obj['Key']
        print(file_name)
else:
    print(f"No files found in {bucket_name} with prefix '{prefix}'")

Files in sagemaker-us-east-1-531485126105 with prefix 'Group-project/Solar1':
Group-project/Solar1/processing/input/Plant_1_Generation_Data.csv
Group-project/Solar1/processing/input/Plant_1_Weather_Sensor_Data.csv
Group-project/Solar1/processing/input/Plant_2_Generation_Data.csv
Group-project/Solar1/processing/input/Plant_2_Weather_Sensor_Data.csv
Group-project/Solar1/processing/output/baseline/baseline.csv
Group-project/Solar1/processing/output/test/test_x.csv
Group-project/Solar1/processing/output/test/test_y.csv
Group-project/Solar1/processing/output/train/train.csv
Group-project/Solar1/processing/output/validation/validation.csv


In [104]:
train_s3_url

's3://Group-project/Solar1/processing/output/train'

In [105]:
f"{prefix}/processing/input"

'Group-project/Solar1/processing/input'

In [106]:
input_url = f"s3://{bucket_name}/Group-project/Solar1/processing/input/"
output_url = f"s3://{bucket_name}/Group-project/Solar1/processing/output/"

In [107]:
input_s3_url_param_g1

ParameterString(name='InputDataUrgen1', parameter_type=<ParameterTypeEnum.STRING: 'String'>, default_value='s3://sagemaker-us-east-1-531485126105/Group-project/Solar1/processing/input/Plant_1_Generation_Data.csv')

In [143]:
# Create SKLearnProcessor
sklearn_processor = SKLearnProcessor(
    framework_version="0.23-1",
    role=sm_role,
    instance_type=process_instance_type_param.default_value,
    instance_count=1,
    base_job_name=f"{pipeline_name}/preprocess",
    sagemaker_session=session,
)

processing_inputs = [
    ProcessingInput(source=input_s3_url_param, destination=f"{prefix}/processing/input")
]

#create output to s3
processing_outputs = [
    ProcessingOutput(output_name="train_data", source=f"{prefix}/processing/output", destination=train_s3_url),
    ProcessingOutput(output_name="validation_data", source=f"{prefix}/processing/output", destination=validation_s3_url),
    ProcessingOutput(output_name="test_data", source=f"{prefix}/processing/output", destination=test_s3_url),
    ProcessingOutput(output_name="baseline_data", source=f"{prefix}/processing/output", destination=baseline_s3_url),
]

# Run the SKLearnProcessor
processor_args = sklearn_processor.run(
    inputs=processing_inputs,
    outputs=processing_outputs,
    code='preprocessing.py',
    # arguments = ['arg1', 'arg2'],
)

# Define processing step
step_process = ProcessingStep(
    name=f"{pipeline_name}-preprocess-data",
    step_args=processor_args,
)




#### Training step

In [109]:
LinearRegression_image_uri = sagemaker.image_uris.retrieve(framework='linear-learner',region=region)

In [110]:
# Instantiate a Linear Learner estimator object
estimator = Estimator(
    image_uri=LinearRegression_image_uri,
    role=get_execution_role(),
    instance_type=train_instance_type_param,
    instance_count=train_instance_count_param,
    output_path=output_s3_url,
    sagemaker_session=session,
    base_job_name=f"{pipeline_name}/train",
)

# Define algorithm hyperparameters
estimator.set_hyperparameters(
    predictor_type='regressor',  # Set the predictor type to 'regressor' for linear regression
    feature_dim=5,  # Specify the number of input features
    epochs=100,  # Number of training epochs
    mini_batch_size=32,  # Mini-batch size for training
    learning_rate=0.1,  # Learning rate for the optimizer
    normalize_data='true',  # Normalize the input features
    loss='squared_loss',  # Loss function for linear regression
    early_stopping_patience=10,  # Patience for early stopping
    early_stopping_tolerance=0.001,  # Tolerance for early stopping
    early_stopping_enabled=True,  # Enable early stopping
)

# Define training inputs
training_inputs = {
    "train": TrainingInput(
        s3_data=step_process.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri,
        content_type="text/csv",
    ),
    "validation": TrainingInput(
        s3_data=step_process.properties.ProcessingOutputConfig.Outputs["validation_data"].S3Output.S3Uri,
        content_type="text/csv",
    ),
}

# Fit the estimator to the training data
training_args = estimator.fit(training_inputs)

# Define the training step
step_train = TrainingStep(
    name=f"{pipeline_name}-train",
    step_args=training_args,
)


#### Evaluation step
Create a model evaluation script to check if the model performance meets the specified threshold. 

In [144]:
%%writefile evaluation.py

import xgboost as xgb
from sklearn.metrics import mean_squared_error, r2_score

if __name__ == "__main__":   
    
    # All paths are local for the processing container
    model_path = f"{prefix}/processing/model/model.tar.gz"
    test_x_path = f"{prefix}processing/test/test_x.csv"
    test_y_path = f"{prefix}/processing/test/test_y.csv"
    output_dir = f"{prefix}/processing/evaluation"
    output_prediction_path = f"{prefix}/processing/output/"
        
    # Read model tar file
    with tarfile.open(model_path, "r:gz") as t:
        t.extractall(path=".")
    
    # Load model
    model = linear.Booster()
    model.load_model("linear-model")
    
    # Read test data
    X_test = pd.read_csv(test_x_path, header=None).values
    y_test = pd.read_csv(test_y_path, header=None).to_numpy()

    # Run predictions
    predictions = model.predict(X_test)

    # Calculate RMSE
    rmse = np.sqrt(mean_squared_error(y_test, predictions))
    
    # Calculate R2 score
    r2 = r2_score(y_test, predictions)
    
    report_dict = {
        "regression_metrics": {
            "rmse": {
                "value": rmse,
            },
            "r2_score": {
                "value": r2,
            },
        },
    }

    # Save evaluation report
    pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True)
    with open(f"{output_dir}/evaluation.json", "w") as f:
        f.write(json.dumps(report_dict))
    
    # Save prediction baseline file - we need it later for the model quality monitoring
    pd.DataFrame({"prediction": predictions,
                  "label": y_test.squeeze()}
                ).to_csv(os.path.join(output_prediction_path, 'prediction_baseline/prediction_baseline.csv'), index=False, header=True)


Overwriting evaluation.py


Create a processor to run the evaluation script and construct the evaluation step:

In [145]:
from sagemaker import Model
from sagemaker.inputs import CreateModelInput
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.conditions import (
    ConditionGreaterThan,
    ConditionGreaterThanOrEqualTo
)

from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import (
    Join,
    JsonGet
)

from sagemaker.model_metrics import (
    MetricsSource, 
    ModelMetrics, 
    FileSource
)

from sagemaker.workflow.pipeline import Pipeline

In [146]:
script_processor = ScriptProcessor(
    image_uri=LinearRegression_image_uri,
    role=sm_role,
    command=["python3"],
    instance_type=process_instance_type_param,
    instance_count=1,
    base_job_name=f"{pipeline_name}/evaluate",
    sagemaker_session=session,
)
eval_inputs=[
    ProcessingInput(source=step_train.properties.ModelArtifacts.S3ModelArtifacts, 
                    destination="/opt/ml/processing/model"),
    ProcessingInput(source=step_process.properties.ProcessingOutputConfig.Outputs["test_data"].S3Output.S3Uri, 
                    destination="/opt/ml/processing/test"),
]

eval_outputs=[
    ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation", 
                     destination=evaluation_s3_url),
    ProcessingOutput(output_name="prediction_baseline_data", source="/opt/ml/processing/output/prediction_baseline", 
                     destination=prediction_baseline_s3_url),
]

eval_args = script_processor.run(
    inputs=eval_inputs,
    outputs=eval_outputs,
    code="evaluation.py",
)
    
evaluation_report = PropertyFile(
    name="ModelEvaluationReport", output_name="evaluation", path="evaluation.json"
)

step_eval = ProcessingStep(
    name=f"{pipeline_name}-evaluate-model",
    step_args=eval_args,
    property_files=[evaluation_report]
)

#### Register step
The register step creates a SageMaker model and registers a new version of a model in the SageMaker Model Registry within a [model package group](https://docs.aws.amazon.com/sagemaker/latest/dg/model-registry-model-group.html).

In [147]:
model = Model(
    image_uri=LinearRegression_image_uri,        
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    name=f"Solar-energy-regression-model",
    sagemaker_session=session,
    role=sm_role,
)

model_metrics = ModelMetrics(
    model_statistics=MetricsSource(
        s3_uri="{}/evaluation.json".format(
            step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"]
        ),
        content_type="application/json",
    )
)

register_args = model.register(
    content_types=["application/json"],  # Update with appropriate content type for regression
    response_types=["application/json"],  # Update with appropriate response type for regression
    inference_instances=["ml.t2.medium", "ml.m5.xlarge", "ml.m5.large"],
    transform_instances=["ml.m5.xlarge", "ml.m5.large"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status_param,
    model_metrics=model_metrics,
)

step_register = ModelStep(
    name=f"{pipeline_name}-register",
    step_args=register_args
)


#### Fail step
Add a Pipelines [FailStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.fail_step.FailStep) to stop the pipeline execution if the model performance metric doesn't meet the specified threshold. 

In [148]:
step_fail = FailStep(
    name=f"{pipeline_name}-fail",
    error_message=Join(on=" ", values=["Execution failed due to AUC Score >", test_score_threshold_param]),
)

#### Condition step
The condition step checks the model performance score and conditionally creates a model and registers it in the model registry, or stops and fails the pipeline execution.

In [149]:
cond_lte = ConditionGreaterThan(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.rmse.value",
    ),
    right=test_score_threshold_param,
)

step_cond = ConditionStep(
    name=f"{pipeline_name}-check-test-score",
    conditions=[cond_lte],
    if_steps=[step_register],
    else_steps=[step_fail],
)

#### Construct the pipeline 


In [117]:
%pip install --quiet sagemaker==2.132.0

[0mNote: you may need to restart the kernel to use updated packages.


In [156]:
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        process_instance_type_param,
        train_instance_type_param,
        train_instance_count_param,
        model_approval_status_param,
        test_score_threshold_param,
        input_s3_url_param,
    ],
    steps=[step_process, step_train, step_eval, step_cond],
    sagemaker_session=session,
)

In [152]:
# Create a new or update existing Pipeline
pipeline.upsert(role_arn=sm_role)

{'PipelineArn': 'arn:aws:sagemaker:us-east-1:531485126105:pipeline/Group-project-pipeline',
 'ResponseMetadata': {'RequestId': '54cff2f2-f010-4018-bb2c-967eca78f74f',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '54cff2f2-f010-4018-bb2c-967eca78f74f',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '90',
   'date': 'Mon, 26 Jun 2023 11:43:25 GMT'},
  'RetryAttempts': 0}}

In [157]:
pipeline_definition = json.loads(pipeline.describe()['PipelineDefinition'])
pipeline_definition

{'Version': '2020-12-01',
 'Metadata': {},
 'Parameters': [{'Name': 'ProcessingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.c5.xlarge'},
  {'Name': 'TrainingInstanceType',
   'Type': 'String',
   'DefaultValue': 'ml.m5.xlarge'},
  {'Name': 'TrainingInstanceCount', 'Type': 'Integer', 'DefaultValue': 1},
  {'Name': 'ModelApprovalStatus',
   'Type': 'String',
   'DefaultValue': 'PendingManualApproval'},
  {'Name': 'TestScoreThreshold', 'Type': 'Float', 'DefaultValue': 0.5},
  {'Name': 'InputDataUrgen1',
   'Type': 'String',
   'DefaultValue': 's3://sagemaker-us-east-1-531485126105/Group-project/Solar1/processing/input/combined_plant.csv'}],
 'PipelineExperimentConfig': {'ExperimentName': {'Get': 'Execution.PipelineName'},
  'TrialName': {'Get': 'Execution.PipelineExecutionId'}},
 'Steps': [{'Name': 'Group-project-pipeline-preprocess-data',
   'Type': 'Processing',
   'Arguments': {'ProcessingResources': {'ClusterConfig': {'InstanceType': 'ml.c5.xlarge',
      'InstanceCount'

## Execute the pipeline
The following code starts an execution of the pipeline with the specified parameters.

In [155]:
execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceType=process_instance_type,
        TrainingInstanceType=train_instance_type,
        TrainingInstanceCount=train_instance_count,
        ModelApprovalStatus="PendingManualApproval",
        TestScoreThreshold=0.75,
        InputDataUrl=input_s3_url_param
    )
)

TypeError: Pipeline variables do not support __str__ operation. Please use `.to_string()` to convert it to string type in execution timeor use `.expr` to translate it to Json for display purpose in Python SDK.

In [None]:
# Un-comment this call if you want the notebook to wait until the pipeline's execution finished
execution.wait()
execution.list_steps()

In [None]:
import xgboost as xgb
from sklearn.datasets import load_boston
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error



# Define the XGBoost linear model
model = xgb.XGBRegressor(booster='gblinear')