Skip to content

Commit

Permalink
Merge branch 'master' into go-mod-update-operator-tls-opts
Browse files Browse the repository at this point in the history
  • Loading branch information
dapr-bot committed Jan 22, 2024
2 parents 1e29d46 + ce2bb47 commit c899877
Showing 1 changed file with 62 additions and 74 deletions.
136 changes: 62 additions & 74 deletions tests/integration/suite/daprd/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,21 @@ import (
"net/http"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"

"github.com/microsoft/durabletask-go/api"
"github.com/microsoft/durabletask-go/backend"
"github.com/microsoft/durabletask-go/client"
"github.com/microsoft/durabletask-go/task"

"google.golang.org/grpc/status"

runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/dapr/tests/integration/framework"
procdaprd "github.com/dapr/dapr/tests/integration/framework/process/daprd"
Expand All @@ -57,63 +57,6 @@ type workflow struct {
grpcClient runtimev1pb.DaprClient
}

func (w *workflow) startWorkflow(ctx context.Context, t *testing.T, name string, input string) string {
// use http client to start the workflow
reqURL := fmt.Sprintf("http://localhost:%d/v1.0-beta1/workflows/dapr/%s/start", w.daprd.HTTPPort(), name)
data, err := json.Marshal(input)
require.NoError(t, err)
reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, reqURL, strings.NewReader(string(data)))
req.Header.Set("Content-Type", "application/json")
require.NoError(t, err)
resp, err := w.httpClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusAccepted, resp.StatusCode)
var response struct {
InstanceID string `json:"instanceID"`
}
err = json.NewDecoder(resp.Body).Decode(&response)
require.NoError(t, err)

return response.InstanceID
}

// terminate workflow
func (w *workflow) terminateWorkflow(ctx context.Context, t *testing.T, instanceID string, nonRecursive string) {
// use http client to terminate the workflow
reqURL := fmt.Sprintf("http://localhost:%d/v1.0-beta1/workflows/dapr/%s/terminate", w.daprd.HTTPPort(), instanceID)
if nonRecursive != "" {
reqURL += "?non_recursive=" + nonRecursive
}
reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, reqURL, nil)
require.NoError(t, err)
resp, err := w.httpClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusAccepted, resp.StatusCode)
}

// purge workflow
func (w *workflow) purgeWorkflow(ctx context.Context, t *testing.T, instanceID string, nonRecursive string) {
// use http client to purge the workflow
reqURL := fmt.Sprintf("http://localhost:%d/v1.0-beta1/workflows/dapr/%s/purge", w.daprd.HTTPPort(), instanceID)
if nonRecursive != "" {
reqURL += "?non_recursive=" + nonRecursive
}
reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, reqURL, nil)
require.NoError(t, err)
resp, err := w.httpClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusAccepted, resp.StatusCode)
}

func (w *workflow) Setup(t *testing.T) []framework.Option {
handler := http.NewServeMux()
handler.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -173,9 +116,6 @@ func (w *workflow) Run(t *testing.T, ctx context.Context) {
backendClient.StartWorkItemListener(taskhubCtx, r)
defer cancelTaskhub()

// Wait for wfEngine to be ready
time.Sleep(5 * time.Second)

id := api.InstanceID(w.startWorkflow(ctx, t, "SingleActivity", "Dapr"))
metadata, err := backendClient.WaitForOrchestrationCompletion(ctx, id, api.WithFetchPayloads(true))
require.NoError(t, err)
Expand All @@ -185,7 +125,7 @@ func (w *workflow) Run(t *testing.T, ctx context.Context) {

t.Run("terminate", func(t *testing.T) {
delayTime := 4 * time.Second
executedActivity := false
var executedActivity atomic.Bool
r := task.NewTaskRegistry()
r.AddOrchestratorN("Root", func(ctx *task.OrchestrationContext) (any, error) {
tasks := []task.Task{}
Expand All @@ -208,17 +148,14 @@ func (w *workflow) Run(t *testing.T, ctx context.Context) {
return nil, nil
})
r.AddActivityN("Fail", func(ctx task.ActivityContext) (any, error) {
executedActivity = true
executedActivity.Store(true)
return nil, errors.New("failed: Should not have executed the activity")
})

taskhubCtx, cancelTaskhub := context.WithCancel(ctx)
backendClient.StartWorkItemListener(taskhubCtx, r)
defer cancelTaskhub()

// Wait for wfEngine to be ready
time.Sleep(5 * time.Second)

// Test terminating with and without recursion
for _, nonRecursive := range []string{"", "false", "true"} {
// `non_recursive` = "" means no query param, which should default to false
Expand Down Expand Up @@ -266,7 +203,7 @@ func (w *workflow) Run(t *testing.T, ctx context.Context) {
}

// Verify tht none of the L2 suborchestrations executed the activity in case of recursive termination
assert.Equal(t, nonRecursiveBool, executedActivity)
assert.Equal(t, nonRecursiveBool, executedActivity.Load())
})
}
})
Expand All @@ -290,9 +227,6 @@ func (w *workflow) Run(t *testing.T, ctx context.Context) {
backendClient.StartWorkItemListener(taskhubCtx, r)
defer cancelTaskhub()

// Wait for wfEngine to be ready
time.Sleep(5 * time.Second)

// Test purging with and without recursion
for _, nonRecursive := range []string{"", "false", "true"} {
// `non_recursive` = "" means no query param, which should default to false
Expand Down Expand Up @@ -358,13 +292,67 @@ func (w *workflow) Run(t *testing.T, ctx context.Context) {
backendClient.StartWorkItemListener(taskhubCtx, r)
defer cancelTaskhub()

// Wait for wfEngine to be ready
time.Sleep(5 * time.Second)

id := api.InstanceID(w.startWorkflow(ctx, t, "root", "Dapr"))
metadata, err := backendClient.WaitForOrchestrationCompletion(ctx, id, api.WithFetchPayloads(true))
require.NoError(t, err)
assert.True(t, metadata.IsComplete())
assert.Equal(t, `"Hello, Dapr!"`, metadata.SerializedOutput)
})
}

func (w *workflow) startWorkflow(ctx context.Context, t *testing.T, name string, input string) string {
// use http client to start the workflow
reqURL := fmt.Sprintf("http://localhost:%d/v1.0-beta1/workflows/dapr/%s/start", w.daprd.HTTPPort(), name)
data, err := json.Marshal(input)
require.NoError(t, err)
reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, reqURL, strings.NewReader(string(data)))
req.Header.Set("Content-Type", "application/json")
require.NoError(t, err)
resp, err := w.httpClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusAccepted, resp.StatusCode)
var response struct {
InstanceID string `json:"instanceID"`
}
err = json.NewDecoder(resp.Body).Decode(&response)
require.NoError(t, err)

return response.InstanceID
}

// terminate workflow
func (w *workflow) terminateWorkflow(ctx context.Context, t *testing.T, instanceID string, nonRecursive string) {
// use http client to terminate the workflow
reqURL := fmt.Sprintf("http://localhost:%d/v1.0-beta1/workflows/dapr/%s/terminate", w.daprd.HTTPPort(), instanceID)
if nonRecursive != "" {
reqURL += "?non_recursive=" + nonRecursive
}
reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, reqURL, nil)
require.NoError(t, err)
resp, err := w.httpClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusAccepted, resp.StatusCode)
}

// purge workflow
func (w *workflow) purgeWorkflow(ctx context.Context, t *testing.T, instanceID string, nonRecursive string) {
// use http client to purge the workflow
reqURL := fmt.Sprintf("http://localhost:%d/v1.0-beta1/workflows/dapr/%s/purge", w.daprd.HTTPPort(), instanceID)
if nonRecursive != "" {
reqURL += "?non_recursive=" + nonRecursive
}
reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, reqURL, nil)
require.NoError(t, err)
resp, err := w.httpClient.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
require.Equal(t, http.StatusAccepted, resp.StatusCode)
}

0 comments on commit c899877

Please sign in to comment.