@@ -30,10 +30,10 @@ public SqlTriggerBindingIntegrationTests(ITestOutputHelper output) : base(output
3030 public async Task SingleOperationTriggerTest ( )
3131 {
3232 this . EnableChangeTrackingForTable ( "Products" ) ;
33- this . StartFunctionHost ( nameof ( ProductsTrigger ) , Common . SupportedLanguages . CSharp ) ;
3433
3534 var changes = new List < SqlChange < Product > > ( ) ;
36- this . MonitorProductChanges ( changes ) ;
35+ DataReceivedEventHandler [ ] changeHandlers = new [ ] { this . GetProductChangeHandler ( changes , "SQL Changes: " ) } ;
36+ this . StartFunctionHost ( nameof ( ProductsTrigger ) , Common . SupportedLanguages . CSharp , useTestFolder : false , changeHandlers ) ;
3737
3838 // Considering the polling interval of 5 seconds and batch-size of 10, it should take around 15 seconds to
3939 // process 30 insert operations. Similar reasoning is used to set delays for update and delete operations.
@@ -65,10 +65,10 @@ public async Task SingleOperationTriggerTest()
6565 public async Task MultiOperationTriggerTest ( )
6666 {
6767 this . EnableChangeTrackingForTable ( "Products" ) ;
68- this . StartFunctionHost ( nameof ( ProductsTrigger ) , Common . SupportedLanguages . CSharp ) ;
6968
7069 var changes = new List < SqlChange < Product > > ( ) ;
71- this . MonitorProductChanges ( changes ) ;
70+ DataReceivedEventHandler [ ] changeHandlers = new [ ] { this . GetProductChangeHandler ( changes , "SQL Changes: " ) } ;
71+ this . StartFunctionHost ( nameof ( ProductsTrigger ) , Common . SupportedLanguages . CSharp , useTestFolder : false , changeHandlers ) ;
7272
7373 // Insert + multiple updates to a row are treated as single insert with latest row values.
7474 this . InsertProducts ( 1 , 5 ) ;
@@ -97,13 +97,51 @@ public async Task MultiOperationTriggerTest()
9797 changes . Clear ( ) ;
9898 }
9999
100+ /// <summary>
101+ /// Ensures correct functionality with user functions running across multiple functions host processes.
102+ /// </summary>
103+ [ Fact ]
104+ public async Task MultiHostTriggerTest ( )
105+ {
106+ this . EnableChangeTrackingForTable ( "Products" ) ;
107+
108+ var changes = new List < SqlChange < Product > > ( ) ;
109+ DataReceivedEventHandler [ ] changeHandlers = new [ ] { this . GetProductChangeHandler ( changes , "SQL Changes: " ) } ;
110+
111+ // Prepare three function host processes
112+ this . StartFunctionHost ( nameof ( ProductsTrigger ) , Common . SupportedLanguages . CSharp , useTestFolder : false , changeHandlers ) ;
113+ this . StartFunctionHost ( nameof ( ProductsTrigger ) , Common . SupportedLanguages . CSharp , useTestFolder : false , changeHandlers ) ;
114+ this . StartFunctionHost ( nameof ( ProductsTrigger ) , Common . SupportedLanguages . CSharp , useTestFolder : false , changeHandlers ) ;
115+
116+ // Considering the polling interval of 5 seconds and batch-size of 10, it should take around 15 seconds to
117+ // process 90 insert operations across all functions host processes. Similar reasoning is used to set delays
118+ // for update and delete operations.
119+ this . InsertProducts ( 1 , 90 ) ;
120+ await Task . Delay ( TimeSpan . FromSeconds ( 20 ) ) ;
121+ ValidateProductChanges ( changes , 1 , 90 , SqlChangeOperation . Insert , id => $ "Product { id } ", id => id * 100 ) ;
122+ changes . Clear ( ) ;
123+
124+ // All table columns (not just the columns that were updated) would be returned for update operation.
125+ this . UpdateProducts ( 1 , 60 ) ;
126+ await Task . Delay ( TimeSpan . FromSeconds ( 15 ) ) ;
127+ ValidateProductChanges ( changes , 1 , 60 , SqlChangeOperation . Update , id => $ "Updated Product { id } ", id => id * 100 ) ;
128+ changes . Clear ( ) ;
129+
130+ // The properties corresponding to non-primary key columns would be set to the C# type's default values
131+ // (null and 0) for delete operation.
132+ this . DeleteProducts ( 31 , 90 ) ;
133+ await Task . Delay ( TimeSpan . FromSeconds ( 15 ) ) ;
134+ ValidateProductChanges ( changes , 31 , 90 , SqlChangeOperation . Delete , _ => null , _ => 0 ) ;
135+ changes . Clear ( ) ;
136+ }
137+
100138 /// <summary>
101139 /// Tests the error message when the user table is not present in the database.
102140 /// </summary>
103141 [ Fact ]
104142 public void TableNotPresentTriggerTest ( )
105143 {
106- this . StartFunctionsHostAndWaitForError (
144+ this . StartFunctionHostAndWaitForError (
107145 nameof ( TableNotPresentTrigger ) ,
108146 true ,
109147 "Could not find table: 'dbo.TableNotPresent'." ) ;
@@ -115,7 +153,7 @@ public void TableNotPresentTriggerTest()
115153 [ Fact ]
116154 public void PrimaryKeyNotCreatedTriggerTest ( )
117155 {
118- this . StartFunctionsHostAndWaitForError (
156+ this . StartFunctionHostAndWaitForError (
119157 nameof ( PrimaryKeyNotPresentTrigger ) ,
120158 true ,
121159 "Could not find primary key created in table: 'dbo.ProductsWithoutPrimaryKey'." ) ;
@@ -128,7 +166,7 @@ public void PrimaryKeyNotCreatedTriggerTest()
128166 [ Fact ]
129167 public void ReservedPrimaryKeyColumnNamesTriggerTest ( )
130168 {
131- this . StartFunctionsHostAndWaitForError (
169+ this . StartFunctionHostAndWaitForError (
132170 nameof ( ReservedPrimaryKeyColumnNamesTrigger ) ,
133171 true ,
134172 "Found reserved column name(s): '_az_func_ChangeVersion', '_az_func_AttemptCount', '_az_func_LeaseExpirationTime' in table: 'dbo.ProductsWithReservedPrimaryKeyColumnNames'." +
@@ -141,7 +179,7 @@ public void ReservedPrimaryKeyColumnNamesTriggerTest()
141179 [ Fact ]
142180 public void UnsupportedColumnTypesTriggerTest ( )
143181 {
144- this . StartFunctionsHostAndWaitForError (
182+ this . StartFunctionHostAndWaitForError (
145183 nameof ( UnsupportedColumnTypesTrigger ) ,
146184 true ,
147185 "Found column(s) with unsupported type(s): 'Location' (type: geography), 'Geometry' (type: geometry), 'Organization' (type: hierarchyid)" +
@@ -154,7 +192,7 @@ public void UnsupportedColumnTypesTriggerTest()
154192 [ Fact ]
155193 public void ChangeTrackingNotEnabledTriggerTest ( )
156194 {
157- this . StartFunctionsHostAndWaitForError (
195+ this . StartFunctionHostAndWaitForError (
158196 nameof ( ProductsTrigger ) ,
159197 false ,
160198 "Could not find change tracking enabled for table: 'dbo.Products'." ) ;
@@ -177,17 +215,22 @@ ALTER TABLE [dbo].[{tableName}]
177215 " ) ;
178216 }
179217
180- private void MonitorProductChanges ( List < SqlChange < Product > > changes )
218+ private DataReceivedEventHandler GetProductChangeHandler ( List < SqlChange < Product > > changes , string messagePrefix )
181219 {
182- int index = 0 ;
183- string prefix = "SQL Changes: " ;
220+ return ProductChangeHandler ;
184221
185- this . FunctionHost . OutputDataReceived += ( sender , e ) =>
222+ void ProductChangeHandler ( object sender , DataReceivedEventArgs e )
186223 {
187- if ( e . Data != null && ( index = e . Data . IndexOf ( prefix , StringComparison . Ordinal ) ) >= 0 )
224+ int index = 0 ;
225+
226+ if ( e . Data != null && ( index = e . Data . IndexOf ( messagePrefix , StringComparison . Ordinal ) ) >= 0 )
188227 {
189- string json = e . Data [ ( index + prefix . Length ) ..] ;
190- changes . AddRange ( JsonConvert . DeserializeObject < IReadOnlyList < SqlChange < Product > > > ( json ) ) ;
228+ string json = e . Data [ ( index + messagePrefix . Length ) ..] ;
229+
230+ lock ( changes )
231+ {
232+ changes . AddRange ( JsonConvert . DeserializeObject < IReadOnlyList < SqlChange < Product > > > ( json ) ) ;
233+ }
191234 }
192235 } ;
193236 }
@@ -248,7 +291,7 @@ private static void ValidateProductChanges(List<SqlChange<Product>> changes, int
248291 /// <param name="functionName">Name of the user function that should cause error in trigger listener</param>
249292 /// <param name="useTestFolder">Whether the functions host should be launched from test folder</param>
250293 /// <param name="expectedErrorMessage">Expected error message string</param>
251- private void StartFunctionsHostAndWaitForError ( string functionName , bool useTestFolder , string expectedErrorMessage )
294+ private void StartFunctionHostAndWaitForError ( string functionName , bool useTestFolder , string expectedErrorMessage )
252295 {
253296 string errorMessage = null ;
254297 var tcs = new TaskCompletionSource < bool > ( ) ;
@@ -268,7 +311,7 @@ void OutputHandler(object sender, DataReceivedEventArgs e)
268311 } ;
269312
270313 // All trigger integration tests are only using C# functions for testing at the moment.
271- this . StartFunctionHost ( functionName , Common . SupportedLanguages . CSharp , useTestFolder , OutputHandler ) ;
314+ this . StartFunctionHost ( functionName , Common . SupportedLanguages . CSharp , useTestFolder , new DataReceivedEventHandler [ ] { OutputHandler } ) ;
272315 this . FunctionHost . OutputDataReceived -= OutputHandler ;
273316 this . FunctionHost . Kill ( ) ;
274317
0 commit comments