Skip to content

Commit

Permalink
fix: use dask for concatenation, increases perf
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinBernstorff committed Oct 3, 2022
1 parent 8e4511e commit 4235f5c
Showing 1 changed file with 17 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Takes a time-series and flattens it into a set of prediction times with
describing values."""

import datetime as dt
import os
from collections.abc import Callable
Expand All @@ -9,10 +8,13 @@
from pathlib import Path
from typing import Any, Optional, Union

import dask.dataframe as dd
import numpy as np
import pandas as pd
from catalogue import Registry # noqa # pylint: disable=unused-import
from dask.diagnostics import ProgressBar
from pandas import DataFrame
from tqdm.dask import TqdmCallback
from wasabi import Printer, msg

from psycop_feature_generation.timeseriesflattener.resolve_multiple_functions import (
Expand All @@ -24,6 +26,8 @@
generate_feature_colname,
)

ProgressBar().register()


def select_and_assert_keys(dictionary: dict, key_list: list[str]) -> dict:
"""Keep only the keys in the dictionary that are in key_order, and orders
Expand Down Expand Up @@ -604,10 +608,18 @@ def add_temporal_predictors_from_list_of_argument_dictionaries( # pylint: disab
]

msg.info("Feature generation complete, concatenating")
concatenated_dfs = pd.concat(
flattened_predictor_dfs,
axis=1,
).reset_index()

flattened_predictor_dds = [
dd.from_pandas(df, npartitions=6) for df in flattened_predictor_dfs
]

# Concatenate with dask, and show progress bar
with TqdmCallback(desc="compute"):
concatenated_dfs = (
dd.concat(flattened_predictor_dds, axis=1, interleave_partitions=True)
.compute() # Converts to pandas dataframe
.reset_index()
)

self.df = pd.merge(
self.df,
Expand Down

0 comments on commit 4235f5c

Please sign in to comment.