🐛 BigQuery source: Fix nested arrays (#4981)
* unfinished jdbcsource separation

* create AbstractRelation

* Migrate StateManager to new abstract level (JdbcSource -> RelationalSource)

* fix imports

* move configs to Database level + fix MySql source

* bring jdbc source in line with the new impl

* Fix ScaffoldJavaJdbcSource template

* rename `AbstractField` to `CommonField`. Now it's not an abstract class.
+ add default implementation for `AbstractRelationalDbSource.getFullyQualifiedTableName`

* format

* rename generated files in line with their location

* bonus renaming

* move utility methods specific for jdbc source to a proper module

* internal review update

* BigQueryDatabase impl without row transformation

* add static method for BigQueryDatabase instantiation

* remove data type parameter limitation + rename class parameters

* Move DataTypeUtils from jdbc to common + impl basic types in BigQueryUtils

* bring DB2 in line with new relational abstract classes

* add missing import

* cover all bigquery classes + add type transformation method from StandardSQLTypeName to JsonSchemaPrimitive

* close unused connections

* add table list extract method

* bigquery source connector

* return all tables for a whole project instead of a dataset

* impl incremental fetch

* bigquery source connector

* bigquery source connector

* remove unnecessary databaseid

* add primitive type filtering

* add temporary workaround for test database.

* add dataset location

* fix table info retrieval

* handle dataset config

* Add working comprehensive test without data cases

* minor changes in the source processing

* acceptance tests; discover method fix

* discover method fix

* first comprehensive test

* Comprehensive tests for the BigQuery source + database timeout config

* bigquery acceptance tests fix; formatting

* fix incremental sync using date, datetime, time and timestamp types

* Implement source checks: basic and dataset

* format

* revert: airbyte_protocol.py

* internal review update

* Add the ability to output the list of comprehensive tests in Markdown table format.

* Update airbyte-integrations/connectors/source-bigquery/src/main/resources/spec.json

Co-authored-by: Sherif A. Nada <snadalive@gmail.com>

* review update

* Implement processing for arrays and structures

* format

* added bigquery secrets

* added bigquery secrets

* spec fix

* test configs fix

* extend mapping for Arrays and Structs

* Process nested arrays

* handle arrays of records properly.

* format

* BigQuery source docs

* docs readme update

* hide evidence

* fix changelog order

* Add bigquery to source_definitions yaml

Co-authored-by: heade <danildubinin2@gmail.com>
Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
3 people committed Jul 27, 2021
1 parent e1b4957 commit 9151d83
Showing 10 changed files with 168 additions and 5 deletions.
@@ -0,0 +1,7 @@
{
"sourceDefinitionId": "bfd1ddf8-ae8a-4620-b1d7-55597d2ba08c",
"name": "BigQuery",
"dockerRepository": "airbyte/source-bigquery",
"dockerImageTag": "0.1.1",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/bigquery"
}
@@ -404,3 +404,8 @@
dockerRepository: airbyte/source-prestashop
dockerImageTag: 0.1.0
documentationUrl: https://docs.airbyte.io/integrations/sources/prestashop
- sourceDefinitionId: bfd1ddf8-ae8a-4620-b1d7-55597d2ba08c
name: BigQuery
dockerRepository: airbyte/source-bigquery
dockerImageTag: 0.1.1
documentationUrl: https://docs.airbyte.io/integrations/sources/bigquery
22 changes: 20 additions & 2 deletions airbyte-db/src/main/java/io/airbyte/db/bigquery/BigQueryUtils.java
@@ -90,10 +90,26 @@ private static void setJsonField(Field field, FieldValue fieldValue, ObjectNode
} else if (fieldValue.getAttribute().equals(Attribute.REPEATED)) {
ArrayNode arrayNode = node.putArray(fieldName);
StandardSQLTypeName fieldType = field.getType().getStandardType();
fieldValue.getRepeatedValue().forEach(arrayFieldValue -> fillObjectNode(fieldName, fieldType, arrayFieldValue, arrayNode.addObject()));
FieldList subFields = field.getSubFields();
// Array of primitive
if (subFields == null || subFields.isEmpty()) {
fieldValue.getRepeatedValue().forEach(arrayFieldValue -> fillObjectNode(fieldName, fieldType, arrayFieldValue, arrayNode.addObject()));
// Array of records
} else {
for (FieldValue arrayFieldValue : fieldValue.getRepeatedValue()) {
int count = 0; // named get doesn't work here, likely because nested FieldValueLists don't carry a schema; fall back to positional access.
ObjectNode newNode = arrayNode.addObject();
for (Field repeatedField : subFields) {
setJsonField(repeatedField, arrayFieldValue.getRecordValue().get(count++),
newNode);
}
}
}
} else if (fieldValue.getAttribute().equals(Attribute.RECORD)) {
ObjectNode newNode = node.putObject(fieldName);
field.getSubFields().forEach(recordField -> setJsonField(recordField, fieldValue.getRecordValue().get(recordField.getName()), newNode));
field.getSubFields().forEach(recordField -> {
setJsonField(recordField, fieldValue.getRecordValue().get(recordField.getName()), newNode);
});
}
}

@@ -113,6 +129,8 @@ public static JsonSchemaPrimitive getType(StandardSQLTypeName bigQueryType) {
case BOOL -> JsonSchemaPrimitive.BOOLEAN;
case INT64, FLOAT64, NUMERIC, BIGNUMERIC -> JsonSchemaPrimitive.NUMBER;
case STRING, BYTES, TIMESTAMP, DATE, TIME, DATETIME -> JsonSchemaPrimitive.STRING;
case ARRAY -> JsonSchemaPrimitive.ARRAY;
case STRUCT -> JsonSchemaPrimitive.OBJECT;
default -> JsonSchemaPrimitive.STRING;
};
}
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-bigquery/Dockerfile
@@ -9,5 +9,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1

# Airbyte's build system uses these labels to know what to name and tag the docker images produced by this Dockerfile.
LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.name=airbyte/source-bigquery
21 changes: 21 additions & 0 deletions airbyte-integrations/connectors/source-bigquery/README.md
@@ -0,0 +1,21 @@
# BigQuery Test Configuration

In order to test the BigQuery source, you need a service account key file.

## Community Contributor

As a community contributor, you will need access to a GCP project and BigQuery to run tests.

1. Go to the `Service Accounts` page on the GCP console
1. Click the `+ Create Service Account` button
1. Fill out a descriptive name/id/description
1. Click the edit icon next to the service account you created on the `IAM` page
1. Add the `BigQuery Data Editor` and `BigQuery User` role
1. Go back to the `Service Accounts` page and use the actions modal to `Create Key`
1. Download this key as a JSON file
1. Move and rename this file to `secrets/credentials.json`

## Airbyte Employee

1. Access the `BigQuery Integration Test User` secret on Rippling under the `Engineering` folder
1. Create a file at `secrets/credentials.json` with those contents
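
For reference, a service account key file is a JSON document of roughly the following shape. This is a redacted sketch, not a real key; the exact field set depends on how Google issued the key.

```json
{
  "type": "service_account",
  "project_id": "<your-project-id>",
  "private_key_id": "<key-id>",
  "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
  "client_email": "<account-name>@<your-project-id>.iam.gserviceaccount.com",
  "client_id": "<numeric-client-id>",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token"
}
```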
2 changes: 0 additions & 2 deletions airbyte-integrations/connectors/source-bigquery/build.gradle
@@ -17,8 +17,6 @@ dependencies {
implementation project(':airbyte-integrations:connectors:source-jdbc')
implementation project(':airbyte-integrations:connectors:source-relational-db')

//TODO Add jdbc driver import here. Ex: implementation 'com.microsoft.sqlserver:mssql-jdbc:8.4.1.jre14'

testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))

testImplementation 'org.apache.commons:commons-lang3:3.11'
@@ -316,6 +316,26 @@ protected void initTests() {
.addInsertValues("STRUCT('s' as frst, 1 as sec, STRUCT(555 as id_col, STRUCT(TIME(15, 30, 00) as time) as mega_obbj) as obbj)")
.addExpectedValues("{\"frst\":\"s\",\"sec\":1,\"obbj\":{\"id_col\":555,\"mega_obbj\":{\"last_col\":\"15:30:00\"}}}")
.build());

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("array")
.fullSourceDataType("array<STRUCT<fff String, ggg int64>>")
.airbyteType(JsonSchemaPrimitive.STRING)
.createTablePatternSql(CREATE_SQL_PATTERN)
.addInsertValues("[STRUCT('qqq' as fff, 1 as ggg), STRUCT('kkk' as fff, 2 as ggg)]")
.addExpectedValues("[{\"fff\":\"qqq\",\"ggg\":1},{\"fff\":\"kkk\",\"ggg\":2}]")
.build());

addDataTypeTestData(
TestDataHolder.builder()
.sourceType("array")
.fullSourceDataType("array<STRUCT<fff String, ggg array<STRUCT<ooo String, kkk int64>>>>")
.airbyteType(JsonSchemaPrimitive.STRING)
.createTablePatternSql(CREATE_SQL_PATTERN)
.addInsertValues("[STRUCT('qqq' as fff, [STRUCT('fff' as ooo, 1 as kkk), STRUCT('hhh' as ooo, 2 as kkk)] as ggg)]")
.addExpectedValues("[{\"fff\":\"qqq\",\"ggg\":[{\"ooo\":\"fff\",\"kkk\":1},{\"ooo\":\"hhh\",\"kkk\":2}]}]")
.build());
}

@Override
1 change: 1 addition & 0 deletions docs/SUMMARY.md
@@ -33,6 +33,7 @@
* [Asana](integrations/sources/asana.md)
* [AWS CloudTrail](integrations/sources/aws-cloudtrail.md)
* [Braintree](integrations/sources/braintree.md)
* [BigQuery](integrations/sources/bigquery.md)
* [Cart](integrations/sources/cart.md)
* [ClickHouse](integrations/sources/clickhouse.md)
* [CockroachDB](integrations/sources/cockroachdb.md)
1 change: 1 addition & 0 deletions docs/integrations/README.md
@@ -18,6 +18,7 @@ Airbyte uses a grading system for connectors to help users understand what to ex
|[Asana](./sources/asana.md) | Beta |
|[AWS CloudTrail](./sources/aws-cloudtrail.md)| Beta |
|[Braintree](./sources/braintree.md)| Alpha |
|[BigQuery](./sources/bigquery.md)| Beta |
|[Cart](./sources/cart.md)| Beta |
|[ClickHouse](./sources/clickhouse.md)| Beta |
|[CockroachDB](./sources/cockroachdb.md)| Beta |
92 changes: 92 additions & 0 deletions docs/integrations/sources/bigquery.md
@@ -0,0 +1,92 @@
---
description: >-
BigQuery is a serverless, highly scalable, and cost-effective data warehouse
  offered by Google Cloud Platform.
---

# BigQuery

## Overview

The BigQuery source supports both Full Refresh and Incremental syncs. You can choose whether this connector copies only new or updated data, or all rows in the tables and columns you set up for replication, every time a sync runs.
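
For illustration, Incremental mode is requested per stream in the configured catalog. A minimal sketch, assuming a hypothetical `my_table` stream with an `updated_at` cursor column (both names are illustrative, not part of this connector):

```json
{
  "streams": [
    {
      "stream": {
        "name": "my_table",
        "json_schema": {},
        "supported_sync_modes": ["full_refresh", "incremental"]
      },
      "sync_mode": "incremental",
      "cursor_field": ["updated_at"],
      "destination_sync_mode": "append"
    }
  ]
}
```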

### Resulting schema

The BigQuery source does not alter the schema present in your database. Depending on the destination connected to this source, however, the schema may be altered. See the destination's documentation for more details.

### Data type mapping

BigQuery data types map to Airbyte types as follows:

| BigQuery Type | Resulting Type | Notes |
| :--- | :--- | :--- |
| `BOOL` | Boolean | |
| `INT64` | Number | |
| `FLOAT64` | Number | |
| `NUMERIC` | Number | |
| `BIGNUMERIC` | Number | |
| `STRING` | String | |
| `BYTES` | String | |
| `DATE` | String | In ISO8601 format |
| `DATETIME` | String | In ISO8601 format |
| `TIMESTAMP` | String | In ISO8601 format |
| `TIME` | String | |
| `ARRAY` | Array | |
| `STRUCT` | Object | |
| `GEOGRAPHY` | String | |
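
For example, under this mapping a column declared as `ARRAY<STRUCT<fff STRING, ggg INT64>>` is emitted as a JSON array of objects (values taken from this connector's comprehensive tests):

```json
[{"fff": "qqq", "ggg": 1}, {"fff": "kkk", "ggg": 2}]
```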

### Features

| Feature | Supported | Notes |
| :--- | :--- | :--- |
| Full Refresh Sync | Yes | |
| Incremental Sync | Yes | |
| Change Data Capture | No | |
| SSL Support | Yes | |

## Getting started

### Requirements

To use the BigQuery source, you'll need:

* A Google Cloud Project with BigQuery enabled
* A Google Cloud Service Account with the "BigQuery User" and "BigQuery Data Editor" roles in your GCP project
* A Service Account Key to authenticate into your Service Account

See the setup guide for more information about how to create the required resources.

#### Service account

In order for Airbyte to sync data from BigQuery, it needs credentials for a [Service Account](https://cloud.google.com/iam/docs/service-accounts) with the "BigQuery User" and "BigQuery Data Editor" roles, which grants permissions to run BigQuery jobs, write to BigQuery Datasets, and read table metadata. We highly recommend that this Service Account is exclusive to Airbyte for ease of permissioning and auditing. However, you can use a pre-existing Service Account if you already have one with the correct permissions.

The easiest way to create a Service Account is to follow GCP's guide for [Creating a Service Account](https://cloud.google.com/iam/docs/creating-managing-service-accounts). Once you've created the Service Account, make sure to keep its ID handy as you will need to reference it when granting roles. Service Account IDs typically take the form `<account-name>@<project-name>.iam.gserviceaccount.com`.

Then, add the service account as a Member in your Google Cloud Project with the "BigQuery User" role. To do this, follow the instructions for [Granting Access](https://cloud.google.com/iam/docs/granting-changing-revoking-access#granting-console) in the Google documentation. The email address of the member you are adding is the same as the Service Account ID you just created.

At this point you should have a service account with the "BigQuery User" project-level permission.
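
If you prefer the command line, here is a sketch of the equivalent `gcloud` steps; the account name and project ID are placeholders you should replace:

```bash
# Create a service account dedicated to Airbyte (name is illustrative)
gcloud iam service-accounts create airbyte-bigquery \
  --project=<your-project-id> \
  --display-name="Airbyte BigQuery source"

# Grant the BigQuery User and BigQuery Data Editor roles at the project level
gcloud projects add-iam-policy-binding <your-project-id> \
  --member="serviceAccount:airbyte-bigquery@<your-project-id>.iam.gserviceaccount.com" \
  --role="roles/bigquery.user"
gcloud projects add-iam-policy-binding <your-project-id> \
  --member="serviceAccount:airbyte-bigquery@<your-project-id>.iam.gserviceaccount.com" \
  --role="roles/bigquery.dataEditor"
```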

#### Service account key

Service Account Keys are used to authenticate as Google Service Accounts. For Airbyte to leverage the permissions you granted to the Service Account in the previous step, you'll need to provide its Service Account Keys. See the [Google documentation](https://cloud.google.com/iam/docs/service-accounts#service_account_keys) for more information about Keys.

Follow the [Creating and Managing Service Account Keys](https://cloud.google.com/iam/docs/creating-managing-service-account-keys) guide to create a key. Airbyte currently supports JSON Keys only, so make sure you create your key in that format. As soon as you create the key, make sure to download it, as that is the only time Google will allow you to see its contents. Once you've successfully configured BigQuery as a source in Airbyte, delete this key from your computer.
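
A `gcloud` sketch of the key-creation step (the account name and output path are illustrative, matching the examples above):

```bash
# Create and download a JSON key for the service account
gcloud iam service-accounts keys create secrets/credentials.json \
  --iam-account=airbyte-bigquery@<your-project-id>.iam.gserviceaccount.com
```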

### Set up the BigQuery source in Airbyte

You should now have all the requirements needed to configure BigQuery as a source in the UI. You'll need the following information to configure the BigQuery source:

* **Project ID**
* **Default Dataset ID [Optional]**: the dataset (schema) name, if you are only interested in one dataset. Setting this dramatically speeds up the source's discover operation.
* **Credentials JSON**: the contents of your Service Account Key JSON file
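
Put together, the source configuration is a small JSON object along these lines. This is a sketch only: field names follow the connector's `spec.json`, and `credentials_json` holds the key file contents as a string.

```json
{
  "project_id": "<your-project-id>",
  "dataset_id": "<optional-default-dataset>",
  "credentials_json": "{ ...contents of secrets/credentials.json... }"
}
```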

Once you've configured BigQuery as a source, delete the Service Account Key from your computer.

## CHANGELOG

### source-bigquery

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.1 | 2021-07-28 | [#4981](https://github.com/airbytehq/airbyte/pull/4981) | 🐛 BigQuery source: Fix nested arrays |
| 0.1.0 | 2021-07-22 | [#4457](https://github.com/airbytehq/airbyte/pull/4457) | 🎉 New Source: Big Query. |
