Merged
3 changes: 2 additions & 1 deletion cns/ipampool/metrics.go

@@ -120,7 +120,8 @@ func init() {
 	)
 }
 
-func observeIPPoolState(state ipPoolState, meta metaState, labels []string) {
+func observeIPPoolState(state ipPoolState, meta metaState) {
+	labels := []string{meta.subnet, meta.subnetCIDR, meta.subnetARMID}
 	ipamAllocatedIPCount.WithLabelValues(labels...).Set(float64(state.allocatedToPods))
 	ipamAvailableIPCount.WithLabelValues(labels...).Set(float64(state.available))
 	ipamBatchSize.WithLabelValues(labels...).Set(float64(meta.batch))
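Note on the metrics change: the label values are now derived inside observeIPPoolState from metaState fields instead of being threaded in by the caller, so the metric labels can never drift from the monitor's own view of the subnet. A minimal sketch of the underlying client_golang pattern, assuming illustrative metric and label names not taken from the repo:

package main

import "github.com/prometheus/client_golang/prometheus"

// Hypothetical gauge vector keyed by the same three subnet labels the PR uses.
var availableIPs = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "ipam_pool_available_ips", // assumed name, for illustration only
		Help: "Available IPs in the IPAM pool.",
	},
	[]string{"subnet", "subnet_cidr", "subnet_arm_id"},
)

type metaState struct {
	subnet, subnetCIDR, subnetARMID string
}

// observe mirrors the merged signature: the label values come from meta,
// not from a caller-supplied slice.
func observe(available int, meta metaState) {
	labels := []string{meta.subnet, meta.subnetCIDR, meta.subnetARMID}
	availableIPs.WithLabelValues(labels...).Set(float64(available))
}

func main() {
	prometheus.MustRegister(availableIPs)
	observe(12, metaState{subnet: "podnet", subnetCIDR: "10.0.0.0/16", subnetARMID: "example-arm-id"})
}

Deriving labels at the observation site also removes one way to pass mismatched label slices, which would panic at WithLabelValues if the cardinality were wrong.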
35 changes: 9 additions & 26 deletions cns/ipampool/monitor.go

@@ -37,6 +37,9 @@ type metaState struct {
 	minFreeCount       int64
 	notInUseCount      int64
 	primaryIPAddresses map[string]struct{}
+	subnet             string
+	subnetARMID        string
+	subnetCIDR         string
 }
 
 type Options struct {
@@ -56,10 +59,6 @@ type Monitor struct {
 	once sync.Once
 }
 
-// Global Variables:
-// For Subnet, Subnet Address Space and Subnet ARM ID
-var subnet, subnetCIDR, subnetARMID string
-
 func NewMonitor(httpService cns.HTTPService, nnccli nodeNetworkConfigSpecUpdater, cssSource <-chan v1alpha1.ClusterSubnetState, opts *Options) *Monitor {
 	if opts.RefreshDelay < 1 {
 		opts.RefreshDelay = DefaultRefreshDelay
@@ -82,15 +81,8 @@ func NewMonitor(httpService cns.HTTPService, nnccli nodeNetworkConfigSpecUpdater
 // Subsequently, it will run run once per RefreshDelay and attempt to re-reconcile the pool.
 func (pm *Monitor) Start(ctx context.Context) error {
 	logger.Printf("[ipam-pool-monitor] Starting CNS IPAM Pool Monitor")
-
-	// the exhausted subnet set is populated by parsing incoming cluster subnet states
-	// if a subnet is present in this map, the batch from the NNC is ignored and instead
-	// the pool monitor batches 1 IP at a time to relieve capacity pressure.
-	exhaustedSubnetSet := map[string]struct{}{}
-
 	ticker := time.NewTicker(pm.opts.RefreshDelay)
 	defer ticker.Stop()
-
 	for {
 		// proceed when things happen:
 		select {
@@ -105,14 +97,8 @@ func (pm *Monitor) Start(ctx context.Context) error {
 				// if we have initialized and enter this case, we proceed out of the select and continue to reconcile.
 			}
 		case css := <-pm.cssSource: // received an updated ClusterSubnetState
-			// this map does not need additional synchronization thanks to channels <3
-			if css.Status.Exhausted {
-				logger.Printf("subnet %s is exhausted", css.Name)
-				exhaustedSubnetSet[css.Name] = struct{}{}
-			} else {
-				logger.Printf("subnet %s is no longer exhausted", css.Name)
-				delete(exhaustedSubnetSet, css.Name)
-			}
+			pm.metastate.exhausted = css.Status.Exhausted
+			logger.Printf("subnet exhausted status = %t", pm.metastate.exhausted)
 			select {
 			default:
 				// if we have NOT initialized and enter this case, we continue out of this iteration and let the for loop begin again.
@@ -123,13 +109,10 @@ func (pm *Monitor) Start(ctx context.Context) error {
 		case nnc := <-pm.nncSource: // received a new NodeNetworkConfig, extract the data from it and re-reconcile.
 			if len(nnc.Status.NetworkContainers) > 0 {
 				// Set SubnetName, SubnetAddressSpace and Pod Network ARM ID values to the global subnet, subnetCIDR and subnetARM variables.
-				subnet = nnc.Status.NetworkContainers[0].SubnetName
-				subnetCIDR = nnc.Status.NetworkContainers[0].SubnetAddressSpace
-				subnetARMID = GenerateARMID(&nnc.Status.NetworkContainers[0])
-				// check for subnet exhaustion
-				_, pm.metastate.exhausted = exhaustedSubnetSet[nnc.Status.NetworkContainers[0].SubnetName]
+				pm.metastate.subnet = nnc.Status.NetworkContainers[0].SubnetName
+				pm.metastate.subnetCIDR = nnc.Status.NetworkContainers[0].SubnetAddressSpace
+				pm.metastate.subnetARMID = GenerateARMID(&nnc.Status.NetworkContainers[0])
 			}
-
 			pm.metastate.primaryIPAddresses = make(map[string]struct{})
 			// Add Primary IP to Map, if not present.
 			// This is only for Swift i.e. if NC Type is vnet.
@@ -206,7 +189,7 @@ func (pm *Monitor) reconcile(ctx context.Context) error {
 	allocatedIPs := pm.httpService.GetPodIPConfigState()
 	meta := pm.metastate
 	state := buildIPPoolState(allocatedIPs, pm.spec)
-	observeIPPoolState(state, meta, []string{subnet, subnetCIDR, subnetARMID})
+	observeIPPoolState(state, meta)
 
 	// log every 30th reconcile to reduce the AI load. we will always log when the monitor
 	// changes the pool, below.
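The monitor.go diff moves the package-level subnet globals into metaState and collapses the per-subnet exhaustedSubnetSet into a single exhausted flag written directly when a ClusterSubnetState arrives. The deleted comment's point still holds: no mutex is needed, because all of this state is owned by the Start goroutine and updates reach it only through channels. A compilable sketch of that ownership pattern, with all names hypothetical:

package main

import (
	"context"
	"fmt"
	"time"
)

// Hypothetical reduced state, mirroring the PR's single exhausted flag
// plus the subnet identifiers that moved off package-level globals.
type metaState struct {
	exhausted   bool
	subnet      string
	subnetCIDR  string
	subnetARMID string
}

type subnetStatus struct {
	name      string
	exhausted bool
}

// run owns meta exclusively: every mutation happens on this goroutine,
// so channel delivery is the only synchronization required (no mutex).
func run(ctx context.Context, updates <-chan subnetStatus) {
	var meta metaState
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case s := <-updates:
			meta.subnet = s.name
			meta.exhausted = s.exhausted
			fmt.Printf("subnet exhausted status = %t\n", meta.exhausted)
		case <-ticker.C:
			// a reconcile pass over meta would run here on every tick
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	updates := make(chan subnetStatus)
	go run(ctx, updates)
	updates <- subnetStatus{name: "podnet", exhausted: true}
	<-ctx.Done()
}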
10 changes: 9 additions & 1 deletion cns/service/main.go

@@ -55,6 +55,7 @@ import (
 	"go.uber.org/zap"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
+	kuberuntime "k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/kubernetes"
 	ctrl "sigs.k8s.io/controller-runtime"
@@ -1102,8 +1103,15 @@ func InitializeCRDState(ctx context.Context, httpRestService cns.HTTPService, cn
 		},
 	})
 
+	crdSchemes := kuberuntime.NewScheme()
+	if err = v1alpha.AddToScheme(crdSchemes); err != nil {
+		return errors.Wrap(err, "failed to add nodenetworkconfig/v1alpha to scheme")
+	}
+	if err = v1alpha1.AddToScheme(crdSchemes); err != nil {
+		return errors.Wrap(err, "failed to add clustersubnetstate/v1alpha1 to scheme")
+	}
 	manager, err := ctrl.NewManager(kubeConfig, ctrl.Options{
-		Scheme:             nodenetworkconfig.Scheme,
+		Scheme:             crdSchemes,
 		MetricsBindAddress: "0",
 		Namespace:          "kube-system", // TODO(rbtr): namespace should be in the cns config
 		NewCache:           nodeScopedCache,
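In main.go, the manager previously used the scheme owned by the nodenetworkconfig package, which does not know the ClusterSubnetState types the monitor now watches. The fix builds one fresh scheme and registers both CRD groups into it before constructing the manager. A self-contained sketch of the registration pattern; it substitutes client-go's built-in AddToScheme for the repo's generated v1alpha/v1alpha1 functions so it compiles on its own:

package main

import (
	"fmt"

	kuberuntime "k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
)

func main() {
	// One scheme instance shared by every API group the manager must decode.
	crdSchemes := kuberuntime.NewScheme()

	// Stand-in for v1alpha.AddToScheme and v1alpha1.AddToScheme: each
	// generated CRD package exposes an AddToScheme with this same shape.
	if err := clientgoscheme.AddToScheme(crdSchemes); err != nil {
		fmt.Println("failed to register types:", err)
		return
	}

	// Anything registered is now decodable by clients built on this scheme.
	gvk := schema.GroupVersionKind{Version: "v1", Kind: "Pod"}
	fmt.Println("scheme recognizes Pod:", crdSchemes.Recognizes(gvk))
}

Passing the combined scheme as ctrl.Options.Scheme is what lets a single controller-runtime manager watch both CRDs.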