
Parallelizable cannot aggregate or return multiple Collects #742

Closed
JamesArruda opened this issue Mar 5, 2024 · 4 comments
Labels: documentation, parallelism

Comments

@JamesArruda

Current behavior

A DAG with one Parallelizable and two Collect statements cannot return results from both Collect nodes.

Stack Traces

KeyError: 'Key metric_2 not found in cache'

Steps to replicate behavior

import logging

from hamilton import driver
from hamilton.execution.executors import SynchronousLocalTaskExecutor
from hamilton.htypes import Collect, Parallelizable
import pandas as pd


ANALYSIS_OB = tuple[tuple[str, ...], pd.DataFrame]
ANALYSIS_RES = dict[str, str | float]


def split_by_cols(full_data: pd.DataFrame, columns: list[str]) -> Parallelizable[ANALYSIS_OB]:
    for idx, grp in full_data.groupby(columns):
        yield (idx, grp)


def sub_metric_1(split_by_cols: ANALYSIS_OB, number: float = 1.0) -> ANALYSIS_RES:
    idx, grp = split_by_cols
    return {"key": idx, "mean": grp["spend"].mean() + number}


def sub_metric_2(split_by_cols: ANALYSIS_OB) -> ANALYSIS_RES:
    idx, grp = split_by_cols
    return {"key": idx, "mean": grp["signups"].mean()}


def metric_1(sub_metric_1: Collect[ANALYSIS_RES], columns: list[str]) -> pd.DataFrame:
    data = [[k for k in d["key"]] + [d["mean"], "spend"] for d in sub_metric_1]
    cols = list(columns) + ["mean", "metric"]
    return pd.DataFrame(data, columns=cols)


def metric_2(sub_metric_2: Collect[ANALYSIS_RES], columns: list[str]) -> pd.DataFrame:
    data = [[k for k in d["key"]] + [d["mean"], "signups"] for d in sub_metric_2]
    cols = list(columns) + ["mean", "metric"]
    return pd.DataFrame(data, columns=cols)


def all_agg(metric_1: pd.DataFrame, metric_2: pd.DataFrame) -> pd.DataFrame:
    return pd.concat([metric_1, metric_2])


if __name__ == "__main__":
    import __main__

    from hamilton.log_setup import setup_logging
    setup_logging(log_level=logging.DEBUG)

    local_executor = SynchronousLocalTaskExecutor()

    dr = (
        driver.Builder()
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_modules(__main__)
        # .with_remote_executor(remote_executor)
        .with_local_executor(local_executor)
        .build()
    )
    df = pd.DataFrame(
        index=pd.date_range('20230101', '20230110'),
        data={  
            "signups": [1, 10, 50, 100, 200, 400, 700, 800, 1000, 1300],
            "spend": [10, 10, 20, 40, 40, 50, 100, 80, 90, 120],
            "region": ["A", "B", "C", "A", "B", "C", "A", "B", "C", "X"],
        }
    )
    ans = dr.execute(
        ["all_agg"],
        inputs={
            "full_data": df,
            "number": 3.1,
            "columns": ["region"],
        }
    )
    print(ans["all_agg"])

Library & System Information

Python 3.11.8 | packaged by conda-forge | (main, Feb 16 2024, 20:40:50) [MSC v.1937 64 bit (AMD64)] on win32

>>> hamilton.__version__
(1, 49, 2)

Expected behavior

I would expect to be able to retrieve the results of every Collect node in a single execute call. Requesting metric_1 or metric_2 one at a time succeeds.

Thank you in advance for your help!

JamesArruda added the triage label on Mar 5, 2024
@elijahbenizzy
Collaborator

Hey! Thanks, this is a known limitation (see point 5 here) -- #301. That said, there's an easy workaround -- you can group them before running Collect, e.g.:

def all_metrics(sub_metric_1: ANALYSIS_RES, sub_metric_2: ANALYSIS_RES) -> ANALYSIS_RES:
    return ... # join the two dicts in whatever way you want

def all_agg(all_metrics: Collect[ANALYSIS_RES]) -> pd.DataFrame:
    return ... # join them all into a dataframe

While it's not ideal, it should just be a matter of adding one extra function! Note that this works in the case above, where the two metrics operate over the same partitions; if they don't, you'll want two separate parallelizations.
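
For concreteness, here's a minimal sketch of what those two stubs could look like, assuming the node names and dict shape from the reproduction above (the merged-dict keys and the long-format rows are illustrative choices, not the only option):

def all_metrics(sub_metric_1: ANALYSIS_RES, sub_metric_2: ANALYSIS_RES) -> ANALYSIS_RES:
    # Runs once per partition; both inputs carry the same "key".
    return {
        "key": sub_metric_1["key"],
        "spend_mean": sub_metric_1["mean"],
        "signups_mean": sub_metric_2["mean"],
    }


def all_agg(all_metrics: Collect[ANALYSIS_RES], columns: list[str]) -> pd.DataFrame:
    # One Collect over the merged dicts: emit two long-format rows per partition.
    rows = []
    for d in all_metrics:
        rows.append(list(d["key"]) + [d["spend_mean"], "spend"])
        rows.append(list(d["key"]) + [d["signups_mean"], "signups"])
    return pd.DataFrame(rows, columns=list(columns) + ["mean", "metric"])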

Going to reference this issue from the other one -- this is a good one to keep around/I can spend some time scoping out a fix.

skrawcz added the documentation and parallelism labels and removed the triage label on Mar 5, 2024
@skrawcz
Collaborator

skrawcz commented Mar 5, 2024

(Quoting @elijahbenizzy's workaround above.)

And if you want the flexibility to compute only one of them, you can use @config.when (docs): define a few variants of the functions and pass the appropriate configuration at driver build time to shape the DAG accordingly.
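
For illustration, a sketch of what that could look like, assuming a hypothetical metric config key (the __suffix naming is how @config.when resolves several variants to a single node name):

from hamilton.function_modifiers import config


@config.when(metric="spend")
def final_metric__spend(metric_1: pd.DataFrame) -> pd.DataFrame:
    # Present in the DAG only when the driver is built with {"metric": "spend"}.
    return metric_1


@config.when(metric="signups")
def final_metric__signups(metric_2: pd.DataFrame) -> pd.DataFrame:
    # Present in the DAG only when the driver is built with {"metric": "signups"}.
    return metric_2

# At build time, driver.Builder().with_config({"metric": "spend"}) selects the variant.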

@JamesArruda
Author

@elijahbenizzy, @skrawcz Thank you both for the help!

This is where I ended up taking the workaround; in case it's useful to you or anyone else looking at this ticket, the following works:

from hamilton.function_modifiers import ResolveAt, group, inject, resolve, source


@resolve(
    when=ResolveAt.CONFIG_AVAILABLE,
    decorate_with=lambda metric_names: inject(sub_metrics=group(*[source(x) for x in metric_names])),
)
def all_metrics(sub_metrics: list[ANALYSIS_RES], columns: list[str]) -> pd.DataFrame:
    # _to_frame is a helper (not shown) that turns one ANALYSIS_RES into a DataFrame.
    frames = []
    for a in sub_metrics:
        frames.append(_to_frame(a, columns))
    return pd.concat(frames)

Don't forget:

from hamilton import settings
_config = {settings.ENABLE_POWER_USER_MODE: True}
_config["metric_names"] = ["sub_metric_1", "sub_metric_2"]

# Then in the driver building:
.with_config(_config)
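
For completeness, a sketch of how that config might slot into the driver build from the reproduction above (the builder chain simply mirrors the original script, with .with_config added):

dr = (
    driver.Builder()
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_modules(__main__)
    .with_config(_config)
    .with_local_executor(local_executor)
    .build()
)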

@skrawcz
Collaborator

skrawcz commented Jul 18, 2024

Closing this issue for now.

skrawcz closed this as completed on Jul 18, 2024