Skip to content

Commit

Permalink
proxylib: Use channel instead of atomics
Browse files Browse the repository at this point in the history
This commit contains no functional change.

Signed-off-by: Sebastian Wicki <sebastian@isovalent.com>
  • Loading branch information
gandro authored and tklauser committed Aug 19, 2021
1 parent 82ec9f0 commit d2cc2b9
Showing 1 changed file with 15 additions and 6 deletions.
21 changes: 15 additions & 6 deletions proxylib/test/accesslog_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net"
"os"
"path/filepath"
"sync/atomic"
"syscall"
"time"

Expand All @@ -25,7 +24,7 @@ import (
type AccessLogServer struct {
Path string
Logs chan cilium.EntryType
closing uint32 // non-zero if closing, accessed atomically
done chan struct{}
listener *net.UnixListener
mu lock.Mutex // protects conns
conns []*net.UnixConn
Expand All @@ -34,7 +33,7 @@ type AccessLogServer struct {
// Close removes the unix domain socket from the filesystem
func (s *AccessLogServer) Close() {
if s != nil {
atomic.StoreUint32(&s.closing, 1)
close(s.done)
s.listener.Close()
s.mu.Lock()
for _, conn := range s.conns {
Expand All @@ -45,6 +44,15 @@ func (s *AccessLogServer) Close() {
}
}

func (s *AccessLogServer) isClosing() bool {
select {
case <-s.done:
return true
default:
return false
}
}

// Clear empties the access log server buffer, counting the passes and drops
func (s *AccessLogServer) Clear() (passed, drops int) {
passes, drops := 0, 0
Expand All @@ -71,6 +79,7 @@ func StartAccessLogServer(accessLogName string, bufSize int) *AccessLogServer {
server := &AccessLogServer{
Path: accessLogPath,
Logs: make(chan cilium.EntryType, bufSize),
done: make(chan struct{}),
}

// Create the access log listener
Expand All @@ -96,7 +105,7 @@ func StartAccessLogServer(accessLogName string, bufSize int) *AccessLogServer {
uc, err := server.listener.AcceptUnix()
if err != nil {
// These errors are expected when we are closing down
if atomic.LoadUint32(&server.closing) != 0 ||
if server.isClosing() ||
errors.Is(err, net.ErrClosed) ||
errors.Is(err, syscall.EINVAL) {
break
Expand All @@ -105,7 +114,7 @@ func StartAccessLogServer(accessLogName string, bufSize int) *AccessLogServer {
continue
}

if atomic.LoadUint32(&server.closing) != 0 {
if server.isClosing() {
break
}

Expand Down Expand Up @@ -140,7 +149,7 @@ func (s *AccessLogServer) accessLogger(conn *net.UnixConn) {
for {
n, _, flags, _, err := conn.ReadMsgUnix(buf, nil)
if err != nil {
if !isEOF(err) && atomic.LoadUint32(&s.closing) == 0 {
if !isEOF(err) && !s.isClosing() {
log.WithError(err).Error("Error while reading from access log connection")
}
break
Expand Down

0 comments on commit d2cc2b9

Please sign in to comment.