Skip to content

Commit

Permalink
Fixed race condition at the edge of builds. Using queue channel.
Browse files Browse the repository at this point in the history
The last loop collecting the remaining objectFiles may be run before the
last jobs completes.
This commit replaces the two channels used to fill objectFiles and to
signal error with direct variable access guarded by mutex, this avoids
race conditions at the end and streamlines the whole process.

Also added a 'queue' channel to feed the goroutines, this is not strictly
part of the fix, but helps to fairly distribute the workload.
  • Loading branch information
cmaglie committed Aug 26, 2019
1 parent 764fa09 commit 0785c72
Showing 1 changed file with 43 additions and 43 deletions.
86 changes: 43 additions & 43 deletions legacy/builder/builder_utils/utils.go
Expand Up @@ -169,57 +169,57 @@ func compileFilesWithRecipe(ctx *types.Context, sourcePath *paths.Path, sources
if len(sources) == 0 {
return objectFiles, nil
}
objectFilesChan := make(chan *paths.Path)
errorsChan := make(chan error)
doneChan := make(chan struct{})
var objectFilesMux sync.Mutex
var errors []error
var errorsMux sync.Mutex

ctx.Progress.Steps = ctx.Progress.Steps / float64(len(sources))
var wg sync.WaitGroup

// Split jobs into batches of N jobs each; wait for the completion of a batch to start the next
par := ctx.Jobs

go func() {
for total := 0; total < len(sources); total += par {
for i := total; i < total+par && i < len(sources); i++ {
wg.Add(1)
go func(source *paths.Path) {
defer wg.Done()
PrintProgressIfProgressEnabledAndMachineLogger(ctx)
objectFile, err := compileFileWithRecipe(ctx, sourcePath, source, buildPath, buildProperties, includes, recipe)
if err != nil {
errorsChan <- err
} else {
objectFilesChan <- objectFile
}
}(sources[i])
}
wg.Wait()
queue := make(chan *paths.Path)
job := func(source *paths.Path) {
PrintProgressIfProgressEnabledAndMachineLogger(ctx)
objectFile, err := compileFileWithRecipe(ctx, sourcePath, source, buildPath, buildProperties, includes, recipe)
if err != nil {
errorsMux.Lock()
errors = append(errors, err)
errorsMux.Unlock()
} else {
objectFilesMux.Lock()
objectFiles.Add(objectFile)
objectFilesMux.Unlock()
}
}

doneChan <- struct{}{}
}()

go func() {
wg.Wait()
doneChan <- struct{}{}
}()

for {
select {
case objectFile := <-objectFilesChan:
objectFiles.Add(objectFile)
case err := <-errorsChan:
return nil, i18n.WrapError(err)
case <-doneChan:
close(objectFilesChan)
for objectFile := range objectFilesChan {
objectFiles.Add(objectFile)
// Spawn jobs runners
var wg sync.WaitGroup
for i := 0; i < ctx.Jobs; i++ {
wg.Add(1)
go func() {
for source := range queue {
job(source)
}
objectFiles.Sort()
return objectFiles, nil
wg.Done()
}()
}

// Feed jobs until error or done
for _, source := range sources {
errorsMux.Lock()
gotError := len(errors) > 0
errorsMux.Unlock()
if gotError {
break
}
queue <- source
}
close(queue)
wg.Wait()
if len(errors) > 0 {
// output the first error
return nil, i18n.WrapError(errors[0])
}
objectFiles.Sort()
return objectFiles, nil
}

func compileFileWithRecipe(ctx *types.Context, sourcePath *paths.Path, source *paths.Path, buildPath *paths.Path, buildProperties *properties.Map, includes []string, recipe string) (*paths.Path, error) {
Expand Down

0 comments on commit 0785c72

Please sign in to comment.