Skip to content

Commit

Permalink
fix: prevent races from processing build logs after channel close (#4984
Browse files Browse the repository at this point in the history
)
  • Loading branch information
deansheather committed Nov 9, 2022
1 parent 3c10c7f commit 0ae8d5e
Showing 1 changed file with 28 additions and 0 deletions.
28 changes: 28 additions & 0 deletions coderd/provisionerjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http"
"sort"
"strconv"
"sync"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -371,6 +372,7 @@ func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog,
var (
closed = make(chan struct{})
bufferedLogs = make(chan database.ProvisionerJobLog, 128)
logMut = &sync.Mutex{}
)
closeSubscribe, err := api.Pubsub.Subscribe(
provisionerJobLogsChannel(jobID),
Expand All @@ -380,12 +382,14 @@ func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog,
return
default:
}

jlMsg := provisionerJobLogsMessage{}
err := json.Unmarshal(message, &jlMsg)
if err != nil {
logger.Warn(ctx, "invalid provisioner job log on channel", slog.Error(err))
return
}

if jlMsg.CreatedAfter != 0 {
logs, err := api.Database.GetProvisionerLogsByIDBetween(ctx, database.GetProvisionerLogsByIDBetweenParams{
JobID: jobID,
Expand All @@ -397,6 +401,18 @@ func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog,
}

for _, log := range logs {
// Sadly we have to use a mutex here because events may be
// handled out of order due to golang goroutine scheduling
// semantics (even though Postgres guarantees ordering of
// notifications).
logMut.Lock()
select {
case <-closed:
logMut.Unlock()
return
default:
}

select {
case bufferedLogs <- log:
logger.Debug(ctx, "subscribe buffered log", slog.F("stage", log.Stage))
Expand All @@ -406,12 +422,24 @@ func (api *API) followLogs(jobID uuid.UUID) (<-chan database.ProvisionerJobLog,
// so just drop them.
logger.Warn(ctx, "provisioner job log overflowing channel")
}
logMut.Unlock()
}
}

if jlMsg.EndOfLogs {
// This mutex is to guard double-closes.
logMut.Lock()
select {
case <-closed:
logMut.Unlock()
return
default:
}
logger.Debug(ctx, "got End of Logs")

close(closed)
close(bufferedLogs)
logMut.Unlock()
}
},
)
Expand Down

0 comments on commit 0ae8d5e

Please sign in to comment.