Skip to content

Commit

Permalink
detect anomaly request and add allocation offset
Browse files Browse the repository at this point in the history
Signed-off-by: Sunyanan Choochotkaew <sunyanan.choochotkaew1@ibm.com>
  • Loading branch information
sunya-ch committed Sep 27, 2022
1 parent da39da6 commit 4bbac4b
Show file tree
Hide file tree
Showing 3 changed files with 259 additions and 126 deletions.
98 changes: 68 additions & 30 deletions daemon/src/allocator/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,24 @@
package allocator

import (
"math"
"sync"
"context"
"fmt"
"strings"
"time"
"github.com/foundation-model-stack/multi-nic-cni/daemon/backend"
"log"
"strconv"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"context"
"sort"
"k8s.io/client-go/kubernetes"
"log"
"math"
"sort"
"strconv"
"strings"
"sync"
"time"
)


const (
SHIFT_BYTE_VAL = 256
SHIFT_BYTE_VAL = 256
HISTORY_TIMEOUT = 60 // seconds
)

var allocatorLock sync.Mutex
Expand All @@ -36,28 +36,40 @@ type IPValue struct {
Value int64
}

type allocateRecord struct {
time.Time
LastOffset int
}

func (r *allocateRecord) Expired() bool {
curr := time.Now()
return curr.Sub(r.Time).Seconds() > HISTORY_TIMEOUT
}

var deallocateHistory map[string]*allocateRecord = make(map[string]*allocateRecord)

func FindAvailableIndex(indexes []int, leftIndex int) int {
if len(indexes) == 0 {
return -1
}
lastAllocationIndex := indexes[len(indexes)-1]
if lastAllocationIndex - leftIndex == len(indexes) {
// all address in the range is assigned
if lastAllocationIndex-leftIndex == len(indexes) {
// all address in the range is assigned
// log.Printf("lastAllocationIndex - leftIndex == len(indexes), lastIndex %d, leftIndex: %d, %v", lastAllocationIndex, leftIndex, indexes)
return -1
} else {
if indexes[0] != leftIndex + 1{
if indexes[0] != leftIndex+1 {
// log.Printf("indexes[0] != leftIndex + 1, lastIndex %d, leftIndex: %d, %v", lastAllocationIndex, leftIndex, indexes)
return leftIndex + 1
}
midIndex := len(indexes)/2
midIndex := len(indexes) / 2
leftPart := indexes[0:midIndex]
leftResult := FindAvailableIndex(leftPart, leftIndex)
if leftResult != -1 {
return leftResult
}
rightPart := indexes[midIndex:]
rightResult := FindAvailableIndex(rightPart, leftIndex + midIndex)
rightResult := FindAvailableIndex(rightPart, leftIndex+midIndex)
return rightResult
}
}
Expand Down Expand Up @@ -107,7 +119,6 @@ func (r ExcludeRange) Contains(index int) bool {
return index >= r.MinIndex && index <= r.MaxIndex
}


func getExcludeRanges(cidr string, excludes []string) []ExcludeRange {
exludeRanges := []ExcludeRange{}
startIPInIpValue := getIPValue(cidr)
Expand All @@ -122,15 +133,15 @@ func getExcludeRanges(cidr string, excludes []string) []ExcludeRange {
excludeBlock := int64(32)
if len(excludeIPSplits) >= 2 {
excludeBlock, _ = strconv.ParseInt(excludeIPSplits[1], 10, 64)
}
availableBlock := 32-excludeBlock
}
availableBlock := 32 - excludeBlock
maxIndex := int(math.Pow(2, float64(availableBlock)) - 1)
r := ExcludeRange {
r := ExcludeRange{
MinIndex: excludeStartIndex,
MaxIndex: excludeStartIndex + maxIndex,
}
exludeRanges = append(exludeRanges, r)

}
}
return exludeRanges
Expand Down Expand Up @@ -158,14 +169,23 @@ func AllocateIP(req IPRequest) []IPResponse {
hostName := req.HostName
interfaceNames := req.InterfaceNames

FlushExpiredHistory()
offset := 1
if record, ok := deallocateHistory[podName]; ok {
// anomaly
record.LastOffset += 1
offset = record.LastOffset
log.Printf("Found anomaly allocating %s: %d\n", podName, offset)
}

var responses []IPResponse
startAllocate := time.Now()
allocatorLock.Lock()
ippoolSpecMap, err := IppoolHandler.ListIPPool()
if err != nil {
return responses
}

for ippoolName, _ := range ippoolSpecMap {
if len(interfaceNames) == 0 {
// no more interfaces to allocate
Expand Down Expand Up @@ -196,16 +216,16 @@ func AllocateIP(req IPRequest) []IPResponse {
excludes := spec.Excludes

exludeRanges := getExcludeRanges(podCIDR, excludes)
availableBlock := 32-cirdBlock
availableBlock := 32 - cirdBlock
maxIndex := math.Pow(2, float64(availableBlock)) - 2 // except broadcast address
indexes := GenerateAllocateIndexes(allocations, int(maxIndex), exludeRanges)
log.Printf("exclude %v, indexes %v\n", exludeRanges, indexes)
var nextIndex int
if len(indexes) > 0 {
lastIndex := indexes[len(indexes)-1]
nextIndex = lastIndex + 1
nextIndex = lastIndex + offset
} else {
nextIndex = 1 // except network address
nextIndex = offset // except network address
}

nextAddress := ""
Expand All @@ -218,13 +238,13 @@ func AllocateIP(req IPRequest) []IPResponse {
}
}
if nextAddress != "" {
newAllocation := backend.Allocation {
Pod: podName,
newAllocation := backend.Allocation{
Pod: podName,
Namespace: podNamespace,
Index: nextIndex,
Address: nextAddress,
Index: nextIndex,
Address: nextAddress,
}
log.Println(newAllocation)
log.Println(newAllocation)
toInsertIndex := -1
for allocationIndex, allocation := range allocations {
if allocation.Index > newAllocation.Index {
Expand All @@ -237,7 +257,7 @@ func AllocateIP(req IPRequest) []IPResponse {
appendedAllocation := append(allocations[0:toInsertIndex], newAllocation)
allocations = append(appendedAllocation, allocations[toInsertIndex:]...)
}

_, err = IppoolHandler.PatchIPPool(ippoolName, allocations)
if err == nil {
response := IPResponse{
Expand Down Expand Up @@ -297,6 +317,15 @@ func DeallocateIP(req IPRequest) []IPResponse {
defName := req.NetAttachDefName
hostName := req.HostName

// set first record
if _, ok := deallocateHistory[podName]; !ok {
log.Printf("Add %s to deallocateHistory\n", podName)
deallocateHistory[podName] = &allocateRecord{
Time: time.Now(),
LastOffset: 1,
}
}

var responses []IPResponse
startDeallocate := time.Now()
allocatorLock.Lock()
Expand Down Expand Up @@ -327,3 +356,12 @@ func DeallocateIP(req IPRequest) []IPResponse {
log.Println(fmt.Sprintf("Deallocate elapsed: %d us", int64(elapsed/time.Microsecond)))
return responses
}

func FlushExpiredHistory() {
for podName, record := range deallocateHistory {
if record.Expired() {
log.Printf("Flush expired deallocateHistory: %s\n", podName)
delete(deallocateHistory, podName)
}
}
}
80 changes: 46 additions & 34 deletions daemon/src/allocator/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@
* SPDX-License-Identifier: Apache2.0
*/

package allocator
package allocator

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"testing"
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"testing"

"github.com/foundation-model-stack/multi-nic-cni/daemon/backend"
)
"time"
"github.com/foundation-model-stack/multi-nic-cni/daemon/backend"
)

func TestAllocator(t *testing.T) {
func TestAllocator(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Allocator Test Suite")
}
Expand All @@ -26,57 +27,68 @@ func genAllocation(indexes []int) []backend.Allocation {
return allocations
}

var _ = Describe("Test Allocator", func() {
initIndexes := []int{1,2,3,8,13,18}
var _ = Describe("Test Allocator", func() {
initIndexes := []int{1, 2, 3, 8, 13, 18}
allocations := genAllocation(initIndexes)

It("find simple next available index" , func() {
indexes := []int{1,2,3,8,13,18}
nextIndex := FindAvailableIndex(indexes,0)
It("find simple next available index", func() {
indexes := []int{1, 2, 3, 8, 13, 18}
nextIndex := FindAvailableIndex(indexes, 0)
Expect(nextIndex).To(Equal(4))
})

It("find next available index with exclude range over consecutive order" , func() {
excludes := []ExcludeRange {
ExcludeRange {
It("find next available index with exclude range over consecutive order", func() {
excludes := []ExcludeRange{
ExcludeRange{
MinIndex: 4,
MaxIndex: 6,
},
}
indexes := GenerateAllocateIndexes(allocations,20,excludes)
Expect(indexes).To(Equal([]int{1,2,3,4,5,6,8,13,18}))
nextIndex := FindAvailableIndex(indexes,0)
indexes := GenerateAllocateIndexes(allocations, 20, excludes)
Expect(indexes).To(Equal([]int{1, 2, 3, 4, 5, 6, 8, 13, 18}))
nextIndex := FindAvailableIndex(indexes, 0)
Expect(nextIndex).To(Equal(7))
})
It("find next available index with exclude range over non-consecutive order" , func() {
excludes := []ExcludeRange {
ExcludeRange {
It("find next available index with exclude range over non-consecutive order", func() {
excludes := []ExcludeRange{
ExcludeRange{
MinIndex: 4,
MaxIndex: 7,
},
}
indexes := GenerateAllocateIndexes(allocations,20,excludes)
Expect(indexes).To(Equal([]int{1,2,3,4,5,6,7,8,13,18}))
nextIndex := FindAvailableIndex(indexes,0)

indexes := GenerateAllocateIndexes(allocations, 20, excludes)
Expect(indexes).To(Equal([]int{1, 2, 3, 4, 5, 6, 7, 8, 13, 18}))
nextIndex := FindAvailableIndex(indexes, 0)
Expect(nextIndex).To(Equal(9))
})

It("find next available index with exclude range over non-consecutive and then consecutive order" , func() {
excludes := []ExcludeRange {
ExcludeRange {
It("find next available index with exclude range over non-consecutive and then consecutive order", func() {
excludes := []ExcludeRange{
ExcludeRange{
MinIndex: 4,
MaxIndex: 7,
},
ExcludeRange {
ExcludeRange{
MinIndex: 9,
MaxIndex: 12,
},
}
indexes := GenerateAllocateIndexes(allocations,20,excludes)
Expect(indexes).To(Equal([]int{1,2,3,4,5,6,7,8,9,10,11,12,13,18}))
nextIndex := FindAvailableIndex(indexes,0)

indexes := GenerateAllocateIndexes(allocations, 20, excludes)
Expect(indexes).To(Equal([]int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 18}))
nextIndex := FindAvailableIndex(indexes, 0)
Expect(nextIndex).To(Equal(14))
})

It("force expired", func() {
podName := "A"
deallocateHistory[podName] = &allocateRecord{
Time: time.Now(),
LastOffset: 1,
}
Expect(deallocateHistory[podName].Expired()).To(Equal(false))
deallocateHistory[podName].Time = deallocateHistory[podName].Time.Add(time.Duration(-HISTORY_TIMEOUT-1) * time.Second)
Expect(deallocateHistory[podName].Expired()).To(Equal(true))
})
})

0 comments on commit 4bbac4b

Please sign in to comment.