Skip to content

Commit

Permalink
[Package] [WorkerK8S] Implement Gopher Worker (#103)
Browse files Browse the repository at this point in the history
* Feat [Package] [WorkerK8S] Constant

- [+] chore(constant.go): add new constants for HOME, kube, Config, errconfig, cannotcreatek8s
- [+] feat(constant.go): add new constants for FetchingPods, PodsFetched, ProcessingPods
- [+] feat(constant.go): add new constants for TaskLabelKey, TaskCheckHealth, TaskGetPod, TaskFetchPods, TaskProcessPod

* Feat [Package] [WorkerK8S] Module

- [+] chore(go.mod): add github.com/imdario/mergo v0.3.6 as an indirect dependency
- [+] chore(go.sum): add github.com/imdario/mergo v0.3.6 to go.sum as a direct dependency
- [+] chore(go.mod): add github.com/spf13/pflag v1.0.5 as an indirect dependency
- [+] chore(go.sum): add github.com/spf13/pflag v1.0.5 to go.sum as a direct dependency

* Feat [Package] [WorkerK8S] Internal Setup

- [+] fix(internal.go): fix error handling and add support for kubeconfig file outside the cluster
- [+] feat(internal.go): add support for using the current context in kubeconfig

* Feat [Package] [WorkerK8S] Zap Logger

- [+] feat(logger.go): add logger functionality to workerk8s package
- [+] fix(logger.go): fix logInfoWithEmoji and logErrorWithEmoji functions to include emoji and context in log message
- [+] feat(logger.go): add createLogFields function to create log fields with operation, namespace, and additional info

* [Package] [WorkerK8S] Update Docs & Implement Zap Logger

- [+] feat(workerk8s): add structured logging to Worker function
- [+] feat(workerk8s): add structured logging to RunWorkers function
- [+] fix(workerk8s): fix log messages in getPods function
- [+] fix(workerk8s): fix log messages in processPods function
- [+] chore(workerk8s): update TODO list in docs.go
- [+] chore(workerk8s): update enhancements section in docs.go
  • Loading branch information
H0llyW00dzZ committed Dec 21, 2023
1 parent 548b068 commit 7ba2e6e
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 15 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ require (
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.4.0 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/term v0.15.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.2 h1:Vie5ybvEvT75RniqhfF
github.com/googleapis/enterprise-certificate-proxy v0.3.2/go.mod h1:VLSiSSBs/ksPL8kq3OBOQ6WRI2QnaFynd1DCjZ62+V0=
github.com/googleapis/gax-go/v2 v2.12.0 h1:A+gCJKdRfqXkr+BIRGtZLibNXf0m1f9E4HG56etFpas=
github.com/googleapis/gax-go/v2 v2.12.0/go.mod h1:y+aIqrI5eb1YGMVJfuV3185Ts/D7qKpsEkdD5+I6QGU=
github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
Expand Down Expand Up @@ -143,6 +145,8 @@ github.com/pelletier/go-toml/v2 v2.1.1/go.mod h1:tJU2Z3ZkXwnxa4DPO899bsyIoywizdU
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
Expand Down
19 changes: 19 additions & 0 deletions workerk8s/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,23 @@ const (
NotHealthyStatus = "Not Healthy"
HealthyStatus = "Healthy"
WorkerCancelled = "Worker cancelled: %v"
HOME = "HOME"
kube = ".kube"
Config = "config"
errconfig = "cannot load kubeconfig: %w"
cannotcreatek8s = "cannot create kubernetes client: %w"
)

const (
FetchingPods = "Fetching pods"
PodsFetched = "Pods fetched"
ProcessingPods = "Processing pods"
)

const (
TaskLabelKey = "LabelKey"
TaskCheckHealth = "CheckHealth"
TaskGetPod = "GetPod"
TaskFetchPods = "FetchPods"
TaskProcessPod = "ProcessPod"
)
15 changes: 12 additions & 3 deletions workerk8s/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
// in-cluster configuration and returns a Kubernetes clientset.
// - Worker: A function that performs health checks on pods within a given namespace and
// sends the results to a channel for collection and further processing. It respects
// the context passed to it for cancellation or timeouts.
// the context passed to it for cancellation or timeouts, and it utilizes a structured
// logger for enhanced logging with contextual information such as the namespace and task.
// - RunWorkers: Starts a specified number of worker goroutines that call the Worker function
// with a given namespace. It returns a channel for results and a function to initiate
// a graceful shutdown of the workers.
// with a given namespace and a structured logger. It returns a channel for results and a
// function to initiate a graceful shutdown of the workers.
//
// # Usage
//
Expand All @@ -41,11 +42,19 @@
// fmt.Println(result)
// }
//
// # Enhancements
//
// - The Worker function now includes structured logging, which improves traceability and
// debugging by providing contextual information in the log entries.
// - The logging within the Worker function is now customizable, allowing different workers
// to log with their specific contextual information such as worker index and namespace.
//
// # TODO
//
// - Implement error handling and retry logic within the Worker function to handle transient errors.
// - Enhance the Worker function to perform a more specific task or to be more configurable.
// - Expand the package to support other Kubernetes resources and operations.
// - Introduce metrics collection for monitoring the health and performance of the workers.
//
// Copyright (c) 2023 by H0llyW00dzZ
package workerk8s
17 changes: 13 additions & 4 deletions workerk8s/internal.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
package workerk8s

import (
"fmt"
"os"
"path/filepath"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

// NewKubernetesClient creates a new Kubernetes client using the in-cluster configuration.
// This is typically used when the application itself is running within a Kubernetes cluster.
func NewKubernetesClient() (*kubernetes.Clientset, error) {
// Get the in-cluster config.
// Use the current context in kubeconfig
config, err := rest.InClusterConfig()
if err != nil {
return nil, err
// If running outside the cluster, use the kubeconfig file.
kubeconfig := filepath.Join(os.Getenv(HOME), kube, Config)
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
return nil, fmt.Errorf(errconfig, err)
}
}

// Create a clientset based on the in-cluster config.
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
return nil, fmt.Errorf(cannotcreatek8s, err)
}

return clientset, nil
Expand Down
7 changes: 5 additions & 2 deletions workerk8s/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"sync"

"go.uber.org/zap"
"k8s.io/client-go/kubernetes"
)

Expand All @@ -18,10 +19,12 @@ func RunWorkers(ctx context.Context, clientset *kubernetes.Clientset, namespace
// Start the specified number of worker goroutines.
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
go func(workerIndex int) {
defer wg.Done()
// We now use the package-level Logger, enhanced with additional fields.
SetLogger(Logger.With(zap.Int("workerIndex", workerIndex)))
Worker(shutdownCtx, clientset, namespace, results)
}()
}(i)
}

// Shutdown function to be called to initiate a graceful shutdown.
Expand Down
38 changes: 38 additions & 0 deletions workerk8s/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package workerk8s

import (
"fmt"

"go.uber.org/zap"
)

// Logger is a package-level variable to access the zap logger throughout the handlers package.
// It is intended to be used by other functions within the package for logging purposes.
var Logger *zap.Logger

// SetLogger sets the logger instance for the package.
func SetLogger(logger *zap.Logger) {
Logger = logger
}

// logInfoWithEmoji logs an informational message with given emoji, context, and fields.
func logInfoWithEmoji(emoji string, context string, fields ...zap.Field) {
Logger.Info(emoji+" "+context, fields...)
}

// logErrorWithEmoji logs an error message with given emoji, context, and fields.
func logErrorWithEmoji(emoji string, context string, fields ...zap.Field) {
Logger.Error(emoji+" "+context, fields...)
}

// createLogFields creates a slice of zap.Field with the operation and additional info.
func createLogFields(operation string, namespace string, infos ...string) []zap.Field {
fields := []zap.Field{
zap.String("operation", operation),
zap.String("namespace", namespace),
}
for i, info := range infos {
fields = append(fields, zap.String(fmt.Sprintf("info%d", i+1), info))
}
return fields
}
27 changes: 21 additions & 6 deletions workerk8s/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"fmt"

"github.com/H0llyW00dzZ/go-urlshortner/logmonitor/constant"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
Expand All @@ -12,26 +14,36 @@ import (
// Worker starts a worker process that retrieves all pods in a given namespace,
// performs health checks on them, and sends the results to a channel.
func Worker(ctx context.Context, clientset *kubernetes.Clientset, namespace string, results chan<- string) {
fields := createLogFields(TaskCheckHealth, namespace)
// Retrieve a list of pods from the namespace.
logInfoWithEmoji(constant.InfoEmoji, "Worker started", fields...)

pods, err := getPods(ctx, clientset, namespace)
if err != nil {
// If there's an error retrieving pods, send an error message on the results channel.
results <- fmt.Sprintf("Error retrieving pods: %v", err)
errMsg := fmt.Sprintf("Error retrieving pods: %v", err)
logErrorWithEmoji(constant.ErrorEmoji, errMsg)
results <- errMsg
return
}

// Process each pod to determine its health status and send the results on the channel.
processPods(ctx, pods, results)
logInfoWithEmoji(constant.ModernGopherEmoji, "Worker finished processing pods", fields...)
}

// getPods fetches the list of all pods within a specific namespace.
func getPods(ctx context.Context, clientset *kubernetes.Clientset, namespace string) ([]corev1.Pod, error) {
// List all pods in the namespace using the provided context.
fields := createLogFields(TaskFetchPods, namespace)
logInfoWithEmoji(constant.ModernGopherEmoji, FetchingPods, fields...)

podList, err := clientset.CoreV1().Pods(namespace).List(ctx, v1.ListOptions{})
if err != nil {
// Return an error if the pod list cannot be retrieved.
logErrorWithEmoji(constant.ModernGopherEmoji, "Failed to list pods", fields...)
return nil, err
}

logInfoWithEmoji(constant.ModernGopherEmoji, PodsFetched, append(fields, zap.Int("count", len(podList.Items)))...)
return podList.Items, nil
}

Expand All @@ -41,16 +53,19 @@ func processPods(ctx context.Context, pods []corev1.Pod, results chan<- string)
for _, pod := range pods {
select {
case <-ctx.Done():
// If the context is cancelled, send a cancellation message and exit the function.
results <- fmt.Sprintf(WorkerCancelled, ctx.Err())
cancelMsg := fmt.Sprintf("Worker cancelled: %v", ctx.Err())
logInfoWithEmoji(constant.ModernGopherEmoji, cancelMsg)
results <- cancelMsg
return
default:
// Determine the health status of the pod and send the result.
healthStatus := NotHealthyStatus
if isPodHealthy(&pod) {
healthStatus = HealthyStatus
}
results <- fmt.Sprintf(PodAndStatusAndHealth, pod.Name, pod.Status.Phase, healthStatus)
statusMsg := fmt.Sprintf(PodAndStatusAndHealth, pod.Name, pod.Status.Phase, healthStatus)
logInfoWithEmoji(constant.ModernGopherEmoji, PodsFetched, createLogFields(ProcessingPods, pod.Name, statusMsg)...)
results <- statusMsg
}
}
}
Expand Down

0 comments on commit 7ba2e6e

Please sign in to comment.