In [None]:
"""
| Copyright (c) 2025
| Deutsches Zentrum für Luft- und Raumfahrt e.V. (DLR)
| Linder Höhe
| 51147 Köln

:Authors: Judith Heusel, Michael Roth, Keivan Kiyanfar, Erik Hedberg

:Version: 1.4.0

:Project: railpos

License
-------
This software is released under the 3-Clause BSD License.
Details about the license can be found in the LICENSE file.

Description
Example notebook for rail vehicle positioning within the mFUND project OnboardEU.
-------
"""

In [None]:
import sys
import toml
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path

path = Path().resolve()
rep_path = path.parent

if rep_path not in sys.path:
    sys.path.insert(0, str(rep_path))

from railpos import dataframes, helpers, kalmanfilter, railwaymaps, railwaypaths, timestamps, visualization
from onboardeu import onboardeuhelpers

%matplotlib widget

This notebook demonstrates offline positioning of a railway vehicle using GNSS (Global Navigation Satellite Systems) and IMU (Inertial Measurement Unit) data. It provides a step by step guide and example plots. The input data are in HDF5 format. The readme.md contains a link to the data.
The positioning pipeline is performed per journey of the vehicle, i.e., from standstill to standstill.

To run the notebook, place the files containing the map and the sensor data in the data folder of this repository, as explained in the readme.

## Data reading

### First, read the config file.

In [None]:
config_path = Path(rep_path, 'onboardeu', 'onboardeu_001_example.toml')
with open(config_path, 'r') as toml_file:
    config = toml.load(toml_file)

### Read the map data of the railway network. A RailwayMap object is created from the file containing the map data.

In [None]:
rmap = railwaymaps.RailwayMap.from_geopackage(Path(rep_path, config['map_filepath']))

### Read the sensor data (GNSS and IMU) for a journey.

#### Create the path to the HDF5 file.

In [None]:
path_hdf = Path(rep_path, config['data_filepath'])

#### Specify the number of the journey (in [0, ... 51]) and read the sensor data

In [None]:
journey_nr = 0
df_gnss, df_imu, attrs_gnss, attrs_imu = onboardeuhelpers.get_measurement_data(path_hdf, journey_nr)
df_gnss = dataframes.df2gdf(df_gnss, lon_column='lon', lat_column='lat')
df_gnss.to_crs(rmap.geo_df.crs, inplace=True)

#### Print the attributes to know what is contained in the data

In [None]:
attrs_gnss

In [None]:
attrs_imu

#### Add a column to the IMU dataframe with low pass filtered longitudinal acceleration. Decide which acceleration will be used as input for the Kalman filter.

In [None]:
f_cut = 0.5 # in Hz
df_imu['acc_x_filt'] = helpers.filtfilt_lp(df_imu.acc_x.values, f_cut=f_cut)
acc_col = 'acc_x_filt'

#### Plot the input data

In [None]:
i = 0
plt.close(i)
fig, ax = plt.subplots(2, 1, sharex=True, num=i, figsize=(8, 6))
ax[0].plot(df_gnss.times, df_gnss.speed, label='GNSS speed', color='teal', marker='.')
ax[1].plot(df_imu.times, df_imu.acc_x, label='Longitudinal acc. (veh. coord.)')
ax[1].plot(df_imu.times, df_imu.acc_y, label='Lateral acc.')
ax[1].plot(df_imu.times, df_imu.acc_z, label='Vertical acc.')
acc_int_filt = helpers.integrate_signal(df_imu.acc_x_filt.values, df_imu.times.values, bias=np.mean(df_imu.acc_x_filt.values))
acc_int = helpers.integrate_signal(df_imu.acc_x.values, df_imu.times.values, bias=np.mean(df_imu.acc_x.values))
ax[0].plot(df_imu.times, acc_int, label='Integrated long. acc.', color='magenta')
ax[0].plot(df_imu.times, acc_int_filt, label='Filtered, integrated long. acc.', color='pink')
for i in [0, 1]:
    ax[i].legend()
    ax[i].grid()
    ax[i].set_xlabel('Time in s')
ax[0].set_ylabel('Speed in m/s')
ax[1].set_ylabel('Acceleration in m/$s^2$')

## Find the path in the railway network which fits the data best

A path consists of tracks of the railway network together with their orientations in the path. The best path is found by minimising the projection errors of the input GNSS data to the valid paths.
Attention has to be paid to short paths. If the start and end tracks of a path both lie within a certain distance to both the start and end points, the original and the reverted path are found by the algorithm to be possible paths for the data. In this case, the path for which the path distances of the projections of the input GNSS data are increasing is selected.

In [None]:
rpaths = railwaypaths.rpaths_from_gnss(rmap, df_gnss, thr_meter=config['thr_meter'])
if len(rpaths) == 0:
    raise ValueError("No paths were found.")
flags_direction = [railwaypaths.flag_movement_direction_not_into_path_direction(rpath, df_gnss.geometry) for rpath in rpaths]
rpaths_valid = [rpath for nr, rpath in enumerate(rpaths) if flags_direction[nr]==False]
if len(rpaths_valid) == 0:
    raise ValueError("No valid path found.")

projection_errors_matrix, ibest = railwaypaths.rpath_projection_errors(rpaths_valid, df_gnss)
rpath = rpaths_valid[ibest].copy()

### Plot the projection errors of the path hypotheses

In [None]:
i = 1
plt.close(i)
fig = onboardeuhelpers.georef_plot_path_projection_errors(projection_errors_matrix, figsize=(4,3), num=i)

### Plot the paths and the GNSS data

In [None]:
max_speed = np.absolute(df_gnss['speed']).max()*1.1
cmap = visualization.create_linear_colormap(['midnightblue', 'teal', 'yellow',
                                            'darkorange'], vmin=0, vmax=max_speed, caption="Speed in m/s")
cmap_time = visualization.create_linear_colormap(['white', 'coral', 'red', 'darkred'], vmin=df_gnss.times.min(),
                                                 vmax=df_gnss.times.max(), caption='Time in s', max_labels=8)

foliummap = rmap.geo_df.explore(color='white', name='Railway network')
df_gnss.explore('times', cmap=cmap_time, m=foliummap, name='GNSS times')
df_gnss.explore('speed', name='GNSS speed', cmap=cmap, m=foliummap)
onboardeuhelpers.add_path_hypotheses_to_folium(foliummap, rpaths, add_layer_control=False)
rpath.railway_map.geo_df.iloc[rpath.tracks].explore(m=foliummap, name='Best path', color='black')
visualization.folium_layer_control(foliummap)
foliummap.add_to(visualization.folium_figure())
foliummap

### If the vehicle is driving backwards with respect to vehicle coordinate frame, reverse the path (such that decreasing vehicle distances lead to decreasing path distances).

Attention: Do not execute the following cell an even number of times (double reversing the path leads to the original path despite the vehicle is driving backwards).

In [None]:
forward_journey, _ = helpers.driving_direction(df_imu[acc_col].values, df_imu.times.values, df_gnss.speed.values, df_gnss.times.values)
if forward_journey<0:
    rpath.reverse()

## Perform an on-path Kalman filter (and smoother)

### Create the timestamp vector for the Kalman filter.

In [None]:
time_kf = timestamps.create_kf_timestamps({1:df_gnss.times, 2:df_imu.times}, dt_kf=0.01, timespan='max')

### Create OfflineMeasurement objects for GNSS and IMU data.

In [None]:
mm_gnss = kalmanfilter.OfflineMeasurement(df_gnss, df_gnss.times, time_kf, keydict=dict(zip(df_gnss.columns, df_gnss.columns)))
mm_imu = kalmanfilter.OfflineMeasurement(df_imu, df_imu.times, time_kf, keydict=dict(zip(df_imu.columns, df_imu.columns)))

### Create parameter dictionaries for general, time update and measurement update parameters.

In [None]:
params = {'railway_path': rpath, 'forward_journey': forward_journey, 'tsamp': config['tsamp'], 'acc_bias': df_imu[acc_col].mean()}

In [None]:
tu_params = {'acc_var': config['acc_var']}
mu_params = {'pos_var': config['pos_var'], 'speed_var': config['speed_var'], 'dpert': config['dpert'], 'nsat_min': config['nsat_min'],
             'accuracy_horizontal_max':config['accuracy_horizontal_max']}

In [None]:
standstill_kf = helpers.standstill_from_gnss(df_gnss.times.values, df_gnss.speed.values, time_kf)

### Initialise the Kalman filter object

In [None]:
kf, tu_params = kalmanfilter.prepare_kf(params, tu_params, df_gnss, df_imu, acc_col=acc_col, speed_col='speed')

### Kalman filtering

In [None]:
kf_keydict={'acc': acc_col, 'speed': 'speed', 'pos': 'geometry', 'speed_std': 'speedacc', 'pos_std': 'accuracy_horizontal', 'nsat': 'nsat'}

In [None]:
est_filt, cov_filt, est_pred, cov_pred = kalmanfilter.run_offline_kf(
    time_kf, kf, params, tu_params, mu_params, mm_imu, mm_gnss, standstill_kf=standstill_kf, kf_keys=kf_keydict)

### Smoothing

In [None]:
est_smth, cov_smth = kalmanfilter.rts(est_filt, cov_filt, est_pred, cov_pred, kf.fmat)

### Create an output GeoDataFrame containing the results of the offline positioning.

In [None]:
gdf_georef = kalmanfilter.create_georef_output(time_kf, est_smth, cov_smth, params['railway_path'])

The output GeoDataFrame contains time and speed information, the driven distance with respect to the path, driven tracks (integer indices referring to the GeoDataFrame of the RailwayMap), distances with respect to the tracks, position and speed standard deviations and absolute position information (cartesian and geographic coordinates).

### Create a GeoDataFrame containing the projected input (GNSS) positions to the railway path to compare input and output.

In [None]:
gdf_projected = onboardeuhelpers.get_gdf_projected_points(df_gnss, rpath, columns_to_keep=['times'])

### Plot the results and the input data.

In [None]:
i = 2
plt.close(i)
fig, ax = plt.subplots(2, 1, sharex=True, num=i)
ax[0].plot(df_gnss.times, df_gnss.speed, label='GNSS speed', color='teal')
ax[0].plot(gdf_georef.time_sec, gdf_georef.speed, color='pink', label='Georef. speed')
ax[0].plot(gdf_georef.time_sec, gdf_georef.velocity, color='magenta', label='Georef. velocity')
ax[0].fill_between(time_kf, gdf_georef.speed + np.sqrt(cov_smth[:, 1, 1]), gdf_georef.speed - np.sqrt(cov_smth[:, 1, 1]),
                    alpha=0.5)
ax[0].fill_between(time_kf, gdf_georef.velocity + np.sqrt(cov_smth[:, 1, 1]), gdf_georef.velocity - np.sqrt(cov_smth[:, 1, 1]),
                    alpha=0.5)
ax[1].plot(gdf_projected.times, gdf_projected.distance_on_path, label='Projected GNSS positions')
ax[1].plot(gdf_georef.time_sec, gdf_georef.distance_on_path, label='Georef. path distance')
ax[1].fill_between(time_kf, est_smth[:, 0] + np.sqrt(cov_smth[:, 0, 0]), est_smth[:, 0] - np.sqrt(cov_smth[:, 0, 0]),
                    alpha=0.5, color='grey')
for i in [0, 1]:
    ax[i].legend()
    ax[i].grid()
    ax[i].set_xlabel('Time in s')
ax[0].set_ylabel('Speed in m/s')
ax[1].set_ylabel('Distance in m')

### Visualise input and output data on the map

In [None]:
max_speed = np.absolute(gdf_georef['speed']).max()*1.1
cmap = visualization.create_linear_colormap(['midnightblue', 'teal', 'yellow',
                                            'darkorange'], vmin=0, vmax=max_speed, caption="Speed in m/s")
cmap_time = visualization.create_linear_colormap(['white', 'coral', 'red', 'darkred'], vmin=gdf_georef.time_sec.min(),
                                                 vmax=gdf_georef.time_sec.max(), caption='Time in s', max_labels=4)

foliummap = rmap.geo_df.explore(color='white', name='Railway network')
rpath.railway_map.geo_df.iloc[rpath.tracks].explore(m=foliummap, name='Path', color='lightgreen')
df_gnss.explore('times', cmap=cmap_time, m=foliummap, name='GNSS times')
df_gnss.explore('speed', name='GNSS speed', cmap=cmap, m=foliummap)
gdf_georef.iloc[::10,:].explore('speed', m=foliummap, cmap=cmap, name='Georeferencing output (speed)', vmin=0, vmax=max_speed)
visualization.folium_niedersachsen_image(foliummap)
visualization.folium_layer_control(foliummap)
foliummap.add_to(visualization.folium_figure())
foliummap

## Write the results to a file (HDF5 format)

In [None]:
onboardeuhelpers.georef_write_results(gdf_georef, Path(rep_path, 'data', 'onboardeu_001_georef.h5'), dset_name=('journey_' + str(journey_nr).zfill(2)))