In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
# Python ≥3.5 is required
import sys
assert sys.version_info >= (3, 5)

# Common imports
import argparse
import pandas as pd
from pathlib import Path
import os

# Ignore useless warnings 
import warnings
warnings.filterwarnings(action="ignore", message="^internal gelsd")

In [3]:
# Set the repo root directory to use as base path
os.chdir((Path.cwd() / "").parents[0]) # to change with base path
base_path = os.getcwd()

In [4]:
# Import project specific libraries
from src.config import read_config
from src.custom_pipeline_modeler import custom_dataflow
from src.input_collector import InputArguments, DataAccessor
from src.sk_pipeline_modeler import *

# Configuration Settings

In [5]:
# identify the path to the configurations folder
config_path = str(base_path.replace("\\", "/") +"/"+"conf")
args = InputArguments(pathConfFile=config_path)

def prepare_config_path(args) -> dict:
    config_path = f"{args.pathConfFile}/{{}}"
    print("Training configurations filepath: {}".format(config_path.format("config.json")))
    return {"config": config_path.format("config.json")}

    # Prepare paths to config file and eventual other files

file_paths = prepare_config_path(args)

# Read config
print("Reading Housing model configuration ........")
config_path = file_paths["config"]
cfg = read_config(config_path=config_path)

#Select path to data to process
train_data_path = cfg.trainset_path
test_data_path = cfg.testset_path
train_target_path = cfg.train_target_path
test_target_path = cfg.test_target_path
prod_data_path = cfg.housing_data_path


Training configurations filepath: C:/Users/mdetomaso/Desktop/K_Code/custom-ml-pipelines/conf/config.json
Reading Housing model configuration ........


# Collect Housing data

In [6]:
# access housing data
housing = DataAccessor(**cfg.input_config).load_housing_data()
housing.head()

Housing data loaded successfully.


Unnamed: 0,longitude,latitude,housing_median_age,total_rooms,total_bedrooms,population,households,median_income,median_house_value,ocean_proximity
0,-122.23,37.88,41.0,880.0,129.0,322.0,126.0,8.3252,452600.0,NEAR BAY
1,-122.22,37.86,21.0,7099.0,1106.0,2401.0,1138.0,8.3014,358500.0,NEAR BAY
2,-122.24,37.85,52.0,1467.0,190.0,496.0,177.0,7.2574,352100.0,NEAR BAY
3,-122.25,37.85,52.0,1274.0,235.0,558.0,219.0,5.6431,341300.0,NEAR BAY
4,-122.25,37.85,52.0,1627.0,280.0,565.0,259.0,3.8462,342200.0,NEAR BAY


# Training 

In [7]:
def custom_training_pipeline(
    args, config=cfg, data_path=train_data_path, target_path=train_target_path, env=None
):
    custom_pipeline = custom_dataflow(
        args=args, config=config, data_path=data_path, target_path=target_path, env=env
    )
    estimator = RandomForestRegressor()
    stages = [
        "collect-data-stage",
        "split-train-test-stage",
        "select-numerical-attributes-stage",
        "impute-numerical-missing-values-stage",
        "create-new-attributes-stage",
        "scale-numerical-attributes-stage",
        "select-categorical-attributes-stage",
        "impute-categorical-missing-values-stage",
        "encode-categorical-features-stage",
        "merge-num-cat-features-stage",
        "train-model-stage", #step needed only for training
        "make-predictions-stage",
        "evaluate-results-stage" #step used only when target is avialable
             ]

    for stage in stages:
        stage = custom_pipeline.stages[stage]
        print(stage.name, "........")
        stage.run()
    return custom_pipeline

training_pipe = custom_training_pipeline(args, env=None)
# Look at results
stage = training_pipe.stages["evaluate-results-stage"]
stage.run()

Building Housing model training dataflow ........
collect-data-stage ........
Housing data loaded successfully.
split-train-test-stage ........
Read data/input/housing.csv
Stratified Train / Test split.
Train set dimesions: (16512, 11)
Test set dimesions: (4128, 11)
Train target dimesions: (16512,)
Test target dimesions: (4128,)
data/outputs/partitions/train_set.csv
Wrote data/outputs/partitions/train_set.csv
data/outputs/partitions/train_target.csv
Wrote data/outputs/partitions/train_target.csv
data/outputs/partitions/test_set.csv
Wrote data/outputs/partitions/test_set.csv
data/outputs/partitions/test_target.csv
Wrote data/outputs/partitions/test_target.csv
data/outputs/partitions/train_test_sets.csv
Wrote data/outputs/partitions/train_test_sets.csv
select-numerical-attributes-stage ........
Read data/outputs/partitions/train_set.csv
Wrote data/outputs/numerical_features.csv
impute-numerical-missing-values-stage ........
Read data/outputs/numerical_features.csv
Numerical missing value



Wrote data/outputs/model_artifacts/trained_housing_model.pkl
make-predictions-stage ........
Load model from data/outputs/model_artifacts/trained_housing_model.pkl
Read data/outputs/all_processed.csv
Wrote data/outputs/predictions.csv
evaluate-results-stage ........
Read data/outputs/predictions.csv
Read data/outputs/partitions/train_target.csv
Read data/outputs/predictions.csv
Read data/outputs/partitions/train_target.csv


0    17938.528303
Name: RMSE, dtype: float64

# Testing

In [8]:
def custom_testing_pipeline(
    args, config=cfg, data_path=test_data_path, target_path=test_target_path, env=None
):
    custom_pipeline = custom_dataflow(
        args=args, config=config, data_path=data_path, target_path=target_path, env=env
    )
    estimator = RandomForestRegressor()
    stages = [
        "select-numerical-attributes-stage",
        "impute-numerical-missing-values-stage",
        "create-new-attributes-stage",
        "scale-numerical-attributes-stage",
        "select-categorical-attributes-stage",
        "impute-categorical-missing-values-stage",
        "encode-categorical-features-stage",
        "merge-num-cat-features-stage",
        "make-predictions-stage",
        "evaluate-results-stage"
             ]

    for stage in stages:
        stage = custom_pipeline.stages[stage]
        print(stage.name, "........")
        stage.run()
    return custom_pipeline

testing_pipe = custom_testing_pipeline(args, env=None)
# Look at results
stage = testing_pipe.stages["evaluate-results-stage"]
stage.run()

Building Housing model training dataflow ........
select-numerical-attributes-stage ........
Read data/outputs/partitions/test_set.csv
Wrote data/outputs/numerical_features.csv
impute-numerical-missing-values-stage ........
Read data/outputs/numerical_features.csv
Numerical missing values imputed.
Wrote data/outputs/imputed_numerical_features.csv
create-new-attributes-stage ........
Read data/outputs/imputed_numerical_features.csv
New numerical attributes created.
Wrote data/outputs/enriched_numerical_features.csv
scale-numerical-attributes-stage ........
Read data/outputs/enriched_numerical_features.csv
Wrote data/outputs/model_artifacts/numerical_scaler.pkl
Wrote data/outputs/scaled_numerical_features.csv
select-categorical-attributes-stage ........
Read data/outputs/partitions/test_set.csv
Wrote data/outputs/categorical_features.csv
impute-categorical-missing-values-stage ........
Read data/outputs/categorical_features.csv
Categorical missing values imputed.
Wrote data/outputs/imput

0    53094.055981
Name: RMSE, dtype: float64

# Production

In [9]:
def custom_prod_pipeline(
    args, config=cfg, data_path=prod_data_path, target_path=None, env=None
):
    custom_pipeline = custom_dataflow(
        args=args, config=config, data_path=data_path, target_path=target_path, env=env
    )
    estimator = RandomForestRegressor()
    stages = [
        "select-numerical-attributes-stage",
        "impute-numerical-missing-values-stage",
        "create-new-attributes-stage",
        "scale-numerical-attributes-stage",
        "select-categorical-attributes-stage",
        "impute-categorical-missing-values-stage",
        "encode-categorical-features-stage",
        "merge-num-cat-features-stage",
        "make-predictions-stage"
             ]

    for stage in stages:
        stage = custom_pipeline.stages[stage]
        print(stage.name, "........")
        stage.run()
    return custom_pipeline

prod_pipe = custom_prod_pipeline(args, env=None)
predictions = pd.read_csv("data/outputs/predictions.csv")
predictions

Building Housing model training dataflow ........
select-numerical-attributes-stage ........
Read data/input/housing.csv
Wrote data/outputs/numerical_features.csv
impute-numerical-missing-values-stage ........
Read data/outputs/numerical_features.csv
Numerical missing values imputed.
Wrote data/outputs/imputed_numerical_features.csv
create-new-attributes-stage ........
Read data/outputs/imputed_numerical_features.csv
New numerical attributes created.
Wrote data/outputs/enriched_numerical_features.csv
scale-numerical-attributes-stage ........
Read data/outputs/enriched_numerical_features.csv
Wrote data/outputs/model_artifacts/numerical_scaler.pkl
Wrote data/outputs/scaled_numerical_features.csv
select-categorical-attributes-stage ........
Read data/input/housing.csv
Wrote data/outputs/categorical_features.csv
impute-categorical-missing-values-stage ........
Read data/outputs/categorical_features.csv
Categorical missing values imputed.
Wrote data/outputs/imputed_categorical_features.csv


Unnamed: 0,predictions
0,423636.11
1,391698.07
2,399827.09
3,330156.03
4,281872.00
...,...
20635,79162.00
20636,80454.00
20637,87975.00
20638,83307.99


Alternatively, a shorter pipeline with less steps could have included...

- stage_1. CollectDataStage

- stage_2. PartitionDataStage

- stage_3. ProcessNumericalFeaturesStage (all numerical features related preprocessing steps)

- stage_4. ProcessCategoricalFeaturesStage (all numerical features related preprocessing steps)

- stage_5. TrainModelStage

- stage_6. PredictStage

- stage_7. EvaluateStage