Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions internal/knowledge/datasources/plugins/openstack/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cobaltcore-dev/cortex/pkg/sso"
"github.com/sapcc/go-bits/jobloop"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -59,9 +60,20 @@ type OpenStackDatasourceReconciler struct {
// move the current state of the cluster closer to the desired state.
func (r *OpenStackDatasourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := logf.FromContext(ctx)
log.Info("Reconciling resource")

datasource := &v1alpha1.Datasource{}
if err := r.Get(ctx, req.NamespacedName, datasource); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
if apierrors.IsNotFound(err) {
// If the custom resource is not found then it usually means
// that it was deleted or not created.
log.Info("Resource not found. Ignoring since object must be deleted")
return ctrl.Result{}, nil
}

// Error reading the object - requeue the request.
log.Error(err, "Failed to get resource")
return ctrl.Result{}, err
}

// Sanity checks.
Expand All @@ -75,6 +87,7 @@ func (r *OpenStackDatasourceReconciler) Reconcile(ctx context.Context, req ctrl.
}

// Authenticate with the database based on the secret provided in the datasource.
log.Info("Connecting to database")
authenticatedDB, err := db.Connector{Client: r.Client}.
FromSecretRef(ctx, datasource.Spec.DatabaseSecretRef)
if err != nil {
Expand All @@ -97,6 +110,7 @@ func (r *OpenStackDatasourceReconciler) Reconcile(ctx context.Context, req ctrl.
// Authenticate with the datasource host if SSO is configured.
var authenticatedHTTP = http.DefaultClient
if datasource.Spec.SSOSecretRef != nil {
log.Info("Collecting SSO credentials and authenticating with datasource host if applicable")
authenticatedHTTP, err = sso.Connector{Client: r.Client}.
FromSecretRef(ctx, *datasource.Spec.SSOSecretRef)
if err != nil {
Expand All @@ -118,6 +132,7 @@ func (r *OpenStackDatasourceReconciler) Reconcile(ctx context.Context, req ctrl.
}

// Authenticate with keystone.
log.Info("Authenticating with keystone")
authenticatedKeystone, err := keystone.Connector{Client: r.Client, HTTPClient: authenticatedHTTP}.
FromSecretRef(ctx, datasource.Spec.OpenStack.SecretRef)
if err != nil {
Expand All @@ -137,14 +152,15 @@ func (r *OpenStackDatasourceReconciler) Reconcile(ctx context.Context, req ctrl.
return ctrl.Result{}, err
}

log.Info("Looking for supported syncer of datasource")
syncer, err := getSupportedSyncer(
*datasource,
authenticatedDB,
authenticatedKeystone,
r.Monitor,
)
if err != nil {
log.Info("skipping datasource, unsupported openstack datasource type", "type", datasource.Spec.OpenStack.Type)
log.Error(err, "failed to get supported syncer for datasource", "type", datasource.Spec.OpenStack.Type)
old := datasource.DeepCopy()
meta.SetStatusCondition(&datasource.Status.Conditions, metav1.Condition{
Type: v1alpha1.DatasourceConditionReady,
Expand All @@ -161,6 +177,7 @@ func (r *OpenStackDatasourceReconciler) Reconcile(ctx context.Context, req ctrl.
}

// Initialize the syncer before syncing.
log.Info("Initializing syncer for datasource")
if err := syncer.Init(ctx); err != nil {
log.Error(err, "failed to init openstack datasource", "name", datasource.Name)
old := datasource.DeepCopy()
Expand All @@ -178,7 +195,9 @@ func (r *OpenStackDatasourceReconciler) Reconcile(ctx context.Context, req ctrl.
return ctrl.Result{}, err
}

log.Info("Syncing datasource")
nResults, err := syncer.Sync(ctx)
log.Info("Finished syncing datasource", "name", datasource.Name, "numberOfResults", nResults)
if errors.Is(err, v1alpha1.ErrWaitingForDependencyDatasource) {
log.Info("datasource sync waiting for dependency datasource", "name", datasource.Name)
old := datasource.DeepCopy()
Expand Down Expand Up @@ -215,6 +234,7 @@ func (r *OpenStackDatasourceReconciler) Reconcile(ctx context.Context, req ctrl.
}

// Update the datasource status to reflect successful sync.
log.Info("Synced successfully. Patching datasource.", "name", datasource.Name)
old := datasource.DeepCopy()
meta.SetStatusCondition(&datasource.Status.Conditions, metav1.Condition{
Type: v1alpha1.DatasourceConditionReady,
Expand All @@ -233,7 +253,8 @@ func (r *OpenStackDatasourceReconciler) Reconcile(ctx context.Context, req ctrl.
}

// Calculate the next sync time based on the configured sync interval.
return ctrl.Result{RequeueAfter: time.Until(nextTime)}, nil
log.Info("Finished reconcile", "next", nextTime)
return ctrl.Result{RequeueAfter: datasource.Spec.OpenStack.SyncInterval.Duration}, nil
}

func (r *OpenStackDatasourceReconciler) SetupWithManager(mgr manager.Manager, mcl *multicluster.Client) error {
Expand Down
Loading