Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/BigQueryMultiTable-batchsink.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,9 @@ GET https://www.googleapis.com/bigquery/v2/projects/xxxx/datasets/mysql_bq_perm?
have the permission to read the dataset you specified in this plugin. You must grant "BigQuery Data Editor" role on the
project identified by the `Dataset Project ID` you specified in this plugin to the service account. If you think you
already granted the role, check if you granted the role on the wrong project (for example the one identified by the `Project ID`).

Column Names
------------
A column name can contain letters (a-z, A-Z), numbers (0-9), or underscores (_), and it must start with a letter or
underscore. For more flexible column name support, see
[flexible column names](https://cloud.google.com/bigquery/docs/schemas#flexible-column-names).
6 changes: 6 additions & 0 deletions docs/BigQueryTable-batchsink.md
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,9 @@ GET https://www.googleapis.com/bigquery/v2/projects/xxxx/datasets/mysql_bq_perm?
have the permission to read the dataset you specified in this plugin. You must grant "BigQuery Data Editor" role on the
project identified by the `Dataset Project ID` you specified in this plugin to the service account. If you think you
already granted the role, check if you granted the role on the wrong project (for example the one identified by the `Project ID`).

Column Names
------------
A column name can contain letters (a-z, A-Z), numbers (0-9), or underscores (_), and it must start with a letter or
underscore. For more flexible column name support, see
[flexible column names](https://cloud.google.com/bigquery/docs/schemas#flexible-column-names).
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.Table;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
Expand All @@ -36,12 +35,12 @@
import io.cdap.cdap.etl.api.batch.BatchSink;
import io.cdap.cdap.etl.api.batch.BatchSinkContext;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
import io.cdap.plugin.gcp.common.GCPUtils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.slf4j.Logger;
Expand All @@ -55,7 +54,6 @@
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import javax.annotation.Nullable;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import com.google.cloud.hadoop.io.bigquery.BigQueryFactory;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
import com.google.cloud.hadoop.io.bigquery.BigQueryHelper;
import com.google.cloud.hadoop.io.bigquery.BigQueryStrings;
import com.google.cloud.hadoop.io.bigquery.BigQueryUtils;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.ForwardingBigQueryFileOutputCommitter;
Expand All @@ -62,6 +61,7 @@
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryStrings;
import io.cdap.plugin.gcp.bigquery.source.BigQueryFactoryWithScopes;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.google.cloud.bigquery.JobStatistics;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import io.cdap.cdap.api.annotation.Description;
Expand All @@ -44,6 +43,7 @@
import io.cdap.cdap.etl.api.engine.sql.SQLEngineOutput;
import io.cdap.cdap.etl.common.Constants;
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnector;
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngine;
import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryWrite;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig {
private static final String WHERE = "WHERE";
public static final Set<Schema.Type> SUPPORTED_CLUSTERING_TYPES =
ImmutableSet.of(Schema.Type.INT, Schema.Type.LONG, Schema.Type.STRING, Schema.Type.BOOLEAN, Schema.Type.BYTES);
private static final Pattern FIELD_PATTERN = Pattern.compile("[a-zA-Z0-9_]+");
// Read More : https://cloud.google.com/bigquery/docs/schemas#flexible-column-names
private static final Pattern FIELD_PATTERN = Pattern.compile("[\\p{L}\\p{M}\\p{N}\\p{Pc}\\p{Pd}&%+=:'<>#| ]+");

public static final String NAME_TABLE = "table";
public static final String NAME_SCHEMA = "schema";
Expand All @@ -75,6 +76,8 @@ public final class BigQuerySinkConfig extends AbstractBigQuerySinkConfig {
public static final String NAME_RANGE_INTERVAL = "rangeInterval";

public static final int MAX_NUMBER_OF_COLUMNS = 4;
// As defined in https://cloud.google.com/bigquery/docs/schemas#column_names
private static final int MAX_LENGTH_OF_COLUMN_NAME = 300;

@Name(NAME_TABLE)
@Macro
Expand Down Expand Up @@ -345,9 +348,18 @@ public void validate(@Nullable Schema inputSchema, @Nullable Schema outputSchema
String name = field.getName();
// BigQuery column names only allow alphanumeric characters and _
// https://cloud.google.com/bigquery/docs/schemas#column_names
// Allow support for Flexible column names
// https://cloud.google.com/bigquery/docs/schemas#flexible-column-names
if (!FIELD_PATTERN.matcher(name).matches()) {
collector.addFailure(String.format("Output field '%s' must only contain alphanumeric characters and '_'.",
name), null).withOutputSchemaField(name);
collector.addFailure(String.format("Output field '%s' contains invalid characters. " +
"Check column names docs for more details.",
name), null).withOutputSchemaField(name);
}

// Check if the field name exceeds the maximum length of 300 characters.
if (name.length() > MAX_LENGTH_OF_COLUMN_NAME) {
collector.addFailure(String.format("Output field '%s' exceeds the maximum length of 300 characters.",
name), null).withOutputSchemaField(name);
}

// check if the required fields are present in the input schema.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableSchema;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
Expand All @@ -43,6 +40,9 @@
import io.cdap.cdap.etl.api.validation.ValidationFailure;
import io.cdap.plugin.common.Asset;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryOutputConfiguration;
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableSchema;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize.Numeric;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
Expand All @@ -62,6 +62,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

Expand Down Expand Up @@ -611,6 +612,13 @@ private static BigQueryFileFormat getFileFormat(List<BigQueryTableFieldSchema> f
if (DATETIME.equals(field.getType())) {
return BigQueryFileFormat.NEWLINE_DELIMITED_JSON;
}
// If the field name is not in english characters, then we will use json format
// We do this as the avro load job in BQ does not support non-english characters in field names for now
String fieldName = field.getName();
final String englishCharactersRegex = "[\\w]+";
if (!Pattern.matches(englishCharactersRegex, fieldName)) {
return BigQueryFileFormat.NEWLINE_DELIMITED_JSON;
}
// If the field is a record we have to check its subfields.
if (RECORD.equals(field.getType())) {
if (getFileFormat(field.getFields()) == BigQueryFileFormat.NEWLINE_DELIMITED_JSON) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package io.cdap.plugin.gcp.bigquery.sink;

import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package io.cdap.plugin.gcp.bigquery.sink;

import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
import io.cdap.cdap.api.data.format.StructuredRecord;
import io.cdap.cdap.api.data.schema.Schema;
import io.cdap.plugin.gcp.bigquery.sink.lib.BigQueryTableFieldSchema;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
Expand Down
Loading