Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

netstack: add the fifo_sync queueing discipline #10217

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions pkg/tcpip/link/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,17 +198,22 @@ func (e *Endpoint) InjectInbound(protocol tcpip.NetworkProtocolNumber, pkt *stac
e.mu.RLock()
d := e.dispatcher
e.mu.RUnlock()
var pkts stack.PacketBufferList
pkts.PushBack(pkt)
if d != nil {
d.DeliverNetworkPacket(protocol, pkt)
d.DeliverNetworkPacket(pkts, 0)
}
}

// Attach saves the stack network-layer dispatcher for use later when packets
// are injected.
func (e *Endpoint) Attach(dispatcher stack.NetworkDispatcher) {
// TODO: Not sure this is right, although if InjectInbound always takes 1
// packet then we don't need to lock
func (e *Endpoint) Attach(dispatcher stack.NetworkDispatcher) int {
e.mu.Lock()
defer e.mu.Unlock()
e.dispatcher = dispatcher
return 1
}

// IsAttached implements stack.LinkEndpoint.IsAttached.
Expand Down
33 changes: 18 additions & 15 deletions pkg/tcpip/link/ethernet/ethernet.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,25 +59,28 @@ func (e *Endpoint) MTU() uint32 {
}

// DeliverNetworkPacket implements stack.NetworkDispatcher.
func (e *Endpoint) DeliverNetworkPacket(_ tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) {
if !e.ParseHeader(pkt) {
return
}
eth := header.Ethernet(pkt.LinkHeader().Slice())
dst := eth.DestinationAddress()
if dst == header.EthernetBroadcastAddress {
pkt.PktType = tcpip.PacketBroadcast
} else if header.IsMulticastEthernetAddress(dst) {
pkt.PktType = tcpip.PacketMulticast
} else if dst == e.LinkAddress() {
pkt.PktType = tcpip.PacketHost
} else {
pkt.PktType = tcpip.PacketOtherHost
func (e *Endpoint) DeliverNetworkPacket(pkts stack.PacketBufferList, index int) {
for _, pkt := range pkts.AsSlice() {
if !e.ParseHeader(pkt) {
return
}
eth := header.Ethernet(pkt.LinkHeader().Slice())
dst := eth.DestinationAddress()
if dst == header.EthernetBroadcastAddress {
pkt.PktType = tcpip.PacketBroadcast
} else if header.IsMulticastEthernetAddress(dst) {
pkt.PktType = tcpip.PacketMulticast
} else if dst == e.LinkAddress() {
pkt.PktType = tcpip.PacketHost
} else {
pkt.PktType = tcpip.PacketOtherHost
}
pkt.NetworkProtocolNumber = eth.Type()
}

// Note, there is no need to check the destination link address here since
// the ethernet hardware filters frames based on their destination addresses.
e.Endpoint.DeliverNetworkPacket(eth.Type() /* protocol */, pkt)
e.Endpoint.DeliverNetworkPacket(pkts, index)
}

// Capabilities implements stack.LinkEndpoint.
Expand Down
22 changes: 14 additions & 8 deletions pkg/tcpip/link/fdbased/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ import (
// NetworkDispatcher.
type linkDispatcher interface {
Stop()
dispatch() (bool, tcpip.Error)
dispatch(idx int) (stopped bool, err tcpip.Error)
release()
}

Expand Down Expand Up @@ -407,7 +407,7 @@ func isSocketFD(fd int) (bool, error) {
// then nothing happens.
//
// Attach implements stack.LinkEndpoint.Attach.
func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) {
func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) int {
e.mu.Lock()
defer e.mu.Unlock()
// nil means the NIC is being removed.
Expand All @@ -417,7 +417,7 @@ func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) {
}
e.Wait()
e.dispatcher = nil
return
return 0
}
if dispatcher != nil && e.dispatcher == nil {
e.dispatcher = dispatcher
Expand All @@ -427,11 +427,13 @@ func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) {
for i := range e.inboundDispatchers {
e.wg.Add(1)
go func(i int) { // S/R-SAFE: See above.
e.dispatchLoop(e.inboundDispatchers[i])
e.dispatchLoop(e.inboundDispatchers[i], i)
e.wg.Done()
}(i)
}
return len(e.inboundDispatchers)
}
return 0
}

// IsAttached implements stack.LinkEndpoint.IsAttached.
Expand Down Expand Up @@ -760,9 +762,10 @@ func (e *endpoint) InjectOutbound(dest tcpip.Address, packet *buffer.View) tcpip

// dispatchLoop reads packets from the file descriptor in a loop and dispatches
// them to the network stack.
func (e *endpoint) dispatchLoop(inboundDispatcher linkDispatcher) tcpip.Error {
func (e *endpoint) dispatchLoop(inboundDispatcher linkDispatcher, index int) tcpip.Error {
for {
cont, err := inboundDispatcher.dispatch()
// TODO: Inline
cont, err := inboundDispatcher.dispatch(index)
if err != nil || !cont {
if e.closed != nil {
e.closed(err)
Expand Down Expand Up @@ -803,10 +806,11 @@ type InjectableEndpoint struct {

// Attach saves the stack network-layer dispatcher for use later when packets
// are injected.
func (e *InjectableEndpoint) Attach(dispatcher stack.NetworkDispatcher) {
func (e *InjectableEndpoint) Attach(dispatcher stack.NetworkDispatcher) int {
e.mu.Lock()
defer e.mu.Unlock()
e.dispatcher = dispatcher
return 1
}

// InjectInbound injects an inbound packet. If the endpoint is not attached, the
Expand All @@ -815,8 +819,10 @@ func (e *InjectableEndpoint) InjectInbound(protocol tcpip.NetworkProtocolNumber,
e.mu.RLock()
d := e.dispatcher
e.mu.RUnlock()
var pkts stack.PacketBufferList
pkts.PushBack(pkt)
if d != nil {
d.DeliverNetworkPacket(protocol, pkt)
d.DeliverNetworkPacket(pkts, 1)
}
}

Expand Down
11 changes: 8 additions & 3 deletions pkg/tcpip/link/fdbased/mmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const (
tpFrameSize = 65536 + 128
tpBlockSize = tpFrameSize * 32
tpBlockNR = 1
tpFrameNR = (tpBlockSize * tpBlockNR) / tpFrameSize
tpFrameNR = (tpBlockSize * tpBlockNR) / tpFrameSize // TODO: Mask
)

// tPacketAlign aligns the pointer v at a tPacketAlignment boundary. Direct
Expand Down Expand Up @@ -130,6 +130,8 @@ type packetMMapDispatcher struct {
// ringOffset is the current offset into the ring buffer where the next
// inbound packet will be placed by the kernel.
ringOffset int

pkts stack.PacketBufferList
}

func (*packetMMapDispatcher) release() {}
Expand Down Expand Up @@ -168,7 +170,7 @@ func (d *packetMMapDispatcher) readMMappedPacket() (*buffer.View, bool, tcpip.Er

// dispatch reads packets from an mmaped ring buffer and dispatches them to the
// network stack.
func (d *packetMMapDispatcher) dispatch() (bool, tcpip.Error) {
func (d *packetMMapDispatcher) dispatch(idx int) (bool, tcpip.Error) {
pkt, stopped, err := d.readMMappedPacket()
if err != nil || stopped {
return false, err
Expand All @@ -192,6 +194,7 @@ func (d *packetMMapDispatcher) dispatch() (bool, tcpip.Error) {
pbuf := stack.NewPacketBuffer(stack.PacketBufferOptions{
Payload: buffer.MakeWithView(pkt),
})
pbuf.NetworkProtocolNumber = p
defer pbuf.DecRef()
if d.e.hdrSize > 0 {
if _, ok := pbuf.LinkHeader().Consume(d.e.hdrSize); !ok {
Expand All @@ -201,6 +204,8 @@ func (d *packetMMapDispatcher) dispatch() (bool, tcpip.Error) {
d.e.mu.RLock()
dsp := d.e.dispatcher
d.e.mu.RUnlock()
dsp.DeliverNetworkPacket(p, pbuf)
var pkts stack.PacketBufferList
pkts.PushBack(pbuf)
dsp.DeliverNetworkPacket(pkts, idx)
return true, nil
}
28 changes: 17 additions & 11 deletions pkg/tcpip/link/fdbased/packet_dispatchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ type readVDispatcher struct {

// buf is the iovec buffer that contains the packet contents.
buf *iovecBuffer

pkts stack.PacketBufferList
}

func newReadVDispatcher(fd int, e *endpoint) (linkDispatcher, error) {
Expand All @@ -176,7 +178,9 @@ func (d *readVDispatcher) release() {
}

// dispatch reads one packet from the file descriptor and dispatches it.
func (d *readVDispatcher) dispatch() (bool, tcpip.Error) {
func (d *readVDispatcher) dispatch(index int) (bool, tcpip.Error) {
defer d.pkts.Reset()

n, err := rawfile.BlockingReadvUntilStopped(d.EFD, d.fd, d.buf.nextIovecs())
if n <= 0 || err != nil {
return false, err
Expand Down Expand Up @@ -210,11 +214,13 @@ func (d *readVDispatcher) dispatch() (bool, tcpip.Error) {
return true, nil
}
}
pkt.NetworkProtocolNumber = p
d.pkts.PushBack(pkt)

d.e.mu.RLock()
dsp := d.e.dispatcher
d.e.mu.RUnlock()
dsp.DeliverNetworkPacket(p, pkt)
dsp.DeliverNetworkPacket(d.pkts, index)

return true, nil
}
Expand All @@ -237,6 +243,8 @@ type recvMMsgDispatcher struct {
// array is passed as the parameter to recvmmsg call to retrieve
// potentially more than 1 packet per unix.
msgHdrs []rawfile.MMsgHdr

pkts stack.PacketBufferList
}

const (
Expand Down Expand Up @@ -272,7 +280,9 @@ func (d *recvMMsgDispatcher) release() {

// recvMMsgDispatch reads more than one packet at a time from the file
// descriptor and dispatches it.
func (d *recvMMsgDispatcher) dispatch() (bool, tcpip.Error) {
func (d *recvMMsgDispatcher) dispatch(index int) (bool, tcpip.Error) {
defer d.pkts.Reset()

// Fill message headers.
for k := range d.msgHdrs {
if d.msgHdrs[k].Msg.Iovlen > 0 {
Expand All @@ -289,21 +299,16 @@ func (d *recvMMsgDispatcher) dispatch() (bool, tcpip.Error) {
if nMsgs == -1 || err != nil {
return false, err
}
// Process each of received packets.
// Keep a list of packets so we can DecRef outside of the loop.
var pkts stack.PacketBufferList

d.e.mu.RLock()
dsp := d.e.dispatcher
dsp := d.e.dispatcher // TODO: Can this really change?
d.e.mu.RUnlock()

defer func() { pkts.DecRef() }()
for k := 0; k < nMsgs; k++ {
n := int(d.msgHdrs[k].Len)
pkt := stack.NewPacketBuffer(stack.PacketBufferOptions{
Payload: d.bufs[k].pullBuffer(n),
})
pkts.PushBack(pkt)

// Mark that this iovec has been processed.
d.msgHdrs[k].Msg.Iovlen = 0
Expand Down Expand Up @@ -334,9 +339,10 @@ func (d *recvMMsgDispatcher) dispatch() (bool, tcpip.Error) {
continue
}
}

dsp.DeliverNetworkPacket(p, pkt)
pkt.NetworkProtocolNumber = p
d.pkts.PushBack(pkt)
}

dsp.DeliverNetworkPacket(d.pkts, index)
return true, nil
}
12 changes: 9 additions & 3 deletions pkg/tcpip/link/loopback/loopback.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ func New() stack.LinkEndpoint {

// Attach implements stack.LinkEndpoint.Attach. It just saves the stack network-
// layer dispatcher for later use when packets need to be dispatched.
func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) {
func (e *endpoint) Attach(dispatcher stack.NetworkDispatcher) int {
e.mu.Lock()
defer e.mu.Unlock()
e.dispatcher = dispatcher
return 1
}

// IsAttached implements stack.LinkEndpoint.IsAttached.
Expand Down Expand Up @@ -87,17 +88,22 @@ func (e *endpoint) WritePackets(pkts stack.PacketBufferList) (int, tcpip.Error)
e.mu.RLock()
d := e.dispatcher
e.mu.RUnlock()
// TODO: stack.SingleBufferList() to get these w/o alloc. Or have nic
// have both single-and-multi packet versions? <--- DO THIS
var newPkts stack.PacketBufferList
for _, pkt := range pkts.AsSlice() {
// In order to properly loop back to the inbound side we must create a
// fresh packet that only contains the underlying payload with no headers
// or struct fields set.
newPkt := stack.NewPacketBuffer(stack.PacketBufferOptions{
Payload: pkt.ToBuffer(),
})
newPkts.PushBack(newPkt)
if d != nil {
d.DeliverNetworkPacket(pkt.NetworkProtocolNumber, newPkt)
// TODO: We're doing single packets, which avoids GRO.
d.DeliverNetworkPacket(newPkts, 0)
}
newPkt.DecRef()
newPkts.Reset()
}
return pkts.Len(), nil
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/tcpip/link/nested/nested.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ func (e *Endpoint) Init(child stack.LinkEndpoint, embedder stack.NetworkDispatch
}

// DeliverNetworkPacket implements stack.NetworkDispatcher.
func (e *Endpoint) DeliverNetworkPacket(protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) {
func (e *Endpoint) DeliverNetworkPacket(pkts stack.PacketBufferList, index int) {
e.mu.RLock()
d := e.dispatcher
e.mu.RUnlock()
if d != nil {
d.DeliverNetworkPacket(protocol, pkt)
d.DeliverNetworkPacket(pkts, index)
}
}

Expand All @@ -71,7 +71,7 @@ func (e *Endpoint) DeliverLinkPacket(protocol tcpip.NetworkProtocolNumber, pkt *
}

// Attach implements stack.LinkEndpoint.
func (e *Endpoint) Attach(dispatcher stack.NetworkDispatcher) {
func (e *Endpoint) Attach(dispatcher stack.NetworkDispatcher) int {
e.mu.Lock()
e.dispatcher = dispatcher
e.mu.Unlock()
Expand All @@ -81,7 +81,7 @@ func (e *Endpoint) Attach(dispatcher stack.NetworkDispatcher) {
if dispatcher != nil {
pass = e.embedder
}
e.child.Attach(pass)
return e.child.Attach(pass)
}

// IsAttached implements stack.LinkEndpoint.
Expand Down
8 changes: 5 additions & 3 deletions pkg/tcpip/link/packetsocket/packetsocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,12 @@ func New(lower stack.LinkEndpoint) stack.LinkEndpoint {
}

// DeliverNetworkPacket implements stack.NetworkDispatcher.
func (e *endpoint) DeliverNetworkPacket(protocol tcpip.NetworkProtocolNumber, pkt *stack.PacketBuffer) {
e.Endpoint.DeliverLinkPacket(protocol, pkt)
func (e *endpoint) DeliverNetworkPacket(pkts stack.PacketBufferList, index int) {
for _, pkt := range pkts.AsSlice() {
e.Endpoint.DeliverLinkPacket(pkt.NetworkProtocolNumber, pkt)
}

e.Endpoint.DeliverNetworkPacket(protocol, pkt)
e.Endpoint.DeliverNetworkPacket(pkts, index)
}

// WritePackets implements stack.LinkEndpoint.
Expand Down
1 change: 1 addition & 0 deletions pkg/tcpip/link/qdisc/fifo/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
name = "fifo",
srcs = [
"fifo.go",
"fifo_sync.go",
"packet_buffer_circular_list.go",
],
visibility = ["//visibility:public"],
Expand Down
2 changes: 2 additions & 0 deletions pkg/tcpip/link/qdisc/fifo/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ func (d *discipline) WritePacket(pkt *stack.PacketBuffer) tcpip.Error {
return nil
}

func (*discipline) Kick(bool) {}

func (d *discipline) Close() {
d.closed.Store(qDiscClosed)
for i := range d.dispatchers {
Expand Down
Loading
Loading