Skip to content

Commit

Permalink
🎉 Add dataset location to BigQuery connector (issue #3277) (#3889)
Browse files Browse the repository at this point in the history
Authored-by: sabifranjo <sabifranjo@gmail.com>
Authored-by: Sabolc Franjo <sabolc.franjo@ev-box.com>
  • Loading branch information
ChristopheDuong committed Jun 7, 2021
1 parent 4e084a1 commit 913c55d
Show file tree
Hide file tree
Showing 11 changed files with 76 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133",
"name": "BigQuery",
"dockerRepository": "airbyte/destination-bigquery",
"dockerImageTag": "0.3.3",
"dockerImageTag": "0.3.4",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery"
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
- destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
name: BigQuery
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 0.3.3
dockerImageTag: 0.3.4
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
name: Snowflake
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Changelog

## 0.3.4
Added an option to choose the dataset location.
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.3.3
LABEL io.airbyte.version=0.3.4
LABEL io.airbyte.name=airbyte/destination-bigquery
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class BigQueryDestination extends BaseConnector implements Destination {
private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDestination.class);
static final String CONFIG_DATASET_ID = "dataset_id";
static final String CONFIG_PROJECT_ID = "project_id";
static final String CONFIG_DATASET_LOCATION = "dataset_location";
static final String CONFIG_CREDS = "credentials_json";

static final com.google.cloud.bigquery.Schema SCHEMA = com.google.cloud.bigquery.Schema.of(
Expand All @@ -91,8 +92,9 @@ public BigQueryDestination() {
public AirbyteConnectionStatus check(JsonNode config) {
try {
final String datasetId = config.get(CONFIG_DATASET_ID).asText();
final String datasetLocation = getDatasetLocation(config);
final BigQuery bigquery = getBigQuery(config);
createSchemaTable(bigquery, datasetId);
createSchemaTable(bigquery, datasetId, datasetLocation);
QueryJobConfiguration queryConfig = QueryJobConfiguration
.newBuilder(String.format("SELECT * FROM %s.INFORMATION_SCHEMA.TABLES LIMIT 1;", datasetId))
.setUseLegacySql(false)
Expand All @@ -110,10 +112,18 @@ public AirbyteConnectionStatus check(JsonNode config) {
}
}

private void createSchemaTable(BigQuery bigquery, String datasetId) {
/**
 * Resolves the BigQuery dataset location from the connector configuration.
 *
 * @param config the connector configuration node
 * @return the configured {@code dataset_location}, or {@code "US"} when the
 *         field is absent (preserves behavior for configs created before this
 *         option existed)
 */
private static String getDatasetLocation(JsonNode config) {
  return config.has(CONFIG_DATASET_LOCATION)
      ? config.get(CONFIG_DATASET_LOCATION).asText()
      : "US";
}

/**
 * Creates the BigQuery dataset (schema) if it does not already exist.
 *
 * <p>The location is only applied at creation time; BigQuery does not allow
 * changing a dataset's location afterwards, so an existing dataset keeps
 * whatever location it was created with.
 *
 * @param bigquery        the BigQuery client
 * @param datasetId       the dataset (schema) identifier to create
 * @param datasetLocation the geographic location for a newly created dataset
 */
private void createSchemaTable(BigQuery bigquery, String datasetId, String datasetLocation) {
  final Dataset dataset = bigquery.getDataset(datasetId);
  if (dataset == null || !dataset.exists()) {
    // NOTE: the hunk as pasted contained both the pre-change builder line (without
    // setLocation) and the post-change line, declaring datasetInfo twice — a
    // non-compiling diff artifact. Only the location-aware version is kept.
    final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build();
    bigquery.create(datasetInfo);
  }
}
Expand Down Expand Up @@ -175,7 +185,8 @@ public AirbyteMessageConsumer getConsumer(JsonNode config,
final String schemaName = getSchema(config, configStream);
final String tableName = namingResolver.getRawTableName(streamName);
final String tmpTableName = namingResolver.getTmpTableName(streamName);
createSchemaAndTableIfNeeded(bigquery, existingSchemas, schemaName, tmpTableName);
final String datasetLocation = getDatasetLocation(config);
createSchemaAndTableIfNeeded(bigquery, existingSchemas, schemaName, tmpTableName, datasetLocation);

// https://cloud.google.com/bigquery/docs/loading-data-local#loading_data_from_a_local_data_source
final WriteChannelConfiguration writeChannelConfiguration = WriteChannelConfiguration
Expand Down Expand Up @@ -205,9 +216,13 @@ private static String getSchema(JsonNode config, ConfiguredAirbyteStream stream)
return srcNamespace;
}

private void createSchemaAndTableIfNeeded(BigQuery bigquery, Set<String> existingSchemas, String schemaName, String tmpTableName) {
private void createSchemaAndTableIfNeeded(BigQuery bigquery,
Set<String> existingSchemas,
String schemaName,
String tmpTableName,
String datasetLocation) {
if (!existingSchemas.contains(schemaName)) {
createSchemaTable(bigquery, schemaName);
createSchemaTable(bigquery, schemaName, datasetLocation);
existingSchemas.add(schemaName);
}
BigQueryUtils.createTable(bigquery, schemaName, tmpTableName, SCHEMA);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,41 @@
"description": "Default BigQuery Dataset ID tables are replicated to if the source does not specify a namespace.",
"title": "Default Dataset ID"
},
"dataset_location": {
"type": "string",
"description": "The location of the dataset. Warning: Changes made after creation will not be applied.",
"title": "Dataset Location",
"default": "US",
"enum": [
"US",
"EU",
"us-central1",
"us-west1",
"us-west2",
"us-west3",
"us-west4",
"us-east1",
"us-east4",
"northamerica-northeast1",
"southamerica-east1",
"europe-north1",
"europe-west1",
"europe-west2",
"europe-west3",
"europe-west4",
"europe-west6",
"europe-central2",
"asia-east1",
"asia-east2",
"asia-southeast2",
"asia-south1",
"asia-northeast1",
"asia-northeast2",
"asia-southeast1",
"asia-northeast3",
"australia-southeast1"
]
},
"credentials_json": {
"type": "string",
"description": "The contents of the JSON service account key. Check out the <a href=\"https://docs.airbyte.io/integrations/destinations/bigquery\">docs</a> if you need help generating this key.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class BigQueryDestinationAcceptanceTest extends DestinationAcceptanceTest

private static final String CONFIG_DATASET_ID = "dataset_id";
private static final String CONFIG_PROJECT_ID = "project_id";
private static final String CONFIG_DATASET_LOCATION = "dataset_location";
private static final String CONFIG_CREDS = "credentials_json";

private BigQuery bigquery;
Expand Down Expand Up @@ -187,12 +188,14 @@ protected void setup(TestDestinationEnv testEnv) throws Exception {

final JsonNode credentialsJson = Jsons.deserialize(credentialsJsonString);
final String projectId = credentialsJson.get(CONFIG_PROJECT_ID).asText();
final String datasetLocation = "US";

final String datasetId = "airbyte_tests_" + RandomStringUtils.randomAlphanumeric(8);
config = Jsons.jsonNode(ImmutableMap.builder()
.put(CONFIG_PROJECT_ID, projectId)
.put(CONFIG_CREDS, credentialsJsonString)
.put(CONFIG_DATASET_ID, datasetId)
.put(CONFIG_DATASET_LOCATION, datasetLocation)
.build());

final ServiceAccountCredentials credentials =
Expand All @@ -203,7 +206,8 @@ protected void setup(TestDestinationEnv testEnv) throws Exception {
.build()
.getService();

final DatasetInfo datasetInfo = DatasetInfo.newBuilder(config.get(CONFIG_DATASET_ID).asText()).build();
final DatasetInfo datasetInfo =
DatasetInfo.newBuilder(config.get(CONFIG_DATASET_ID).asText()).setLocation(config.get(CONFIG_DATASET_LOCATION).asText()).build();
dataset = bigquery.create(datasetInfo);

tornDown = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ void setup(TestInfo info) throws IOException {
.getService();

final String datasetId = "airbyte_tests_" + RandomStringUtils.randomAlphanumeric(8);
final String datasetLocation = "EU";
MESSAGE_USERS1.getRecord().setNamespace(datasetId);
MESSAGE_USERS2.getRecord().setNamespace(datasetId);
MESSAGE_TASKS1.getRecord().setNamespace(datasetId);
Expand All @@ -156,6 +157,7 @@ void setup(TestInfo info) throws IOException {
.put(BigQueryDestination.CONFIG_PROJECT_ID, projectId)
.put(BigQueryDestination.CONFIG_CREDS, credentialsJsonString)
.put(BigQueryDestination.CONFIG_DATASET_ID, datasetId)
.put(BigQueryDestination.CONFIG_DATASET_LOCATION, datasetLocation)
.build());

tornDown = false;
Expand Down Expand Up @@ -194,7 +196,7 @@ private void tearDownBigQuery() {
}

@Test
void testSpec() throws IOException {
void testSpec() throws Exception {
final ConnectorSpecification actual = new BigQueryDestination().spec();
final String resourceString = MoreResources.readResource("spec.json");
final ConnectorSpecification expected = Jsons.deserialize(resourceString, ConnectorSpecification.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ private CsvDestination getDestination() {
}

@Test
void testSpec() throws IOException {
void testSpec() throws Exception {
final ConnectorSpecification actual = getDestination().spec();
final String resourceString = MoreResources.readResource("spec.json");
final ConnectorSpecification expected = Jsons.deserialize(resourceString, ConnectorSpecification.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private LocalJsonDestination getDestination() {
}

@Test
void testSpec() throws IOException {
void testSpec() throws Exception {
final ConnectorSpecification actual = getDestination().spec();
final String resourceString = MoreResources.readResource("spec.json");
final ConnectorSpecification expected = Jsons.deserialize(resourceString, ConnectorSpecification.class);
Expand Down
7 changes: 3 additions & 4 deletions docs/integrations/destinations/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ BigQuery is typically enabled automatically in new projects. If this is not the

Airbyte needs a location in BigQuery to write the data being synced from your data sources. If you already have a Dataset into which Airbyte should sync data, skip this section. Otherwise, follow the Google Cloud guide for [Creating a Dataset via the Console UI](https://cloud.google.com/bigquery/docs/quickstarts/quickstart-web-ui#create_a_dataset) to achieve this.

Note that queries written in BigQueries can only reference Datasets in the same physical location. So if you plan on combining the data Airbyte synced with data from other datasets in your queries, make sure you create the datasets in the same location on Google Cloud. See the [Introduction to Datasets](https://cloud.google.com/bigquery/docs/datasets-intro) section for more info on considerations around creating Datasets.
Note that queries written in BigQuery can only reference Datasets in the same physical location. So if you plan on combining the data Airbyte synced with data from other datasets in your queries, make sure you create the datasets in the same location on Google Cloud. See the [Introduction to Datasets](https://cloud.google.com/bigquery/docs/datasets-intro) section for more info on considerations around creating Datasets.

#### Service account

Expand All @@ -80,9 +80,9 @@ Follow the [Creating and Managing Service Account Keys](https://cloud.google.com
You should now have all the requirements needed to configure BigQuery as a destination in the UI. You'll need the following information to configure the BigQuery destination:

* **Project ID**
* **Dataset ID**
* **Dataset Location**
* **Dataset ID**: the name of the schema where the tables will be created.
* **Service Account Key**: the contents of your Service Account Key JSON file
* **Default Target Schema:** the name of the schema where the tables will be created. In most cases, this should match the Dataset ID.

Once you've configured BigQuery as a destination, delete the Service Account Key from your computer.

Expand All @@ -101,4 +101,3 @@ When you create a dataset in BigQuery, the dataset name must be unique for each
* Dataset names cannot contain spaces or special characters such as -, &, @, or %.

Therefore, Airbyte BigQuery destination will convert any invalid characters into '\_' characters when writing data.

0 comments on commit 913c55d

Please sign in to comment.