# IPIN 2025 Flowcean Hands-on Session Solution



In [None]:
# some imports we will need
import logging

# Logger used by the cells below to report rosbag loading, model training
# and model selection
logger = logging.getLogger(__name__)

## Section 1 : Load and Prepare the Training Data

In [None]:
# Standard-library types used in the annotations of the loading function.
# Iterable comes from the public collections.abc module rather than the
# private _collections_abc implementation module.
from collections.abc import Iterable
from os import PathLike

# Import flowcean and cli
import flowcean
import flowcean.cli
from flowcean.core.transform import Lambda

# import transforms
from flowcean.polars import DataFrame, ExplodeTimeSeries, ZeroOrderHold

# Import some helper functions for loading ROS data
from flowcean.ros import load_rosbag

from _helper_functions import shift_in_time

# The function below looks for a config.yaml in the current directory
# In the config.yaml, we specify settings for our training run
config = flowcean.cli.initialize()


### Task 1.1 Load Rosbags and Choose Inputs
                    

In [None]:
# Configure the load_rosbag() function below
def load_and_process_rosbag(
    path: str | PathLike,
    message_paths: Iterable[str | PathLike] | None = None,
) -> DataFrame:
    """Load a rosbag and attach the transforms used for learning.

    Args:
        path: Location of the rosbag to load.
        message_paths: Optional paths passed through unchanged to
            ``load_rosbag`` (presumably ROS message definitions; confirm
            against the ``flowcean.ros`` documentation).

    Returns:
        A ``DataFrame`` over the selected ``/turtle1/cmd_vel`` and
        ``/turtle1/pose`` fields with ``ZeroOrderHold``,
        ``ExplodeTimeSeries`` and ``Lambda(shift_in_time)`` chained on.
    """
    logger.info("Loading rosbag from: %s", path)

    # Task 1.1: the topics to read and the message fields kept from each
    selected_topics = {
        "/turtle1/cmd_vel": ["linear.x", "angular.z"],
        "/turtle1/pose": ["x", "y", "theta"],
    }
    rosbag = load_rosbag(
        path=path,
        topics=selected_topics,
        message_paths=message_paths,
    )

    # Chain the transforms one at a time, in the order they are applied.
    frame = DataFrame(rosbag)
    frame = frame | ZeroOrderHold(
        features=["/turtle1/cmd_vel", "/turtle1/pose"],
        name="measurements",
    )
    frame = frame | ExplodeTimeSeries("measurements")
    # NOTE(review): shift_in_time presumably adds the "*_next" columns that
    # are used as learning targets later on -- confirm in _helper_functions.
    return frame | Lambda(shift_in_time)


# Build the training and the evaluation samples from the rosbag paths given
# in the loaded config; both share the same message paths.
samples_train = load_and_process_rosbag(
    path=config.rosbag.training_path,
    message_paths=config.rosbag.message_paths,
)
samples_eval = load_and_process_rosbag(
    path=config.rosbag.evaluation_path,
    message_paths=config.rosbag.message_paths,
)


### Task 1.2 Create Training Data Frame


In [None]:
# Modify the return statement below to include the necessary transforms
def load_and_process_rosbag(
    path: str | PathLike,
    message_paths: Iterable[str | PathLike] | None = None,
) -> DataFrame:
    """Load a rosbag and return it with the Task 1.2 transform chain.

    Args:
        path: Location of the rosbag to load.
        message_paths: Optional paths passed through unchanged to
            ``load_rosbag`` (presumably ROS message definitions; confirm
            against the ``flowcean.ros`` documentation).

    Returns:
        A ``DataFrame`` over the selected ``/turtle1/cmd_vel`` and
        ``/turtle1/pose`` fields, piped through ``ZeroOrderHold``,
        ``ExplodeTimeSeries`` and ``Lambda(shift_in_time)``.
    """
    logger.info("Loading rosbag from: %s", path)

    # Topics to read and the message fields kept from each of them
    selected_topics = {
        "/turtle1/cmd_vel": ["linear.x", "angular.z"],
        "/turtle1/pose": ["x", "y", "theta"],
    }
    rosbag = load_rosbag(
        path=path,
        message_paths=message_paths,
        topics=selected_topics,
    )

    # Task 1.2: the transforms, chained in the order they are applied.
    frame = DataFrame(rosbag)
    frame = frame | ZeroOrderHold(
        features=["/turtle1/cmd_vel", "/turtle1/pose"],
        name="measurements",
    )
    frame = frame | ExplodeTimeSeries("measurements")
    # NOTE(review): shift_in_time presumably adds the "*_next" columns that
    # are used as learning targets later on -- confirm in _helper_functions.
    return frame | Lambda(shift_in_time)


# Rebuild the training and the evaluation samples with the function defined
# in this cell; the rosbag paths come from the loaded config.
samples_train = load_and_process_rosbag(
    path=config.rosbag.training_path,
    message_paths=config.rosbag.message_paths,
)
samples_eval = load_and_process_rosbag(
    path=config.rosbag.evaluation_path,
    message_paths=config.rosbag.message_paths,
)

## Section 2 : Select Learners across Libraries 


In [None]:
# we load all the learners for our training loop
from flowcean.sklearn import RandomForestRegressorLearner, RegressionTree
from flowcean.torch import LightningLearner, MultilayerPerceptron
from flowcean.xgboost import XGBoostRegressorLearner

# Model inputs: the pose fields followed by the velocity command fields.
inputs = [
    "/turtle1/pose/x",
    "/turtle1/pose/y",
    "/turtle1/pose/theta",
] + [
    "/turtle1/cmd_vel/linear.x",
    "/turtle1/cmd_vel/angular.z",
]

# Model outputs: the "_next" counterpart of every pose field
# (presumably produced by shift_in_time -- confirm in _helper_functions).
outputs = [
    "/turtle1/pose/x_next",
    "/turtle1/pose/y_next",
    "/turtle1/pose/theta_next",
]


### Task 2.1 Learner configuration


In [None]:
# Create one learner per model family. The tree, the forest and the MLP take
# their settings from the loaded config; XGBoost is built without arguments.
regression_tree = RegressionTree(**config.training.tree)
random_forest = RandomForestRegressorLearner(**config.training.forest)

# The network is sized from the feature lists so it always matches them.
mlp_module = MultilayerPerceptron(
    learning_rate=config.training.mlp.learning_rate,
    input_size=len(inputs),
    output_size=len(outputs),
)
mlp = LightningLearner(
    module=mlp_module,
    batch_size=config.training.mlp.batch_size,
    max_epochs=config.training.mlp.max_epochs,
)

xgb = XGBoostRegressorLearner()

### Task 2.2 Prepare Sequential Learning

In [None]:
# Learners are trained one after another in this order. The order matters:
# the resulting models are later picked by position (see Task 4.3).
learners = [regression_tree, random_forest, mlp, xgb]

## Section 3: Training of the Models


In [None]:
# we load our learning strategy
from flowcean.core import learn_offline



### Task 3.1 Create a Sequential Learning Loop


In [None]:
# Train every learner on the same training samples; models[i] is the model
# produced by learners[i].
models = []
for learner in learners:
    logger.info("Training model: %s", learner.name)
    model = learn_offline(
        samples_train, learner, inputs=inputs, outputs=outputs
    )
    models.append(model)

## Section 4 : Evaluation and Model Comparison


In [None]:
# we load our metrics for comparison
from flowcean.sklearn import MeanAbsoluteError, MeanSquaredError, R2Score
from custom_metrics.euclidean_distance import MeanEuclideanDistance

# import function for model comparison
from flowcean.core import evaluate_offline

### Task 4.1 Choose Metrics for Evaluation



In [None]:
# Metrics computed for every model. The Euclidean distance is restricted to
# the x/y position columns and therefore leaves out theta.
metrics = [
    MeanAbsoluteError(),
    MeanSquaredError(),
    R2Score(),
    MeanEuclideanDistance(
        columns=["/turtle1/pose/x_next", "/turtle1/pose/y_next"],
    ),
]

### Task 4.2 Create an Evaluation Loop


In [None]:
# Evaluate all trained models on the evaluation samples with every metric.
report = evaluate_offline(
    models,
    inputs=inputs,
    outputs=outputs,
    metrics=metrics,
    environment=samples_eval,
)
# Kept as the last expression of the cell so that the notebook displays its
# result (presumably a table comparing the models -- confirm in flowcean).
report.great_table()


### Task 4.3 Select a Model and Visualize Predictions



In [None]:
from _helper_functions import plot_predictions_vs_ground_truth

# Index 3 is the XGBoost model: `models` follows the order of `learners`
# (regression tree, random forest, MLP, XGBoost).
best_model = models[3]
logger.info("Selected model: %s", best_model.name)

# The plot compares all trained models, not only the selected one.
# Plots are saved under plots/
plot_predictions_vs_ground_truth(
    models=models,
    samples_eval=samples_eval.observe().collect(),
    input_names=inputs,
    output_names=outputs,
)

# Only the selected model is written to disk
best_model.save("model.fml")