Utah-Math-Data-Science/Data-Assimilation-Flow-Matching

Data Assimilation with Flow Matching: The Ensemble Flow Filter

Code for replicating the results in "Flow Matching for Efficient and Scalable Data Assimilation".

Installation

  1. Install uv:

    curl -LsSf https://astral.sh/uv/install.sh | sh
  2. Install Python dependencies using uv:

    uv sync
  3. Activate the Python virtual environment:

    source .venv/bin/activate
  4. Test your installation:

    pytest tests
  5. Edit the out_dir and run_subdir fields of the Conf class in src/conf/conf.py to the directory where you want the output of every experiment to be saved.

Supplementary Documentation

  • Hydra: Command-line configuration library used to configure the experiments in this project.
  • Hydra ORM: Library for saving experiment configurations to an SQLite database.
  • PyTorch: Library for implementing the models.
  • PyTorch Lightning: Library for handling model training.

Running the data assimilation algorithms

Examples of running the code are in the Examples subsection.

Run the command

python src/dafm/main.py dataset=<dataset> model=<model> <other_overrides>...

where:

  • <dataset> is one of:

    • Lorenz96Bao2024EnSF: N-dimensional chaotic system with parameters from [Bao2024b].
    • KuramotoSivashinsky: 1-dimensional chaotic Kuramoto-Sivashinsky PDE.
    • NavierStokesDim256: Navier-Stokes PDE with periodic boundary conditions discretized on a 256x256 grid.
  • <model> is one of:

    • ScoreMatchingMarginal: EnSF described in [Bao2024b].

      • Variants available: ScoreMatchingMarginalBao2024EnSF
    • FlowMatchingMarginal: Our EnFF methods, which approximate the flow matching vector field using a Monte Carlo approximation.

      • Variants available: FlowMatchingMarginalConditionalOptimalTransport for EnFF-OT and FlowMatchingMarginalPreviousPosteriorToPredictive for EnFF-F2P.
    • BootstrapParticleFilter

    • EnsembleKalmanFilterPerturbedObservations

    • EnsembleKalmanFilterPerturbedObservationsIterative

    • EnsembleRandomizedSquareRootFilter: Also known as the Ensemble Square Root Filter.

    • LocalEnsembleTransformKalmanFilter

  • <other_overrides>...: Other overrides for the model.

    Add the flag -c job to the Python command to see what can be overridden from the command line. Some useful overrides include:

    • Changing the diffusion path:

      model/diffusion_path=VarianceExploding
    • For the flow matching models, changing the guidance vector field approximation:

      model/guidance=<guidance>

      where <guidance> is one of:

      • MonteCarlo*: the Monte Carlo approximation from section 3.2 of [Feng2025].
        • Variants available: MonteCarloTargetConditionalOptimalTransport
      • Local*: the local approximation from section 3.3 of [Feng2025].
        • Variants available: LocalConstant
    • Using particle noise perturbation:

      model.use_state_perturbation=true model.state_perturbation_std=0.5
    • Using particle inflation:

      model/inflation_scale=ConstantScale model.inflation_scale.constant=1.01
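
As a rough illustration of what the last two overrides refer to, the sketch below shows generic textbook versions of ensemble inflation (scaling each member's deviation from the ensemble mean) and state perturbation (adding zero-mean Gaussian noise to each member). This is an assumption about the general techniques, not a description of how this repository implements them; the function names inflate and perturb are hypothetical, and the values 1.01 and 0.5 are taken from the override examples above.

```python
import random
from statistics import fmean


def inflate(ensemble, scale):
    """Scale each member's deviation from the ensemble mean by `scale`.

    Generic multiplicative inflation; hypothetical helper, not the
    repository's code.
    """
    dim = len(ensemble[0])
    mean = [fmean(member[i] for member in ensemble) for i in range(dim)]
    return [
        [mean[i] + scale * (member[i] - mean[i]) for i in range(dim)]
        for member in ensemble
    ]


def perturb(ensemble, std, rng):
    """Add independent zero-mean Gaussian noise with standard deviation `std`.

    Generic state perturbation; hypothetical helper, not the
    repository's code.
    """
    return [[x + rng.gauss(0.0, std) for x in member] for member in ensemble]


rng = random.Random(0)  # fixed seed so the sketch is reproducible
ensemble = [[1.0, 2.0], [3.0, 6.0]]  # two members, two state variables

inflated = inflate(ensemble, scale=1.01)  # spread grows by 1%, mean unchanged
perturbed = perturb(ensemble, std=0.5, rng=rng)
```

Under this convention, inflation leaves the ensemble mean untouched and only widens the spread, whereas perturbation moves every member independently.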

Examples

The following are example commands to show how to run the code.

# Run EnFF-F2P for Kuramoto-Sivashinsky with a grid of size 1024
python src/dafm/main.py dataset=KuramotoSivashinsky model=FlowMatchingMarginalPreviousPosteriorToPredictive model/guidance=LocalConstant model.guidance.schedule.constant=0.005 model.diffusion_path.sigma_min=1e-3 model.sampling_time_step_count=5
# Run EnFF-F2P for Navier-Stokes with a grid of size 256x256
python src/dafm/main.py dataset=NavierStokesDim256 model=FlowMatchingMarginalPreviousPosteriorToPredictive model/guidance=LocalConstant model.guidance.schedule.constant=0.001 model.diffusion_path.sigma_min=1e-3 model.sampling_time_step_count=10
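
Both commands above follow the same pattern: the script path, a dataset, a model, then a list of overrides. When launching runs from Python, a small helper can assemble that command line. The sketch below is one way to do it; build_command is a hypothetical helper and not part of the repository, and it only builds the command without executing it.

```python
import shlex


def build_command(dataset, model, overrides=()):
    """Return the argv list for one run of src/dafm/main.py.

    Hypothetical helper: it assembles the arguments and does not run them.
    """
    return [
        "python",
        "src/dafm/main.py",
        f"dataset={dataset}",
        f"model={model}",
        *overrides,
    ]


# Rebuild the Kuramoto-Sivashinsky example from its parts.
argv = build_command(
    "KuramotoSivashinsky",
    "FlowMatchingMarginalPreviousPosteriorToPredictive",
    overrides=[
        "model/guidance=LocalConstant",
        "model.guidance.schedule.constant=0.005",
        "model.diffusion_path.sigma_min=1e-3",
        "model.sampling_time_step_count=5",
    ],
)

# shlex.join quotes any argument that needs it, giving a copy-pasteable line.
command = shlex.join(argv)
print(command)
```

Keeping the arguments as a list until the last step avoids quoting mistakes if the list is later passed to subprocess.run.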

For more examples, see the following bash scripts:

  • tune.sh: Runs a hyperparameter sweep for EnFF-OT, EnFF-F2P, and EnSF.
  • tune_classical.sh: Runs a hyperparameter sweep for the classical methods (e.g., EnKF).
  • test_best.sh: Evaluates EnFF-OT, EnFF-F2P, and EnSF using the best hyperparameters.
  • test_best_classical_comparison.sh: Evaluates all methods using the best hyperparameters for the datasets used in the comparison with classical methods.

To run these scripts, install GNU parallel. Once it is installed, replace --eta -j 1 with --dry-run in the bash scripts to print the commands they would run without executing them, which yields many more example commands.

Processing experiment output

We provide Jupyter notebooks in the notebooks directory to process the experiment output:

  • tune.ipynb: Notebook for compiling the results of a hyperparameter sweep for EnFF-OT, EnFF-F2P, and EnSF, or a hyperparameter sweep for the comparison with classical methods. See tune.sh and tune_classical.sh to run these hyperparameter sweeps. It saves a CSV file containing the best hyperparameters in the sweeps directory.
  • logged_metrics.ipynb: Notebook for producing Figure 2. See test_best.sh to produce the data for this figure.
  • sensitivity.ipynb: Notebook for producing the ablation study figure for EnFF-OT and EnFF-F2P (Figure 6). See tune.sh to produce the data for this figure.
  • classical_comparison.ipynb: Notebook for producing Figure 5. See test_best_classical_comparison.sh to produce the data for this figure.
  • datasets_*.ipynb: Notebooks for visualizing the dynamical systems used in the paper.
  • trajectories_*.ipynb: Notebooks for visualizing the estimated dynamical system states produced by each model. To save the estimated states, set save_data=True on line 86 of src/dafm/main.py before running the model.

Running data assimilation experiments in parallel

Warning

On network file systems (NFS), starting multiple processes that run this code can corrupt the SQLite database that stores the experiment configurations; see question (5) of the SQLite FAQ. The Preflight section below describes how to ensure the experiment configurations are written to the database serially.

Using GNU parallel, multiple experiments can be run in parallel.

parallel --eta --header : python src/dafm/main.py <override_1>={<param_1>} <override_2>={<param_2>} ... ::: <param_1> <p1value_1> <p1value_2> ... ::: <param_2> <p2value_1> <p2value_2> ...
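
With several ::: input sources, GNU parallel runs the command once for every combination of values, and with --header : the first entry of each source names the placeholder. The sketch below mimics that expansion in Python so the resulting commands can be inspected. The expand function, the placeholder names dataset and steps, and the chosen values are illustrative assumptions; this is not how GNU parallel is implemented.

```python
import itertools


def expand(template, sources):
    """Yield one argv list per combination of the input sources.

    `sources` maps a placeholder name to its list of values, mirroring
    `::: name value_1 value_2 ...` under `--header :`.
    Hypothetical helper for illustration only.
    """
    names = list(sources)
    for values in itertools.product(*(sources[name] for name in names)):
        mapping = dict(zip(names, values))
        yield [arg.format(**mapping) for arg in template]


template = [
    "python",
    "src/dafm/main.py",
    "dataset={dataset}",
    "model.sampling_time_step_count={steps}",
]
sources = {
    "dataset": ["KuramotoSivashinsky", "NavierStokesDim256"],
    "steps": ["5", "10"],
}

commands = list(expand(template, sources))
for argv in commands:
    print(" ".join(argv))
```

Two sources with two values each produce four commands, so the number of experiments grows multiplicatively with each added ::: source.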

Preflight

To ensure that experiment configurations are saved to the database serially, run the GNU parallel command with -j 1 and the Python command with -c job.

parallel -j 1 --eta --header : python src/dafm/main.py -c job <override_1>={<param_1>} <override_2>={<param_2>} ... ::: <param_1> <p1value_1> <p1value_2> ... ::: <param_2> <p2value_1> <p2value_2> ...

Once this command has finished, all the experiment configurations have been saved. Next, run the first GNU parallel command above (the one without -j 1 and -c job) to run the experiments in parallel.
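
The two-phase workflow can also be scripted. The sketch below builds the argument lists for both GNU parallel invocations from the same overrides, so the preflight and the real run cannot drift apart. parallel_argv is a hypothetical helper, the override and values shown are placeholders, and the lists are only printed here, not executed.

```python
def parallel_argv(overrides, sources, preflight):
    """Build the GNU parallel argv for the preflight or for the real run.

    Hypothetical helper. With preflight=True it adds `-j 1` to parallel and
    `-c job` to the Python command, as described in the Preflight section.
    """
    argv = ["parallel"]
    if preflight:
        argv += ["-j", "1"]  # one job at a time: serial database writes
    argv += ["--eta", "--header", ":", "python", "src/dafm/main.py"]
    if preflight:
        argv += ["-c", "job"]
    argv += list(overrides)
    for name, values in sources.items():
        # With --header :, the first entry of each source names the placeholder.
        argv += [":::", name, *values]
    return argv


overrides = ["dataset={dataset}"]
sources = {"dataset": ["KuramotoSivashinsky", "NavierStokesDim256"]}

preflight_cmd = parallel_argv(overrides, sources, preflight=True)
run_cmd = parallel_argv(overrides, sources, preflight=False)

# Phase 1 must finish before phase 2 starts.
print(preflight_cmd)
print(run_cmd)
```

Each list could be passed to subprocess.run with check=True, so that a failed preflight stops the script before any experiment starts.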

References

[Bao2024b] F. Bao, Z. Zhang, and G. Zhang, "An ensemble score filter for tracking high-dimensional nonlinear dynamical systems," Computer Methods in Applied Mechanics and Engineering, vol. 432, p. 117447, Dec. 2024, doi: 10.1016/j.cma.2024.117447.
[Feng2025] R. Feng, T. Wu, C. Yu, W. Deng, and P. Hu, "On the Guidance of Flow Matching," Feb. 04, 2025, arXiv:2502.02150, doi: 10.48550/arXiv.2502.02150.
