Skip to content
Permalink
Browse files
Fix race condition in transaction handling
Once a server sends a response, it must be ready
to receive a new message with the same tag. While
using `defer` to clean up a tag once its message was
handled was convenient, it ended up creating a race
condition on low-latency connections with a highly
parallel client (such as `v9fs`) like so:

- Client sends Tmessage with tag X
- Server sends Rmessage with tag X
- Client sends Tmessage with tag X
- Server refuses to handle Tmessage because X has not been cleared
- Server clears X

This change removes that race condition, re-arranging the
order of operations to

1. Client sends Tmessage with tag X
2. Server handles message and clears X
3. Server sends Rmessage with tag X

We've already cleared the tag for re-use before sending a response.
An astute reader would note that with this re-ordering, a client may
"steal" a tag before an Rmessage is received by using it between steps
1 and 2. However, the client would be violating the 9P protocol, as it
has not received the corresponding R-message (because the server has
not even sent it yet!), and only harms itself in this scenario, as it
will become unable to recognise the tag on the Rmessage when it does come.
  • Loading branch information
droyo committed Aug 20, 2016
1 parent 6c36951 commit 0cb8c0a
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 22 deletions.
16 conn.go
@@ -209,8 +209,8 @@ func (c *conn) handleMessage(m styxproto.Msg) bool {
return c.handleFcall(cx, m)
case styxproto.BadMessage:
c.srv.logf("got bad message from %s: %s", c.remoteAddr(), m.Err)
c.Rerror(m.Tag(), "bad message: %s", m.Err)
c.clearTag(m.Tag())
c.Rerror(m.Tag(), "bad message: %s", m.Err)
return true
default:
c.Rerror(m.Tag(), "unexpected %T message", m)
@@ -261,12 +261,13 @@ Loop:
// - Setting a per-connection session limit
// - close connections that have not established a session in N seconds
func (c *conn) handleTauth(cx context.Context, m styxproto.Tauth) bool {
defer c.clearTag(m.Tag())
if c.srv.Auth == nil {
c.clearTag(m.Tag())
c.Rerror(m.Tag(), "%s", errNotSupported)
return true
}
if _, ok := c.sessionFid.Get(m.Afid()); ok {
c.clearTag(m.Tag())
c.Rerror(m.Tag(), "fid %x in use", m.Afid())
return false
}
@@ -293,7 +294,6 @@ func (c *conn) handleTauth(cx context.Context, m styxproto.Tauth) bool {
}

func (c *conn) handleTattach(cx context.Context, m styxproto.Tattach) bool {
defer c.clearTag(m.Tag())
var handler Handler = DefaultServeMux
if c.srv.Handler != nil {
handler = c.srv.Handler
@@ -307,16 +307,19 @@ func (c *conn) handleTattach(cx context.Context, m styxproto.Tattach) bool {
// We should call the Auth handler if Afid is NOFID, passing it
// a util.BlackHole.
if !c.sessionFid.Fetch(s, m.Afid()) {
c.clearTag(m.Tag())
c.Rerror(m.Tag(), "invalid afid %x", m.Afid())
return false
}
// From attach(5): The same validated afid may be used for
// multiple attach messages with the same uname and aname.
if s.User != string(m.Uname()) || s.Access != string(m.Aname()) {
c.clearTag(m.Tag())
c.Rerror(m.Tag(), "afid mismatch for %s on %s", m.Uname(), m.Aname())
return false
}
if err := <-s.authC; err != nil {
c.clearTag(m.Tag())
c.Rerror(m.Tag(), "auth failed: %s", err)
return false
}
@@ -328,15 +331,14 @@ func (c *conn) handleTattach(cx context.Context, m styxproto.Tattach) bool {
c.sessionFid.Put(m.Fid(), s)
s.IncRef()
s.files.Put(m.Fid(), file{name: "/", rwc: nil})
c.clearTag(m.Tag())
c.Rattach(m.Tag(), c.qid(".", styxproto.QTDIR))
return true
}

func (c *conn) handleTflush(cx context.Context, m styxproto.Tflush) bool {
defer c.clearTag(m.Tag())

oldtag := m.Oldtag()
c.clearTag(oldtag)
c.clearTag(m.Oldtag())
c.clearTag(m.Tag())

c.Rflush(m.Tag())
return true
@@ -57,7 +57,7 @@ func (info reqInfo) Path() string {
}

func (info reqInfo) Rerror(format string, args ...interface{}) {
defer info.session.conn.clearTag(info.tag)
info.session.conn.clearTag(info.tag)
info.session.conn.Rerror(info.tag, format, args...)
}

@@ -132,7 +132,6 @@ type Topen struct {
}

func (t Topen) Ropen(rwc interface{}, mode os.FileMode) {
defer t.session.conn.clearTag(t.tag)
var (
file file
f styxfile.Interface
@@ -156,6 +155,7 @@ func (t Topen) Ropen(rwc interface{}, mode os.FileMode) {
file.rwc = f
})
qid := t.session.conn.qid(t.Path(), qidType(mode))
t.session.conn.clearTag(t.tag)
t.session.conn.Ropen(t.tag, qid, 0)
}

@@ -188,7 +188,6 @@ func (t Twalk) Path() string {
// Is that correct in every case?

func (t Twalk) Rwalk(exists bool, mode os.FileMode) {
defer t.session.conn.clearTag(t.tag)
if !exists {
t.defaultResponse()
return
@@ -210,6 +209,7 @@ func (t Twalk) Rwalk(exists bool, mode os.FileMode) {
}
dir, _ = path.Split(dir)
}
t.session.conn.clearTag(t.tag)
if err := t.session.conn.Rwalk(t.tag, wqid...); err != nil {
panic(err)
}
@@ -225,7 +225,6 @@ type Tstat struct {
}

func (t Tstat) Rstat(info os.FileInfo) {
defer t.session.conn.clearTag(t.tag)
buf := make([]byte, styxproto.MaxStatLen)
uid, gid, muid := sys.FileOwner(info)
name := info.Name()
@@ -242,6 +241,7 @@ func (t Tstat) Rstat(info os.FileInfo) {
stat.SetAtime(uint32(info.ModTime().Unix())) // TODO: get atime
stat.SetMtime(uint32(info.ModTime().Unix()))
stat.SetQid(t.session.conn.qid(t.Path(), qidType(info.Mode())))
t.session.conn.clearTag(t.tag)
t.session.conn.Rstat(t.tag, stat)
}

@@ -261,7 +261,6 @@ type Tcreate struct {
}

func (t Tcreate) Rcreate(rwc interface{}) {
defer t.session.conn.clearTag(t.tag)
var (
f styxfile.Interface
err error
@@ -284,6 +283,7 @@ func (t Tcreate) Rcreate(rwc interface{}) {

qtype := qidType(t.Perm)
qid := t.session.conn.qid(file.name, qtype)
t.session.conn.clearTag(t.tag)
t.session.conn.Rcreate(t.tag, qid, 0)
}

@@ -298,10 +298,10 @@ type Tremove struct {
}

func (t Tremove) Rremove() {
defer t.session.conn.clearTag(t.tag)
t.session.conn.sessionFid.Del(t.fid)
t.session.files.Del(t.fid)
t.session.conn.qidpool.Del(t.Path())
t.session.conn.clearTag(t.tag)
t.session.conn.Rremove(t.tag)
if !t.session.DecRef() {
t.session.close()
@@ -320,7 +320,7 @@ type Twstat struct {
}

func (t Twstat) Rwstat() {
defer t.session.conn.clearTag(t.tag)
t.session.conn.clearTag(t.tag)
t.session.conn.Rwstat(t.tag)
}

@@ -110,8 +110,8 @@ func (s *Session) handleTwalk(cx context.Context, msg styxproto.Twalk, file file
// newfid must be unused or equal to fid
if newfid != msg.Fid() {
if _, ok := s.conn.sessionFid.Get(newfid); ok {
s.conn.Rerror(msg.Tag(), "Twalk: fid %x already in use", newfid)
s.conn.clearTag(msg.Tag())
s.conn.Rerror(msg.Tag(), "Twalk: fid %x already in use", newfid)
return false
}
}
@@ -126,8 +126,8 @@ func (s *Session) handleTwalk(cx context.Context, msg styxproto.Twalk, file file
s.conn.sessionFid.Put(newfid, s)
s.IncRef()
}
s.conn.Rwalk(msg.Tag())
s.conn.clearTag(msg.Tag())
s.conn.Rwalk(msg.Tag())
return true
}

@@ -180,7 +180,6 @@ func (s *Session) handleTremove(cx context.Context, msg styxproto.Tremove, file

func (s *Session) handleTstat(cx context.Context, msg styxproto.Tstat, file file) bool {
if file.auth {
defer s.conn.clearTag(msg.Tag())
buf := make([]byte, styxproto.MaxStatLen)
stat, _, err := styxproto.NewStat(buf, "", "", "", "")
if err != nil {
@@ -190,6 +189,7 @@ func (s *Session) handleTstat(cx context.Context, msg styxproto.Tstat, file file
}
stat.SetMode(styxproto.DMAUTH)
stat.SetQid(s.conn.qid("", styxproto.QTAUTH))
s.conn.clearTag(msg.Tag())
s.conn.Rstat(msg.Tag(), stat)
return true
}
@@ -210,8 +210,8 @@ func (s *Session) handleTwstat(cx context.Context, msg styxproto.Twstat, file fi
}

func (s *Session) handleTread(cx context.Context, msg styxproto.Tread, file file) bool {
defer s.conn.clearTag(msg.Tag())
if file.rwc == nil {
s.conn.clearTag(msg.Tag())
s.conn.Rerror(msg.Tag(), "file %s is not open for reading", file.name)
return false
}
@@ -227,6 +227,7 @@ func (s *Session) handleTread(cx context.Context, msg styxproto.Tread, file file
// TODO(droyo) cancellation
n, err := file.rwc.ReadAt(buf, msg.Offset())

s.conn.clearTag(msg.Tag())
if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
s.conn.Rerror(msg.Tag(), "%v", err)
} else {
@@ -236,31 +237,33 @@ func (s *Session) handleTread(cx context.Context, msg styxproto.Tread, file file
}

func (s *Session) handleTwrite(cx context.Context, msg styxproto.Twrite, file file) bool {
defer s.conn.clearTag(msg.Tag())
if file.rwc == nil {
s.conn.clearTag(msg.Tag())
s.conn.Rerror(msg.Tag(), "file %q is not opened for writing", file.name)
return false
}

// TODO(droyo): handle cancellation
w := util.NewSectionWriter(file.rwc, msg.Offset(), msg.Count())
n, err := io.Copy(w, msg)
if err != nil {
s.conn.clearTag(msg.Tag())
if n == 0 && err != nil {
s.conn.Rerror(msg.Tag(), "%v", err)
} else {
s.conn.Rwrite(msg.Tag(), n)
}
s.conn.Rwrite(msg.Tag(), n)
return true
}

func (s *Session) handleTclunk(cx context.Context, msg styxproto.Tclunk, file file) bool {
defer s.conn.clearTag(msg.Tag())
s.conn.sessionFid.Del(msg.Fid())
if file.rwc != nil {
if err := file.rwc.Close(); err != nil {
s.conn.Rerror(msg.Tag(), "close %s: %v", file.name, err)
}
}
s.files.Del(msg.Fid())
s.conn.clearTag(msg.Tag())
s.conn.Rclunk(msg.Tag())
if !s.DecRef() {
s.endSession()

0 comments on commit 0cb8c0a

Please sign in to comment.