Skip to content

Commit

Permalink
Feature: add ready log line process condition
Browse files Browse the repository at this point in the history
  • Loading branch information
F1bonacc1 committed May 6, 2024
1 parent 3697dfc commit 03dc542
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 30 deletions.
13 changes: 13 additions & 0 deletions process-compose.override.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,19 @@ processes:
signal: 15
timeout_seconds: 4

log-line-ready:
command: "./test_loop.bash ready-log"
ready_log_line: "test loop 1"

dep-on-log-line-ready:
command: "echo log is ready"
availability:
restart: "on_failure"
backoff_seconds: 2
depends_on:
log-line-ready:
condition: process_log_ready

_process2:
command: "./test_loop.bash process2"
log_location: ./pc.proc2.{PC_REPLICA_NUM}.log
Expand Down
73 changes: 44 additions & 29 deletions src/app/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,35 +34,37 @@ const (

type Process struct {
sync.Mutex
globalEnv []string
confMtx sync.Mutex
procConf *types.ProcessConfig
procState *types.ProcessState
stateMtx sync.Mutex
procCond sync.Cond
procStartedCond sync.Cond
procStateChan chan string
procReadyCtx context.Context
readyCancelFn context.CancelFunc
procRunCtx context.Context
runCancelFn context.CancelFunc
procColor func(a ...interface{}) string
noColor func(a ...interface{}) string
redColor func(a ...interface{}) string
logBuffer *pclog.ProcessLogBuffer
logger pclog.PcLogger
command Commander
started bool
done bool
timeMutex sync.Mutex
startTime time.Time
liveProber *health.Prober
readyProber *health.Prober
shellConfig command.ShellConfig
printLogs bool
isMain bool
extraArgs []string
isStopped atomic.Bool
globalEnv []string
confMtx sync.Mutex
procConf *types.ProcessConfig
procState *types.ProcessState
stateMtx sync.Mutex
procCond sync.Cond
procStartedCond sync.Cond
procStateChan chan string
procReadyCtx context.Context
readyCancelFn context.CancelFunc
procLogReadyCtx context.Context
readyLogCancelFn context.CancelCauseFunc
procRunCtx context.Context
runCancelFn context.CancelFunc
procColor func(a ...interface{}) string
noColor func(a ...interface{}) string
redColor func(a ...interface{}) string
logBuffer *pclog.ProcessLogBuffer
logger pclog.PcLogger
command Commander
started bool
done bool
timeMutex sync.Mutex
startTime time.Time
liveProber *health.Prober
readyProber *health.Prober
shellConfig command.ShellConfig
printLogs bool
isMain bool
extraArgs []string
isStopped atomic.Bool
}

func NewProcess(
Expand Down Expand Up @@ -97,6 +99,7 @@ func NewProcess(
}

proc.procReadyCtx, proc.readyCancelFn = context.WithCancel(context.Background())
proc.procLogReadyCtx, proc.readyLogCancelFn = context.WithCancelCause(context.Background())
proc.procRunCtx, proc.runCancelFn = context.WithCancel(context.Background())
proc.setUpProbes()
proc.procCond = *sync.NewCond(proc)
Expand Down Expand Up @@ -310,6 +313,13 @@ func (p *Process) waitUntilReady() bool {
log.Error().Msgf("Process %s was aborted and won't become ready", p.getName())
p.setExitCode(1)
return false
case <-p.procLogReadyCtx.Done():
err := context.Cause(p.procLogReadyCtx)
if errors.Is(err, context.Canceled) {
return true
}
log.Error().Err(err).Msgf("Process %s was aborted and won't become ready", p.getName())
return false
}
}
}
Expand All @@ -335,6 +345,7 @@ func (p *Process) shutDown() error {
}
p.setState(types.ProcessStateTerminating)
p.stopProbes()
p.readyLogCancelFn(fmt.Errorf("process %s was shut down", p.getName()))
if isStringDefined(p.procConf.ShutDownParams.ShutDownCommand) {
return p.doConfiguredStop(p.procConf.ShutDownParams)
}
Expand Down Expand Up @@ -492,6 +503,10 @@ func (p *Process) handleOutput(pipe io.ReadCloser, handler func(message string))
Msg("error reading from stdout")
break
}
if p.procConf.ReadyLogLine != "" && p.procState.Health == types.ProcessHealthUnknown && strings.Contains(line, p.procConf.ReadyLogLine) {
p.procState.Health = types.ProcessHealthReady
p.readyLogCancelFn(nil)
}
handler(strings.TrimSuffix(line, "\n"))
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/app/project_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ func (p *ProjectRunner) waitIfNeeded(process *types.ProcessConfig) error {
if !ready {
return fmt.Errorf("process %s depended on %s to become ready, but it was terminated", process.ReplicaName, k)
}
case types.ProcessConditionLogReady:
log.Info().Msgf("%s is waiting for %s log line %s", process.ReplicaName, k, runningProc.procConf.ReadyLogLine)
ready := runningProc.waitUntilReady()
if !ready {
return fmt.Errorf("process %s depended on %s to become ready, but it was terminated", process.ReplicaName, k)
}
case types.ProcessConditionStarted:
log.Info().Msgf("%s is waiting for %s to start", process.ReplicaName, k)
runningProc.waitForStarted()
Expand Down
49 changes: 49 additions & 0 deletions src/app/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,3 +590,52 @@ func TestSystem_TestProcShutDownNoRestart(t *testing.T) {
return
}
}
func TestSystem_TestReadyLine(t *testing.T) {
proc1 := "proc1"
proc2 := "proc2"
shell := command.DefaultShellConfig()
project := &types.Project{
Processes: map[string]types.ProcessConfig{
proc1: {
Name: proc1,
ReplicaName: proc1,
Executable: shell.ShellCommand,
Args: []string{shell.ShellArgument, "sleep 0.3 && echo ready"},
ReadyLogLine: "ready",
},
proc2: {
Name: proc2,
ReplicaName: proc2,
Executable: shell.ShellCommand,
Args: []string{shell.ShellArgument, "sleep 2"},
DependsOn: map[string]types.ProcessDependency{
proc1: {
Condition: types.ProcessConditionLogReady,
},
},
},
},
ShellConfig: shell,
}
runner, err := NewProjectRunner(&ProjectOpts{
project: project,
})
if err != nil {
t.Errorf(err.Error())
return
}
go runner.Run()
time.Sleep(100 * time.Millisecond)
state := runner.getRunningProcess(proc2).getStatusName()

if state != types.ProcessStatePending {
t.Errorf("process %s is %s want %s", proc2, state, types.ProcessStatePending)
return
}
time.Sleep(400 * time.Millisecond)
state = runner.getRunningProcess(proc2).getStatusName()
if state != types.ProcessStateRunning {
t.Errorf("process %s is %s want %s", proc2, state, types.ProcessStateRunning)
return
}
}
1 change: 1 addition & 0 deletions src/loader/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func Load(opts *LoaderOptions) (*types.Project, error) {
validatePlatformCompatibility,
validateHealthDependencyHasHealthCheck,
validateDependencyIsEnabled,
validateNoIncompatibleHealthChecks,
)
admitProcesses(opts, mergedProject)
return mergedProject, err
Expand Down
16 changes: 16 additions & 0 deletions src/loader/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,22 @@ func validateHealthDependencyHasHealthCheck(p *types.Project) error {
}
log.Error().Msg(errStr)
}
if dep.Condition == types.ProcessConditionLogReady && depProc.ReadyLogLine == "" {
errStr := fmt.Sprintf("log ready dependency defined in '%s' but no ready log line exists in '%s'", procName, depName)
log.Error().Msg(errStr)
return fmt.Errorf(errStr)
}
}
}
return nil
}

func validateNoIncompatibleHealthChecks(p *types.Project) error {
for procName, proc := range p.Processes {
if proc.ReadinessProbe != nil && proc.ReadyLogLine != "" {
errStr := fmt.Sprintf("'ready_log_line' and readiness probe defined in '%s' are incompatible", procName)
log.Error().Msg(errStr)
return fmt.Errorf(errStr)
}
}
return nil
Expand Down
4 changes: 4 additions & 0 deletions src/types/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type ProcessConfig struct {
DependsOn DependsOnConfig `yaml:"depends_on,omitempty"`
LivenessProbe *health.Probe `yaml:"liveness_probe,omitempty"`
ReadinessProbe *health.Probe `yaml:"readiness_probe,omitempty"`
ReadyLogLine string `yaml:"ready_log_line,omitempty"`
ShutDownParams ShutDownParams `yaml:"shutdown,omitempty"`
DisableAnsiColors bool `yaml:"disable_ansi_colors,omitempty"`
WorkingDir string `yaml:"working_dir"`
Expand Down Expand Up @@ -161,6 +162,9 @@ const (

// ProcessConditionStarted is the type for waiting until a process has started (default).
ProcessConditionStarted = "process_started"

// ProcessConditionLogReady is the type for waiting until a process has printed a predefined log line
ProcessConditionLogReady = "process_log_ready"
)

type DependsOnConfig map[string]ProcessDependency
Expand Down
24 changes: 23 additions & 1 deletion www/docs/launcher.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,34 @@ processes:
condition: process_completed_successfully
```

There are 4 condition types that can be used in process dependencies:
There are 5 condition types that can be used in process dependencies:

* `process_completed` - is the type for waiting until a process has been completed (any exit code)
* `process_completed_successfully` - is the type for waiting until a process has been completed successfully (exit code 0)
* `process_healthy` - is the type for waiting until a process is healthy
* `process_started` - is the type for waiting until a process has started (default)
* `process_log_ready` - is the type for waiting until a process has printed a predefined log line. This requires the definition of `ready_log_line` in the dependent process.

##### Process Log Ready Example

In some situations a process's log output is a simple way to determine if it is ready or not. For example, we can wait for a 'ready' message in the process's logs as follows:

```yaml hl_lines="6 12"
processes:
world:
command: "echo Connected"
depends_on:
hello:
condition: process_log_ready
hello:
command: |
echo 'Preparing...'
sleep 1
echo 'I am ready to accept connections now'
ready_log_line: "ready to accept connections" # equal to *.ready to accept connections.*\n regex
```

> :bulb: `ready_log_line` and readiness probe are incompatible and can't be used at the same time.
## Run only specific processes

Expand Down

0 comments on commit 03dc542

Please sign in to comment.