Skip to content

Commit

Permalink
Using podCIDR and podCIDRs to calculate assigned IP
Browse files Browse the repository at this point in the history
  • Loading branch information
yiningou committed Apr 26, 2023
1 parent 167b4f4 commit a0b45d8
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 20 deletions.
139 changes: 123 additions & 16 deletions pkg/metrics/collector/pod_ip_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@ package collector

import (
"bufio"
"context"
"fmt"
"math"
"net"
"os"
"strings"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"github.com/fsnotify/fsnotify"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -91,18 +96,32 @@ var (
"Indicates the IP reuse duration in millisecond for all IPs.",
nil, nil,
)
podIPMetricsWatcherSetup = false
assignedIPv4AddrCountDesc = prometheus.NewDesc(
"ipv4_assigned_count",
"Indicates the total IPv4 IPs assigned to the subnetwork.",
nil, nil,
)
assignedIPv6AddrCountDesc = prometheus.NewDesc(
"ipv6_assigned_count",
"Indicates the total IPv6 IPs assigned to the subnetwork.",
nil, nil,
)
)

// podIPMetricsCollector collects pod IP allocation metrics for the node it
// runs on: IPs currently in use (observed from the pod network directory),
// IP reuse statistics, and the total number of IPs assignable from the
// node's podCIDR/podCIDRs ranges.
type podIPMetricsCollector struct {
	usedIPv4AddrCount   uint64
	usedIPv6AddrCount   uint64
	dualStackCount      uint64
	dualStackErrorCount uint64
	duplicateIPCount    uint64
	reuseIPs            reuseIPs
	// reuseMap tracks per-IP reuse state, keyed by IP address string.
	reuseMap map[string]*ipReuse
	// clientset is used to read this node's spec (podCIDR/podCIDRs).
	clientset kubernetes.Interface
	// nodeName identifies the node object to fetch; taken from $HOSTNAME.
	nodeName string
	clock    clock
	// assignedIPv4AddrCount / assignedIPv6AddrCount are the totals of
	// assignable addresses computed from the node's pod CIDR ranges.
	assignedIPv4AddrCount uint64
	assignedIPv6AddrCount uint64
	// podIPMetricsWatcherIsInitialized reports whether the fsnotify
	// directory watcher has been set up; reset when the watcher exits.
	podIPMetricsWatcherIsInitialized bool
}

type reuseIPs struct {
Expand Down Expand Up @@ -139,7 +158,26 @@ func init() {
// NewPodIPMetricsCollector returns a new Collector exposing pod IP allocation
// stats. It requires an in-cluster Kubernetes configuration and the HOSTNAME
// environment variable (used as the node name) to be set.
func NewPodIPMetricsCollector() (Collector, error) {
	config, err := rest.InClusterConfig()
	if err != nil {
		return nil, fmt.Errorf("error creating in-cluster config: %v", err)
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("error creating clientset: %v", err)
	}

	// HOSTNAME is expected to match the node object's name so that
	// calculateAssignedIP can look up this node's podCIDR(s).
	nodeName := os.Getenv("HOSTNAME")
	if nodeName == "" {
		return nil, fmt.Errorf("error getting hostname: HOSTNAME environment variable not set")
	}

	return &podIPMetricsCollector{
		clientset: clientset,
		nodeName:  nodeName,
		clock:     &realClock{},
	}, nil
}

func readLine(path string) (string, error) {
Expand Down Expand Up @@ -266,6 +304,68 @@ func (c *podIPMetricsCollector) fillBuckets(diff uint64) {
}
}

// countIPsFromRange returns the number of available hosts in a subnet,
// capped at the maximum value of a uint64.
// The number of hosts is calculated as:
// IPv4: 2^x - 2, excluding the network and broadcast addresses
// IPv6: 2^x - 1, excluding the network address
// where x is the number of host bits in the subnet mask.
func (c *podIPMetricsCollector) countIPsFromRange(subnet *net.IPNet) (uint64, error) {
	ones, bits := subnet.Mask.Size()
	hostBits := bits - ones
	if hostBits <= 0 {
		return 0, fmt.Errorf("invalid subnet mask: %v", subnet.Mask)
	}
	// 2^64 or more addresses would overflow a uint64; saturate instead.
	if hostBits >= 64 {
		return math.MaxUint64, nil
	}
	count := (uint64(1) << uint(hostBits)) - 1
	if subnet.IP.To4() != nil {
		// For IPv4, the broadcast address is not usable either.
		if count == 0 {
			return 0, fmt.Errorf("subnet includes only the network and broadcast addresses")
		}
		count--
	}
	return count, nil
}

// updateAssignedIPs adds totalIP to the IPv4 or IPv6 assigned-address
// counter, according to the address family of the subnet.
func (c *podIPMetricsCollector) updateAssignedIPs(subnet *net.IPNet, totalIP uint64) {
	switch {
	case subnet.IP.To4() != nil:
		c.assignedIPv4AddrCount += totalIP
	case subnet.IP.To16() != nil:
		c.assignedIPv6AddrCount += totalIP
	}
}

// calculateAssignedIP fetches this collector's node object and accumulates
// the number of assignable pod IPs from its podCIDRs (falling back to the
// singular podCIDR field when podCIDRs is empty).
func (c *podIPMetricsCollector) calculateAssignedIP() error {
	node, err := c.clientset.CoreV1().Nodes().Get(context.Background(), c.nodeName, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("error getting node %s: %v", c.nodeName, err)
	}

	cidrs := node.Spec.PodCIDRs
	if len(cidrs) == 0 && node.Spec.PodCIDR != "" {
		// Older clusters may populate only the singular field.
		cidrs = []string{node.Spec.PodCIDR}
	}
	for _, cidr := range cidrs {
		_, subnet, err := net.ParseCIDR(cidr)
		if err != nil {
			return fmt.Errorf("error parsing podCIDR %s: %v", cidr, err)
		}
		total, err := c.countIPsFromRange(subnet)
		if err != nil {
			return fmt.Errorf("error calculating total IPs for subnet %s: %v", subnet.IP.String(), err)
		}
		c.updateAssignedIPs(subnet, total)
	}
	return nil
}

func (c *podIPMetricsCollector) setupDirectoryWatcher(dir string) error {
if err := c.listIPAddresses(dir); err != nil {
return err
Expand All @@ -286,7 +386,7 @@ func (c *podIPMetricsCollector) setupDirectoryWatcher(dir string) error {
go func() {
defer func() {
watcher.Close()
podIPMetricsWatcherSetup = false
c.podIPMetricsWatcherIsInitialized = false
}()

for {
Expand Down Expand Up @@ -318,26 +418,33 @@ func (c *podIPMetricsCollector) setupDirectoryWatcher(dir string) error {
if err != nil {
glog.Errorf("Failed to add watcher for directory %s: %v", dir, err)
}
podIPMetricsWatcherSetup = true
c.podIPMetricsWatcherIsInitialized = true
return nil
}

func (c *podIPMetricsCollector) Update(ch chan<- prometheus.Metric) error {
if !podIPMetricsWatcherSetup {
if !c.podIPMetricsWatcherIsInitialized {
if err := c.setupDirectoryWatcher(gkePodNetworkDir); err != nil {
glog.Errorf("setupDirectoryWatcher returned error: %v", err)
return nil
}
if err := c.calculateAssignedIP(); err != nil {
glog.Errorf("calculateAssignedIP returned error: %v", err)
}
}
c.populateMetrics(ch)
return nil
}

// populateMetrics writes every pod IP metric to the Prometheus channel:
// used/assigned address counts, dual-stack counters, and IP reuse stats.
func (c *podIPMetricsCollector) populateMetrics(ch chan<- prometheus.Metric) {
	ch <- prometheus.MustNewConstMetric(usedIPv4AddrCountDesc, prometheus.GaugeValue, float64(c.usedIPv4AddrCount))
	ch <- prometheus.MustNewConstMetric(usedIPv6AddrCountDesc, prometheus.GaugeValue, float64(c.usedIPv6AddrCount))
	ch <- prometheus.MustNewConstMetric(dualStackCountDesc, prometheus.GaugeValue, float64(c.dualStackCount))
	ch <- prometheus.MustNewConstMetric(dualStackErrorCountDesc, prometheus.GaugeValue, float64(c.dualStackErrorCount))
	ch <- prometheus.MustNewConstMetric(duplicateIPCountDesc, prometheus.GaugeValue, float64(c.duplicateIPCount))

	// Guard against division by zero: before any IP reuse has been
	// observed, count is 0 and sum/count would emit NaN for the average.
	avgReuse := 0.0
	if c.reuseIPs.count > 0 {
		avgReuse = c.reuseIPs.sum / float64(c.reuseIPs.count)
	}
	ch <- prometheus.MustNewConstMetric(ipReuseMinDesc, prometheus.GaugeValue, float64(c.reuseIPs.min))
	ch <- prometheus.MustNewConstMetric(ipReuseAvgDesc, prometheus.GaugeValue, avgReuse)
	ch <- prometheus.MustNewConstHistogram(ipReuseHistogramDesc, c.reuseIPs.count, c.reuseIPs.sum, c.reuseIPs.buckets)

	ch <- prometheus.MustNewConstMetric(assignedIPv4AddrCountDesc, prometheus.GaugeValue, float64(c.assignedIPv4AddrCount))
	ch <- prometheus.MustNewConstMetric(assignedIPv6AddrCountDesc, prometheus.GaugeValue, float64(c.assignedIPv6AddrCount))
}
71 changes: 67 additions & 4 deletions pkg/metrics/collector/pod_ip_metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ import (
"testing"
"time"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"

"github.com/prometheus/client_golang/prometheus"
)

Expand All @@ -20,6 +24,18 @@ func (c *fakeClock) Sleep(d time.Duration) {
c.now = c.now.Add(d)
}

// newNodeWithPodCIDRs builds a minimal Node named "test-node" whose spec
// carries the given podCIDR and podCIDRs values, for use with the fake
// clientset in tests.
func newNodeWithPodCIDRs(podCIDR string, podCIDRs []string) *v1.Node {
	node := &v1.Node{}
	node.ObjectMeta = metav1.ObjectMeta{Name: "test-node"}
	node.Spec = v1.NodeSpec{
		PodCIDR:  podCIDR,
		PodCIDRs: podCIDRs,
	}
	return node
}

func mustCreateFile(t *testing.T, dir, name, content string) {
path := dir + "/" + name
f, err := os.Create(path)
Expand Down Expand Up @@ -134,10 +150,10 @@ func TestListIpAddresses(t *testing.T) {
return
}
if mc.usedIPv4AddrCount != tc.wantUsedIPv4Count {
t.Errorf("usedIpv4AddrCount. want: %d, got %d", tc.wantUsedIPv4Count, mc.usedIPv4AddrCount)
t.Errorf("usedIPv4AddrCount. want: %d, got %d", tc.wantUsedIPv4Count, mc.usedIPv4AddrCount)
}
if mc.usedIPv6AddrCount != tc.wantUsedIPv6Count {
t.Errorf("usedIpv6AddrCount. want: %d, got %d", tc.wantUsedIPv6Count, mc.usedIPv6AddrCount)
t.Errorf("usedIPv6AddrCount. want: %d, got %d", tc.wantUsedIPv6Count, mc.usedIPv6AddrCount)
}
if mc.dualStackCount != tc.wantDualStackCount {
t.Errorf("dualStackCount. want: %d, got %d", tc.wantDualStackCount, mc.dualStackCount)
Expand Down Expand Up @@ -191,8 +207,8 @@ func TestSetupDirectoryWatcher(t *testing.T) {
t.Errorf("no file is deleted. want value to be 0, got %v", v)
}
}
if !podIPMetricsWatcherSetup {
t.Fatal("podIpMetricsWatcherSetup: want: true, got: false")
if !mc.podIPMetricsWatcherIsInitialized {
t.Fatal("podIPMetricsWatcherIsIntialized: want: true, got: false")
}

//Add a new file to the directory. Verify metrics
Expand Down Expand Up @@ -269,3 +285,50 @@ func TestSetupDirectoryWatcher(t *testing.T) {
}
}
}

func TestCalculateAssignedIP(t *testing.T) {
testCases := []struct {
desc string
podCIDR string
podCIDRs []string
wantAssignedIPv4Count uint64
wantAssignedIPv6Count uint64
}{
{
desc: "Two same IPv4 addresses and one IPv6 address",
podCIDR: "10.0.0.0/24",
podCIDRs: []string{"10.0.0.0/24", "2600:1900::/125"},
wantAssignedIPv4Count: 254,
wantAssignedIPv6Count: 7,
},
{
desc: "Two same IPv4 addresses and no IPv6 address",
podCIDR: "10.0.0.0/24",
podCIDRs: []string{"10.0.0.0/24"},
wantAssignedIPv4Count: 254,
wantAssignedIPv6Count: 0,
},
}

for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
node := newNodeWithPodCIDRs(tc.podCIDR, tc.podCIDRs)
fakeClient := fake.NewSimpleClientset(node)
mc := &podIPMetricsCollector{
clientset: fakeClient,
nodeName: "test-node",
}

err := mc.calculateAssignedIP()
if err != nil {
t.Fatalf("Error calculating assigned IPs: %v", err)
}
if mc.assignedIPv4AddrCount != tc.wantAssignedIPv4Count {
t.Errorf("assignedIPv4AddrCount. want: %d, got %d", tc.wantAssignedIPv4Count, mc.assignedIPv4AddrCount)
}
if mc.assignedIPv6AddrCount != tc.wantAssignedIPv6Count {
t.Errorf("assignedIPv6AddrCount. want: %d, got %d", tc.wantAssignedIPv6Count, mc.assignedIPv6AddrCount)
}
})
}
}

0 comments on commit a0b45d8

Please sign in to comment.