Waffle ml examples #117

Open · wants to merge 1 commit into main
@@ -0,0 +1,120 @@
"""This file shows a basic approach to hyperparameter tuning with the "run n iterations,
select the best" strategy. This does the following:
1. Sets up an initial iteration
2. Executes n_iterations (passed in via config) iterations of the optimization routine
3. Selects the best iteration

This makes use of the @parameterized_subdag/resolve pattern to run one subdag for each iteration,
consisting of the optimization routine.

Note that you have a lot of options as to how to set up the optimization, and this presumes a
stateless one. See the instructions in the iteration_0/iteration_n subdags for more details,
and the sketch at the bottom of this file for one way to drive the DAG.
"""

from typing import List, Tuple

import optimizer_iteration
import pandas as pd

from hamilton.function_modifiers import (
ResolveAt,
inject,
parameterized_subdag,
resolve,
source,
subdag,
value,
)
from hamilton.function_modifiers.dependencies import group


def training_dataset() -> pd.DataFrame:
"""Mock function to return training data.

:return: Whatever you want.
"""
pass


def evaluation_dataset() -> pd.DataFrame:
"""Mock function to return evaluation data.

:return: Whatever you want.
"""
pass


Iteration = Tuple[optimizer_iteration.Model, optimizer_iteration.EvaluationMetrics]


@subdag(
optimizer_iteration,
inputs={
"prior_hyperparameters": source("initial_hyperparameters"),
"prior_evaluation_metrics": value(None),
},
)
def iteration_0(
evaluation_metrics: optimizer_iteration.EvaluationMetrics,
trained_model: optimizer_iteration.Model,
) -> Iteration:
"""First iteration of the hyperparameter routine. This could easily be combined with the
subdag below, but it is separated to avoid ugly parameterized if statements (E.G. the source
of `hyperparamters` versus `prior_hyperparameters`). Another option would be to have
`iteration_0_hyperparameters` and `iteration_0_evaluation_metrics` as inputs to the overall
DAG -- up to you!

:param evaluation_metrics: Metrics to use to evaluate the model
:param trained_model: Model that was trained
:return: Tuple of the hyperparameters and evaluation metrics.
This is purely for easy access later, so that if one queries for `iteration_i`
they get both hyperparameters and evaluation metrics.
"""
return (trained_model, evaluation_metrics)


@resolve(
when=ResolveAt.CONFIG_AVAILABLE,
decorate_with=lambda n_iterations: parameterized_subdag(
optimizer_iteration,
**{
f"iteration_{i}": {
"inputs": {
"prior_hyperparameters": source(f"iteration_{i - 1}.hyperparameters"),
"prior_evaluation_metrics": source(f"iteration_{i - 1}.evaluation_metrics"),
}
}
for i in range(1, n_iterations)
},
),
)
def iteration_n(
evaluation_metrics: optimizer_iteration.EvaluationMetrics,
trained_model: optimizer_iteration.Model,
) -> Iteration:
"""Parameterized subdag to run the hyperparameter optimization routine.
See description above for more clarity.

:param evaluation_metrics: Metrics to evaluate the
:param trained_model: Model that was trained.
:return: Tuple of the hyperparameters and evaluation metrics.
"""
return (trained_model, evaluation_metrics)


@resolve(
when=ResolveAt.CONFIG_AVAILABLE,
decorate_with=lambda n_iterations: inject(
iterations=group(*[source(f"iteration_{i}") for i in range(n_iterations)])
),
)
def best_iteration(iterations: List[Iteration]) -> Iteration:
"""Returns the best iteration of all of them.

TODO -- implement me!

:param iterations:
:return:
"""
pass
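

# One possible (hedged) implementation of `best_iteration` above, assuming "best" simply means
# the highest `EvaluationMetrics.score`:
#
#     return max(iterations, key=lambda iteration: iteration[1].score)

# The block below is a minimal, hedged sketch of how one might drive this DAG end to end.
# It assumes Hamilton's power-user-mode flag (required for @resolve) and an arbitrary
# `initial_hyperparameters` starting point -- adapt the names and values to your setup.
if __name__ == "__main__":
    from hamilton import driver, settings

    import hyperparameter_tuning  # this module

    config = {
        "n_iterations": 10,  # consumed by the @resolve decorators above
        settings.ENABLE_POWER_USER_MODE: True,  # required to use @resolve
    }
    dr = driver.Driver(config, hyperparameter_tuning)
    result = dr.execute(
        ["best_iteration"],
        inputs={
            "initial_hyperparameters": optimizer_iteration.Hyperparameters(
                some_value=0.1, some_other_value=0.2  # arbitrary starting values
            )
        },
    )
    print(result["best_iteration"])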
@@ -0,0 +1,82 @@
import hyperparameter_tuning
import optimizer_iteration

from hamilton.function_modifiers import source, subdag

"""This file contains a pipeline of pipelines.
What this does is:
1. Call out to a subdag for each of the model optimization routines
2. Pass specific parameters for them
3. Combines them to form a "comparator" to do something (vote/select, or combine) the models.

Note that this is slightly opinionated about the workflow. Specifically,
This approach requires that you hardcode the types of models in code.
This makes sense if they're each radically different (E.G. decision trees versus
logic regression versus calling out to GPT-4 and seeing what happens),
but if they're variants on the same (effectively different groups of hyperparameters
that a gaussian process might not capture well), then you might want to make this more configuration
driven. To do this, you can use @parameterize_subdag/resolve.
"""


@subdag(
hyperparameter_tuning,
inputs={"initial_hyperparameters": source("initial_hyperparameters_model_a")},
config={"n_iterations": 25, "model_type": "model_a"},
)
def model_a(best_iteration: hyperparameter_tuning.Iteration) -> hyperparameter_tuning.Iteration:
"""Training routine (subdag) for model A.
This is just a pass-through, although it could do whatever you want.

    :param best_iteration: Best iteration produced by the hyperparameter tuning subdag.
    :return: The best iteration, passed through.
"""
return best_iteration


@subdag(
hyperparameter_tuning,
inputs={"initial_hyperparameters": source("initial_hyperparameters_model_b")},
config={"n_iterations": 14, "model_type": "model_b"},
)
def model_b(best_iteration: hyperparameter_tuning.Iteration) -> hyperparameter_tuning.Iteration:
"""Training routine (subdag) for model B.
This is just a pass-through, although it could do whatever you want.

    :param best_iteration: Best iteration produced by the hyperparameter tuning subdag.
    :return: The best iteration, passed through.
"""
return best_iteration


@subdag(
hyperparameter_tuning,
inputs={"initial_hyperparameters": source("initial_hyperparameters_model_b")},
config={"n_iterations": 19, "model_type": "model_c"},
)
def model_c(best_iteration: hyperparameter_tuning.Iteration) -> hyperparameter_tuning.Iteration:
"""Training routine (subdag) for model B.
This is just a pass-through, although it could do whatever you want.

:param best_iteration:
:return:
"""
return best_iteration


def best_model(
model_a: hyperparameter_tuning.Iteration,
model_b: hyperparameter_tuning.Iteration,
model_c: hyperparameter_tuning.Iteration,
) -> optimizer_iteration.Model:
"""Some comparator/selector to chose which model. Note this could do anything you want
(even combine a model), but for now it'll do nothing (you can fill this in!)

Should you want to retrain after this, you could pull in the training/holdout
set and execute it in another function.

:param model_a: Model A trained + evaluation metrics
:param model_b: Model B trained + evaluation metrics
:param model_c: Model C trained + evaluation metrics
:return: The model we want to use.
"""
@@ -0,0 +1,122 @@
import dataclasses
from random import random
from typing import Any

import pandas as pd

from hamilton.function_modifiers import config, does


def _sleep_random(**kwargs: Any):
import time

time.sleep(random() * 0.2)


class Model:
"""Sample class to represent a model."""

pass


@dataclasses.dataclass
class EvaluationMetrics:
"""Sample dataclass to represent a set of evaluation metrics."""

score: float


@dataclasses.dataclass
class Hyperparameters:
"""Sample dataclass to represent a set of hyperparameters."""

some_value: float
some_other_value: float


@does(_sleep_random)
def hyperparameters(
prior_hyperparameters: Hyperparameters, prior_evaluation_metrics: EvaluationMetrics = None
) -> Hyperparameters:
"""Yields hyperparameters given a prior set of
hyperparameters and an evaluation metrics. Note that this assumeds a *stateless* optimizer,
but this is not the case (unless you're using `random`). Choices are:
1. Have this take in *all* prior hyperparameters and evaluation metrics, and then
execute an optimization routine from scratch every time. That's potentially inefficient,
but it works with a purely functional perspective. You'd basically have to call the optimizer
in a for-loop.
2. Have this pass its state through to the next iterations (E.G. return a tuple of hyperparameters,
state, and have the subdag extract the state and pass it along). Initial state is None. This is
another clean, functional way to do it, but might require changing the way the upstream optimizer
works/adding more code around it.
3. Have an upstream function that returns a stateful object that hamilton does *not* know about.
This is the easiest to implement, but potentially hacky. Specifically, it could break if you need parallel
execution and aren't careful -- the stateful object would effectively have to call out to a service.

Honestly, I recommend (3) for this to test with -- its easy to get started locally, then if you
truly need a distributed optimization system like sigopt, you could just have it be a resource
that calls out to external services. This could also help: https://github.com/DAGWorks-Inc/hamilton/issues/90.

Curently this just sleeps for a small random period of time to make the
APM charts look pretty :)

:param prior_hyperparameters: Hyperparameters from the previous step.
:param prior_evaluation_metrics: Evaluation metrics from the previous step.
:return: New hyperparameters to try.
"""
pass
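
# A hedged sketch of option (3) above: expose a stateful optimizer handle as an upstream node
# that Hamilton treats as opaque. `SomeOptimizerClient`, `optimizer_config`, and `.suggest(...)`
# are hypothetical names, not part of this example:
#
#     def optimizer_client(optimizer_config: dict) -> SomeOptimizerClient:
#         """Handle to a stateful optimizer (e.g. a remote tuning service)."""
#         return SomeOptimizerClient(**optimizer_config)
#
#     def hyperparameters(
#         optimizer_client: SomeOptimizerClient,
#         prior_evaluation_metrics: EvaluationMetrics = None,
#     ) -> Hyperparameters:
#         return optimizer_client.suggest(prior_evaluation_metrics)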


@config.when(model_type=None)
@does(_sleep_random)
def trained_model__default(
hyperparameters: Hyperparameters, training_dataset: pd.DataFrame
) -> Model:
"""Trains the model, given the set of hyperparameters and the training data.
This is the default implementation, when no model type is passed in.

Curently this just sleeps for a small random period of time to make the
APM charts look pretty :)

:param hyperparameters: Hyperparameters to train.
:return: The trained model object.
"""
pass


@config.when(model_type="model_a")
@does(_sleep_random)
def trained_model__model_a(
hyperparameters: Hyperparameters, training_dataset: pd.DataFrame
) -> Model:
    """Model training for model A"""
    pass


@config.when(model_type="model_b")
@does(_sleep_random)
def trained_model__model_b(
hyperparameters: Hyperparameters, training_dataset: pd.DataFrame
) -> Model:
"""Model training for model b"""
pass


@config.when(model_type="model_c")
@does(_sleep_random)
def trained_model__model_c(
hyperparameters: Hyperparameters, training_dataset: pd.DataFrame
) -> Model:
"""Model training for model C"""
pass


@does(_sleep_random)
def evaluation_metrics(trained_model: Model, evaluation_dataset: pd.DataFrame) -> EvaluationMetrics:
"""Evaluates the model, given the trained model and the evaluation dataset.

    :param trained_model: Model to evaluate.
:param evaluation_dataset: Dataset to evaluate on.
:return: Evaluation metrics.
"""
pass
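
# A hedged sketch of a non-mock evaluation. It assumes `trained_model` exposes a `predict`
# method and that `evaluation_dataset` carries a "label" column -- both are assumptions,
# not part of this example:
#
#     features = evaluation_dataset.drop(columns=["label"])
#     predictions = trained_model.predict(features)
#     accuracy = float((predictions == evaluation_dataset["label"]).mean())
#     return EvaluationMetrics(score=accuracy)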