Skip to content

Commit

Permalink
add ResetSourceConfiguration to JobResetConnectionConfig, and retriev… (
Browse files Browse the repository at this point in the history
#13696)

* add ResetSourceConfiguration to JobResetConnectionConfig, and retrieve in GenerateInputActivity

* add newline

* add comment explaining null check

* format

* more formatting
  • Loading branch information
lmossman committed Jun 10, 2022
1 parent 04d88f4 commit 7134625
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 2 deletions.
2 changes: 1 addition & 1 deletion airbyte-config/config-models/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ This module uses `jsonschema2pojo` to generate Java config objects from [json sc
```
- Run the following command under the project root:
```sh
SUB_BUILD=PLATFORM ./gradlew airbyte-config:models:generateJsonSchema2Pojo
SUB_BUILD=PLATFORM ./gradlew airbyte-config:config-models:generateJsonSchema2Pojo
```
The generated file is under:
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,5 @@ properties:
type: object
description: optional resource requirements to run sync workers
existingJavaType: io.airbyte.config.ResourceRequirements
resetSourceConfiguration:
"$ref": ResetSourceConfiguration.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/ResetSourceConfiguration.yaml
title: ResetSourceConfiguration
description: configuration of the reset source
type: object
additionalProperties: true
required:
- streamDescriptors
properties:
streamDescriptors:
type: array
items:
"$ref": StreamDescriptor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.JobResetConnectionConfig;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.ResetSourceConfiguration;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.scheduler.models.IntegrationLauncherConfig;
import io.airbyte.scheduler.models.Job;
Expand All @@ -31,13 +32,16 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) {
JobSyncConfig config = job.getConfig().getSync();
if (input.isReset()) {
final JobResetConnectionConfig resetConnection = job.getConfig().getResetConnection();
final ResetSourceConfiguration resetSourceConfiguration = resetConnection.getResetSourceConfiguration();
config = new JobSyncConfig()
.withNamespaceDefinition(resetConnection.getNamespaceDefinition())
.withNamespaceFormat(resetConnection.getNamespaceFormat())
.withPrefix(resetConnection.getPrefix())
.withSourceDockerImage(WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB)
.withDestinationDockerImage(resetConnection.getDestinationDockerImage())
.withSourceConfiguration(Jsons.emptyObject())
// null check for backwards compatibility with reset jobs that did not have a
// resetSourceConfiguration
.withSourceConfiguration(resetSourceConfiguration == null ? Jsons.emptyObject() : Jsons.jsonNode(resetSourceConfiguration))
.withDestinationConfiguration(resetConnection.getDestinationConfiguration())
.withConfiguredAirbyteCatalog(resetConnection.getConfiguredAirbyteCatalog())
.withOperationSequence(resetConnection.getOperationSequence())
Expand Down

0 comments on commit 7134625

Please sign in to comment.