Skip to content

Commit

Permalink
[Multicast] Support IGMPv3 leave action (#3389)
Browse files Browse the repository at this point in the history
IGMPv3 doesn't have an explicit message to describe a member leaving a
multicast group. Instead, IGMPv3 report message is used, but the group
record has an empty "include" sources. So Antrea Agent checks the group
record type and source number with IGMPv3 report message, and picks out
the groups which the member expects to leave.

Signed-off-by: wenyingd <wenyingd@vmware.com>
  • Loading branch information
wenyingd committed Mar 18, 2022
1 parent abfb09c commit f64356e
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 4 deletions.
147 changes: 147 additions & 0 deletions pkg/agent/multicast/mcast_controller_test.go
Expand Up @@ -24,6 +24,9 @@ import (
"time"

"antrea.io/libOpenflow/openflow13"
"antrea.io/libOpenflow/protocol"
"antrea.io/libOpenflow/util"
"antrea.io/ofnet/ofctrl"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -55,6 +58,8 @@ var (
}
nodeIf1IP = net.ParseIP("192.168.20.22")
externalInterfaceIP = net.ParseIP("192.168.50.23")
pktInSrcMAC, _ = net.ParseMAC("11:22:33:44:55:66")
pktInDstMAC, _ = net.ParseMAC("01:00:5e:00:00:16")
)

func TestAddGroupMemberStatus(t *testing.T) {
Expand Down Expand Up @@ -241,6 +246,62 @@ func TestClearStaleGroups(t *testing.T) {
}
}

func TestProcessPacketIn(t *testing.T) {
mockController := newMockMulticastController(t)
snooper := mockController.igmpSnooper
stopCh := make(chan struct{})
defer close(stopCh)
go mockController.eventHandler(stopCh)

getIPs := func(ipStrs []string) []net.IP {
ips := make([]net.IP, len(ipStrs))
for i := range ipStrs {
ips[i] = net.ParseIP(ipStrs[i])
}
return ips
}
for _, tc := range []struct {
iface *interfacestore.InterfaceConfig
version uint8
joinedGroups sets.String
leftGroups sets.String
}{
{
iface: createInterface("p1", 1),
joinedGroups: sets.NewString("224.1.101.2", "224.1.101.3", "224.1.101.4"),
leftGroups: sets.NewString(),
version: 1,
},
{
iface: createInterface("p2", 2),
joinedGroups: sets.NewString("224.1.102.2", "224.1.102.3", "224.1.102.4"),
leftGroups: sets.NewString("224.1.102.3"),
version: 2,
},
{
iface: createInterface("p3", 3),
joinedGroups: sets.NewString("224.1.103.2", "224.1.103.3", "224.1.103.4"),
leftGroups: sets.NewString("224.1.103.2"),
version: 3,
},
} {
packets := createIGMPReportPacketIn(getIPs(tc.joinedGroups.List()), getIPs(tc.leftGroups.List()), tc.version, uint32(tc.iface.OFPort))
mockOFClient.EXPECT().SendIGMPQueryPacketOut(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
for _, pkt := range packets {
mockIfaceStore.EXPECT().GetInterfaceByOFPort(uint32(tc.iface.OFPort)).Return(tc.iface, true)
err := snooper.processPacketIn(pkt)
assert.Nil(t, err)
}
time.Sleep(time.Second)
expGroups := tc.joinedGroups.Difference(tc.leftGroups)
statuses := mockController.getGroupMemberStatusesByPod(tc.iface.InterfaceName)
assert.Equal(t, expGroups.Len(), len(statuses))
for _, s := range statuses {
assert.True(t, expGroups.Has(s.group.String()))
}
}
}

func compareGroupStatus(t *testing.T, cache cache.Indexer, event *mcastGroupEvent) {
obj, exits, err := cache.GetByKey(event.group.String())
assert.Nil(t, err)
Expand Down Expand Up @@ -280,3 +341,89 @@ func (c *Controller) initialize(t *testing.T) error {
mockMulticastSocket.EXPECT().AllocateVIFs(gomock.Any(), uint16(1)).Times(1).Return([]uint16{1, 2}, nil)
return c.Initialize()
}

func createInterface(name string, ofport uint32) *interfacestore.InterfaceConfig {
return &interfacestore.InterfaceConfig{
InterfaceName: name,
Type: interfacestore.ContainerInterface,
OVSPortConfig: &interfacestore.OVSPortConfig{
OFPort: int32(ofport),
},
ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{
PodName: name,
},
}
}

func createIGMPReportPacketIn(joinedGroups []net.IP, leftGroups []net.IP, version uint8, ofport uint32) []*ofctrl.PacketIn {
joinMessages := createIGMPJoinMessage(joinedGroups, version)
leaveMessages := createIGMPLeaveMessage(leftGroups, version)
generatePacket := func(m util.Message) ofctrl.PacketIn {
pkt := openflow13.NewPacketIn()
matchInport := openflow13.NewInPortField(ofport)
pkt.Match.AddField(*matchInport)
ipPacket := &protocol.IPv4{
Version: 0x4,
IHL: 5,
Protocol: IGMPProtocolNumber,
Length: 20 + m.Len(),
Data: m,
}
pkt.Data = protocol.Ethernet{
HWDst: pktInDstMAC,
HWSrc: pktInSrcMAC,
Ethertype: protocol.IPv4_MSG,
Data: ipPacket,
}
return ofctrl.PacketIn(*pkt)
}
pkts := make([]*ofctrl.PacketIn, 0)
for _, m := range joinMessages {
pkt := generatePacket(m)
pkts = append(pkts, &pkt)
}
for _, m := range leaveMessages {
pkt := generatePacket(m)
pkts = append(pkts, &pkt)
}
return pkts
}

func createIGMPLeaveMessage(groups []net.IP, version uint8) []util.Message {
pkts := make([]util.Message, 0)
switch version {
case 2:
for i := range groups {
pkts = append(pkts, protocol.NewIGMPv2Leave(groups[i]))
}
return pkts
case 3:
records := make([]protocol.IGMPv3GroupRecord, 0)
for _, g := range groups {
records = append(records, protocol.NewGroupRecord(protocol.IGMPIsIn, g, nil))
}
pkts = append(pkts, protocol.NewIGMPv3Report(records))
}
return pkts
}

func createIGMPJoinMessage(groups []net.IP, version uint8) []util.Message {
pkts := make([]util.Message, 0)
switch version {
case 1:
for i := range groups {
pkts = append(pkts, protocol.NewIGMPv1Report(groups[i]))
}
case 2:
for i := range groups {
pkts = append(pkts, protocol.NewIGMPv2Report(groups[i]))
}
case 3:
records := make([]protocol.IGMPv3GroupRecord, 0)
for _, g := range groups {
records = append(records, protocol.NewGroupRecord(protocol.IGMPIsEx, g, nil))
}
pkts = append(pkts, protocol.NewIGMPv3Report(records))
}
return pkts
}
16 changes: 12 additions & 4 deletions pkg/agent/multicast/mcast_discovery.go
Expand Up @@ -118,6 +118,10 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error {
return err
}
klog.V(2).InfoS("Received PacketIn for IGMP packet", "in_port", iface.OFPort)
podName := "unknown"
if iface.Type == interfacestore.ContainerInterface {
podName = iface.PodName
}
igmp, err := parseIGMPPacket(pktIn.Data)
if err != nil {
return err
Expand All @@ -127,7 +131,7 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error {
fallthrough
case protocol.IGMPv2Report:
mgroup := igmp.(*protocol.IGMPv1or2).GroupAddress
klog.InfoS("Received IGMPv1or2 Report message", "group", mgroup.String(), "interface", iface.PodName)
klog.InfoS("Received IGMPv1or2 Report message", "group", mgroup.String(), "interface", iface.InterfaceName, "pod", podName)
event := &mcastGroupEvent{
group: mgroup,
eType: groupJoin,
Expand All @@ -139,10 +143,14 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error {
msg := igmp.(*protocol.IGMPv3MembershipReport)
for _, gr := range msg.GroupRecords {
mgroup := gr.MulticastAddress
klog.InfoS("Received IGMPv3 Report message", "group", mgroup.String(), "interface", iface.PodName)
klog.InfoS("Received IGMPv3 Report message", "group", mgroup.String(), "interface", iface.InterfaceName, "pod", podName, "recordType", gr.Type, "sourceCount", gr.NumberOfSources)
evtType := groupJoin
if (gr.Type == protocol.IGMPIsIn || gr.Type == protocol.IGMPToIn) && gr.NumberOfSources == 0 {
evtType = groupLeave
}
event := &mcastGroupEvent{
group: mgroup,
eType: groupJoin,
eType: evtType,
time: now,
iface: iface,
}
Expand All @@ -151,7 +159,7 @@ func (s *IGMPSnooper) processPacketIn(pktIn *ofctrl.PacketIn) error {

case protocol.IGMPv2LeaveGroup:
mgroup := igmp.(*protocol.IGMPv1or2).GroupAddress
klog.InfoS("Received IGMPv2 Leave message", "group", mgroup.String(), "interface", iface.PodName)
klog.InfoS("Received IGMPv2 Leave message", "group", mgroup.String(), "interface", iface.InterfaceName, "pod", podName)
event := &mcastGroupEvent{
group: mgroup,
eType: groupLeave,
Expand Down

0 comments on commit f64356e

Please sign in to comment.