In [2]:
import json
import logging
import sys

import click
import pandas as pd

# Notebook-wide logger that writes INFO-and-above records to stdout.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# logging.getLogger returns the same logger object on every call, so adding a
# fresh handler each time this cell is re-run would print every message once
# per run. Reuse the stream handler attached by a previous run instead.
handler = next(
    (h for h in logger.handlers if type(h) is logging.StreamHandler), None
)
if handler is None:
    handler = logging.StreamHandler(sys.stdout)
    logger.addHandler(handler)

#### 1. Load pipeline parameters from the config file

In [3]:
from typing import List, Optional
from dataclasses import dataclass, field
from marshmallow_dataclass import class_schema
import yaml

# Shared default values for the parameter dataclasses below.
RANDOM_STATE: int = 42    # default `random_state` of SplittingParams and ModelType
TEST_SIZE: float = 0.3    # default `test_size` of SplittingParams
N_ESTIMATORS: int = 100   # default `n_estimators` of ModelType


@dataclass
class DownloadingParams:
    """Parameters describing where the dataset archive comes from and where it is stored."""
    # Link to the remote file; the notebook passes it to the download step as the URL.
    file_link: str
    # Folder part of the local archive path; it is concatenated directly with
    # `name`, so it must end with a path separator.
    output_folder: str = "data/raw/"
    # File name of the downloaded archive inside `output_folder`.
    name: str = "data.zip"
        
@dataclass()
class SplittingParams:
    """Parameters for splitting the data."""
    # Defaults to the module-level TEST_SIZE constant.
    # presumably the share of rows held out for testing — confirm against the split step
    test_size: float = TEST_SIZE
    # Defaults to the module-level RANDOM_STATE constant.
    random_state: int = RANDOM_STATE
        

@dataclass
class FeatureParams:
    """Names of the categorical, numerical and target columns of the dataset.

    `target` is optional: it now defaults to ``None`` so that the structure can
    be built directly (not only through the schema) for data without a target
    column. Passing it explicitly, positionally or by keyword, works as before.
    """
    # Names of the categorical feature columns.
    cat_features: List[str]
    # Names of the numerical feature columns.
    num_features: List[str]
    # Name of the target column, or None when there is no target.
    target: Optional[str] = None

@dataclass()
class OutliersNulls:
    """Column names used while preparing (cleaning) the data."""
    # Column inspected for outliers — presumably by the data-preparation step.
    outliers: str = 'RestingBP'
    # Column inspected for missing values — presumably by the data-preparation step.
    nulls: str = 'Cholesterol'
    # Name of the target column.
    target: str = 'HeartDisease'
        
# Default `min_samples_leaf` of ModelType.
# NOTE(review): the name probably meant "MIN_SAMPLES_LEAF" — confirm before renaming.
MEAN_SAMPLES_LEAF: int = 5
# Default `criterion` of ModelType.
CRITERION: str = "gini"

@dataclass
class ModelType:
    """Parameters of the model to train."""
    # Model identifier — presumably the estimator class name; confirm against the training step.
    model_type: str = 'RandomForestClassifier'
    # The remaining defaults come from the module-level constants.
    n_estimators: int = N_ESTIMATORS
    min_samples_leaf: int = MEAN_SAMPLES_LEAF
    criterion: str = CRITERION
    random_state: int = RANDOM_STATE


@dataclass
class CustomTransformer:
    """Structure containing the on/off switch for the custom transformer."""
    # Required boolean flag (no default). How the flag is consumed is not
    # visible in this part of the notebook.
    use_custom_transformer: bool

In [5]:
@dataclass
class TrainingPipelineParams:
    """Structure for pipeline parameters.

    Top-level container for the training pipeline configuration. Every field
    is required; the nested sections are the parameter dataclasses defined
    earlier in this notebook.
    """
    # Path of the raw CSV; the notebook prefixes it with "../../" before reading.
    input_data_path: str
    # Output locations — presumably for metrics, the fitted model and the
    # fitted transformer; their use is not visible in this part of the notebook.
    metric_path: str
    save_model_path: str
    save_transformer_path: str
    # Source link and local destination of the dataset archive.
    downloading_params: DownloadingParams
    # Model parameters.
    model_type: ModelType
    # Column names for the data-preparation step (passed to prepare_data).
    clean_features: OutliersNulls
    # Switch for the custom transformer.
    custom_transformer: CustomTransformer
    # Categorical / numerical / target column names.
    feature_params: FeatureParams
    # Data-splitting settings.
    splitting_params: SplittingParams


# Schema generated from the dataclass; used to validate and deserialize the config.
TrainingPipelineParamsSchema = class_schema(TrainingPipelineParams)


def read_training_pipeline_params(path: str) -> TrainingPipelineParams:
    """Read a YAML config file and turn it into a TrainingPipelineParams object.

    Args:
        path: Path of the YAML configuration file.

    Returns:
        The parameters produced by loading the parsed YAML through
        TrainingPipelineParamsSchema.

    Raises:
        OSError: If the file cannot be opened.
        marshmallow.ValidationError: If the parsed content does not match the schema.
    """
    # Decode explicitly as UTF-8 instead of relying on the platform's default
    # encoding, which differs between machines.
    with open(path, "r", encoding="utf-8") as input_stream:
        raw_config = yaml.safe_load(input_stream)
    return TrainingPipelineParamsSchema().load(raw_config)

In [6]:
# The config path is relative, so it resolves against the current working directory.
config_path = "../../configs/train_config_random_forest.yaml"
# Parse the YAML config into a TrainingPipelineParams instance.
training_pipeline_params: TrainingPipelineParams = read_training_pipeline_params(config_path)

# Bare expression: shown as the cell output.
training_pipeline_params

ValidationError: {'clean_features': {'nulls': ['Not a valid list.'], 'outliers': ['Not a valid list.']}}

In [None]:
# Inspect the downloading section of the loaded config.
training_pipeline_params.downloading_params

In [None]:
import gdown

In [None]:
def download_data(url: str, output_file_path: str) -> Optional[str]:
    """Download the dataset archive with gdown and report whether it succeeded.

    Args:
        url: Link to the remote file (gdown is called with ``fuzzy=True``).
        output_file_path: Local path the file is written to.

    Returns:
        The value returned by ``gdown.download`` on success, or ``None`` when
        the download failed (connection error or no file produced).
    """
    logger.info("Loading dataset... ")
    try:
        downloaded_path = gdown.download(url=url, output=output_file_path, fuzzy=True, quiet=True)
    except ConnectionError:
        # Best effort, as before: the failure is reported and not re-raised.
        # NOTE(review): the HTTP library used by gdown may raise its own
        # connection error type that this clause does not catch — confirm.
        logger.error("ConnectionError: you should have link to the internet!")
        return None
    if downloaded_path is None:
        # gdown can give up without raising; do not claim success in that case.
        logger.error("Dataset was not downloaded from %s", url)
        return None
    logger.info("Dataset was downloaded")
    return downloaded_path

# Source link and local archive path (project-relative folder + file name) from the config.
url = training_pipeline_params.downloading_params.file_link
output = (
    f"../../{training_pipeline_params.downloading_params.output_folder}"
    f"{training_pipeline_params.downloading_params.name}"
)
download_data(url, output)

In [None]:
# Inspect the configured output folder for the downloaded archive.
training_pipeline_params.downloading_params.output_folder

In [None]:
import zipfile
def unzip_downloaded_data(path_to_zip_file: str, output_folder: Optional[str] = None) -> None:
    """Extract every member of a zip archive into a folder.

    Args:
        path_to_zip_file: Path of the zip archive to extract.
        output_folder: Folder the members are extracted into. When omitted,
            the folder that contains the archive is used.

    Raises:
        FileNotFoundError: If the archive does not exist.
        zipfile.BadZipFile: If the file is not a valid zip archive.
    """
    import os

    if output_folder is None:
        # The archive path itself is a file and cannot serve as the
        # extraction folder; fall back to the folder it lives in.
        output_folder = os.path.dirname(path_to_zip_file) or "."
    with zipfile.ZipFile(path_to_zip_file, 'r') as zip_ref:
        zip_ref.extractall(output_folder)

In [None]:
# NOTE(review): this call passes two arguments (archive path, destination folder);
# confirm that unzip_downloaded_data accepts a destination argument.
unzip_downloaded_data(output, "../../" + training_pipeline_params.downloading_params.output_folder)

In [None]:
def read_raw_data(path: str) -> pd.DataFrame:
    """Load a CSV file into a DataFrame, logging progress and the resulting shape.

    Args:
        path: Location of the CSV file; passed unchanged to ``pd.read_csv``.

    Returns:
        The parsed DataFrame.
    """
    logger.info('Loading dataset from %s...', path)
    raw_frame = pd.read_csv(path)
    logger.info('Loading from %s finished', path)
    logger.info('Data shape %s', raw_frame.shape)
    return raw_frame
# Read the raw CSV named in the config; the configured path is prefixed with "../../".
df: pd.DataFrame = read_raw_data(f"../../{training_pipeline_params.input_data_path}")

In [None]:
# Inspect the clean_features section of the loaded config.
training_pipeline_params.clean_features

In [None]:
# Inspect the clean_features section (a dangling trailing dot here was a SyntaxError).
training_pipeline_params.clean_features

In [None]:
from typing import Tuple, Optional
def _clean_param(params, *names: str) -> str:
    """Return the first attribute of ``params`` that exists among ``names``."""
    for name in names:
        value = getattr(params, name, None)
        if value is not None:
            return value
    raise AttributeError(
        f"{type(params).__name__} defines none of the attributes: {', '.join(names)}"
    )


def prepare_data(df: pd.DataFrame, params) -> Tuple[pd.DataFrame, Optional[pd.Series]]:
    """Clean the dataset and split off the target column.

    Steps:
      1. Drop every row whose value in the outliers column equals 0.
      2. In the nulls column, replace every 0 with the median of that column
         (computed after step 1).
      3. If the target column is present, remove it from the frame and return
         it separately.

    Args:
        df: Input data. It is not modified; a cleaned copy is returned.
        params: Object naming the columns to use. The attributes ``outliers``,
            ``nulls`` and ``target`` (as defined by OutliersNulls) are read;
            the older names ``outliers_field``, ``nulls_field`` and
            ``target_field`` are accepted as a fallback.

    Returns:
        ``(features, target)`` where ``target`` is ``None`` when the target
        column is not in ``df``.

    Raises:
        AttributeError: If ``params`` provides neither spelling of a column name.
        KeyError: If the outliers or nulls column is missing from ``df``.
    """
    logger.info('Preparing dataset...')
    logger.info('Outliers handling...')
    outliers_field = _clean_param(params, 'outliers', 'outliers_field')
    # Filter with a boolean mask so the result is correct for any index, not
    # only the default 0..n-1 one (index labels are not positions).
    is_outlier = df[outliers_field] == 0
    cleaned = df.loc[~is_outlier].copy()

    logger.info('Nulls handling...')
    nulls_field = _clean_param(params, 'nulls', 'nulls_field')
    # NOTE(review): the median is taken over the whole column, including the
    # zero placeholders that are about to be replaced — confirm this is intended.
    median_value = cleaned[nulls_field].median()
    # `mask` builds a new column and widens the dtype when the median is not
    # representable in the original one.
    cleaned[nulls_field] = cleaned[nulls_field].mask(cleaned[nulls_field] == 0, median_value)

    target_name = _clean_param(params, 'target', 'target_field')
    if target_name not in cleaned.columns:
        return cleaned, None

    logger.info('Extract and drop target feature...')
    target = cleaned[target_name]
    return cleaned.drop(columns=[target_name]), target

In [None]:
# Inspect the column names that are handed to prepare_data in the next cell.
training_pipeline_params.clean_features

In [None]:
# Run the preparation step on the raw frame with the clean_features section of the config.
prepare_data(df, training_pipeline_params.clean_features)