-
Notifications
You must be signed in to change notification settings - Fork 852
/
JobRunner.cs
663 lines (586 loc) · 33.6 KB
/
JobRunner.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
using Agent.Sdk;
using Agent.Sdk.Knob;
using Agent.Sdk.Util;
using Microsoft.TeamFoundation.DistributedTask.WebApi;
using Pipelines = Microsoft.TeamFoundation.DistributedTask.Pipelines;
using Microsoft.VisualStudio.Services.Agent.Util;
using Microsoft.VisualStudio.Services.Common;
using Microsoft.VisualStudio.Services.WebApi;
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Net.Http;
using Newtonsoft.Json.Linq;
using Newtonsoft.Json;
using Microsoft.VisualStudio.Services.Agent.Worker.Telemetry;
namespace Microsoft.VisualStudio.Services.Agent.Worker
{
/// <summary>
/// Worker service that runs a single agent job request.
/// The ServiceLocator attribute names <see cref="JobRunner"/> as the default implementation.
/// </summary>
[ServiceLocator(Default = typeof(JobRunner))]
public interface IJobRunner : IAgentService
{
/// <summary>
/// Runs the job described by <paramref name="message"/> and reports its final result.
/// </summary>
/// <param name="message">The job request message to execute.</param>
/// <param name="jobRequestCancellationToken">Token used to cancel the job request.</param>
/// <returns>A task that yields the job's <see cref="TaskResult"/>.</returns>
Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message, CancellationToken jobRequestCancellationToken);
/// <summary>
/// Applies updated job metadata to the runner.
/// </summary>
/// <param name="message">The metadata message carrying the updated values.</param>
void UpdateMetadata(JobMetadataMessage message);
}
public sealed class JobRunner : AgentService, IJobRunner
{
private IJobServerQueue _jobServerQueue;
private ITempDirectoryManager _tempDirectoryManager;

/// <summary>
/// Write-only accessor for the job server queue field, exposed so that
/// JobRunner is easier to test (see /Test/L0/Worker/JobRunnerL0.cs).
/// </summary>
public IJobServerQueue JobServerQueue
{
    set
    {
        _jobServerQueue = value;
    }
}
/// <summary>
/// Runs one job end to end: connects the job server and starts the job server queue,
/// creates and starts the job execution context, sets the agent variables, connects the
/// task server, expands endpoint/repository/container values, initializes the job through
/// the job extension, runs the steps, optionally uploads diagnostic logs, and completes the job.
/// </summary>
/// <param name="message">The job request; its Resources, Variables and Steps must not be null.</param>
/// <param name="jobRequestCancellationToken">Token passed to the job execution context when the job is initialized.</param>
/// <returns>
/// The result produced by CompleteJobAsync, or <see cref="TaskResult.Failed"/> when an
/// AggregateException escapes the main try block.
/// </returns>
public async Task<TaskResult> RunAsync(Pipelines.AgentJobRequestMessage message, CancellationToken jobRequestCancellationToken)
{
// Validate parameters.
Trace.Entering();
ArgUtil.NotNull(message, nameof(message));
ArgUtil.NotNull(message.Resources, nameof(message.Resources));
ArgUtil.NotNull(message.Variables, nameof(message.Variables));
ArgUtil.NotNull(message.Steps, nameof(message.Steps));
Trace.Info("Job ID {0}", message.JobId);
// Captured up front; passed to the diagnostic log upload further down.
DateTime jobStartTimeUtc = DateTime.UtcNow;
// Single() requires exactly one endpoint named SystemVssConnection and throws otherwise.
ServiceEndpoint systemConnection = message.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
bool skipServerCertificateValidation = HostContext.GetService<IAgentCertificateManager>().SkipServerCertificateValidation;
// System.AccessToken
if (message.Variables.ContainsKey(Constants.Variables.System.EnableAccessToken) &&
StringUtil.ConvertToBoolean(message.Variables[Constants.Variables.System.EnableAccessToken].Value))
{
// NOTE(review): the second VariableValue argument is `false` - presumably the secret flag; confirm this is intended for an access token.
message.Variables[Constants.Variables.System.AccessToken] = new VariableValue(systemConnection.Authorization.Parameters["AccessToken"], false);
}
// back compat TfsServerUrl
message.Variables[Constants.Variables.System.TFServerUrl] = systemConnection.Url.AbsoluteUri;
// Make sure SystemConnection Url and Endpoint Url match Config Url base for OnPremises server
// System.ServerType will always be there after M133
if (!message.Variables.ContainsKey(Constants.Variables.System.ServerType) ||
string.Equals(message.Variables[Constants.Variables.System.ServerType]?.Value, "OnPremises", StringComparison.OrdinalIgnoreCase))
{
ReplaceConfigUriBaseInJobRequestMessage(message);
}
// Setup the job server and job server queue.
var jobServer = HostContext.GetService<IJobServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
Uri jobServerUrl = systemConnection.Url;
Trace.Info($"Creating job server with URL: {jobServerUrl}");
// jobServerQueue is the throttling reporter.
// This assignment replaces any queue previously supplied through the JobServerQueue setter.
_jobServerQueue = HostContext.GetService<IJobServerQueue>();
VssConnection jobConnection = VssUtil.CreateConnection(
jobServerUrl,
jobServerCredential,
Trace,
skipServerCertificateValidation,
new DelegatingHandler[] { new ThrottlingReportHandler(_jobServerQueue) }
);
await jobServer.ConnectAsync(jobConnection);
_jobServerQueue.Start(message);
HostContext.WritePerfCounter($"WorkerJobServerQueueStarted_{message.RequestId.ToString()}");
// Declared outside the try block so the finally block below can dispose/unregister them.
IExecutionContext jobContext = null;
CancellationTokenRegistration? agentShutdownRegistration = null;
VssConnection taskConnection = null;
VssConnection legacyTaskConnection = null;
IResourceMetricsManager resourceDiagnosticManager = null;
try
{
// Create the job execution context.
jobContext = HostContext.CreateService<IExecutionContext>();
jobContext.InitializeJob(message, jobRequestCancellationToken);
Trace.Info("Starting the job execution context.");
jobContext.Start();
jobContext.Section(StringUtil.Loc("StepStarting", message.JobDisplayName));
//Start Resource Diagnostics if enabled in the job message
jobContext.Variables.TryGetValue("system.debug", out var systemDebug);
resourceDiagnosticManager = HostContext.GetService<IResourceMetricsManager>();
if (string.Equals(systemDebug, "true", StringComparison.OrdinalIgnoreCase))
{
resourceDiagnosticManager.Setup(jobContext);
// The returned task is intentionally discarded; the monitor is not awaited here.
_ = resourceDiagnosticManager.RunDebugResourceMonitor();
}
agentShutdownRegistration = HostContext.AgentShutdownToken.Register(() =>
{
// log an issue, then agent get shutdown by Ctrl-C or Ctrl-Break.
// the server will use Ctrl-Break to tells the agent that operating system is shutting down.
string errorMessage;
switch (HostContext.AgentShutdownReason)
{
case ShutdownReason.UserCancelled:
errorMessage = StringUtil.Loc("UserShutdownAgent");
break;
case ShutdownReason.OperatingSystemShutdown:
errorMessage = StringUtil.Loc("OperatingSystemShutdown", Environment.MachineName);
break;
default:
throw new ArgumentException(HostContext.AgentShutdownReason.ToString(), nameof(HostContext.AgentShutdownReason));
}
jobContext.AddIssue(new Issue() { Type = IssueType.Error, Message = errorMessage });
});
// Validate directory permissions.
string workDirectory = HostContext.GetDirectory(WellKnownDirectory.Work);
Trace.Info($"Validating directory permissions for: '{workDirectory}'");
try
{
Directory.CreateDirectory(workDirectory);
IOUtil.ValidateExecutePermission(workDirectory);
}
catch (Exception ex)
{
// The work directory is unusable: record the error and complete the job as Failed.
Trace.Error(ex);
jobContext.Error(ex);
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed);
}
// Set agent variables.
AgentSettings settings = HostContext.GetService<IConfigurationStore>().GetSettings();
jobContext.SetVariable(Constants.Variables.Agent.Id, settings.AgentId.ToString(CultureInfo.InvariantCulture));
jobContext.SetVariable(Constants.Variables.Agent.HomeDirectory, HostContext.GetDirectory(WellKnownDirectory.Root), isFilePath: true);
jobContext.SetVariable(Constants.Variables.Agent.JobName, message.JobDisplayName);
jobContext.SetVariable(Constants.Variables.Agent.CloudId, settings.AgentCloudId);
jobContext.SetVariable(Constants.Variables.Agent.IsSelfHosted, settings.IsMSHosted ? "0" : "1");
jobContext.SetVariable(Constants.Variables.Agent.MachineName, Environment.MachineName);
jobContext.SetVariable(Constants.Variables.Agent.Name, settings.AgentName);
jobContext.SetVariable(Constants.Variables.Agent.OS, VarUtil.OS);
jobContext.SetVariable(Constants.Variables.Agent.OSArchitecture, VarUtil.OSArchitecture);
jobContext.SetVariable(Constants.Variables.Agent.RootDirectory, HostContext.GetDirectory(WellKnownDirectory.Work), isFilePath: true);
if (PlatformUtil.RunningOnWindows)
{
jobContext.SetVariable(Constants.Variables.Agent.ServerOMDirectory, HostContext.GetDirectory(WellKnownDirectory.ServerOM), isFilePath: true);
}
if (!PlatformUtil.RunningOnWindows)
{
jobContext.SetVariable(Constants.Variables.Agent.AcceptTeeEula, settings.AcceptTeeEula.ToString());
}
jobContext.SetVariable(Constants.Variables.Agent.WorkFolder, HostContext.GetDirectory(WellKnownDirectory.Work), isFilePath: true);
jobContext.SetVariable(Constants.Variables.System.WorkFolder, HostContext.GetDirectory(WellKnownDirectory.Work), isFilePath: true);
// Azure VM and Docker container detection are started with Task.Run and registered in
// jobContext.AsyncCommands instead of being awaited here.
var azureVmCheckCommand = jobContext.GetHostContext().GetService<IAsyncCommandContext>();
azureVmCheckCommand.InitializeCommandContext(jobContext, Constants.AsyncExecution.Commands.Names.GetAzureVMMetada);
azureVmCheckCommand.Task = Task.Run(() => jobContext.SetVariable(Constants.Variables.System.IsAzureVM, PlatformUtil.DetectAzureVM() ? "1" : "0"));
jobContext.AsyncCommands.Add(azureVmCheckCommand);
var dockerDetectCommand = jobContext.GetHostContext().GetService<IAsyncCommandContext>();
dockerDetectCommand.InitializeCommandContext(jobContext, Constants.AsyncExecution.Commands.Names.DetectDockerContainer);
dockerDetectCommand.Task = Task.Run(() => jobContext.SetVariable(Constants.Variables.System.IsDockerContainer, PlatformUtil.DetectDockerContainer() ? "1" : "0"));
jobContext.AsyncCommands.Add(dockerDetectCommand);
string toolsDirectory = HostContext.GetDirectory(WellKnownDirectory.Tools);
Directory.CreateDirectory(toolsDirectory);
jobContext.SetVariable(Constants.Variables.Agent.ToolsDirectory, toolsDirectory, isFilePath: true);
if (AgentKnobs.DisableGitPrompt.GetValue(jobContext).AsBoolean())
{
jobContext.SetVariable("GIT_TERMINAL_PROMPT", "0");
}
// Setup TEMP directories
_tempDirectoryManager = HostContext.GetService<ITempDirectoryManager>();
_tempDirectoryManager.InitializeTempDirectory(jobContext);
// todo: task server can throw. try/catch and fail job gracefully.
// prefer task definitions url, then TFS collection url, then TFS account url
var taskServer = HostContext.GetService<ITaskServer>();
Uri taskServerUri = null;
if (!string.IsNullOrEmpty(jobContext.Variables.System_TaskDefinitionsUri))
{
taskServerUri = new Uri(jobContext.Variables.System_TaskDefinitionsUri);
}
else if (!string.IsNullOrEmpty(jobContext.Variables.System_TFCollectionUrl))
{
taskServerUri = new Uri(jobContext.Variables.System_TFCollectionUrl);
}
var taskServerCredential = VssUtil.GetVssCredential(systemConnection);
if (taskServerUri != null)
{
Trace.Info($"Creating task server with {taskServerUri}");
taskConnection = VssUtil.CreateConnection(taskServerUri, taskServerCredential, Trace, skipServerCertificateValidation);
await taskServer.ConnectAsync(taskConnection);
}
// for back compat TFS 2015 RTM/QU1, we may need to switch the task server url to agent config url
if (!string.Equals(message?.Variables.GetValueOrDefault(Constants.Variables.System.ServerType)?.Value, "Hosted", StringComparison.OrdinalIgnoreCase))
{
if (taskServerUri == null || !await taskServer.TaskDefinitionEndpointExist())
{
Trace.Info($"Can't determine task download url from JobMessage or the endpoint doesn't exist.");
var configStore = HostContext.GetService<IConfigurationStore>();
taskServerUri = new Uri(configStore.GetSettings().ServerUrl);
Trace.Info($"Recreate task server with configuration server url: {taskServerUri}");
// Held in its own variable so both connections are disposed in the finally block.
legacyTaskConnection = VssUtil.CreateConnection(taskServerUri, taskServerCredential, trace: Trace, skipServerCertificateValidation);
await taskServer.ConnectAsync(legacyTaskConnection);
}
}
// Expand the endpoint data values.
foreach (ServiceEndpoint endpoint in jobContext.Endpoints)
{
jobContext.Variables.ExpandValues(target: endpoint.Data);
VarUtil.ExpandEnvironmentVariables(HostContext, target: endpoint.Data);
}
// Expand the repository property values.
foreach (var repository in jobContext.Repositories)
{
// expand checkout option
var checkoutOptions = repository.Properties.Get<JToken>(Pipelines.RepositoryPropertyNames.CheckoutOptions);
if (checkoutOptions != null)
{
checkoutOptions = jobContext.Variables.ExpandValues(target: checkoutOptions);
checkoutOptions = VarUtil.ExpandEnvironmentVariables(HostContext, target: checkoutOptions);
repository.Properties.Set<JToken>(Pipelines.RepositoryPropertyNames.CheckoutOptions, checkoutOptions);
}
// expand workspace mapping
var mappings = repository.Properties.Get<JToken>(Pipelines.RepositoryPropertyNames.Mappings);
if (mappings != null)
{
mappings = jobContext.Variables.ExpandValues(target: mappings);
mappings = VarUtil.ExpandEnvironmentVariables(HostContext, target: mappings);
repository.Properties.Set<JToken>(Pipelines.RepositoryPropertyNames.Mappings, mappings);
}
}
// Expand container properties
foreach (var container in jobContext.Containers)
{
this.ExpandProperties(container, jobContext.Variables);
}
foreach (var sidecar in jobContext.SidecarContainers)
{
this.ExpandProperties(sidecar, jobContext.Variables);
}
// Send telemetry in case if git is preinstalled on windows platform
var isSelfHosted = StringUtil.ConvertToBoolean(jobContext.Variables.Get(Constants.Variables.Agent.IsSelfHosted));
if (PlatformUtil.RunningOnWindows && isSelfHosted)
{
var windowsPreinstalledGitCommand = jobContext.GetHostContext().GetService<IAsyncCommandContext>();
windowsPreinstalledGitCommand.InitializeCommandContext(jobContext, Constants.AsyncExecution.Commands.Names.WindowsPreinstalledGitTelemetry);
windowsPreinstalledGitCommand.Task = Task.Run(() =>
{
var hasPreinstalledGit = false;
// require: false is passed so a missing git.exe is handled by the empty-path check below.
var filePath = WhichUtil.Which("git.exe", require: false, trace: null);
if (!string.IsNullOrEmpty(filePath))
{
hasPreinstalledGit = true;
}
PublishTelemetry(context: jobContext, area: "PipelinesTasks", feature: "WindowsGitTelemetry", properties: new Dictionary<string, string>
{
{ "hasPreinstalledGit", hasPreinstalledGit.ToString() }
});
});
jobContext.AsyncCommands.Add(windowsPreinstalledGitCommand);
}
// Get the job extension.
Trace.Info("Getting job extension.");
var hostType = jobContext.Variables.System_HostType;
var extensionManager = HostContext.GetService<IExtensionManager>();
// We should always have one job extension
IJobExtension jobExtension =
(extensionManager.GetExtensions<IJobExtension>() ?? new List<IJobExtension>())
.Where(x => x.HostType.HasFlag(hostType))
.FirstOrDefault();
ArgUtil.NotNull(jobExtension, nameof(jobExtension));
List<IStep> jobSteps = null;
try
{
Trace.Info("Initialize job. Getting all job steps.");
jobSteps = await jobExtension.InitializeJob(jobContext, message);
}
catch (OperationCanceledException ex) when (jobContext.CancellationToken.IsCancellationRequested)
{
// set the job to canceled
// don't log error issue to job ExecutionContext, since server owns the job level issue
// When the FailJobWhenAgentDies knob is on and the agent itself is shutting down,
// the job is completed as Failed (with telemetry) instead of Canceled.
if (AgentKnobs.FailJobWhenAgentDies.GetValue(jobContext).AsBoolean() &&
HostContext.AgentShutdownToken.IsCancellationRequested)
{
PublishTelemetry(context: jobContext, area: "PipelinesTasks", feature: "AgentShutdown", properties: new Dictionary<string, string>
{
{ "JobId", jobContext.Variables.System_JobId.ToString() },
{ "JobResult", TaskResult.Failed.ToString() },
{ "TracePoint", "111"},
});
Trace.Error($"Job is canceled during initialize.");
Trace.Error($"Caught exception: {ex}");
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed);
}
else
{
Trace.Error($"Job is canceled during initialize.");
Trace.Error($"Caught exception: {ex}");
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Canceled);
}
}
catch (Exception ex)
{
// set the job to failed.
// don't log error issue to job ExecutionContext, since server owns the job level issue
Trace.Error($"Job initialize failed.");
Trace.Error($"Caught exception from {nameof(jobExtension.InitializeJob)}: {ex}");
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed);
}
// trace out all steps
Trace.Info($"Total job steps: {jobSteps.Count}.");
Trace.Verbose($"Job steps: '{string.Join(", ", jobSteps.Select(x => x.DisplayName))}'");
HostContext.WritePerfCounter($"WorkerJobInitialized_{message?.RequestId.ToString()}");
// Run all job steps
Trace.Info("Run all job steps.");
var stepsRunner = HostContext.GetService<IStepsRunner>();
try
{
await stepsRunner.RunAsync(jobContext, jobSteps);
}
catch (Exception ex)
{
// StepRunner should never throw exception out.
// End up here mean there is a bug in StepRunner
// Log the error and fail the job.
Trace.Error($"Caught exception from job steps {nameof(StepsRunner)}: {ex}");
jobContext.Error(ex);
return await CompleteJobAsync(jobServer, jobContext, message, TaskResult.Failed);
}
finally
{
// Runs whether the steps runner completed normally or threw.
Trace.Info("Finalize job.");
await jobExtension.FinalizeJob(jobContext);
}
Trace.Info($"Job result after all job steps finish: {jobContext.Result ?? TaskResult.Succeeded}");
if (jobContext.Variables.GetBoolean(Constants.Variables.Agent.Diagnostic) ?? false)
{
Trace.Info("Support log upload starting.");
IDiagnosticLogManager diagnosticLogManager = HostContext.GetService<IDiagnosticLogManager>();
try
{
await diagnosticLogManager.UploadDiagnosticLogsAsync(executionContext: jobContext, message: message, jobStartTimeUtc: jobStartTimeUtc);
Trace.Info("Support log upload complete.");
}
catch (Exception ex)
{
// Log the error but make sure we continue gracefully.
Trace.Info("Error uploading support logs.");
Trace.Error(ex);
}
}
Trace.Info("Completing the job execution context.");
return await CompleteJobAsync(jobServer, jobContext, message);
}
catch (AggregateException e)
{
// CompleteJobAsync throws an AggregateException once its retries are exhausted;
// any AggregateException reaching this point is logged and reported as Failed.
ExceptionsUtil.HandleAggregateException((AggregateException)e, Trace.Error);
return TaskResult.Failed;
}
finally
{
// Cleanup for every exit path: unregister the shutdown callback, dispose the
// connections and the resource monitor, and shut the queue down without throwing.
if (agentShutdownRegistration != null)
{
agentShutdownRegistration.Value.Dispose();
agentShutdownRegistration = null;
}
legacyTaskConnection?.Dispose();
taskConnection?.Dispose();
jobConnection?.Dispose();
resourceDiagnosticManager?.Dispose();
await ShutdownQueue(throwOnFailure: false);
}
}
/// <summary>
/// Forwards an updated web-console line rate to the job server queue, when both the
/// value and the queue are available.
/// </summary>
/// <param name="message">Metadata message; only PostLinesFrequencyMillis is read.</param>
public void UpdateMetadata(JobMetadataMessage message)
{
    // Snapshot the field: ShutdownQueue sets _jobServerQueue to null in its finally block,
    // so re-reading the field after the null check could dereference null.
    IJobServerQueue queue = _jobServerQueue;
    if (message.PostLinesFrequencyMillis.HasValue && queue != null)
    {
        queue.UpdateWebConsoleLineRate(message.PostLinesFrequencyMillis.Value);
    }
}
/// <summary>
/// Expands variable references in a container's port mappings, volume mounts,
/// environment variables, image name and create options.
/// Does nothing when either argument is null.
/// </summary>
/// <param name="container">Container whose properties are expanded in place.</param>
/// <param name="variables">Variables used to perform the expansion.</param>
public void ExpandProperties(ContainerInfo container, Variables variables)
{
    if (container != null && variables != null)
    {
        // Port mappings first, then volume mounts.
        variables.ExpandValues(container.UserPortMappings);
        variables.ExpandValues(container.UserMountVolumes);

        // Once expanded, the user mount volumes are final and can be registered.
        foreach (var expandedVolume in container.UserMountVolumes.Values)
        {
            var mount = new MountVolume(expandedVolume);
            container.MountVolumes.Add(mount);
        }

        // Environment variables.
        variables.ExpandValues(container.ContainerEnvironmentVariables);

        // Image name and create options are plain strings.
        var expandedImage = variables.ExpandValue(nameof(container.ContainerImage), container.ContainerImage);
        container.ContainerImage = expandedImage;
        var expandedCreateOptions = variables.ExpandValue(nameof(container.ContainerCreateOptions), container.ContainerCreateOptions);
        container.ContainerCreateOptions = expandedCreateOptions;
    }
}
/// <summary>
/// Completes the job execution context, drains the job server queue, cleans the TEMP
/// directory and, when the plan supports it, raises the job completed event.
/// </summary>
/// <param name="jobServer">Job server used to raise the job completed plan event.</param>
/// <param name="jobContext">Job execution context to complete.</param>
/// <param name="message">The job request message; must not be null.</param>
/// <param name="taskResult">Optional result passed to the job context's Complete call.</param>
/// <returns>
/// The job result, merged with Failed when the queue shutdown throws, or Failed when the
/// plan is not found or access to it is denied.
/// </returns>
/// <exception cref="AggregateException">
/// Thrown with the exceptions from every attempt when raising the job completed event
/// keeps failing.
/// </exception>
private async Task<TaskResult> CompleteJobAsync(IJobServer jobServer, IExecutionContext jobContext, Pipelines.AgentJobRequestMessage message, TaskResult? taskResult = null)
{
    ArgUtil.NotNull(message, nameof(message));
    jobContext.Section(StringUtil.Loc("StepFinishing", message.JobDisplayName));
    TaskResult result = jobContext.Complete(taskResult);
    try
    {
        await ShutdownQueue(throwOnFailure: true);
    }
    catch (AggregateException ex)
    {
        ExceptionsUtil.HandleAggregateException(ex, Trace.Error);
        result = TaskResultUtil.MergeTaskResults(result, TaskResult.Failed);
    }
    catch (Exception ex)
    {
        Trace.Error($"Caught exception from {nameof(JobServerQueue)}.{nameof(_jobServerQueue.ShutdownAsync)}");
        Trace.Error("This indicate a failure during publish output variables. Fail the job to prevent unexpected job outputs.");
        Trace.Error(ex);
        result = TaskResultUtil.MergeTaskResults(result, TaskResult.Failed);
    }

    // Clean TEMP only after the job server queue has finished, since a pending file upload may still use the TEMP dir.
    _tempDirectoryManager?.CleanupTempDirectory();

    if (!jobContext.Features.HasFlag(PlanFeatures.JobCompletedPlanEvent))
    {
        Trace.Info($"Skip raise job completed event call from worker because Plan version is {message.Plan.Version}");
        return result;
    }

    Trace.Info("Raising job completed event.");
    var jobCompletedEvent = new JobCompletedEvent(message.RequestId, message.JobId, result,
        jobContext.Variables.Get(Constants.Variables.Agent.RunMode) == Constants.Agent.CommandLine.Flags.Once);

    const int completeJobRetryLimit = 5;
    TimeSpan retryDelay = TimeSpan.FromSeconds(5);
    var exceptions = new List<Exception>();
    for (int attempt = 1; attempt <= completeJobRetryLimit; attempt++)
    {
        try
        {
            await jobServer.RaisePlanEventAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, jobCompletedEvent, default(CancellationToken));
            return result;
        }
        catch (TaskOrchestrationPlanNotFoundException ex)
        {
            // Not retried: the job is reported as failed immediately.
            Trace.Error($"TaskOrchestrationPlanNotFoundException received, while attempting to raise JobCompletedEvent for job {message.JobId}.");
            Trace.Error(ex);
            return TaskResult.Failed;
        }
        catch (TaskOrchestrationPlanSecurityException ex)
        {
            // Not retried: the job is reported as failed immediately.
            Trace.Error($"TaskOrchestrationPlanSecurityException received, while attempting to raise JobCompletedEvent for job {message.JobId}.");
            Trace.Error(ex);
            return TaskResult.Failed;
        }
        catch (Exception ex)
        {
            Trace.Error($"Catch exception while attempting to raise JobCompletedEvent for job {message.JobId}, job request {message.RequestId}.");
            Trace.Error(ex);
            exceptions.Add(ex);
        }

        // Wait 5 seconds before the next retry; skip the wait after the final attempt,
        // where it would only postpone the failure.
        if (attempt < completeJobRetryLimit)
        {
            await Task.Delay(retryDelay);
        }
    }

    // rethrow exceptions from all attempts.
    throw new AggregateException(exceptions);
}
/// <summary>
/// Shuts down the job server queue if one is set, then clears the field so that later
/// calls do nothing.
/// </summary>
/// <param name="throwOnFailure">
/// When false, exceptions from the shutdown are logged and swallowed. When true, they
/// propagate to the caller, including AggregateException, which the caller
/// (CompleteJobAsync) handles by failing the job.
/// </param>
private async Task ShutdownQueue(bool throwOnFailure)
{
    if (_jobServerQueue == null)
    {
        return;
    }

    try
    {
        Trace.Info("Shutting down the job server queue.");
        await _jobServerQueue.ShutdownAsync();
    }
    catch (AggregateException ex) when (!throwOnFailure)
    {
        // Previously this catch ignored throwOnFailure, so aggregate failures were
        // swallowed even when the caller asked for them to be thrown.
        ExceptionsUtil.HandleAggregateException(ex, Trace.Error);
    }
    catch (Exception ex) when (!throwOnFailure)
    {
        Trace.Error($"Caught exception from {nameof(JobServerQueue)}.{nameof(_jobServerQueue.ShutdownAsync)}");
        Trace.Error(ex);
    }
    finally
    {
        _jobServerQueue = null; // Prevent multiple attempts.
    }
}
// the scheme://hostname:port (how the agent knows the server) is external to our server
// in other words, an agent may have its own way (DNS, hostname) of referring
// to the server. it owns that. That's the scheme://hostname:port we will use.
// Example: Server's notification url is http://tfsserver:8080/tfs
// Agent config url is https://tfsserver.mycompany.com:9090/tfs
/// <summary>
/// Rebuilds <paramref name="messageUri"/> on top of the scheme and server of the
/// configured server URL, keeping the message URI's path and query.
/// </summary>
/// <param name="messageUri">URI received in the job message.</param>
/// <returns>
/// The rebased URI, or <paramref name="messageUri"/> unchanged when the rebased URI
/// cannot be created or a URI fails to parse.
/// </returns>
private Uri ReplaceWithConfigUriBase(Uri messageUri)
{
    AgentSettings agentSettings = HostContext.GetService<IConfigurationStore>().GetSettings();
    try
    {
        var configuredServer = new Uri(agentSettings.ServerUrl);
        string schemeAndServer = configuredServer.GetComponents(UriComponents.SchemeAndServer, UriFormat.Unescaped);

        // Swap the scheme/host/port of messageUri for the one stored at config time.
        bool rebased = Uri.TryCreate(new Uri(schemeAndServer), messageUri.PathAndQuery, out Uri rebasedUri);
        if (rebased)
        {
            return rebasedUri;
        }
    }
    catch (Exception ex) when (ex is InvalidOperationException || ex is UriFormatException)
    {
        // The Uri could not be parsed - not a fatal error, fall back to the input.
        Trace.Error(ex);
    }

    return messageUri;
}
/// <summary>
/// Rewrites URLs in the job request message so they use the configured server's
/// scheme and host: endpoint and repository URLs that share the system connection's
/// scheme and server, plus the well-known task definitions, collection and server URL variables.
/// </summary>
/// <param name="message">Job request message that is updated in place.</param>
private void ReplaceConfigUriBaseInJobRequestMessage(Pipelines.AgentJobRequestMessage message)
{
    Uri systemConnectionUrl = message.Resources.Endpoints
        .Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase))
        .Url;

    // True when the candidate has the same scheme and server as the system connection.
    bool SharesSystemConnectionBase(Uri candidate) =>
        Uri.Compare(candidate, systemConnectionUrl, UriComponents.SchemeAndServer, UriFormat.Unescaped, StringComparison.OrdinalIgnoreCase) == 0;

    // Fix up any endpoint URL that matches the system connection URL.
    foreach (var serviceEndpoint in message.Resources.Endpoints)
    {
        if (!SharesSystemConnectionBase(serviceEndpoint.Url))
        {
            continue;
        }

        serviceEndpoint.Url = ReplaceWithConfigUriBase(serviceEndpoint.Url);
        Trace.Info($"Ensure endpoint url match config url base. {serviceEndpoint.Url}");
    }

    // Fix up any repository URL that matches the system connection URL.
    foreach (var repository in message.Resources.Repositories)
    {
        if (!SharesSystemConnectionBase(repository.Url))
        {
            continue;
        }

        repository.Url = ReplaceWithConfigUriBase(repository.Url);
        Trace.Info($"Ensure repository url match config url base. {repository.Url}");
    }

    // Fix up the well-known variables (taskDefinitionsUrl, tfsCollectionUrl, tfsServerUrl).
    if (message.Variables.TryGetValue(WellKnownDistributedTaskVariables.TaskDefinitionsUrl, out var taskDefinitions))
    {
        var currentUri = new Uri(taskDefinitions.Value);
        message.Variables[WellKnownDistributedTaskVariables.TaskDefinitionsUrl] = ReplaceWithConfigUriBase(currentUri).AbsoluteUri;
        Trace.Info($"Ensure System.TaskDefinitionsUrl match config url base. {message.Variables[WellKnownDistributedTaskVariables.TaskDefinitionsUrl].Value}");
    }

    if (message.Variables.TryGetValue(WellKnownDistributedTaskVariables.TFCollectionUrl, out var collection))
    {
        var currentUri = new Uri(collection.Value);
        message.Variables[WellKnownDistributedTaskVariables.TFCollectionUrl] = ReplaceWithConfigUriBase(currentUri).AbsoluteUri;
        Trace.Info($"Ensure System.TFCollectionUrl match config url base. {message.Variables[WellKnownDistributedTaskVariables.TFCollectionUrl].Value}");
    }

    if (message.Variables.TryGetValue(Constants.Variables.System.TFServerUrl, out var server))
    {
        var currentUri = new Uri(server.Value);
        message.Variables[Constants.Variables.System.TFServerUrl] = ReplaceWithConfigUriBase(currentUri).AbsoluteUri;
        Trace.Info($"Ensure System.TFServerUrl match config url base. {message.Variables[Constants.Variables.System.TFServerUrl].Value}");
    }
}
/// <summary>
/// Publishes a telemetry event by building a "telemetry"/"publish" command and passing
/// it to a TelemetryCommandExtension. Failures are logged as warnings and never thrown.
/// </summary>
/// <param name="context">Execution context the command is processed against.</param>
/// <param name="area">Value of the command's "area" property.</param>
/// <param name="feature">Value of the command's "feature" property.</param>
/// <param name="properties">Payload, serialized to JSON as the command data.</param>
private void PublishTelemetry(IExecutionContext context, string area, String feature, Dictionary<string, string> properties)
{
    try
    {
        var cmd = new Command("telemetry", "publish");
        cmd.Data = JsonConvert.SerializeObject(properties, Formatting.None);
        cmd.Properties.Add("area", area);
        cmd.Properties.Add("feature", feature);

        var publishTelemetryCmd = new TelemetryCommandExtension();
        publishTelemetryCmd.Initialize(HostContext);
        publishTelemetryCmd.ProcessCommand(context, cmd);
    }
    catch (Exception ex)
    {
        // This helper also publishes non-shutdown telemetry (e.g. WindowsGitTelemetry),
        // so the warning names the actual area and feature.
        Trace.Warning($"Unable to publish telemetry data (area: '{area}', feature: '{feature}'). Exception: {ex}");
    }
}
}
}