In [None]:
import itertools
import logging

import numpy as np

from pandas import DataFrame
from sklearn.linear_model import ElasticNet
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
# Module-level logger shared by the task and band definitions in this notebook.
logger = logging.getLogger(__name__)


In [None]:
from dbnd import band, task, output, log_metric, project_path
from dbnd_examples.data import data_repo
from targets import DataTarget


In [None]:
def calculate_metrics(actual, pred):
    """Score predictions against ground-truth values.

    Args:
        actual: Ground-truth target values.
        pred: Predicted values, aligned with ``actual``.

    Returns:
        A ``(rmse, mae, r2)`` tuple: root of the mean squared error,
        mean absolute error, and R^2 score.
    """
    return (
        np.sqrt(mean_squared_error(actual, pred)),
        mean_absolute_error(actual, pred),
        r2_score(actual, pred),
    )


Tasks are where computation is done. Tasks produce targets as outputs and consume targets as inputs. Targets can be S3 paths, local files or database records.

If we express PredictWineQuality as tasks and targets, it looks like this:
![alt text](wine_quality.png)

In [None]:

@task(result=output.csv)
def validate_model(model: ElasticNet, validation_dataset: DataFrame) -> str:
    """Evaluate a fitted model on the validation set and log its metrics.

    Args:
        model: Fitted estimator; only its ``predict`` method is used.
        validation_dataset: Frame holding the feature columns plus a
            ``quality`` target column.

    Returns:
        A one-element list containing ``"rmse,mae,r2"`` as a single
        comma-separated line.
        NOTE(review): the annotation says ``str`` but a list is returned;
        left as-is because the CSV output handling may depend on it - confirm.
    """
    logger.info("Running validate model demo: %s", validation_dataset)
    # support for py3 parquet: make sure every column label is a str
    validation_dataset = validation_dataset.rename(str, axis="columns")
    # Use the `columns` keyword: passing axis positionally to drop() was
    # removed in pandas 2.0.
    validation_x = validation_dataset.drop(columns=["quality"])
    validation_y = validation_dataset[["quality"]]

    prediction = model.predict(validation_x)
    (rmse, mae, r2) = calculate_metrics(validation_y, prediction)

    log_metric("rmse", rmse)
    # Previously logged `rmse` under the "mae" key.
    log_metric("mae", mae)
    log_metric("r2", r2)

    return ["%s,%s,%s" % (rmse, mae, r2)]

Every task can be made configurable by using parameters. For example, we can add `alpha` and `l1_ratio` parameters to `train_model`. It might look like this:

In [None]:

@task
def train_model(
    test_set: DataFrame,
    training_set: DataFrame,
    alpha: float = 0.5,
    l1_ratio: float = 0.5,
) -> ElasticNet:
    """Fit an ElasticNet on the training set and score it on the test set.

    Args:
        test_set: Frame with feature columns and a ``quality`` target,
            used only for evaluation.
        training_set: Frame with the same layout, used for fitting.
        alpha: Passed to ``ElasticNet(alpha=...)``.
        l1_ratio: Passed to ``ElasticNet(l1_ratio=...)``.

    Returns:
        The fitted ``ElasticNet`` instance.
    """
    lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio)
    # Use the `columns` keyword: passing axis positionally to drop() was
    # removed in pandas 2.0.
    lr.fit(training_set.drop(columns=["quality"]), training_set[["quality"]])
    prediction = lr.predict(test_set.drop(columns=["quality"]))

    (rmse, mae, r2) = calculate_metrics(test_set[["quality"]], prediction)

    log_metric("alpha", alpha)
    log_metric("rmse", rmse)
    # Previously logged `rmse` under the "mae" key.
    log_metric("mae", mae)
    log_metric("r2", r2)

    # Log through the module-level logger, consistent with the other tasks,
    # rather than the root `logging` module.
    logger.info(
        "Elasticnet model (alpha=%f, l1_ratio=%f): rmse = %f, mae = %f, r2 = %f",
        alpha,
        l1_ratio,
        rmse,
        mae,
        r2,
    )
    return lr


The first thing we'll do is to create a task that splits the data into separate training, test and validation sets.

In [None]:
@task(result=("training_set", "test_set", "validation_set"))
def prepare_data(raw_data: DataFrame):
    """Split the raw data into training, test and validation frames.

    The data is first split with ``train_test_split`` defaults; the held-out
    part is then divided in half (``test_size=0.5``) into test and validation.

    Returns:
        ``(training_set, test_set, validation_set)``.
    """
    training_part, holdout_part = train_test_split(raw_data)
    test_part, validation_part = train_test_split(holdout_part, test_size=0.5)
    return training_part, test_part, validation_part

@task
def calculate_alpha(alpha: float = 0.5):
    """Return ``alpha`` increased by 0.1."""
    return alpha + 0.1




Now we can put all the tasks together. We need to define the outputs of the @band (model and validation) and return them from the function.
That's all :) 

In [None]:
@band(result=("model", "validation"))
def predict_wine_quality(
    data: DataTarget = data_repo.wines,
    alpha: float = 0.5,
    l1_ratio: float = 0.5,
    good_alpha: bool = False,
):
    """Wire the data-preparation, training and validation tasks together.

    Args:
        data: Raw input handed to ``prepare_data``.
        alpha: Forwarded to ``train_model``.
        l1_ratio: Forwarded to ``train_model``.
        good_alpha: When true, ``alpha`` is first passed through
            ``calculate_alpha`` and the result is used for training.

    Returns:
        ``(model, validation)``: the results of ``train_model`` and
        ``validate_model``.
    """
    train_part, test_part, validation_part = prepare_data(raw_data=data)

    effective_alpha = calculate_alpha(alpha) if good_alpha else alpha

    fitted_model = train_model(
        test_set=test_part,
        training_set=train_part,
        alpha=effective_alpha,
        l1_ratio=l1_ratio,
    )
    validation_report = validate_model(
        model=fitted_model, validation_dataset=validation_part
    )
    return fitted_model, validation_report


In [None]:
# Run the pipeline with alpha=0.4 on data_repo.wines.
wine = predict_wine_quality.t(alpha=0.4, data=data_repo.wines)
wine.dbnd_run()

In [None]:
# Run the pipeline again with a different alpha (0.3).
wine = predict_wine_quality.t(alpha=0.3, data=data_repo.wines)
wine.dbnd_run()



In [None]:
@band
def predict_wine_quality_parameter_search(
    data: DataTarget = data_repo.wines,
    alpha_step: float = 0.3,
    l1_ratio_step: float = 0.4,
):
    """Run ``predict_wine_quality`` once per (alpha, l1_ratio) grid point.

    The grid is the Cartesian product of ``np.arange(0, 1, alpha_step)`` and
    ``np.arange(0, 1, l1_ratio_step)``.

    Returns:
        A dict mapping each experiment name to its ``(model, validation)``
        pair.
    """
    alpha_grid = np.arange(0, 1, alpha_step)
    l1_ratio_grid = np.arange(0, 1, l1_ratio_step)
    variants = list(itertools.product(alpha_grid, l1_ratio_grid))
    logger.info("All Variants: %s", variants)

    experiments = {}
    for alpha_value, l1_ratio in variants:
        # The experiment name doubles as the task name and the result key.
        exp_name = "Predict_%f_l1_ratio_%f" % (alpha_value, l1_ratio)
        model, validation = predict_wine_quality(
            data=data,
            alpha=alpha_value,
            l1_ratio=l1_ratio,
            task_name=exp_name,
        )
        experiments[exp_name] = (model, validation)
    return experiments


In [None]:
# Run the parameter-search pipeline with its default alpha/l1_ratio step sizes.
wine_search = predict_wine_quality_parameter_search.t(data=data_repo.wines)
wine_search.dbnd_run()
