Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions pkg/parser/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ var mcpConfigSchema string

// ValidateMainWorkflowFrontmatterWithSchema validates main workflow frontmatter using JSON schema
func ValidateMainWorkflowFrontmatterWithSchema(frontmatter map[string]any) error {
return validateWithSchema(frontmatter, mainWorkflowSchema, "main workflow file")
// Remove internal-only fields from validation
cleanedFrontmatter := removeInternalFields(frontmatter)
return validateWithSchema(cleanedFrontmatter, mainWorkflowSchema, "main workflow file")
}

// ValidateMainWorkflowFrontmatterWithSchemaAndLocation validates main workflow frontmatter with file location info
func ValidateMainWorkflowFrontmatterWithSchemaAndLocation(frontmatter map[string]any, filePath string) error {
return validateWithSchemaAndLocation(frontmatter, mainWorkflowSchema, "main workflow file", filePath)
// Remove internal-only fields from validation
cleanedFrontmatter := removeInternalFields(frontmatter)
return validateWithSchemaAndLocation(cleanedFrontmatter, mainWorkflowSchema, "main workflow file", filePath)
}

// ValidateIncludedFileFrontmatterWithSchema validates included file frontmatter using JSON schema
Expand Down Expand Up @@ -203,3 +207,21 @@ func min(a, b int) int {
}
return b
}

// removeInternalFields creates a copy of the frontmatter without internal-only fields
// that should not be validated against the public schema
func removeInternalFields(frontmatter map[string]any) map[string]any {
if frontmatter == nil {
return nil
}

// Create a copy of the frontmatter
cleaned := make(map[string]any)
for key, value := range frontmatter {
// Currently no internal-only fields need to be filtered
// This function is kept for future extensibility
cleaned[key] = value
}

return cleaned
}
8 changes: 4 additions & 4 deletions pkg/workflow/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ func (c *Compiler) parseWorkflowFile(markdownPath string) (*WorkflowData, error)
}

// Apply defaults
c.applyDefaults(workflowData, markdownPath)
c.applyDefaults(workflowData, markdownPath, result.Frontmatter)

// Apply pull request draft filter if specified
c.applyPullRequestDraftFilter(workflowData, result.Frontmatter)
Expand Down Expand Up @@ -881,7 +881,7 @@ func (c *Compiler) extractAliasName(frontmatter map[string]any) string {
}

// applyDefaults applies default values for missing workflow sections
func (c *Compiler) applyDefaults(data *WorkflowData, markdownPath string) {
func (c *Compiler) applyDefaults(data *WorkflowData, markdownPath string, frontmatter map[string]any) {
// Check if this is an alias trigger workflow (by checking if user specified "on.alias")
isAliasTrigger := false
if data.On == "" {
Expand Down Expand Up @@ -988,8 +988,8 @@ func (c *Compiler) applyDefaults(data *WorkflowData, markdownPath string) {
models: read`
}

// Generate concurrency configuration using the dedicated concurrency module
data.Concurrency = GenerateConcurrencyConfig(data, isAliasTrigger)
// Generate concurrency configuration using the new policy system
data.Concurrency = GenerateConcurrencyConfigWithFrontmatter(data, isAliasTrigger, frontmatter, c.verbose)

if data.RunName == "" {
data.RunName = fmt.Sprintf(`run-name: "%s"`, data.Name)
Expand Down
39 changes: 37 additions & 2 deletions pkg/workflow/concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,48 @@ import (
)

// GenerateConcurrencyConfig generates the concurrency configuration for a workflow
// based on its trigger types and characteristics.
// based on its trigger types and characteristics. Now supports advanced policy computation.
func GenerateConcurrencyConfig(workflowData *WorkflowData, isAliasTrigger bool) string {
// Don't override if already set
// Don't override if already set by user
if workflowData.Concurrency != "" {
return workflowData.Concurrency
}

// Try to use the new policy system first
return generateConcurrencyWithPolicySystem(workflowData, isAliasTrigger)
}

// GenerateConcurrencyConfigWithFrontmatter generates concurrency config using the policy system
// This function maintains the same interface but no longer parses frontmatter for policies
func GenerateConcurrencyConfigWithFrontmatter(workflowData *WorkflowData, isAliasTrigger bool, frontmatter map[string]any, verbose bool) string {
// Don't override if already set by user
if workflowData.Concurrency != "" {
return workflowData.Concurrency
}

// Use the policy system with code-based rules only
return generateConcurrencyWithPolicySystem(workflowData, isAliasTrigger)
}

// generateConcurrencyWithPolicySystem uses the policy system with code-based rules only
func generateConcurrencyWithPolicySystem(workflowData *WorkflowData, isAliasTrigger bool) string {
// Compute policy using code-based rules only
computed, err := computeConcurrencyPolicy(workflowData, isAliasTrigger)
if err != nil {
// Fall back to legacy behavior if policy system fails
return generateLegacyConcurrency(workflowData, isAliasTrigger)
}

yaml := generateConcurrencyYAML(computed)
if yaml == "" {
return generateLegacyConcurrency(workflowData, isAliasTrigger)
}

return yaml
}

// generateLegacyConcurrency provides the original concurrency generation logic as fallback
func generateLegacyConcurrency(workflowData *WorkflowData, isAliasTrigger bool) string {
// Generate concurrency configuration based on workflow type
// Note: Check alias trigger first since alias workflows also contain pull_request events
if isAliasTrigger {
Expand Down
200 changes: 200 additions & 0 deletions pkg/workflow/concurrency_policy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package workflow

import (
"fmt"
"strings"
)

// ConcurrencyPolicy represents a single concurrency policy definition
type ConcurrencyPolicy struct {
Group string `json:"group" yaml:"group"`
Node string `json:"node" yaml:"node"`
CancelInProgress *bool `json:"cancel-in-progress,omitempty" yaml:"cancel-in-progress,omitempty"`
}

// ConcurrencyPolicySet represents a set of policies for different contexts
type ConcurrencyPolicySet struct {
Default *ConcurrencyPolicy `json:"*,omitempty" yaml:"*,omitempty"`
Issues *ConcurrencyPolicy `json:"issues,omitempty" yaml:"issues,omitempty"`
PullRequest *ConcurrencyPolicy `json:"pull_requests,omitempty" yaml:"pull_requests,omitempty"`
Schedule *ConcurrencyPolicy `json:"schedule,omitempty" yaml:"schedule,omitempty"`
Manual *ConcurrencyPolicy `json:"workflow_dispatch,omitempty" yaml:"workflow_dispatch,omitempty"`
Custom map[string]*ConcurrencyPolicy `json:"-" yaml:"-"` // for any other trigger types
}

// ComputedConcurrencyPolicy represents the final computed concurrency configuration
type ComputedConcurrencyPolicy struct {
Group string
CancelInProgress *bool
}

// computeConcurrencyPolicy computes the final concurrency configuration based on workflow characteristics
func computeConcurrencyPolicy(workflowData *WorkflowData, isAliasTrigger bool) (*ComputedConcurrencyPolicy, error) {
// Get default policies based on workflow characteristics
policySet := getDefaultPolicySet(isAliasTrigger)

// Determine which specific policy to use based on workflow triggers
selectedPolicy := selectPolicyForWorkflow(workflowData, isAliasTrigger, policySet)

// Compute the final concurrency configuration
computed := &ComputedConcurrencyPolicy{}

if selectedPolicy != nil {
// Build the group identifier
computed.Group = buildGroupIdentifier(selectedPolicy, workflowData)
computed.CancelInProgress = selectedPolicy.CancelInProgress
} else {
// Fallback to basic group
computed.Group = "gh-aw-${{ github.workflow }}"
}

return computed, nil
}

// getDefaultPolicySet returns the default policy set based on workflow characteristics
func getDefaultPolicySet(isAliasTrigger bool) *ConcurrencyPolicySet {
policySet := &ConcurrencyPolicySet{
Custom: make(map[string]*ConcurrencyPolicy),
}

// Default policy for all workflows
policySet.Default = &ConcurrencyPolicy{
Group: "workflow",
Node: "",
}

// Issues policy with cancel
cancelTrue := true
policySet.Issues = &ConcurrencyPolicy{
Group: "workflow",
Node: "issue.number || github.event.pull_request.number", // Support both issue and PR for alias
CancelInProgress: &cancelTrue,
}

// Pull request policy with cancel (use ref for backwards compatibility with existing tests)
policySet.PullRequest = &ConcurrencyPolicy{
Group: "workflow",
Node: "github.ref", // Use ref instead of pull_request.number for compatibility
CancelInProgress: &cancelTrue,
}

// For alias triggers, override to not use cancellation
if isAliasTrigger {
policySet.Issues.CancelInProgress = nil
policySet.PullRequest.CancelInProgress = nil
}

return policySet
}

// selectPolicyForWorkflow selects the most appropriate policy for the given workflow
func selectPolicyForWorkflow(workflowData *WorkflowData, isAliasTrigger bool, policySet *ConcurrencyPolicySet) *ConcurrencyPolicy {
if isAliasTrigger {
// For alias workflows, prefer issues policy if available
if policySet.Issues != nil {
return policySet.Issues
}
}

// Check if this is a pull request workflow
if isPullRequestWorkflow(workflowData.On) {
if policySet.PullRequest != nil {
return policySet.PullRequest
}
}

// Check for schedule workflows
if strings.Contains(workflowData.On, "schedule") {
if policySet.Schedule != nil {
return policySet.Schedule
}
}

// Check for manual workflows
if strings.Contains(workflowData.On, "workflow_dispatch") {
if policySet.Manual != nil {
return policySet.Manual
}
}

// Check for issues workflows
if strings.Contains(workflowData.On, "issues") {
if policySet.Issues != nil {
return policySet.Issues
}
}

// Check for custom trigger types in the policy custom map
for triggerType, customPolicy := range policySet.Custom {
if strings.Contains(workflowData.On, triggerType) {
return customPolicy
}
}

// Fall back to default policy
return policySet.Default
}

// buildGroupIdentifier constructs the final group identifier string
func buildGroupIdentifier(policy *ConcurrencyPolicy, workflowData *WorkflowData) string {
if policy == nil {
return "gh-aw-${{ github.workflow }}"
}

// Start with the base group
var parts []string

// Always include the gh-aw prefix
parts = append(parts, "gh-aw")

// Add the workflow identifier
if policy.Group == "workflow" {
parts = append(parts, "${{ github.workflow }}")
} else {
// Use custom group identifier
parts = append(parts, policy.Group)
}

// Add the node identifier if specified
if policy.Node != "" {
var nodeExpr string
switch policy.Node {
case "issue.number":
nodeExpr = "${{ github.event.issue.number }}"
case "pull_request.number":
nodeExpr = "${{ github.event.pull_request.number }}"
case "github.ref":
nodeExpr = "${{ github.ref }}"
case "issue.number || github.event.pull_request.number":
// Special case for alias workflows
nodeExpr = "${{ github.event.issue.number || github.event.pull_request.number }}"
default:
// Custom node expression
if strings.HasPrefix(policy.Node, "${{") {
nodeExpr = policy.Node
} else {
nodeExpr = fmt.Sprintf("${{ %s }}", policy.Node)
}
}
parts = append(parts, nodeExpr)
}

return strings.Join(parts, "-")
}

// generateConcurrencyYAML generates the final YAML for the concurrency section
func generateConcurrencyYAML(computed *ComputedConcurrencyPolicy) string {
if computed == nil {
return ""
}

var lines []string
lines = append(lines, "concurrency:")
lines = append(lines, fmt.Sprintf(" group: \"%s\"", computed.Group))

if computed.CancelInProgress != nil && *computed.CancelInProgress {
lines = append(lines, " cancel-in-progress: true")
}

return strings.Join(lines, "\n")
}
Loading