Skip to content

Conversation

@laouji
Copy link
Contributor

@laouji laouji commented Aug 29, 2025

We don't want unbounded goroutines generated by this flush command, so this moves the concurrency control to the iteration command which can limit them to the number of pages fetched

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Aug 29, 2025

Walkthrough

Temporal test helpers now accept a maximum page size (25) and iterate workflows concurrently per page. The api_connectors test call is updated to pass the new page-size argument; workflow-id matching logic was simplified to rely on connectorID.String(). flushRemainingWorkflows collects termination errors and ignores NotFound.

Changes

Cohort / File(s) Summary of Changes
Temporal iteration concurrency and paging
test/e2e/suite_test.go
Added errors and serviceerror imports. Updated iterateThroughTemporalWorkflowExecutions signature to accept maxPageSize int32, set MaximumPageSize on requests, and process executions concurrently via goroutines, WaitGroup, and a results channel; pagination stops when any callback returns true or pages exhausted. flushRemainingWorkflows uses page size 25, terminates workflows, collects errors, and ignores NotFound.
Call site and matching logic update
test/e2e/api_connectors_test.go
Updated call to iterateThroughTemporalWorkflowExecutions(..., 25, callback). Simplified callback condition to use only strings.Contains(info.Execution.WorkflowId, connectorID.String()) (removed Reference.String() check). Assignment of workflowID/runID and subsequent waiting logic remain unchanged.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant TS as Test Suite
  participant Iter as iterateThroughTemporalWorkflowExecutions
  participant TC as Temporal Client
  participant WF as Workflows (per page)
  participant CB as Callback goroutines
  participant Ch as Results Channel

  TS->>Iter: iterateThroughTemporalWorkflowExecutions(ctx, cl, 25, cb)
  loop Pages until NextPageToken == nil or shouldStop
    Iter->>TC: ListOpenWorkflowExecutions(MaximumPageSize=25, NextPageToken)
    TC-->>Iter: Executions + NextPageToken
    par For each execution
      Iter->>CB: spawn callback in goroutine
      CB-->>Ch: bool (shouldStop for this item)
    and continue...
    end
    Iter->>Iter: wait for all goroutines, drain Ch
    alt Any result == true
      Iter-->>TS: return (shouldStop)
    else No result true
      Note right of Iter: Continue to next page if token present
    end
  end
  Iter-->>TS: done
Loading
sequenceDiagram
  autonumber
  participant TS as Test Suite
  participant Flush as flushRemainingWorkflows
  participant Iter as iterateThroughTemporalWorkflowExecutions
  participant Term as Terminate Workflow

  TS->>Flush: flushRemainingWorkflows(ctx, cl)
  Flush->>Iter: iterate(..., maxPageSize=25, callback=terminate)
  loop Each workflow (concurrent inside iterate)
    Iter->>Term: Terminate(workflow)
    Term-->>Flush: error or nil
  end
  Flush->>Flush: collect errors, ignore serviceerror.NotFound
  alt Any non-NotFound error
    Flush->>TS: assert failure
  else No actionable errors
    Flush-->>TS: success
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45–60 minutes

Suggested reviewers

  • paul-nicolas
  • fguery

Poem

I hop through pages, twenty-five at most,
Spawning tiny hares—each a worker host.
They nibble callbacks, signal when to stop,
Ignore the ghosts NotFound along the hop.
With whiskered rigor, tests now neatly flow. 🥕🐇

✨ Finishing Touches
  • 📝 Generate Docstrings
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch test/improve-e2e-error-handling

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore or @coderabbit ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@laouji laouji marked this pull request as ready for review August 29, 2025 13:38
@laouji laouji requested a review from a team as a code owner August 29, 2025 13:38
@codecov
Copy link

codecov bot commented Aug 29, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 66.72%. Comparing base (834aa7a) to head (f1e3c7f).
⚠️ Report is 2 commits behind head on main.

Additional details and impacted files
@@           Coverage Diff           @@
##             main     #525   +/-   ##
=======================================
  Coverage   66.71%   66.72%           
=======================================
  Files         761      761           
  Lines       39749    39749           
=======================================
+ Hits        26518    26521    +3     
+ Misses      11844    11842    -2     
+ Partials     1387     1386    -1     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
test/e2e/suite_test.go (1)

191-205: Early-stop signal isn’t propagated beyond the current page.

Breaking the results loop doesn’t stop outer pagination; function should return when any callback returns true.

Apply:

- // wait for this batch of goroutines to finish before allowing the loop to continue
+ // wait for this batch of goroutines to finish before allowing the loop to continue
  wg.Wait()
  close(ch)

- for shouldStop := range ch {
-   if shouldStop {
-     break
-   }
- }
+ stopAll := false
+ for shouldStop := range ch {
+   if shouldStop {
+     stopAll = true
+     break
+   }
+ }
+ if stopAll {
+   return
+ }

  if len(workflowRes.NextPageToken) == 0 {
    break
  }
🧹 Nitpick comments (2)
test/e2e/suite_test.go (1)

140-144: Avoid magic number 25 for page size.

Define a shared const and reuse here and at the call site to keep them in sync.

Example:

const temporalPageSize = 25
test/e2e/api_connectors_test.go (1)

456-456: Use shared page-size const instead of literal 25.

Keeps this call aligned with suite_test.go.

- iterateThroughTemporalWorkflowExecutions(ctx, cl, 25, func(info *v17.WorkflowExecutionInfo) bool {
+ iterateThroughTemporalWorkflowExecutions(ctx, cl, temporalPageSize, func(info *v17.WorkflowExecutionInfo) bool {
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 834aa7a and a9fa05c.

📒 Files selected for processing (2)
  • test/e2e/api_connectors_test.go (1 hunks)
  • test/e2e/suite_test.go (3 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: Tests
🔇 Additional comments (1)
test/e2e/suite_test.go (1)

8-8: Imports look good.

Needed for new error handling paths.

Also applies to: 21-21

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (2)
test/e2e/suite_test.go (2)

145-152: Good: error aggregation avoids the deadlock risk.

Switching from a buffered channel to a mutex-protected slice fixes the “drain-after-iteration” deadlock the earlier code risked.


184-192: Fix compile error (chan capacity type) and loop variable capture.

  • make(chan …, maxPageSize) fails: capacity must be int, not int32.
  • Goroutine captures the loop variable info; pass it as a parameter.
-		ch := make(chan bool, maxPageSize)
-		wg := &sync.WaitGroup{}
+		pageSize := int(maxPageSize)
+		ch := make(chan bool, pageSize)
+		var wg sync.WaitGroup
 		for _, info := range workflowRes.Executions {
-			wg.Add(1)
-			go func() {
-				defer wg.Done()
-				ch <- callbackFn(info)
-			}()
+			wg.Add(1)
+			go func(info *v17.WorkflowExecutionInfo) {
+				defer wg.Done()
+				ch <- callbackFn(info)
+			}(info)
 		}
🧹 Nitpick comments (3)
test/e2e/suite_test.go (3)

140-140: Prefer a const for page size.

Use a const to document intent and avoid accidental mutation.

-	maxPageSize := 25
+	const maxPageSize = 25

142-144: Minor: avoid pointer mutex and preallocate errs capacity.

Slightly simpler and avoids extra allocs.

-	mu := &sync.Mutex{}
-	errs := make([]error, 0)
+	var mu sync.Mutex
+	errs := make([]error, 0, maxPageSize)

169-170: Type consistency for page size.

maxPageSize is int32 for the RPC (good), but channel capacities are int. Cast where used for channels (see next comment) or consider taking an int here and casting to int32 only for the request.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between a9fa05c and 97ae275.

📒 Files selected for processing (1)
  • test/e2e/suite_test.go (3 hunks)
🔇 Additional comments (3)
test/e2e/suite_test.go (3)

8-8: Imports look correct and necessary.

Both errors and serviceerror are used below.

Also applies to: 21-21


155-162: LGTM: selective NotFound handling.

Treating NotFound as non-fatal and asserting others keeps the flush robust.


177-180: LGTM: request construction.

Namespace, paging token, and MaximumPageSize usage are correct.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
test/e2e/suite_test.go (1)

155-162: Filter NotFound at source to reduce locking and allocations.

Skip appending NotFound inside the callback to avoid taking the mutex and growing errs unnecessarily.

 iterateThroughTemporalWorkflowExecutions(ctx, cl, int32(maxPageSize), func(info *v17.WorkflowExecutionInfo) bool {
-    err := cl.TerminateWorkflow(ctx, info.Execution.WorkflowId, info.Execution.RunId, "system flush")
-    if err != nil {
-        mu.Lock()
-        errs = append(errs, err)
-        mu.Unlock()
-    }
+    if err := cl.TerminateWorkflow(ctx, info.Execution.WorkflowId, info.Execution.RunId, "system flush"); err != nil {
+        var notFoundErr *serviceerror.NotFound
+        if errors.As(err, &notFoundErr) {
+            return false
+        }
+        mu.Lock()
+        errs = append(errs, err)
+        mu.Unlock()
+    }
     return false
 })
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between cb7f20d and 8d2e090.

📒 Files selected for processing (1)
  • test/e2e/suite_test.go (3 hunks)
🔇 Additional comments (6)
test/e2e/suite_test.go (6)

8-8: Imports LGTM.

Brings in errors and Temporal serviceerror used below; consistent.

Also applies to: 21-21


140-151: Good: eliminates the earlier error-channel deadlock and bounds termination concurrency.

Mutex-protected slice avoids backpressure during pagination; concurrency now limited to page size.


175-181: Request construction looks correct.

MaximumPageSize uses int32 as required by Temporal; pagination token handled.


199-203: Good: labeled break cleanly stops pagination across pages.

No leak risk since channel is closed after wg.Wait; senders are done.


205-206: LGTM.

Correctly terminates when there’s no next page.


169-171: iterateThroughTemporalWorkflowExecutions call sites updated Verified all usages now include the new maxPageSize argument.

@laouji laouji added this pull request to the merge queue Aug 29, 2025
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 8d2e090 and f1e3c7f.

📒 Files selected for processing (1)
  • test/e2e/api_connectors_test.go (1 hunks)
🔇 Additional comments (1)
test/e2e/api_connectors_test.go (1)

457-463: Confirm ConnectorID.String() covers legacy IDs before narrowing match
Ensure all child workflows—install (“run-tasks-…”), uninstall, and reset—are generated using ConnectorID.String() (and that this returns the same legacy Reference value) so the strings.Contains(..., connectorID.String()) check won’t miss any existing workflows.

if (strings.Contains(info.Execution.WorkflowId, connectorID.Reference.String()) ||
strings.Contains(info.Execution.WorkflowId, connectorID.String())) &&
strings.HasPrefix(info.Execution.WorkflowId, searchKeyword) {
iterateThroughTemporalWorkflowExecutions(ctx, cl, 25, func(info *v17.WorkflowExecutionInfo) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix arg type: pass int32 to iterator to avoid compile error

iterateThroughTemporalWorkflowExecutions now takes maxPageSize int32. Passing 25 (untyped int) won’t compile.

Apply:

-	iterateThroughTemporalWorkflowExecutions(ctx, cl, 25, func(info *v17.WorkflowExecutionInfo) bool {
+	iterateThroughTemporalWorkflowExecutions(ctx, cl, int32(25), func(info *v17.WorkflowExecutionInfo) bool {

Optionally declare a typed const near this function and reuse it:

const temporalMaxPageSize int32 = 25
🤖 Prompt for AI Agents
In test/e2e/api_connectors_test.go at line ~456, the call
iterateThroughTemporalWorkflowExecutions(ctx, cl, 25, ...) fails to compile
because the function now expects maxPageSize int32; change the literal to an
int32 by either replacing 25 with int32(25) or (preferably) declare a typed
constant like temporalMaxPageSize int32 = 25 and pass that constant to the
function so the argument type matches.

@github-merge-queue github-merge-queue bot removed this pull request from the merge queue due to failed status checks Aug 29, 2025
@laouji laouji added this pull request to the merge queue Aug 29, 2025
Merged via the queue into main with commit aee0563 Aug 29, 2025
8 checks passed
@laouji laouji deleted the test/improve-e2e-error-handling branch August 29, 2025 15:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants