diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java
index d14e2eab6..84ea74781 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySink.java
@@ -104,6 +104,7 @@ protected BigQuerySinkConfig getConfig() {
   protected void prepareRunValidation(BatchSinkContext context) {
     FailureCollector collector = context.getFailureCollector();
     config.validate(context.getInputSchema(), config.getSchema(collector), collector);
+    collector.getOrThrowException();
   }
 
   @Override
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java
index 84b5c4872..28b0df7f4 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkConfig.java
@@ -389,7 +389,7 @@ private void validatePartitionProperties(@Nullable Schema schema, FailureCollect
       validateColumnForPartition(partitionByField, schema, collector);
       return;
     }
-    if (!shouldCreatePartitionedTable()) {
+    if (shouldCreatePartitionedTable()) {
       validateColumnForPartition(partitionByField, schema, collector);
     }
   }
@@ -433,7 +433,16 @@ private void validateRangePartitionTableWithInputConfiguration(Table table, Rang
 
   private void validateColumnForPartition(@Nullable String columnName, @Nullable Schema schema,
                                           FailureCollector collector) {
-    if (columnName == null || schema == null) {
+    if (containsMacro(NAME_PARTITION_BY_FIELD) || containsMacro(NAME_PARTITIONING_TYPE) || schema == null) {
+      return;
+    }
+    PartitionType partitioningType = getPartitioningType();
+    if (Strings.isNullOrEmpty(columnName)) {
+      if (partitioningType == PartitionType.INTEGER) {
+        collector.addFailure("Partition column not provided.",
+                             "Set the column for integer partitioning.")
+          .withConfigProperty(NAME_PARTITION_BY_FIELD);
+      }
       return;
     }
     Schema.Field field = schema.getField(columnName);
@@ -445,7 +454,6 @@ private void validateColumnForPartition(@Nullable String columnName, @Nullable S
     }
     Schema fieldSchema = field.getSchema();
     fieldSchema = fieldSchema.isNullable() ? fieldSchema.getNonNullable() : fieldSchema;
-    PartitionType partitioningType = getPartitioningType();
     if (partitioningType == PartitionType.TIME) {
       validateTimePartitioningColumn(columnName, collector, fieldSchema);
     } else if (partitioningType == PartitionType.INTEGER) {
diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkTest.java
index 6d969dc3e..11934e05b 100644
--- a/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkTest.java
+++ b/src/test/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkTest.java
@@ -85,6 +85,26 @@ public void testBigQuerySinkInvalidConfig() {
     Assert.assertEquals(3, failures.size());
   }
 
+  @Test
+  public void testBigQueryTimePartitionConfig() {
+    Schema schema = Schema.recordOf("record",
+                                    Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
+                                    Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
+                                    Schema.Field.of("price", Schema.of(Schema.Type.DOUBLE)),
+                                    Schema.Field.of("dt", Schema.nullableOf(Schema.of(Schema.LogicalType.DATE))),
+                                    Schema.Field.of("bytedata", Schema.of(Schema.Type.BYTES)),
+                                    Schema.Field.of("timestamp",
+                                                    Schema.nullableOf(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS))));
+
+    BigQuerySinkConfig config = new BigQuerySinkConfig("44", "ds", "tb", "bucket", schema.toString(),
+                                                       "TIME", 0L, 100L, 10L, null);
+    config.partitionByField = "dt";
+
+    MockFailureCollector collector = new MockFailureCollector("bqsink");
+    config.validate(collector);
+    Assert.assertEquals(0, collector.getValidationFailures().size());
+  }
+
   @Test
   public void testBigQuerySinkMetricInsert() throws Exception {
     Job mockJob = getMockLoadJob(10L);