
Do we want to support iterators for data loading? #19

Open
HamiltonRepoMigrationBot opened this issue Feb 26, 2023 · 4 comments
Labels: enhancement, migrated-from-old-repo, product idea, question

Comments

@HamiltonRepoMigrationBot
Collaborator

Issue by skrawcz
Monday Feb 07, 2022 at 17:25 GMT
Originally opened as stitchfix/hamilton#68


What?

If we want to chunk over data, a natural way to do that is via an iterator.

Example: Enable "input" functions to be iterators

def my_loading_funct(...) -> Iterator[pd.Series]:
    ...
    yield some_chunk

This is fraught with edge cases, but it could be a more natural way to chunk over large data sets. It would probably require a new driver -- as we'd want some next()-style semantics on the output of execute()...

Originally posted by @skrawcz in stitchfix/hamilton#43 (comment)
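To make the next()-style semantics concrete, here is a purely hypothetical usage sketch -- execute_iter does not exist in Hamilton, and my_module/process/config are placeholders -- just to illustrate what a chunk-aware driver call might feel like:

from hamilton import driver

dr = driver.Driver(config, my_module)  # my_module would contain the iterator-style loader above
for chunk_outputs in dr.execute_iter(["some_output"]):  # hypothetical: one result per yielded chunk
    process(chunk_outputs)  # placeholder for whatever the caller does per chunk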

Things to think through to decide whether this is useful to have:

  1. Where would we allow this? Only on "input" nodes?
  2. How would we exercise them in a deterministic fashion? i.e. does execute() iterate over them until they're exhausted, or does it only do one iteration?
  3. How do we coordinate multiple inputs that are iterators? What if they're of different lengths?
  4. How would we ensure people don't create a mess that's hard to debug?
  5. Would this work for the distributed graph adapters?
@HamiltonRepoMigrationBot
Collaborator Author

Comment by elijahbenizzy
Saturday Oct 29, 2022 at 17:31 GMT


Yeah I'd need to see more of a workflow/motivating example for this TBH. E.G. a graphadapter that chunks up data and iterates in a streaming sense could be high-value as well...

@elijahbenizzy
Collaborator

OK, inspired by @jdonaldson's hamilton-like framework, I'm curious what could happen if we use chunking. Going to walk through a few different UIs I've been mulling over that would fit into the way we currently do things.

some use-cases:

  1. Run the whole DAG on a stream of data -- E.G. run it within a kafka stream
  2. Mini-batch training (everything streaming until the end, when we combine it all together to form a model)
  3. Training until resolution (E.G. train until a loss hits a certain value or a number of iterations has been hit)
  4. Running over multi-configurations (another way of doing subdags, actually, but this could be dynamic/runtime generated)
  5. Generating the same data from multiple files and joining them together

requirements/nice-to-haves:

  1. Superset of Hamilton/what can be run now
  2. Ability to run a node only once, and run a node every time a batch is executed (see node that is always re-run #90)
  3. Does not preclude intelligent caching -- E.G. if it gets halfway through and dies, we should be able to store the state
    • Hamilton should not store everything (as it currently does) -- ideally we'd just store the outputs we need in memory

Ideas

Chunking with type annotations

The idea here is that we use typing.Annotated to specify a function "shape":

def training_files() -> List[str]:
    return [file_1, file_2, file_3]

def training_batch(training_files: List[str]) -> Chunked[pd.DataFrame]:
    """Releases the dataframe in batches for training."""
    for mini_training_file in training_files:
        yield pd.read_csv(mini_training_file)

def bad_data_filtered(training_batch: pd.DataFrame) -> pd.DataFrame:
    """Map operation that takes in a dataframe and outputs a dataframe. This is run each time the training_batch function yields. We know that to be the case, as it is a pure map function."""
    return training_batch[_filter_fn(training_batch)]  # assuming _filter_fn returns a boolean mask

def X(training_batch: pd.DataFrame) -> pd.DataFrame:
    return training_batch[FEATURE_COLUMNS]

def Y(training_batch: pd.DataFrame) -> pd.Series:
    return training_batch[TARGET_COLUMNS]

def model(X: Chunked[pd.DataFrame], Y: Chunked[pd.Series]) -> Model:
    """Aggregate operation. This runs once, iterating over each upstream yield, and the final model is returned."""
    model = Model()
    for x, y in zip(X, Y):
        model.minibatch_train(x, y)
    return model

Say we have three batches -- what's the order of operations?

  1. training_files is executed once
  2. the subdag consisting of bad_data_filtered -> X,Y is executed three times (once per batch)
  3. the subdag consisting of model is executed once -- looping over the inputs
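To make that order concrete, here is a rough sketch of what the driver would effectively do for this example (not the proposed API, just the control flow; it buffers chunk outputs for simplicity, where a real implementation would stream and cache more intelligently):

def run_example():
    files = training_files()                       # 1. generator inputs run once
    x_chunks, y_chunks = [], []
    for batch in training_batch(files):            # 2. the map subdag runs once per yielded chunk
        filtered = bad_data_filtered(batch)
        x_chunks.append(X(filtered))               # following the prose above: X/Y consume the filtered batch
        y_chunks.append(Y(filtered))
    return model(iter(x_chunks), iter(y_chunks))   # 3. the aggregator runs once over all chunks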

So, the rules are:

  1. Generator fns: anything with Chunked[...] as the return type is treated as a generator
  2. Map fns: anything without Chunked as the return type or the input type is...
    a. Called once per yield if a Chunked is upstream
    b. Called once if no Chunked is upstream
  3. Aggregator fns: anything with Chunked as the input type but not as the output type is run once, consuming the upstream generator
  4. Custom maps: anything with Chunked as both input and output type is a little weird -- it's equivalent to (2.a), but has the potential to compress/extend the iteration. TBD if this is supported... E.G. batching over batches.
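A minimal sketch of how these rules could be applied mechanically -- Chunked here is a stand-in marker type (it doesn't exist in Hamilton today), and the classifier just inspects type annotations:

import typing
from typing import Any, Callable, Generic, TypeVar

T = TypeVar("T")

class Chunked(Generic[T]):
    """Hypothetical marker type: a thin wrapper over a generator of T."""

def _is_chunked(annotation: Any) -> bool:
    return typing.get_origin(annotation) is Chunked

def classify(fn: Callable) -> str:
    """Returns 'generator', 'map', 'aggregator', or 'custom_map' per the rules above."""
    hints = typing.get_type_hints(fn)
    chunked_out = _is_chunked(hints.pop("return", None))
    chunked_in = any(_is_chunked(h) for h in hints.values())
    if chunked_out and chunked_in:
        return "custom_map"   # rule 4
    if chunked_out:
        return "generator"    # rule 1
    if chunked_in:
        return "aggregator"   # rule 3
    return "map"              # rule 2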

There are some interesting implications/extensions.

Hydrating from cache

In this case, model isn't saved (or rather, it is, but it can't be used). You could imagine something like this:

def model(X: Chunked[pd.DataFrame], Y: Chunked[pd.Series], model: Model = None) -> Model:
    """Aggregate operation. This is run using the generators from before, but the final model is returned."""
    model = Model() if model is None else model
    for x, y in zip(X, Y):
        model.minibatch_train(x, y)
    return model

In this case, it accepts a previous model from the DAG run, or you can seed it from a stored result. Note, however, that this would have to come with a truncation of the upstream data. What this implies is that you could rerun from where you left off, so long as you add the ability to self-hydrate.
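A hedged sketch of how seeding could look with today's driver API, if the prior model were exposed as a plain input rather than a self-dependency -- the prior_model parameter name, saved_model value, and dr Driver object are all placeholders, not a committed design:

def model(X: Chunked[pd.DataFrame], Y: Chunked[pd.Series], prior_model: Model = None) -> Model:
    """Same aggregator, but hydrated from an explicit input instead of its own previous value."""
    model = Model() if prior_model is None else prior_model
    for x, y in zip(X, Y):
        model.minibatch_train(x, y)
    return model

# resume training from a previous run's saved output:
results = dr.execute(["model"], inputs={"prior_model": saved_model})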

Parallelization

Any _map_ function could easily be parallelized, which could provide natural node fusion (fuse all the nodes in the map operation). This is just plain-old map-reduce.
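As a sketch of that map-reduce framing (not Hamilton's API -- run_fused_map and fused_map_fn are illustrative names), the fused map stage over a generator is just a pool map:

from concurrent.futures import ProcessPoolExecutor

def run_fused_map(generator, fused_map_fn, max_workers: int = 4):
    """Applies the fused map subdag to every yielded chunk in parallel, preserving chunk order."""
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        yield from pool.map(fused_map_fn, generator)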

Logging functions

We could potentially break the aggregator into two and have one function map over the chunks for both of them. There's likely a much cleaner way to write this, but...

def model_training(X: Chunked[pd.DataFrame], Y: Chunked[pd.Series]) -> Chunked[Tuple[Model, TrainingMetrics]]:
    """Aggregate operation broken up. Note that the model is just returned on the last yield."""
    model = Model()
    for x, y in zip(X, Y):
        metrics = model.minibatch_train(x, y)
        yield model, metrics

# Just returns the ID of where it was logged
# This is a side-effecty function and should likely be a materialized result
def logger(model_training: Chunked[Tuple[Model, TrainingMetrics]]) -> str:
    run_instance = generate_run_instance_from_logger()
    for _, metrics in model_training:
        run_instance.log(metrics)
    return run_instance.id

def model(model_training: Chunked[Tuple[Model, TrainingMetrics]]) -> Model:
    *_, (final_model, __) = model_training
    return final_model

Note that this implementation likely involves intelligent caching, as two outputs depend on it.

Nested Chunking

Yeah, this is too messy to allow for now, so I'm going to go ahead and say it's banned. Could be implemented if we allow the Chunked -> Chunked shape of a function.

@elijahbenizzy
Collaborator

OK, latest API proposal -- feel free to react/comment. Planning to scope/implement shortly. Note these are just adaptations of the above.

Two primary use-cases we want to handle:

  1. Map over an arbitrary number of files, process, then join (either in parallel or in series)
  2. Minibatch training -- loading a bunch of data, doing feature engineering, updating the model's state (likely in series, possibly in parallel)

API

We are using type-hints to handle this -- they will be loose wrappers over generators. Three generic types, inspired (in part) by classic map-reduce:

Sequential, E.G.

def files_to_process(dir: str) -> Sequential[str]:
    """Anything downstream of this that depends on `files_to_process` as a `str`
    will run in sequence, until a Collect function."""
    for f in list_dir(dir):
        if f.endswith('.csv'):
            yield f

Parallel (likely not in v0), E.G.

def files_to_process(dir: str) -> Parallel[str]:
    """Anything downstream of this that depends on `files_to_process` as a `str`
    can run in parallel, until a Collect function. Note that this means that this entire generator might be run greedily, and the
    results are dished out."""
    for f in list_dir(dir):
        if f.endswith('.csv'):
            yield f

Collect, E.G.

# setting up a basic map-reduce example
def file_loaded(files_to_process: str) -> str:
    with open(files_to_process, 'r') as f:
        return f.read()

def file_counted(file_loaded: str) -> Counter:
    return Counter(tokenize(file_loaded))

def word_counts(file_counted: Collect[Counter]) -> dict:
    """Joins all the word counts"""
    full_counter = Counter()
    for counter in file_counted:
        for word, count in counter.items():
            full_counter[word] += count
    return full_counter

The rules are as follows:

  1. These are all valid edges:
    • Sequential[T_1] -> T_1
    • Parallel[T_1] -> T_1
    • Parallel[T_1] -> Collect[T_1] (degenerate case)
    • Sequential[T_1] -> Collect[T_1] (degenerate case)
    • Sequential[T_1] -> [T_1] -> ... [T_n] -> Collect[T_n] (rerun a whole group of nodes with different inputs)
    • Parallel[T_1] -> [T_1] -> ... [T_n] -> Collect[T_n] (rerun a whole group of nodes with different inputs)
  2. Nothing "mapped" by Parallel or Sequential can be left dangling (it must be "reduced" with Collect)
  3. Multiple nodes can subscribe to a generator (E.G. two functions can depend on the same yield) -- they will get the same result, and it will be treated like a spliterator (see the sketch after this list).
  4. Diamond patterns will follow from (3) -- effectively a join within a subdag
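A minimal sketch of the spliterator behavior in rule 3, using itertools.tee -- fan_out and the directory path are illustrative, not a proposed API -- where each subscriber gets its own independent iterator over the same yielded values:

import itertools

def fan_out(generator, n_subscribers: int):
    """Gives each downstream node its own iterator over the same chunks."""
    return itertools.tee(generator, n_subscribers)

# e.g. two downstream nodes both consuming files_to_process:
files_for_loader, files_for_counter = fan_out(files_to_process("/some/dir"), 2)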

Edge cases

  1. See above for multiple listeners/diamond patterns
  2. No commitments on nested generators yet -- TBD on complexity
  3. Joining two generators (multiple Collect types on different ones) should probably not be allowed for simplicity's sake. Rather, the operation over the input types should probably be pushed upstream to the generator (see the sketch after this list).
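A sketch of what pushing the join upstream could look like -- rather than two Collects over two different generators, a single generator zips the paired sources itself (x_files/y_files, FEATURE_COLUMNS, and TARGET_COLUMNS are illustrative names, and Sequential is the proposed type from above):

from typing import List, Tuple
import pandas as pd

def paired_batches(x_files: List[str], y_files: List[str]) -> Sequential[Tuple[pd.DataFrame, pd.DataFrame]]:
    """One generator yields already-joined (X, Y) pairs, so downstream only needs one Collect."""
    for x_file, y_file in zip(x_files, y_files):
        x = pd.read_csv(x_file)[FEATURE_COLUMNS]
        y = pd.read_csv(y_file)[TARGET_COLUMNS]
        yield x, y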

Implementation

TBD for now -- will scope out and report. First implementation will likely break some of the edge cases above. Some requirements though:

  1. We should have a path for caching mid-sequential (E.G. idempotent DAGs)
  2. We should have a path for debugging "subdags"
  3. We should reduce the memory used so we don't store anything we don't need -- otherwise it'll be messy (this requires changing core Hamilton)
  4. We should know/track which nodes belong to which subdag -- both in the viz (dotted lines/groups), as well as in some metadata that's reported afterwards.

@elijahbenizzy
Collaborator

More edge cases:

  1. Something outside a Sequential/Collect group depends on something inside it (this is actually not an edge case, this is a dangling group)
  2. Two collects on the same Sequential -- this should probably work, but we can ban it for now
