Skip to content

Commit

Permalink
update formatting
Browse files Browse the repository at this point in the history
Signed-off-by: vsoch <vsoch@users.noreply.github.com>
  • Loading branch information
vsoch committed Feb 8, 2023
1 parent 4121936 commit ab07a92
Show file tree
Hide file tree
Showing 11 changed files with 127 additions and 880 deletions.
2 changes: 1 addition & 1 deletion config/crd/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ resources:
- bases/flux-framework.org_miniclusters.yaml
#+kubebuilder:scaffold:crdkustomizeresource

patchesStrategicMerge:
patches:
# [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix.
# patches here are for enabling the conversion webhook for each CRD
#- patches/webhook_in_fluxes.yaml
Expand Down
2 changes: 1 addition & 1 deletion config/default/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ bases:
# [PROMETHEUS] To enable prometheus monitor, uncomment all sections with 'PROMETHEUS'.
#- ../prometheus

patchesStrategicMerge:
patches:
# Protect the /metrics endpoint by putting it behind auth.
# If you want your controller-manager to expose the /metrics
# endpoint w/o any authn/z, please comment the following line.
Expand Down
2 changes: 2 additions & 0 deletions controllers/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ var (

// SetupControllers sets up all controllers.
func SetupControllers(mgr ctrl.Manager, restClient rest.Interface) (string, error) {

jobReconciler := controllers.NewMiniClusterReconciler(
mgr.GetClient(),
mgr.GetScheme(),
*(mgr.GetConfig()),
restClient,
// other watching reconcilers could be added here!
)

if err := jobReconciler.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "MiniCluster")
return "MiniCluster", err
Expand Down
46 changes: 39 additions & 7 deletions controllers/flux/certs.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,23 @@ func generateCertGeneratorEntrypoint(cluster *api.MiniCluster, container api.Min
}

// getCurveGenerateConfigMap generate the config map entrypoint for the pod to generate the curve cert
func (r *MiniClusterReconciler) getCurveGenerateConfigMap(ctx context.Context, cluster *api.MiniCluster, container api.MiniClusterContainer) (*corev1.ConfigMap, ctrl.Result, error) {
func (r *MiniClusterReconciler) getCurveGenerateConfigMap(
ctx context.Context,
cluster *api.MiniCluster,
container api.MiniClusterContainer,
) (*corev1.ConfigMap, ctrl.Result, error) {

existing := &corev1.ConfigMap{}
configFullName := cluster.Name + certGenSuffix
err := r.Client.Get(ctx, types.NamespacedName{Name: configFullName, Namespace: cluster.Namespace}, existing)
err := r.Client.Get(
ctx,
types.NamespacedName{
Name: configFullName,
Namespace: cluster.Namespace,
},
existing,
)

if err != nil {

// Case 1: not found yet, and hostfile is ready (recreate)
Expand All @@ -117,10 +129,19 @@ func (r *MiniClusterReconciler) getCurveGenerateConfigMap(ctx context.Context, c
}
data[certGeneratorName] = genScript
dep := r.createConfigMap(cluster, configFullName, data)
r.log.Info("✨ Creating Curve Certificate Pod Generator Entrypoint ✨", "Namespace", dep.Namespace, "Name", dep.Name)

r.log.Info(
"✨ Creating Curve Certificate Pod Generator Entrypoint ✨",
"Namespace", dep.Namespace,
"Name", dep.Name,
)
err = r.Client.Create(ctx, dep)
if err != nil {
r.log.Error(err, "❌ Failed to create Curve Certificate Pod Generator Entrypoint", "Namespace", dep.Namespace, "Name", (*dep).Name)
r.log.Error(
err, "❌ Failed to create Curve Certificate Pod Generator Entrypoint",
"Namespace", dep.Namespace,
"Name", (*dep).Name,
)
return existing, ctrl.Result{}, err
}
// Successful - return and requeue
Expand All @@ -131,13 +152,21 @@ func (r *MiniClusterReconciler) getCurveGenerateConfigMap(ctx context.Context, c
return existing, ctrl.Result{}, err
}
} else {
r.log.Info("🎉 Found existing Curve Certificate Pod Generator Entrypoint", "Namespace", existing.Namespace, "Name", existing.Name)
r.log.Info(
"🎉 Found existing Curve Certificate Pod Generator Entrypoint",
"Namespace", existing.Namespace,
"Name", existing.Name,
)
}
return existing, ctrl.Result{}, err
}

// createPersistentVolume creates a volume in /tmp, which doesn't seem to choke
func (r *MiniClusterReconciler) newPodCommandRunner(cluster *api.MiniCluster, container api.MiniClusterContainer, command []string) *corev1.Pod {
func (r *MiniClusterReconciler) newPodCommandRunner(
cluster *api.MiniCluster,
container api.MiniClusterContainer,
command []string,
) *corev1.Pod {

makeExecutable := int32(0777)
pullPolicy := corev1.PullIfNotPresent
Expand All @@ -154,7 +183,10 @@ func (r *MiniClusterReconciler) newPodCommandRunner(cluster *api.MiniCluster, co
}

pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: cluster.Name + certGenSuffix, Namespace: cluster.Namespace},
ObjectMeta: metav1.ObjectMeta{
Name: cluster.Name + certGenSuffix,
Namespace: cluster.Namespace,
},
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyOnFailure,
ImagePullSecrets: getImagePullSecrets(cluster),
Expand Down
30 changes: 25 additions & 5 deletions controllers/flux/extra.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@ var (
)

// podExec executes a command to a named pod
func (r *MiniClusterReconciler) podExec(ctx context.Context, pod corev1.Pod, cluster *api.MiniCluster, command []string) (string, error) {
func (r *MiniClusterReconciler) podExec(
ctx context.Context,
pod corev1.Pod,
cluster *api.MiniCluster,
command []string,
) (string, error) {

// creates the clientset
clientset, err := kubernetes.NewForConfig(r.RESTConfig)
Expand Down Expand Up @@ -96,7 +101,10 @@ func (r *MiniClusterReconciler) podExec(ctx context.Context, pod corev1.Pod, clu
}

// getMiniClusterPods returns a sorted (by name) podlist in the MiniCluster
func (r *MiniClusterReconciler) getMiniClusterPods(ctx context.Context, cluster *api.MiniCluster) *corev1.PodList {
func (r *MiniClusterReconciler) getMiniClusterPods(
ctx context.Context,
cluster *api.MiniCluster,
) *corev1.PodList {

podList := &corev1.PodList{}
opts := []client.ListOption{
Expand All @@ -116,7 +124,10 @@ func (r *MiniClusterReconciler) getMiniClusterPods(ctx context.Context, cluster
}

// brokerIsReady determines if broker 0 is waiting for worker nodes
func (r *MiniClusterReconciler) brokerIsReady(ctx context.Context, cluster *api.MiniCluster) (bool, error) {
func (r *MiniClusterReconciler) brokerIsReady(
ctx context.Context,
cluster *api.MiniCluster,
) (bool, error) {

// Cut out quickly if we've already done this
if brokerIsReady {
Expand All @@ -140,6 +151,7 @@ func (r *MiniClusterReconciler) brokerIsReady(ctx context.Context, cluster *api.
pods := r.getMiniClusterPods(ctx, cluster)
for _, pod := range pods.Items {
r.log.Info("🦀 Found Pod", "Pod Name", pod.Name)

if strings.HasPrefix(pod.Name, brokerPrefix) {
r.log.Info("🦀 Found Broker", "Pod Name", pod.Name)
out, err := r.podExec(ctx, pod, cluster, command)
Expand Down Expand Up @@ -171,10 +183,14 @@ func (r *MiniClusterReconciler) brokerIsReady(ctx context.Context, cluster *api.
}

// getMiniClusterIPS was used when we needed to write /etc/hosts and is no longer used
func (r *MiniClusterReconciler) getMiniClusterIPS(ctx context.Context, cluster *api.MiniCluster) map[string]string {
func (r *MiniClusterReconciler) getMiniClusterIPS(
ctx context.Context,
cluster *api.MiniCluster,
) map[string]string {

ips := map[string]string{}
for _, pod := range r.getMiniClusterPods(ctx, cluster).Items {

// Skip init pods
if strings.Contains(pod.Name, "init") {
continue
Expand All @@ -190,7 +206,11 @@ func (r *MiniClusterReconciler) getMiniClusterIPS(ctx context.Context, cluster *
}

// createMiniClusterIngress exposes the service for the minicluster
func (r *MiniClusterReconciler) createMiniClusterIngress(ctx context.Context, cluster *api.MiniCluster, service *corev1.Service) error {
func (r *MiniClusterReconciler) createMiniClusterIngress(
ctx context.Context,
cluster *api.MiniCluster,
service *corev1.Service,
) error {

pathType := networkv1.PathTypePrefix
ingressBackend := networkv1.IngressBackend{
Expand Down
17 changes: 13 additions & 4 deletions controllers/flux/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ import (
)

// newMiniCluster is used to create the MiniCluster Job
func (r *MiniClusterReconciler) newMiniClusterJob(cluster *api.MiniCluster) (*batchv1.Job, error) {
func (r *MiniClusterReconciler) newMiniClusterJob(
cluster *api.MiniCluster,
) (*batchv1.Job, error) {

// Number of retries before marking as failed
backoffLimit := int32(100)
Expand Down Expand Up @@ -84,7 +86,9 @@ func (r *MiniClusterReconciler) newMiniClusterJob(cluster *api.MiniCluster) (*ba
return job, err
}

func (r *MiniClusterReconciler) getMiniClusterContainers(cluster *api.MiniCluster) ([]corev1.Container, error) {
func (r *MiniClusterReconciler) getMiniClusterContainers(
cluster *api.MiniCluster,
) ([]corev1.Container, error) {

// Create the containers for the pod
containers := []corev1.Container{}
Expand Down Expand Up @@ -114,7 +118,10 @@ func (r *MiniClusterReconciler) getMiniClusterContainers(cluster *api.MiniCluste
// Do we have a postStartExec Lifecycle command?
lifecycle := corev1.Lifecycle{}
if container.LifeCyclePostStartExec != "" {
r.log.Info("🌀 MiniCluster", "LifeCycle.PostStartExec", container.LifeCyclePostStartExec)
r.log.Info(
"🌀 MiniCluster",
"LifeCycle.PostStartExec", container.LifeCyclePostStartExec,
)
lifecycle.PostStart = &corev1.LifecycleHandler{
Exec: &corev1.ExecAction{
Command: []string{container.LifeCyclePostStartExec},
Expand Down Expand Up @@ -203,7 +210,9 @@ func getImagePullSecrets(cluster *api.MiniCluster) []corev1.LocalObjectReference
pullSecrets := []corev1.LocalObjectReference{}
for _, container := range cluster.Spec.Containers {
if container.ImagePullSecret != "" {
newSecret := corev1.LocalObjectReference{Name: container.ImagePullSecret}
newSecret := corev1.LocalObjectReference{
Name: container.ImagePullSecret,
}
pullSecrets = append(pullSecrets, newSecret)
}
}
Expand Down
15 changes: 13 additions & 2 deletions controllers/flux/minicluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ var (
// 2. Config maps for secrets and other things.
// 3. We "launch" a job by starting the Indexed job on the connected nodes
// newMiniCluster creates a new mini cluster, a stateful set for running flux!
func (r *MiniClusterReconciler) ensureMiniCluster(ctx context.Context, cluster *api.MiniCluster) (ctrl.Result, error) {
func (r *MiniClusterReconciler) ensureMiniCluster(
ctx context.Context,
cluster *api.MiniCluster,
) (ctrl.Result, error) {

// Ensure the configs are created (for volume sources)
_, result, err := r.getConfigMap(ctx, cluster, "flux-config", cluster.Name+fluxConfigSuffix)
Expand Down Expand Up @@ -340,7 +343,13 @@ func getFluxToken(requested string) string {
}

// createConfigMap generates a config map with some kind of data
func (r *MiniClusterReconciler) createConfigMap(cluster *api.MiniCluster, configName string, data map[string]string) *corev1.ConfigMap {
func (r *MiniClusterReconciler) createConfigMap(
cluster *api.MiniCluster,
configName string,
data map[string]string,
) *corev1.ConfigMap {

// Create the config map with respective data!
cm := &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -349,6 +358,8 @@ func (r *MiniClusterReconciler) createConfigMap(cluster *api.MiniCluster, config
},
Data: data,
}

// Show in the logs
fmt.Println(cm.Data)
ctrl.SetControllerReference(cluster, cm, r.Scheme)
return cm
Expand Down
17 changes: 15 additions & 2 deletions controllers/flux/minicluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,14 @@ type MiniClusterReconciler struct {
RESTConfig *rest.Config
}

func NewMiniClusterReconciler(client client.Client, scheme *runtime.Scheme, restConfig rest.Config, restClient rest.Interface, watchers ...MiniClusterUpdateWatcher) *MiniClusterReconciler {
func NewMiniClusterReconciler(
client client.Client,
scheme *runtime.Scheme,
restConfig rest.Config,
restClient rest.Interface,
watchers ...MiniClusterUpdateWatcher,
) *MiniClusterReconciler {

return &MiniClusterReconciler{
log: ctrl.Log.WithName("minicluster-reconciler"),
Client: client,
Expand Down Expand Up @@ -93,7 +100,10 @@ func NewMiniClusterReconciler(client client.Client, scheme *runtime.Scheme, rest
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.12.1/pkg/reconcile
func (r *MiniClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
func (r *MiniClusterReconciler) Reconcile(
ctx context.Context,
req ctrl.Request,
) (ctrl.Result, error) {

// Create a new MiniCluster
var cluster api.MiniCluster
Expand Down Expand Up @@ -138,6 +148,9 @@ func (r *MiniClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// What else do we want to do?
r.log.Info("🌀 Mini Cluster is Ready!")
r.log.Info("🌀 Wait for all pods to be running and previously running to be terminated.")

// Check until the job finishes to clean up volumes if needed

return result, nil
}

Expand Down
13 changes: 10 additions & 3 deletions controllers/flux/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
)

// getResourceGroup can return a ResourceList for either requests or limits
func (r *MiniClusterReconciler) getResourceGroup(items api.ContainerResource) (corev1.ResourceList, error) {
func (r *MiniClusterReconciler) getResourceGroup(
items api.ContainerResource,
) (corev1.ResourceList, error) {

r.log.Info("🍅️ Resource", "items", items)
list := corev1.ResourceList{}
Expand Down Expand Up @@ -58,7 +60,10 @@ func (r *MiniClusterReconciler) getResourceGroup(items api.ContainerResource) (c
}

// getContainerResources determines if any resources are requested via the spec
func (r *MiniClusterReconciler) getContainerResources(cluster *api.MiniCluster, container *api.MiniClusterContainer) (corev1.ResourceRequirements, error) {
func (r *MiniClusterReconciler) getContainerResources(
cluster *api.MiniCluster,
container *api.MiniClusterContainer,
) (corev1.ResourceRequirements, error) {

// memory int, setCPURequest, setCPULimit, setGPULimit int64
resources := corev1.ResourceRequirements{}
Expand All @@ -82,7 +87,9 @@ func (r *MiniClusterReconciler) getContainerResources(cluster *api.MiniCluster,
}

// getPodResources determines if any resources are requested via the spec
func (r *MiniClusterReconciler) getPodResources(cluster *api.MiniCluster) (corev1.ResourceList, error) {
func (r *MiniClusterReconciler) getPodResources(
cluster *api.MiniCluster,
) (corev1.ResourceList, error) {

// memory int, setCPURequest, setCPULimit, setGPULimit int64
resources, err := r.getResourceGroup(cluster.Spec.Pod.Resources)
Expand Down
10 changes: 8 additions & 2 deletions controllers/flux/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ var (
)

// exposeService will expose services - one for the port 5000 forward, and the other for job networking (headless)
func (r *MiniClusterReconciler) exposeServices(ctx context.Context, cluster *api.MiniCluster) (ctrl.Result, error) {
func (r *MiniClusterReconciler) exposeServices(
ctx context.Context,
cluster *api.MiniCluster,
) (ctrl.Result, error) {

// This service is for the restful API
existing := &corev1.Service{}
Expand All @@ -44,7 +47,10 @@ func (r *MiniClusterReconciler) exposeServices(ctx context.Context, cluster *api
}

// createMiniClusterService creates the service for the minicluster
func (r *MiniClusterReconciler) createMiniClusterService(ctx context.Context, cluster *api.MiniCluster) (*corev1.Service, error) {
func (r *MiniClusterReconciler) createMiniClusterService(
ctx context.Context,
cluster *api.MiniCluster,
) (*corev1.Service, error) {

r.log.Info("Creating service with: ", restfulServiceName, cluster.Namespace)
service := &corev1.Service{
Expand Down

0 comments on commit ab07a92

Please sign in to comment.