From 01d5ebce62493c16a943e591e20b2fb8ed9c969c Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Fri, 9 Dec 2022 12:27:32 -0800 Subject: [PATCH] Bmoric/convert connection micronaut (#20211) * tmp * Fix build * tmp * Tmp * tmp * tmp * Tmp * tmp * tmp * Clean up * tmp * Convert Connection Api Controller * PR Comments * convert openapiapicontroller to micronaut (#20258) * convert openapiapicontroller to micronaut * merge health/openapi locations into one entry * Fix build * Format * Remove media type * Format Co-authored-by: Cole Snodgrass --- .../airbyte/api/client/AirbyteApiClient.java | 2 +- airbyte-proxy/nginx-auth.conf.template | 8 +- airbyte-proxy/nginx-no-auth.conf.template | 8 +- airbyte-server/build.gradle | 1 + .../java/io/airbyte/server/ServerApp.java | 7 +- .../java/io/airbyte/server/ServerFactory.java | 22 +--- .../server/apis/ConnectionApiController.java | 35 +++-- .../server/apis/OpenapiApiController.java | 12 +- .../apis/binders/ConnectionApiBinder.java | 21 --- .../server/apis/binders/OpenapiApiBinder.java | 21 --- .../apis/factories/ConnectionApiFactory.java | 44 ------- .../apis/factories/OpenapiApiFactory.java | 29 ----- .../server/config/ApplicationBeanFactory.java | 92 +++++++++++++ .../config/CloudStorageBeanFactory.java | 92 +++++++++++++ .../server/config/DatabaseBeanFactory.java | 7 + .../server/config/HelperBeanFactory.java | 21 +++ .../config/JobErrorReportingBeanFactory.java | 68 ++++++++++ .../config/SecretPersistenceBeanFactory.java | 123 ++++++++++++++++++ .../server/config/TemporalBeanFactory.java | 72 ++++++++++ .../server/converters/JobConverter.java | 4 +- .../server/handlers/ConnectionsHandler.java | 6 + .../server/handlers/OpenApiConfigHandler.java | 2 + .../server/handlers/SchedulerHandler.java | 11 +- .../src/main/resources/application.yml | 23 ++++ 24 files changed, 561 insertions(+), 170 deletions(-) delete mode 100644 airbyte-server/src/main/java/io/airbyte/server/apis/binders/ConnectionApiBinder.java delete mode 100644 airbyte-server/src/main/java/io/airbyte/server/apis/binders/OpenapiApiBinder.java delete mode 100644 airbyte-server/src/main/java/io/airbyte/server/apis/factories/ConnectionApiFactory.java delete mode 100644 airbyte-server/src/main/java/io/airbyte/server/apis/factories/OpenapiApiFactory.java create mode 100644 airbyte-server/src/main/java/io/airbyte/server/config/ApplicationBeanFactory.java create mode 100644 airbyte-server/src/main/java/io/airbyte/server/config/CloudStorageBeanFactory.java create mode 100644 airbyte-server/src/main/java/io/airbyte/server/config/HelperBeanFactory.java create mode 100644 airbyte-server/src/main/java/io/airbyte/server/config/JobErrorReportingBeanFactory.java create mode 100644 airbyte-server/src/main/java/io/airbyte/server/config/SecretPersistenceBeanFactory.java create mode 100644 airbyte-server/src/main/java/io/airbyte/server/config/TemporalBeanFactory.java diff --git a/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java b/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java index cf53428e296d4f..bbf1eb9748c224 100644 --- a/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java +++ b/airbyte-api/src/main/java/io/airbyte/api/client/AirbyteApiClient.java @@ -61,7 +61,7 @@ public class AirbyteApiClient { private final StateApi stateApi; public AirbyteApiClient(final ApiClient apiClient, final ApiClient micronautApiClient) { - connectionApi = new ConnectionApi(apiClient); + connectionApi = new ConnectionApi(micronautApiClient); destinationDefinitionApi = new DestinationDefinitionApi(apiClient); destinationApi = new DestinationApi(apiClient); destinationSpecificationApi = new DestinationDefinitionSpecificationApi(apiClient); diff --git a/airbyte-proxy/nginx-auth.conf.template b/airbyte-proxy/nginx-auth.conf.template index 1a0f6ee8c24740..625a6a6a8e211a 100644 --- a/airbyte-proxy/nginx-auth.conf.template +++ b/airbyte-proxy/nginx-auth.conf.template @@ -21,7 +21,7 @@ http { } } - location /api/v1/health { + location ~ ^/api/v1/(health|openapi) { proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; @@ -38,7 +38,7 @@ http { } } - location /api/v1/operations { + location ~ ^/api/v1/(connections|operations)/.* { proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; @@ -76,7 +76,7 @@ http { } } - location /api/v1/health { + location ~ ^/api/v1/(health|openapi) { proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; @@ -93,7 +93,7 @@ http { } } - location /api/v1/operations { + location ~ ^/api/v1/(connections|operations)/.* { proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; diff --git a/airbyte-proxy/nginx-no-auth.conf.template b/airbyte-proxy/nginx-no-auth.conf.template index 3de1d4fa54c8b1..b3794e8bbcff6e 100644 --- a/airbyte-proxy/nginx-no-auth.conf.template +++ b/airbyte-proxy/nginx-no-auth.conf.template @@ -12,7 +12,7 @@ http { proxy_pass "${PROXY_PASS_WEB}"; } - location /api/v1/health { + location ~ ^/api/v1/(health|openapi) { proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; @@ -20,7 +20,7 @@ http { proxy_pass "${PROXY_PASS_MICRONAUT_API}"; } - location /api/v1/operations { + location ~ ^/api/v1/(connections|operations)/.* { proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; @@ -40,7 +40,7 @@ http { proxy_pass "${PROXY_PASS_API}"; } - location /api/v1/health { + location ~ ^/api/v1/(health|openapi) { proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; @@ -48,7 +48,7 @@ http { proxy_pass "${PROXY_PASS_MICRONAUT_API}"; } - location /api/v1/operations { + location ~ ^/api/v1/(connections|operations)/.* { proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; diff --git a/airbyte-server/build.gradle b/airbyte-server/build.gradle index ee95967e8b2c88..e4edd67cc54b8a 100644 --- a/airbyte-server/build.gradle +++ b/airbyte-server/build.gradle @@ -36,6 +36,7 @@ dependencies { implementation 'org.glassfish.jersey.media:jersey-media-json-jackson' implementation 'org.glassfish.jersey.ext:jersey-bean-validation' implementation 'org.quartz-scheduler:quartz:2.3.2' + implementation 'io.sentry:sentry:6.3.1' implementation 'io.swagger:swagger-annotations:1.6.2' annotationProcessor platform(libs.micronaut.bom) diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index df80788247e422..6437d876a580bb 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -8,6 +8,7 @@ import io.airbyte.analytics.TrackingClient; import io.airbyte.analytics.TrackingClientSingleton; import io.airbyte.commons.features.EnvVariableFeatureFlags; +import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.temporal.ConnectionManagerUtils; import io.airbyte.commons.temporal.StreamResetRecordsHelper; @@ -50,7 +51,6 @@ import io.airbyte.server.handlers.JobHistoryHandler; import io.airbyte.server.handlers.LogsHandler; import io.airbyte.server.handlers.OAuthHandler; -import io.airbyte.server.handlers.OpenApiConfigHandler; import io.airbyte.server.handlers.OperationsHandler; import io.airbyte.server.handlers.SchedulerHandler; import io.airbyte.server.handlers.SourceDefinitionsHandler; @@ -205,7 +205,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final TrackingClient trackingClient = TrackingClientSingleton.get(); final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence, trackingClient); - final EnvVariableFeatureFlags envVariableFeatureFlags = new EnvVariableFeatureFlags(); + final FeatureFlags envVariableFeatureFlags = new EnvVariableFeatureFlags(); final WebUrlHelper webUrlHelper = new WebUrlHelper(configs.getWebappUrl()); final JobErrorReportingClient jobErrorReportingClient = JobErrorReportingClientFactory.getClient(configs.getJobErrorReportingStrategy(), configs); @@ -320,8 +320,6 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, destinationHandler, sourceHandler); - final OpenApiConfigHandler openApiConfigHandler = new OpenApiConfigHandler(); - final StatePersistence statePersistence = new StatePersistence(configsDatabase); final StateHandler stateHandler = new StateHandler(statePersistence); @@ -369,7 +367,6 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, jobHistoryHandler, logsHandler, oAuthHandler, - openApiConfigHandler, operationsHandler, schedulerHandler, sourceHandler, diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java b/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java index 4a8ba1617dfa8c..0a47296f31bf93 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java @@ -13,7 +13,6 @@ import io.airbyte.config.persistence.SecretsRepositoryWriter; import io.airbyte.db.Database; import io.airbyte.persistence.job.JobPersistence; -import io.airbyte.server.apis.ConnectionApiController; import io.airbyte.server.apis.DestinationApiController; import io.airbyte.server.apis.DestinationDefinitionApiController; import io.airbyte.server.apis.DestinationDefinitionSpecificationApiController; @@ -31,7 +30,6 @@ import io.airbyte.server.apis.StateApiController; import io.airbyte.server.apis.WebBackendApiController; import io.airbyte.server.apis.WorkspaceApiController; -import io.airbyte.server.apis.binders.ConnectionApiBinder; import io.airbyte.server.apis.binders.DestinationApiBinder; import io.airbyte.server.apis.binders.DestinationDefinitionApiBinder; import io.airbyte.server.apis.binders.DestinationDefinitionSpecificationApiBinder; @@ -39,7 +37,6 @@ import io.airbyte.server.apis.binders.JobsApiBinder; import io.airbyte.server.apis.binders.LogsApiBinder; import io.airbyte.server.apis.binders.NotificationApiBinder; -import io.airbyte.server.apis.binders.OpenapiApiBinder; import io.airbyte.server.apis.binders.SchedulerApiBinder; import io.airbyte.server.apis.binders.SourceApiBinder; import io.airbyte.server.apis.binders.SourceDefinitionApiBinder; @@ -48,7 +45,6 @@ import io.airbyte.server.apis.binders.StateApiBinder; import io.airbyte.server.apis.binders.WebBackendApiBinder; import io.airbyte.server.apis.binders.WorkspaceApiBinder; -import io.airbyte.server.apis.factories.ConnectionApiFactory; import io.airbyte.server.apis.factories.DestinationApiFactory; import io.airbyte.server.apis.factories.DestinationDefinitionApiFactory; import io.airbyte.server.apis.factories.DestinationDefinitionSpecificationApiFactory; @@ -56,7 +52,6 @@ import io.airbyte.server.apis.factories.JobsApiFactory; import io.airbyte.server.apis.factories.LogsApiFactory; import io.airbyte.server.apis.factories.NotificationsApiFactory; -import io.airbyte.server.apis.factories.OpenapiApiFactory; import io.airbyte.server.apis.factories.SchedulerApiFactory; import io.airbyte.server.apis.factories.SourceApiFactory; import io.airbyte.server.apis.factories.SourceDefinitionApiFactory; @@ -73,7 +68,6 @@ import io.airbyte.server.handlers.JobHistoryHandler; import io.airbyte.server.handlers.LogsHandler; import io.airbyte.server.handlers.OAuthHandler; -import io.airbyte.server.handlers.OpenApiConfigHandler; import io.airbyte.server.handlers.OperationsHandler; import io.airbyte.server.handlers.SchedulerHandler; import io.airbyte.server.handlers.SourceDefinitionsHandler; @@ -118,7 +112,6 @@ ServerRunnable create(final SynchronousSchedulerClient synchronousSchedulerClien final JobHistoryHandler jobHistoryHandler, final LogsHandler logsHandler, final OAuthHandler oAuthHandler, - final OpenApiConfigHandler openApiConfigHandler, final OperationsHandler operationsHandler, final SchedulerHandler schedulerHandler, final SourceHandler sourceHandler, @@ -156,7 +149,6 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul final JobHistoryHandler jobHistoryHandler, final LogsHandler logsHandler, final OAuthHandler oAuthHandler, - final OpenApiConfigHandler openApiConfigHandler, final OperationsHandler operationsHandler, final SchedulerHandler schedulerHandler, final SourceHandler sourceHandler, @@ -168,12 +160,6 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul final WebBackendCheckUpdatesHandler webBackendCheckUpdatesHandler) { final Map mdc = MDC.getCopyOfContextMap(); - ConnectionApiFactory.setValues( - connectionsHandler, - operationsHandler, - schedulerHandler, - mdc); - DestinationApiFactory.setValues(destinationApiHandler, schedulerHandler, mdc); DestinationDefinitionApiFactory.setValues(destinationDefinitionsHandler); @@ -190,8 +176,6 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul NotificationsApiFactory.setValues(workspacesHandler); - OpenapiApiFactory.setValues(openApiConfigHandler); - SchedulerApiFactory.setValues(schedulerHandler); SourceApiFactory.setValues(schedulerHandler, sourceHandler); @@ -206,9 +190,8 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul WorkspaceApiFactory.setValues(workspacesHandler); - // server configurations + // server configuration final Set> componentClasses = Set.of( - ConnectionApiController.class, DestinationApiController.class, DestinationDefinitionApiController.class, DestinationDefinitionSpecificationApiController.class, @@ -228,8 +211,6 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul WorkspaceApiController.class); final Set components = Set.of( - new CorsFilter(), - new ConnectionApiBinder(), new DestinationApiBinder(), new DestinationDefinitionApiBinder(), new DestinationDefinitionSpecificationApiBinder(), @@ -237,7 +218,6 @@ public ServerRunnable create(final SynchronousSchedulerClient synchronousSchedul new JobsApiBinder(), new LogsApiBinder(), new NotificationApiBinder(), - new OpenapiApiBinder(), new SchedulerApiBinder(), new SourceApiBinder(), new SourceDefinitionApiBinder(), diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/ConnectionApiController.java b/airbyte-server/src/main/java/io/airbyte/server/apis/ConnectionApiController.java index fe29b42e1f3333..0992b878f47044 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/ConnectionApiController.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/ConnectionApiController.java @@ -16,9 +16,13 @@ import io.airbyte.server.handlers.ConnectionsHandler; import io.airbyte.server.handlers.OperationsHandler; import io.airbyte.server.handlers.SchedulerHandler; -import javax.ws.rs.Path; +import io.micronaut.context.annotation.Context; +import io.micronaut.http.annotation.Body; +import io.micronaut.http.annotation.Controller; +import io.micronaut.http.annotation.Post; -@Path("/v1/connections") +@Controller("/api/v1/connections") +@Context() public class ConnectionApiController implements ConnectionApi { private final ConnectionsHandler connectionsHandler; @@ -34,37 +38,44 @@ public ConnectionApiController(final ConnectionsHandler connectionsHandler, } @Override - public ConnectionRead createConnection(final ConnectionCreate connectionCreate) { + @Post(uri = "/create") + public ConnectionRead createConnection(@Body final ConnectionCreate connectionCreate) { return ApiHelper.execute(() -> connectionsHandler.createConnection(connectionCreate)); } @Override - public ConnectionRead updateConnection(final ConnectionUpdate connectionUpdate) { + @Post(uri = "/update") + public ConnectionRead updateConnection(@Body final ConnectionUpdate connectionUpdate) { return ApiHelper.execute(() -> connectionsHandler.updateConnection(connectionUpdate)); } @Override - public ConnectionReadList listConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) { + @Post(uri = "/list") + public ConnectionReadList listConnectionsForWorkspace(@Body final WorkspaceIdRequestBody workspaceIdRequestBody) { return ApiHelper.execute(() -> connectionsHandler.listConnectionsForWorkspace(workspaceIdRequestBody)); } @Override - public ConnectionReadList listAllConnectionsForWorkspace(final WorkspaceIdRequestBody workspaceIdRequestBody) { + @Post(uri = "/list_all") + public ConnectionReadList listAllConnectionsForWorkspace(@Body final WorkspaceIdRequestBody workspaceIdRequestBody) { return ApiHelper.execute(() -> connectionsHandler.listAllConnectionsForWorkspace(workspaceIdRequestBody)); } @Override - public ConnectionReadList searchConnections(final ConnectionSearch connectionSearch) { + @Post(uri = "/search") + public ConnectionReadList searchConnections(@Body final ConnectionSearch connectionSearch) { return ApiHelper.execute(() -> connectionsHandler.searchConnections(connectionSearch)); } @Override - public ConnectionRead getConnection(final ConnectionIdRequestBody connectionIdRequestBody) { + @Post(uri = "/get") + public ConnectionRead getConnection(@Body final ConnectionIdRequestBody connectionIdRequestBody) { return ApiHelper.execute(() -> connectionsHandler.getConnection(connectionIdRequestBody.getConnectionId())); } @Override - public void deleteConnection(final ConnectionIdRequestBody connectionIdRequestBody) { + @Post(uri = "/delete") + public void deleteConnection(@Body final ConnectionIdRequestBody connectionIdRequestBody) { ApiHelper.execute(() -> { operationsHandler.deleteOperationsForConnection(connectionIdRequestBody); connectionsHandler.deleteConnection(connectionIdRequestBody.getConnectionId()); @@ -73,12 +84,14 @@ public void deleteConnection(final ConnectionIdRequestBody connectionIdRequestBo } @Override - public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody) { + @Post(uri = "/sync") + public JobInfoRead syncConnection(@Body final ConnectionIdRequestBody connectionIdRequestBody) { return ApiHelper.execute(() -> schedulerHandler.syncConnection(connectionIdRequestBody)); } @Override - public JobInfoRead resetConnection(final ConnectionIdRequestBody connectionIdRequestBody) { + @Post(uri = "/reset") + public JobInfoRead resetConnection(@Body final ConnectionIdRequestBody connectionIdRequestBody) { return ApiHelper.execute(() -> schedulerHandler.resetConnection(connectionIdRequestBody)); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/OpenapiApiController.java b/airbyte-server/src/main/java/io/airbyte/server/apis/OpenapiApiController.java index ae749403b4201c..8d3ebadb4f6293 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/OpenapiApiController.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/OpenapiApiController.java @@ -6,16 +6,20 @@ import io.airbyte.api.generated.OpenapiApi; import io.airbyte.server.handlers.OpenApiConfigHandler; +import io.micronaut.http.annotation.Controller; +import io.micronaut.http.annotation.Get; import java.io.File; -import javax.ws.rs.Path; -import lombok.AllArgsConstructor; -@Path("/v1/openapi") -@AllArgsConstructor +@Controller("/api/v1/openapi") public class OpenapiApiController implements OpenapiApi { private final OpenApiConfigHandler openApiConfigHandler; + public OpenapiApiController(final OpenApiConfigHandler openApiConfigHandler) { + this.openApiConfigHandler = openApiConfigHandler; + } + + @Get(produces = "text/plain") @Override public File getOpenApiSpec() { return ApiHelper.execute(openApiConfigHandler::getFile); diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/binders/ConnectionApiBinder.java b/airbyte-server/src/main/java/io/airbyte/server/apis/binders/ConnectionApiBinder.java deleted file mode 100644 index d95fe33855141a..00000000000000 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/binders/ConnectionApiBinder.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.server.apis.binders; - -import io.airbyte.server.apis.ConnectionApiController; -import io.airbyte.server.apis.factories.ConnectionApiFactory; -import org.glassfish.hk2.utilities.binding.AbstractBinder; -import org.glassfish.jersey.process.internal.RequestScoped; - -public class ConnectionApiBinder extends AbstractBinder { - - @Override - protected void configure() { - bindFactory(ConnectionApiFactory.class) - .to(ConnectionApiController.class) - .in(RequestScoped.class); - } - -} diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/binders/OpenapiApiBinder.java b/airbyte-server/src/main/java/io/airbyte/server/apis/binders/OpenapiApiBinder.java deleted file mode 100644 index 820da88a9206a9..00000000000000 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/binders/OpenapiApiBinder.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.server.apis.binders; - -import io.airbyte.server.apis.OpenapiApiController; -import io.airbyte.server.apis.factories.OpenapiApiFactory; -import org.glassfish.hk2.utilities.binding.AbstractBinder; -import org.glassfish.jersey.process.internal.RequestScoped; - -public class OpenapiApiBinder extends AbstractBinder { - - @Override - protected void configure() { - bindFactory(OpenapiApiFactory.class) - .to(OpenapiApiController.class) - .in(RequestScoped.class); - } - -} diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/factories/ConnectionApiFactory.java b/airbyte-server/src/main/java/io/airbyte/server/apis/factories/ConnectionApiFactory.java deleted file mode 100644 index 7378d342b65f40..00000000000000 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/factories/ConnectionApiFactory.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.server.apis.factories; - -import io.airbyte.server.apis.ConnectionApiController; -import io.airbyte.server.handlers.ConnectionsHandler; -import io.airbyte.server.handlers.OperationsHandler; -import io.airbyte.server.handlers.SchedulerHandler; -import java.util.Map; -import org.glassfish.hk2.api.Factory; -import org.slf4j.MDC; - -public class ConnectionApiFactory implements Factory { - - private static ConnectionsHandler connectionsHandler; - private static OperationsHandler operationsHandler; - private static SchedulerHandler schedulerHandler; - private static Map mdc; - - public static void setValues(final ConnectionsHandler connectionsHandler, - final OperationsHandler operationsHandler, - final SchedulerHandler schedulerHandler, - final Map mdc) { - ConnectionApiFactory.connectionsHandler = connectionsHandler; - ConnectionApiFactory.operationsHandler = operationsHandler; - ConnectionApiFactory.schedulerHandler = schedulerHandler; - ConnectionApiFactory.mdc = mdc; - } - - @Override - public ConnectionApiController provide() { - MDC.setContextMap(ConnectionApiFactory.mdc); - - return new ConnectionApiController(connectionsHandler, operationsHandler, schedulerHandler); - } - - @Override - public void dispose(final ConnectionApiController instance) { - /* no op */ - } - -} diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/factories/OpenapiApiFactory.java b/airbyte-server/src/main/java/io/airbyte/server/apis/factories/OpenapiApiFactory.java deleted file mode 100644 index 239fc610188b39..00000000000000 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/factories/OpenapiApiFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (c) 2022 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.server.apis.factories; - -import io.airbyte.server.apis.OpenapiApiController; -import io.airbyte.server.handlers.OpenApiConfigHandler; -import org.glassfish.hk2.api.Factory; - -public class OpenapiApiFactory implements Factory { - - private static OpenApiConfigHandler openApiConfigHandler; - - public static void setValues(final OpenApiConfigHandler openApiConfigHandler) { - OpenapiApiFactory.openApiConfigHandler = openApiConfigHandler; - } - - @Override - public OpenapiApiController provide() { - return new OpenapiApiController(openApiConfigHandler); - } - - @Override - public void dispose(final OpenapiApiController instance) { - /* no op */ - } - -} diff --git a/airbyte-server/src/main/java/io/airbyte/server/config/ApplicationBeanFactory.java b/airbyte-server/src/main/java/io/airbyte/server/config/ApplicationBeanFactory.java new file mode 100644 index 00000000000000..0dcf0d74338bac --- /dev/null +++ b/airbyte-server/src/main/java/io/airbyte/server/config/ApplicationBeanFactory.java @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.server.config; + +import io.airbyte.analytics.TrackingClient; +import io.airbyte.commons.features.EnvVariableFeatureFlags; +import io.airbyte.commons.features.FeatureFlags; +import io.airbyte.commons.temporal.TemporalClient; +import io.airbyte.commons.temporal.config.WorkerMode; +import io.airbyte.commons.version.AirbyteVersion; +import io.airbyte.config.Configs.DeploymentMode; +import io.airbyte.config.Configs.TrackingStrategy; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.persistence.job.JobPersistence; +import io.airbyte.persistence.job.WebUrlHelper; +import io.airbyte.persistence.job.tracker.JobTracker; +import io.airbyte.server.scheduler.EventRunner; +import io.airbyte.server.scheduler.TemporalEventRunner; +import io.micronaut.context.annotation.Factory; +import io.micronaut.context.annotation.Requires; +import io.micronaut.context.annotation.Value; +import io.micronaut.core.util.StringUtils; +import jakarta.inject.Named; +import jakarta.inject.Singleton; +import java.nio.file.Path; +import java.util.Locale; +import java.util.UUID; +import java.util.function.Function; +import java.util.function.Supplier; + +@Factory +public class ApplicationBeanFactory { + + @Singleton + public Supplier randomUUIDSupplier() { + return () -> UUID.randomUUID(); + } + + @Singleton + @Requires(env = WorkerMode.CONTROL_PLANE) + public EventRunner eventRunner(final TemporalClient temporalClient) { + return new TemporalEventRunner(temporalClient); + } + + @Singleton + public TrackingStrategy trackingStrategy(@Value("${airbyte.tracking-strategy}") final String trackingStrategy) { + return convertToEnum(trackingStrategy, TrackingStrategy::valueOf, TrackingStrategy.LOGGING); + } + + @Singleton + public AirbyteVersion airbyteVersion(@Value("${airbyte.version}") final String airbyteVersion) { + return new AirbyteVersion(airbyteVersion); + } + + @Singleton + public DeploymentMode deploymentMode(@Value("${airbyte.deployment-mode}") final String deploymentMode) { + return convertToEnum(deploymentMode, DeploymentMode::valueOf, DeploymentMode.OSS); + } + + @Singleton + @Requires(env = WorkerMode.CONTROL_PLANE) + public JobTracker jobTracker( + final ConfigRepository configRepository, + final JobPersistence jobPersistence, + final TrackingClient trackingClient) { + return new JobTracker(configRepository, jobPersistence, trackingClient); + } + + @Singleton + @Requires(env = WorkerMode.CONTROL_PLANE) + public WebUrlHelper webUrlHelper(@Value("${airbyte.web-app.url}") final String webAppUrl) { + return new WebUrlHelper(webAppUrl); + } + + @Singleton + public FeatureFlags featureFlags() { + return new EnvVariableFeatureFlags(); + } + + @Singleton + @Named("workspaceRoot") + public Path workspaceRoot(@Value("${airbyte.workspace.root}") final String workspaceRoot) { + return Path.of(workspaceRoot); + } + + private T convertToEnum(final String value, final Function creatorFunction, final T defaultValue) { + return StringUtils.isNotEmpty(value) ? creatorFunction.apply(value.toUpperCase(Locale.ROOT)) : defaultValue; + } + +} diff --git a/airbyte-server/src/main/java/io/airbyte/server/config/CloudStorageBeanFactory.java b/airbyte-server/src/main/java/io/airbyte/server/config/CloudStorageBeanFactory.java new file mode 100644 index 00000000000000..06ba03c9512ca7 --- /dev/null +++ b/airbyte-server/src/main/java/io/airbyte/server/config/CloudStorageBeanFactory.java @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.server.config; + +import io.airbyte.config.storage.CloudStorageConfigs; +import io.airbyte.config.storage.CloudStorageConfigs.GcsConfig; +import io.airbyte.config.storage.CloudStorageConfigs.MinioConfig; +import io.airbyte.config.storage.CloudStorageConfigs.S3Config; +import io.micronaut.context.annotation.Factory; +import io.micronaut.context.annotation.Requires; +import io.micronaut.context.annotation.Value; +import jakarta.inject.Named; +import jakarta.inject.Singleton; + +/** + * Micronaut bean factory for cloud storage-related singletons. + */ +@Factory +@SuppressWarnings("PMD.AvoidDuplicateLiterals") +public class CloudStorageBeanFactory { + + @Singleton + @Requires(property = "airbyte.cloud.storage.logs.type", + pattern = "(?i)^gcs$") + @Named("logStorageConfigs") + public CloudStorageConfigs gcsLogStorageConfigs( + @Value("${airbyte.cloud.storage.logs.gcs.bucket}") final String gcsLogBucket, + @Value("${airbyte.cloud.storage.logs.gcs.application-credentials}") final String googleApplicationCredentials) { + return CloudStorageConfigs.gcs(new GcsConfig(gcsLogBucket, googleApplicationCredentials)); + } + + @Singleton + @Requires(property = "airbyte.cloud.storage.logs.type", + pattern = "(?i)^minio$") + @Named("logStorageConfigs") + public CloudStorageConfigs minioLogStorageConfigs( + @Value("${airbyte.cloud.storage.logs.minio.access-key}") final String awsAccessKeyId, + @Value("${airbyte.cloud.storage.logs.minio.secret-access-key}") final String awsSecretAccessKey, + @Value("${airbyte.cloud.storage.logs.minio.bucket}") final String s3LogBucket, + @Value("${airbyte.cloud.storage.logs.minio.endpoint}") final String s3MinioEndpoint) { + return CloudStorageConfigs.minio(new MinioConfig(s3LogBucket, awsAccessKeyId, awsSecretAccessKey, s3MinioEndpoint)); + } + + @Singleton + @Requires(property = "airbyte.cloud.storage.logs.type", + pattern = "(?i)^s3$") + @Named("logStorageConfigs") + public CloudStorageConfigs s3LogStorageConfigs( + @Value("${airbyte.cloud.storage.logs.s3.access-key}") final String awsAccessKeyId, + @Value("${airbyte.cloud.storage.logs.s3.secret-access-key}") final String awsSecretAccessKey, + @Value("${airbyte.cloud.storage.logs.s3.bucket}") final String s3LogBucket, + @Value("${airbyte.cloud.storage.logs.s3.region}") final String s3LogBucketRegion) { + return CloudStorageConfigs.s3(new S3Config(s3LogBucket, awsAccessKeyId, awsSecretAccessKey, s3LogBucketRegion)); + } + + @Singleton + @Requires(property = "airbyte.cloud.storage.state.type", + pattern = "(?i)^gcs$") + @Named("stateStorageConfigs") + public CloudStorageConfigs gcsStateStorageConfiguration( + @Value("${airbyte.cloud.storage.state.gcs.bucket}") final String gcsBucketName, + @Value("${airbyte.cloud.storage.state.gcs.application-credentials}") final String gcsApplicationCredentials) { + return CloudStorageConfigs.gcs(new GcsConfig(gcsBucketName, gcsApplicationCredentials)); + } + + @Singleton + @Requires(property = "airbyte.cloud.storage.state.type", + pattern = "(?i)^minio$") + @Named("stateStorageConfigs") + public CloudStorageConfigs minioStateStorageConfiguration( + @Value("${airbyte.cloud.storage.state.minio.bucket}") final String bucketName, + @Value("${airbyte.cloud.storage.state.minio.access-key}") final String awsAccessKey, + @Value("${airbyte.cloud.storage.state.minio.secret-access-key}") final String secretAccessKey, + @Value("${airbyte.cloud.storage.state.minio.endpoint}") final String endpoint) { + return CloudStorageConfigs.minio(new MinioConfig(bucketName, awsAccessKey, secretAccessKey, endpoint)); + } + + @Singleton + @Requires(property = "airbyte.cloud.storage.state.type", + pattern = "(?i)^s3$") + @Named("stateStorageConfigs") + public CloudStorageConfigs s3StateStorageConfiguration( + @Value("${airbyte.cloud.storage.state.s3.bucket}") final String bucketName, + @Value("${airbyte.cloud.storage.state.s3.access-key}") final String awsAccessKey, + @Value("${airbyte.cloud.storage.state.s3.secret-access-key}") final String secretAcessKey, + @Value("${airbyte.cloud.storage.state.s3.region}") final String s3Region) { + return CloudStorageConfigs.s3(new S3Config(bucketName, awsAccessKey, secretAcessKey, s3Region)); + } + +} diff --git a/airbyte-server/src/main/java/io/airbyte/server/config/DatabaseBeanFactory.java b/airbyte-server/src/main/java/io/airbyte/server/config/DatabaseBeanFactory.java index 3684d242edfdc0..f03bda728e2340 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/config/DatabaseBeanFactory.java +++ b/airbyte-server/src/main/java/io/airbyte/server/config/DatabaseBeanFactory.java @@ -6,6 +6,7 @@ import io.airbyte.commons.temporal.config.WorkerMode; import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.config.persistence.StreamResetPersistence; import io.airbyte.db.Database; import io.airbyte.db.check.DatabaseMigrationCheck; import io.airbyte.db.check.impl.JobsDatabaseAvailabilityCheck; @@ -120,4 +121,10 @@ public JobsDatabaseAvailabilityCheck jobsDatabaseAvailabilityCheck(@Named("confi return new JobsDatabaseAvailabilityCheck(dslContext, DatabaseConstants.DEFAULT_ASSERT_DATABASE_TIMEOUT_MS); } + @Singleton + @Requires(env = WorkerMode.CONTROL_PLANE) + public StreamResetPersistence streamResetPersistence(@Named("configDatabase") final Database configDatabase) { + return new StreamResetPersistence(configDatabase); + } + } diff --git a/airbyte-server/src/main/java/io/airbyte/server/config/HelperBeanFactory.java b/airbyte-server/src/main/java/io/airbyte/server/config/HelperBeanFactory.java new file mode 100644 index 00000000000000..1f7b478f5acf8c --- /dev/null +++ b/airbyte-server/src/main/java/io/airbyte/server/config/HelperBeanFactory.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.server.config; + +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.persistence.job.JobPersistence; +import io.airbyte.persistence.job.WorkspaceHelper; +import io.micronaut.context.annotation.Factory; +import jakarta.inject.Singleton; + +@Factory +public class HelperBeanFactory { + + @Singleton + public WorkspaceHelper workspaceHelper(final ConfigRepository configRepository, final JobPersistence jobPersistence) { + return new WorkspaceHelper(configRepository, jobPersistence); + } + +} diff --git a/airbyte-server/src/main/java/io/airbyte/server/config/JobErrorReportingBeanFactory.java b/airbyte-server/src/main/java/io/airbyte/server/config/JobErrorReportingBeanFactory.java new file mode 100644 index 00000000000000..979361e3d4e160 --- /dev/null +++ b/airbyte-server/src/main/java/io/airbyte/server/config/JobErrorReportingBeanFactory.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.server.config; + +import io.airbyte.commons.temporal.config.WorkerMode; +import io.airbyte.config.Configs.DeploymentMode; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.persistence.job.WebUrlHelper; +import io.airbyte.persistence.job.errorreporter.JobErrorReporter; +import io.airbyte.persistence.job.errorreporter.JobErrorReportingClient; +import io.airbyte.persistence.job.errorreporter.LoggingJobErrorReportingClient; +import io.airbyte.persistence.job.errorreporter.SentryExceptionHelper; +import io.airbyte.persistence.job.errorreporter.SentryJobErrorReportingClient; +import io.airbyte.workers.normalization.NormalizationRunnerFactory; +import io.micronaut.context.annotation.Factory; +import io.micronaut.context.annotation.Requires; +import io.micronaut.context.annotation.Value; +import jakarta.inject.Named; +import jakarta.inject.Singleton; +import java.util.Optional; + +/** + * Micronaut bean factory for job error reporting-related singletons. + */ +@Factory +@SuppressWarnings("PMD.AvoidDuplicateLiterals") +public class JobErrorReportingBeanFactory { + + @Singleton + @Requires(property = "airbyte.worker.job.error-reporting.strategy", + pattern = "(?i)^sentry$") + @Requires(env = WorkerMode.CONTROL_PLANE) + @Named("jobErrorReportingClient") + public JobErrorReportingClient sentryJobErrorReportingClient( + @Value("${airbyte.worker.job.error-reporting.sentry.dsn}") final String sentryDsn) { + return new SentryJobErrorReportingClient(sentryDsn, new SentryExceptionHelper()); + } + + @Singleton + @Requires(property = "airbyte.worker.job.error-reporting.strategy", + pattern = "(?i)^logging$") + @Requires(env = WorkerMode.CONTROL_PLANE) + @Named("jobErrorReportingClient") + public JobErrorReportingClient loggingJobErrorReportingClient() { + return new LoggingJobErrorReportingClient(); + } + + @Singleton + @Requires(env = WorkerMode.CONTROL_PLANE) + public JobErrorReporter jobErrorReporter( + @Value("${airbyte.version}") final String airbyteVersion, + final ConfigRepository configRepository, + final DeploymentMode deploymentMode, + @Named("jobErrorReportingClient") final Optional jobErrorReportingClient, + final WebUrlHelper webUrlHelper) { + return new JobErrorReporter( + configRepository, + deploymentMode, + airbyteVersion, + NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME, + NormalizationRunnerFactory.NORMALIZATION_VERSION, + webUrlHelper, + jobErrorReportingClient.orElseGet(() -> new LoggingJobErrorReportingClient())); + } + +} diff --git a/airbyte-server/src/main/java/io/airbyte/server/config/SecretPersistenceBeanFactory.java b/airbyte-server/src/main/java/io/airbyte/server/config/SecretPersistenceBeanFactory.java new file mode 100644 index 00000000000000..93cb8670bbb8dc --- /dev/null +++ b/airbyte-server/src/main/java/io/airbyte/server/config/SecretPersistenceBeanFactory.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.server.config; + +import io.airbyte.commons.temporal.config.WorkerMode; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.config.persistence.SecretsRepositoryReader; +import io.airbyte.config.persistence.SecretsRepositoryWriter; +import io.airbyte.config.persistence.split_secrets.GoogleSecretManagerPersistence; +import io.airbyte.config.persistence.split_secrets.LocalTestingSecretPersistence; +import io.airbyte.config.persistence.split_secrets.RealSecretsHydrator; +import io.airbyte.config.persistence.split_secrets.SecretPersistence; +import io.airbyte.config.persistence.split_secrets.SecretsHydrator; +import io.airbyte.config.persistence.split_secrets.VaultSecretPersistence; +import io.airbyte.db.Database; +import io.micronaut.context.annotation.Factory; +import io.micronaut.context.annotation.Requires; +import io.micronaut.context.annotation.Value; +import jakarta.inject.Named; +import jakarta.inject.Singleton; +import java.util.Optional; + +/** + * Micronaut bean factory for secret persistence-related singletons. + */ +@Factory +@SuppressWarnings("PMD.AvoidDuplicateLiterals") +public class SecretPersistenceBeanFactory { + + @Singleton + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^(?!testing_config_db_table).*") + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^(?!google_secret_manager).*") + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^(?!vault).*") + @Requires(env = WorkerMode.CONTROL_PLANE) + @Named("secretPersistence") + public SecretPersistence defaultSecretPersistence(@Named("configDatabase") final Database configDatabase) { + return localTestingSecretPersistence(configDatabase); + } + + @Singleton + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^testing_config_db_table$") + @Requires(env = WorkerMode.CONTROL_PLANE) + @Named("secretPersistence") + public SecretPersistence localTestingSecretPersistence(@Named("configDatabase") final Database configDatabase) { + return new LocalTestingSecretPersistence(configDatabase); + } + + @Singleton + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^testing_config_db_table$") + @Requires(env = WorkerMode.CONTROL_PLANE) + @Named("ephemeralSecretPersistence") + public SecretPersistence ephemeralLocalTestingSecretPersistence(@Named("configDatabase") final Database configDatabase) { + return new LocalTestingSecretPersistence(configDatabase); + } + + @Singleton + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^google_secret_manager$") + @Named("secretPersistence") + public SecretPersistence googleSecretPersistence(@Value("${airbyte.secret.store.gcp.credentials}") final String credentials, + @Value("${airbyte.secret.store.gcp.project-id}") final String projectId) { + return GoogleSecretManagerPersistence.getLongLived(projectId, credentials); + } + + @Singleton + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^google_secret_manager$") + @Named("ephemeralSecretPersistence") + public SecretPersistence ephemeralGoogleSecretPersistence(@Value("${airbyte.secret.store.gcp.credentials}") final String credentials, + @Value("${airbyte.secret.store.gcp.project-id}") final String projectId) { + return GoogleSecretManagerPersistence.getEphemeral(projectId, credentials); + } + + @Singleton + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^vault$") + @Requires(env = WorkerMode.CONTROL_PLANE) + @Named("secretPersistence") + public SecretPersistence vaultSecretPersistence(@Value("${airbyte.secret.store.vault.address}") final String address, + @Value("${airbyte.secret.store.vault.prefix}") final String prefix, + @Value("${airbyte.secret.store.vault.token}") final String token) { + return new VaultSecretPersistence(address, prefix, token); + } + + @Singleton + @Requires(property = "airbyte.secret.persistence", + pattern = "(?i)^vault$") + @Requires(env = WorkerMode.CONTROL_PLANE) + @Named("ephemeralSecretPersistence") + public SecretPersistence ephemeralVaultSecretPersistence(@Value("${airbyte.secret.store.vault.address}") final String address, + @Value("${airbyte.secret.store.vault.prefix}") final String prefix, + @Value("${airbyte.secret.store.vault.token}") final String token) { + return new VaultSecretPersistence(address, prefix, token); + } + + @Singleton + @Requires(env = WorkerMode.CONTROL_PLANE) + public SecretsHydrator secretsHydrator(@Named("secretPersistence") final SecretPersistence secretPersistence) { + return new RealSecretsHydrator(secretPersistence); + } + + @Singleton + @Requires(env = WorkerMode.CONTROL_PLANE) + public SecretsRepositoryReader secretsRepositoryReader(final ConfigRepository configRepository, final SecretsHydrator secretsHydrator) { + return new SecretsRepositoryReader(configRepository, secretsHydrator); + } + + @Singleton + @Requires(env = WorkerMode.CONTROL_PLANE) + public SecretsRepositoryWriter secretsRepositoryWriter(final ConfigRepository configRepository, + @Named("secretPersistence") final Optional secretPersistence, + @Named("ephemeralSecretPersistence") final Optional ephemeralSecretPersistence) { + return new SecretsRepositoryWriter(configRepository, secretPersistence, ephemeralSecretPersistence); + } + +} diff --git a/airbyte-server/src/main/java/io/airbyte/server/config/TemporalBeanFactory.java b/airbyte-server/src/main/java/io/airbyte/server/config/TemporalBeanFactory.java new file mode 100644 index 00000000000000..6d49756f6b0fd9 --- /dev/null +++ b/airbyte-server/src/main/java/io/airbyte/server/config/TemporalBeanFactory.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.server.config; + +import io.airbyte.analytics.Deployment; +import io.airbyte.analytics.TrackingClient; +import io.airbyte.analytics.TrackingClientSingleton; +import io.airbyte.commons.temporal.TemporalClient; +import io.airbyte.commons.temporal.config.WorkerMode; +import io.airbyte.commons.version.AirbyteVersion; +import io.airbyte.config.Configs.DeploymentMode; +import io.airbyte.config.Configs.TrackingStrategy; +import io.airbyte.config.Configs.WorkerEnvironment; +import io.airbyte.config.persistence.ConfigRepository; +import io.airbyte.persistence.job.JobPersistence; +import io.airbyte.persistence.job.errorreporter.JobErrorReporter; +import io.airbyte.persistence.job.factory.OAuthConfigSupplier; +import io.airbyte.persistence.job.tracker.JobTracker; +import io.airbyte.server.scheduler.DefaultSynchronousSchedulerClient; +import io.airbyte.server.scheduler.SynchronousSchedulerClient; +import io.micronaut.context.annotation.Factory; +import io.micronaut.context.annotation.Requires; +import io.micronaut.context.annotation.Value; +import jakarta.inject.Singleton; +import java.io.IOException; + +/** + * Micronaut bean factory for Temporal-related singletons. + */ +@Factory +public class TemporalBeanFactory { + + @Singleton + @Requires(env = WorkerMode.CONTROL_PLANE) + public TrackingClient trackingClient(final TrackingStrategy trackingStrategy, + final DeploymentMode deploymentMode, + final JobPersistence jobPersistence, + final WorkerEnvironment workerEnvironment, + @Value("${airbyte.role}") final String airbyteRole, + final AirbyteVersion airbyteVersion, + final ConfigRepository configRepository) + throws IOException { + + TrackingClientSingleton.initialize( + trackingStrategy, + new Deployment(deploymentMode, jobPersistence.getDeployment().orElseThrow(), + workerEnvironment), + airbyteRole, + airbyteVersion, + configRepository); + + return TrackingClientSingleton.get(); + } + + @Singleton + @Requires(env = WorkerMode.CONTROL_PLANE) + public OAuthConfigSupplier oAuthConfigSupplier(final ConfigRepository configRepository, final TrackingClient trackingClient) { + return new OAuthConfigSupplier(configRepository, trackingClient); + } + + @Singleton + @Requires(env = WorkerMode.CONTROL_PLANE) + public SynchronousSchedulerClient synchronousSchedulerClient(final TemporalClient temporalClient, + final JobTracker jobTracker, + final JobErrorReporter jobErrorReporter, + final OAuthConfigSupplier oAuthConfigSupplier) { + return new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, jobErrorReporter, oAuthConfigSupplier); + } + +} diff --git a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java index 52c28f3640f110..ef3630191ab7ec 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java +++ b/airbyte-server/src/main/java/io/airbyte/server/converters/JobConverter.java @@ -44,6 +44,7 @@ import io.airbyte.server.scheduler.SynchronousJobMetadata; import io.airbyte.server.scheduler.SynchronousResponse; import io.airbyte.workers.helper.ProtocolConverters; +import jakarta.inject.Singleton; import java.io.IOException; import java.nio.file.Path; import java.util.List; @@ -52,6 +53,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j +@Singleton public class JobConverter { private final WorkerEnvironment workerEnvironment; @@ -243,7 +245,7 @@ public SynchronousJobRead getSynchronousJobRead(final SynchronousJobMetadata met } public static AttemptNormalizationStatusRead convertAttemptNormalizationStatus( - AttemptNormalizationStatus databaseStatus) { + final AttemptNormalizationStatus databaseStatus) { return new AttemptNormalizationStatusRead() .attemptNumber(databaseStatus.attemptNumber()) .hasRecordsCommitted(!databaseStatus.recordsCommitted().isEmpty()) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java index cf15edd7bf79f0..704ff1ca9f9268 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java @@ -26,6 +26,7 @@ import io.airbyte.api.model.generated.WorkspaceIdRequestBody; import io.airbyte.commons.enums.Enums; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.temporal.config.WorkerMode; import io.airbyte.config.ActorCatalog; import io.airbyte.config.BasicSchedule; import io.airbyte.config.DestinationConnection; @@ -56,6 +57,8 @@ import io.airbyte.server.scheduler.EventRunner; import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.helper.ConnectionHelper; +import io.micronaut.context.annotation.Requires; +import jakarta.inject.Singleton; import java.io.IOException; import java.util.Collections; import java.util.HashSet; @@ -70,6 +73,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@Singleton +@Requires(env = WorkerMode.CONTROL_PLANE) public class ConnectionsHandler { private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionsHandler.class); @@ -96,6 +101,7 @@ public class ConnectionsHandler { this.connectionHelper = connectionHelper; } + @Deprecated(forRemoval = true) public ConnectionsHandler(final ConfigRepository configRepository, final WorkspaceHelper workspaceHelper, final TrackingClient trackingClient, diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/OpenApiConfigHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/OpenApiConfigHandler.java index a1f572f0c6e79f..e9d4b3614ec890 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/OpenApiConfigHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/OpenApiConfigHandler.java @@ -5,10 +5,12 @@ package io.airbyte.server.handlers; import io.airbyte.commons.resources.MoreResources; +import jakarta.inject.Singleton; import java.io.File; import java.io.IOException; import java.nio.file.Files; +@Singleton public class OpenApiConfigHandler { private static final File TMP_FILE; diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 9681f916f559db..6b93073d169df2 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -42,7 +42,7 @@ import io.airbyte.api.model.generated.SynchronousJobRead; import io.airbyte.commons.docker.DockerUtils; import io.airbyte.commons.enums.Enums; -import io.airbyte.commons.features.EnvVariableFeatureFlags; +import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.temporal.ErrorCode; import io.airbyte.commons.temporal.TemporalClient.ManualOperationResult; @@ -75,6 +75,7 @@ import io.airbyte.server.scheduler.SynchronousSchedulerClient; import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; +import jakarta.inject.Singleton; import java.io.IOException; import java.util.ArrayList; import java.util.Optional; @@ -83,6 +84,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j +@Singleton public class SchedulerHandler { private static final HashFunction HASH_FUNCTION = Hashing.md5(); @@ -99,8 +101,9 @@ public class SchedulerHandler { private final JobPersistence jobPersistence; private final JobConverter jobConverter; private final EventRunner eventRunner; - private final EnvVariableFeatureFlags envVariableFeatureFlags; + private final FeatureFlags envVariableFeatureFlags; + // TODO: Convert to be fully using micronaut public SchedulerHandler(final ConfigRepository configRepository, final SecretsRepositoryReader secretsRepositoryReader, final SecretsRepositoryWriter secretsRepositoryWriter, @@ -110,7 +113,7 @@ public SchedulerHandler(final ConfigRepository configRepository, final LogConfigs logConfigs, final EventRunner eventRunner, final ConnectionsHandler connectionsHandler, - final EnvVariableFeatureFlags envVariableFeatureFlags) { + final FeatureFlags envVariableFeatureFlags) { this( configRepository, secretsRepositoryWriter, @@ -134,7 +137,7 @@ public SchedulerHandler(final ConfigRepository configRepository, final EventRunner eventRunner, final JobConverter jobConverter, final ConnectionsHandler connectionsHandler, - final EnvVariableFeatureFlags envVariableFeatureFlags) { + final FeatureFlags envVariableFeatureFlags) { this.configRepository = configRepository; this.secretsRepositoryWriter = secretsRepositoryWriter; this.synchronousSchedulerClient = synchronousSchedulerClient; diff --git a/airbyte-server/src/main/resources/application.yml b/airbyte-server/src/main/resources/application.yml index a4e5956891df72..4febe74ba091b7 100644 --- a/airbyte-server/src/main/resources/application.yml +++ b/airbyte-server/src/main/resources/application.yml @@ -33,3 +33,26 @@ airbyte: address: ${VAULT_ADDRESS:} prefix: ${VAULT_PREFIX:} token: ${VAULT_AUTH_TOKEN:} + role: ${AIRBYTE_ROLE:} + tracking-strategy: ${TRACKING_STRATEGY:LOGGING} + worker: + job: + error-reporting: + sentry: + dsn: ${JOB_ERROR_REPORTING_SENTRY_DSN} + strategy: ${JOB_ERROR_REPORTING_STRATEGY:LOGGING} + web-app: + url: ${WEBAPP_URL:} + workspace: + root: ${WORKSPACE_ROOT} + +temporal: + cloud: + client: + cert: ${TEMPORAL_CLOUD_CLIENT_CERT:} + key: ${TEMPORAL_CLOUD_CLIENT_KEY:} + enabled: ${TEMPORAL_CLOUD_ENABLED:false} + host: ${TEMPORAL_CLOUD_HOST:} + namespace: ${TEMPORAL_CLOUD_NAMESPACE:} + host: ${TEMPORAL_HOST:`airbyte-temporal:7233`} + retention: ${TEMPORAL_HISTORY_RETENTION_IN_DAYS:30}