Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,11 @@ lint: lint.check ## Run golangci-lint

##@ Run
.PHONY: reviewable
reviewable: swag # Ensure a PR is ready for review.
reviewable: lint test-integration # Ensure a PR is ready for review.
@go mod tidy

.PHONY: check-diff
check-diff: reviewable # Ensure branch is clean.
check-diff: swag # Ensure branch is clean.
@test -z "$$(git status --porcelain)" || (echo "$$(git status --porcelain)" && $(FAIL))
@$(OK) branch is clean

Expand Down
24 changes: 2 additions & 22 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package cmd
import (
"context"
"log"
"time"

"github.com/compliance-framework/api/internal/api"
"github.com/compliance-framework/api/internal/api/handler"
Expand All @@ -14,7 +13,6 @@ import (
"github.com/compliance-framework/api/internal/service/digest"
"github.com/compliance-framework/api/internal/service/email"
"github.com/compliance-framework/api/internal/service/relational/workflows"
"github.com/compliance-framework/api/internal/service/scheduler"
"github.com/compliance-framework/api/internal/service/worker"
"github.com/compliance-framework/api/internal/workflow"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -86,11 +84,6 @@ func RunServer(cmd *cobra.Command, args []string) {
sugar.Fatalw("Failed to start worker service", "error", err)
}

// Initialize scheduler for other jobs (if any)
// Note: Digest scheduling is now handled by River's periodic jobs
sched := scheduler.NewCronScheduler(sugar)
sched.Start()

// Initialize workflow manager
workflowExecService := workflows.NewWorkflowExecutionService(db)
workflowInstService := workflows.NewWorkflowInstanceService(db)
Expand All @@ -101,11 +94,12 @@ func RunServer(cmd *cobra.Command, args []string) {
workflowInstService,
stepExecService,
sugar,
workerService,
)

metrics := api.NewMetricsHandler(ctx, sugar)
server := api.NewServer(ctx, sugar, cfg, metrics)
handler.RegisterHandlers(server, sugar, db, cfg, digestService, sched, workflowManager)
handler.RegisterHandlers(server, sugar, db, cfg, digestService, workflowManager, workerService, workerService.GetDAGExecutor())
oscal.RegisterHandlers(server, sugar, db, cfg)
auth.RegisterHandlers(server, sugar, db, cfg, metrics, emailService, workerService)

Expand All @@ -121,23 +115,9 @@ func RunServer(cmd *cobra.Command, args []string) {
sugar.Fatalw("Failed to start server", "error", err)
}

// Note: Defer statements are registered in reverse order of execution.
// This ensures proper shutdown order: scheduler -> worker service
defer func() {
// Stop worker service last (after scheduler has stopped)
if err := workerService.Stop(ctx); err != nil {
sugar.Errorw("Failed to stop worker service", "error", err)
}
}()

defer func() {
// Stop scheduler first
stopCtx := sched.Stop()
select {
case <-stopCtx.Done():
sugar.Debug("All scheduled jobs completed gracefully")
case <-time.After(10 * time.Second):
sugar.Warn("Scheduler shutdown timeout, some jobs may not have completed")
}
}()
}
48 changes: 26 additions & 22 deletions internal/api/handler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ import (
"github.com/compliance-framework/api/internal/config"
"github.com/compliance-framework/api/internal/service/digest"
workflowsvc "github.com/compliance-framework/api/internal/service/relational/workflows"
"github.com/compliance-framework/api/internal/service/scheduler"
"github.com/compliance-framework/api/internal/workflow"
"github.com/labstack/echo/v4"
"go.uber.org/zap"
"gorm.io/gorm"
)

func RegisterHandlers(server *api.Server, logger *zap.SugaredLogger, db *gorm.DB, config *config.Config, digestService *digest.Service, sched scheduler.Scheduler, workflowManager *workflow.Manager) {
func RegisterHandlers(server *api.Server, logger *zap.SugaredLogger, db *gorm.DB, config *config.Config, digestService *digest.Service, workflowManager *workflow.Manager, notificationEnqueuer workflow.NotificationEnqueuer, dagExecutor *workflow.DAGExecutor) {
healthHandler := NewHealthHandler(logger, db)
healthHandler.Register(server.API().Group("/health"))

Expand All @@ -41,20 +40,20 @@ func RegisterHandlers(server *api.Server, logger *zap.SugaredLogger, db *gorm.DB
userHandler.RegisterSelfRoutes(userGroup)

// Digest handler (admin only)
if digestService != nil && sched != nil {
digestHandler := NewDigestHandler(digestService, sched, logger)
if digestService != nil {
digestHandler := NewDigestHandler(digestService, logger)
digestGroup := server.API().Group("/admin/digest")
digestGroup.Use(middleware.JWTMiddleware(config.JWTPublicKey))
digestGroup.Use(middleware.RequireAdminGroups(db, config, logger))
digestHandler.Register(digestGroup)
}

// Register workflow handlers
registerWorkflowHandlers(server, logger, db, config, workflowManager)
registerWorkflowHandlers(server, logger, db, config, workflowManager, notificationEnqueuer, dagExecutor)
}

// registerWorkflowHandlers registers all workflow-related HTTP handlers with authentication
func registerWorkflowHandlers(server *api.Server, logger *zap.SugaredLogger, db *gorm.DB, config *config.Config, workflowManager *workflow.Manager) {
func registerWorkflowHandlers(server *api.Server, logger *zap.SugaredLogger, db *gorm.DB, config *config.Config, workflowManager *workflow.Manager, notificationEnqueuer workflow.NotificationEnqueuer, dagExecutor *workflow.DAGExecutor) {
// Create workflow group with authentication middleware
workflowGroup := server.API().Group("/workflows")
workflowGroup.Use(middleware.JWTMiddleware(config.JWTPublicKey))
Expand All @@ -77,27 +76,28 @@ func registerWorkflowHandlers(server *api.Server, logger *zap.SugaredLogger, db

// Handlers that require workflow manager
if workflowManager != nil {
registerWorkflowExecutionHandlers(workflowGroup, logger, db, workflowManager)
registerWorkflowExecutionHandlers(workflowGroup, logger, db, workflowManager, notificationEnqueuer, dagExecutor)
}
}

// registerWorkflowExecutionHandlers registers execution-related handlers that require the workflow manager
func registerWorkflowExecutionHandlers(workflowGroup *echo.Group, logger *zap.SugaredLogger, db *gorm.DB, workflowManager *workflow.Manager) {
func registerWorkflowExecutionHandlers(workflowGroup *echo.Group, logger *zap.SugaredLogger, db *gorm.DB, workflowManager *workflow.Manager, notificationEnqueuer workflow.NotificationEnqueuer, dagExecutor *workflow.DAGExecutor) {
roleAssignmentService := workflowsvc.NewRoleAssignmentService(db)
assignmentService := workflow.NewAssignmentService(roleAssignmentService, db)
stepExecService := workflowsvc.NewStepExecutionService(db, nil)
assignmentService := workflow.NewAssignmentService(roleAssignmentService, stepExecService, db, logger, notificationEnqueuer)

// Workflow execution handler
workflowExecutionHandler := workflows.NewWorkflowExecutionHandler(logger, db, workflowManager, assignmentService)
workflowExecutionHandler.Register(workflowGroup.Group("/executions"))

// Step execution handler with transition service
transitionService := createStepTransitionService(db, logger)
transitionService := createStepTransitionService(db, logger, notificationEnqueuer, dagExecutor)
stepExecutionHandler := workflows.NewStepExecutionHandler(logger, db, transitionService, assignmentService)
stepExecutionHandler.Register(workflowGroup.Group("/step-executions"))
}

// createStepTransitionService creates and configures the step transition service with all dependencies
func createStepTransitionService(db *gorm.DB, logger *zap.SugaredLogger) *workflow.StepTransitionService {
func createStepTransitionService(db *gorm.DB, logger *zap.SugaredLogger, notificationEnqueuer workflow.NotificationEnqueuer, executor *workflow.DAGExecutor) *workflow.StepTransitionService {
// Create services needed for step transition
stepExecService := workflowsvc.NewStepExecutionService(db, nil)
stepDefService := workflowsvc.NewWorkflowStepDefinitionService(db)
Expand All @@ -107,17 +107,7 @@ func createStepTransitionService(db *gorm.DB, logger *zap.SugaredLogger) *workfl
roleAssignmentService := workflowsvc.NewRoleAssignmentService(db)

// Create assignment service
assignmentService := workflow.NewAssignmentService(roleAssignmentService, db)

// Create executor for step transition coordination
stdLogger := log.Default()
executor := workflow.NewDAGExecutor(
stepExecService,
workflowExecService,
stepDefService,
assignmentService,
stdLogger,
)
assignmentService := workflow.NewAssignmentService(roleAssignmentService, stepExecService, db, logger, notificationEnqueuer)

// Create evidence integration for step evidence storage
evidenceIntegration := workflow.NewEvidenceIntegration(db, logger)
Expand All @@ -126,6 +116,20 @@ func createStepTransitionService(db *gorm.DB, logger *zap.SugaredLogger) *workfl
stepExecService.SetEvidenceCreator(evidenceIntegration)
workflowExecService.SetEvidenceCreator(evidenceIntegration)

// Use the shared executor from the worker service when available so that there is exactly
// one DAGExecutor instance (consistent logger, notifications, and evidence integration).
// Fall back to constructing a local executor when the worker is disabled (executor == nil).
if executor == nil {
executor = workflow.NewDAGExecutor(
stepExecService,
workflowExecService,
stepDefService,
assignmentService,
log.Default(),
notificationEnqueuer,
)
}

// Create and return step transition service
return workflow.NewStepTransitionService(
stepExecService,
Expand Down
12 changes: 4 additions & 8 deletions internal/api/handler/digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,20 @@ import (

"github.com/compliance-framework/api/internal/api"
"github.com/compliance-framework/api/internal/service/digest"
"github.com/compliance-framework/api/internal/service/scheduler"
"github.com/labstack/echo/v4"
"go.uber.org/zap"
)

// DigestHandler handles digest-related API endpoints
type DigestHandler struct {
digestService *digest.Service
scheduler scheduler.Scheduler
logger *zap.SugaredLogger
}

// NewDigestHandler creates a new digest handler
func NewDigestHandler(digestService *digest.Service, sched scheduler.Scheduler, logger *zap.SugaredLogger) *DigestHandler {
func NewDigestHandler(digestService *digest.Service, logger *zap.SugaredLogger) *DigestHandler {
return &DigestHandler{
digestService: digestService,
scheduler: sched,
logger: logger,
}
}
Expand Down Expand Up @@ -50,12 +47,11 @@ func (h *DigestHandler) TriggerDigest(ctx echo.Context) error {
if jobName == "" {
jobName = "global-evidence-digest"
}

if h.scheduler == nil {
return ctx.JSON(http.StatusInternalServerError, api.NewError(fmt.Errorf("scheduler is not available")))
if jobName != "global-evidence-digest" {
return ctx.JSON(http.StatusBadRequest, api.NewError(fmt.Errorf("unsupported digest job: %s", jobName)))
}

if err := h.scheduler.RunNow(ctx.Request().Context(), jobName); err != nil {
if err := h.digestService.SendGlobalDigest(ctx.Request().Context()); err != nil {
h.logger.Errorw("Failed to trigger digest job", "job", jobName, "error", err)
return ctx.JSON(http.StatusInternalServerError, api.NewError(err))
}
Expand Down
102 changes: 5 additions & 97 deletions internal/api/handler/digest_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@ package handler
import (
"context"
"encoding/json"
"fmt"
"net/http/httptest"
"testing"

"github.com/compliance-framework/api/internal/api"
"github.com/compliance-framework/api/internal/service/digest"
"github.com/compliance-framework/api/internal/service/email"
"github.com/compliance-framework/api/internal/service/scheduler"
"github.com/compliance-framework/api/internal/tests"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
Expand All @@ -27,56 +25,9 @@ type DigestApiIntegrationSuite struct {
server *api.Server
logger *zap.SugaredLogger
digestHandler *DigestHandler
mockScheduler *MockScheduler
emailService *email.Service
}

// MockScheduler implements the scheduler.Service interface for testing
type MockScheduler struct {
jobs map[string]bool
}

func NewMockScheduler() *MockScheduler {
return &MockScheduler{
jobs: make(map[string]bool),
}
}

func (m *MockScheduler) Start() {
// Mock implementation
}

func (m *MockScheduler) Stop() context.Context {
// Mock implementation
return context.Background()
}

func (m *MockScheduler) Schedule(schedule scheduler.Schedule, job scheduler.Job) error {
m.jobs[job.Name()] = true
return nil
}

func (m *MockScheduler) ScheduleCron(cronExpr string, job scheduler.Job) error {
m.jobs[job.Name()] = true
return nil
}

func (m *MockScheduler) RunNow(ctx context.Context, name string) error {
if _, exists := m.jobs[name]; !exists {
return fmt.Errorf("job %q not found", name)
}
// Mock job execution
return nil
}

func (m *MockScheduler) ListJobs() []string {
var jobs []string
for name := range m.jobs {
jobs = append(jobs, name)
}
return jobs
}

func (suite *DigestApiIntegrationSuite) SetupSuite() {
suite.IntegrationTestSuite.SetupSuite()

Expand All @@ -88,27 +39,22 @@ func (suite *DigestApiIntegrationSuite) SetupSuite() {
suite.Require().NoError(err, "Failed to create email service")
suite.emailService = emailService

// Create mock scheduler
suite.mockScheduler = NewMockScheduler()

// Create digest handler
digestService := digest.NewService(suite.DB, suite.emailService, nil, suite.Config, suite.logger)
suite.digestHandler = NewDigestHandler(digestService, suite.mockScheduler, suite.logger)
suite.digestHandler = NewDigestHandler(digestService, suite.logger)

// Setup server
metrics := api.NewMetricsHandler(context.Background(), logger.Sugar())
suite.server = api.NewServer(context.Background(), logger.Sugar(), suite.Config, metrics)

// Register handlers
RegisterHandlers(suite.server, suite.logger, suite.DB, suite.Config, digestService, suite.mockScheduler, nil)
RegisterHandlers(suite.server, suite.logger, suite.DB, suite.Config, digestService, nil, nil, nil)
}

func (suite *DigestApiIntegrationSuite) SetupTest() {
err := suite.Migrator.Refresh()
suite.Require().NoError(err)

// Pre-register the default job in the mock scheduler
suite.mockScheduler.jobs["global-evidence-digest"] = true
}

func (suite *DigestApiIntegrationSuite) TestTriggerDigest() {
Expand All @@ -132,22 +78,16 @@ func (suite *DigestApiIntegrationSuite) TestTriggerDigest() {
})

suite.Run("TriggerDigestWithCustomJob", func() {
// Pre-register the custom job
suite.mockScheduler.jobs["custom-job"] = true

rec := httptest.NewRecorder()
req := httptest.NewRequest("POST", "/api/admin/digest/trigger?job=custom-job", nil)
req.Header.Set("Authorization", "Bearer "+*token)

suite.server.E().ServeHTTP(rec, req)
suite.Equal(200, rec.Code, "Expected OK response for TriggerDigest with custom job")
suite.Equal(400, rec.Code, "Expected Bad Request response for unsupported custom digest job")

var response map[string]string
var response api.Error
err = json.Unmarshal(rec.Body.Bytes(), &response)
suite.Require().NoError(err, "Failed to unmarshal TriggerDigest response")

suite.Equal("Digest job triggered successfully", response["message"])
suite.Equal("custom-job", response["job"])
suite.Require().NoError(err, "Failed to unmarshal TriggerDigest error response")
})

suite.Run("TriggerDigestUnauthorized", func() {
Expand Down Expand Up @@ -190,35 +130,3 @@ func (suite *DigestApiIntegrationSuite) TestPreviewDigest() {
suite.Equal(401, rec.Code, "Expected Unauthorized response for missing token")
})
}

func (suite *DigestApiIntegrationSuite) TestTriggerDigestWithNilScheduler() {
// Test with nil scheduler to verify error handling
token, err := suite.GetAuthToken()
suite.Require().NoError(err)

// Create handler with nil scheduler
digestService := digest.NewService(suite.DB, suite.emailService, nil, suite.Config, suite.logger)
nilSchedulerHandler := NewDigestHandler(digestService, nil, suite.logger)

// Create a temporary echo context for testing
e := suite.server.E()
req := httptest.NewRequest("POST", "/api/admin/digest/trigger", nil)
req.Header.Set("Authorization", "Bearer "+*token)
rec := httptest.NewRecorder()
c := e.NewContext(req, rec)

err = nilSchedulerHandler.TriggerDigest(c)
suite.NoError(err, "Expected no error from TriggerDigest with nil scheduler")
suite.Equal(500, rec.Code, "Expected Internal Server Error when scheduler is nil")

var response api.Error
err = json.Unmarshal(rec.Body.Bytes(), &response)
suite.Require().NoError(err, "Failed to unmarshal error response")

// Check if the error contains our expected message
for _, errMsg := range response.Errors {
if msgStr, ok := errMsg.(string); ok {
suite.Contains(msgStr, "scheduler is not available")
}
}
}
Loading