Skip to content

Commit

Permalink
separate DF SDK classes from DF worker classes
Browse files Browse the repository at this point in the history
fix typo

DurableSDK now compiles by itself

Allow ExternalSDK to handle orchestration

document next steps

allow external SDK to set the user-code's input. Still need to refactor this logic for the worker to continue working with old SDK

add import module

supress traces

avoid nullptr

pass tests

fix E2E tests

develop E2E tests

Enabled external durable client (#765)

Co-authored-by: Michael Peng <michaelpeng@microsoft.com>

bindings work

conditional binding intialization

conditional import

Added exception handling logic

Revert durableController name to durableFunctionsUtils

Ensure unit tests are functioning properly

Corrected unit test names

Turned repeated variables in unit tests into static members

Fixed issue with building the worker

Fix E2E test

Fixed unit test setup

Fixed another unit test setup

Remove string representation of booleans

patch e2e test

remove typo in toString

Update PowerShell language worker pipelines (#750)

* Install .Net to a global location

* Remove .Net installation tasks

* Update install .Net 6 task

* Update Windows image to use windows-latest

Make throughput warning message visible for tooling diagnosis (#757)

Update grpc.tools to version 2.43.0

Update Google.Protobuf.Tools to version 3.19.4

Revert "Update Google.Protobuf.Tools to version 3.19.4"

This reverts commit bcbd022.

Revert "Update grpc.tools to version 2.43.0"

This reverts commit ccb323a.

Update Google.Protobuf to 3.19.4 and grpc.tools to  2.43.0 (#762)

* Update grpc.tools to version 2.43.0

* Update Google.Protobuf.Tools to version 3.19.4

Switch from Grpc.Core to Grpc.Net.Client (#758)

* Upgraded protobuf versions and removed Grpc.Core dependency

* Updated channel and option types used

* Change channel credentials

* Added http prefix to url

* Add valid URL check and explicitly include credentials

Update pipeline logic to generate the SBOM for release builds (#767)

Return results from Start-DurableExternalEventListener (#685) (#753)

Co-authored-by: Greg Roll <Greg.Roll@oobe.com.au>

add e2e test for GetTaskResult

parametrize test

patch new e2e test

patch external contrib

fix typo in test

comment changes

Adds IExternalInvoker (#776)

* Define a contract for the external invoker

* Remove extraneous comments and variables

rename hasOrchestrationContext to hasInitializedDurableFunction

remove outdated TODO comment

remove now unused function - CreateOrchestrationBindingInfo

Allow worker to read results directly from the external SDK (#777)

comment out external SDK path
  • Loading branch information
David Justo committed Apr 7, 2022
1 parent 3642ec0 commit d7450f7
Show file tree
Hide file tree
Showing 22 changed files with 315 additions and 142 deletions.
36 changes: 36 additions & 0 deletions src/DurableSDK/Commands/GetDurableTaskResult.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//

#pragma warning disable 1591 // Missing XML comment for publicly visible type or member 'member'

namespace Microsoft.Azure.Functions.PowerShellWorker.Durable.Commands
{
using System.Collections;
using System.Management.Automation;
using Microsoft.Azure.Functions.PowerShellWorker.Durable.Tasks;

[Cmdlet("Get", "DurableTaskResult")]
public class GetDurableTaskResultCommand : PSCmdlet
{
[Parameter(Mandatory = true)]
[ValidateNotNull]
public DurableTask[] Task { get; set; }

private readonly DurableTaskHandler _durableTaskHandler = new DurableTaskHandler();

protected override void EndProcessing()
{
var privateData = (Hashtable)MyInvocation.MyCommand.Module.PrivateData;
var context = (OrchestrationContext)privateData[SetFunctionInvocationContextCommand.ContextKey];

_durableTaskHandler.GetTaskResult(Task, context, WriteObject);
}

protected override void StopProcessing()
{
_durableTaskHandler.Stop();
}
}
}
17 changes: 17 additions & 0 deletions src/DurableSDK/DurableTaskHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ internal class DurableTaskHandler
if (scheduledHistoryEvent != null)
{
scheduledHistoryEvent.IsProcessed = true;
scheduledHistoryEvent.IsPlayed = true;
}

if (completedHistoryEvent != null)
Expand All @@ -190,6 +191,7 @@ internal class DurableTaskHandler
}

completedHistoryEvent.IsProcessed = true;
completedHistoryEvent.IsPlayed = true;
}
}

Expand All @@ -207,6 +209,21 @@ internal class DurableTaskHandler
}
}

public void GetTaskResult(
IReadOnlyCollection<DurableTask> tasksToQueryResultFor,
OrchestrationContext context,
Action<object> output)
{
foreach (var task in tasksToQueryResultFor) {
var scheduledHistoryEvent = task.GetScheduledHistoryEvent(context, true);
var processedHistoryEvent = task.GetCompletedHistoryEvent(context, scheduledHistoryEvent, true);
if (processedHistoryEvent != null)
{
output(GetEventResult(processedHistoryEvent));
}
}
}

public void Stop()
{
_waitForStop.Set();
Expand Down
26 changes: 26 additions & 0 deletions src/DurableSDK/ExternalInvoker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//

namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
using System;
using System.Collections;
using System.Management.Automation;

internal class ExternalInvoker : IExternalInvoker
{
private readonly Func<PowerShell, object> _externalSDKInvokerFunction;

public ExternalInvoker(Func<PowerShell, object> invokerFunction)
{
_externalSDKInvokerFunction = invokerFunction;
}

public Hashtable Invoke(IPowerShellServices powerShellServices)
{
return (Hashtable)_externalSDKInvokerFunction.Invoke(powerShellServices.GetPowerShell());
}
}
}
16 changes: 16 additions & 0 deletions src/DurableSDK/IExternalInvoker.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
//

namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
using System.Collections;

// Represents a contract for the
internal interface IExternalInvoker
{
// Method to invoke an orchestration using the external Durable SDK
Hashtable Invoke(IPowerShellServices powerShellServices);
}
}
4 changes: 1 addition & 3 deletions src/DurableSDK/IOrchestrationInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,11 @@

namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
{
using System;
using System.Collections;
using System.Management.Automation;

internal interface IOrchestrationInvoker
{
Hashtable Invoke(OrchestrationBindingInfo orchestrationBindingInfo, IPowerShellServices pwsh);
void SetExternalInvoker(Action<PowerShell> externalInvoker);
void SetExternalInvoker(IExternalInvoker externalInvoker);
}
}
6 changes: 3 additions & 3 deletions src/DurableSDK/IPowerShellServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ internal interface IPowerShellServices

void SetDurableClient(object durableClient);

OrchestrationBindingInfo SetOrchestrationContext(ParameterBinding orchestrationContext, out Action<object> externalInvoker);
OrchestrationBindingInfo SetOrchestrationContext(ParameterBinding context, out IExternalInvoker externalInvoker);

void ClearOrchestrationContext();

public void TracePipelineObject();
public void AddParameter(string name, object value);
void TracePipelineObject();

void AddParameter(string name, object value);

IAsyncResult BeginInvoke(PSDataCollection<object> output);

Expand Down
9 changes: 0 additions & 9 deletions src/DurableSDK/OrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,6 @@ public class OrchestrationContext

internal OrchestrationActionCollector OrchestrationActionCollector { get; } = new OrchestrationActionCollector();

internal object ExternalResult;
internal bool ExternalIsError;

internal void SetExternalResult(object result, bool isError)
{
this.ExternalResult = result;
this.ExternalIsError = isError;
}

internal object CustomStatus { get; set; }
}
}
115 changes: 61 additions & 54 deletions src/DurableSDK/OrchestrationInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,81 +11,88 @@ namespace Microsoft.Azure.Functions.PowerShellWorker.Durable
using System.Linq;
using System.Management.Automation;

// using PowerShellWorker.Utility;
using Microsoft.Azure.Functions.PowerShellWorker.Durable.Actions;

internal class OrchestrationInvoker : IOrchestrationInvoker
{
private Action<PowerShell> externalInvoker = null;
private IExternalInvoker _externalInvoker;
internal static string isOrchestrationFailureKey = "IsOrchestrationFailure";

public Hashtable Invoke(OrchestrationBindingInfo orchestrationBindingInfo, IPowerShellServices pwsh)
public Hashtable Invoke(
OrchestrationBindingInfo orchestrationBindingInfo,
IPowerShellServices powerShellServices)
{

try
{
if (pwsh.UseExternalDurableSDK())
if (powerShellServices.UseExternalDurableSDK())
{
externalInvoker.Invoke(pwsh.GetPowerShell());
var result = orchestrationBindingInfo.Context.ExternalResult;
var isError = orchestrationBindingInfo.Context.ExternalIsError;
if (isError)
{
throw (Exception)result;
}
else
{
return (Hashtable)result;
}
return InvokeExternalDurableSDK(powerShellServices);
}
return InvokeInternalDurableSDK(orchestrationBindingInfo, powerShellServices);
}
catch (Exception ex)
{
ex.Data.Add(isOrchestrationFailureKey, true);
throw;
}
finally
{
powerShellServices.ClearStreamsAndCommands();
}
}

var outputBuffer = new PSDataCollection<object>();
var context = orchestrationBindingInfo.Context;
public Hashtable InvokeExternalDurableSDK(IPowerShellServices powerShellServices)
{
return _externalInvoker.Invoke(powerShellServices);
}

// context.History should never be null when initializing CurrentUtcDateTime
var orchestrationStart = context.History.First(
e => e.EventType == HistoryEventType.OrchestratorStarted);
context.CurrentUtcDateTime = orchestrationStart.Timestamp.ToUniversalTime();
public Hashtable InvokeInternalDurableSDK(
OrchestrationBindingInfo orchestrationBindingInfo,
IPowerShellServices powerShellServices)
{
var outputBuffer = new PSDataCollection<object>();
var context = orchestrationBindingInfo.Context;

// Marks the first OrchestratorStarted event as processed
orchestrationStart.IsProcessed = true;
// context.History should never be null when initializing CurrentUtcDateTime
var orchestrationStart = context.History.First(
e => e.EventType == HistoryEventType.OrchestratorStarted);
context.CurrentUtcDateTime = orchestrationStart.Timestamp.ToUniversalTime();

// Marks the first OrchestratorStarted event as processed
orchestrationStart.IsProcessed = true;

pwsh.AddParameter(orchestrationBindingInfo.ParameterName, context);
pwsh.TracePipelineObject();
// Finish initializing the Function invocation
powerShellServices.AddParameter(orchestrationBindingInfo.ParameterName, context);
powerShellServices.TracePipelineObject();

var asyncResult = pwsh.BeginInvoke(outputBuffer);
var asyncResult = powerShellServices.BeginInvoke(outputBuffer);

var (shouldStop, actions) =
orchestrationBindingInfo.Context.OrchestrationActionCollector.WaitForActions(asyncResult.AsyncWaitHandle);
var (shouldStop, actions) =
orchestrationBindingInfo.Context.OrchestrationActionCollector.WaitForActions(asyncResult.AsyncWaitHandle);

if (shouldStop)
if (shouldStop)
{
// The orchestration function should be stopped and restarted
powerShellServices.StopInvoke();
// return (Hashtable)orchestrationBindingInfo.Context.OrchestrationActionCollector.output;
return CreateOrchestrationResult(isDone: false, actions, output: null, context.CustomStatus);
}
else
{
try
{
// The orchestration function should be stopped and restarted
pwsh.StopInvoke();
// return (Hashtable)orchestrationBindingInfo.Context.OrchestrationActionCollector.output;
return CreateOrchestrationResult(isDone: false, actions, output: null, context.CustomStatus);
// The orchestration function completed
powerShellServices.EndInvoke(asyncResult);
var result = CreateReturnValueFromFunctionOutput(outputBuffer);
return CreateOrchestrationResult(isDone: true, actions, output: result, context.CustomStatus);
}
else
catch (Exception e)
{
try
{
// The orchestration function completed
pwsh.EndInvoke(asyncResult);
var result = CreateReturnValueFromFunctionOutput(outputBuffer);
return CreateOrchestrationResult(isDone: true, actions, output: result, context.CustomStatus);
}
catch (Exception e)
{
// The orchestrator code has thrown an unhandled exception:
// this should be treated as an entire orchestration failure
throw new OrchestrationFailureException(actions, context.CustomStatus, e);
}
// The orchestrator code has thrown an unhandled exception:
// this should be treated as an entire orchestration failure
throw new OrchestrationFailureException(actions, context.CustomStatus, e);
}
}
finally
{
pwsh.ClearStreamsAndCommands();
}
}

public static object CreateReturnValueFromFunctionOutput(IList<object> pipelineItems)
Expand All @@ -108,9 +115,9 @@ public static object CreateReturnValueFromFunctionOutput(IList<object> pipelineI
return new Hashtable { { "$return", orchestrationMessage } };
}

public void SetExternalInvoker(Action<PowerShell> externalInvoker)
public void SetExternalInvoker(IExternalInvoker externalInvoker)
{
this.externalInvoker = externalInvoker;
_externalInvoker = externalInvoker;
}
}
}
Loading

0 comments on commit d7450f7

Please sign in to comment.