Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/cgo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1038,7 +1038,7 @@ jobs:
# blocking on unrelated legacy custom analyzer findings in tests or other analyzer
# families.
- name: Run custom linters
run: make golint-custom LINTER_FLAGS="-errstringmatch -panicinlibrarycode -test=false"
run: make golint-custom LINTER_FLAGS="-errstringmatch -panicinlibrarycode -manualmutexunlock -test=false"

actions-build:
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion pkg/agentdrain/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ func (c *Coordinator) LoadSnapshots(snapshots map[string][]byte) error {
// minerFor retrieves the miner for the given stage, returning an error if missing.
func (c *Coordinator) minerFor(stage string) (*Miner, error) {
c.mu.RLock()
defer c.mu.RUnlock()
m, ok := c.miners[stage]
c.mu.RUnlock()
if !ok {
return nil, fmt.Errorf("agentdrain: no miner registered for stage %q", stage)
}
Expand Down
21 changes: 13 additions & 8 deletions pkg/agentdrain/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ func (m *Miner) TrainEvent(evt AgentEvent) (*MatchResult, error) {
result.Stage = evt.Stage
// Propagate stage to cluster.
m.mu.Lock()
defer m.mu.Unlock()
if c, ok := m.store.get(result.ClusterID); ok && c.Stage == "" {
c.Stage = evt.Stage
}
m.mu.Unlock()
return result, nil
}

Expand All @@ -134,20 +134,25 @@ func (m *Miner) AnalyzeEvent(evt AgentEvent) (*MatchResult, *AnomalyReport, erro
return nil, nil, errors.New("agentdrain: AnalyzeEvent: empty event after masking")
}

m.mu.RLock()
inferResult, _ := m.findBestMatchingCluster(tokens)
m.mu.RUnlock()
inferResult := func() *MatchResult {
m.mu.RLock()
defer m.mu.RUnlock()
result, _ := m.findBestMatchingCluster(tokens)
return result
}()

isNew := inferResult == nil
result, err := m.TrainEvent(evt)
if err != nil {
return nil, nil, err
}

var cluster *Cluster
m.mu.RLock()
cluster, _ = m.store.get(result.ClusterID)
m.mu.RUnlock()
cluster := func() *Cluster {
m.mu.RLock()
defer m.mu.RUnlock()
c, _ := m.store.get(result.ClusterID)
return c
}()

detector, err := NewAnomalyDetector(m.cfg.SimThreshold, m.cfg.RareClusterThreshold)
if err != nil {
Expand Down
43 changes: 24 additions & 19 deletions pkg/cli/compile_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,27 +186,32 @@ func watchAndCompileWorkflows(ctx context.Context, markdownFile string, compiler
depGraph.RemoveWorkflow(event.Name)
case event.Has(fsnotify.Write) || event.Has(fsnotify.Create):
// Handle file modification or creation - add to debounced compilation
debounceMu.Lock()
modifiedFiles[event.Name] = struct{}{}

// Reset debounce timer
if debounceTimer != nil {
debounceTimer.Stop()
}
debounceTimer = time.AfterFunc(debounceDelay, func() {
func() {
debounceMu.Lock()
filesToCompile := make([]string, 0, len(modifiedFiles))
for file := range modifiedFiles {
filesToCompile = append(filesToCompile, file)
defer debounceMu.Unlock()
modifiedFiles[event.Name] = struct{}{}

// Reset debounce timer
if debounceTimer != nil {
debounceTimer.Stop()
}
// Clear the modifiedFiles map
modifiedFiles = make(map[string]struct{})
debounceMu.Unlock()

// Compile the modified files using dependency graph
compileModifiedFilesWithDependencies(ctx, compiler, depGraph, filesToCompile, verbose)
})
debounceMu.Unlock()
debounceTimer = time.AfterFunc(debounceDelay, func() {
filesToCompile := func() []string {
debounceMu.Lock()
defer debounceMu.Unlock()
files := make([]string, 0, len(modifiedFiles))
for file := range modifiedFiles {
files = append(files, file)
}
// Clear the modifiedFiles map
modifiedFiles = make(map[string]struct{})
return files
}()

// Compile the modified files using dependency graph
compileModifiedFilesWithDependencies(ctx, compiler, depGraph, filesToCompile, verbose)
})
}()
}

case err, ok := <-watcher.Errors:
Expand Down
26 changes: 16 additions & 10 deletions pkg/cli/docker_images.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,18 @@ func IsDockerImageDownloading(image string) bool {
func IsDockerAvailable(ctx context.Context) bool {
ctx = normalizeDockerContext(ctx)

pullState.mu.RLock()
if pullState.mockAvailableInUse {
available := pullState.mockDockerAvailable
pullState.mu.RUnlock()
dockerImagesLog.Printf("Mock: Docker available: %v", available)
return available
mockEnabled, mockAvailable := func() (bool, bool) {
pullState.mu.RLock()
defer pullState.mu.RUnlock()
if pullState.mockAvailableInUse {
return true, pullState.mockDockerAvailable
}
return false, false
}()
if mockEnabled {
dockerImagesLog.Printf("Mock: Docker available: %v", mockAvailable)
return mockAvailable
}
pullState.mu.RUnlock()

cmd := exec.CommandContext(ctx, "docker", "info")
cmd.Stdout = nil
Expand Down Expand Up @@ -146,9 +150,11 @@ func StartDockerImageDownload(ctx context.Context, image string) bool {
// Start the download in a goroutine with retry logic
go func() {
defer func() {
pullState.mu.Lock()
delete(pullState.downloading, image)
pullState.mu.Unlock()
func() {
pullState.mu.Lock()
defer pullState.mu.Unlock()
delete(pullState.downloading, image)
}()
if r := recover(); r != nil {
dockerImagesLog.Printf("Panic in docker image download for %s (recovered): %v", image, r)
}
Expand Down
54 changes: 35 additions & 19 deletions pkg/console/spinner.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,22 +127,27 @@ func NewSpinner(message string) *SpinnerWrapper {

func (s *SpinnerWrapper) Start() {
if s.enabled && s.program != nil {
s.mu.Lock()
if s.running {
s.mu.Unlock()
shouldStart := func() bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.running {
return false
}
s.running = true
s.wg.Add(1)
return true
}()
if !shouldStart {
spinnerLog.Print("Spinner already running, skipping Start")
return
}
s.running = true
s.wg.Add(1)
s.mu.Unlock()
spinnerLog.Print("Starting spinner")
go func() {
defer s.wg.Done()
defer func() {
s.mu.Lock()
defer s.mu.Unlock()
s.running = false
s.mu.Unlock()
}()
defer func() {
if r := recover(); r != nil {
Expand All @@ -156,31 +161,40 @@ func (s *SpinnerWrapper) Start() {

func (s *SpinnerWrapper) Stop() {
if s.enabled && s.program != nil {
s.mu.Lock()
if s.running {
wasRunning := func() bool {
s.mu.Lock()
defer s.mu.Unlock()
if !s.running {
return false
}
s.running = false
s.mu.Unlock()
return true
}()
if wasRunning {
spinnerLog.Print("Stopping spinner")
s.program.Quit()
s.wg.Wait() // Wait for the goroutine to complete
fmt.Fprintf(os.Stderr, "%s%s", ansiCarriageReturn, ansiClearLine)
} else {
s.mu.Unlock()
}
}
}

func (s *SpinnerWrapper) StopWithMessage(msg string) {
if s.enabled && s.program != nil {
s.mu.Lock()
if s.running {
wasRunning := func() bool {
s.mu.Lock()
defer s.mu.Unlock()
if !s.running {
return false
}
s.running = false
s.mu.Unlock()
return true
}()
if wasRunning {
s.program.Quit()
s.wg.Wait() // Wait for the goroutine to complete
fmt.Fprintf(os.Stderr, "%s%s%s\n", ansiCarriageReturn, ansiClearLine, msg)
} else {
s.mu.Unlock()
// Still print the message even if spinner wasn't running
fmt.Fprintf(os.Stderr, "%s\n", msg)
}
Expand All @@ -192,9 +206,11 @@ func (s *SpinnerWrapper) StopWithMessage(msg string) {

func (s *SpinnerWrapper) UpdateMessage(message string) {
if s.enabled && s.program != nil {
s.mu.Lock()
running := s.running
s.mu.Unlock()
running := func() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.running
}()
if running {
s.program.Send(updateMessageMsg(message))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/parser/virtual_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ const BuiltinPathPrefix = "@builtin:"
// In native builds, builtin virtual files are checked first, then os.ReadFile.
var readFileFunc = func(path string) ([]byte, error) {
builtinVirtualFilesMu.RLock()
defer builtinVirtualFilesMu.RUnlock()
content, ok := builtinVirtualFiles[path]
builtinVirtualFilesMu.RUnlock()
if ok {
return content, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/parser/virtual_fs_wasm.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ func init() {
readFileFunc = func(path string) ([]byte, error) {
// Check builtin virtual files first (embedded engine .md files etc.)
builtinVirtualFilesMu.RLock()
defer builtinVirtualFilesMu.RUnlock()
builtinContent, builtinOK := builtinVirtualFiles[path]
builtinVirtualFilesMu.RUnlock()
if builtinOK {
return builtinContent, nil
}
Expand Down
Loading