@@ -19,11 +19,8 @@ use ic_registry_local_store::{compact_delta_to_changelog, LocalStoreImpl, LocalS
19
19
use ic_registry_proto_data_provider:: { ProtoRegistryDataProvider , ProtoRegistryDataProviderError } ;
20
20
use ic_registry_routing_table:: { routing_table_insert_subnet, CanisterMigrations , RoutingTable } ;
21
21
use ic_registry_subnet_features:: EcdsaConfig ;
22
+ use ic_test_utilities:: state:: CanisterStateBuilder ;
22
23
use ic_test_utilities:: state_manager:: FakeStateManager ;
23
- use ic_test_utilities:: {
24
- notification:: { Notification , WaitResult } ,
25
- state:: CanisterStateBuilder ,
26
- } ;
27
24
use ic_test_utilities_logger:: with_test_replica_logger;
28
25
use ic_test_utilities_metrics:: { fetch_int_counter_vec, metric_vec} ;
29
26
use ic_test_utilities_registry:: SubnetRecordBuilder ;
@@ -61,8 +58,144 @@ fn assert_deliver_batch_count_eq(
61
58
) ;
62
59
}
63
60
61
+ mod notification {
62
+ use std:: sync:: { Condvar , Mutex } ;
63
+ use std:: time:: Duration ;
64
+
65
+ /// One-off notification that can be used to synchronize two threads.
66
+ ///
67
+ /// ```no_run
68
+ /// use notification::{Notification, WaitResult};
69
+ /// use std::sync::Arc;
70
+ /// use std::thread;
71
+ /// use std::time::Duration;
72
+ ///
73
+ /// let notification = Arc::new(Notification::new());
74
+ /// let handle = thread::spawn({
75
+ /// let notification = Arc::clone(¬ification);
76
+ /// move || {
77
+ /// assert_eq!(
78
+ /// notification.wait_with_timeout(Duration::from_secs(10)),
79
+ /// WaitResult::Notified(()),
80
+ /// );
81
+ /// }
82
+ /// });
83
+ /// notification.notify(());
84
+ /// handle.join().unwrap();
85
+ /// ```
86
+ pub struct Notification < T > {
87
+ mutex : Mutex < Option < T > > ,
88
+ condvar : Condvar ,
89
+ }
90
+
91
+ /// The result of the `wait_with_timeout` call.
92
+ #[ derive( Clone , Copy , Debug , PartialEq , Eq , Hash ) ]
93
+ pub enum WaitResult < T > {
94
+ Notified ( T ) ,
95
+ TimedOut ,
96
+ }
97
+
98
+ impl < T > Default for Notification < T > {
99
+ fn default ( ) -> Self {
100
+ Self {
101
+ mutex : Mutex :: new ( None ) ,
102
+ condvar : Condvar :: new ( ) ,
103
+ }
104
+ }
105
+ }
106
+
107
+ impl < T > Notification < T >
108
+ where
109
+ T : Clone + std:: fmt:: Debug + PartialEq ,
110
+ {
111
+ /// Create a new notification that is not raised.
112
+ pub fn new ( ) -> Self {
113
+ Self :: default ( )
114
+ }
115
+
116
+ /// Set the notification to the raised state. Once notification is
117
+ /// saturated, it stays saturated forever, there is no way to reset it.
118
+ ///
119
+ /// # Panics
120
+ ///
121
+ /// Panics if `notify` is called twice with different values.
122
+ pub fn notify ( & self , value : T ) {
123
+ {
124
+ let mut guard = self . mutex . lock ( ) . unwrap ( ) ;
125
+ if let Some ( ref old_value) = * guard {
126
+ if value != * old_value {
127
+ panic ! (
128
+ "Notified twice with different values: first {:?}, then {:?}" ,
129
+ old_value, value
130
+ ) ;
131
+ } else {
132
+ return ;
133
+ }
134
+ }
135
+ * guard = Some ( value) ;
136
+ }
137
+ self . condvar . notify_all ( ) ;
138
+ }
139
+
140
+ /// Wait for another thread to call `notify`, but no longer than
141
+ /// `duration`.
142
+ ///
143
+ /// Returns `WaitResult::Notified(T)` if the notification was raised and
144
+ /// `WaitResult::TimedOut` if it didn't happen within `duration`.
145
+ pub fn wait_with_timeout ( & self , duration : Duration ) -> WaitResult < T > {
146
+ let guard = self . mutex . lock ( ) . unwrap ( ) ;
147
+
148
+ if let Some ( ref value) = * guard {
149
+ return WaitResult :: Notified ( value. clone ( ) ) ;
150
+ }
151
+
152
+ let ( guard, _result) = self . condvar . wait_timeout ( guard, duration) . unwrap ( ) ;
153
+ match * guard {
154
+ Some ( ref value) => WaitResult :: Notified ( value. clone ( ) ) ,
155
+ None => WaitResult :: TimedOut ,
156
+ }
157
+ }
158
+ }
159
+
160
+ #[ test]
161
+ fn test_single_threaded_notification ( ) {
162
+ let notification = Notification :: < i32 > :: new ( ) ;
163
+ assert_eq ! (
164
+ notification. wait_with_timeout( Duration :: from_millis( 0 ) ) ,
165
+ WaitResult :: TimedOut
166
+ ) ;
167
+
168
+ notification. notify ( 1 ) ;
169
+ assert_eq ! (
170
+ notification. wait_with_timeout( Duration :: from_millis( 0 ) ) ,
171
+ WaitResult :: Notified ( 1 )
172
+ ) ;
173
+ }
174
+
175
+ #[ test]
176
+ #[ should_panic( expected = "first 1, then 2" ) ]
177
+ fn test_panics_on_incompatible_notify ( ) {
178
+ let notification = Notification :: < i32 > :: new ( ) ;
179
+ notification. notify ( 1 ) ;
180
+ notification. notify ( 2 ) ;
181
+ }
182
+
183
+ #[ test]
184
+ fn test_notify_same_value_twice_does_not_panic ( ) {
185
+ let notification = Notification :: < i32 > :: new ( ) ;
186
+ notification. notify ( 1 ) ;
187
+ notification. notify ( 1 ) ;
188
+ assert_eq ! (
189
+ notification. wait_with_timeout( Duration :: from_millis( 0 ) ) ,
190
+ WaitResult :: Notified ( 1 )
191
+ ) ;
192
+ }
193
+ }
194
+
64
195
#[ test]
65
196
fn message_routing_does_not_block ( ) {
197
+ use notification:: { Notification , WaitResult } ;
198
+
66
199
with_test_replica_logger ( |log| {
67
200
let timeout = Duration :: from_secs ( 10 ) ;
68
201
0 commit comments