In [None]:
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestRegressor
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LinearRegression
from sklearn.neighbors import KNeighborsRegressor
from sklearn.svm import SVR

In [None]:
from latentis.utils import seed_everything

# One fixed seed for the whole notebook, so a fresh "Restart & Run All" starts
# from the same random state every time.
# NOTE(review): presumably seed_everything seeds every RNG the pipelines below
# rely on -- confirm against latentis.utils.
SEED = 0
seed_everything(SEED)

In [None]:
from latentis.pipeline._builder import DAGBuilder

# Column groups that are routed out of the "impute" step.
vitals_columns = ["age", "sex", "bmi", "bp"]
blood_columns = ["s1", "s2", "s3", "s4", "s5", "s6"]

# Base regressors; each one is fed by both the "blood" and "vitals" steps and
# their outputs are combined by the "meta" linear model.
base_regressors = {
    "rf": RandomForestRegressor(max_depth=5, random_state=0),
    "svm": SVR(C=0.7),
    "knn": KNeighborsRegressor(n_neighbors=5),
}

# Every add_step result is re-bound, which is equivalent to the fluent chain
# whether or not the builder returns itself.
builder = DAGBuilder(infer_dataframe=True)
builder = builder.add_step("x", "passthrough", deps=None)
builder = builder.add_step("y", "passthrough", deps=None)
# NOTE(review): "impute" declares no deps; presumably the builder falls back to
# a default upstream step -- confirm against DAGBuilder.add_step.
builder = builder.add_step("impute", SimpleImputer())
builder = builder.add_step("vitals", "passthrough", deps={"impute": vitals_columns})
builder = builder.add_step(
    "blood",
    PCA(n_components=2, random_state=0),
    deps={"impute": blood_columns},
)
for step_name, regressor in base_regressors.items():
    # A fresh list per step so no two steps share the same deps object.
    builder = builder.add_step(step_name, regressor, deps=["blood", "vitals"])
builder = builder.add_step("meta", LinearRegression(), deps=list(base_regressors))

dag = builder.make_dag()

In [None]:
class Procrustes(Transform):
    """Fit an estimator between a scaled X space and a scaled Y space.

    Inputs are scaled with ``x_scaler``, mapped by ``estimator``, and mapped
    back to the original Y scale with ``y_scaler.inverse_transform``.

    NOTE(review): ``x_scaler``, ``y_scaler`` and ``estimator`` are read from
    ``self`` but never assigned in this class; presumably ``Transform`` (not
    visible here) provides them -- confirm.
    """

    def fit(self, X, Y):
        """Fit both scalers, then fit the estimator on the scaled pair.

        Args:
            X: Source data, passed to ``x_scaler``.
            Y: Target data, passed to ``y_scaler``.

        Returns:
            This instance, so that calls can be chained.
        """
        # Both scalers are fitted before either one is used to transform.
        self.x_scaler.fit(X)
        self.y_scaler.fit(Y)

        scaled_x = self.x_scaler.transform(X)
        scaled_y = self.y_scaler.transform(Y)
        self.estimator.fit(scaled_x, scaled_y)

        return self

    def transform(self, X, y=None):
        """Map ``X`` through the fitted estimator and undo the Y scaling.

        Args:
            X: Source data to map.
            y: Unused by this method.

        Returns:
            The result of ``y_scaler.inverse_transform`` on the estimator output.
        """
        scaled_x = self.x_scaler.transform(X)
        mapped = self.estimator.transform(scaled_x)
        return self.y_scaler.inverse_transform(mapped)

In [None]:
# StandardScaler was used below without ever being imported, which raises
# NameError on a fresh kernel.
from sklearn.preprocessing import StandardScaler

from latentis.pipeline._builder import DAGBuilder

# NOTE(review): `CCAEstimator` and `Reverse` are not imported anywhere in the
# visible notebook; import them from their defining module before running this
# cell, otherwise it still fails with NameError.
# NOTE(review): this cell rebinds `dag`, so the pipeline built in the earlier
# cell is no longer reachable under that name and any later `dag.fit(...)`
# call uses this pipeline instead -- confirm that is intended.

# Kept as a named object because the "y_scaling" step and the final
# "y_descaler" step are both given this same instance.
y_scaler = StandardScaler()
dag = (
    DAGBuilder(infer_dataframe=True)
    .add_step("x", "passthrough", deps=None)
    .add_step("y", "passthrough", deps=None)
    .add_step("x_scaling", StandardScaler(), x_deps=["x"])
    .add_step("y_scaling", y_scaler, y_deps=["y"])
    # Disabled alternative estimator with the same dependencies:
    # .add_step("estimator", SVDEstimator(), x_deps=["x_scaling"], y_deps=["y_scaling"])
    .add_step("estimator", CCAEstimator(), x_deps=["x_scaling"], y_deps=["y_scaling"])
    # Takes output 0 of "estimator"; presumably Reverse applies the inverse of
    # the fitted y_scaler to it -- TODO confirm against Reverse.
    .add_step("y_descaler", Reverse(y_scaler), x_deps={"estimator": 0})
    .make_dag()
)

In [None]:
from sklearn import datasets
from sklearn.model_selection import train_test_split

# Load the diabetes data as pandas objects (features frame, target series).
X, y = datasets.load_diabetes(as_frame=True, return_X_y=True)

# Hold out 20% of the rows; the fixed random_state keeps the split reproducible.
split = train_test_split(X, y, test_size=0.2, random_state=0)
X_train, X_test, y_train, y_test = split

# Fit on the training portion, then show the held-out predictions as the
# cell output (last expression).
dag.fit(X_train, y_train)
dag.predict(X_test)