Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions guidelines/mwm-developer-setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@ Note. if you already have docker container for Minio Rabbit etc running Stop the
- `kubectl apply -n argo -f https://raw.githubusercontent.com/argoproj/argo-workflows/master/manifests/quick-start-postgres.yaml`
- `kubectl config set-context --current --namespace=argo`

To disable argo authentication run

kubectl patch deployment \
argo-server \
--namespace argo \
--type='json' \
-p='[{"op": "replace", "path": "/spec/template/spec/containers/0/args", "value": [
"server",
"--auth-mode=server"
]}]'

Note. below Im using bash as its my preferred option, But if you to are using bash and your on windows (wsl2) you MUST make sure you windows .kube/config is also pointing to the same K8's cluster, this is because the code running in vs will look in there for the context to write k8's secrets too!

now in a bash window (can be cmd or powershell)
Expand Down
215 changes: 107 additions & 108 deletions src/TaskManager/Plug-ins/Argo/ArgoClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,9 @@

namespace Monai.Deploy.WorkflowManager.TaskManager.Argo
{
public interface IArgoClient
public class ArgoClient : BaseArgoClient, IArgoClient
{
Task<Workflow> Argo_CreateWorkflowAsync(string argoNamespace, WorkflowCreateRequest body, CancellationToken cancellationToken);

Task<Workflow> Argo_GetWorkflowAsync(string argoNamespace, string name, string getOptions_resourceVersion, string fields, CancellationToken cancellationToken);

Task<WorkflowTemplate> Argo_GetWorkflowTemplateAsync(string argoNamespace, string name, string getOptions_resourceVersion);

Task<Workflow> Argo_StopWorkflowAsync(string argoNamespace, string name, WorkflowStopRequest body);

Task<Workflow> Argo_TerminateWorkflowAsync(string argoNamespace, string name, WorkflowTerminateRequest body);

Task<Version?> Argo_GetVersionAsync();

Task<string?> Argo_Get_WorkflowLogsAsync(string argoNamespace, string name, string podName, string logOptions_container);

Task<WorkflowTemplate> Argo_CreateWorkflowTemplateAsync(string argoNamespace, WorkflowTemplateCreateRequest body, CancellationToken cancellationToken);
}

public class ArgoClient : IArgoClient
{
private readonly HttpClient _httpClient;

public string BaseUrl { get; set; } = "http://localhost:2746";

private string FormattedBaseUrl { get { return BaseUrl != null ? BaseUrl.TrimEnd('/') : ""; } }

public ArgoClient(HttpClient httpClient)
{
_httpClient = httpClient;
}
public ArgoClient(HttpClient httpClient) : base(httpClient) { }

public async Task<Workflow> Argo_CreateWorkflowAsync(string argoNamespace, WorkflowCreateRequest body, CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -164,27 +136,113 @@ public async Task<WorkflowTemplate> Argo_GetWorkflowTemplateAsync(string argoNam
urlBuilder.Length--;
return await GetRequest<string>(urlBuilder, true).ConfigureAwait(false);
}
private async Task<T> SendRequest<T>(StringContent stringContent, StringBuilder urlBuilder, string Method, CancellationToken cancellationToken)

/// <param name="cancellationToken">A cancellation token that can be used by other objects or threads to receive notice of cancellation.</param>
/// <returns>A successful response.</returns>
/// <exception cref="ApiException">A server side error occurred.</exception>
public virtual async Task<WorkflowTemplate> Argo_CreateWorkflowTemplateAsync(string argoNamespace, WorkflowTemplateCreateRequest body, CancellationToken cancellationToken)
{
Guard.Against.NullOrWhiteSpace(argoNamespace);
Guard.Against.Null(body);

var urlBuilder = new StringBuilder();
urlBuilder.Append(CultureInfo.InvariantCulture, $"{FormattedBaseUrl}/api/v1/workflow-templates/{argoNamespace}");

var method = "POST";
var content = new StringContent(Newtonsoft.Json.JsonConvert.SerializeObject(body));

Check warning

Code scanning / CodeQL

Cross-site scripting

[User-provided value](1) flows to here and is written to HTML or JavaScript.
return await SendRequest<WorkflowTemplate>(content, urlBuilder, method, cancellationToken).ConfigureAwait(false);
}

/// <param name="cancellationToken">A cancellation token that can be used by other objects or threads to receive notice of cancellation.</param>
/// <returns>A successful response.</returns>
/// <exception cref="ApiException">A server side error occurred.</exception>
public virtual async Task<bool> Argo_DeleteWorkflowTemplateAsync(string argoNamespace, string templateName, CancellationToken cancellationToken)
{
using (var request = new HttpRequestMessage())
Guard.Against.NullOrWhiteSpace(argoNamespace);

var urlBuilder = new StringBuilder();
urlBuilder.Append(CultureInfo.InvariantCulture, $"{FormattedBaseUrl}/api/v1/workflow-templates/{argoNamespace}/{templateName}");

var method = "DELETE";
var response = await HttpClient.SendAsync(new HttpRequestMessage(new HttpMethod(method), urlBuilder.ToString()), cancellationToken).ConfigureAwait(false);
return (int)response.StatusCode == 200;
}

public static string ConvertToString(object value, CultureInfo cultureInfo)
{
if (value == null)
{
stringContent.Headers.ContentType = System.Net.Http.Headers.MediaTypeHeaderValue.Parse("application/json");
request.Content = stringContent;
request.Method = new HttpMethod(Method);
request.Headers.Accept.Add(System.Net.Http.Headers.MediaTypeWithQualityHeaderValue.Parse("application/json"));
request.RequestUri = new Uri(urlBuilder.ToString(), UriKind.RelativeOrAbsolute);
return "";
}

HttpResponseMessage? response = null;
try
if (value is Enum)
{
var name = Enum.GetName(value.GetType(), value);
if (name != null)
{
response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead, cancellationToken).ConfigureAwait(false);
var field = System.Reflection.IntrospectionExtensions.GetTypeInfo(value.GetType()).GetDeclaredField(name);
if (field != null)
{
if (System.Reflection.CustomAttributeExtensions.GetCustomAttribute(field, typeof(System.Runtime.Serialization.EnumMemberAttribute)) is System.Runtime.Serialization.EnumMemberAttribute attribute)
{
return attribute.Value ?? name;
}
}

var converted = Convert.ToString(Convert.ChangeType(value, Enum.GetUnderlyingType(value.GetType()), cultureInfo));
return converted ?? string.Empty;
}
catch (Exception ex)
}
else if (value is bool boolean)
{
return Convert.ToString(boolean, cultureInfo).ToLowerInvariant();
}
else if (value is byte[] v)
{
return Convert.ToBase64String(v);
}
else if (value.GetType().IsArray)
{
var array = Enumerable.OfType<object>((Array)value);
return string.Join(",", Enumerable.Select(array, o => ConvertToString(o, cultureInfo)));
}

var result = Convert.ToString(value, cultureInfo);
return result ?? "";
}
}

/// <summary>
/// <see cref="BaseArgoClient"/> generic functions relating to argo requests
/// </summary>
public class BaseArgoClient
{
public string BaseUrl { get; set; } = "http://localhost:2746";

protected string FormattedBaseUrl { get { return BaseUrl != null ? BaseUrl.TrimEnd('/') : ""; } }

protected readonly HttpClient HttpClient;

public BaseArgoClient(HttpClient httpClient)
{
HttpClient = httpClient;
}

protected async Task<T> SendRequest<T>(StringContent stringContent, StringBuilder urlBuilder, string method, CancellationToken cancellationToken)
{
using (var request = new HttpRequestMessage())
{
if (stringContent is not null)
{
var mess = ex.Message;
throw;
stringContent.Headers.ContentType = System.Net.Http.Headers.MediaTypeHeaderValue.Parse("application/json");
request.Content = stringContent;
}
request.Method = new HttpMethod(method);
request.Headers.Accept.Add(System.Net.Http.Headers.MediaTypeWithQualityHeaderValue.Parse("application/json"));
request.RequestUri = new Uri(urlBuilder.ToString(), UriKind.RelativeOrAbsolute);

HttpResponseMessage? response = null;
response = await HttpClient.SendAsync(request, HttpCompletionOption.ResponseContentRead, cancellationToken).ConfigureAwait(false);

try
{
Expand Down Expand Up @@ -222,7 +280,7 @@ private async Task<T> SendRequest<T>(StringContent stringContent, StringBuilder
}
}

private async Task<T?> GetRequest<T>(StringBuilder urlBuilder, bool isLogs = false)
protected async Task<T?> GetRequest<T>(StringBuilder urlBuilder, bool isLogs = false)
{

using (var request = new HttpRequestMessage())
Expand All @@ -231,7 +289,7 @@ private async Task<T> SendRequest<T>(StringContent stringContent, StringBuilder
request.Headers.Accept.Add(System.Net.Http.Headers.MediaTypeWithQualityHeaderValue.Parse("application/json"));
request.RequestUri = new Uri(urlBuilder.ToString(), UriKind.RelativeOrAbsolute);

var response = await _httpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);
var response = await HttpClient.SendAsync(request, HttpCompletionOption.ResponseHeadersRead).ConfigureAwait(false);
try
{
var headers_ = Enumerable.ToDictionary(response.Headers, h_ => h_.Key, h_ => h_.Value);
Expand Down Expand Up @@ -271,10 +329,9 @@ private async Task<T> SendRequest<T>(StringContent stringContent, StringBuilder
}
}


protected virtual async Task<ObjectResponseResult<T?>> ReadObjectResponseAsync<T>(HttpResponseMessage response, IReadOnlyDictionary<string, IEnumerable<string>> headers, bool isLogs = false)
{
if (response == null || response.Content == null)
if (response == null || response.Content == null || response.Content.GetType().Name == "EmptyContent")
{
return new ObjectResponseResult<T?>(default, string.Empty);
}
Expand Down Expand Up @@ -306,7 +363,7 @@ private async Task<T> SendRequest<T>(StringContent stringContent, StringBuilder
}
}

protected static string DecodeLogs(string logInput)
public static string DecodeLogs(string logInput)
{
var rows = logInput.Split(new String[] { "\n" }, StringSplitOptions.None);
var jsonBody = $"[{string.Join(",", rows)}]";
Expand Down Expand Up @@ -348,23 +405,7 @@ protected virtual async Task<ObjectResponseResult<string>> ReadLogResponseAsync(
}
}

/// <param name="cancellationToken">A cancellation token that can be used by other objects or threads to receive notice of cancellation.</param>
/// <returns>A successful response.</returns>
/// <exception cref="ApiException">A server side error occurred.</exception>
public virtual async Task<WorkflowTemplate> Argo_CreateWorkflowTemplateAsync(string argoNamespace, WorkflowTemplateCreateRequest body, CancellationToken cancellationToken)
{
Guard.Against.NullOrWhiteSpace(argoNamespace);
Guard.Against.Null(body);

var urlBuilder = new StringBuilder();
urlBuilder.Append(CultureInfo.InvariantCulture, $"{FormattedBaseUrl}/api/v1/workflow-templates/{argoNamespace}");

var Method = "POST";
var content = new StringContent(Newtonsoft.Json.JsonConvert.SerializeObject(body));
return await SendRequest<WorkflowTemplate>(content, urlBuilder, Method, cancellationToken).ConfigureAwait(false);
}

protected struct ObjectResponseResult<T>
protected readonly struct ObjectResponseResult<T>
{
public ObjectResponseResult(T responseObject, string responseText)
{
Expand All @@ -377,61 +418,19 @@ public ObjectResponseResult(T responseObject, string responseText)
public string Text { get; }
}

private string ConvertToString(object value, CultureInfo cultureInfo)
{
if (value == null)
{
return "";
}

if (value is Enum)
{
var name = Enum.GetName(value.GetType(), value);
if (name != null)
{
var field = System.Reflection.IntrospectionExtensions.GetTypeInfo(value.GetType()).GetDeclaredField(name);
if (field != null)
{
if (System.Reflection.CustomAttributeExtensions.GetCustomAttribute(field, typeof(System.Runtime.Serialization.EnumMemberAttribute)) is System.Runtime.Serialization.EnumMemberAttribute attribute)
{
return attribute.Value ?? name;
}
}

var converted = Convert.ToString(Convert.ChangeType(value, Enum.GetUnderlyingType(value.GetType()), cultureInfo));
return converted ?? string.Empty;
}
}
else if (value is bool boolean)
{
return Convert.ToString(boolean, cultureInfo).ToLowerInvariant();
}
else if (value is byte[] v)
{
return Convert.ToBase64String(v);
}
else if (value.GetType().IsArray)
{
var array = Enumerable.OfType<object>((Array)value);
return string.Join(",", Enumerable.Select(array, o => ConvertToString(o, cultureInfo)));
}

var result = Convert.ToString(value, cultureInfo);
return result ?? "";
}
class ArgoLogEntry
{
public string Content { get; set; } = "";

public string PodName { get; set; } = "";
}

class ArgoLogEntryResult
{
public ArgoLogEntry Result { get; set; } = new ArgoLogEntry();
}
}


public class Version
{
[Newtonsoft.Json.JsonProperty("buildDate", Required = Newtonsoft.Json.Required.Always)]
Expand Down
18 changes: 18 additions & 0 deletions src/TaskManager/Plug-ins/Argo/ArgoPlugin.cs
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,10 @@ private void ProcessTaskPluginArguments(Workflow workflow)
template.PriorityClassName = priorityClassName;
}
}
if (priorityClassName is not null)
{
workflow.Spec.PodPriorityClassName = priorityClassName;
}
}

private static void AddLimit(Dictionary<string, string>? resources, Template2 template, ResourcesKey key)
Expand Down Expand Up @@ -981,5 +985,19 @@ public async Task<WorkflowTemplate> CreateArgoTemplate(string template)
throw;
}
}

public async Task<bool> DeleteArgoTemplate(string templateName)
{
try
{
var client = _argoProvider.CreateClient(_baseUrl, _apiToken, _allowInsecure);
return await client.Argo_DeleteWorkflowTemplateAsync(_namespace, templateName, new CancellationToken()).ConfigureAwait(false);
}
catch (Exception ex)
{
_logger.ErrorDeletingWorkflowTemplate(ex);
throw;
}
}
}
}
22 changes: 21 additions & 1 deletion src/TaskManager/Plug-ins/Argo/Controllers/TemplateController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public class TemplateController : ControllerBase
{
private readonly ArgoPlugin _argoPlugin;
private readonly ILogger<TemplateController> _tempLogger;

public TemplateController(
IServiceScopeFactory scopeFactory,
ILogger<TemplateController> tempLogger,
Expand All @@ -51,7 +52,7 @@ public async Task<ActionResult<WorkflowTemplate>> CreateArgoTemplate()

var value2 = await reader.ReadToEndAsync();

if (String.IsNullOrWhiteSpace(value2))
if (string.IsNullOrWhiteSpace(value2))
{
return BadRequest("No file recieved");
}
Expand All @@ -68,5 +69,24 @@ public async Task<ActionResult<WorkflowTemplate>> CreateArgoTemplate()

return Ok(workflowTemplate);
}

[Route("{name}")]
[HttpDelete]
public async Task<ActionResult<bool>> DeleteArgoTemplate(string name)
{
if (string.IsNullOrWhiteSpace(name))
{
return BadRequest("No name parameter provided");
}

try
{
return Ok(await _argoPlugin.DeleteArgoTemplate(name));
}
catch (Exception)
{
return BadRequest("message: Argo unable to process template");
}
}
}
}
Loading