<a href="https://colab.research.google.com/github/benhandy/workflow/blob/main/Feature_Engineering_for_ML.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
"""
Feature Engineering Lab: Building ML-Ready Datasets

In this lab, I'll walk through the process of feature engineering in three steps:

First is data extraction, where I'll connect to various data sources, manage large data volumes, and extract the dataset needed to train the ML model.

Next is feature creation, where I'll transform the extracted data into numerical features that the model can learn from.
This involves labeling records to train the model, generating vector embeddings to represent text data, and encoding categorical values into numerical formats.

Finally, I'll focus on feature storage. After cleaning and transforming the data, I'll store the final dataset in an accessible system for the ML team.
This step includes splitting the data into training and testing sets and saving them in S3 buckets for further exploration and model training.

"""



In [None]:
import os
import io
import datetime as dt
import pickle

import awswrangler as wr
import boto3
from dotenv import load_dotenv
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, OneHotEncoder, StandardScaler, KBinsDiscretizer

%load_ext sql

In [None]:
%config SqlMagic.style = '_DEPRECATED_DEFAULT'

In [None]:
BUCKET_NAME = 'de-c4w2lab1-339712961490-us-east-1-data-bucket'

In [None]:
# getting the RDS endpoint address
# the leading ! runs this as a shell command from the notebook (drop it when running in a bash terminal)
!aws rds describe-db-instances --db-instance-identifier de-c4w2lab1-rds --output text --query "DBInstances[].Endpoint.Address"


In [None]:
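# contents of the ./src/env file after updating it with the endpoint address
# (these are KEY=value lines in the env file, not Python statements)
# DBHOST=de-c4w2lab1-rds.cfe0es80i889.us-east-1.rds.amazonaws.com
# DBUSER=admin
# DBPASSWORD=adminpwrd
# DBPORT=3306
# DBNAME=classicmodels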

In [None]:
# loading env variables and connecting to database
load_dotenv('./src/env', override=True)

DBHOST = os.getenv('DBHOST')
DBPORT = os.getenv('DBPORT')
DBNAME = os.getenv('DBNAME')
DBUSER = os.getenv('DBUSER')
DBPASSWORD = os.getenv('DBPASSWORD')

connection_url = f"mysql+pymysql://{DBUSER}:{DBPASSWORD}@{DBHOST}:{DBPORT}/{DBNAME}"

%sql {connection_url}

In [None]:
%%sql
-- test the database connection (the %%sql cell magic must be on the first line of the cell)
use classicmodels;
show tables;

-- examine the ratings table
SELECT *
FROM ratings
LIMIT 10;

Now I'll create a new base dataset for the ML model by extracting the required data from different tables. Using this initial dataset, I'll add new features or enhance the ones I've selected. I'll also use the schema from the transformed table to define and create a new table.
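The extraction step boils down to an inner join of ratings with products and customers. As a local illustration only, here is a minimal pandas sketch of the same join; the toy rows are hypothetical stand-ins for the classicmodels tables, not real data from the database.

```python
import pandas as pd

# Hypothetical toy rows standing in for the classicmodels tables
ratings = pd.DataFrame({
    "customerNumber": [103, 103, 112, 999],
    "productCode": ["S10_1678", "S10_1949", "S10_1678", "S10_1678"],
    "productRating": [5, 3, 4, 2],
})
products = pd.DataFrame({
    "productCode": ["S10_1678", "S10_1949"],
    "productLine": ["Motorcycles", "Classic Cars"],
    "buyPrice": [48.81, 98.58],
})
customers = pd.DataFrame({
    "customerNumber": [103, 112],
    "country": ["France", "USA"],
    "creditLimit": [21000.0, 71800.0],
})

# Inner joins, mirroring "from ratings r join products p ... join customers c ..."
# The rating from customer 999 is dropped because that customer is missing from `customers`
base_df = (
    ratings
    .merge(products, on="productCode", how="inner")
    .merge(customers, on="customerNumber", how="inner")
)
print(base_df)
```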

In [None]:
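"""
I'll open the file ./src/de-c4w2lab1-etl-glue-job.py, which contains the code for the AWS Glue job.
In the job, I created one node per table that pulls the required data from classicmodels through the Glue connection passed as the glue_connection job argument.
Then, I used a SQL query to join the tables and wrote the result to S3 as Snappy-compressed Parquet partitioned by customerNumber,
registering it as the ratings_ml_training table in the Glue Data Catalog.
"""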

# This is the ./src/de-c4w2lab1-etl-glue-job.py file

import sys

from awsglue import DynamicFrame
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext


def sparkSqlQuery(
    glueContext, query, mapping, transformation_ctx
) -> DynamicFrame:
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = spark.sql(query)
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)


args = getResolvedOptions(
    sys.argv, ["JOB_NAME", "glue_connection", "glue_database", "target_path"]
)
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# script generated for node Products
products_node = glueContext.create_dynamic_frame.from_options(
    connection_type="mysql",
    connection_options={
        "useConnectionProperties": "true",
        "dbtable": "classicmodels.products",
        "connectionName": args["glue_connection"],
    },
    transformation_ctx="products_node",
)

# Script generated for node Customers
customers_node = glueContext.create_dynamic_frame.from_options(
    connection_type="mysql",
    connection_options={
        "useConnectionProperties": "true",
        "dbtable": "classicmodels.customers",
        "connectionName": args["glue_connection"],
    },
    transformation_ctx="customers_node",
)

# script generated for node Ratings
ratings_node = glueContext.create_dynamic_frame.from_options(
    connection_type="mysql",
    connection_options={
        "useConnectionProperties": "true",
        "dbtable": "classicmodels.ratings",
        "connectionName": args["glue_connection"],
    },
    transformation_ctx="ratings_node",
)

# script generated for node Join
sql_join_query = """
select r.customerNumber
, c.city
, c.state
, c.postalCode
, c.country
, c.creditLimit
, r.productCode
, p.productLine
, p.productScale
, p.quantityInStock
, p.buyPrice
, p.MSRP
, r.productRating
from ratings r
join products p on p.productCode = r.productCode
join customers c on c.customerNumber = r.customerNumber;
"""

join_node = sparkSqlQuery(
    glueContext,
    query=sql_join_query,
    mapping={
        "ratings": ratings_node,
        "products": products_node,
        "customers": customers_node,
    },
    transformation_ctx="join_node",
)

# script generated for node de-c1w4-s3
s3_upload_node = glueContext.getSink(
    path=f"{args['target_path']}/ratings_ml_training/",
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",
    partitionKeys=["customerNumber"],
    enableUpdateCatalog=True,
    transformation_ctx="s3_upload_node",
)
s3_upload_node.setCatalogInfo(
    catalogDatabase=args["glue_database"],
    catalogTableName="ratings_ml_training",
)
s3_upload_node.setFormat("glueparquet", compression="snappy")
s3_upload_node.writeFrame(join_node)
job.commit()


In [None]:
# creating a pandas dataframe and displaying it
sql_join_query = """
select r.customerNumber
, c.city
, c.state
, c.postalCode
, c.country
, c.creditLimit
, r.productCode
, p.productLine
, p.productScale
, p.quantityInStock
, p.buyPrice
, p.MSRP
, r.productRating
, c.customerSince
from ratings r
join products p on p.productCode = r.productCode
join customers c on c.customerNumber = r.customerNumber;
"""

result = %sql {sql_join_query}

model_df = result.DataFrame()

model_df.head()

In [None]:
# creating a basic train/test split of the dataset
model_train_df, model_test_df = train_test_split(
                model_df,
                test_size=0.2,
                random_state=42
            )
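Setting `random_state` makes the split reproducible, and `test_size=0.2` holds out 20% of the rows. A minimal sketch on a hypothetical 10-row frame shows both properties:

```python
import pandas as pd
from sklearn.model_selection import train_test_split

# Hypothetical toy frame with 10 rows
toy_df = pd.DataFrame({"x": range(10), "y": range(10, 20)})

# Two splits with the same random_state select exactly the same rows
train_a, test_a = train_test_split(toy_df, test_size=0.2, random_state=42)
train_b, test_b = train_test_split(toy_df, test_size=0.2, random_state=42)

print(len(train_a), len(test_a))
print(test_a.index.equals(test_b.index))
```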

In [None]:
model_train_df.head()

In [None]:
# to create the required features from the initial sub-datasets, I'll have to distinguish between numerical and categorical variables
# first, I convert the column names to lowercase to avoid problems down the line
model_train_df.columns = [col.lower() for col in model_train_df.columns]

In [None]:
# creating the users dataframe
user_columns = ["customernumber", "city", "country", "creditlimit", "customersince"]
users_df = model_train_df[user_columns].copy()

# creating the items dataframe
item_columns = ["productcode", "productline", "productscale", "quantityinstock", "buyprice", "msrp"]
items_df = model_train_df[item_columns].copy()

In [None]:
users_df.head()

In [None]:
items_df.head()

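Next, I engineer features from the numerical variables. To standardize the numerical data, I create a StandardScaler instance, fit the scaler to the data, and transform the data. StandardScaler rescales each column to zero mean and unit variance, using the mean and standard deviation computed by fit().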
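As a quick sanity check of what standardization computes, this sketch compares StandardScaler with the formula z = (x - mean) / std on a few hypothetical credit limits. Note that scikit-learn uses the population standard deviation (ddof=0), not the pandas default of ddof=1.

```python
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler

# Hypothetical credit limits
toy = pd.DataFrame({"creditlimit": [10000.0, 20000.0, 30000.0, 40000.0]})

scaler = StandardScaler().fit(toy)
scaled = scaler.transform(toy)

# Same result by hand, using the population standard deviation (ddof=0)
manual = (toy["creditlimit"] - toy["creditlimit"].mean()) / toy["creditlimit"].std(ddof=0)
print(scaled.ravel())
print(manual.to_numpy())
```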

In [None]:
# instantiating the StandardScaler objects and assigning them to variables

# StandardScaler instance for the user numerical data
user_num_standard_scaler = StandardScaler()

# StandardScaler instance for the item numerical data
item_num_standard_scaler = StandardScaler()


In [None]:
# using the fit() method over the data

# choosing the numerical columns to apply the standard scaler to in the two datasets
user_num_columns_std = ["creditlimit"]
item_num_columns_std = ["quantityinstock", "buyprice", "msrp"]

# computing the mean and standard deviation of each feature with the fit() method
# (fit() updates the scaler in place and returns it, so the instances created above are reused)
user_num_standard_scaler.fit(users_df[user_num_columns_std])
item_num_standard_scaler.fit(items_df[item_num_columns_std])



In [None]:
"""
I will apply the transform() method of user_num_standard_scaler and item_num_standard_scaler,
passing the same DataFrames, users_df and items_df, along with their respective selected column lists:
user_num_columns_std for users_df and item_num_columns_std for items_df.
The results of transform() are numpy arrays, so I will convert them back into pandas DataFrames.
"""

# applying transform
user_num_vars_std = user_num_standard_scaler.transform(users_df[user_num_columns_std])
item_num_vars_std = item_num_standard_scaler.transform(items_df[item_num_columns_std])


# results are numpy arrays, so I convert them into pandas dataframes, keeping the original index
user_num_vars_std_df = pd.DataFrame(user_num_vars_std, columns=user_num_columns_std, index=users_df.index)
item_num_vars_std_df = pd.DataFrame(item_num_vars_std, columns=item_num_columns_std, index=items_df.index)



In [None]:
item_num_vars_std_df.head()

In [None]:
user_num_vars_std_df.head()


Before encoding categorical variables, I'll extract and clean the features. I'll create two lists:

*   user_cat_cols for the categorical variables in users_df, which include city and country.

*   item_cat_cols for items_df, which include productline and productscale.
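Cleaning matters because the encoder treats " Paris" and "paris" as different categories. This sketch, on hypothetical values, applies the same strip-and-lowercase rule used below and leaves missing values untouched:

```python
import pandas as pd

# Hypothetical raw categories with inconsistent casing and stray whitespace
raw = pd.DataFrame({
    "city": [" Paris", "paris ", "NYC"],
    "country": ["France", "FRANCE", None],
})

def normalize(value):
    # Strip and lowercase strings; leave missing values (None/NaN) as they are
    return value.strip().lower() if isinstance(value, str) else value

clean = raw.apply(lambda col: col.map(normalize))
print(clean)
```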

In [None]:
# column names for the categorical variables
user_cat_cols = ["city", "country"]
item_cat_cols = ["productline", "productscale"]

In [None]:
# Creating the datasets with only categorical features
users_cat_df = users_df[user_cat_cols].copy()
items_cat_df = items_df[item_cat_cols].copy()

# Stripping whitespace and converting string categories into lowercase (non-string values such as NaN are left unchanged)
users_cat_df = users_cat_df.apply(lambda col: col.map(lambda x: x.strip().lower() if isinstance(x, str) else x))
items_cat_df = items_cat_df.apply(lambda col: col.map(lambda x: x.strip().lower() if isinstance(x, str) else x))


The features I've selected are categorical, so I'll use scikit-learn's OneHotEncoder to create a one-hot encoded array. By default, the encoder derives the categories from the unique values in each column, but I can also pass them manually through the categories parameter. First, I'll instantiate the OneHotEncoder object and set handle_unknown="ignore", which ensures that categories not seen during fit() are encoded as all zeros instead of raising an error.

In [None]:
# create the instance of the one-hot encoder object for each dataset.
user_cat_ohe = OneHotEncoder(handle_unknown="ignore")
item_cat_ohe = OneHotEncoder(handle_unknown="ignore")



In [None]:
# using the encoder objects to find the categories of each feature with fit()
# then passing the DataFrames users_cat_df and items_cat_df to transform(),
# which returns a sparse matrix that I convert into a dense matrix with todense()
user_cat_ohe.fit(users_cat_df)
item_cat_ohe.fit(items_cat_df)

# transform with the encoder objects
encoded_user_cat_features = user_cat_ohe.transform(users_cat_df).todense()
encoded_item_cat_features = item_cat_ohe.transform(items_cat_df).todense()

In [None]:
encoded_user_cat_df = pd.DataFrame(
    encoded_user_cat_features,
    columns=user_cat_ohe.get_feature_names_out(user_cat_cols),
    index=users_df.index
)

encoded_item_cat_df = pd.DataFrame(
    encoded_item_cat_features,
    columns=item_cat_ohe.get_feature_names_out(item_cat_cols),
    index=items_df.index
)

In [None]:
encoded_user_cat_df.head()

In [None]:
encoded_item_cat_df.head()

Now I create bins for the customers based on their tenure with the company:

- A bin for the most recent customers, with 1 year or less of tenure

- A bin for customers with more than 1 but less than 3 years of tenure

- A bin for customers with 3-5 years of tenure

- A bin for customers with more than 5 years of tenure

To run the binning process, I first have to create the loyalty_program_years column from the date in the customersince column.
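Binning with fixed edges like these can be sketched with pandas `pd.cut`, swapped in here because scikit-learn's KBinsDiscretizer learns its own edges from the data rather than accepting a custom list. This is a minimal sketch under stated assumptions: the signup dates and the fixed reference date are hypothetical, and the intervals are right-closed (`right=True`), so a customer with exactly 3 years lands in "1-3 years".

```python
import pandas as pd

bin_edges = [0, 1, 3, 5, float("inf")]
bin_labels = ["0-1 years", "1-3 years", "3-5 years", "5+ years"]

# Hypothetical signup dates and a fixed reference date so the result is reproducible
reference_date = pd.Timestamp("2024-01-01")
customer_since = pd.to_datetime(pd.Series(["2023-06-01", "2022-01-01", "2019-06-01", "2010-01-01"]))

# Whole years of tenure, computed the same way as in the notebook
years = (reference_date - customer_since).dt.days // 365

# Assign each customer to a bin defined by the explicit edges, then one-hot encode the bins
loyalty_bins = pd.cut(years, bins=bin_edges, labels=bin_labels, right=True, include_lowest=True)
one_hot = pd.get_dummies(loyalty_bins, dtype=float)
print(one_hot)
```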

In [None]:
import pandas as pd
from sklearn.preprocessing import KBinsDiscretizer

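# define the intended bin edges; KBinsDiscretizer does not accept custom edges,
# so bin_edges is only used below to set the number of bins
bin_edges = [0, 1, 3, 5, float('inf')]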

# converting the `customersince` column of the dataframe `users_df` to datetime data type with `pd.to_datetime()` method
users_df['customersince'] = pd.to_datetime(users_df['customersince'])

# compute current timestamp with `pd.Timestamp.now()`
current_date = pd.Timestamp.now()

# create the column with the difference between the `current_date` value and the `customersince` column of the `users_df` dataframe
# convert the value in days by applying `dt.days` method and make an integer division by 365
users_df['loyalty_program_years'] = (current_date - users_df['customersince']).dt.days // 365

"""
Initialize the `KBinsDiscretizer` class with the following parameters:
Set `n_bins` to `len(bin_edges) - 1` because the number of bins is always one less than the number of edges.
Set `encode` to `'onehot-dense'`, which one-hot encodes the bin assigned to each sample and returns a dense array.
Set `strategy` to `'uniform'`, which creates bins of equal width between the minimum and maximum values seen during fit().
The learned edges are stored in `kbins.bin_edges_` and generally differ from the `bin_edges` list above.
Set `subsample` to `None` so that all the training samples are used when fitting.
"""
kbins = KBinsDiscretizer(n_bins=len(bin_edges) - 1, encode='onehot-dense', strategy='uniform', subsample=None)

# selecting `loyalty_program_years` from the dataframe `users_df`. Use double brackets to output it as a dataframe, not a series
loyalty_program_years = users_df[['loyalty_program_years']]

# applying `fit()` method to the dataframe `loyalty_program_years`
kbins.fit(loyalty_program_years)

# applying `transform()` method to transform to the dataframe `loyalty_program_years`
loyalty_program_years_binned = kbins.transform(loyalty_program_years)

# converting the binned data to a DataFrame; these column names are nominal labels for the four
# equal-width bins, whose actual edges are stored in kbins.bin_edges_
bin_labels = ['0-1 years', '1-3 years', '3-5 years', '5+ years']
loyalty_program_years_binned_df = pd.DataFrame(loyalty_program_years_binned, columns=bin_labels, index=users_df.index)

loyalty_program_years_binned_df.head()


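In the recommendation system, the label is the rating (1 to 5) that a user gives a product. I'll treat these ratings as an ordinal numeric target and use MinMaxScaler to rescale them from [1, 5] to [-1, 1] for better compatibility with ML models. MinMaxScaler learns the minimum and maximum from the training data, so this mapping holds as long as both 1 and 5 appear in the training ratings.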
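With feature_range=(-1, 1), MinMaxScaler computes x' = 2 * (x - min) / (max - min) - 1. This sketch on hypothetical ratings shows the mapping and how it shifts when the training data lacks a rating of 1:

```python
import numpy as np
from sklearn.preprocessing import MinMaxScaler

# Hypothetical ratings covering the full 1-5 range
ratings = np.array([[1], [2], [3], [4], [5]], dtype=float)
scaler = MinMaxScaler(feature_range=(-1, 1)).fit(ratings)
scaled = scaler.transform(ratings).ravel()
print(scaled)

# If the training ratings only span 2-5, a rating of 2 is mapped to -1 instead of -0.5
partial = MinMaxScaler(feature_range=(-1, 1)).fit(np.array([[2.0], [3.0], [5.0]]))
print(partial.transform(np.array([[2.0]])))
```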

In [None]:
# creating an instance of MinMaxScaler for ratings
rating_scaler = MinMaxScaler(feature_range=(-1, 1))

In [None]:
# apply `fit()` method
rating_scaler.fit(model_train_df[["productrating"]])

# perform transformation
ratings = rating_scaler.transform(model_train_df[["productrating"]])


scaled_ratings_df = pd.DataFrame(ratings, columns=["scaled_productrating"], index=model_train_df.index)
scaled_ratings_df.head()


In [None]:
# create the dataset of transformed features from the training data, using the pandas concat() method over axis=1
transformed_train_df = pd.concat(
    [
        users_df[["customernumber"]],
        user_num_vars_std_df,
        encoded_user_cat_df,
        items_df[["productcode"]],
        item_num_vars_std_df,
        encoded_item_cat_df,
        loyalty_program_years_binned_df,
        scaled_ratings_df,
    ],
    axis=1,
)

transformed_train_df.head()

With this combined dataset ready, I can store it in an S3 bucket. In addition to the dataset, I also need to store the fitted scaler and encoder objects as artifacts. They hold the statistics computed from the training data (means, standard deviations, categories, bin edges, and min/max values), which I'll need to transform the test data and any data used for inference later.
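The artifacts are stored as pickled bytes. This in-memory sketch (no S3 involved, hypothetical data) shows that a pickled and restored scaler produces the same output as the original, which is what makes the stored artifacts reusable for test and inference data:

```python
import pickle

import numpy as np
from sklearn.preprocessing import StandardScaler

# Hypothetical training data
train = np.array([[1.0], [2.0], [3.0]])
scaler = StandardScaler().fit(train)

payload = pickle.dumps(scaler)   # bytes like the ones passed as Body= to s3.put_object
restored = pickle.loads(payload)  # what a consumer would do after downloading the object

new_data = np.array([[4.0]])
print(restored.transform(new_data))
```

On the consumer side, the bytes can be fetched with boto3's `s3.get_object(Bucket=..., Key=...)["Body"].read()` before calling `pickle.loads`. Pickled scikit-learn objects are best loaded with the same scikit-learn version that created them.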

In [None]:
# uploading the fitted preprocessing artifacts first; the data is saved afterwards
s3 = boto3.client('s3')

artifacts_folder = 'preprocessing/artifacts'

# standard scaler for users
user_num_std_scaler_pkl = pickle.dumps(user_num_standard_scaler)
s3.put_object(Bucket=BUCKET_NAME, Key=f'{artifacts_folder}/user_std_scaler.pkl', Body=user_num_std_scaler_pkl)

# standard scaler for items
item_num_std_scaler_pkl = pickle.dumps(item_num_standard_scaler)
s3.put_object(Bucket=BUCKET_NAME, Key=f'{artifacts_folder}/item_std_scaler.pkl', Body=item_num_std_scaler_pkl)

# discretizer for the customers' years with the company
kbins_pkl = pickle.dumps(kbins)
s3.put_object(Bucket=BUCKET_NAME, Key=f'{artifacts_folder}/user_kbins.pkl', Body=kbins_pkl)

# one-hot encoder for users
user_cat_ohe_pkl = pickle.dumps(user_cat_ohe)
s3.put_object(Bucket=BUCKET_NAME, Key=f'{artifacts_folder}/user_cat_ohe.pkl', Body=user_cat_ohe_pkl)

# one-hot encoder for items
item_cat_ohe_pkl = pickle.dumps(item_cat_ohe)
s3.put_object(Bucket=BUCKET_NAME, Key=f'{artifacts_folder}/item_cat_ohe.pkl', Body=item_cat_ohe_pkl)

# scaler for ratings
rating_scaler_pkl = pickle.dumps(rating_scaler)
s3.put_object(Bucket=BUCKET_NAME, Key=f'{artifacts_folder}/ratings_min_max_scaler.pkl', Body=rating_scaler_pkl)


In [None]:
# save data
data_s3_url = f's3://{BUCKET_NAME}/preprocessing/data/ratings_for_ml/train'

transformed_train_df.to_parquet(data_s3_url,
                                compression='snappy',
                                engine='pyarrow',
                                partition_cols=['productcode'],
                                existing_data_behavior='delete_matching'
                            )


In [None]:
# view in AWS S3 bucket
!aws s3 ls s3://$BUCKET_NAME/preprocessing/data/ratings_for_ml/train/

In [None]:
# read back the data
train_data = pd.read_parquet(data_s3_url)
train_data.head()

So far, I've only transformed the training data. To test the model or run inference, I'll transform the incoming data with the same steps I followed for the training data. The only difference is that I don't call fit() on the scalers or encoders again; I only call transform() on the test or inference data. The fit() method computes statistics (means, standard deviations, categories, bin edges, min/max values) from the training data, and refitting on the test data would leak information from the test set and produce encodings that are inconsistent with the ones the model was trained on.
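The difference between reusing the training statistics and refitting on the test set can be seen on hypothetical numbers: transform() keeps the training mean and standard deviation, while refitting re-centers the test data on itself.

```python
import numpy as np
from sklearn.preprocessing import StandardScaler

# Hypothetical train and test values
train = np.array([[10.0], [20.0], [30.0]])
test = np.array([[40.0], [50.0]])

scaler = StandardScaler().fit(train)
test_scaled = scaler.transform(test)               # correct: reuses the training mean and std
test_refit = StandardScaler().fit_transform(test)  # leaky: statistics come from the test set itself

print(test_scaled.ravel())
print(test_refit.ravel())
```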

In [None]:
# now I replicate the steps I followed for the training data to transform the test DataFrame model_test_df
# convert the column names from the `model_test_df` dataframe into lowercase
model_test_df.columns = [col.lower() for col in model_test_df.columns]

# creating the users dataframe from `model_test_df` with the `copy()` method
user_columns = ["customernumber", "city", "country", "creditlimit", "customersince"]
users_test_df = model_test_df[user_columns].copy()

# creating the items dataframe from `model_test_df` with the `copy()` method
item_columns = ["productcode", "productline", "productscale", "quantityinstock", "buyprice", "msrp"]
items_test_df = model_test_df[item_columns].copy()

In [None]:
# use the transform() method of the fitted scalers on the test data (users_test_df and items_test_df)

# transforming the test data
user_test_num_vars_std = user_num_standard_scaler.transform(users_test_df[user_num_columns_std])
item_test_num_vars_std = item_num_standard_scaler.transform(items_test_df[item_num_columns_std])

# results are numpy arrays, which I convert into pandas dataframes
user_test_num_vars_std_df = pd.DataFrame(user_test_num_vars_std, columns=user_num_columns_std, index=users_test_df.index)
item_test_num_vars_std_df = pd.DataFrame(item_test_num_vars_std, columns=item_num_columns_std, index=items_test_df.index)


In [None]:
user_test_num_vars_std_df.head()

In [None]:
item_test_num_vars_std_df.head()

In [None]:
"""
I'll select the categorical features from the test DataFrames using the column names below,
then strip and lowercase all string values.
After that, I'll use the transform() method of the fitted encoders to compute the one-hot encodings,
chained with the todense() method to get the complete matrix.
"""
# column names for the categorical variables
user_cat_cols = ["city", "country"]
item_cat_cols = ["productline", "productscale"]

# creating the datasets with only categorical features
users_test_cat_df = users_test_df[user_cat_cols].copy()
items_test_cat_df = items_test_df[item_cat_cols].copy()

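# stripping whitespace and converting string categories into lowercase, exactly as for the training data
users_test_cat_df = users_test_cat_df.apply(lambda col: col.map(lambda x: x.strip().lower() if isinstance(x, str) else x))
items_test_cat_df = items_test_cat_df.apply(lambda col: col.map(lambda x: x.strip().lower() if isinstance(x, str) else x))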

# transform with the encoder objects
encoded_user_test_cat_features = user_cat_ohe.transform(users_test_cat_df).todense()
encoded_item_test_cat_features = item_cat_ohe.transform(items_test_cat_df).todense()

encoded_user_test_cat_df = pd.DataFrame(
    encoded_user_test_cat_features,
    columns=user_cat_ohe.get_feature_names_out(user_cat_cols),
    index=users_test_df.index
)

encoded_item_test_cat_df = pd.DataFrame(
    encoded_item_test_cat_features,
    columns=item_cat_ohe.get_feature_names_out(item_cat_cols),
    index=items_test_df.index
)


In [None]:
encoded_user_test_cat_df.head()

In [None]:
encoded_item_test_cat_df.head()

In [None]:
# applying the `transform()` method of the `KBinsDiscretizer` object fitted on the training data to the test data
# first, I need to create the `loyalty_program_years` column to create the bins from it

# convert the `customersince` column of the dataframe `users_test_df` to datetime data type with `pd.to_datetime()` method
users_test_df['customersince'] = pd.to_datetime(users_test_df['customersince'])

# create the column with the difference between the `current_date` value (the same one used for training) and the `customersince` column
# convert the value in days by applying `dt.days` method and make an integer division by 365
users_test_df['loyalty_program_years'] = (current_date - users_test_df['customersince']).dt.days // 365

# select `loyalty_program_years` from the dataframe `users_test_df`. Use double brackets to output it as a dataframe, not a series
loyalty_program_years_test = users_test_df[['loyalty_program_years']]

# reuse the fitted `kbins` object and only call `transform()`, so the bin edges learned from the training data are kept
# (kbins was created with encode='onehot-dense', so the result is already a dense numpy array)
loyalty_program_years_test_binned = kbins.transform(loyalty_program_years_test)

# convert the binned data to a dataframe with the same column names as the training data
bin_labels = ['0-1 years', '1-3 years', '3-5 years', '5+ years']
loyalty_program_years_test_binned_df = pd.DataFrame(loyalty_program_years_test_binned, columns=bin_labels, index=users_test_df.index)

loyalty_program_years_test_binned_df.head()


In [None]:
# performing transformation over the ratings
ratings_test = rating_scaler.transform(model_test_df[["productrating"]])

scaled_ratings_test_df = pd.DataFrame(ratings_test, columns=["scaled_productrating"], index=model_test_df.index)
scaled_ratings_test_df.head()

In [None]:
# now I collect all the transformed DataFrames into one that will be saved into the S3 bucket
# (same columns, in the same order, as transformed_train_df, including the loyalty bins)
transformed_test_df = pd.concat(
    [
        users_test_df[["customernumber"]],
        user_test_num_vars_std_df,
        encoded_user_test_cat_df,
        items_test_df[["productcode"]],
        item_test_num_vars_std_df,
        encoded_item_test_cat_df,
        loyalty_program_years_test_binned_df,
        scaled_ratings_test_df,
    ],
    axis=1,
)

transformed_test_df.head()

In [None]:
# finally, I save the transformed_test_df DataFrame into the S3 bucket
data_s3_url = f's3://{BUCKET_NAME}/preprocessing/data/ratings_for_ml/test'

transformed_test_df.to_parquet(data_s3_url,
                                compression='snappy',
                                engine='pyarrow',
                                partition_cols=['productcode'],
                                existing_data_behavior='delete_matching'
                            )