Skip to content

Commit

Permalink
implement PerfBackgroundJsonValidation feature-flag (#21569)
Browse files Browse the repository at this point in the history
builds on add perf feature-flag; ensure compatability with LaunchDarkly (LDv5) users #21534
add support for checking the PerfBackgroundJsonValidation feature-flag within the RecordSchemaValidator class

I spent a lot of time with trying to get the Kotlin beans injectable into the java code. What should just work doesn't and it well documented on the Micronaut side as to what exactly is required (or at least I never found the documentation). After many attempts I and looking at the auto-generated kotlin Micronaut projects I landed on kapt being necessary in kotlin projects. Even this isn't entirely true.

If the Kotlin project is defined with the following gradle setup, the beans will not be visible to java projects (and possibly other projects as well):

plugins {
    kotlin("jvm") version "1.8.0"
}

dependencies {
    annotationProcessor(platform(libs.micronaut.bom))
    annotationProcessor(libs.bundles.micronaut.annotation.processor)
}
But they will be visible with the following setup:

plugins {
    kotlin("jvm") version "1.8.0"
    id("io.micronaut.minimal.application") version "3.7.0"
}

dependencies {
    annotationProcessor(platform(libs.micronaut.bom))
    annotationProcessor(libs.bundles.micronaut.annotation.processor)
}

micronaut {
  version("3.8.2")
}
and also visible with the following setup:

plugins {
    kotlin("jvm") version "1.8.0"
    kotlin("kapt") version "1.8.0"
}

dependencies {
    kapt(platform(libs.micronaut.bom))
    kapt(libs.bundles.micronaut.annotation.processor)
}
I went with the kapt solution.

It's worth noting that kapt is deprecated and is being replaced with kps in Micronaut 4 (this is a good thing).

---------

Co-authored-by: Davin Chia <davinchia@gmail.com>
  • Loading branch information
colesnodgrass and davinchia committed Jan 31, 2023
1 parent e9d8b7a commit e32215f
Show file tree
Hide file tree
Showing 23 changed files with 228 additions and 125 deletions.
3 changes: 2 additions & 1 deletion airbyte-commons-worker/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ dependencies {
implementation 'io.fabric8:kubernetes-client:5.12.2'
implementation 'com.auth0:java-jwt:3.19.2'
implementation libs.guava
implementation (libs.temporal.sdk) {
implementation(libs.temporal.sdk) {
exclude module: 'guava'
}
implementation 'org.apache.ant:ant:1.10.10'
Expand All @@ -24,6 +24,7 @@ dependencies {
implementation project(':airbyte-commons-temporal')
implementation project(':airbyte-config:config-models')
implementation project(':airbyte-config:config-persistence')
implementation project(':airbyte-featureflag')
implementation project(':airbyte-json-validation')
implementation project(':airbyte-metrics:metrics-lib')
implementation project(':airbyte-persistence:job-persistence')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,41 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.PerfBackgroundJsonValidation;
import io.airbyte.featureflag.Workspace;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.exception.RecordSchemaValidationException;
import java.lang.invoke.MethodHandles;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Validates that AirbyteRecordMessage data conforms to the JSON schema defined by the source's
* configured catalog
*/

public class RecordSchemaValidator {

private static final JsonSchemaValidator validator = new JsonSchemaValidator();
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

private final FeatureFlagClient featureFlagClient;
private final UUID workspaceId;
private static final JsonSchemaValidator validator = new JsonSchemaValidator();
private final Map<AirbyteStreamNameNamespacePair, JsonNode> streams;

public RecordSchemaValidator(final Map<AirbyteStreamNameNamespacePair, JsonNode> streamNamesToSchemas) {
public RecordSchemaValidator(final FeatureFlagClient featureFlagClient,
final UUID workspaceId,
final Map<AirbyteStreamNameNamespacePair, JsonNode> streamNamesToSchemas) {
this.featureFlagClient = featureFlagClient;
this.workspaceId = workspaceId;
// streams is Map of a stream source namespace + name mapped to the stream schema
// for easy access when we check each record's schema
this.streams = streamNamesToSchemas;
Expand Down Expand Up @@ -55,6 +68,16 @@ public void validateSchema(final AirbyteRecordMessage message, final AirbyteStre
final JsonNode messageData = message.getData();
final JsonNode matchingSchema = streams.get(messageStream);

if (workspaceId != null) {
if (featureFlagClient.enabled(PerfBackgroundJsonValidation.INSTANCE, new Workspace(workspaceId))) {
log.info("feature flag enabled for workspace {}", workspaceId);
} else {
log.info("feature flag disabled for workspace {}", workspaceId);
}
} else {
log.info("workspace id is null");
}

try {
validator.ensureInitializedSchema(messageStream.toString(), messageData);
} catch (final JsonValidationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ public static ImmutablePair<StandardSync, StandardSyncInput> createSyncConfig(fi
.withCatalog(standardSync.getCatalog())
.withSourceConfiguration(sourceConnectionConfig.getConfiguration())
.withState(state)
.withOperationSequence(List.of(normalizationOperation, customDbtOperation));
.withOperationSequence(List.of(normalizationOperation, customDbtOperation))
.withWorkspaceId(workspaceId);

return new ImmutablePair<>(standardSync, syncInput);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.featureflag.TestClient;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.workers.exception.RecordSchemaValidationException;
Expand All @@ -34,13 +35,17 @@ void setup() throws Exception {

@Test
void testValidateValidSchema() throws Exception {
final RecordSchemaValidator recordSchemaValidator = new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput));
final var featureFlagClient = new TestClient();
final var recordSchemaValidator = new RecordSchemaValidator(featureFlagClient, syncInput.getWorkspaceId(),
WorkerUtils.mapStreamNamesToSchemas(syncInput));
recordSchemaValidator.validateSchema(VALID_RECORD.getRecord(), AirbyteStreamNameNamespacePair.fromRecordMessage(VALID_RECORD.getRecord()));
}

@Test
void testValidateInvalidSchema() throws Exception {
final RecordSchemaValidator recordSchemaValidator = new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput));
final var featureFlagClient = new TestClient();
final RecordSchemaValidator recordSchemaValidator = new RecordSchemaValidator(featureFlagClient, syncInput.getWorkspaceId(),
WorkerUtils.mapStreamNamesToSchemas(syncInput));
assertThrows(RecordSchemaValidationException.class, () -> recordSchemaValidator.validateSchema(INVALID_RECORD.getRecord(),
AirbyteStreamNameNamespacePair.fromRecordMessage(INVALID_RECORD.getRecord())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.featureflag.TestClient;
import io.airbyte.metrics.lib.MetricClient;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.protocol.models.AirbyteLogMessage.Level;
Expand All @@ -48,7 +49,9 @@
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.protocol.models.Config;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.workers.*;
import io.airbyte.workers.RecordSchemaValidator;
import io.airbyte.workers.WorkerMetricReporter;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.ConnectorConfigUpdater;
import io.airbyte.workers.helper.FailureHelper;
Expand Down Expand Up @@ -456,8 +459,9 @@ void testOnlySelectedFieldsDeliveredToDestinationWithFieldSelectionEnabled() thr
// Use a real schema validator to make sure validation doesn't affect this.
final String streamName = sourceConfig.getCatalog().getStreams().get(0).getStream().getName();
final String streamNamespace = sourceConfig.getCatalog().getStreams().get(0).getStream().getNamespace();
recordSchemaValidator = new RecordSchemaValidator(Map.of(new AirbyteStreamNameNamespacePair(streamName, streamNamespace),
sourceConfig.getCatalog().getStreams().get(0).getStream().getJsonSchema()));
recordSchemaValidator = new RecordSchemaValidator(new TestClient(), syncInput.getWorkspaceId(),
Map.of(new AirbyteStreamNameNamespacePair(streamName, streamNamespace),
sourceConfig.getCatalog().getStreams().get(0).getStream().getJsonSchema()));
final ReplicationWorker worker = new DefaultReplicationWorker(
JOB_ID,
JOB_ATTEMPT,
Expand Down Expand Up @@ -487,8 +491,9 @@ void testAllFieldsDeliveredWithFieldSelectionDisabled() throws Exception {
// Use a real schema validator to make sure validation doesn't affect this.
final String streamName = sourceConfig.getCatalog().getStreams().get(0).getStream().getName();
final String streamNamespace = sourceConfig.getCatalog().getStreams().get(0).getStream().getNamespace();
recordSchemaValidator = new RecordSchemaValidator(Map.of(new AirbyteStreamNameNamespacePair(streamName, streamNamespace),
sourceConfig.getCatalog().getStreams().get(0).getStream().getJsonSchema()));
recordSchemaValidator = new RecordSchemaValidator(new TestClient(), syncInput.getWorkspaceId(),
Map.of(new AirbyteStreamNameNamespacePair(streamName, streamNamespace),
sourceConfig.getCatalog().getStreams().get(0).getStream().getJsonSchema()));
final ReplicationWorker worker = new DefaultReplicationWorker(
JOB_ID,
JOB_ATTEMPT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType;
import io.airbyte.config.ReplicationOutput;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.featureflag.TestClient;
import io.airbyte.metrics.lib.NotImplementedMetricClient;
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.CatalogHelpers;
Expand All @@ -36,6 +37,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
import org.mockito.Mockito;
Expand Down Expand Up @@ -80,7 +82,8 @@ public void executeOneSync() throws InterruptedException {
final var connectorConfigUpdater = Mockito.mock(ConnectorConfigUpdater.class);
final var metricReporter = new WorkerMetricReporter(new NotImplementedMetricClient(), "test-image:0.01");
final var dstNamespaceMapper = new NamespacingMapper(NamespaceDefinitionType.DESTINATION, "", "");
final var validator = new RecordSchemaValidator(Map.of(
final var workspaceID = UUID.randomUUID();
final var validator = new RecordSchemaValidator(new TestClient(), workspaceID, Map.of(
new AirbyteStreamNameNamespacePair("s1", null),
CatalogHelpers.fieldsToJsonSchema(io.airbyte.protocol.models.Field.of("data", JsonSchemaType.STRING))));

Expand Down
1 change: 1 addition & 0 deletions airbyte-container-orchestrator/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies {
implementation project(':airbyte-commons-temporal')
implementation project(':airbyte-commons-worker')
implementation project(':airbyte-config:init')
implementation project(':airbyte-featureflag')
implementation project(':airbyte-json-validation')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-metrics:metrics-lib')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.airbyte.container_orchestrator.orchestrator.NoOpOrchestrator;
import io.airbyte.container_orchestrator.orchestrator.NormalizationJobOrchestrator;
import io.airbyte.container_orchestrator.orchestrator.ReplicationJobOrchestrator;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.process.AsyncOrchestratorPodProcess;
Expand Down Expand Up @@ -99,14 +100,16 @@ JobOrchestrator<?> jobOrchestrator(
final EnvConfigs envConfigs,
final ProcessFactory processFactory,
final FeatureFlags featureFlags,
final FeatureFlagClient featureFlagClient,
final WorkerConfigs workerConfigs,
final AirbyteMessageSerDeProvider serdeProvider,
final AirbyteProtocolVersionedMigratorFactory migratorFactory,
final JobRunConfig jobRunConfig,
final SourceApi sourceApi,
final DestinationApi destinationApi) {
return switch (application) {
case ReplicationLauncherWorker.REPLICATION -> new ReplicationJobOrchestrator(envConfigs, processFactory, featureFlags, serdeProvider,
case ReplicationLauncherWorker.REPLICATION -> new ReplicationJobOrchestrator(envConfigs, processFactory, featureFlags, featureFlagClient,
serdeProvider,
migratorFactory, jobRunConfig, sourceApi, destinationApi);
case NormalizationLauncherWorker.NORMALIZATION -> new NormalizationJobOrchestrator(envConfigs, processFactory, jobRunConfig);
case DbtLauncherWorker.DBT -> new DbtJobOrchestrator(envConfigs, workerConfigs, processFactory, jobRunConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.airbyte.config.Configs;
import io.airbyte.config.ReplicationOutput;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricEmittingApps;
Expand Down Expand Up @@ -61,6 +62,7 @@ public class ReplicationJobOrchestrator implements JobOrchestrator<StandardSyncI
private final ProcessFactory processFactory;
private final Configs configs;
private final FeatureFlags featureFlags;
private final FeatureFlagClient featureFlagClient;
private final AirbyteMessageSerDeProvider serDeProvider;
private final AirbyteProtocolVersionedMigratorFactory migratorFactory;
private final JobRunConfig jobRunConfig;
Expand All @@ -70,6 +72,7 @@ public class ReplicationJobOrchestrator implements JobOrchestrator<StandardSyncI
public ReplicationJobOrchestrator(final Configs configs,
final ProcessFactory processFactory,
final FeatureFlags featureFlags,
final FeatureFlagClient featureFlagClient,
final AirbyteMessageSerDeProvider serDeProvider,
final AirbyteProtocolVersionedMigratorFactory migratorFactory,
final JobRunConfig jobRunConfig,
Expand All @@ -78,6 +81,7 @@ public ReplicationJobOrchestrator(final Configs configs,
this.configs = configs;
this.processFactory = processFactory;
this.featureFlags = featureFlags;
this.featureFlagClient = featureFlagClient;
this.serDeProvider = serDeProvider;
this.migratorFactory = migratorFactory;
this.jobRunConfig = jobRunConfig;
Expand Down Expand Up @@ -166,7 +170,7 @@ public Optional<String> runJob() throws Exception {
Optional.of(syncInput.getCatalog())),
migratorFactory.getProtocolSerializer(destinationLauncherConfig.getProtocolVersion())),
new AirbyteMessageTracker(featureFlags),
new RecordSchemaValidator(WorkerUtils.mapStreamNamesToSchemas(syncInput)),
new RecordSchemaValidator(featureFlagClient, syncInput.getWorkspaceId(), WorkerUtils.mapStreamNamesToSchemas(syncInput)),
metricReporter,
new ConnectorConfigUpdater(sourceApi, destinationApi),
FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, syncInput.getWorkspaceId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ airbyte:
acceptance:
test:
enabled: ${ACCEPTANCE_TEST_ENABLED:false}
config-dir: /config
control:
plane:
auth-endpoint: ${CONTROL_PLANE_AUTH_ENDPOINT:}
Expand All @@ -16,6 +15,10 @@ airbyte:
service-account:
credentials-path: ${DATA_PLANE_SERVICE_ACCOUNT_CREDENTIALS_PATH:}
email: ${DATA_PLANE_SERVICE_ACCOUNT_EMAIL:}
feature-flag:
client: ${FEATURE_FLAG_CLIENT:config}
path: ${FEATURE_FLAG_PATH:/flags}
api-key: ${LAUNCHDARKLY_KEY:}
internal:
api:
auth-header:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import io.airbyte.commons.protocol.AirbyteMessageSerDeProvider;
import io.airbyte.commons.protocol.AirbyteProtocolVersionedMigratorFactory;
import io.airbyte.config.EnvConfigs;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.TestClient;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.process.AsyncOrchestratorPodProcess;
Expand All @@ -23,8 +25,11 @@
import io.airbyte.workers.sync.DbtLauncherWorker;
import io.airbyte.workers.sync.NormalizationLauncherWorker;
import io.airbyte.workers.sync.ReplicationLauncherWorker;
import io.micronaut.context.annotation.Bean;
import io.micronaut.context.annotation.Replaces;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import jakarta.inject.Inject;
import java.util.Map;
import org.junit.jupiter.api.Test;

@MicronautTest
Expand All @@ -33,6 +38,10 @@ class ContainerOrchestratorFactoryTest {
@Inject
FeatureFlags featureFlags;

@Bean
@Replaces(FeatureFlagClient.class)
FeatureFlagClient featureFlagClient = new TestClient(Map.of());

@Inject
EnvConfigs envConfigs;

Expand Down Expand Up @@ -94,29 +103,29 @@ void jobOrchestrator() {
final var factory = new ContainerOrchestratorFactory();

final var repl = factory.jobOrchestrator(
ReplicationLauncherWorker.REPLICATION, envConfigs, processFactory, featureFlags, workerConfigs,
ReplicationLauncherWorker.REPLICATION, envConfigs, processFactory, featureFlags, featureFlagClient, workerConfigs,
airbyteMessageSerDeProvider, airbyteProtocolVersionedMigratorFactory, jobRunConfig, sourceApi, destinationApi);
assertEquals("Replication", repl.getOrchestratorName());

final var norm = factory.jobOrchestrator(
NormalizationLauncherWorker.NORMALIZATION, envConfigs, processFactory, featureFlags, workerConfigs,
NormalizationLauncherWorker.NORMALIZATION, envConfigs, processFactory, featureFlags, featureFlagClient, workerConfigs,
airbyteMessageSerDeProvider, airbyteProtocolVersionedMigratorFactory, jobRunConfig, sourceApi, destinationApi);
assertEquals("Normalization", norm.getOrchestratorName());

final var dbt = factory.jobOrchestrator(
DbtLauncherWorker.DBT, envConfigs, processFactory, featureFlags, workerConfigs,
DbtLauncherWorker.DBT, envConfigs, processFactory, featureFlags, featureFlagClient, workerConfigs,
airbyteMessageSerDeProvider, airbyteProtocolVersionedMigratorFactory, jobRunConfig, sourceApi, destinationApi);
assertEquals("DBT Transformation", dbt.getOrchestratorName());

final var noop = factory.jobOrchestrator(
AsyncOrchestratorPodProcess.NO_OP, envConfigs, processFactory, featureFlags, workerConfigs,
AsyncOrchestratorPodProcess.NO_OP, envConfigs, processFactory, featureFlags, featureFlagClient, workerConfigs,
airbyteMessageSerDeProvider, airbyteProtocolVersionedMigratorFactory, jobRunConfig, sourceApi, destinationApi);
assertEquals("NO_OP", noop.getOrchestratorName());

var caught = false;
try {
factory.jobOrchestrator(
"does not exist", envConfigs, processFactory, featureFlags, workerConfigs,
"does not exist", envConfigs, processFactory, featureFlags, featureFlagClient, workerConfigs,
airbyteMessageSerDeProvider, airbyteProtocolVersionedMigratorFactory, jobRunConfig, sourceApi, destinationApi);
} catch (final Exception e) {
caught = true;
Expand Down
10 changes: 5 additions & 5 deletions airbyte-featureflag/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
`java-library`
kotlin("jvm") version "1.8.0"
kotlin("kapt") version "1.8.0"
}

dependencies {
annotationProcessor(platform(libs.micronaut.bom))
annotationProcessor(libs.bundles.micronaut.annotation.processor)
kapt(platform(libs.micronaut.bom))
kapt(libs.bundles.micronaut.annotation.processor)

implementation(platform(libs.micronaut.bom))
implementation(libs.micronaut.inject)
Expand All @@ -17,9 +18,8 @@ dependencies {
implementation(libs.jackson.dataformat)
implementation(libs.jackson.kotlin)

testAnnotationProcessor(platform(libs.micronaut.bom))
testAnnotationProcessor(libs.micronaut.inject)
testAnnotationProcessor(libs.bundles.micronaut.test.annotation.processor)
kaptTest(platform(libs.micronaut.bom))
kaptTest(libs.bundles.micronaut.test.annotation.processor)

testImplementation(kotlin("test"))
testImplementation(kotlin("test-junit5"))
Expand Down

0 comments on commit e32215f

Please sign in to comment.