Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Kafka topic creation bug due to recent upgrade of AdmissionReview version #796

Merged
merged 5 commits into from
Apr 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 14 additions & 8 deletions pkg/webhook/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"net/http"
"reflect"

admissionv1beta1 "k8s.io/api/admission/v1beta1"
admissionv1 "k8s.io/api/admission/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/banzaicloud/koperator/pkg/util"
Expand All @@ -34,7 +34,7 @@ var (
kafkaTopic = reflect.TypeOf(v1alpha1.KafkaTopic{}).Name()
)

func (s *webhookServer) validate(ar *admissionv1beta1.AdmissionReview) *admissionv1beta1.AdmissionResponse {
func (s *webhookServer) validate(ar *admissionv1.AdmissionReview) *admissionv1.AdmissionResponse {
req := ar.Request

l := log.WithValues("kind", req.Kind, "namespace", req.Namespace, "name", req.Name, "uid", req.UID,
Expand All @@ -50,7 +50,7 @@ func (s *webhookServer) validate(ar *admissionv1beta1.AdmissionReview) *admissio
}
if ok := util.ObjectManagedByClusterRegistry(topic.GetObjectMeta()); ok {
l.Info("Skip validation as the resource is managed by Cluster Registry")
return &admissionv1beta1.AdmissionResponse{
return &admissionv1.AdmissionResponse{
Allowed: true,
}
}
Expand Down Expand Up @@ -85,16 +85,22 @@ func (s *webhookServer) serve(w http.ResponseWriter, r *http.Request) {
return
}

var admissionResponse *admissionv1beta1.AdmissionResponse
ar := admissionv1beta1.AdmissionReview{}
var admissionResponse *admissionv1.AdmissionResponse
ar := admissionv1.AdmissionReview{}
if _, _, err := s.deserializer.Decode(body, nil, &ar); err != nil {
log.Error(err, "Can't decode body")
admissionResponse = notAllowed(err.Error(), metav1.StatusReasonBadRequest)
} else {
admissionResponse = s.validate(&ar)
}

admissionReview := admissionv1beta1.AdmissionReview{}
admissionReview := admissionv1.AdmissionReview{
// APIVersion and Kind must be set for admission/v1, or the request would fail
TypeMeta: metav1.TypeMeta{
APIVersion: admissionv1.SchemeGroupVersion.String(),
Kind: "AdmissionReview",
},
}
if admissionResponse != nil {
admissionReview.Response = admissionResponse
if ar.Request != nil {
Expand All @@ -113,8 +119,8 @@ func (s *webhookServer) serve(w http.ResponseWriter, r *http.Request) {
}
}

func notAllowed(msg string, reason metav1.StatusReason) *admissionv1beta1.AdmissionResponse {
return &admissionv1beta1.AdmissionResponse{
func notAllowed(msg string, reason metav1.StatusReason) *admissionv1.AdmissionResponse {
return &admissionv1.AdmissionResponse{
Result: &metav1.Status{
Message: msg,
Reason: reason,
Expand Down
12 changes: 6 additions & 6 deletions pkg/webhook/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"strings"
"testing"

admissionv1beta1 "k8s.io/api/admission/v1beta1"
admissionv1 "k8s.io/api/admission/v1"
authv1 "k8s.io/api/authentication/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

Expand All @@ -41,9 +41,9 @@ func newRawTopic() []byte {
return out
}

func newAdmissionReview() *admissionv1beta1.AdmissionReview {
return &admissionv1beta1.AdmissionReview{
Request: &admissionv1beta1.AdmissionRequest{
func newAdmissionReview() *admissionv1.AdmissionReview {
return &admissionv1.AdmissionReview{
Request: &admissionv1.AdmissionRequest{
Kind: metav1.GroupVersionKind{
Kind: "non-topic-kind",
},
Expand Down Expand Up @@ -170,7 +170,7 @@ func TestServe(t *testing.T) {
if err != nil {
t.Error("Expected admission review response, got error")
}
admissionReview := admissionv1beta1.AdmissionReview{}
admissionReview := admissionv1.AdmissionReview{}
if err := json.Unmarshal(body, &admissionReview); err != nil {
t.Error("Expected no error got:", err)
}
Expand All @@ -196,7 +196,7 @@ func TestServe(t *testing.T) {
if err != nil {
t.Error("Expected admission review response, got error")
}
admissionReview := admissionv1beta1.AdmissionReview{}
admissionReview := admissionv1.AdmissionReview{}
if err := json.Unmarshal(body, &admissionReview); err != nil {
t.Error("Expected no error got:", err)
}
Expand Down
25 changes: 9 additions & 16 deletions pkg/webhook/topic_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

"github.com/banzaicloud/koperator/pkg/util"

admissionv1beta1 "k8s.io/api/admission/v1beta1"
admissionv1 "k8s.io/api/admission/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -36,7 +36,7 @@ const (
invalidReplicationFactorErrMsg = "Replication factor is larger than the number of nodes in the kafka cluster"
)

func (s *webhookServer) validateKafkaTopic(topic *banzaicloudv1alpha1.KafkaTopic) *admissionv1beta1.AdmissionResponse {
func (s *webhookServer) validateKafkaTopic(topic *banzaicloudv1alpha1.KafkaTopic) *admissionv1.AdmissionResponse {
ctx := context.Background()
log.Info(fmt.Sprintf("Doing pre-admission validation of kafka topic %s", topic.Spec.Name))

Expand All @@ -54,25 +54,21 @@ func (s *webhookServer) validateKafkaTopic(topic *banzaicloudv1alpha1.KafkaTopic
if apierrors.IsNotFound(err) {
if k8sutil.IsMarkedForDeletion(topic.ObjectMeta) {
log.Info("Deleted as a result of a cluster deletion")
return &admissionv1beta1.AdmissionResponse{
return &admissionv1.AdmissionResponse{
Allowed: true,
}
}
log.Error(err, "Referenced kafka cluster does not exist")
return notAllowed(
fmt.Sprintf("KafkaCluster '%s' in the namespace '%s' does not exist", topic.Spec.ClusterRef.Name, topic.Spec.ClusterRef.Namespace),
metav1.StatusReasonNotFound,
)
return notAllowed(fmt.Sprintf("KafkaCluster '%s' in the namespace '%s' does not exist", topic.Spec.ClusterRef.Name, topic.Spec.ClusterRef.Namespace), metav1.StatusReasonNotFound)
}
log.Error(err, "API failure while running topic validation")
return notAllowed("API failure while validating topic, please try again", metav1.StatusReasonServiceUnavailable)
}

if k8sutil.IsMarkedForDeletion(cluster.ObjectMeta) {
// Let this through, it's a delete topic request from a parent cluster being
// deleted
log.Info("Cluster is going down for deletion, assuming a delete topic request")
return &admissionv1beta1.AdmissionResponse{
return &admissionv1.AdmissionResponse{
Allowed: true,
}
}
Expand All @@ -96,15 +92,15 @@ func (s *webhookServer) validateKafkaTopic(topic *banzaicloudv1alpha1.KafkaTopic
}

// everything looks a-okay
return &admissionv1beta1.AdmissionResponse{
return &admissionv1.AdmissionResponse{
Allowed: true,
}
}

// checkKafka creates a Kafka admin client and connects to the Kafka brokers to check
// whether the referred topic exists, and what are its properties
func (s *webhookServer) checkKafka(ctx context.Context, topic *banzaicloudv1alpha1.KafkaTopic,
cluster *banzaicloudv1beta1.KafkaCluster) *admissionv1beta1.AdmissionResponse {
cluster *banzaicloudv1beta1.KafkaCluster) *admissionv1.AdmissionResponse {
// retrieve an admin client for the cluster
broker, closeClient, err := s.newKafkaFromCluster(s.client, cluster)
if err != nil {
Expand All @@ -128,10 +124,7 @@ func (s *webhookServer) checkKafka(ctx context.Context, topic *banzaicloudv1alph
if apierrors.IsNotFound(err) {
// User is trying to overwrite an existing topic - bad user
log.Info("User attempted to create topic with name that already exists in the kafka cluster")
return notAllowed(
fmt.Sprintf("Topic '%s' already exists on kafka cluster '%s'", topic.Spec.Name, topic.Spec.ClusterRef.Name),
metav1.StatusReasonAlreadyExists,
)
return notAllowed(fmt.Sprintf("Topic '%s' already exists on kafka cluster '%s'", topic.Spec.Name, topic.Spec.ClusterRef.Name), metav1.StatusReasonAlreadyExists)
}
log.Error(err, "API failure while running topic validation")
return notAllowed("API failure while validating topic, please try again", metav1.StatusReasonServiceUnavailable)
Expand Down Expand Up @@ -160,7 +153,7 @@ func (s *webhookServer) checkKafka(ctx context.Context, topic *banzaicloudv1alph
// checkExistingKafkaTopicCRs checks whether there's any other duplicate KafkaTopic CR exists
// that refers to the same KafkaCluster's same topic
func (s *webhookServer) checkExistingKafkaTopicCRs(ctx context.Context,
clusterNamespace string, topic *banzaicloudv1alpha1.KafkaTopic) *admissionv1beta1.AdmissionResponse {
clusterNamespace string, topic *banzaicloudv1alpha1.KafkaTopic) *admissionv1.AdmissionResponse {
// check KafkaTopic in the referred KafkaCluster's namespace
kafkaTopicList := banzaicloudv1alpha1.KafkaTopicList{}
err := s.client.List(ctx, &kafkaTopicList, client.MatchingFields{"spec.name": topic.Spec.Name})
Expand Down