Skip to content

Commit

Permalink
MySQL source: Add comprehensive data type test (#3810)
Browse files Browse the repository at this point in the history
  • Loading branch information
DoNotPanicUA committed Jun 7, 2021
1 parent 5d911ed commit 213fae1
Show file tree
Hide file tree
Showing 55 changed files with 1,419 additions and 204 deletions.
Expand Up @@ -27,7 +27,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import org.junit.jupiter.api.Test;

public class DataTypeEnumTest {
Expand Down
2 changes: 1 addition & 1 deletion airbyte-db/src/main/java/io/airbyte/db/jdbc/JdbcUtils.java
Expand Up @@ -28,7 +28,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.Date;
Expand Down
Expand Up @@ -34,7 +34,7 @@
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.stream.MoreStreams;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.math.BigDecimal;
import java.sql.Connection;
Expand Down
Expand Up @@ -43,7 +43,7 @@
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
Expand Down
Expand Up @@ -28,6 +28,7 @@
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.nio.file.Path;
Expand Down Expand Up @@ -119,7 +120,7 @@ protected List<String> getRegexTests() throws Exception {
}

@Override
protected void setup(TestDestinationEnv testEnv) throws Exception {
protected void setupEnvironment(TestDestinationEnv environment) throws Exception {
// no-op, for now
}

Expand Down
Expand Up @@ -12,6 +12,7 @@ plugins {
import org.jsoup.Jsoup;

dependencies {
implementation project(':airbyte-db')
implementation project(':airbyte-config:models')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-workers')
Expand Down
Expand Up @@ -90,7 +90,7 @@ protected List<String> getRegexTests() throws IOException {
}

@Override
protected void setup(TestDestinationEnv testEnv) throws Exception {
protected void setupEnvironment(TestDestinationEnv environment) throws Exception {
testRoot = Files.createTempDirectory(Files.createDirectories(Path.of("/tmp/standard_test")), "pytest");
runExecutableVoid(Command.SETUP);
}
Expand Down
@@ -0,0 +1,158 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.integrations.standardtest.source;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.config.JobGetSpecConfig;
import io.airbyte.config.StandardCheckConnectionInput;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.config.StandardTapConfig;
import io.airbyte.config.State;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.workers.DefaultCheckConnectionWorker;
import io.airbyte.workers.DefaultDiscoverCatalogWorker;
import io.airbyte.workers.DefaultGetSpecWorker;
import io.airbyte.workers.WorkerException;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.DockerProcessFactory;
import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.protocols.airbyte.AirbyteSource;
import io.airbyte.workers.protocols.airbyte.DefaultAirbyteSource;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

public abstract class SourceAbstractTest {

private TestDestinationEnv environment;
private Path jobRoot;
protected Path localRoot;
private ProcessFactory processFactory;

private static final long JOB_ID = 0L;
private static final int JOB_ATTEMPT = 0;

/**
* Name of the docker image that the tests will run against.
*
* @return docker image name
*/
protected abstract String getImageName();

/**
* Configuration specific to the integration. Will be passed to integration where appropriate in
* each test. Should be valid.
*
* @return integration-specific configuration
*/
protected abstract JsonNode getConfig() throws Exception;

/**
* Function that performs any setup of external resources required for the test. e.g. instantiate a
* postgres database. This function will be called before EACH test.
*
* @param environment - information about the test environment.
* @throws Exception - can throw any exception, test framework will handle.
*/
protected abstract void setupEnvironment(TestDestinationEnv environment) throws Exception;

/**
* Function that performs any clean up of external resources required for the test. e.g. delete a
* postgres database. This function will be called after EACH test. It MUST remove all data in the
* destination so that there is no contamination across tests.
*
* @param testEnv - information about the test environment.
* @throws Exception - can throw any exception, test framework will handle.
*/
protected abstract void tearDown(TestDestinationEnv testEnv) throws Exception;

@BeforeEach
public void setUpInternal() throws Exception {
final Path testDir = Path.of("/tmp/airbyte_tests/");
Files.createDirectories(testDir);
final Path workspaceRoot = Files.createTempDirectory(testDir, "test");
jobRoot = Files.createDirectories(Path.of(workspaceRoot.toString(), "job"));
localRoot = Files.createTempDirectory(testDir, "output");
environment = new TestDestinationEnv(localRoot);

setupEnvironment(environment);

processFactory = new DockerProcessFactory(
workspaceRoot,
workspaceRoot.toString(),
localRoot.toString(),
"host");
}

@AfterEach
public void tearDownInternal() throws Exception {
tearDown(environment);
}

protected ConnectorSpecification runSpec() throws WorkerException {
return new DefaultGetSpecWorker(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory))
.run(new JobGetSpecConfig().withDockerImage(getImageName()), jobRoot);
}

protected StandardCheckConnectionOutput runCheck() throws Exception {
return new DefaultCheckConnectionWorker(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory))
.run(new StandardCheckConnectionInput().withConnectionConfiguration(getConfig()), jobRoot);
}

protected AirbyteCatalog runDiscover() throws Exception {
return new DefaultDiscoverCatalogWorker(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory))
.run(new StandardDiscoverCatalogInput().withConnectionConfiguration(getConfig()), jobRoot);
}

protected List<AirbyteMessage> runRead(ConfiguredAirbyteCatalog configuredCatalog) throws Exception {
return runRead(configuredCatalog, null);
}

// todo (cgardens) - assume no state since we are all full refresh right now.
protected List<AirbyteMessage> runRead(ConfiguredAirbyteCatalog catalog, JsonNode state) throws Exception {
final StandardTapConfig sourceConfig = new StandardTapConfig()
.withSourceConnectionConfiguration(getConfig())
.withState(state == null ? null : new State().withState(state))
.withCatalog(catalog);

final AirbyteSource source = new DefaultAirbyteSource(new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, getImageName(), processFactory));
final List<AirbyteMessage> messages = new ArrayList<>();
source.start(sourceConfig, jobRoot);
while (!source.isFinished()) {
source.attemptRead().ifPresent(messages::add);
}
source.close();

return messages;
}

}

0 comments on commit 213fae1

Please sign in to comment.