Create a simpler Getting Started with demo data #1408

Merged
merged 23 commits into from Dec 22, 2020
23 commits
1c5434c
add demo source & demo dest
michel-tricot Dec 19, 2020
a3931c8
progress
michel-tricot Dec 19, 2020
9190694
lint
michel-tricot Dec 19, 2020
38620ed
GitBook: [mt-demo-data] 4 pages modified
michel-tricot Dec 19, 2020
37446a1
GitBook: [mt-demo-data] 3 pages modified
michel-tricot Dec 19, 2020
5ed1850
Merge branch 'master' into mt-demo-data
michel-tricot Dec 19, 2020
aa648cf
Merge branch 'mt-demo-data' of github.com:airbytehq/airbyte into mt-d…
michel-tricot Dec 19, 2020
5f0fd23
GitBook: [mt-demo-data] 2 pages modified
michel-tricot Dec 19, 2020
51652b3
GitBook: [mt-demo-data] one page modified
michel-tricot Dec 19, 2020
27bd2b1
GitBook: [mt-demo-data] 2 pages modified
michel-tricot Dec 19, 2020
360b93e
remove demo
michel-tricot Dec 19, 2020
748a478
Merge branch 'mt-demo-data' of github.com:airbytehq/airbyte into mt-d…
michel-tricot Dec 19, 2020
b32f1e9
GitBook: [mt-demo-data] 5 pages and 4 assets modified
michel-tricot Dec 19, 2020
ae89e2c
Merge branch 'mt-demo-data' of github.com:airbytehq/airbyte into mt-d…
michel-tricot Dec 19, 2020
fbca103
update readme
michel-tricot Dec 19, 2020
de379bf
update link
michel-tricot Dec 19, 2020
86d7380
Merge branch 'master' into mt-demo-data
michel-tricot Dec 19, 2020
a957cc6
GitBook: [mt-demo-data] one page modified
Dec 20, 2020
68b8fe9
lint
michel-tricot Dec 21, 2020
de63587
Update local-csv.md
ChristopheDuong Dec 21, 2020
40a5e41
Update local-json.md
ChristopheDuong Dec 21, 2020
e1d1cdf
GitBook: [mt-demo-data] 83 pages modified
michel-tricot Dec 21, 2020
e69352a
fix gitbook rewrite
michel-tricot Dec 21, 2020
2 changes: 1 addition & 1 deletion README.md
@@ -30,7 +30,7 @@ docker-compose up

Now visit [http://localhost:8000](http://localhost:8000)

-Here is a [step-by-step guide](docs/tutorials/getting-started.md) showing you how to load data from a sample Postgres database into another database using Airbyte, all on your computer.
+Here is a [step-by-step guide](docs/getting-started.md) showing you how to load data from an API into a file, all on your computer.

## Features

@@ -3,5 +3,5 @@
   "name": "BigQuery",
   "dockerRepository": "airbyte/destination-bigquery",
   "dockerImageTag": "0.1.11",
-  "documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-bigquery-destination"
+  "documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery"
 }
@@ -3,5 +3,5 @@
   "name": "Postgres",
   "dockerRepository": "airbyte/destination-postgres",
   "dockerImageTag": "0.1.9",
-  "documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-postgres-destination"
+  "documentationUrl": "https://docs.airbyte.io/integrations/destinations/postgres"
 }
@@ -3,5 +3,5 @@
   "name": "Local CSV",
   "dockerRepository": "airbyte/destination-csv",
   "dockerImageTag": "0.1.5",
-  "documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-csv-destination"
+  "documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-csv"
 }
@@ -0,0 +1,7 @@
{
"destinationDefinitionId": "a625d593-bba5-4a1c-a53d-2d246268a816",
"name": "Local JSON",
"dockerRepository": "airbyte/destination-local-json",
"dockerImageTag": "0.1.0",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-json"
}
@@ -1,6 +1,6 @@
 {
   "sourceDefinitionId": "9fed261d-d107-47fd-8c8b-323023db6e20",
-  "name": "exchangeratesapi.io",
+  "name": "Exchange Rates Api",
   "dockerRepository": "airbyte/source-exchangeratesapi-singer",
   "dockerImageTag": "0.1.8",
   "documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-exchangeratesapi_io-source"
@@ -1,18 +1,23 @@
+- destinationDefinitionId: a625d593-bba5-4a1c-a53d-2d246268a816
+  name: Local JSON
+  dockerRepository: airbyte/destination-local-json
+  dockerImageTag: 0.1.0
+  documentationUrl: https://docs.airbyte.io/integrations/destinations/local-json
 - destinationDefinitionId: 8be1cf83-fde1-477f-a4ad-318d23c9f3c6
   name: Local CSV
   dockerRepository: airbyte/destination-csv
   dockerImageTag: 0.1.5
-  documentationUrl: https://hub.docker.com/r/airbyte/integration-singer-csv-destination
+  documentationUrl: https://docs.airbyte.io/integrations/destinations/local-csv
 - destinationDefinitionId: 25c5221d-dce2-4163-ade9-739ef790f503
   name: Postgres
   dockerRepository: airbyte/destination-postgres
   dockerImageTag: 0.1.9
-  documentationUrl: https://hub.docker.com/r/airbyte/integration-singer-postgres-destination
+  documentationUrl: https://docs.airbyte.io/integrations/destinations/postgres
 - destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
   name: BigQuery
   dockerRepository: airbyte/destination-bigquery
   dockerImageTag: 0.1.11
-  documentationUrl: https://hub.docker.com/r/airbyte/integration-singer-bigquery-destination
+  documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
 - destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
   name: Snowflake
   dockerRepository: airbyte/destination-snowflake
@@ -1,5 +1,5 @@
 - sourceDefinitionId: 9fed261d-d107-47fd-8c8b-323023db6e20
-  name: exchangeratesapi.io
+  name: Exchange Rates Api
   dockerRepository: airbyte/source-exchangeratesapi-singer
   dockerImageTag: 0.1.8
   documentationUrl: https://hub.docker.com/r/airbyte/integration-singer-exchangeratesapi_io-source
10 changes: 6 additions & 4 deletions airbyte-integrations/bases/standard-source-test/build.gradle
@@ -55,14 +55,16 @@ task generateSourceTestDocs(type: Javadoc) {
def methodName = methodInfo.selectFirst("div>span.memberName").text()
def methodDocstring = methodInfo.selectFirst("div.block")

- md += "### ${methodName}\n"
- md += "${methodDocstring != null ? methodDocstring.text() : 'No method description was provided'}\n\n"
+ md += "## ${methodName}\n\n"
+ md += "${methodDocstring != null ? methodDocstring.text().replaceAll(/([()])/, '\\\\$1') : 'No method description was provided'}\n\n"
}
def outputDoc = new File("${rootDir}/docs/contributing-to-airbyte/building-new-connector/standard-source-tests.md")
- outputDoc.write "# Standard Source Test Suite\n"
- outputDoc.append "Test methods start with `test`. Other methods are internal helpers in the java class implementing the test suite.\n"
+ outputDoc.write "# Standard Source Test Suite\n\n"
+ outputDoc.append "Test methods start with `test`. Other methods are internal helpers in the java class implementing the test suite.\n\n"
outputDoc.append md
}

+ outputs.upToDateWhen {false}
}

project.build.dependsOn(generateSourceTestDocs)
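
A note on the `replaceAll` call changed in this hunk: the pattern `([()])` captures a single opening or closing parenthesis, and the replacement string resolves to two backslashes followed by `$1`. `replaceAll` reads the doubled backslash as one literal backslash and `$1` as the captured character, so each parenthesis ends up prefixed with a backslash, presumably so that the generated Markdown renders it literally. A minimal Java sketch of the same call follows; the class and method names (`EscapeParens`, `escapeParens`) are invented for illustration and do not appear in the PR.

```java
public class EscapeParens {

    // Prefix every '(' and ')' with a backslash, mirroring the replaceAll call in the Gradle task.
    // The Java literal "\\\\$1" is the string \\$1, which replaceAll interprets as
    // "one literal backslash, then capture group 1".
    static String escapeParens(String text) {
        return text.replaceAll("([()])", "\\\\$1");
    }

    public static void main(String[] args) {
        // prints: Calls check\(config\) first.
        System.out.println(escapeParens("Calls check(config) first."));
    }
}
```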
@@ -95,7 +95,7 @@ public SQLNamingResolvable getNamingResolver() {
* @param config - csv destination config.
* @param catalog - schema of the incoming messages.
* @return - a consumer to handle writing records to the filesystem.
-   * @throws IOException - exception throw in manipulating the filesytem.
+   * @throws IOException - exception throw in manipulating the filesystem.
*/
@Override
public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirbyteCatalog catalog) throws IOException {
@@ -0,0 +1,3 @@
*
!Dockerfile
!build
11 changes: 11 additions & 0 deletions airbyte-integrations/connectors/destination-local-json/Dockerfile
@@ -0,0 +1,11 @@
FROM airbyte/integration-base-java:dev

WORKDIR /airbyte
ENV APPLICATION destination-local-json

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-local-json
@@ -0,0 +1,18 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}

application {
mainClass = 'io.airbyte.integrations.destination.local_json.LocalJsonDestination'
}

dependencies {
implementation project(':airbyte-config:models')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:bases:base-java')
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
}
@@ -0,0 +1,234 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.destination.local_json;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.DestinationConsumer;
import io.airbyte.integrations.base.FailureTrackingConsumer;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.SQLNamingResolvable;
import io.airbyte.integrations.base.StandardSQLNaming;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.SyncMode;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalJsonDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(LocalJsonDestination.class);

static final String FIELD_DATA = "data";
static final String FIELD_AB_ID = "ab_id";
static final String FIELD_EMITTED_AT = "emitted_at";

static final String DESTINATION_PATH_FIELD = "destination_path";

private final SQLNamingResolvable namingResolver;

public LocalJsonDestination() {
namingResolver = new StandardSQLNaming();
}

@Override
public ConnectorSpecification spec() throws IOException {
final String resourceString = MoreResources.readResource("spec.json");
return Jsons.deserialize(resourceString, ConnectorSpecification.class);
}

@Override
public AirbyteConnectionStatus check(JsonNode config) {
try {
FileUtils.forceMkdir(getDestinationPath(config).toFile());
} catch (Exception e) {
return new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage(e.getMessage());
}
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
}

@Override
public SQLNamingResolvable getNamingResolver() {
return namingResolver;
}

/**
* @param config - destination config.
* @param catalog - schema of the incoming messages.
* @return - a consumer to handle writing records to the filesystem.
* @throws IOException - exception thrown when manipulating the filesystem.
*/
@Override
public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirbyteCatalog catalog) throws IOException {
final Path destinationDir = getDestinationPath(config);

FileUtils.forceMkdir(destinationDir.toFile());

final Map<String, WriteConfig> writeConfigs = new HashMap<>();
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
final String streamName = stream.getStream().getName();
final Path finalPath = destinationDir.resolve(getNamingResolver().getRawTableName(streamName) + ".jsonl");
final Path tmpPath = destinationDir.resolve(getNamingResolver().getTmpTableName(streamName) + ".jsonl");

final boolean isIncremental = stream.getSyncMode() == SyncMode.INCREMENTAL;
if (isIncremental && finalPath.toFile().exists()) {
Files.copy(finalPath, tmpPath, StandardCopyOption.REPLACE_EXISTING);
}

final Writer writer = new FileWriter(tmpPath.toFile(), isIncremental);
writeConfigs.put(stream.getStream().getName(), new WriteConfig(writer, tmpPath, finalPath));
}

return new JsonConsumer(writeConfigs, catalog);
}

/**
* Extract provided path.
*
* @param config - config object
* @return absolute path where to write files.
*/
private Path getDestinationPath(JsonNode config) {
Path destinationPath = Paths.get(config.get(DESTINATION_PATH_FIELD).asText());

if (!destinationPath.startsWith("/local"))
destinationPath = Path.of("/local").resolve(destinationPath);

return destinationPath;
}

/**
* This consumer writes individual records to temporary files. If all of the messages are written
* successfully, it moves the tmp files to files named by their respective stream. If there are any
* failures, nothing is written.
*/
private static class JsonConsumer extends FailureTrackingConsumer<AirbyteMessage> {

private final Map<String, WriteConfig> writeConfigs;
private final ConfiguredAirbyteCatalog catalog;

public JsonConsumer(Map<String, WriteConfig> writeConfigs, ConfiguredAirbyteCatalog catalog) {
LOGGER.info("initializing consumer.");
this.catalog = catalog;
this.writeConfigs = writeConfigs;
}

@Override
protected void acceptTracked(AirbyteMessage message) throws Exception {

// ignore other message types.
if (message.getType() == AirbyteMessage.Type.RECORD) {
if (!writeConfigs.containsKey(message.getRecord().getStream())) {
throw new IllegalArgumentException(
String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s",
Jsons.serialize(catalog), Jsons.serialize(message)));
}

final Writer writer = writeConfigs.get(message.getRecord().getStream()).getWriter();
writer.write(Jsons.serialize(ImmutableMap.of(
FIELD_AB_ID, UUID.randomUUID(),
FIELD_EMITTED_AT, message.getRecord().getEmittedAt(),
FIELD_DATA, message.getRecord().getData())));
writer.write(System.lineSeparator());
}
}

@Override
protected void close(boolean hasFailed) throws IOException {
LOGGER.info("finalizing consumer.");

for (final Map.Entry<String, WriteConfig> entries : writeConfigs.entrySet()) {
try {
entries.getValue().getWriter().flush();
entries.getValue().getWriter().close();
} catch (Exception e) {
hasFailed = true;
LOGGER.error("failed to close writer for: {}.", entries.getKey());
}
}
// do not persist the data, if there are any failures.
if (!hasFailed) {
for (final WriteConfig writeConfig : writeConfigs.values()) {
Files.move(writeConfig.getTmpPath(), writeConfig.getFinalPath(), StandardCopyOption.REPLACE_EXISTING);
}
}
// clean up tmp files.
for (final WriteConfig writeConfig : writeConfigs.values()) {
Files.deleteIfExists(writeConfig.getTmpPath());
}

}

}

private static class WriteConfig {

private final Writer writer;
private final Path tmpPath;
private final Path finalPath;

public WriteConfig(Writer writer, Path tmpPath, Path finalPath) {
this.writer = writer;
this.tmpPath = tmpPath;
this.finalPath = finalPath;
}

public Writer getWriter() {
return writer;
}

public Path getTmpPath() {
return tmpPath;
}

public Path getFinalPath() {
return finalPath;
}

}

public static void main(String[] args) throws Exception {
new IntegrationRunner(new LocalJsonDestination()).run(args);
}

}
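
The Javadoc on `JsonConsumer` above describes a write-to-temp-then-move pattern: records go to a temporary file, and that file replaces the final one only if nothing failed. The sketch below isolates that pattern for a single file under simplifying assumptions. The class name `TmpThenMoveSketch`, the `writeAll` helper, and the file names are hypothetical and not part of this PR, and error handling is reduced to a boolean flag.

```java
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

public class TmpThenMoveSketch {

    // Writes every line to tmpPath, then promotes the file to finalPath only if all writes succeeded.
    static void writeAll(Path tmpPath, Path finalPath, List<String> lines) throws IOException {
        boolean hasFailed = false;
        try (Writer writer = Files.newBufferedWriter(tmpPath)) {
            for (String line : lines) {
                writer.write(line);
                writer.write(System.lineSeparator());
            }
        } catch (IOException e) {
            // Remember the failure so the partial file is never promoted.
            hasFailed = true;
        }
        if (!hasFailed) {
            Files.move(tmpPath, finalPath, StandardCopyOption.REPLACE_EXISTING);
        }
        // After a successful move this is a no-op; after a failure it removes the partial file.
        Files.deleteIfExists(tmpPath);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("local-json-sketch");
        Path tmp = dir.resolve("rates_tmp.jsonl");
        Path fin = dir.resolve("rates_raw.jsonl");
        writeAll(tmp, fin, List.of("{\"data\":{\"USD\":1.0}}"));
        System.out.println(Files.readAllLines(fin));
    }
}
```

The point of the design is that a reader of the final file sees either the previous sync's output or the complete new output, never a half-written file.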
@@ -0,0 +1,18 @@
{
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-json",
"supportsIncremental": true,
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Local Json Destination Spec",
"type": "object",
"required": ["destination_path"],
"additionalProperties": false,
"properties": {
"destination_path": {
"description": "Path to the directory where json files will be written. The files will be placed inside that local mount. For more information check out our <a href=\"https://docs.airbyte.io/integrations/destinations/local-json\">docs</a>",
"type": "string",
"examples": ["/json_data"]
Contributor

This path should match what we're doing in local-csv.

I'm fine with either treating it as a directory relative to the mount (and requiring no leading slash) or doing what local-csv does, where the path must start with /local/. Either way is fine as long as the two match.

This is how it's defined in local-csv:

      "destination_path": {
        "description": "Path to the directory where csv files will be written. Must start with the local mount \"/local\". Any other directory appended on the end will be placed inside that local mount. For more information check out our <a href=\"https://docs.airbyte.io/integrations/destinations/local-csv\">docs</a>",
        "type": "string",
        "examples": ["/local"],
        "pattern": "(^\\/local\\/.*)|(^\\/local$)"
      }

}
}
}
}
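
One detail relevant to the path discussion in the review comment, offered as a sketch rather than a statement about how the merged code behaves: on a Unix-like filesystem, `Path.resolve` returns its argument unchanged when that argument is absolute, so `Path.of("/local").resolve("/json_data")` yields `/json_data`, not `/local/json_data`. The hypothetical helper below (`underLocalMount`, in a made-up class `LocalPathSketch`, neither part of this PR) joins the path strings instead and rejects anything that normalizes to a location outside `/local`.

```java
import java.nio.file.Path;

public class LocalPathSketch {

    static final Path LOCAL_MOUNT = Path.of("/local");

    // Hypothetical helper: places any configured path inside the /local mount.
    static Path underLocalMount(String configured) {
        Path path = Path.of(configured);
        if (!path.startsWith(LOCAL_MOUNT)) {
            // Path.of(first, more...) joins the strings, so a leading slash
            // in `configured` does not discard the "/local" prefix.
            path = Path.of(LOCAL_MOUNT.toString(), configured);
        }
        Path normalized = path.normalize();
        if (!normalized.startsWith(LOCAL_MOUNT)) {
            throw new IllegalArgumentException("Path escapes " + LOCAL_MOUNT + ": " + configured);
        }
        return normalized;
    }

    public static void main(String[] args) {
        // Output shown assumes a Unix-like default filesystem.
        System.out.println(LOCAL_MOUNT.resolve("/json_data"));   // /json_data (absolute argument wins)
        System.out.println(underLocalMount("/json_data"));       // /local/json_data
        System.out.println(underLocalMount("json_data"));        // /local/json_data
        System.out.println(underLocalMount("/local/json_data")); // /local/json_data
    }
}
```

With a helper of this shape, both conventions mentioned in the comment (a path relative to the mount, or a path that already starts with `/local`) map to the same directory.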