Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lightweight export/import of configurations scoped by workspace Id #5598

Merged
merged 11 commits into from
Aug 30, 2021
88 changes: 87 additions & 1 deletion airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1416,6 +1416,68 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/ImportRead"
/v1/deployment/export_workspace:
post:
tags:
- deployment
summary: Export Airbyte Workspace Configuration
operationId: exportWorkspace
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/WorkspaceIdRequestBody"
required: true
responses:
"200":
description: Successful operation
content:
application/x-gzip:
schema:
$ref: "#/components/schemas/AirbyteArchive"
/v1/deployment/upload_archive_resource:
post:
tags:
- deployment
summary: Upload a GZIP archive tarball and stage it in the server's cache as a temporary resource
operationId: uploadArchiveResource
requestBody:
content:
application/x-gzip:
schema:
$ref: "#/components/schemas/AirbyteArchive"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/UploadRead"
/v1/deployment/import_into_workspace:
post:
tags:
- deployment
summary: >
Import Airbyte Configuration into Workspace (this operation might change ids of imported
configurations). Note, in order to use this api endpoint, you might need to upload a
temporary archive resource with 'deployment/upload_archive_resource' first
operationId: importIntoWorkspace
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/ImportRequestBody"
required: true
responses:
"200":
description: Successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/ImportRead"
"404":
$ref: "#/components/responses/NotFoundResponse"
components:
securitySchemes:
bearerAuth:
Expand Down Expand Up @@ -1900,7 +1962,6 @@ components:
DestinationCreate:
type: object
required:
- workspaceId
sherifnada marked this conversation as resolved.
Show resolved Hide resolved
- name
- workspaceId
- destinationDefinitionId
Expand Down Expand Up @@ -2795,6 +2856,31 @@ components:
- failed
reason:
type: string
ResourceId:
type: string
format: uuid
UploadRead:
type: object
required:
- status
properties:
status:
type: string
enum:
- succeeded
- failed
resourceId:
$ref: "#/components/schemas/ResourceId"
ImportRequestBody:
type: object
required:
- resourceId
- workspaceId
properties:
resourceId:
$ref: "#/components/schemas/ResourceId"
workspaceId:
$ref: "#/components/schemas/WorkspaceId"
InvalidInputProperty:
type: object
required:
Expand Down
123 changes: 118 additions & 5 deletions airbyte-server/src/main/java/io/airbyte/server/ConfigDumpExporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,37 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.io.Archives;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.CloseableConsumer;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.yaml.Yamls;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.validation.json.JsonValidationException;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
Expand All @@ -65,10 +80,12 @@ public class ConfigDumpExporter {
private static final String VERSION_FILE_NAME = "VERSION";
private final ConfigRepository configRepository;
private final JobPersistence jobPersistence;
private final WorkspaceHelper workspaceHelper;

public ConfigDumpExporter(ConfigRepository configRepository, JobPersistence jobPersistence) {
public ConfigDumpExporter(ConfigRepository configRepository, JobPersistence jobPersistence, WorkspaceHelper workspaceHelper) {
tuliren marked this conversation as resolved.
Show resolved Hide resolved
this.configRepository = configRepository;
this.jobPersistence = jobPersistence;
this.workspaceHelper = workspaceHelper;
}

public File dump() {
Expand Down Expand Up @@ -122,13 +139,19 @@ private void dumpConfigsDatabase(Path parentFolder) throws IOException {
}
}

private void writeConfigsToArchive(final Path storageRoot,
final String schemaType,
final Stream<JsonNode> configs)
private static void writeConfigsToArchive(final Path storageRoot,
final String schemaType,
final Stream<JsonNode> configs)
throws IOException {
writeConfigsToArchive(storageRoot, schemaType, configs.collect(Collectors.toList()));
}

private static void writeConfigsToArchive(final Path storageRoot,
final String schemaType,
final List<JsonNode> configList)
throws IOException {
final Path configPath = buildConfigPath(storageRoot, schemaType);
Files.createDirectories(configPath.getParent());
final List<JsonNode> configList = configs.collect(Collectors.toList());
if (!configList.isEmpty()) {
final List<JsonNode> sortedConfigs = configList.stream()
.sorted(Comparator.comparing(JsonNode::toString)).collect(
Expand All @@ -145,4 +168,94 @@ private static Path buildConfigPath(final Path storageRoot, final String schemaT
.resolve(String.format("%s.yaml", schemaType));
}

/**
 * Exports the configurations scoped to one workspace into an archive file.
 *
 * <p>
 * A temporary staging folder is populated by {@code exportVersionFile} and
 * {@code exportConfigsDatabase}, then packed into a freshly created temp file
 * (suffix {@code .tar.gz}) with {@code Archives.createArchive}. The staging
 * folder is removed before returning, whether or not the export succeeded.
 *
 * @param workspaceId id of the workspace whose configurations are exported
 * @return the archive file; the caller owns it and is responsible for deleting it
 * @throws JsonValidationException propagated from the configuration export
 * @throws IOException if the staging folder, the archive file or their content cannot be written
 * @throws ConfigNotFoundException propagated from the configuration export
 */
public File exportWorkspace(UUID workspaceId) throws JsonValidationException, IOException, ConfigNotFoundException {
  final Path tempFolder = Files.createTempDirectory(Path.of("/tmp"), ARCHIVE_FILE_NAME);
  try {
    final File dump = Files.createTempFile(ARCHIVE_FILE_NAME, ".tar.gz").toFile();
    exportVersionFile(tempFolder);
    exportConfigsDatabase(tempFolder, workspaceId);

    Archives.createArchive(tempFolder, dump.toPath());
    return dump;
  } finally {
    // The staging folder is only an intermediate step towards the archive. Without this
    // cleanup every export would leave a directory behind under /tmp. deleteQuietly never
    // throws, so it cannot mask an exception raised by the export itself.
    FileUtils.deleteQuietly(tempFolder.toFile());
  }
}

/**
 * Writes the configurations attached to {@code workspaceId} into {@code parentFolder}, one
 * archive entry per config schema type.
 *
 * Source connections, destination connections and sync operations are kept when their own
 * workspace id equals {@code workspaceId}. Source and destination definitions are limited to
 * the ones referenced by the kept connections. Standard syncs are kept when
 * {@code workspaceHelper} resolves their source/destination pair to {@code workspaceId}.
 *
 * @param parentFolder folder the config files are written under
 * @param workspaceId id of the workspace used to filter the configurations
 */
private void exportConfigsDatabase(Path parentFolder, UUID workspaceId) throws IOException, JsonValidationException, ConfigNotFoundException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at a high level it feels like there should be an export(list<uuid> workspaceIds) instead of having two different implementations one scoped by workspace. Would that make sense here or am I missing something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This export function scoped by workspace Id is exporting configurations related to a workspace and then "cascade" down to other configurations that are being referred/dependent on them.

The second export without scope is dumping everything regardless of links/relations between configurations and thus, will cover more configurations than this export(uuid) or a export(list<uuid>) function

// Source connections of the workspace; the kept ones drive which definitions are exported next.
final Collection<SourceConnection> sourceConnections = writeConfigsToArchive(
parentFolder,
ConfigSchema.SOURCE_CONNECTION.name(),
configRepository::listSourceConnection,
(sourceConnection) -> workspaceId.equals(sourceConnection.getWorkspaceId()));
// Only the source definitions referenced by those connections; no extra filtering is needed.
writeConfigsToArchive(parentFolder, ConfigSchema.STANDARD_SOURCE_DEFINITION.name(),
() -> listSourceDefinition(sourceConnections),
(config) -> true);

// Same two steps for destinations: connections of the workspace, then their definitions.
final Collection<DestinationConnection> destinationConnections = writeConfigsToArchive(
parentFolder,
ConfigSchema.DESTINATION_CONNECTION.name(),
configRepository::listDestinationConnection,
(destinationConnection) -> workspaceId.equals(destinationConnection.getWorkspaceId()));
writeConfigsToArchive(parentFolder, ConfigSchema.STANDARD_DESTINATION_DEFINITION.name(),
() -> listDestinationDefinition(destinationConnections),
(config) -> true);

// Sync operations carry their own workspace id, so they are filtered directly.
writeConfigsToArchive(
parentFolder,
ConfigSchema.STANDARD_SYNC_OPERATION.name(),
configRepository::listStandardSyncOperations,
(operation) -> workspaceId.equals(operation.getWorkspaceId()));

// Standard syncs are matched through workspaceHelper using their source and destination ids.
// NOTE(review): when workspaceHelper is null no sync is kept and an empty list is written;
// confirm that silently exporting no syncs is the intended behavior.
final List<StandardSync> standardSyncs = new ArrayList<>();
for (StandardSync standardSync : configRepository.listStandardSyncs()) {
if (workspaceHelper != null &&
workspaceId.equals(workspaceHelper.getWorkspaceForConnection(standardSync.getSourceId(), standardSync.getDestinationId()))) {
standardSyncs.add(standardSync);
}
}
writeConfigsToArchive(parentFolder, ConfigSchema.STANDARD_SYNC.name(), standardSyncs.stream().map(Jsons::jsonNode));
}

/**
 * Lists configurations, keeps the ones accepted by the filter, writes them to the archive
 * folder as JSON nodes and hands the kept configurations back to the caller.
 *
 * @param parentFolder folder the config file is written under
 * @param configSchemaName schema type name used to build the config file path
 * @param listConfigCall call producing the candidate configurations
 * @param filterConfigCall returns true for each configuration to keep
 * @return the configurations that passed the filter
 */
private <T> Collection<T> writeConfigsToArchive(Path parentFolder,
                                                String configSchemaName,
                                                ListConfigCall<T> listConfigCall,
                                                Function<T, Boolean> filterConfigCall)
    throws JsonValidationException, ConfigNotFoundException, IOException {
  final List<T> retained = new ArrayList<>();
  for (final T candidate : listConfigCall.apply()) {
    if (filterConfigCall.apply(candidate)) {
      retained.add(candidate);
    }
  }
  final Stream<JsonNode> serialized = retained.stream().map(Jsons::jsonNode);
  writeConfigsToArchive(parentFolder, configSchemaName, serialized);
  return retained;
}

/**
 * Collects the source definitions referenced by the given source connections, fetching each
 * distinct definition id from the config repository only once.
 *
 * @param sourceConnections connections whose source definitions are looked up
 * @return one definition per distinct source definition id (a view of the map values, in no
 *         particular order since a HashMap is used)
 */
private Collection<StandardSourceDefinition> listSourceDefinition(Collection<SourceConnection> sourceConnections)
throws JsonValidationException, ConfigNotFoundException, IOException {
// Keyed by definition id so that connections sharing a definition trigger a single lookup.
final Map<UUID, StandardSourceDefinition> sourceDefinitionMap = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this method is formidable. can we break it up a little? are there any DRY opportunities here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I just got the basic functionality working, I'll refactor parts and add tests now

for (SourceConnection sourceConnection : sourceConnections) {
// Skip ids that were already resolved by an earlier connection.
if (!sourceDefinitionMap.containsKey(sourceConnection.getSourceDefinitionId())) {
sourceDefinitionMap
.put(sourceConnection.getSourceDefinitionId(),
configRepository.getStandardSourceDefinition(sourceConnection.getSourceDefinitionId()));
}
}
return sourceDefinitionMap.values();
}

/**
 * Resolves the destination definitions used by the given destination connections. Each
 * distinct definition id is fetched from the config repository at most once.
 *
 * @param destinationConnections connections whose destination definitions are looked up
 * @return one definition per distinct destination definition id
 */
private Collection<StandardDestinationDefinition> listDestinationDefinition(Collection<DestinationConnection> destinationConnections)
    throws JsonValidationException, ConfigNotFoundException, IOException {
  final Map<UUID, StandardDestinationDefinition> definitionsById = new HashMap<>();
  for (final DestinationConnection connection : destinationConnections) {
    final UUID definitionId = connection.getDestinationDefinitionId();
    if (definitionsById.containsKey(definitionId)) {
      // Already resolved through an earlier connection.
      continue;
    }
    definitionsById.put(definitionId, configRepository.getStandardDestinationDefinition(definitionId));
  }
  return definitionsById.values();
}

/**
 * Call that lists all the already existing configurations of type {@code T}.
 *
 * <p>
 * It plays the role of a {@code java.util.function.Supplier}, but its single method is allowed
 * to throw the checked exceptions declared below, so it can be implemented with a lambda or a
 * method reference to a listing method that throws them.
 *
 * @param <T> type of the listed configurations
 */
@FunctionalInterface
public interface ListConfigCall<T> {

  /**
   * Lists the configurations.
   *
   * @return the existing configurations of type {@code T}
   * @throws IOException if the configurations cannot be read
   * @throws JsonValidationException if a configuration fails validation
   * @throws ConfigNotFoundException if a referenced configuration does not exist
   */
  Collection<T> apply() throws IOException, JsonValidationException, ConfigNotFoundException;

}

}
Loading