This fork extends the simulator to read metric data from a telemetry agent such as Dynatrace, and makes it possible to simulate pod migration by assuming an empirical, linear relationship between migration downtime and memory size, with a throughput of about 1.5 GB/s. It is part of a research project investigating the feasibility of speculative scheduling of long-running stateful jobs with unknown resource requirements in Kubernetes.
The aim is to reproduce real cluster load scenarios and to investigate different migration strategies of the developed migration controller, which can be found here.
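As a quick illustration of the assumed downtime model (the 1.5 GB/s constant is the empirical throughput mentioned above; the function itself is a hypothetical sketch, not code from this repository):

```go
package main

import (
	"fmt"
	"time"
)

// migrationDowntime estimates the downtime incurred by migrating a pod,
// assuming downtime grows linearly with the pod's memory footprint at an
// empirical migration throughput of ~1.5 GB/s.
func migrationDowntime(memoryBytes float64) time.Duration {
	const throughput = 1.5e9 // bytes per second (~1.5 GB/s, empirical)
	return time.Duration(memoryBytes / throughput * float64(time.Second))
}

func main() {
	// Migrating a pod with 3 GiB of resident memory stalls it for ~2.1 s.
	fmt.Println(migrationDowntime(3 << 30))
}
```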
In order to use the migration controller in the simulator, the corresponding code repository should be linked to the simulator. A Go workspace, here called `controller-simulator`, makes this easy (also see here):

```sh
# inside the workspace directory (here called controller-simulator)
go work init

# inside ./k8s-cluster-simulator
go mod init github.com/elchead/k8s-cluster-simulator
go mod vendor

# back in the workspace directory
go work use ./k8s-cluster-simulator

# inside ./k8s-migration-controller
go mod init github.com/elchead/k8s-migration-controller

# back in the workspace directory
go work use ./k8s-migration-controller
```
At the moment, Dynatrace only allows exporting all metric data through its developer API. The `/metrics/query` endpoint, given the following metric selector, provides the memory usage of up to 2000 worker pods:

```
builtin:containers.memory.residentSetBytes:filter(and(eq(Container,worker))):splitBy("dt.entity.container_group_instance",Container):avg:auto:sort(value(avg,descending)):limit(2000)
```

In order to map these entities to the pod names, another query to the `metrics/entity` endpoint with `type("CONTAINER_GROUP_INSTANCE")` is necessary.
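For reference, such a query could be issued as follows (a hedged sketch: the `/api/v2/metrics/query` path, `metricSelector` parameter, and `Api-Token` authorization follow general Dynatrace API v2 conventions; host, token, and timeframe are placeholders):

```sh
curl -G "https://$DT_HOST/api/v2/metrics/query" \
  -H "Authorization: Api-Token $DT_TOKEN" \
  --data-urlencode 'metricSelector=builtin:containers.memory.residentSetBytes:filter(and(eq(Container,worker))):splitBy("dt.entity.container_group_instance",Container):avg:auto:sort(value(avg,descending)):limit(2000)' \
  --data-urlencode 'from=now-2h' \
  -o metrics.json
```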
The script `./apython/json_creator.py` then merges the two JSON results into a single JSON file that can be processed by the simulator.
In order to start the simulator with this data in `./example/main.go`, the `--file` flag needs to be provided, and a simulation duration needs to be added to the global variable `simDurationMap`.
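The exact shape of `simDurationMap` is defined in `./example/main.go`; purely as a hypothetical illustration (key and value types are assumptions, not confirmed by this README):

```go
// Hypothetical sketch: map the metrics file passed via --file to the time
// span the simulation should cover. See ./example/main.go for the real type.
var simDurationMap = map[string]time.Duration{
	"metrics.json": 2 * time.Hour,
}
```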
Lastly, `config.yaml` should be adjusted to the start time of the metric data.
Then the simulator can be launched using:

```sh
go run github.com/elchead/k8s-cluster-simulator/example --config config --file $FILE
```

(`example` includes the migration controller.)
A Kubernetes cluster simulator for evaluating schedulers. See the example directory.
```go
// 1. Create a KubeSim with a pod queue and a scheduler.
queue := queue.NewPriorityQueue()
sched := buildScheduler() // see below
kubesim := kubesim.NewKubeSimFromConfigPathOrDie(configPath, queue, sched)

// 2. Register one or more pod submitters to KubeSim.
numOfSubmittingPods := 8
kubesim.AddSubmitter(newMySubmitter(numOfSubmittingPods))

// 3. Run the main loop of KubeSim.
// In each execution of the loop, KubeSim
// 1) stores pods submitted from the registered submitters to its queue,
// 2) invokes the scheduler with pending pods and cluster state,
// 3) emits cluster metrics to designated location(s) if enabled, and
// 4) progresses the simulated clock.
if err := kubesim.Run(ctx); err != nil && errors.Cause(err) != context.Canceled {
	log.L.Fatal(err)
}
```
```go
func buildScheduler() scheduler.Scheduler {
	// 1. Create a generic scheduler that mimics a kube-scheduler.
	sched := scheduler.NewGenericScheduler( /* preemption enabled */ true)

	// 2. Register extender(s).
	sched.AddExtender(
		scheduler.Extender{
			Name:             "MyExtender",
			Filter:           filterExtender,
			Prioritize:       prioritizeExtender,
			Weight:           1,
			NodeCacheCapable: true,
		},
	)

	// 3. Register plugin(s).
	// Predicate
	sched.AddPredicate("GeneralPredicates", predicates.GeneralPredicates)
	// Prioritizers
	sched.AddPrioritizer(priorities.PriorityConfig{
		Name:   "BalancedResourceAllocation",
		Map:    priorities.BalancedResourceAllocationMap,
		Reduce: nil,
		Weight: 1,
	})
	sched.AddPrioritizer(priorities.PriorityConfig{
		Name:   "LeastRequested",
		Map:    priorities.LeastRequestedPriorityMap,
		Reduce: nil,
		Weight: 1,
	})

	return &sched
}
```
See `pkg/submitter/submitter.go`.

```go
// Submitter defines the submitter interface.
type Submitter interface {
	// Submit submits pods to a simulated cluster.
	// The return value is a list of submitter events.
	// Submitters are called serially in the same order that they are registered to the simulated
	// cluster.
	// This method must never block.
	Submit(clock clock.Clock, nodeLister algorithm.NodeLister, metrics metrics.Metrics) ([]Event, error)
}

// Event defines the interface of a submitter event.
// Submit can return any type in a list that implements this interface.
type Event interface {
	IsSubmitterEvent() bool
}

// SubmitEvent represents an event of submitting a pod to a cluster.
type SubmitEvent struct {
	Pod *v1.Pod
}

// DeleteEvent represents an event of deleting a pod from a cluster.
type DeleteEvent struct {
	PodName      string
	PodNamespace string
}

// UpdateEvent represents an event of updating the manifest of a pending pod.
type UpdateEvent struct {
	PodName      string
	PodNamespace string
	NewPod       *v1.Pod
}

// TerminateSubmitterEvent represents an event of terminating the submission process.
type TerminateSubmitterEvent struct {
}
```
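For instance, a minimal submitter that submits a fixed number of pods and then terminates could look like the following sketch (the pod spec is illustrative, and a real submitter would also populate `Spec` with containers and resource requests; import paths assume this fork's module path):

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/scheduler/algorithm"

	"github.com/elchead/k8s-cluster-simulator/pkg/clock"
	"github.com/elchead/k8s-cluster-simulator/pkg/metrics"
	"github.com/elchead/k8s-cluster-simulator/pkg/submitter"
)

// mySubmitter submits one pod per simulated clock tick until targetNum pods
// have been submitted, then terminates the submission process.
type mySubmitter struct {
	targetNum    int
	submittedNum int
}

func newMySubmitter(targetNum int) *mySubmitter {
	return &mySubmitter{targetNum: targetNum}
}

func (s *mySubmitter) Submit(
	_ clock.Clock,
	_ algorithm.NodeLister,
	_ metrics.Metrics,
) ([]submitter.Event, error) {
	if s.submittedNum >= s.targetNum {
		// All pods submitted; stop this submitter.
		return []submitter.Event{&submitter.TerminateSubmitterEvent{}}, nil
	}

	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      fmt.Sprintf("pod-%d", s.submittedNum),
			Namespace: "default",
			Annotations: map[string]string{
				// simSpec defines the simulated resource usage (see below).
				"simSpec": `
- seconds: 60
  resourceUsage:
    cpu: 1
    memory: 2Gi
`,
			},
		},
	}
	s.submittedNum++

	return []submitter.Event{&submitter.SubmitEvent{Pod: pod}}, nil
}
```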
See `pkg/scheduler/generic_scheduler.go` and `pkg/scheduler/extender.go`.

k8s-cluster-simulator provides `GenericScheduler`, which follows the behavior of kube-scheduler's `genericScheduler`. `GenericScheduler` makes a scheduling decision for each given pod one by one, using predicates and prioritizers. The interfaces of predicates and prioritizers are similar to those of kube-scheduler.
```go
// NewGenericScheduler creates a new GenericScheduler.
func NewGenericScheduler(preemptionEnabled bool) GenericScheduler {
	return GenericScheduler{
		predicates:        map[string]predicates.FitPredicate{},
		preemptionEnabled: preemptionEnabled,
	}
}

// AddExtender adds an extender to this GenericScheduler.
func (sched *GenericScheduler) AddExtender(extender Extender) {
	sched.extenders = append(sched.extenders, extender)
}

// AddPredicate adds a predicate plugin to this GenericScheduler.
func (sched *GenericScheduler) AddPredicate(name string, predicate predicates.FitPredicate) {
	sched.predicates[name] = predicate
}

// AddPrioritizer adds a prioritizer plugin to this GenericScheduler.
func (sched *GenericScheduler) AddPrioritizer(prioritizer priorities.PriorityConfig) {
	sched.prioritizers = append(sched.prioritizers, prioritizer)
}

// Extender represents a scheduler extender.
type Extender struct {
	// Name identifies this Extender.
	Name string

	// Filter filters out the nodes that cannot run the given pod in api.ExtenderArgs.
	// This function can be nil.
	Filter func(api.ExtenderArgs) api.ExtenderFilterResult

	// Prioritize ranks each node that has passed the filtering stage.
	// The weighted scores are summed up and the total score is used for the node selection.
	Prioritize func(api.ExtenderArgs) api.HostPriorityList
	Weight     int

	// NodeCacheCapable specifies that this Extender is capable of caching node information, so the
	// scheduler should only send minimal information about the eligible nodes, assuming that the
	// extender already cached full details of all nodes in the cluster.
	// Specifically, ExtenderArgs.NodeNames is populated iff NodeCacheCapable == true, and
	// ExtenderArgs.Nodes.Items is populated iff NodeCacheCapable == false.
	NodeCacheCapable bool

	// Ignorable specifies whether the extender is ignorable (i.e. the scheduler process should not
	// fail when this extender returns an error).
	Ignorable bool
}
```
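For illustration, hedged sketches of the `filterExtender` and `prioritizeExtender` referenced in `buildScheduler` above (the logic is invented for the example; since `NodeCacheCapable` is true there, only `args.NodeNames` is populated):

```go
import "k8s.io/kubernetes/pkg/scheduler/api"

// filterExtender passes every candidate node through unchanged.
// With NodeCacheCapable == true, args.NodeNames carries the candidates.
func filterExtender(args api.ExtenderArgs) api.ExtenderFilterResult {
	return api.ExtenderFilterResult{
		NodeNames:   args.NodeNames,
		FailedNodes: api.FailedNodesMap{},
	}
}

// prioritizeExtender gives every candidate node the same score, effectively
// deferring the decision to the other prioritizers.
func prioritizeExtender(args api.ExtenderArgs) api.HostPriorityList {
	result := make(api.HostPriorityList, 0, len(*args.NodeNames))
	for _, name := range *args.NodeNames {
		result = append(result, api.HostPriority{Host: name, Score: 1})
	}
	return result
}
```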
See `pkg/scheduler/scheduler.go`.

k8s-cluster-simulator also supports the lowest-level scheduler interface, which makes scheduling decisions for (a subset of) pending pods and running pods, given the cluster state at a simulated clock.
```go
// Scheduler defines the lowest-level scheduler interface.
type Scheduler interface {
	// Schedule makes scheduling decisions for (a subset of) pending pods and running pods.
	// The return value is a list of scheduling events.
	// This method must never block.
	Schedule(
		clock clock.Clock,
		podQueue queue.PodQueue,
		nodeLister algorithm.NodeLister,
		nodeInfoMap map[string]*nodeinfo.NodeInfo) ([]Event, error)
}

// Event defines the interface of a scheduling event.
// Schedule can return any type in a list that implements this interface.
type Event interface {
	IsSchedulerEvent() bool
}

// BindEvent represents an event of deciding the binding of a pod to a node.
type BindEvent struct {
	Pod            *v1.Pod
	ScheduleResult core.ScheduleResult
}

// DeleteEvent represents an event of deleting a bound pod on a node.
type DeleteEvent struct {
	PodNamespace string
	PodName      string
	NodeName     string
}
```
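As an illustration, here is a hedged sketch of a trivial `Scheduler` that binds every pending pod to the first listed node (`naiveScheduler` is hypothetical, and the sketch assumes `queue.PodQueue` exposes a `Pop` method that returns an error once the queue is drained; check `pkg/queue` for the actual interface):

```go
import (
	"k8s.io/kubernetes/pkg/scheduler/algorithm"
	"k8s.io/kubernetes/pkg/scheduler/core"
	"k8s.io/kubernetes/pkg/scheduler/nodeinfo"

	"github.com/elchead/k8s-cluster-simulator/pkg/clock"
	"github.com/elchead/k8s-cluster-simulator/pkg/queue"
	"github.com/elchead/k8s-cluster-simulator/pkg/scheduler"
)

// naiveScheduler binds every pending pod to the first node in the cluster.
type naiveScheduler struct{}

func (s *naiveScheduler) Schedule(
	_ clock.Clock,
	podQueue queue.PodQueue,
	nodeLister algorithm.NodeLister,
	_ map[string]*nodeinfo.NodeInfo,
) ([]scheduler.Event, error) {
	nodes, err := nodeLister.List()
	if err != nil {
		return nil, err
	}
	if len(nodes) == 0 {
		return nil, nil // nothing to bind to
	}

	events := []scheduler.Event{}
	for {
		pod, err := podQueue.Pop()
		if err != nil {
			break // assumed: the queue is empty
		}
		events = append(events, &scheduler.BindEvent{
			Pod: pod,
			ScheduleResult: core.ScheduleResult{
				SuggestedHost:  nodes[0].Name,
				EvaluatedNodes: len(nodes),
				FeasibleNodes:  1,
			},
		})
	}
	return events, nil
}
```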
Embed a YAML in the `annotations` field of the pod manifest, e.g.:

```yaml
metadata:
  name: nginx-sim
  annotations:
    simSpec: |
      - seconds: 5          # an execution phase of this pod
        resourceUsage:      # resource usage (not request, nor limit)
          cpu: 1
          memory: 2Gi
          nvidia.com/gpu: 0
      - seconds: 10         # another phase that follows the previous one
        resourceUsage:
          cpu: 2
          memory: 4Gi
          nvidia.com/gpu: 1
```
These fields are populated or used by the simulator.

```go
v1.Pod{
	ObjectMeta: metav1.ObjectMeta{
		UID,               // populated when this pod is submitted to the simulator
		CreationTimestamp, // populated when this pod is submitted to the simulator
		DeletionTimestamp, // populated when a deletion event for this pod has been accepted by the simulator
	},
	Spec: v1.PodSpec{
		NodeName,                      // populated when the cluster binds this pod to a node
		TerminationGracePeriodSeconds, // read when this pod is deleted
		Priority,                      // read by PriorityQueue to sort pods,
		                               // and read when the scheduler tries to schedule this pod
	},
	Status: v1.PodStatus{
		Phase,             // populated by the simulator. Pending -> Running -> Succeeded xor Failed
		Conditions,        // populated by the simulator
		Reason,            // populated by the simulator
		Message,           // populated by the simulator
		StartTime,         // populated by the simulator when this pod has started its execution
		ContainerStatuses, // populated by the simulator
	},
}
```
These fields are populated and used by the simulator.

```go
v1.Node{
	TypeMeta: metav1.TypeMeta{
		Kind:       "Node",
		APIVersion: "v1",
	},
	ObjectMeta: // determined by the config
	Spec:       // determined by the config
	Status: v1.NodeStatus{
		Capacity:    // determined by the config
		Allocatable: // same as Capacity
		Conditions: []v1.NodeCondition{ // populated by the simulator
			{
				Type:               v1.NodeReady,
				Status:             v1.ConditionTrue,
				LastHeartbeatTime:  // clock,
				LastTransitionTime: // clock,
				Reason:             "KubeletReady",
				Message:            "kubelet is posting ready status",
			},
			{
				Type:               v1.NodeOutOfDisk,
				Status:             v1.ConditionFalse,
				LastHeartbeatTime:  // clock,
				LastTransitionTime: // clock,
				Reason:             "KubeletHasSufficientDisk",
				Message:            "kubelet has sufficient disk space available",
			},
			{
				Type:               v1.NodeMemoryPressure,
				Status:             v1.ConditionFalse,
				LastHeartbeatTime:  // clock,
				LastTransitionTime: // clock,
				Reason:             "KubeletHasSufficientMemory",
				Message:            "kubelet has sufficient memory available",
			},
			{
				Type:               v1.NodeDiskPressure,
				Status:             v1.ConditionFalse,
				LastHeartbeatTime:  // clock,
				LastTransitionTime: // clock,
				Reason:             "KubeletHasNoDiskPressure",
				Message:            "kubelet has no disk pressure",
			},
			{
				Type:               v1.NodePIDPressure,
				Status:             v1.ConditionFalse,
				LastHeartbeatTime:  // clock,
				LastTransitionTime: // clock,
				Reason:             "KubeletHasSufficientPID",
				Message:            "kubelet has sufficient PID available",
			},
		},
	},
}
```
The design and implementation of this project are strongly inspired by Kubernetes, which is licensed under Apache-2.0. Moreover, functions in the following files were obtained from the Kubernetes project and modified to be compatible with k8s-cluster-simulator. Please see each file for more detail.