Skip to content

Commit

Permalink
Convert the server to micronaut (#19194)
Browse files Browse the repository at this point in the history
* Extract Operation API

* Extract scheduler API

* Format

* extract source api

* Extract source definition api

* Add path

* Extract State API

* extract webbackend api

* extract webbackend api

* extract workspace api

* Extract source definition specification api

* Remove configuration API

* tmp

* Checkstyle

* tmp

* tmp

* Inject but don't resolve Bean

* tmp

* Tmp

* fix build

* TMP

* Tmp

* Clean up

* better thread pool

* Change port to 8080

* Fix port

* Rm unused

* Cors filter

* Format

* rename

* Tmp

* Config based

* Rm health controller ref

* tmp

* Pool size

* Mock healthcheck

* Revert "Mock healthcheck"

This reverts commit 4666776.

* Revert "Revert "Mock healthcheck""

This reverts commit 267094a.

* Restore health check

* Tmp

* format

* Rm deprecated

* Fix PMD

* Tmp

* Fix proxy test

* Remove useless annotation

* set auto commit as false

* Clean up and PR comments

* Bmoric/convert attempt micronaut (#19847)

* tmp

* Fix build

* tmp

* Tmp

* tmp

* tmp

* Tmp

* tmp

* tmp

* Clean up

* Comments and banner

* Non related files

* rm tmp

* Fix build

* Format

* Hit the micronaut server directly

* micronaut OperationApiController (#20270)

* micronaut OperationApiController

* pass micronaut client to OperationApi

* Bmoric/convert connection micronaut (#20211)

* tmp

* Fix build

* tmp

* Tmp

* tmp

* tmp

* Tmp

* tmp

* tmp

* Clean up

* tmp

* Convert Connection Api Controller

* PR Comments

* convert openapiapicontroller to micronaut (#20258)

* convert openapiapicontroller to micronaut

* merge health/openapi locations into one entry

* Fix build

* Format

* Remove media type

* Format

Co-authored-by: Cole Snodgrass <cole@airbyte.io>

* Bmoric/convert destination controller micronaut (#20269)

* tmp

* Fix build

* tmp

* Tmp

* tmp

* tmp

* Tmp

* tmp

* tmp

* Clean up

* tmp

* Convert Connection Api Controller

* Tmp

* PR Comments

* convert openapiapicontroller to micronaut (#20258)

* convert openapiapicontroller to micronaut

* merge health/openapi locations into one entry

* Fix bean

* Add JsonSchemaValidator as a Bean

* Fix build

* Format

* Format

* Test fix

* Pr comments

* Remove media type

* Format

* Remove media type

* Format

* format

* Add missing airbyte api client

Co-authored-by: Cole Snodgrass <cole@airbyte.io>

* Bmoric/convert destination definition controller micronaut (#20277)

* tmp

* Fix build

* tmp

* Tmp

* tmp

* tmp

* Tmp

* tmp

* tmp

* Clean up

* tmp

* Convert Connection Api Controller

* Tmp

* PR Comments

* convert openapiapicontroller to micronaut (#20258)

* convert openapiapicontroller to micronaut

* merge health/openapi locations into one entry

* Fix bean

* Add JsonSchemaValidator as a Bean

* Fix build

* Format

* Format

* Test fix

* Pr comments

* Remove media type

* Format

* Remove media type

* Format

* Remove media type

* Format

* api client

* missing annotation

* format

Co-authored-by: Cole Snodgrass <cole@airbyte.io>

* convert StateApiController to Micronaut (#20329)

* convert to micronaut

* nginx updates

* format

* Move dest oauth to micronaut (#20318)

* tmp

* Fix build

* tmp

* Tmp

* tmp

* tmp

* Tmp

* tmp

* tmp

* Clean up

* tmp

* Convert Connection Api Controller

* PR Comments

* convert openapiapicontroller to micronaut (#20258)

* convert openapiapicontroller to micronaut

* merge health/openapi locations into one entry

* Fix build

* Format

* Remove media type

* Format

* Move dest oauth to micronaut

* Pr comments

* format

Co-authored-by: Cole Snodgrass <cole@airbyte.io>

* Bmoric/convert source micronaut (#20334)

* tmp

* Fix build

* tmp

* Tmp

* tmp

* tmp

* Tmp

* tmp

* tmp

* Clean up

* tmp

* Convert Connection Api Controller

* PR Comments

* convert openapiapicontroller to micronaut (#20258)

* convert openapiapicontroller to micronaut

* merge health/openapi locations into one entry

* Fix build

* Format

* Remove media type

* Format

* Tmp

* tmp

* Build

* missing bean

* format

Co-authored-by: Cole Snodgrass <cole@airbyte.io>

* Migrate to micronaut (#20339)

* Migrate source to micronaut

* convert SchedulerApiController to Micronaut (#20337)

* wip; SchedulerApiController

* remove @nAmed

* remove @singleton

* add back todo message

* Bmoric/convert source definition micronaut (#20338)

* tmp

* Fix build

* tmp

* Tmp

* tmp

* tmp

* Tmp

* tmp

* tmp

* Clean up

* tmp

* Convert Connection Api Controller

* PR Comments

* convert openapiapicontroller to micronaut (#20258)

* convert openapiapicontroller to micronaut

* merge health/openapi locations into one entry

* Fix build

* Format

* Remove media type

* Format

* Tmp

* tmp

* Build

* missing bean

* Tmp

* Add Beans

* fix Bean

* Add passthrough

* Clean up

* Missing path

* FIx typo

* Fix conflicts

* for mat

Co-authored-by: Cole Snodgrass <cole@airbyte.io>

* update SourceOauthApiController to Micronaut (#20386)

* convert SourceOauthApiController to Micronaut

* remove SourceOauthApi reference

* convert WorkspaceApiController to micronaut (#20214)

* wip; broken

* convert WorkspaceApiController to micronaut

* remove test controller

* format

* format

* add @Body to SourceOauthApiController

* consolidate nginx settings

* remove unnecessary factories

* Bmoric/convert jobs micronaut (#20382)

* Convert jobs to micronaut

* Nit

* Format

* Bmoric/convert source definition specification micronaut (#20379)

* Migrate source definition specifications to micronaut

* Format

* Format

* convert database assert call to Micronaut (#20406)

* remove dupe config section; add DatabaseEventListener

* move eventlistner to correct package; update implementation

* convert NotificationsApiController to Micronaut (#20396)

* convert NotificationsApiController to Micronaut

* format

* Migrate logs to micronaut (#20400)

* Bmoric/convert webbackend micronaut (#20403)

* Convert jobs to micronaut

* Nit

* Format

* Migrate the webbackend to micronaut

* Add missing Bean

* Cleanup (#20459)

* Cleanup

* More cleanup

* Disable in order to test cloud

* Restore missing files

* Fix test

* Format and fix pmd

* Add transactional

* Fix version

* Tentative

* Cleanup the cleanup

* Rm reference to the micronaut server

* format

* pmd

* more pmd

* fix build

* Delete logs API

* Revert "Delete logs API"

This reverts commit fcb271d.

* Rm flaky test

* Format

* Try to fix test

* Format

* Remove optional

* Rm import

* Test sleep

* Simplify injection

* update import

* Remove sleep

* More injection

* Remove more requirement

* imports

* Remove more requirement

* Fix yaml

* Remove unused conf

* Add role

* Test acceptance test

* Update env

* Revert "Update to Micronaut 3.8.0 (#20716)"

This reverts commit a28f937.

* Update helm chart

* Fix helm chart

* Convert Application Listener

* Format

* Add explicit deployment mode

* Change check port

* Update version and bump version to the right value

* Cleanup

* Update FE end to end test

* Allow head request

* Fix controller

* Format

* Fix http client Bean

* Format

Co-authored-by: Cole Snodgrass <cole@airbyte.io>
  • Loading branch information
2 people authored and jbfbell committed Jan 13, 2023
1 parent 6403184 commit c071fc5
Show file tree
Hide file tree
Showing 127 changed files with 1,585 additions and 1,578 deletions.
5 changes: 2 additions & 3 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ DATABASE_PORT=5432
DATABASE_DB=airbyte
# translate manually DATABASE_URL=jdbc:postgresql://${DATABASE_HOST}:${DATABASE_PORT}/${DATABASE_DB} (do not include the username or password here)
DATABASE_URL=jdbc:postgresql://db:5432/airbyte
JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.29.15.001
JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.40.26.001

# Airbyte Internal Config Database, defaults to Job Database if empty. Explicitly left empty to mute docker compose warnings.
CONFIG_DATABASE_USER=
CONFIG_DATABASE_PASSWORD=
CONFIG_DATABASE_URL=
CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.35.15.001
CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION=0.40.23.002

### AIRBYTE SERVICES ###
TEMPORAL_HOST=airbyte-temporal:7233
Expand Down Expand Up @@ -115,4 +115,3 @@ OTEL_COLLECTOR_ENDPOINT="http://host.docker.internal:4317"

USE_STREAM_CAPABLE_STATE=true
AUTO_DETECT_SCHEMA=false

Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
* This class is meant to consolidate all our API endpoints into a fluent-ish client. Currently, all
* open API generators create a separate class per API "root-route". For example, if our API has two
* routes "/v1/First/get" and "/v1/Second/get", OpenAPI generates (essentially) the following files:
*
* <p>
* ApiClient.java, FirstApi.java, SecondApi.java
*
* <p>
* To call the API type-safely, we'd do new FirstApi(new ApiClient()).get() or new SecondApi(new
* ApiClient()).get(), which can get cumbersome if we're interacting with many pieces of the API.
*
* <p>
* This is currently manually maintained. We could look into autogenerating it if needed.
*/
public class AirbyteApiClient {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.commons.temporal.exception.DeletedWorkflowException;
import io.airbyte.commons.temporal.exception.UnreachableWorkflowException;
import io.airbyte.commons.temporal.scheduling.CheckConnectionWorkflow;
Expand All @@ -30,7 +29,6 @@
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.protocol.models.StreamDescriptor;
import io.micronaut.context.annotation.Requires;
import io.temporal.api.common.v1.WorkflowType;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsRequest;
Expand Down Expand Up @@ -62,7 +60,6 @@

@Slf4j
@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
public class TemporalClient {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ public class ApiClientBeanFactory {
private static final int JWT_TTL_MINUTES = 5;

@Singleton
public ApiClient apiClient(@Value("${airbyte.internal.api.auth-header.name}") final String airbyteApiAuthHeaderName,
@Named("apiClient")
public ApiClient apiClient(
@Value("${airbyte.internal.api.auth-header.name}") final String airbyteApiAuthHeaderName,
@Value("${airbyte.internal.api.host}") final String airbyteApiHost,
@Named("internalApiAuthToken") final BeanProvider<String> internalApiAuthToken,
@Named("internalApiScheme") final String internalApiScheme) {
return new io.airbyte.api.client.invoker.generated.ApiClient()
return new ApiClient()
.setScheme(internalApiScheme)
.setHost(parseHostName(airbyteApiHost))
.setPort(parsePort(airbyteApiHost))
Expand All @@ -66,7 +68,7 @@ public AirbyteApiClient airbyteApiClient(final ApiClient apiClient) {
}

@Singleton
public SourceApi sourceApi(final ApiClient apiClient) {
public SourceApi sourceApi(@Named("apiClient") final ApiClient apiClient) {
return new SourceApi(apiClient);
}

Expand All @@ -87,7 +89,7 @@ public WorkspaceApi workspaceApi(final ApiClient apiClient) {

@Singleton
public HttpClient httpClient() {
return HttpClient.newHttpClient();
return HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();
}

@Singleton
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.google.common.base.Preconditions;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.config.BasicSchedule;
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.Schedule;
Expand All @@ -18,7 +17,6 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.persistence.job.WorkspaceHelper;
import io.airbyte.validation.json.JsonValidationException;
import io.micronaut.context.annotation.Requires;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.List;
Expand All @@ -28,7 +26,6 @@
// todo (cgardens) - we are not getting any value out of instantiating this class. we should just
// use it as statics. not doing it now, because already in the middle of another refactor.
@Singleton
@Requires(env = WorkerMode.CONTROL_PLANE)
public class ConnectionHelper {

private final ConfigRepository configRepository;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
package io.airbyte.commons.features;

import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Slf4j
public class EnvVariableFeatureFlags implements FeatureFlags {

private static final Logger log = LoggerFactory.getLogger(EnvVariableFeatureFlags.class);

public static final String USE_STREAM_CAPABLE_STATE = "USE_STREAM_CAPABLE_STATE";
public static final String AUTO_DETECT_SCHEMA = "AUTO_DETECT_SCHEMA";
// Set this value to true to see all messages from the source to destination, set to one second
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public File getServerLogFile(final Path workspaceRoot, final WorkerEnvironment w
}
final var cloudLogPath = sanitisePath(APP_LOGGING_CLOUD_PREFIX, getServerLogsRoot(workspaceRoot));
try {
createCloudClientIfNull(logConfigs);
return logClient.downloadCloudLog(logConfigs, cloudLogPath);
} catch (final IOException e) {
throw new RuntimeException("Error retrieving log file: " + cloudLogPath + " from S3", e);
Expand All @@ -95,6 +96,7 @@ public File getSchedulerLogFile(final Path workspaceRoot, final WorkerEnvironmen

final var cloudLogPath = APP_LOGGING_CLOUD_PREFIX + getSchedulerLogsRoot(workspaceRoot);
try {
createCloudClientIfNull(logConfigs);
return logClient.downloadCloudLog(logConfigs, cloudLogPath);
} catch (final IOException e) {
throw new RuntimeException("Error retrieving log file: " + cloudLogPath + " from S3", e);
Expand All @@ -111,6 +113,7 @@ public List<String> getJobLogFile(final WorkerEnvironment workerEnvironment, fin
}

final var cloudLogPath = sanitisePath(JOB_LOGGING_CLOUD_PREFIX, logPath);
createCloudClientIfNull(logConfigs);
return logClient.tailCloudLog(logConfigs, cloudLogPath, LOG_TAIL_SIZE);
}

Expand All @@ -127,6 +130,7 @@ public void deleteLogs(final WorkerEnvironment workerEnvironment, final LogConfi
throw new NotImplementedException("Local log deletes not supported.");
}
final var cloudLogPath = sanitisePath(JOB_LOGGING_CLOUD_PREFIX, Path.of(logPath));
createCloudClientIfNull(logConfigs);
logClient.deleteLogs(logConfigs, cloudLogPath);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public ConfigRepository(final Database database) {
*/
public boolean healthCheck() {
try {
database.query(ctx -> ctx.select(WORKSPACE.ID).from(WORKSPACE).limit(1).fetch());
database.query(ctx -> ctx.select(WORKSPACE.ID).from(WORKSPACE).limit(1).fetch()).stream().count();
} catch (final Exception e) {
LOGGER.error("Health check error: ", e);
return false;
Expand Down Expand Up @@ -294,7 +294,8 @@ private Stream<StandardSourceDefinition> sourceDefQuery(final Optional<UUID> sou
.where(ACTOR_DEFINITION.ACTOR_TYPE.eq(ActorType.source))
.and(sourceDefId.map(ACTOR_DEFINITION.ID::eq).orElse(noCondition()))
.and(includeTombstone ? noCondition() : ACTOR_DEFINITION.TOMBSTONE.notEqual(true))
.fetchStream())
.fetch())
.stream()
.map(DbConverter::buildStandardSourceDefinition)
// Ensure version is set. Needed for connectors not upgraded since we added versioning.
.map(def -> def.withProtocolVersion(AirbyteProtocolVersion.getWithDefault(def.getProtocolVersion()).serialize()));
Expand Down Expand Up @@ -356,7 +357,8 @@ private Stream<StandardDestinationDefinition> destDefQuery(final Optional<UUID>
.where(ACTOR_DEFINITION.ACTOR_TYPE.eq(ActorType.destination))
.and(destDefId.map(ACTOR_DEFINITION.ID::eq).orElse(noCondition()))
.and(includeTombstone ? noCondition() : ACTOR_DEFINITION.TOMBSTONE.notEqual(true))
.fetchStream())
.fetch())
.stream()
.map(DbConverter::buildStandardDestinationDefinition)
// Ensure version is set. Needed for connectors not upgraded since we added versioning.
.map(def -> def.withProtocolVersion(AirbyteProtocolVersion.getWithDefault(def.getProtocolVersion()).serialize()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ private static Stream<Record4<UUID, String, ActorType, String>> getActorDefiniti
return ctx.select(ACTOR_DEFINITION.ID, ACTOR_DEFINITION.DOCKER_REPOSITORY, ACTOR_DEFINITION.ACTOR_TYPE, ACTOR_DEFINITION.PROTOCOL_VERSION)
.from(ACTOR_DEFINITION)
.join(ACTOR).on(ACTOR.ACTOR_DEFINITION_ID.equal(ACTOR_DEFINITION.ID))
.fetchStream();
.fetch()
.stream();
}

static void writeStandardSourceDefinition(final List<StandardSourceDefinition> configs, final DSLContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ private Stream<StandardSyncIdsWithProtocolVersions> findDisabledSyncs(final DSLC
.where(
CONNECTION.UNSUPPORTED_PROTOCOL_VERSION.eq(true).and(
(actorType == ActorType.DESTINATION ? destDef : sourceDef).ID.eq(actorDefId)))
.fetchStream()
.fetch()
.stream()
.map(r -> new StandardSyncIdsWithProtocolVersions(
r.get(CONNECTION.ID),
r.get(sourceDef.ID),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ private Set<StandardSyncProtocolVersionFlag> getProtocolVersionFlagForSyncs(fina
.select(CONNECTION.ID, CONNECTION.UNSUPPORTED_PROTOCOL_VERSION)
.from(CONNECTION)
.where(CONNECTION.ID.in(standardSync.stream().map(StandardSync::getConnectionId).toList()))
.fetchStream())
.fetch())
.stream()
.map(r -> new StandardSyncProtocolVersionFlag(r.get(CONNECTION.ID), r.get(CONNECTION.UNSUPPORTED_PROTOCOL_VERSION)))
.collect(Collectors.toSet());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.persistence.job.models.Job;
import io.airbyte.validation.json.JsonValidationException;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.util.Objects;
import java.util.UUID;
Expand All @@ -30,6 +31,7 @@
// scheduler:persistence in order to get workspace ids for configs (e.g. source). Our options are to
// split this helper by database or put it in a new module.
@SuppressWarnings("PMD.AvoidCatchingThrowable")
@Singleton
public class WorkspaceHelper {

private static final Logger LOGGER = LoggerFactory.getLogger(WorkspaceHelper.class);
Expand Down Expand Up @@ -106,7 +108,7 @@ public UUID load(@NonNull final Long jobId) throws ConfigNotFoundException, IOEx
* There are generally two kinds of helper methods present here. The first kind propagate exceptions
* for the method backing the cache. The second ignores them. The former is meant to be used with
* proper api calls, while the latter is meant to be use with asserts and precondtions checks.
*
* <p>
* In API calls, distinguishing between various exceptions helps return the correct status code.
*/

Expand Down
3 changes: 2 additions & 1 deletion airbyte-proxy/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ function start_container () {
}

function start_container_with_proxy () {
CMD="docker run -d -p $PORT:8000 --env PROXY_PASS_WEB=$1 --env PROXY_PASS_API=$1 --name $NAME airbyte/proxy:$VERSION"
CMD="docker run -d -p $PORT:8000 --env PROXY_PASS_WEB=$1 --env PROXY_PASS_API=$1 --name $NAME
airbyte/proxy:$VERSION"
echo $CMD
eval $CMD
wait_for_docker;
Expand Down
23 changes: 20 additions & 3 deletions airbyte-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ plugins {

configurations.all {
exclude group: 'io.micronaut.jaxrs'
exclude group: 'io.micronaut.sql'
}

dependencies {
Expand All @@ -25,7 +24,6 @@ dependencies {
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-persistence:job-persistence')

implementation libs.flyway.core
implementation 'com.github.slugify:slugify:2.4'
implementation 'commons-cli:commons-cli:1.4'
implementation libs.temporal.sdk
Expand All @@ -38,12 +36,28 @@ dependencies {
implementation 'org.glassfish.jersey.media:jersey-media-json-jackson'
implementation 'org.glassfish.jersey.ext:jersey-bean-validation'
implementation 'org.quartz-scheduler:quartz:2.3.2'
implementation 'io.sentry:sentry:6.3.1'
implementation 'io.swagger:swagger-annotations:1.6.2'

annotationProcessor platform(libs.micronaut.bom)
annotationProcessor libs.bundles.micronaut.annotation.processor
annotationProcessor libs.micronaut.jaxrs.processor

implementation platform(libs.micronaut.bom)
implementation libs.bundles.micronaut
implementation libs.micronaut.jaxrs.server

// Ensure that the versions defined in deps.toml are used
// instead of versions from transitive dependencies
implementation(libs.flyway.core) {
force = true
}

testImplementation project(':airbyte-test-utils')
testImplementation libs.postgresql
testImplementation libs.platform.testcontainers.postgresql
testImplementation 'com.squareup.okhttp3:mockwebserver:4.9.1'
testImplementation 'org.mockito:mockito-inline:4.7.0'
}

// we want to be able to access the generated db files from config/init when we build the server docker image.
Expand All @@ -57,9 +71,10 @@ task copySeed(type: Copy, dependsOn: [project(':airbyte-config:init').processRes
test.dependsOn(project.tasks.copySeed)
assemble.dependsOn(project.tasks.copySeed)

mainClassName = 'io.airbyte.server.ServerApp'
mainClassName = 'io.airbyte.server.Application'

application {
applicationName = project.name
mainClass = mainClassName
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
}
Expand Down Expand Up @@ -87,6 +102,8 @@ run {
environment "AIRBYTE_VERSION", env.VERSION
environment "AIRBYTE_ROLE", System.getenv('AIRBYTE_ROLE')
environment "TEMPORAL_HOST", "localhost:7233"

environment 'MICRONAUT_ENVIRONMENTS', 'control-plane'
}

// produce reproducible archives
Expand Down
15 changes: 15 additions & 0 deletions airbyte-server/src/main/java/io/airbyte/server/Application.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server;

import io.micronaut.runtime.Micronaut;

public class Application {

public static void main(final String[] args) {
Micronaut.run(Application.class, args);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server;

import io.airbyte.db.check.DatabaseCheckException;
import io.airbyte.db.check.DatabaseMigrationCheck;
import io.micronaut.context.event.ApplicationEventListener;
import io.micronaut.discovery.event.ServiceReadyEvent;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.lang.invoke.MethodHandles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Singleton
public class DatabaseEventListener implements ApplicationEventListener<ServiceReadyEvent> {

private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final DatabaseMigrationCheck configsMigrationCheck;

private final DatabaseMigrationCheck jobsMigrationCheck;

public DatabaseEventListener(
@Named("configsDatabaseMigrationCheck") final DatabaseMigrationCheck configsMigrationCheck,
@Named("jobsDatabaseMigrationCheck") final DatabaseMigrationCheck jobsMigrationCheck) {
this.configsMigrationCheck = configsMigrationCheck;
this.jobsMigrationCheck = jobsMigrationCheck;
}

@Override
public void onApplicationEvent(final ServiceReadyEvent event) {
log.info("Checking configs database flyway migration version...");
try {
configsMigrationCheck.check();
} catch (final DatabaseCheckException e) {
throw new RuntimeException(e);
}

log.info("Checking jobs database flyway migration version...");
try {
jobsMigrationCheck.check();
} catch (final DatabaseCheckException e) {
throw new RuntimeException(e);
}
}

}
Loading

0 comments on commit c071fc5

Please sign in to comment.