Skip to content

Commit

Permalink
Lightweight export/import of configurations scoped by workspace Id (#…
Browse files Browse the repository at this point in the history
…5598)

* Scope export/import by workspace id
  • Loading branch information
ChristopheDuong committed Aug 30, 2021
1 parent 61842ed commit 314a747
Show file tree
Hide file tree
Showing 9 changed files with 1,173 additions and 94 deletions.
88 changes: 87 additions & 1 deletion airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1464,6 +1464,68 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/ImportRead"
/v1/deployment/export_workspace:
post:
tags:
- deployment
summary: Export Airbyte Workspace Configuration
operationId: exportWorkspace
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/WorkspaceIdRequestBody"
required: true
responses:
"200":
description: Successful operation
content:
application/x-gzip:
schema:
$ref: "#/components/schemas/AirbyteArchive"
/v1/deployment/upload_archive_resource:
post:
tags:
- deployment
summary: Upload a GZIP archive tarball and stage it in the server's cache as a temporary resource
operationId: uploadArchiveResource
requestBody:
content:
application/x-gzip:
schema:
$ref: "#/components/schemas/AirbyteArchive"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/UploadRead"
/v1/deployment/import_into_workspace:
post:
tags:
- deployment
summary: >
Import Airbyte Configuration into Workspace (this operation might change ids of imported
configurations). Note, in order to use this api endpoint, you might need to upload a
temporary archive resource with 'deployment/upload_archive_resource' first
operationId: importIntoWorkspace
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/ImportRequestBody"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/ImportRead"
"404":
$ref: "#/components/responses/NotFoundResponse"
components:
securitySchemes:
bearerAuth:
Expand Down Expand Up @@ -1948,7 +2010,6 @@ components:
DestinationCreate:
type: object
required:
- workspaceId
- name
- workspaceId
- destinationDefinitionId
Expand Down Expand Up @@ -2912,6 +2973,31 @@ components:
- failed
reason:
type: string
ResourceId:
type: string
format: uuid
UploadRead:
type: object
required:
- status
properties:
status:
type: string
enum:
- succeeded
- failed
resourceId:
$ref: "#/components/schemas/ResourceId"
ImportRequestBody:
type: object
required:
- resourceId
- workspaceId
properties:
resourceId:
$ref: "#/components/schemas/ResourceId"
workspaceId:
$ref: "#/components/schemas/WorkspaceId"
InvalidInputProperty:
type: object
required:
Expand Down
123 changes: 118 additions & 5 deletions airbyte-server/src/main/java/io/airbyte/server/ConfigDumpExporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,37 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.io.Archives;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.CloseableConsumer;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.yaml.Yamls;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.validation.json.JsonValidationException;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
Expand All @@ -65,10 +80,12 @@ public class ConfigDumpExporter {
private static final String VERSION_FILE_NAME = "VERSION";
private final ConfigRepository configRepository;
private final JobPersistence jobPersistence;
private final WorkspaceHelper workspaceHelper;

public ConfigDumpExporter(ConfigRepository configRepository, JobPersistence jobPersistence) {
public ConfigDumpExporter(ConfigRepository configRepository, JobPersistence jobPersistence, WorkspaceHelper workspaceHelper) {
this.configRepository = configRepository;
this.jobPersistence = jobPersistence;
this.workspaceHelper = workspaceHelper;
}

public File dump() {
Expand Down Expand Up @@ -122,13 +139,19 @@ private void dumpConfigsDatabase(Path parentFolder) throws IOException {
}
}

private void writeConfigsToArchive(final Path storageRoot,
final String schemaType,
final Stream<JsonNode> configs)
private static void writeConfigsToArchive(final Path storageRoot,
final String schemaType,
final Stream<JsonNode> configs)
throws IOException {
writeConfigsToArchive(storageRoot, schemaType, configs.collect(Collectors.toList()));
}

private static void writeConfigsToArchive(final Path storageRoot,
final String schemaType,
final List<JsonNode> configList)
throws IOException {
final Path configPath = buildConfigPath(storageRoot, schemaType);
Files.createDirectories(configPath.getParent());
final List<JsonNode> configList = configs.collect(Collectors.toList());
if (!configList.isEmpty()) {
final List<JsonNode> sortedConfigs = configList.stream()
.sorted(Comparator.comparing(JsonNode::toString)).collect(
Expand All @@ -145,4 +168,94 @@ private static Path buildConfigPath(final Path storageRoot, final String schemaT
.resolve(String.format("%s.yaml", schemaType));
}

public File exportWorkspace(UUID workspaceId) throws JsonValidationException, IOException, ConfigNotFoundException {
final Path tempFolder = Files.createTempDirectory(Path.of("/tmp"), ARCHIVE_FILE_NAME);
final File dump = Files.createTempFile(ARCHIVE_FILE_NAME, ".tar.gz").toFile();
exportVersionFile(tempFolder);
exportConfigsDatabase(tempFolder, workspaceId);

Archives.createArchive(tempFolder, dump.toPath());
return dump;
}

private void exportConfigsDatabase(Path parentFolder, UUID workspaceId) throws IOException, JsonValidationException, ConfigNotFoundException {
final Collection<SourceConnection> sourceConnections = writeConfigsToArchive(
parentFolder,
ConfigSchema.SOURCE_CONNECTION.name(),
configRepository::listSourceConnection,
(sourceConnection) -> workspaceId.equals(sourceConnection.getWorkspaceId()));
writeConfigsToArchive(parentFolder, ConfigSchema.STANDARD_SOURCE_DEFINITION.name(),
() -> listSourceDefinition(sourceConnections),
(config) -> true);

final Collection<DestinationConnection> destinationConnections = writeConfigsToArchive(
parentFolder,
ConfigSchema.DESTINATION_CONNECTION.name(),
configRepository::listDestinationConnection,
(destinationConnection) -> workspaceId.equals(destinationConnection.getWorkspaceId()));
writeConfigsToArchive(parentFolder, ConfigSchema.STANDARD_DESTINATION_DEFINITION.name(),
() -> listDestinationDefinition(destinationConnections),
(config) -> true);

writeConfigsToArchive(
parentFolder,
ConfigSchema.STANDARD_SYNC_OPERATION.name(),
configRepository::listStandardSyncOperations,
(operation) -> workspaceId.equals(operation.getWorkspaceId()));

final List<StandardSync> standardSyncs = new ArrayList<>();
for (StandardSync standardSync : configRepository.listStandardSyncs()) {
if (workspaceHelper != null &&
workspaceId.equals(workspaceHelper.getWorkspaceForConnection(standardSync.getSourceId(), standardSync.getDestinationId()))) {
standardSyncs.add(standardSync);
}
}
writeConfigsToArchive(parentFolder, ConfigSchema.STANDARD_SYNC.name(), standardSyncs.stream().map(Jsons::jsonNode));
}

private <T> Collection<T> writeConfigsToArchive(Path parentFolder,
String configSchemaName,
ListConfigCall<T> listConfigCall,
Function<T, Boolean> filterConfigCall)
throws JsonValidationException, ConfigNotFoundException, IOException {
final Collection<T> configs = listConfigCall.apply().stream().filter(filterConfigCall::apply).collect(Collectors.toList());
writeConfigsToArchive(parentFolder, configSchemaName, configs.stream().map(Jsons::jsonNode));
return configs;
}

private Collection<StandardSourceDefinition> listSourceDefinition(Collection<SourceConnection> sourceConnections)
throws JsonValidationException, ConfigNotFoundException, IOException {
final Map<UUID, StandardSourceDefinition> sourceDefinitionMap = new HashMap<>();
for (SourceConnection sourceConnection : sourceConnections) {
if (!sourceDefinitionMap.containsKey(sourceConnection.getSourceDefinitionId())) {
sourceDefinitionMap
.put(sourceConnection.getSourceDefinitionId(),
configRepository.getStandardSourceDefinition(sourceConnection.getSourceDefinitionId()));
}
}
return sourceDefinitionMap.values();
}

private Collection<StandardDestinationDefinition> listDestinationDefinition(Collection<DestinationConnection> destinationConnections)
throws JsonValidationException, ConfigNotFoundException, IOException {
final Map<UUID, StandardDestinationDefinition> destinationDefinitionMap = new HashMap<>();
for (DestinationConnection destinationConnection : destinationConnections) {
if (!destinationDefinitionMap.containsKey(destinationConnection.getDestinationDefinitionId())) {
destinationDefinitionMap
.put(destinationConnection.getDestinationDefinitionId(),
configRepository.getStandardDestinationDefinition(destinationConnection.getDestinationDefinitionId()));
}
}
return destinationDefinitionMap.values();
}

/**
* List all configurations of type @param <T> that already exists
*/
public interface ListConfigCall<T> {

Collection<T> apply() throws IOException, JsonValidationException, ConfigNotFoundException;

}

}
Loading

0 comments on commit 314a747

Please sign in to comment.