Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions apis/core/v1alpha1/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,12 @@ const (
// injected by POD IRSA, to decide in which region the resources should be
// created.
AnnotationDefaultRegion = AnnotationPrefix + "default-region"
// AnnotationEndpointURL is an annotation whose value is the identifier
// for the AWS endpoint in which the service controller will use to create
// its resources. If this annotation is set on a namespace, the Kubernetes user
// is indicating that the ACK service controller should create its resources using
// that specific endpoint. If this annotation is not set, ACK service controller
// will either use the default behavior of aws-sdk-go to create endpoints or
// aws-endpoint-url if it is set in controller binary flags and environment variables.
AnnotationEndpointURL = AnnotationPrefix + "endpoint-url"
)
21 changes: 20 additions & 1 deletion pkg/runtime/adoption_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,10 @@ func (r *adoptionReconciler) reconcile(req ctrlrt.Request) error {
acctID := r.getOwnerAccountID(res)
region := r.getRegion(res)
roleARN := r.getRoleARN(acctID)
endpointURL := r.getEndpointURL(res)

sess, err := r.sc.NewSession(
region, &r.cfg.EndpointURL, roleARN,
region, &endpointURL, roleARN,
targetDescriptor.EmptyRuntimeObject().GetObjectKind().GroupVersionKind(),
)
if err != nil {
Expand Down Expand Up @@ -419,6 +420,24 @@ func (r *adoptionReconciler) getOwnerAccountID(
return ackv1alpha1.AWSAccountID(r.cfg.AccountID)
}

// getEndpointURL returns the AWS account that owns the supplied resource.
// We look for the namespace associated endpoint url, if that is set we use it.
// Otherwise if none of these annotations are set we use the endpoint url specified
// in the configuration
func (r *adoptionReconciler) getEndpointURL(
res *ackv1alpha1.AdoptedResource,
) string {
// look for endpoint url in the namespace annotations
namespace := res.GetNamespace()
endpointURL, ok := r.cache.Namespaces.GetEndpointURL(namespace)
if ok {
return endpointURL
}

// use controller configuration
return r.cfg.EndpointURL
}

// getRoleARN return the Role ARN that should be assumed in order to manage
// the resources.
func (r *adoptionReconciler) getRoleARN(
Expand Down
34 changes: 29 additions & 5 deletions pkg/runtime/cache/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type namespaceInfo struct {
defaultRegion string
// services.k8s.aws/owner-account-id Annotation
ownerAccountID string
// services.k8s.aws/endpoint-url Annotation
endpointURL string
}

// getDefaultRegion returns the default region value
Expand All @@ -49,15 +51,23 @@ func (n *namespaceInfo) getOwnerAccountID() string {
return n.ownerAccountID
}

// getEndpointURL returns the namespace Endpoint URL
func (n *namespaceInfo) getEndpointURL() string {
if n == nil {
return ""
}
return n.endpointURL
}

// NamespaceCache is responsible of keeping track of namespaces
// annotations, and caching those related to the ACK controller.
type NamespaceCache struct {
sync.RWMutex

log logr.Logger
// Provide a namespace specifically to listen to.
// Provide a namespace specifically to listen to.
// Provide empty string to listen to all namespaces except kube-system and kube-public.
watchNamespace string
watchNamespace string

// Namespace informer
informer k8scache.SharedInformer
Expand All @@ -81,12 +91,12 @@ func NewNamespaceCache(clientset kubernetes.Interface, log logr.Logger, watchNam
}
}

// Check if the provided namespace should be listened to or not
// Check if the provided namespace should be listened to or not
func isWatchNamespace(raw interface{}, watchNamespace string) bool {
object, ok := raw.(*corev1.Namespace)
if !ok {
return false
}
}

if watchNamespace != "" {
return watchNamespace == object.ObjectMeta.Name
Expand Down Expand Up @@ -143,8 +153,18 @@ func (c *NamespaceCache) GetOwnerAccountID(namespace string) (string, bool) {
return "", false
}

// GetEndpointURL returns the endpoint URL if it exists
func (c *NamespaceCache) GetEndpointURL(namespace string) (string, bool) {
info, ok := c.getNamespaceInfo(namespace)
if ok {
e := info.getEndpointURL()
return e, e != ""
}
return "", false
}

// getNamespaceInfo reads a namespace cached annotations and
// return a given namespace default aws region and owner account id.
// return a given namespace default aws region, owner account id and endpoint url.
// This function is thread safe.
func (c *NamespaceCache) getNamespaceInfo(ns string) (*namespaceInfo, bool) {
c.RLock()
Expand All @@ -166,6 +186,10 @@ func (c *NamespaceCache) setNamespaceInfoFromK8sObject(ns *corev1.Namespace) {
if ok {
nsInfo.ownerAccountID = OwnerAccountID
}
EndpointURL, ok := nsa[ackv1alpha1.AnnotationEndpointURL]
if ok {
nsInfo.endpointURL = EndpointURL
}
c.Lock()
defer c.Unlock()
c.namespaceInfos[ns.ObjectMeta.Name] = nsInfo
Expand Down
10 changes: 10 additions & 0 deletions pkg/runtime/cache/namespace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func TestNamespaceCache(t *testing.T) {
Annotations: map[string]string{
ackv1alpha1.AnnotationDefaultRegion: "us-west-2",
ackv1alpha1.AnnotationOwnerAccountID: "012345678912",
ackv1alpha1.AnnotationEndpointURL: "https://amazon-service.region.amazonaws.com",
},
},
},
Expand All @@ -79,6 +80,10 @@ func TestNamespaceCache(t *testing.T) {
require.True(t, ok)
require.Equal(t, "012345678912", ownerAccountID)

endpointURL, ok := namespaceCache.GetEndpointURL("production")
require.True(t, ok)
require.Equal(t, "https://amazon-service.region.amazonaws.com", endpointURL)

// Test update events
k8sClient.CoreV1().Namespaces().Update(
context.Background(),
Expand All @@ -88,6 +93,7 @@ func TestNamespaceCache(t *testing.T) {
Annotations: map[string]string{
ackv1alpha1.AnnotationDefaultRegion: "us-est-1",
ackv1alpha1.AnnotationOwnerAccountID: "21987654321",
ackv1alpha1.AnnotationEndpointURL: "https://amazon-other-service.region.amazonaws.com",
},
},
},
Expand All @@ -104,6 +110,10 @@ func TestNamespaceCache(t *testing.T) {
require.True(t, ok)
require.Equal(t, "21987654321", ownerAccountID)

endpointURL, ok = namespaceCache.GetEndpointURL("production")
require.True(t, ok)
require.Equal(t, "https://amazon-other-service.region.amazonaws.com", endpointURL)

// Test delete events
k8sClient.CoreV1().Namespaces().Delete(
context.Background(),
Expand Down
23 changes: 21 additions & 2 deletions pkg/runtime/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,9 @@ func (r *resourceReconciler) Reconcile(req ctrlrt.Request) (ctrlrt.Result, error
acctID := r.getOwnerAccountID(res)
region := r.getRegion(res)
roleARN := r.getRoleARN(acctID)
endpointURL := r.getEndpointURL(res)
sess, err := r.sc.NewSession(
region, &r.cfg.EndpointURL, roleARN,
region, &endpointURL, roleARN,
res.RuntimeObject().GetObjectKind().GroupVersionKind(),
)
if err != nil {
Expand Down Expand Up @@ -170,7 +171,6 @@ func (r *resourceReconciler) reconcile(
if res.IsBeingDeleted() {
return r.cleanup(ctx, rm, res)
}

return r.Sync(ctx, rm, res)
}

Expand Down Expand Up @@ -612,6 +612,25 @@ func (r *resourceReconciler) getRegion(
return ackv1alpha1.AWSRegion(r.cfg.Region)
}

// getEndpointURL returns the AWS account that owns the supplied resource.
// We look for the namespace associated endpoint url, if that is set we use it.
// Otherwise if none of these annotations are set we use the endpoint url specified
// in the configuration
func (r *resourceReconciler) getEndpointURL(
res acktypes.AWSResource,
) string {

// look for endpoint url in the namespace annotations
namespace := res.MetaObject().GetNamespace()
endpointURL, ok := r.cache.Namespaces.GetEndpointURL(namespace)
if ok {
return endpointURL
}

// use controller configuration EndpointURL
return r.cfg.EndpointURL
}

// NewReconciler returns a new reconciler object
func NewReconciler(
sc acktypes.ServiceController,
Expand Down