Merge standard sync schedule to standard sync (#3472)
* Copy schedule schema to standard sync

* Add config model readme

* Remove references of standard sync schedule

* Add migration

* Fix unit test

* Remove standard sync schedule schema

* Fix unit test

* Remove unnecessary migration resource

* Add migration test

* Apply spotless format

* Add minor updates

* Update readme
tuliren committed May 20, 2021
1 parent a31d6fb commit 21ba32c
Showing 26 changed files with 585 additions and 219 deletions.
20 changes: 20 additions & 0 deletions airbyte-config/models/README.md
@@ -0,0 +1,20 @@
# Config Models

This module uses `jsonschema2pojo` to generate Java config objects from [JSON Schema](https://json-schema.org/) definitions. See [build.gradle](./build.gradle) for details.

## How to use
- Update the JSON schema files under:
```
src/main/resources/types/
```
- Run the following command under the project root:
```sh
./gradlew airbyte-config:models:generateJsonSchema2Pojo
```
The generated files are under:
```
build/generated/src/gen/java/io/airbyte/config/
```

## Reference
- [`jsonschema2pojo` plugin](https://github.com/joelittlejohn/jsonschema2pojo/tree/master/jsonschema2pojo-gradle-plugin).
@@ -44,7 +44,6 @@ public enum ConfigSchema {
// sync
STANDARD_SYNC("StandardSync.yaml"),
STANDARD_SYNC_OPERATION("StandardSyncOperation.yaml"),
STANDARD_SYNC_SCHEDULE("StandardSyncSchedule.yaml"),
STANDARD_SYNC_SUMMARY("StandardSyncSummary.yaml"),

// worker
24 changes: 24 additions & 0 deletions airbyte-config/models/src/main/resources/types/StandardSync.yaml
@@ -9,6 +9,7 @@ required:
  - destinationId
  - name
  - catalog
  - manual
additionalProperties: false
properties:
  prefix:
@@ -38,3 +39,26 @@ properties:
      - active
      - inactive
      - deprecated
  # Ideally schedule and manual should be a union, but java
  # codegen does not handle the union type properly.
  # When schedule is defined, manual should be false.
  schedule:
    type: object
    required:
      - timeUnit
      - units
    additionalProperties: false
    properties:
      timeUnit:
        type: string
        enum:
          - minutes
          - hours
          - days
          - weeks
          - months
      units:
        type: integer
  # When manual is true, schedule should be null, and will be ignored.
  manual:
    type: boolean
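The schema comments describe a rule that the schema itself does not enforce: `manual: true` and a non-null `schedule` should not appear together. A minimal sketch of such a check follows. It assumes a hypothetical hand-written `SyncScheduling` holder, not the class generated by `jsonschema2pojo`, whose accessors depend on the plugin configuration. The case of `manual: false` with no schedule is not covered by the comments, so the sketch leaves it unchecked.

```java
import java.util.Optional;

// Hypothetical stand-in for the scheduling fields of StandardSync; not the generated class.
final class SyncScheduling {

  enum TimeUnit { MINUTES, HOURS, DAYS, WEEKS, MONTHS }

  static final class Schedule {
    final TimeUnit timeUnit;
    final long units;

    Schedule(TimeUnit timeUnit, long units) {
      this.timeUnit = timeUnit;
      this.units = units;
    }
  }

  final boolean manual;
  final Schedule schedule; // expected to be null when manual is true

  SyncScheduling(boolean manual, Schedule schedule) {
    this.manual = manual;
    this.schedule = schedule;
  }

  // Returns an error message when manual and schedule conflict, or empty when they are consistent.
  static Optional<String> validate(SyncScheduling s) {
    if (s.manual && s.schedule != null) {
      return Optional.of("manual is true, so schedule should be null");
    }
    return Optional.empty();
  }
}
```

A caller could run `SyncScheduling.validate(...)` before persisting a config and reject the write when a message is returned.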

This file was deleted.

@@ -32,7 +32,6 @@
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncSchedule;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
@@ -192,14 +191,6 @@ public List<StandardSync> listStandardSyncs() throws ConfigNotFoundException, IO
return persistence.listConfigs(ConfigSchema.STANDARD_SYNC, StandardSync.class);
}

public StandardSyncSchedule getStandardSyncSchedule(final UUID connectionId) throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(ConfigSchema.STANDARD_SYNC_SCHEDULE, connectionId.toString(), StandardSyncSchedule.class);
}

public void writeStandardSchedule(final StandardSyncSchedule schedule) throws JsonValidationException, IOException {
persistence.writeConfig(ConfigSchema.STANDARD_SYNC_SCHEDULE, schedule.getConnectionId().toString(), schedule);
}

public StandardSyncOperation getStandardSyncOperation(final UUID operationId) throws JsonValidationException, IOException, ConfigNotFoundException {
return persistence.getConfig(ConfigSchema.STANDARD_SYNC_OPERATION, operationId.toString(), StandardSyncOperation.class);
}
42 changes: 26 additions & 16 deletions airbyte-migration/README.md
@@ -2,26 +2,20 @@

This module migrates configs specified in `airbyte-config` to new versions.

## Run production migration in docker
## Change Airbyte Configs
- Update the config JSON schema in [`airbyte-config/models`](../airbyte-config/models).
- Add the changed JSON schema to the [main resources](./src/main/resources/migrations).
- If a migration is needed, create a migration file under [`io.airbyte.migrate.migrations`](./src/main/java/io/airbyte/migrate/migrations).
- Register the migration in [`Migrations.java`](./src/main/java/io/airbyte/migrate/Migrations.java).
- If needed, write a migration unit test under [`io.airbyte.migrate.migrations`](./src/test/java/io/airbyte/migrate/migrations).
- Test the migration locally in an IDE or on the command line (see below).

```sh
BUILD_VERSION=$(cat .env | grep VERSION | awk -F"=" '{print $2}')
INPUT_PATH=<path to directory containing downloaded airbyte_archive.tar.gz>
OUTPUT_PATH=<path to where migrated archive will be written (should end in .tar.gz)>
TARGET_VERSION=<version you are migrating to or empty for latest>

docker run --rm -v ${INPUT_PATH}:/config airbyte/migration:${BUILD_VERSION} -- \
--input /config/airbyte_archive.tar.gz \
--output ${OUTPUT_PATH} \
[ --target-version ${TARGET_VERSION} ]
```
## Test Migration Locally

See [Upgrading Airbyte](https://docs.airbyte.io/tutorials/upgrading-airbyte) for details.

## Run dev migration in IDE
### IDE
Run `MigrationRunner.java` with arguments (`--input`, `--output`, `--target-version`).

## Run dev migration in command line
### Command line

Run the following command in project root:

@@ -42,3 +36,19 @@ bin/airbyte-migration \
```

See [MigrationRunner](./src/main/java/io/airbyte/migrate/MigrationRunner.java) for details.

## Run migration in production

```sh
BUILD_VERSION=$(cat .env | grep VERSION | awk -F"=" '{print $2}')
INPUT_PATH=<path to directory containing downloaded airbyte_archive.tar.gz>
OUTPUT_PATH=<path to where migrated archive will be written (should end in .tar.gz)>
TARGET_VERSION=<version you are migrating to or empty for latest>

docker run --rm -v ${INPUT_PATH}:/config airbyte/migration:${BUILD_VERSION} -- \
--input /config/airbyte_archive.tar.gz \
--output ${OUTPUT_PATH} \
[ --target-version ${TARGET_VERSION} ]
```

See [Upgrading Airbyte](https://docs.airbyte.io/tutorials/upgrading-airbyte) for details.
@@ -31,6 +31,7 @@
import io.airbyte.migrate.migrations.MigrationV0_18_0;
import io.airbyte.migrate.migrations.MigrationV0_20_0;
import io.airbyte.migrate.migrations.MigrationV0_23_0;
import io.airbyte.migrate.migrations.MigrationV0_24_0;
import io.airbyte.migrate.migrations.NoOpMigration;
import java.util.List;

@@ -47,6 +48,7 @@ public class Migrations {
private static final Migration MIGRATION_V_0_21_0 = new NoOpMigration(MIGRATION_V_0_20_0, "0.21.0-alpha");
private static final Migration MIGRATION_V_0_22_0 = new NoOpMigration(MIGRATION_V_0_21_0, "0.22.0-alpha");
private static final Migration MIGRATION_V_0_23_0 = new MigrationV0_23_0(MIGRATION_V_0_22_0);
private static final Migration MIGRATION_V_0_24_0 = new MigrationV0_24_0(MIGRATION_V_0_23_0);

// all migrations must be added to the list in the order that they should be applied.
public static final List<Migration> MIGRATIONS = ImmutableList.of(
@@ -60,6 +62,7 @@ public class Migrations {
MIGRATION_V_0_20_0,
MIGRATION_V_0_21_0,
MIGRATION_V_0_22_0,
MIGRATION_V_0_23_0);
MIGRATION_V_0_23_0,
MIGRATION_V_0_24_0);

}
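Each migration is constructed with its predecessor, and the list must hold the migrations in the order they are applied. The sketch below checks that these two orderings agree. It assumes a hypothetical `ChainedStep` class that models only the version chain; the real `Migration` interface may not expose its predecessor, and the version strings are illustrative.

```java
import java.util.List;

// Hypothetical, simplified stand-in for a migration: only the version chain is modeled.
final class ChainedStep {

  final String version;
  final ChainedStep previous; // null for the first step in the chain

  ChainedStep(ChainedStep previous, String version) {
    this.previous = previous;
    this.version = version;
  }
}

final class MigrationOrderSketch {

  // Returns true when every step's predecessor is the element right before it in the list.
  static boolean isInApplicationOrder(List<ChainedStep> steps) {
    for (int i = 1; i < steps.size(); i++) {
      if (steps.get(i).previous != steps.get(i - 1)) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    ChainedStep v22 = new ChainedStep(null, "0.22.0-alpha");
    ChainedStep v23 = new ChainedStep(v22, "0.23.0-alpha");
    ChainedStep v24 = new ChainedStep(v23, "0.24.0-alpha");
    System.out.println(isInApplicationOrder(List.of(v22, v23, v24))); // true
    System.out.println(isInApplicationOrder(List.of(v22, v24, v23))); // false
  }
}
```

A check of this kind could run in a unit test so that a new migration appended in the wrong position is caught early.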
@@ -0,0 +1,129 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.migrate.migrations;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.migrate.Migration;
import io.airbyte.migrate.MigrationUtils;
import io.airbyte.migrate.ResourceId;
import io.airbyte.migrate.ResourceType;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// This migration does the following:
// 1. Merge StandardSyncSchedule into StandardSync.
// 2. Remove StandardSyncSchedule.
public class MigrationV0_24_0 extends BaseMigration implements Migration {

private static final Logger LOGGER = LoggerFactory.getLogger(MigrationV0_24_0.class);

protected static final ResourceId STANDARD_SYNC_RESOURCE_ID = ResourceId
.fromConstantCase(ResourceType.CONFIG, "STANDARD_SYNC");
protected static final ResourceId STANDARD_SYNC_SCHEDULE_RESOURCE_ID = ResourceId
.fromConstantCase(ResourceType.CONFIG, "STANDARD_SYNC_SCHEDULE");

private static final String MIGRATION_VERSION = "0.24.0-alpha";
private static final Path CONFIG_PATH = Path.of("migrations/migrationV0_24_0");

private final Migration previousMigration;

public MigrationV0_24_0(Migration previousMigration) {
super(previousMigration);
this.previousMigration = previousMigration;
}

@Override
public String getVersion() {
return MIGRATION_VERSION;
}

@Override
public Map<ResourceId, JsonNode> getOutputSchema() {
final Map<ResourceId, JsonNode> outputSchema = new HashMap<>(
previousMigration.getOutputSchema());
outputSchema.remove(STANDARD_SYNC_SCHEDULE_RESOURCE_ID);
outputSchema.put(
STANDARD_SYNC_RESOURCE_ID,
MigrationUtils.getSchemaFromResourcePath(CONFIG_PATH, STANDARD_SYNC_RESOURCE_ID));
return outputSchema;
}

@Override
public void migrate(Map<ResourceId, Stream<JsonNode>> inputData,
Map<ResourceId, Consumer<JsonNode>> outputData) {
// Create a map from connection id to standard sync schedule nodes
// to "join" the schedule onto the standard sync node later.
final Map<String, JsonNode> connectionToScheduleNodes = inputData
.get(STANDARD_SYNC_SCHEDULE_RESOURCE_ID)
.collect(Collectors.toMap(r -> r.get("connectionId").asText(), r -> r));

for (final Map.Entry<ResourceId, Stream<JsonNode>> inputEntry : inputData.entrySet()) {
// Skip standard sync schedule.
if (inputEntry.getKey().equals(STANDARD_SYNC_SCHEDULE_RESOURCE_ID)) {
continue;
}

inputEntry.getValue().forEach(jsonNode -> {
if (inputEntry.getKey().equals(STANDARD_SYNC_RESOURCE_ID)) {
// "Join" the standard sync schedule node onto the standard sync.
final String connectionId = jsonNode.get("connectionId").asText();
final ObjectNode standardSync = (ObjectNode) jsonNode;
final ObjectNode syncSchedule = (ObjectNode) connectionToScheduleNodes.get(connectionId);
if (syncSchedule == null) {
// Default to manual sync, then fall through so that the standard sync
// is still written to the output below instead of being dropped.
LOGGER.warn(
"No standard sync schedule config exists for connection {}, will default to manual sync",
connectionId);
standardSync.set("manual", Jsons.jsonNode(true));
} else {
final JsonNode manual = syncSchedule.get("manual");
standardSync.set("manual", manual);

final JsonNode schedule = syncSchedule.get("schedule");
if (schedule != null && !manual.asBoolean()) {
standardSync.set("schedule", schedule);
}

LOGGER.info(
"Schedule added to standard sync config for connection {} (manual: {}, schedule: {})",
connectionId, manual, schedule);
}
}

final Consumer<JsonNode> outputConsumer = outputData.get(inputEntry.getKey());
outputConsumer.accept(jsonNode);
});
}
}

}
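The join performed by this migration can be sketched without Jackson. The version below is a simplified illustration, not the migration itself: records are plain `Map<String, Object>` values, the class and method names are hypothetical, and `manual` is coerced to a boolean instead of being copied as a JSON node. Every sync record is emitted, and one without a matching schedule record defaults to `manual: true`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the schedule-to-sync join; plain maps stand in for JSON nodes.
final class ScheduleJoinSketch {

  static List<Map<String, Object>> merge(List<Map<String, Object>> syncs,
                                         List<Map<String, Object>> schedules) {
    // Index the schedule records by connection id so each sync can look up its own.
    Map<String, Map<String, Object>> scheduleByConnection = new HashMap<>();
    for (Map<String, Object> record : schedules) {
      scheduleByConnection.put(String.valueOf(record.get("connectionId")), record);
    }

    List<Map<String, Object>> merged = new ArrayList<>();
    for (Map<String, Object> sync : syncs) {
      Map<String, Object> out = new LinkedHashMap<>(sync);
      Map<String, Object> record = scheduleByConnection.get(String.valueOf(sync.get("connectionId")));
      if (record == null) {
        // No schedule record for this connection: default to a manual sync.
        out.put("manual", true);
      } else {
        boolean manual = Boolean.TRUE.equals(record.get("manual"));
        out.put("manual", manual);
        Object schedule = record.get("schedule");
        // Copy the schedule only when the sync is not manual.
        if (schedule != null && !manual) {
          out.put("schedule", schedule);
        }
      }
      merged.add(out);
    }
    return merged;
  }
}
```

Building the lookup map first keeps the join linear in the number of records, which mirrors the `connectionToScheduleNodes` map in the migration.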
@@ -0,0 +1,64 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/StandardSync.yaml
title: StandardSync
description: configuration required for sync for ALL taps
type: object
required:
  - sourceId
  - destinationId
  - name
  - catalog
  - manual
additionalProperties: false
properties:
  prefix:
    description: Prefix that will be prepended to the name of each stream when it is written to the destination.
    type: string
  sourceId:
    type: string
    format: uuid
  destinationId:
    type: string
    format: uuid
  operationIds:
    type: array
    items:
      type: string
      format: uuid
  connectionId:
    type: string
    format: uuid
  name:
    type: string
  catalog:
    existingJavaType: io.airbyte.protocol.models.ConfiguredAirbyteCatalog
  status:
    type: string
    enum:
      - active
      - inactive
      - deprecated
  # Ideally schedule and manual should be a union, but java
  # codegen does not handle the union type properly.
  # When schedule is defined, manual should be false.
  schedule:
    type: object
    required:
      - timeUnit
      - units
    additionalProperties: false
    properties:
      timeUnit:
        type: string
        enum:
          - minutes
          - hours
          - days
          - weeks
          - months
      units:
        type: integer
  # When manual is true, schedule should be null, and will be ignored.
  manual:
    type: boolean