<center>
<h1> D ONE MLOps AWS BlogPost </h1>
    <h2>Work In Progress</h2>
<hr>
<h1>Random Forest Trainig Pipeline</h1>
<hr>
 </center>

<h2>Problem Introduction</h2>

* See EDA notebook
* Add a few high level lines here

# Setting up shop

A known challenge with SageMaker Studio default libraries is that they are often outdated. In this case, we want to make sure that pandas and s3fs is up-to-date. s3fs is a file system interface for S3 (https://pypi.org/project/s3fs/).

In [1]:
! pip install pandas s3fs --upgrade

  from cryptography.utils import int_from_bytes
  from cryptography.utils import int_from_bytes
[0m

# Import packages
We need a few packages, let's import them.

In [34]:
from sklearn.model_selection import train_test_split
from datetime import datetime, timedelta

import boto3  # Amazon Web Services (AWS) Software Development Kit (SDK) for Python
import os
import pandas as pd
import sagemaker
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

pd.options.mode.chained_assignment = None  # default='warn'

# Set up global variables

In [35]:
# This is the role that SageMaker assumes to leverage AWS resources
ROLE = sagemaker.get_execution_role()
ROLE

'arn:aws:iam::451811961115:role/AmazonSageMaker-ExecutionRole-20200526T075798'

In [36]:
# Paths for the containers that we will create
# S3 Bucket where the data is stored
BUCKET_NAME = "aws-sagemaker-blogpost"
BUCKET = f's3://{BUCKET_NAME}'

# Raw data paths
RAW_DATA_FOLDER = 'data'
# Note if you change the line below, you also need to change it in the cell that creates the prepare_data.py
RAW_DATA_FILE = 'wind_turbines.csv'
RAW_DATA_PATH = os.path.join(BUCKET, RAW_DATA_FOLDER, RAW_DATA_FILE)

# Path where the processed objects will be stored
now = datetime.now() # get current time to ensure uniqueness of the output folders
PROCESSED_DATA_FOLDER = 'processed_' + now.strftime("%Y-%m-%d_%H%M_%S%f")
PROCESSED_DATA_PATH = os.path.join(BUCKET, PROCESSED_DATA_FOLDER)

# Paths for model train, validation, test split
TRAIN_DATA_PATH = os.path.join(PROCESSED_DATA_PATH, 'train.csv')
VALIDATION_DATA_PATH = os.path.join(PROCESSED_DATA_PATH, 'validation.csv')
TEST_DATA_PATH = os.path.join(PROCESSED_DATA_PATH, 'test.csv')

# Path to model output data
MODEL_OUTPUT = os.path.join(BUCKET, 'output')

In [40]:
# Job name for preprocessing
PREPROCESSING_JOB_NAME = 'windTurbinesPreprocessing'

# Preprocess data (TODO)

The data comes in chunks, but for our purposes, we would like to have one large file with data. We could also extend the code to read only new data and add it to the file, but for simplicity, let's load the whole dataset for processing.

Note: Having the data in chunks would allow for online processing of new data when it comes in, e.g., when a turbine reports the next couple of months data.

For details on the dataset, see the exploratory data analysis work in Part 1.

* TODO: Expand
* TODO: Link to EDA.

We first create a preprocessing script that will be used by a built-in SKLearn container to run the preprocessing job of our pipeline. Running the cell below will produce a Python file whose content is the same as the cell's content and named `prepare_data.py`. It will be saved in the notebooks instance path (see left pane of the SageMaker Studio JupyterLab).

We then define the container that will read in the data from the S3 bucket and use this script to preprocess it and then write the processed files back to the S3 bucket.

In [43]:
%%writefile prepare_data.py

import argparse
import logger
import pandas as pd
from typing import List, Tuple, Union
from datetime import datetime, timedelta

logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())

def assert_col_of_df(df: pd.DataFrame, col: Union[List[str], str]) -> None:
    """Helper function to assert that a column `col` is a column of `df`.
    
    Args:
        df: Dataframe.
        col: String value to test.
    
    Returns:
        None.
        
    Raises:
        ValueError if `col` is not a column of `df`.
    """
    if isinstance(col, str):
        col = [col]

    for c in col:
        try:
            assert c in df.columns      
        except AssertionError:
            raise ValueError(f"Invalid input value. Column {c} is not a column of df.")

            
def get_train_test_split(
        df: pd.DataFrame,
        n_days_test: int
        ) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Splits the input data frame into a training and test set.

    Args:
        df: Raw input data.
        n_days_test: Number of days to consider for the test split. The n_days_test last 
            days of the input data will be selected for the test split.

    Returns:
        Tuple[pd.DataFrame]: Raw train and test data splits.
    """
    _date_col = 'date'
    _measured_at_col = 'measured_at'
    
    assert_col_of_df(df=df, col=_measured_at_col)
    
    # Take only the date part of the string, i.e., the first 10 characters
    df[_date_col] = df[_measured_at_col].apply(lambda x: x[:10])
    # Convert to date object
    df[_date_col] = pd.to_datetime(df[_date_col], format='%Y-%m-%d')
    
    # Get the test dates
    min_date = df[_date_col].min()
    max_date = df[_date_col].max()
    
    test_dates = [
        datetime.strftime(max_date - timedelta(days=i), '%Y-%m-%d') for i in range(n_days_test)
    ]
    
    df_train = df[~df[_date_col].isin(test_dates)].drop(_date_col, axis=1)
    df_test = df[df[_date_col].isin(test_dates)].drop(_date_col, axis=1)
    
    logger.info(f"Train set ranges from {min_date} until {min(test_dates)} (not included).")
    logger.info(f"Test set ranges from {min(test_dates)} until {max(test_dates)}.")
    
    return df_train, df_test


def fill_nulls(df: pd.DataFrame, col: str) -> pd.DataFrame:
    """Fills nulls in column `col` of dataframe `df`.
    
    Args:
        df: Raw input dataframe.
        col: Column of `df` with nulls filled with 0.
        
    Returns:
        Dataframe with nulls filled.
    """
    assert_col_of_df(df=df, col=col)
        
    df.loc[:, col] = (
        df
        .loc[:, col]
        .fillna(0)
    )
    
    logger.info(f"Filled nulls in column {col} with 0.")

    return df


def filter_power(df: pd.DataFrame, col_power: str, min_power: float) -> pd.DataFrame:
    """Filters the `df` on the power column `col_power`.
    
    
    Args:
        df: Raw input dataframe.
        col_power: Column of `df` with the power production.
        min_power: Minimum values of power production considered. Rows with smaller
            values are filtered out.
        
    Returns:
        Dataframe filtered on `min_power`.
    """
    assert_col_of_df(df=df, col=col_power)
    
    filter_power = df[col_power] > min_power
    
    rowcount_before = df.shape[0]
    df = (
        df
        .loc[filter_power]
    )
    rowcount_after = df.shape[0]
    logger.info(f"Removed {rowcount_before-rowcount_after} rows which had power below {min_power}.")
    
    return df


def transform_error_types(
    df: pd.DataFrame,
    col_errors: str,
    errors_to_classify: List[int]
    )-> pd.DataFrame:
    """Transforms error types.
    
    Error types are classified according to the `errors_to_classify`. If the error
    type is not in the list, it will be replaced with 9. Nulls are replaced with
    0.

    Args:
        df: Raw input data.
        col_errors: Column of `df` with the error types.
        errors_to_classify: List of error types that should be considered.
    
    Returns:
        Dataframe with transformed error types.
    """
    assert_col_of_df(df=df, col=col_errors)

    df[col_errors] = (
        df
        .loc[:, col_errors]
        .fillna(0)
        .apply(lambda x: int(x))
        .apply(lambda x: x if x in errors_to_classify else 9)
    )
    logger.info(f"Transformed error types in column {col_errors}.")

    return df


def split_features_target(
    df: pd.DataFrame,
    features: List[str],
    target: str
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Function to split the dataframe into input features and labels.
    
    Args:
        df: Raw input data.
        features: List of the features to be included in the transformation.
        target: Target column.
    
    Returns:
        Tuple[pd.DataFrame]: Transformed dataframes (input features and labels).
    """
    assert_col_of_df(df=df, col=(features + [target]))

    y = df.loc[:, target]
    x = df.loc[:, features]
    logger.info(f"Split dataframe into features with shape {x.shape} and target with shape {y.shape}.")
    return x, y


def wrap_transform_data(
    df: pd.DataFrame,
    col_power: str,
    min_power: float,
    col_errors: str,
    errors_to_classify: List[int],
    features: List[str],
    target: str
    ) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Wrapper for transforming the data for the model
    
    Processing is applied in the following steps:
        1. Filtering out low power values
        2. Transforming error types
        3. Fill nuls in all of the feature columns
        4. Split dataset into features and target (x, y)
        
    Args:
        df: Input dataframe.
        col_power: Column of `df` with the power production.
        min_power: Minimum values of power production considered. Rows with smaller
            values are filtered out.
        col_errors: Column of `df` with the error types.
        errors_to_classify: List of error types that should be considered.
        features: List of the features to be included in the transformation.
        target: Target column.
    
    Returns:
        
    """
    # 1. Filter out low power
    df = filter_power(df=df, col_power=col_power, min_power=min_power)

    # 2. Transform error types
    df = transform_error_types(df=df, col_errors=col_errors, errors_to_classify=errors_to_classify)

    # 3. Fill nulls in all of the feature columns
    for feat in features:
        df = fill_nulls(df=df, col=feat)

    # 4. Split dataset into features and target
    x, y = split_features_target(df=df, features=features, target=target) 
    
    return x, y


# ----- CONSTANTS ----- #
# Columns of df
# Error column <> target
COL_ERRORS = 'categories_sk'
# Power produced column (used for filtering out small values)
COL_POWER = 'power'
# Error (i.e., target) classification list
ERRORS_TO_CLASSIFY = [0, 3, 5, 7, 8]
# Features to consider for the model
FEATURES = ['wind_speed', 'power', 'nacelle_direction', 'wind_direction',
            'rotor_speed', 'generator_speed', 'temp_environment',
            'temp_hydraulic_oil', 'temp_gear_bearing', 'cosphi',
            'blade_angle_avg', 'hydraulic_pressure']
# Power values to filter out
MIN_POWER = 0.05
# Filname of the raw data file
RAW_DATA_FILE = 'wind_turbines.csv'


if __name__ == '__main__':
    
    logger.debug(f'Preprocessing job started.')
    # Parse the SDK arguments that are passed when creating the SKlearn container
    parser = argparse.ArgumentParser()
    parser.add_argument("--n_test_days", type=int, default=10)
    parser.add_argument("--n_val_days", type=int, default=10)
    args, _ = parser.parse_known_args()

    logger.debug(f"Received arguments {args}.")

    # Read in data locally in the container
    input_data_path = os.path.join("/opt/ml/processing/input", RAW_DATA_FILE)
    logger.debug(f"Reading input data from {input_data_path}")
    # Read raw input data
    df = pd.read_csv(input_data_path)
    logger.debug(f"Shape of data is:{df.shape}")

    # ---- Preprocess the data set ----
    logger.debug("Split data into training+validation and test set.")
    df_train_valid, df_test = get_train_test_split(df=df, n_days_test=args.n_test_days) 

    logger.debug("Split training+validation into training and validation set.")
    df_train, df_val = get_train_test_split(df=df, n_days_test=args.n_val_days) 

    logger.debug("Transforming training data.")
    x_train, y_train = wrap_transform_data(
        df=df_train,
        col_power=COL_POWER,
        min_power=MIN_POWER,
        col_errors=COL_ERRORS,
        errors_to_classify=ERRORS_TO_CLASSIFY,
        features=FEATURES,
        target=COL_ERRORS
    )
    
    logger.debug("Transforming validation data.")
    x_val, y_val = wrap_transform_data(
        df=df_val,
        col_power=COL_POWER,
        min_power=MIN_POWER,
        col_errors=COL_ERRORS,
        errors_to_classify=ERRORS_TO_CLASSIFY,
        features=FEATURES,
        target=COL_ERRORS
    )

    # Create local output directories. These directories live on the container that is spun up.
    try:
        os.makedirs("/opt/ml/processing/train")
        os.makedirs("/opt/ml/processing/validation")
        os.makedirs("/opt/ml/processing/test")
        print("Successfully created directories")
    except Exception as e:
        # if the Processing call already creates these directories (or directory otherwise cannot be created)
        logger.debug(e)
        logger.debug("Could Not Make Directories.")
        pass

    # Save data locally on the container that is spun up.
    try:
        pd.DataFrame(x_train).to_csv("/opt/ml/processing/train/x_train.csv", header=True, index=False)
        pd.DataFrame(y_train).to_csv("/opt/ml/processing/train/y_train.csv", header=True, index=False)
        pd.DataFrame(x_val).to_csv("/opt/ml/processing/validation/x_val.csv", header=True, index=False)
        pd.DataFrame(y_val).to_csv("/opt/ml/processing/validation/y_val.csv", header=True, index=False)
        pd.DataFrame(df_test).to_csv("/opt/ml/processing/test/test.csv", header=True, index=False)
        logger.debug("Files Successfully Written Locally")
    except Exception as e:
        logger.debug("Could Not Write the Files")
        logger.debug(e)
        pass

    logger.debug("Finished running processing job")

Overwriting prepare_data.py


If you read the script carefully you see that the training, validation, and test datasets are stored in the file path `/opt/ml/processing/...`. This is not the path in your S3 bucket, but refers to the local path on the container that is spun up. Do you know where you can find the train, validation, and test set? 

We specified that in the beginning when setting up path variables (e.g., `TEST_DATA_PATH`)! This will become useful later.
The first line of the cell's output will indicate you the job's name. Go back to the AWS SageMaker Console and check the processing jobs. In my case - eu-central-1 AZ - the link is https://eu-central-1.console.aws.amazon.com/sagemaker/home?region=eu-central-1#/processing-jobs.

From there, you can leverage CloudWatch to check the logs for the job or other monitoring tools.

In [None]:
%%time

base_job_name = PREPROCESSING_JOB_NAME
sklearn_processor = SKLearnProcessor(
    base_job_name=base_job_name,
    framework_version="0.20.0",
    role=ROLE,
    instance_type="ml.m5.xlarge",
    instance_count=1
)

sklearn_processor.run(
    code="prepare_data.py",
    inputs=[ProcessingInput(source=RAW_DATA_PATH, destination="/opt/ml/processing/input")],
    outputs=[
        ProcessingOutput(
            destination=PROCESSED_DATA_PATH,
            source="/opt/ml/processing/train"),
        ProcessingOutput(
            destination=PROCESSED_DATA_PATH,
            source="/opt/ml/processing/validation"),
        ProcessingOutput(
            destination=PROCESSED_DATA_PATH,
            source="/opt/ml/processing/test"),
    ],
    arguments=["--n_test_days", "20",
              "--n_val_days", "30"],
)

preprocessing_job_description = sklearn_processor.jobs[-1].describe()


Job Name:  windTurbinesPreprocessing-2022-05-11-16-12-05-699
Inputs:  [{'InputName': 'input-1', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://aws-sagemaker-blogpost/data/wind_turbines.csv', 'LocalPath': '/opt/ml/processing/input', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}, {'InputName': 'code', 'AppManaged': False, 'S3Input': {'S3Uri': 's3://sagemaker-eu-central-1-451811961115/windTurbinesPreprocessing-2022-05-11-16-12-05-699/input/code/prepare_data.py', 'LocalPath': '/opt/ml/processing/input/code', 'S3DataType': 'S3Prefix', 'S3InputMode': 'File', 'S3DataDistributionType': 'FullyReplicated', 'S3CompressionType': 'None'}}]
Outputs:  [{'OutputName': 'output-1', 'AppManaged': False, 'S3Output': {'S3Uri': 's3://aws-sagemaker-blogpost/processed_2022-05-11_1604_19431566', 'LocalPath': '/opt/ml/processing/train', 'S3UploadMode': 'EndOfJob'}}, {'OutputName': 'output-2', 'AppManaged': False, 'S3Output': {'S3Uri

In [16]:
from preprocessing import (
    get_train_test_split,
    wrap_transform_data
)
# ----- CONSTANTS ----- #
# Columns of df
# Error column <> target
COL_ERRORS = 'categories_sk'
# Power produced column (used for filtering out small values)
COL_POWER = 'power'
# Error (i.e., target) classification list
ERRORS_TO_CLASSIFY = [0, 3, 5, 7, 8]
# Features to consider for the model
FEATURES = ['wind_speed', 'power', 'nacelle_direction', 'wind_direction',
            'rotor_speed', 'generator_speed', 'temp_environment',
            'temp_hydraulic_oil', 'temp_gear_bearing', 'cosphi',
            'blade_angle_avg', 'hydraulic_pressure']
# Power values to filter out
MIN_POWER = 0.05
# Number of days in the (hidden) test set
N_TEST_DAYS = 20
# Number of days in the validation set
N_VAL_DAYS = 20


# Read raw input data
df = pd.read_csv(RAW_DATA_PATH)

# Split data into training+validation and test set
df_train_valid, df_test = get_train_test_split(df=df, n_days_test=N_TEST_DAYS) 

# Split training+validation into training and validation set
df_train, df_val = get_train_test_split(df=df, n_days_test=N_VAL_DAYS) 

x_train, y_test = wrap_transform_data(
    df=df_train,
    col_power=COL_POWER,
    min_power=MIN_POWER,
    col_errors=COL_ERRORS,
    errors_to_classify=ERRORS_TO_CLASSIFY,
    features=FEATURES,
    target=COL_ERRORS
)

x_val, y_val = wrap_transform_data(
    df=df_val,
    col_power=COL_POWER,
    min_power=MIN_POWER,
    col_errors=COL_ERRORS,
    errors_to_classify=ERRORS_TO_CLASSIFY,
    features=FEATURES,
    target=COL_ERRORS
)

In [20]:
x_val['measured_at'].min(), x_val['measured_at'].max()

('2020-03-12 00:00:00.0000000 +01:00', '2020-03-31 17:00:00.0000000 +01:00')

In [22]:
x_train['measured_at'].min(), x_train['measured_at'].max()

('2020-01-01 01:00:00.0000000 +01:00', '2020-03-11 23:50:00.0000000 +01:00')

In [None]:
min_date = df['date'].min()
max_date = df['date'].max()

In [None]:
test_dates = [datetime.strftime(max_date - timedelta(days=i), '%Y-%m-%d') for i in range(3)]
test_dates

In [None]:
select_and_transform_data(df, features)