Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
programmer04 committed May 12, 2023
1 parent 86a3c0f commit 0064eab
Show file tree
Hide file tree
Showing 19 changed files with 276 additions and 245 deletions.
1 change: 1 addition & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ kind: ClusterRole
metadata:
name: kong-ingress
rules:
# ????????
- apiGroups:
- ""
resources:
Expand Down
1 change: 1 addition & 0 deletions deploy/single/all-in-one-dbless-enterprise.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,7 @@ rules:
- create
- patch
---
# // ????
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
Expand Down
1 change: 1 addition & 0 deletions deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,7 @@ metadata:
rules:
- apiGroups:
- ""
# ???
resources:
- endpoints
verbs:
Expand Down
1 change: 1 addition & 0 deletions deploy/single/all-in-one-dbless-konnect-enterprise.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,7 @@ metadata:
rules:
- apiGroups:
- ""
# ????
resources:
- endpoints
verbs:
Expand Down
1 change: 1 addition & 0 deletions deploy/single/all-in-one-dbless-konnect.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,7 @@ metadata:
rules:
- apiGroups:
- ""
# ????
resources:
- endpoints
verbs:
Expand Down
1 change: 1 addition & 0 deletions deploy/single/all-in-one-dbless-legacy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,7 @@ metadata:
rules:
- apiGroups:
- ""
# ????s
resources:
- endpoints
verbs:
Expand Down
1 change: 1 addition & 0 deletions deploy/single/all-in-one-dbless.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,7 @@ metadata:
rules:
- apiGroups:
- ""
# ????
resources:
- endpoints
verbs:
Expand Down
1 change: 1 addition & 0 deletions deploy/single/all-in-one-postgres-enterprise.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,7 @@ metadata:
rules:
- apiGroups:
- ""
# ???
resources:
- endpoints
verbs:
Expand Down
1 change: 1 addition & 0 deletions deploy/single/all-in-one-postgres.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1169,6 +1169,7 @@ metadata:
rules:
- apiGroups:
- ""
# ????
resources:
- endpoints
verbs:
Expand Down
20 changes: 11 additions & 9 deletions hack/generators/controllers/networking/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (
const (
outputFile = "../../internal/controllers/configuration/zz_generated_controllers.go"

corev1 = "k8s.io/api/core/v1"
netv1 = "k8s.io/api/networking/v1"
corev1 = "k8s.io/api/core/v1"
discoveryv1 = "k8s.io/api/discovery/v1"
netv1 = "k8s.io/api/networking/v1"

kongv1 = "github.com/kong/kubernetes-ingress-controller/v2/pkg/apis/configuration/v1"
kongv1beta1 = "github.com/kong/kubernetes-ingress-controller/v2/api/configuration/v1beta1"
Expand Down Expand Up @@ -47,12 +48,12 @@ var inputControllersNeeded = &typesNeeded{
typeNeeded{
Group: "\"\"",
Version: "v1",
Kind: "Endpoints",
PackageImportAlias: "corev1",
PackageAlias: "CoreV1",
Package: corev1,
Plural: "endpoints",
CacheType: "Endpoint",
Kind: "EndpointSlice",
PackageImportAlias: "discoveryv1",
PackageAlias: "DiscoveryV1",
Package: discoveryv1,
Plural: "endpointslices",
CacheType: "EndpointSlice",
NeedsStatusPermissions: true,
AcceptsIngressClassNameAnnotation: false,
AcceptsIngressClassNameSpec: false,
Expand Down Expand Up @@ -360,6 +361,7 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
netv1 "k8s.io/api/networking/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -584,7 +586,7 @@ func (r *{{.PackageAlias}}{{.Kind}}Reconciler) Reconcile(ctx context.Context, re
// if status updates are enabled report the status for the object
if r.DataplaneClient.AreKubernetesObjectReportsEnabled() {
log.V(util.DebugLevel).Info("determining whether data-plane configuration has succeeded", "namespace", req.Namespace, "name", req.Name)
if !r.DataplaneClient.KubernetesObjectIsConfigured(obj) {
log.V(util.DebugLevel).Info("resource not yet configured in the data-plane", "namespace", req.Namespace, "name", req.Name)
return ctrl.Result{Requeue: true}, nil // requeue until the object has been properly configured
Expand Down
25 changes: 13 additions & 12 deletions internal/controllers/configuration/zz_generated_controllers.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

101 changes: 45 additions & 56 deletions internal/dataplane/parser/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ import (
"bytes"
"crypto/tls"
"fmt"
"reflect"
"sort"
"strings"

"github.com/kong/go-kong/kong"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
netv1 "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
Expand Down Expand Up @@ -651,7 +651,7 @@ func getServiceEndpoints(
"service_port": servicePort,
})

// in theory a Service could have multiple port protocols, we need to ensure we gather
// In theory a Service could have multiple port protocols, we need to ensure we gather
// endpoints based on all the protocols the service is configured for. We always check
// for TCP as this is the default protocol for service ports.
protocols := listProtocols(svc)
Expand All @@ -665,13 +665,11 @@ func getServiceEndpoints(
isSvcUpstream = ingressClassParameters.ServiceUpstream
}

// check all protocols for associated endpoints
// Check all protocols for associated endpoints.
endpoints := []util.Endpoint{}
for protocol := range protocols {
newEndpoints := getEndpoints(log, svc, servicePort, protocol, s.GetEndpointsForService, isSvcUpstream)
if len(newEndpoints) > 0 {
endpoints = append(endpoints, newEndpoints...)
}
newEndpoints := getEndpoints(log, svc, servicePort, protocol, s.GetEndpointSlicesForService, isSvcUpstream)
endpoints = append(endpoints, newEndpoints...)
}
if len(endpoints) == 0 {
log.Warningf("no active endpoints")
Expand Down Expand Up @@ -703,94 +701,85 @@ func getIngressClassParametersOrDefault(s store.Storer) (configurationv1alpha1.I
// of by IngressClassParameters configuration provided as a flag.
func getEndpoints(
log logrus.FieldLogger,
s *corev1.Service,
service *corev1.Service,
port *corev1.ServicePort,
proto corev1.Protocol,
getEndpoints func(string, string) (*corev1.Endpoints, error),
getEndpointSlices func(string, string) ([]*discoveryv1.EndpointSlice, error),
isSvcUpstream bool,
) []util.Endpoint {
if s == nil || port == nil {
if service == nil || port == nil {
return []util.Endpoint{}
}

// If service is an upstream service...
if isSvcUpstream || annotations.HasServiceUpstreamAnnotation(s.Annotations) {
if isSvcUpstream || annotations.HasServiceUpstreamAnnotation(service.Annotations) {
// ... return its address as the only endpoint.
return []util.Endpoint{
{
Address: s.Name + "." + s.Namespace + ".svc",
Address: service.Name + "." + service.Namespace + ".svc",
Port: fmt.Sprintf("%v", port.Port),
},
}
}

log = log.WithFields(logrus.Fields{
"service_name": s.Name,
"service_namespace": s.Namespace,
"service_port": port.String(),
"service_name": service.Name,
"service_namespace": service.Namespace,
"service_port": port,
})

// avoid duplicated upstream servers when the service
// contains multiple port definitions sharing the same
// targetport.
adus := make(map[string]bool)

// ExternalName services
if s.Spec.Type == corev1.ServiceTypeExternalName {
if service.Spec.Type == corev1.ServiceTypeExternalName {
log.Debug("found service of type=ExternalName")
return []util.Endpoint{
{
Address: s.Spec.ExternalName,
Address: service.Spec.ExternalName,
Port: port.TargetPort.String(),
},
}
}

log.Debugf("fetching endpoints")
ep, err := getEndpoints(s.Namespace, s.Name)
log.Debugf("fetching EndpointSlices")
endpointSlices, err := getEndpointSlices(service.Namespace, service.Name)
if err != nil {
log.WithError(err).Error("failed to fetch endpoints")
log.WithError(err).Error("error fetching EndpointSlices")
return []util.Endpoint{}
}

upsServers := []util.Endpoint{}
for _, ss := range ep.Subsets {
for _, epPort := range ss.Ports {
if !reflect.DeepEqual(epPort.Protocol, proto) {
continue
}

var targetPort int32

if port.Name == "" {
// port.Name is optional if there is only one port
targetPort = epPort.Port
} else if port.Name == epPort.Name {
targetPort = epPort.Port
}

// check for invalid port value
if targetPort <= 0 {
log.Debugf("fetched %d EndpointSlices", len(endpointSlices))

// Avoid duplicated upstream servers when the service contains
// multiple port definitions sharing the same target port.
uniqueUpstream := make(map[string]bool)
upstreamServers := make([]util.Endpoint, 0)
for _, endpointSlice := range endpointSlices {
for _, p := range endpointSlice.Ports {
if p.Port == nil || *p.Port < 0 || *p.Protocol != proto || *p.Name != port.Name {
continue
}

for _, epAddress := range ss.Addresses {
ep := fmt.Sprintf("%v:%v", epAddress.IP, targetPort)
if _, exists := adus[ep]; exists {
upstreamPort := fmt.Sprint(*p.Port)
for _, endpoint := range endpointSlice.Endpoints {
// Ready indicates that this endpoint is prepared to receive traffic, according to whatever
// system is managing the endpoint. A nil value indicates an unknown state.
// In most cases consumers should interpret this unknown state as ready.
// Field Ready has the same semantic as Endpoints from corev1 in Addresses.
// https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#conditions
if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
continue
}
ups := util.Endpoint{
Address: epAddress.IP,
Port: fmt.Sprintf("%v", targetPort),
// One address per endpoint is rather expected (allowing multiple is due to historical reasons)
// read more https://github.com/kubernetes/kubernetes/issues/106267#issuecomment-978770401.
// These are all assumed to be fungible and clients may choose to only use the first element.
upstreamServer := util.Endpoint{Address: endpoint.Addresses[0], Port: upstreamPort}
if _, ok := uniqueUpstream[upstreamServer.String()]; ok {
continue
}
upsServers = append(upsServers, ups)
adus[ep] = true
uniqueUpstream[upstreamServer.String()] = true
upstreamServers = append(upstreamServers, upstreamServer)
}
}
}

log.Debugf("found endpoints: %v", upsServers)
return upsServers
log.Debugf("found endpoints: %v", upstreamServers)
return upstreamServers
}

// listProtocols is a helper function to map out all the in-use corev1.Protocols
Expand Down
Loading

0 comments on commit 0064eab

Please sign in to comment.