1212//! let cluster = cluster::Cluster::new(&data_dir, runtime);
1313//! let lock_file = cluster_dir.path().join("lock");
1414//! let lock = lock::UnlockedFile::try_from(lock_file.as_path()).unwrap();
15- //! assert!(coordinate::run_and_stop(&cluster, lock, || cluster.exists()).unwrap())
15+ //! assert!(coordinate::run_and_stop(&cluster, lock, |_| Ok(()), |cluster | cluster.exists()).unwrap())
1616//! ```
1717
1818use std:: time:: Duration ;
@@ -30,16 +30,18 @@ use crate::lock;
3030/// (maybe) stops the cluster again, and finally returns the result of `action`.
3131/// If there are other users of the cluster – i.e. if an exclusive lock cannot
3232/// be acquired during the shutdown phase – then the cluster is left running.
33- pub fn run_and_stop < F , T > (
34- cluster : & Cluster ,
33+ pub fn run_and_stop < ' a , INIT , ACTION , T > (
34+ cluster : & ' a Cluster ,
3535 lock : lock:: UnlockedFile ,
36- action : F ,
36+ initialise : INIT ,
37+ action : ACTION ,
3738) -> Result < T , ClusterError >
3839where
39- F : std:: panic:: UnwindSafe + FnOnce ( ) -> T ,
40+ INIT : std:: panic:: UnwindSafe + FnOnce ( & ' a Cluster ) -> Result < ( ) , ClusterError > ,
41+ ACTION : std:: panic:: UnwindSafe + FnOnce ( & ' a Cluster ) -> T ,
4042{
41- let lock = startup ( cluster, lock) ?;
42- let action_res = std:: panic:: catch_unwind ( action) ;
43+ let lock = startup ( cluster, lock, initialise ) ?;
44+ let action_res = std:: panic:: catch_unwind ( || action ( cluster ) ) ;
4345 let _: Option < bool > = shutdown ( cluster, lock, |cluster| cluster. stop ( ) ) ?;
4446 match action_res {
4547 Ok ( result) => Ok ( result) ,
@@ -54,27 +56,33 @@ where
5456/// returning. If there are other users of the cluster – i.e. if an exclusive
5557/// lock cannot be acquired during the shutdown phase – then the cluster is left
5658/// running and is **not** destroyed.
57- pub fn run_and_destroy < F , T > (
58- cluster : & Cluster ,
59+ pub fn run_and_destroy < ' a , INIT , ACTION , T > (
60+ cluster : & ' a Cluster ,
5961 lock : lock:: UnlockedFile ,
60- action : F ,
62+ initialise : INIT ,
63+ action : ACTION ,
6164) -> Result < T , ClusterError >
6265where
63- F : std:: panic:: UnwindSafe + FnOnce ( ) -> T ,
66+ INIT : std:: panic:: UnwindSafe + FnOnce ( & ' a Cluster ) -> Result < ( ) , ClusterError > ,
67+ ACTION : std:: panic:: UnwindSafe + FnOnce ( & ' a Cluster ) -> T ,
6468{
65- let lock = startup ( cluster, lock) ?;
66- let action_res = std:: panic:: catch_unwind ( action) ;
69+ let lock = startup ( cluster, lock, initialise ) ?;
70+ let action_res = std:: panic:: catch_unwind ( || action ( cluster ) ) ;
6771 let shutdown_res = shutdown ( cluster, lock, |cluster| cluster. destroy ( ) ) ;
6872 match action_res {
6973 Ok ( result) => shutdown_res. map ( |_| result) ,
7074 Err ( err) => std:: panic:: resume_unwind ( err) ,
7175 }
7276}
7377
74- fn startup (
75- cluster : & Cluster ,
78+ fn startup < ' a , INIT > (
79+ cluster : & ' a Cluster ,
7680 mut lock : lock:: UnlockedFile ,
77- ) -> Result < lock:: LockedFileShared , ClusterError > {
81+ initialise : INIT ,
82+ ) -> Result < lock:: LockedFileShared , ClusterError >
83+ where
84+ INIT : std:: panic:: UnwindSafe + FnOnce ( & ' a Cluster ) -> Result < ( ) , ClusterError > ,
85+ {
7886 loop {
7987 lock = match lock. try_lock_exclusive ( ) {
8088 Ok ( Left ( lock) ) => {
@@ -85,6 +93,8 @@ fn startup(
8593 // exclusive lock, so we must check if the cluster is
8694 // running _now_, else loop back to the top again.
8795 if cluster. running ( ) ? {
96+ // Perform post-start initialisation.
97+ initialise ( cluster) ?;
8898 return Ok ( lock) ;
8999 } else {
90100 // Release all locks then sleep for a random time between
@@ -101,7 +111,9 @@ fn startup(
101111 }
102112 }
103113 Ok ( Right ( lock) ) => {
104- // We have an exclusive lock, so try to start the cluster.
114+ // We have an exclusive lock. Perform pre-start initialisation.
115+ initialise ( cluster) ?;
116+ // Now try to start the cluster.
105117 cluster. start ( ) ?;
106118 // Once started, downgrade to a shared log.
107119 return Ok ( lock. lock_shared ( ) ?) ;
0 commit comments