Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use grpc Url instead of Baseurl in DF Client management operations #66

Merged
merged 8 commits into from
Feb 14, 2024
8 changes: 5 additions & 3 deletions src/AzureFunctions.PowerShell.Durable.SDK.psd1
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
@{
# Version number of this module.
ModuleVersion = '1.0.1'
ModuleVersion = '1.1.0'

# Supported PSEditions
CompatiblePSEditions = @('Core')
Expand Down Expand Up @@ -37,8 +37,10 @@
'Get-DurableStatus',
'New-DurableOrchestrationCheckStatusResponse',
'Send-DurableExternalEvent',
'Start-DurableOrchestration'
'Stop-DurableOrchestration'
'Start-DurableOrchestration',
'Stop-DurableOrchestration',
'Suspend-DurableOrchestration',
'Resume-DurableOrchestration'
)

# Cmdlets to export from this module, for best performance, do not use wildcards and do not delete the entry, use an empty array if there are no cmdlets to export.
Expand Down
64 changes: 61 additions & 3 deletions src/AzureFunctions.PowerShell.Durable.SDK.psm1
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ function Get-DurableStatus {
$DurableClient = GetDurableClientFromModulePrivateData
}

$requestUrl = "$($DurableClient.BaseUrl)/instances/$InstanceId"
$requestUrl = "$($DurableClient.rpcBaseUrl)/instances/$InstanceId"

$query = @()
if ($ShowHistory.IsPresent) {
Expand Down Expand Up @@ -159,7 +159,65 @@ function Stop-DurableOrchestration {
$DurableClient = GetDurableClientFromModulePrivateData
}

$requestUrl = "$($DurableClient.BaseUrl)/instances/$InstanceId/terminate?reason=$([System.Web.HttpUtility]::UrlEncode($Reason))"
$requestUrl = "$($DurableClient.rpcBaseUrl)/instances/$InstanceId/terminate?reason=$([System.Web.HttpUtility]::UrlEncode($Reason))"

Invoke-RestMethod -Uri $requestUrl -Method 'POST'
}

function Suspend-DurableOrchestration {
[CmdletBinding()]
param(
[Parameter(
Mandatory = $true,
Position = 0,
ValueFromPipelineByPropertyName = $true)]
[ValidateNotNullOrEmpty()]
[string] $InstanceId,

[Parameter(
Mandatory = $true,
Position = 1,
ValueFromPipelineByPropertyName = $true)]
[ValidateNotNullOrEmpty()]
[string] $Reason
)

$ErrorActionPreference = 'Stop'

if ($null -eq $DurableClient) {
$DurableClient = GetDurableClientFromModulePrivateData
}

$requestUrl = "$($DurableClient.rpcBaseUrl)/instances/$InstanceId/suspend?reason=$([System.Web.HttpUtility]::UrlEncode($Reason))"

Invoke-RestMethod -Uri $requestUrl -Method 'POST'
}

function Resume-DurableOrchestration {
[CmdletBinding()]
param(
[Parameter(
Mandatory = $true,
Position = 0,
ValueFromPipelineByPropertyName = $true)]
[ValidateNotNullOrEmpty()]
[string] $InstanceId,

[Parameter(
Mandatory = $true,
Position = 1,
ValueFromPipelineByPropertyName = $true)]
[ValidateNotNullOrEmpty()]
[string] $Reason
)

$ErrorActionPreference = 'Stop'

if ($null -eq $DurableClient) {
$DurableClient = GetDurableClientFromModulePrivateData
}

$requestUrl = "$($DurableClient.rpcBaseUrl)/instances/$InstanceId/resume?reason=$([System.Web.HttpUtility]::UrlEncode($Reason))"

Invoke-RestMethod -Uri $requestUrl -Method 'POST'
}
Expand Down Expand Up @@ -291,7 +349,7 @@ function GetRaiseEventUrl(
[string] $TaskHubName,
[string] $ConnectionName) {

$RequestUrl = $DurableClient.BaseUrl + "/instances/$InstanceId/raiseEvent/$EventName"
$RequestUrl = $DurableClient.rpcBaseUrl + "/instances/$InstanceId/raiseEvent/$EventName"

$query = @()
if ($null -eq $TaskHubName) {
Expand Down
8 changes: 7 additions & 1 deletion src/DurableEngine/OrchestrationInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ internal Hashtable Invoke(IPowerShellServices powerShellServices)
while (true)
{
// block this thread until user-code thread (the PS orchestrator) invokes a DF CmdLet or completes.
var orchestratorReturned = context.SharedMemory.YieldToUserCodeThread(orchestratorReturnedHandle);
var orchestratorReturned = context.SharedMemory.WaitForInvokerThreadTurn(orchestratorReturnedHandle);
if (orchestratorReturned)
{
// The PS orchestrator has a return value, there's no more DF APIs to await.
Expand Down Expand Up @@ -109,6 +109,12 @@ internal Hashtable Invoke(IPowerShellServices powerShellServices)
await task.GetDTFxTask();
} // Exceptions are ignored at this point, they will be re-surfaced by the PS code if left unhandled.
catch { }

// Wake up user-code thread. For a small moment, both the user code thread and the invoker thread
// will be running at the same time.
// However, the invoker thread will block itself again at the start of the next loop until the user-code
// thread yields control.
context.SharedMemory.WakeUserCodeThread();
}
};

Expand Down
22 changes: 11 additions & 11 deletions src/DurableEngine/SharedMemory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,30 +40,30 @@ public void YieldToInvokerThread()
{
// Wake invoker thread.
invokerThreadTurn.Set();

// Block user-code thread.
userCodeThreadTurn.Reset();
userCodeThreadTurn.WaitOne();
}

/// <summary>
/// Blocks Orchestration-invoker thread, wakes up user-code thread.
/// This is usually used after the invoker has a result for the PS orchestrator.
/// Blocks Orchestration-invoker thread until the user-code thread completes or yields.
/// </summary>
/// <param name="completionHandle">The WaitHandle tracking if the user-code thread completed.</param>
/// <returns>True if the user-code thread completed, False if it requests an API to be awaited.</returns>
public bool YieldToUserCodeThread(WaitHandle completionHandle)
public bool WaitForInvokerThreadTurn(WaitHandle completionHandle)
{
// Wake user-code thread
userCodeThreadTurn.Set();

// Get invoker thread ready to block
invokerThreadTurn.Reset();

// Wake up when either the user-code returns, or when we're yielded-to for `await`'ing.
var index = WaitHandle.WaitAny(new[] { completionHandle, invokerThreadTurn });
var shouldStop = index == 0;
return shouldStop;
}

/// <summary>
/// Wakes up the user-code thread without blocking the invoker thread.
/// The invoker thread should block itself afterwards to prevent races.
/// </summary>
public void WakeUserCodeThread()
{
userCodeThreadTurn.Set();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -227,5 +227,35 @@ public async Task DurableClientTerminatesOrchestration()
Assert.Equal("Terminated intentionally", (string)finalStatusResponseBody.output);
});
}

[Fact]
public async Task DurableClientSuspendOrchestration()
{
var initialResponse = await Utilities.GetHttpStartResponse(
orchestratorName: "SendDurableExternalEventOrchestrator",
clientRoute: "suspendingOrchestrators");
Assert.Equal(HttpStatusCode.Accepted, initialResponse.StatusCode);

await ValidateDurableWorkflowResults(
initialResponse,
validateIntermediateResponse: (dynamic intermediateStatusResponseBody) =>
{
Assert.Equal("Suspended", (string)intermediateStatusResponseBody.runtimeStatus);
Assert.Equal("Suspend orchestrator", (string)intermediateStatusResponseBody.output);
});

await ValidateDurableWorkflowResults(
initialResponse,
validateIntermediateResponse: (dynamic intermediateStatusResponseBody) =>
{
Assert.Equal("Running", (string)intermediateStatusResponseBody.runtimeStatus);
},
validateFinalResponse: (dynamic finalStatusResponseBody) =>
{
Assert.Equal("Completed", (string)finalStatusResponseBody.runtimeStatus);
Assert.Equal("FirstTimeout", finalStatusResponseBody.output[0].ToString());
Assert.Equal("SecondExternalEvent", finalStatusResponseBody.output[1].ToString());
});
}
}
}
25 changes: 25 additions & 0 deletions test/E2E/durableApp/DurableClientSuspending/function.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"bindings": [
{
"authLevel": "function",
"name": "Request",
"type": "httpTrigger",
"direction": "in",
"route": "suspendingOrchestrators/{FunctionName}",
"methods": [
"post",
"get"
]
},
{
"type": "http",
"direction": "out",
"name": "Response"
},
{
"name": "starter",
"type": "durableClient",
"direction": "in"
}
]
}
23 changes: 23 additions & 0 deletions test/E2E/durableApp/DurableClientSuspending/run.ps1
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
param($Request, $TriggerMetadata)
$ErrorActionPreference = 'Stop'

Write-Host "DurableClientSuspending started"

$OrchestratorInputs = @{ FirstDuration = 5; SecondDuration = 60 }

$FunctionName = $Request.Params.FunctionName
$InstanceId = Start-DurableOrchestration -FunctionName $FunctionName -InputObject $OrchestratorInputs
Write-Host "Started orchestration with ID = '$InstanceId'"

Start-Sleep -Seconds 5
Suspend-DurableOrchestration -InstanceId $InstanceId -Reason 'Suspend orchestrator'

$SuspendResponse = New-DurableOrchestrationCheckStatusResponse -Request $Request -InstanceId $InstanceId
Push-OutputBinding -Name Response -Value $SuspendResponse

Start-Sleep -Seconds 10
Resume-DurableOrchestration -InstanceId $InstanceId -Reason 'Resume orchestrator'

Send-DurableExternalEvent -InstanceId $InstanceId -EventName "SecondExternalEvent"

Write-Host "DurableClientSuspending completed"