Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ protected BigQuerySinkConfig getConfig() {
protected void prepareRunValidation(BatchSinkContext context) {
FailureCollector collector = context.getFailureCollector();
config.validate(context.getInputSchema(), config.getSchema(collector), collector);
collector.getOrThrowException();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ private void validatePartitionProperties(@Nullable Schema schema, FailureCollect
validateColumnForPartition(partitionByField, schema, collector);
return;
}
if (!shouldCreatePartitionedTable()) {
if (shouldCreatePartitionedTable()) {
validateColumnForPartition(partitionByField, schema, collector);
}
}
Expand Down Expand Up @@ -433,7 +433,16 @@ private void validateRangePartitionTableWithInputConfiguration(Table table, Rang

private void validateColumnForPartition(@Nullable String columnName, @Nullable Schema schema,
FailureCollector collector) {
if (columnName == null || schema == null) {
if (containsMacro(NAME_PARTITION_BY_FIELD) || containsMacro(NAME_PARTITIONING_TYPE) || schema == null) {
return;
}
PartitionType partitioningType = getPartitioningType();
if (Strings.isNullOrEmpty(columnName)) {
if (partitioningType == PartitionType.INTEGER) {
collector.addFailure("Partition column not provided.",
"Set the column for integer partitioning.")
.withConfigProperty(NAME_PARTITION_BY_FIELD);
}
return;
}
Schema.Field field = schema.getField(columnName);
Expand All @@ -445,7 +454,6 @@ private void validateColumnForPartition(@Nullable String columnName, @Nullable S
}
Schema fieldSchema = field.getSchema();
fieldSchema = fieldSchema.isNullable() ? fieldSchema.getNonNullable() : fieldSchema;
PartitionType partitioningType = getPartitioningType();
if (partitioningType == PartitionType.TIME) {
validateTimePartitioningColumn(columnName, collector, fieldSchema);
} else if (partitioningType == PartitionType.INTEGER) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,26 @@ public void testBigQuerySinkInvalidConfig() {
Assert.assertEquals(3, failures.size());
}

@Test
public void testBigQueryTimePartitionConfig() {
Schema schema = Schema.recordOf("record",
Schema.Field.of("id", Schema.of(Schema.Type.LONG)),
Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
Schema.Field.of("price", Schema.of(Schema.Type.DOUBLE)),
Schema.Field.of("dt", Schema.nullableOf(Schema.of(Schema.LogicalType.DATE))),
Schema.Field.of("bytedata", Schema.of(Schema.Type.BYTES)),
Schema.Field.of("timestamp",
Schema.nullableOf(Schema.of(Schema.LogicalType.TIMESTAMP_MICROS))));

BigQuerySinkConfig config = new BigQuerySinkConfig("44", "ds", "tb", "bucket", schema.toString(),
"TIME", 0L, 100L, 10L, null);
config.partitionByField = "dt";

MockFailureCollector collector = new MockFailureCollector("bqsink");
config.validate(collector);
Assert.assertEquals(0, collector.getValidationFailures().size());
}

@Test
public void testBigQuerySinkMetricInsert() throws Exception {
Job mockJob = getMockLoadJob(10L);
Expand Down