From 3a42f7b88aa93991cb820b41c0a9e07f34d2a7c6 Mon Sep 17 00:00:00 2001 From: Pragyash Date: Thu, 9 Apr 2026 12:25:32 +0530 Subject: [PATCH] Fix BQ connector test --- .../bigquery/SafeDestinationCatalogFactory.kt | 14 ++++++- .../SafeDestinationCatalogFactoryTest.kt | 40 ++++++++++++++++++- 2 files changed, 51 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/SafeDestinationCatalogFactory.kt b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/SafeDestinationCatalogFactory.kt index 5d75c3967b56..5335d3155496 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/SafeDestinationCatalogFactory.kt +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/kotlin/io/airbyte/integrations/destination/bigquery/SafeDestinationCatalogFactory.kt @@ -18,7 +18,9 @@ import io.airbyte.cdk.load.config.CHECK_STREAM_NAMESPACE import io.airbyte.cdk.load.data.FieldType import io.airbyte.cdk.load.data.IntegerType import io.airbyte.cdk.load.data.ObjectType +import io.airbyte.cdk.load.data.TimestampTypeWithTimezone import io.airbyte.cdk.load.data.json.JsonSchemaToAirbyteType +import io.airbyte.integrations.destination.bigquery.spec.BigqueryConfiguration import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.airbyte.protocol.models.v0.DestinationSyncMode import io.micronaut.context.annotation.Factory @@ -80,19 +82,27 @@ class SafeDestinationCatalogFactory { fun checkCatalog( namespaceMapper: NamespaceMapper, @Named("checkNamespace") checkNamespace: String?, + config: BigqueryConfiguration, ): DestinationCatalog { // Copied from DefaultDestinationCatalogFactory to maintain behavior val date = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyyMMdd")) val random = RandomStringUtils.randomAlphabetic(5).lowercase() val namespace = checkNamespace ?: "${CHECK_STREAM_NAMESPACE}_$date$random" + val schemaFields = linkedMapOf("test" to FieldType(IntegerType, nullable = true)) + if ( + !config.defaultPartitioningField.isNullOrBlank() && + config.defaultPartitioningField != "_airbyte_extracted_at" + ) { + schemaFields[config.defaultPartitioningField] = + FieldType(TimestampTypeWithTimezone, nullable = true) + } return DestinationCatalog( listOf( DestinationStream( unmappedNamespace = namespace, unmappedName = "test$date$random", importType = Append, - schema = - ObjectType(linkedMapOf("test" to FieldType(IntegerType, nullable = true))), + schema = ObjectType(schemaFields), generationId = 1, minimumGenerationId = 0, syncId = 1, diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/kotlin/io/airbyte/integrations/destination/bigquery/SafeDestinationCatalogFactoryTest.kt b/airbyte-integrations/connectors/destination-bigquery/src/test/kotlin/io/airbyte/integrations/destination/bigquery/SafeDestinationCatalogFactoryTest.kt index f85fb3dbf511..82c96675535a 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test/kotlin/io/airbyte/integrations/destination/bigquery/SafeDestinationCatalogFactoryTest.kt +++ b/airbyte-integrations/connectors/destination-bigquery/src/test/kotlin/io/airbyte/integrations/destination/bigquery/SafeDestinationCatalogFactoryTest.kt @@ -6,7 +6,12 @@ package io.airbyte.integrations.destination.bigquery import io.airbyte.cdk.load.command.Dedupe import io.airbyte.cdk.load.command.NamespaceMapper +import io.airbyte.cdk.load.data.ObjectType import io.airbyte.cdk.load.data.json.JsonSchemaToAirbyteType +import io.airbyte.integrations.destination.bigquery.spec.BatchedStandardInsertConfiguration +import io.airbyte.integrations.destination.bigquery.spec.BigqueryConfiguration +import io.airbyte.integrations.destination.bigquery.spec.BigqueryRegion +import io.airbyte.integrations.destination.bigquery.spec.CdcDeletionMode import io.airbyte.protocol.models.v0.AirbyteStream import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream @@ -62,10 +67,43 @@ class SafeDestinationCatalogFactoryTest { val factory = SafeDestinationCatalogFactory() val namespaceMapper = mockk(relaxed = true) - val destCatalog = factory.checkCatalog(namespaceMapper, "custom_check_ns") + val destCatalog = factory.checkCatalog(namespaceMapper, "custom_check_ns", config()) assertEquals(1, destCatalog.streams.size) assertEquals("custom_check_ns", destCatalog.streams.first().unmappedNamespace) assert(destCatalog.streams.first().unmappedName.startsWith("test")) } + + @Test + fun `test checkCatalog adds default partitioning field to check stream schema`() { + val factory = SafeDestinationCatalogFactory() + val namespaceMapper = mockk(relaxed = true) + + val destCatalog = + factory.checkCatalog( + namespaceMapper, + "custom_check_ns", + config(defaultPartitioningField = "created_at"), + ) + + val schema = destCatalog.streams.first().schema as ObjectType + assert(schema.properties.containsKey("created_at")) + } + + private fun config(defaultPartitioningField: String? = null): BigqueryConfiguration = + BigqueryConfiguration( + projectId = "test-project", + datasetLocation = BigqueryRegion.US, + datasetId = "test_dataset", + loadingMethod = BatchedStandardInsertConfiguration, + credentialsJson = null, + cdcDeletionMode = CdcDeletionMode.HARD_DELETE, + internalTableDataset = "airbyte_internal", + legacyRawTablesOnly = false, + defaultPartitioningField = defaultPartitioningField, + defaultClusteringField = null, + defaultTableSuffix = null, + defaultPartitioningGranularity = null, + streamConfigMap = emptyMap(), + ) }