In [6]:
!uv pip install polars

[2K[2mResolved [1m1 package[0m in 374ms[0m                                                  [0m
[2K[2mDownloaded [1m1 package[0m in 4.08s[0m1                                      [0m
[2K[2mInstalled [1m1 package[0m in 28ms[0m.31                                      [0m
 [32m+[39m [1mpolars[0m[2m==0.20.31[0m


In [7]:
# data processing
import datetime
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Union

import polars as pl


@dataclass(kw_only=True)
class Data:
    df: pl.DataFrame
    columns: Optional[list[str]] = None
    row_index: Optional[list[int]] = None
    date_partition_column: Optional[Union[str, list[datetime.datetime]]] = None
    partition_column: Optional[str] = None
    date_column: Optional[str] = None
    date_format: str = "%Y-%m-%d"
    target_column: str

    @staticmethod
    def load(load_path: Path, target_column: str, **kwargs) -> "Data":
        df = pl.read_csv(
            load_path,
        )
        return Data(df=df, target_column=target_column, **kwargs)

    @property
    def rendered_df(self) -> pl.DataFrame:
        df = self.df
        if self.columns is not None and len(self.columns):
            df = df.select(self.columns)
        if self.row_index is not None and len(self.row_index):
            df = df[self.row_index]
        if self.date_column is not None and not df.dtypes[df.columns.index(self.date_column)] == pl.Datetime:
            df = df.with_columns(
                pl.col(self.date_column).str.to_datetime(self.date_format)
            )
        return df

    def get_date_partitions(self):
        df = self.rendered_df

        if self.date_column is None:
            raise ValueError("date column is not set")
        start_date = df[self.date_column].min()
        end_date = df[self.date_column].max()
        if self.date_partition_column is None:
            return {"__all__": df}

        elif isinstance(self.date_partition_column, str):
            return {
                group: group_df
                for group, group_df in df.groupby([self.date_partition_column])
            }

        elif isinstance(self.date_partition_column, list) and isinstance(
            self.date_partition_column[0], datetime.datetime
        ):
            partition_dates = self.date_partition_column
            return {
                start_date: df.filter(
                    df[self.date_column].is_between(start_date, end_date)
                )
                for start_date, end_date in zip(
                    [start_date] + partition_dates, partition_dates + [end_date]
                )
            }

    def get_partitions(self):
        df = self.get_dataframe()
        if self.partition_column is not None:
            return {
                group: df_group
                for group, df_group in df.groupby([self.partition_column])
            }
        else:
            return {"__all__": df}


@dataclass(kw_only=True)
class TrainingData(Data): ...


class ExternalHoldoutData(Data): ...


class FeatureSelectionMethod: ...


def load(path: str) -> tuple[TrainingData, ExternalHoldoutData]: ...


def variable_downsampling(data: Data) -> Data: ...


def target_engineering(data: Data) -> Data: ...


def feature_selection(data: Data, method: FeatureSelectionMethod) -> Data: ...

In [8]:
pwd

[32m'/Users/lukas.innig/code/recipe-xflow'[0m

In [9]:
training_data = TrainingData.load(
    load_path=Path("include/x_flow/raw_data/DR_Demo_Bond_trading_RFQ_train.csv"),
    date_column="date",
    date_partition_column=[datetime.datetime(2018, 6, 3), datetime.datetime(2018, 6, 24)],
    date_format="%d/%m/%Y",
    target_column="Mid",
)
test_data = ExternalHoldoutData.load(
    load_path=Path("include/x_flow/raw_data/DR_Demo_Bond_trading_RFQ_test.csv"),
    date_column="date",
    date_format="%d/%m/%Y",
    target_column="Mid",
)

In [94]:
import datarobotx

In [13]:
from abc import ABC, abstractmethod

from utils.operator import Operator


class DataPreprocessor(ABC):
    def fit(self, df: Data) -> "DataPreprocessor":
        return self._fit(df)

    def transform(self, df: Data) -> Data:
        return self._transform(df)

    def fit_transform(self, df: Data) -> Data:
        return self._fit(df)._transform(df)

    @abstractmethod
    def _fit(self, df: Data) -> "DataPreprocessor": ...
    @abstractmethod
    def _transform(self, df: Data) -> Data: ...


class BinarizeData(DataPreprocessor):
    def __init__(
        self,
        threshold: float,
        operator: str,
        binarize_drop_regression_target=True,
        binarize_new_target_name="target_cat",
    ):
        self._threshold = threshold
        self._operator = operator
        self._binarize_drop_regression_target = binarize_drop_regression_target
        self._binarize_new_target_name = binarize_new_target_name

    def _fit(self, df: Data):
        return self

    def _transform(self, df: Data) -> pl.DataFrame:
        """helper function: binarize a target variable for classification"""
        categorical_data = df.rendered_df
        target_series = categorical_data[df.target_column]

        op_fun = Operator(operator=self._operator).apply_operation(self._threshold)

        categorical_data = categorical_data.with_columns(
            target_series.map_elements(op_fun, return_dtype=bool).alias(self._binarize_new_target_name)
        )
        if self._binarize_drop_regression_target:
            categorical_data.drop(df.target_column)

        df.df = categorical_data
        df.target_column = self._binarize_new_target_name

        return df

In [14]:
binarizer = BinarizeData(
    threshold=100, operator="<", binarize_drop_regression_target=True)

In [15]:
binarizer.fit_transform(training_data)
binarizer.fit_transform(test_data)


[1;35mData[0m[1m([0m
    [33mdf[0m=[35mshape[0m: [1m([0m6_803, [1;36m34[0m[1m)[0m
┌────────────┬────────────┬──────────┬─────────┬───┬───────────┬───────────┬───────────┬───────────┐
│ request_id ┆ date       ┆ cusip    ┆ BidAsk  ┆ … ┆ response_ ┆ num_broke ┆ trade_won ┆ target_ca │
│ ---        ┆ ---        ┆ ---      ┆ ---     ┆   ┆ revenue_E ┆ rs        ┆ ---       ┆ t         │
│ i64        ┆ datetime[1m[[0mμ ┆ str      ┆ f64     ┆   ┆ UR        ┆ ---       ┆ bool      ┆ ---       │
│            ┆ s[1m][0m         ┆          ┆         ┆   ┆ ---       ┆ i64       ┆           ┆ bool      │
│            ┆            ┆          ┆         ┆   ┆ f64       ┆           ┆           ┆           │
╞════════════╪════════════╪══════════╪═════════╪═══╪═══════════╪═══════════╪═══════════╪═══════════╡
│ [1;36m1781[0m       ┆ [1;36m2018[0m-[1;36m04[0m-[1;36m25[0m ┆ D20659WR ┆ [1;36m0.0242[0m  ┆ … ┆ [1;36m27923.751[0m ┆ [1;36m9[0m         ┆ false     ┆ true      │
│ 

In [None]:
from utils.fire import FIRE as FireHelper
from datarobotx.idp.autopilot import get_or_create_autopilot_run
from datarobotx.idp.datasets import get_or_create_dataset_from_df


In [None]:

import datarobot as dr

class FIRE(DataPreprocessor):
    def __init__(
        self,
        endpoint: str,
        token: str,
        reduction_method:str = "Rank Aggregation",
    ):
        self._endpoint = endpoint
        self._token = token
        self._reduction_method = reduction_method

    def _fit(self, df: Data):
        return self

    def _transform(self, df: Data) -> pl.DataFrame:
        """helper function: binarize a target variable for classification"""
        dataset_id = get_or_create_dataset_from_df(
            endpoint=self._endpoint,
            token=self._token,
            data_frame=df.rendered_df.to_pandas(),
            name="fire_dataset",
        )
        project_id = get_or_create_autopilot_run(
            endpoint=self._endpoint,
            token=self._token,
            dataset_id=dataset_id,
            name="fire_project",
            analyze_and_model_config={
                "target": df.target_column,
                "mode": "quick",
                "max_wait": 10000,
                "worker_count": -1,
            },
            advanced_options_config={
                "blend_best_models": False,
                "prepare_model_for_deployment": False,
                "min_secondary_validation_model_count": 0,
            }
        )

        fire = FireHelper.get(project_id=project_id)
        fire.main_feature_reduction(
            reduction_method=self._reduction_method
        )



        return df