Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom handlers to support trigger return values #8874

Merged
merged 4 commits into from
Feb 1, 2023

Conversation

cgillum
Copy link
Contributor

@cgillum cgillum commented Oct 28, 2022

Issue describing the changes in this PR

The current custom handlers implementation doesn't send trigger return values back to the extension. Trigger return value support is required for Durable Functions SDKs to work correctly in a custom handler worker.

More specific comments on the changes can be found in the code comments.

Pull request checklist

  • My changes do not require documentation changes
    • Otherwise: Documentation issue linked to PR
  • My changes should not be added to the release notes for the next release
    • Otherwise: I've added my notes to release_notes.md
  • My changes do not need to be backported to a previous version
    • Otherwise: Backport tracked by issue/PR #issue_or_pr
  • My changes do not require diagnostic events changes
    • Otherwise: I have added/updated all related diagnostic events and their documentation (Documentation issue linked to PR)
  • I have added all required tests (Unit tests, E2E tests)

Example app

Here's an example Go app that was used to test this new functionality. The middleware.go source code is the part that implements the custom handler protocol needed by Durable Functions.

main.go

package main

import (
	"fmt"
	"log"
	"net/http"
	"os"

	"github.com/microsoft/durabletask-go/task"
)

// HelloCities is an orchestrator function that generates a "hello" message for several cities.
func HelloCities(ctx *task.OrchestrationContext) (any, error) {
	var helloTokyo string
	if err := ctx.CallActivity(SayHello, "Tokyo").Await(&helloTokyo); err != nil {
		return nil, err
	}
	var helloLondon string
	if err := ctx.CallActivity(SayHello, "London").Await(&helloLondon); err != nil {
		return nil, err
	}
	var helloSeattle string
	if err := ctx.CallActivity(SayHello, "Seattle").Await(&helloSeattle); err != nil {
		return nil, err
	}
	return []string{helloTokyo, helloLondon, helloSeattle}, nil
}

// SayHello is an activity function that takes a city name as an input and returns "Hello, {city}!"
func SayHello(ctx task.ActivityContext) (any, error) {
	var input string
	if err := ctx.GetInput(&input); err != nil {
		return "", err
	}
	return fmt.Sprintf("Hello, %s!", input), nil
}

func main() {
	listenAddr := ":8080"
	if val, ok := os.LookupEnv("FUNCTIONS_CUSTOMHANDLER_PORT"); ok {
		listenAddr = ":" + val
	}

	http.HandleFunc("/HelloCities", MapOrchestrator(HelloCities))
	http.HandleFunc("/SayHello", MapActivity(SayHello))

	log.Printf("Listening for function invocations on %s\n", listenAddr)
	log.Fatal(http.ListenAndServe(listenAddr, nil))
}

middleware.go

package main

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"net/http"

	"github.com/microsoft/durabletask-go/api"
	"github.com/microsoft/durabletask-go/internal/helpers"
	"github.com/microsoft/durabletask-go/internal/protos"
	"github.com/microsoft/durabletask-go/task"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/wrapperspb"
)

// NOTE: For more details on custom handlers, see https://learn.microsoft.com/en-us/azure/azure-functions/functions-custom-handlers.

type InvokeRequest struct {
	Data     map[string]interface{}
	Metadata map[string]interface{}
}

type InvokeResponse struct {
	Outputs     map[string]string
	Logs        []string
	ReturnValue json.RawMessage
}

func MapOrchestrator(o task.Orchestrator) func(http.ResponseWriter, *http.Request) {
	r := task.NewTaskRegistry()
	if err := r.AddOrchestratorN("*", o); err != nil {
		panic(fmt.Errorf("ERROR: Failed to register the orchestrator function: %w", err))
	}
	executor := task.NewTaskExecutor(r)

	return func(w http.ResponseWriter, httpReq *http.Request) {
		var invokeRequest InvokeRequest
		d := json.NewDecoder(httpReq.Body)
		d.Decode(&invokeRequest)

		// TODO: Give the schema, construct the context object and invoke the orchestrator
		contextParam := invokeRequest.Data["context"]
		base64encodedPayload := contextParam.(string)

		if err := json.Unmarshal([]byte(base64encodedPayload), &base64encodedPayload); err != nil {
			fmt.Printf("ERROR: Failed to json-decode context payload string: %v\n", err)
			return
		}

		protoBytes, err := base64.StdEncoding.DecodeString(base64encodedPayload)
		if err != nil {
			fmt.Printf("ERROR: Failed to base64-decode request string: %v\n", err)
			return
		}

		var request protos.OrchestratorRequest
		if err := proto.Unmarshal(protoBytes, &request); err != nil {
			fmt.Printf("ERROR: Failed to deserialize request protobuf: %v\n", err)
			return
		}
		fmt.Printf("Orchestrator request for instance ID '%s': %v\n", request.InstanceId, &request)

		results, err := executor.ExecuteOrchestrator(context.TODO(), api.InstanceID(request.InstanceId), request.PastEvents, request.NewEvents)
		if err != nil {
			fmt.Printf("ERROR: Unexpected failure executing the orchestrator function: %v\n", err)
			return
		}
		fmt.Printf("Orchestrator returned a response: %v\n", results.Response)

		respBytes, err := proto.Marshal(results.Response)
		if err != nil {
			fmt.Printf("ERROR: Failed to marshal orchestrator results to protobuf: %v\n", err)
			return
		}

		base64bytes := base64.StdEncoding.EncodeToString(respBytes)
		fmt.Printf("Sending back base64 encoded string: %s\n", base64bytes)

		// Send the response back to the Functions host in a JSON envelope
		invokeResponse := &InvokeResponse{ReturnValue: []byte(`"` + base64bytes + `"`)}
		responseJson, err := json.Marshal(invokeResponse)
		if err != nil {
			fmt.Printf("ERROR: Failed to marshal response payload to JSON: %v\n", err)
			return
		}
		fmt.Println("Sending response JSON:", string(responseJson))
		w.Header().Set("Content-Type", "application/json")
		w.Write(responseJson)
	}
}

func MapActivity(a task.Activity) func(http.ResponseWriter, *http.Request) {
	r := task.NewTaskRegistry()
	if err := r.AddActivityN("*", a); err != nil {
		panic(fmt.Errorf("ERROR: Failed to register the activity function: %w", err))
	}
	executor := task.NewTaskExecutor(r)

	return func(w http.ResponseWriter, r *http.Request) {
		var invokeRequest InvokeRequest
		d := json.NewDecoder(r.Body)
		d.Decode(&invokeRequest)

		fmt.Println("Activity request:", invokeRequest)

		sys := invokeRequest.Metadata["sys"].(map[string]interface{})
		name := sys["MethodName"].(string)
		instanceID := invokeRequest.Metadata["instanceId"].(string)

		var rawInput *wrapperspb.StringValue
		if data, ok := invokeRequest.Metadata["data"]; ok && data != nil {
			rawInputStr := data.(string)
			rawInput = wrapperspb.String(rawInputStr)
		}

		ts := helpers.NewTaskScheduledEvent(-1, name, nil, rawInput)
		e, err := executor.ExecuteActivity(context.TODO(), api.InstanceID(instanceID), ts)
		if err != nil {
			panic(fmt.Errorf("ERROR: Activity execution failed with an error: %w", err))
		}

		// Send the response back to the Functions host in a JSON envelope
		var returnValue string
		var statusCode int
		if tc := e.GetTaskCompleted(); tc != nil {
			fmt.Printf("Task completed: %v\n", tc.Result.GetValue())
			returnValue = tc.Result.GetValue()
			statusCode = 200
		} else if tf := e.GetTaskFailed(); tf != nil {
			fmt.Printf("Task failed: %v\n", tf.FailureDetails)
			statusCode = 500
		} else {
			panic(fmt.Errorf("Unexpected event type: %v", e))
		}

		invokeResponse := &InvokeResponse{ReturnValue: []byte(returnValue)}
		responseJson, err := json.Marshal(invokeResponse)
		if err != nil {
			fmt.Printf("ERROR: Failed to marshal response payload to JSON: %v\n", err)
			return
		}
		fmt.Printf("Sending %d response: %s\n", statusCode, string(responseJson))
		w.Header().Set("Content-Type", "application/json")
		w.WriteHeader(statusCode)
		w.Write(responseJson)
	}
}

@cgillum cgillum requested a review from a team as a code owner October 28, 2022 16:35
Copy link
Contributor Author

@cgillum cgillum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding some inline comments to explain my changes.

else
{
// Some triggers (like Durable Functions' triggers) assign $return implicitly.
scriptInvocationResult.Return = httpScriptInvocationResult.ReturnValue;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prior to this change, return values were only saved when there was an explicit $return binding declared in function.json. Durable Functions triggers, however, do not use explicit $return bindings. Instead, the $return binding is implicit in the trigger implementation, thus the return value wasn't being returned by this logic.

I couldn't think of any meaningful reason why return values should be ignored when there's no $return binding in function.json, so I added this else case to unblock Durable Functions, and any other trigger that might also have implicit return value bindings.

Copy link
Member

@fabiocav fabiocav left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change looks good. Would be good to have some validation/tests for this scenario as well.

@cgillum
Copy link
Contributor Author

cgillum commented Oct 28, 2022

Looks like my change broke one of the unit tests, so I'll need to go ahead and make test changes anyways for this.

@cgillum
Copy link
Contributor Author

cgillum commented Oct 28, 2022

I noticed that the Specialization_ThreadUtilization integration test failed in both of my last two commits. However, it failed for different reasons, and I'm thinking my change might not be related to these failures. Is this test known to be flakey?

All unit tests are passing.

release_notes.md Outdated Show resolved Hide resolved
@ghost ghost added the no-recent-activity label Jan 19, 2023
@ghost
Copy link

ghost commented Jan 19, 2023

This pull request has been automatically marked as stale because it has been marked as requiring author feedback but has not had any activity for 7 days. It will be closed if no further activity occurs within 7 days of this comment.

@liliankasem
Copy link
Member

I noticed that the Specialization_ThreadUtilization integration test failed in both of my last two commits. However, it failed for different reasons, and I'm thinking my change might not be related to these failures. Is this test known to be flakey?

All unit tests are passing.

I do think Specialization_ThreadUtilization is one of the flakey ones - I'm rerunning the pipeline to see what what happens.

Is there anything else you need from us to help you get this PR merged? I think from our side there are some open comments that need to be addressed and release note conflicts - otherwise happy to take to review again and help get approvals in place

@cgillum
Copy link
Contributor Author

cgillum commented Jan 31, 2023

Thanks @liliankasem! I updated the release notes file and merged with the latest from dev. I think the only other outstanding item was getting an ACK from @pragnagopa on my logging change.

@liliankasem
Copy link
Member

Thanks @liliankasem! I updated the release notes file and merged with the latest from dev. I think the only other outstanding item was getting an ACK from @pragnagopa on my logging change.

I think you're all clear!

@cgillum
Copy link
Contributor Author

cgillum commented Feb 1, 2023

Thanks @liliankasem! I'll go ahead and merge it then.

@cgillum cgillum merged commit 52dd704 into dev Feb 1, 2023
@cgillum cgillum deleted the cgillum/custom-handlers branch February 1, 2023 18:20
@horihiro
Copy link

horihiro commented Feb 2, 2023

@cgillum Is this PR related this issue?
#7684

@cgillum
Copy link
Contributor Author

cgillum commented Feb 2, 2023

@horihiro yes, I think it will solve that issue as well!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

7 participants