Skip to content

Commit

Permalink
🐛 Destination BigQuery (+denormalized): correctly parse buffer count …
Browse files Browse the repository at this point in the history
…from config (#26213)

* fix logic in parsing config

* simplify logic

* ugh

* holy moly that took way too many iterations

* version bumps / changelog

* Automated Change

* ✨ Destination Bigquery: stop running normalization container for DAT (#25925)

* readme update

* allow passing additional flags to test containers

* remove build dependency

* Automated Change

* versioning updates

* restore denormalized change from master

* formatting changes

* formatting

* Automated Change

* update metadata file

---------

Co-authored-by: jbfbell <jbfbell@users.noreply.github.com>

* fix version (#26218)

* Source Airtable: skip missing streams (#25946)

* Source Airtable: skip missing streams

* Move stream removal to a separate method, cover with tests

* Update changelog

* Fix flake warnings

* Update docs/integrations/sources/airtable.md

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* Update docs/integrations/sources/airtable.md

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* Automated Change

* Update link to docs in warning

* Automated Change

* Automated Change

* Automated Change

* “Empty-Commit”

---------

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
Co-authored-by: arsenlosenko <arsenlosenko@users.noreply.github.com>

* 🎉 New Source: Ringcentral [Low code CDK] (#25701)

* Initial commit - All test passed

* add stream fax cover

* refactor docs

* fix schema, Added pagination

* Add several streams, fix schema

* fix schema, add streams, refactor docs

* EOF

* Resolve conflicts

* Resolve conflicts

* add metadata file

---------

Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>

* rebump version

* Automated Change

---------

Co-authored-by: edgao <edgao@users.noreply.github.com>
Co-authored-by: Joe Bell <joseph.bell@airbyte.io>
Co-authored-by: jbfbell <jbfbell@users.noreply.github.com>
Co-authored-by: Joe Reuter <joe@airbyte.io>
Co-authored-by: Arsen Losenko <20901439+arsenlosenko@users.noreply.github.com>
Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
Co-authored-by: arsenlosenko <arsenlosenko@users.noreply.github.com>
Co-authored-by: btkcodedev <btk.codedev@gmail.com>
Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
  • Loading branch information
10 people committed May 18, 2023
1 parent 00c8e03 commit c25afc4
Show file tree
Hide file tree
Showing 10 changed files with 29 additions and 18 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION destination-bigquery-denormalized

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.4.0
LABEL io.airbyte.version=1.4.1
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 079d5540-f236-4294-ba7c-ade8fd918496
dockerImageTag: 1.4.0
dockerImageTag: 1.4.1
dockerRepository: airbyte/destination-bigquery-denormalized
githubIssueLabel: destination-bigquery-denormalized
icon: bigquery.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@ public void testGetFileBufferDefault() {
@Test
public void testGetFileBufferMaxLimited() {
final JsonNode defaultConfig = Jsons.clone(config);
((ObjectNode) defaultConfig).put(FileBuffer.FILE_BUFFER_COUNT_KEY, 100);
((ObjectNode) defaultConfig.get(BigQueryConsts.LOADING_METHOD)).put(FileBuffer.FILE_BUFFER_COUNT_KEY, 100);
final BigQueryDenormalizedDestination destination = new BigQueryDenormalizedDestination();
assertEquals(destination.getNumberOfFileBuffers(defaultConfig), FileBuffer.MAX_CONCURRENT_STREAM_IN_BUFFER);
assertEquals(FileBuffer.MAX_CONCURRENT_STREAM_IN_BUFFER, destination.getNumberOfFileBuffers(defaultConfig));
}

@Test
public void testGetMinimumFileBufferCount() {
final JsonNode defaultConfig = Jsons.clone(config);
((ObjectNode) defaultConfig).put(FileBuffer.FILE_BUFFER_COUNT_KEY, 1);
((ObjectNode) defaultConfig.get(BigQueryConsts.LOADING_METHOD)).put(FileBuffer.FILE_BUFFER_COUNT_KEY, 1);
final BigQueryDenormalizedDestination destination = new BigQueryDenormalizedDestination();
// User cannot set number of file counts below the default file buffer count, which is existing
// behavior
assertEquals(destination.getNumberOfFileBuffers(defaultConfig), FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER);
assertEquals(FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER, destination.getNumberOfFileBuffers(defaultConfig));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ ENV AIRBYTE_NORMALIZATION_INTEGRATION bigquery

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.4.2
LABEL io.airbyte.version=1.4.3
LABEL io.airbyte.name=airbyte/destination-bigquery

ENV AIRBYTE_ENTRYPOINT "/airbyte/run_with_normalization.sh"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 1.4.2
dockerImageTag: 1.4.3
dockerRepository: airbyte/destination-bigquery
githubIssueLabel: destination-bigquery
icon: bigquery.svg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,12 +358,21 @@ protected Function<String, String> getTargetTableNameTransformer(final BigQueryS
*/
@VisibleForTesting
public int getNumberOfFileBuffers(final JsonNode config) {
int numOfFileBuffers = FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER;
if (config.has(FileBuffer.FILE_BUFFER_COUNT_KEY)) {
numOfFileBuffers = Math.min(config.get(FileBuffer.FILE_BUFFER_COUNT_KEY).asInt(), FileBuffer.MAX_CONCURRENT_STREAM_IN_BUFFER);
// This null check is probably redundant, but I don't want to gamble on that right now
if (config.hasNonNull(BigQueryConsts.LOADING_METHOD)) {
final JsonNode loadingMethodConfig = config.get(BigQueryConsts.LOADING_METHOD);
final int numOfFileBuffers;
if (loadingMethodConfig.has(FileBuffer.FILE_BUFFER_COUNT_KEY)) {
numOfFileBuffers = Math.min(loadingMethodConfig.get(FileBuffer.FILE_BUFFER_COUNT_KEY).asInt(), FileBuffer.MAX_CONCURRENT_STREAM_IN_BUFFER);
} else {
numOfFileBuffers = FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER;
}
// Only allows for values 10 <= numOfFileBuffers <= 50
return Math.max(numOfFileBuffers, FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER);
}
// Only allows for values 10 <= numOfFileBuffers <= 50
return Math.max(numOfFileBuffers, FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER);

// Return the default if we fail to parse the config
return FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER;
}

public static void main(final String[] args) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,19 @@ public void testGetFileBufferDefault() {
@Test
public void testGetFileBufferMaxLimited() {
final JsonNode defaultConfig = Jsons.clone(config);
((ObjectNode) defaultConfig).put(FileBuffer.FILE_BUFFER_COUNT_KEY, 100);
((ObjectNode) defaultConfig.get(BigQueryConsts.LOADING_METHOD)).put(FileBuffer.FILE_BUFFER_COUNT_KEY, 100);
final BigQueryDestination destination = new BigQueryDestination();
assertEquals(destination.getNumberOfFileBuffers(defaultConfig), FileBuffer.MAX_CONCURRENT_STREAM_IN_BUFFER);
assertEquals(FileBuffer.MAX_CONCURRENT_STREAM_IN_BUFFER, destination.getNumberOfFileBuffers(defaultConfig));
}

@Test
public void testGetMinimumFileBufferCount() {
final JsonNode defaultConfig = Jsons.clone(config);
((ObjectNode) defaultConfig).put(FileBuffer.FILE_BUFFER_COUNT_KEY, 1);
((ObjectNode) defaultConfig.get(BigQueryConsts.LOADING_METHOD)).put(FileBuffer.FILE_BUFFER_COUNT_KEY, 1);
final BigQueryDestination destination = new BigQueryDestination();
// User cannot set number of file counts below the default file buffer count, which is existing
// behavior
assertEquals(destination.getNumberOfFileBuffers(defaultConfig), FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER);
assertEquals(FileBuffer.DEFAULT_MAX_CONCURRENT_STREAM_IN_BUFFER, destination.getNumberOfFileBuffers(defaultConfig));
}

}
1 change: 1 addition & 0 deletions docs/integrations/destinations/bigquery-denormalized.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ See [destinations/bigquery](https://docs.airbyte.com/integrations/destinations/b

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :--------------------------------------------------------- | :----------------------------------------------------------------------------------------------------------------------- |
| 1.4.1 | 2023-05-17 | [\#26213](https://github.com/airbytehq/airbyte/pull/26213) | Fix bug in parsing file buffer config count |
| 1.4.0 | 2023-04-28 | [\#25570](https://github.com/airbytehq/airbyte/pull/25570) | Fix: all integer schemas should be converted to Avro longs |
| 1.3.3 | 2023-04-27 | [\#25346](https://github.com/airbytehq/airbyte/pull/25346) | Internal code cleanup |
| 1.3.0 | 2023-04-19 | [\#25287](https://github.com/airbytehq/airbyte/pull/25287) | Add parameter to configure the number of file buffers when GCS is used as the loading method |
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ Now that you have set up the BigQuery destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------|
| 1.4.3 | 2023-05-17 | [\#26213](https://github.com/airbytehq/airbyte/pull/26213) | Fix bug in parsing file buffer config count |
| 1.4.2 | 2023-05-10 | [\#25925](https://github.com/airbytehq/airbyte/pull/25925) | Testing update. Normalization tests are now done in the destination container. |
| 1.4.1 | 2023-05-11 | [\#25993](https://github.com/airbytehq/airbyte/pull/25993) | Internal library update |
| 1.4.0 | 2023-04-29 | [\#25570](https://github.com/airbytehq/airbyte/pull/25570) | Internal library update. Bumping version to stay in sync with BigQuery-denormalized. |
Expand Down

0 comments on commit c25afc4

Please sign in to comment.