Skip to content

Commit

Permalink
馃悰 Snowflake destination: do not create schema if it exists (#9311)
Browse files Browse the repository at this point in the history
* Snowflake destination: do not create schema if it exists

* Snowflake destination: move common constants to parent class

* Snowflake destination: bump version

* Snowflake destination: bump version
  • Loading branch information
sashaNeshcheret committed Jan 10, 2022
1 parent e68c564 commit ceaa1a4
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
"name": "Snowflake",
"dockerRepository": "airbyte/destination-snowflake",
"dockerImageTag": "0.3.23",
"dockerImageTag": "0.4.1",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake",
"icon": "snowflake.svg"
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.4.0
dockerImageTag: 0.4.1
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
icon: snowflake.svg
- name: MariaDB ColumnStore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@
public abstract class JdbcSqlOperations implements SqlOperations {

private static final Logger LOGGER = LoggerFactory.getLogger(JdbcSqlOperations.class);
protected static final String SHOW_SCHEMAS = "show schemas;";
protected static final String NAME = "name";

@Override
public void createSchemaIfNotExists(final JdbcDatabase database, final String schemaName) throws Exception {
database.execute(createSchemaQuery(schemaName));
if (!isSchemaExists(database, schemaName)) {
database.execute(createSchemaQuery(schemaName));;
}
}

private String createSchemaQuery(final String schemaName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ public interface SqlOperations {
*/
void createSchemaIfNotExists(JdbcDatabase database, String schemaName) throws Exception;

/**
* Denotes whether the schema exists in destination database
*
* @param database Database that the connector is syncing
* @param schemaName Name of schema.
*
* @return true if the schema exists in destination database, false if it doesn't
*/
default boolean isSchemaExists(final JdbcDatabase database, final String schemaName) throws Exception {
return false;
}

/**
* Create a table with provided name in provided schema if it does not already exist.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.4.0
LABEL io.airbyte.version=0.4.1
LABEL io.airbyte.name=airbyte/destination-snowflake
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public void createTableIfNotExists(final JdbcDatabase database, final String sch
database.execute(createTableQuery);
}

public boolean isSchemaExists(JdbcDatabase database, String outputSchema) throws Exception {
return database.query(SHOW_SCHEMAS).map(schemas -> schemas.get(NAME).asText()).anyMatch(outputSchema::equalsIgnoreCase);
}

@Override
public void insertRecordsInternal(final JdbcDatabase database,
final List<AirbyteRecordMessage> records,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,9 @@ public String createTableQuery(final JdbcDatabase database, final String schemaN
schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
}

@Override
public boolean isSchemaExists(JdbcDatabase database, String outputSchema) throws Exception {
return database.query(SHOW_SCHEMAS).map(schemas -> schemas.get(NAME).asText()).anyMatch(outputSchema::equalsIgnoreCase);
}

}
27 changes: 14 additions & 13 deletions docs/integrations/destinations/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,21 +196,22 @@ The final query should show a `STORAGE_GCP_SERVICE_ACCOUNT` property with an ema
Finally, you need to add read/write permissions to your bucket with that email.


| Version | Date | Pull Request | Subject |
|:--------| :-------- | :----- | :------ |
| 0.4.0 | 2021-12-27 | [#9063](https://github.com/airbytehq/airbyte/pull/9063) | Updated normalization to produce permanent tables |
| Version | Date | Pull Request | Subject |
|:--------|:-----------| :----- | :------ |
| 0.4.1 | 2021-01-06 | [#9311](https://github.com/airbytehq/airbyte/pull/9311) | Update 褋reating schema during check |
| 0.4.0 | 2021-12-27 | [#9063](https://github.com/airbytehq/airbyte/pull/9063) | Updated normalization to produce permanent tables |
| 0.3.24 | 2021-12-23 | [#8869](https://github.com/airbytehq/airbyte/pull/8869) | Changed staging approach to Byte-Buffered |
| 0.3.23 | 2021-12-22 | [#9039](https://github.com/airbytehq/airbyte/pull/9039) | Added part_size configuration in UI for S3 loading method |
| 0.3.22 | 2021-12-21 | [#9006](https://github.com/airbytehq/airbyte/pull/9006) | Updated jdbc schema naming to follow Snowflake Naming Conventions |
| 0.3.21 | 2021-12-15 | [#8781](https://github.com/airbytehq/airbyte/pull/8781) | Updated check method to verify permissions to create/drop stage for internal staging; compatibility fix for Java 17 |
| 0.3.20 | 2021-12-10 | [#8562](https://github.com/airbytehq/airbyte/pull/8562) | Moving classes around for better dependency management; compatibility fix for Java 17 |
| 0.3.19 | 2021-12-06 | [#8528](https://github.com/airbytehq/airbyte/pull/8528) | Set Internal Staging as default choice |
| 0.3.18 | 2021-11-26 | [#8253](https://github.com/airbytehq/airbyte/pull/8253) | Snowflake Internal Staging Support |
| 0.3.17 | 2021-11-08 | [#7719](https://github.com/airbytehq/airbyte/pull/7719) | Improve handling of wide rows by buffering records based on their byte size rather than their count |
| 0.3.15 | 2021-10-11 | [#6949](https://github.com/airbytehq/airbyte/pull/6949) | Each stream was split into files of 10,000 records each for copying using S3 or GCS |
| 0.3.14 | 2021-09-08 | [#5924](https://github.com/airbytehq/airbyte/pull/5924) | Fixed AWS S3 Staging COPY is writing records from different table in the same raw table |
| 0.3.13 | 2021-09-01 | [#5784](https://github.com/airbytehq/airbyte/pull/5784) | Updated query timeout from 30 minutes to 3 hours |
| 0.3.12 | 2021-07-30 | [#5125](https://github.com/airbytehq/airbyte/pull/5125) | Enable `additionalPropertities` in spec.json |
| 0.3.11 | 2021-07-21 | [#3555](https://github.com/airbytehq/airbyte/pull/3555) | Partial Success in BufferedStreamConsumer |
| 0.3.10 | 2021-07-12 | [#4713](https://github.com/airbytehq/airbyte/pull/4713)| Tag traffic with `airbyte` label to enable optimization opportunities from Snowflake |
| 0.3.20 | 2021-12-10 | [#8562](https://github.com/airbytehq/airbyte/pull/8562) | Moving classes around for better dependency management; compatibility fix for Java 17 |
| 0.3.19 | 2021-12-06 | [#8528](https://github.com/airbytehq/airbyte/pull/8528) | Set Internal Staging as default choice |
| 0.3.18 | 2021-11-26 | [#8253](https://github.com/airbytehq/airbyte/pull/8253) | Snowflake Internal Staging Support |
| 0.3.17 | 2021-11-08 | [#7719](https://github.com/airbytehq/airbyte/pull/7719) | Improve handling of wide rows by buffering records based on their byte size rather than their count |
| 0.3.15 | 2021-10-11 | [#6949](https://github.com/airbytehq/airbyte/pull/6949) | Each stream was split into files of 10,000 records each for copying using S3 or GCS |
| 0.3.14 | 2021-09-08 | [#5924](https://github.com/airbytehq/airbyte/pull/5924) | Fixed AWS S3 Staging COPY is writing records from different table in the same raw table |
| 0.3.13 | 2021-09-01 | [#5784](https://github.com/airbytehq/airbyte/pull/5784) | Updated query timeout from 30 minutes to 3 hours |
| 0.3.12 | 2021-07-30 | [#5125](https://github.com/airbytehq/airbyte/pull/5125) | Enable `additionalPropertities` in spec.json |
| 0.3.11 | 2021-07-21 | [#3555](https://github.com/airbytehq/airbyte/pull/3555) | Partial Success in BufferedStreamConsumer |
| 0.3.10 | 2021-07-12 | [#4713](https://github.com/airbytehq/airbyte/pull/4713)| Tag traffic with `airbyte` label to enable optimization opportunities from Snowflake |

0 comments on commit ceaa1a4

Please sign in to comment.