From b488213db424574270bc11365c10a65273bdf99e Mon Sep 17 00:00:00 2001 From: Stephane Geneix <147216312+stephane-airbyte@users.noreply.github.com> Date: Thu, 23 May 2024 11:36:46 -0700 Subject: [PATCH] remove unnecessary calls to toList() (#37540) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ## How ## Review guide ## User Impact ## Can this PR be safely reverted and rolled back? - [ ] YES 💚 - [ ] NO ❌ --- .../integrations/base/IntegrationRunner.kt | 2 +- .../destination/StandardNameTransformer.kt | 4 +- .../destination/async/DetectStreamToFlush.kt | 26 +++--- .../async/state/GlobalAsyncStateManager.kt | 4 +- .../normalization/SentryExceptionHelper.kt | 6 +- .../InMemoryRecordBufferingStrategy.kt | 2 +- .../util/ConnectorExceptionUtil.kt | 4 +- .../concurrent/ConcurrentStreamConsumer.kt | 1 - .../io/airbyte/cdk/db/jdbc/TestJdbcUtils.kt | 8 +- .../cdk/db/jdbc/TestStreamingJdbcDatabase.kt | 2 +- .../async/AsyncStreamConsumerTest.kt | 7 +- .../state/GlobalAsyncStateManagerTest.kt | 72 ++++++++-------- .../BufferedStreamConsumerTest.kt | 10 ++- .../ConcurrentStreamConsumerTest.kt | 2 +- .../airbyte/cdk/testutils/ContainerFactory.kt | 11 +-- .../io/airbyte/cdk/testutils/TestDatabase.kt | 7 +- .../cdk/db/bigquery/BigQueryDatabase.kt | 14 ++-- .../io/airbyte/cdk/db/mongodb/MongoUtils.kt | 25 +++--- .../typing_deduping/JdbcDestinationHandler.kt | 9 +- .../jdbc/typing_deduping/JdbcSqlGenerator.kt | 18 ++-- .../destination/DestinationAcceptanceTest.kt | 67 ++++++--------- .../JdbcSqlGeneratorIntegrationTest.kt | 34 ++++---- .../AirbyteFileOffsetBackingStore.kt | 1 + .../source/jdbc/AbstractJdbcSource.kt | 6 +- .../source/relationaldb/AbstractDbSource.kt | 9 +- .../relationaldb/DbSourceDiscoverUtil.kt | 71 ++++++++-------- .../relationaldb/RelationalDbReadUtil.kt | 2 - .../relationaldb/state/GlobalStateManager.kt | 1 - .../relationaldb/state/StateGeneratorUtils.kt | 52 ++++++------ .../relationaldb/state/StreamStateManager.kt | 2 +- .../integrations/debezium/CdcSourceTest.kt | 1 - .../jdbc/test/JdbcSourceAcceptanceTest.kt | 82 ++++++++----------- .../source/AbstractSourceDatabaseTypeTest.kt | 54 ++++++------ .../source/PythonSourceAcceptanceTest.kt | 5 +- .../source/SourceAcceptanceTest.kt | 11 ++- .../kotlin/io/airbyte/commons/enums/Enums.kt | 2 +- .../io/airbyte/commons/json/JsonPaths.kt | 6 +- .../io/airbyte/commons/json/JsonSchemas.kt | 9 +- .../validation/json/JsonSchemaValidator.kt | 10 +-- .../concurrency/CompletableFuturesTest.kt | 4 +- .../io/airbyte/commons/json/JsonPathsTest.kt | 12 +-- .../DefaultAirbyteStreamFactoryTest.kt | 30 +++---- .../workers/helper/CatalogClientConverters.kt | 15 ++-- .../airbyte/workers/helper/FailureHelper.kt | 2 +- ...GcsAvroParquetDestinationAcceptanceTest.kt | 6 +- .../destination/s3/S3StorageOperations.kt | 6 +- .../s3/avro/AvroNameTransformer.kt | 4 +- .../destination/s3/avro/JsonSchemaType.kt | 16 ++-- .../s3/avro/JsonToAvroSchemaConverter.kt | 23 +++--- .../csv/RootLevelFlatteningSheetGenerator.kt | 1 - .../s3/parquet/ParquetSerializedBufferTest.kt | 3 +- .../S3AvroParquetDestinationAcceptanceTest.kt | 14 ++-- .../s3/S3DestinationAcceptanceTest.kt | 8 +- .../typing_deduping/AirbyteType.kt | 9 +- .../typing_deduping/CatalogParser.kt | 4 +- .../typing_deduping/DefaultTyperDeduper.kt | 2 +- .../typing_deduping/FutureUtils.kt | 2 +- .../base/destination/typing_deduping/Sql.kt | 45 +++++----- .../typing_deduping/BaseTypingDedupingTest.kt | 3 +- .../typing_deduping/RecordDiffer.kt | 10 +-- 60 files changed, 383 insertions(+), 495 deletions(-) diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt index d5938ba475c70..b38f6d41561b6 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/IntegrationRunner.kt @@ -481,7 +481,7 @@ internal constructor( ) { val currentThread = Thread.currentThread() - val runningThreads = ThreadUtils.getAllThreads().filter(::filterOrphanedThread).toList() + val runningThreads = ThreadUtils.getAllThreads().filter(::filterOrphanedThread) if (runningThreads.isNotEmpty()) { LOGGER.warn( """ diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StandardNameTransformer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StandardNameTransformer.kt index 106160b776fb7..131a0bf26bde4 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StandardNameTransformer.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/StandardNameTransformer.kt @@ -73,9 +73,7 @@ open class StandardNameTransformer : NamingConventionTransformer { return Jsons.jsonNode>(properties) } else if (root.isArray) { return Jsons.jsonNode( - MoreIterators.toList(root.elements()) - .map { r: JsonNode -> formatJsonPath(r) } - .toList() + MoreIterators.toList(root.elements()).map { r: JsonNode -> formatJsonPath(r) } ) } else { return root diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/DetectStreamToFlush.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/DetectStreamToFlush.kt index 92e43b748b6d9..db61cdaebf45d 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/DetectStreamToFlush.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/DetectStreamToFlush.kt @@ -241,20 +241,18 @@ internal constructor( streamDescriptor, ) } - return streams - .sortedWith( - Comparator.comparing( - { s: StreamDescriptor -> sdToQueueSize[s]!!.orElseThrow() }, - Comparator.reverseOrder(), - ) // if no time is present, it suggests the queue has no records. set MAX time - // as a sentinel value to - // represent no records. - .thenComparing { s: StreamDescriptor -> - sdToTimeOfLastRecord[s]!!.orElse(Instant.MAX) - } - .thenComparing { s: StreamDescriptor -> s.namespace + s.name }, - ) - .toList() + return streams.sortedWith( + Comparator.comparing( + { s: StreamDescriptor -> sdToQueueSize[s]!!.orElseThrow() }, + Comparator.reverseOrder(), + ) // if no time is present, it suggests the queue has no records. set MAX time + // as a sentinel value to + // represent no records. + .thenComparing { s: StreamDescriptor -> + sdToTimeOfLastRecord[s]!!.orElse(Instant.MAX) + } + .thenComparing { s: StreamDescriptor -> s.namespace + s.name }, + ) } companion object { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt index 2db1c727fd07e..d2e56c7469775 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManager.kt @@ -281,9 +281,7 @@ class GlobalAsyncStateManager(private val memoryManager: GlobalMemoryManager) { // into the non-STREAM world for correctness. synchronized(lock) { aliasIds.addAll( - descToStateIdQ.values - .flatMap { obj: LinkedBlockingDeque -> obj } - .toList(), + descToStateIdQ.values.flatMap { obj: LinkedBlockingDeque -> obj }, ) descToStateIdQ.clear() retroactiveGlobalStateId = StateIdProvider.nextId diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/normalization/SentryExceptionHelper.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/normalization/SentryExceptionHelper.kt index 74548816238b3..c43b0589060fe 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/normalization/SentryExceptionHelper.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/normalization/SentryExceptionHelper.kt @@ -122,11 +122,7 @@ object SentryExceptionHelper { errorMessageAndType[ErrorMapKeys.ERROR_MAP_MESSAGE_KEY] = String.format( "%s", - stacktraceLines[ - Arrays.stream(stacktraceLines) - .toList() - .indexOf(followingLine) + 1 - ] + stacktraceLines[stacktraceLines.indexOf(followingLine) + 1] .trim { it <= ' ' } ) errorMessageAndType[ErrorMapKeys.ERROR_MAP_TYPE_KEY] = diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.kt index 70dfaffbf1e91..92ed3b5fb71ea 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/destination/record_buffer/InMemoryRecordBufferingStrategy.kt @@ -71,7 +71,7 @@ class InMemoryRecordBufferingStrategy( stream.name, streamBuffer[stream]!!.size ) - recordWriter.accept(stream, streamBuffer[stream]!!.toList()) + recordWriter.accept(stream, streamBuffer[stream]!!) LOGGER.info("Flushing completed for {}", stream.name) } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConnectorExceptionUtil.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConnectorExceptionUtil.kt index ba9f9dce7be6c..38f185f459dd0 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConnectorExceptionUtil.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/ConnectorExceptionUtil.kt @@ -123,12 +123,12 @@ object ConnectorExceptionUtil { initialMessage: String, eithers: List> ): List { - val throwables: List = eithers.filter { it.isLeft() }.map { it.left!! }.toList() + val throwables: List = eithers.filter { it.isLeft() }.map { it.left!! } if (throwables.isNotEmpty()) { logAllAndThrowFirst(initialMessage, throwables) } // No need to filter on isRight since isLeft will throw before reaching this line. - return eithers.map { obj: Either -> obj.right!! }.toList() + return eithers.map { obj: Either -> obj.right!! } } private fun isTransientErrorException(e: Throwable?): Boolean { diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumer.kt index 22b826e747dbf..6c392c3cd132b 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumer.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumer.kt @@ -83,7 +83,6 @@ class ConcurrentStreamConsumer( .map { runnable: ConcurrentStreamRunnable -> CompletableFuture.runAsync(runnable, executorService) } - .toList() /* * Wait for the submitted streams to complete before returning. This uses the join() method to allow diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestJdbcUtils.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestJdbcUtils.kt index 7f57d453d328a..30a8f76fcc3bb 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestJdbcUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestJdbcUtils.kt @@ -119,10 +119,10 @@ internal class TestJdbcUtils { val rs = connection.createStatement().executeQuery("SELECT * FROM id_and_name;") val actual = JdbcDatabase.toUnsafeStream(rs) { queryContext: ResultSet -> - sourceOperations.rowToJson(queryContext) - } - .toList() - Assertions.assertEquals(RECORDS_AS_JSON, actual) + sourceOperations.rowToJson(queryContext) + } + + Assertions.assertEquals(RECORDS_AS_JSON, actual.toList()) } } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestStreamingJdbcDatabase.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestStreamingJdbcDatabase.kt index 3693a8aef75e7..f9313761e3231 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestStreamingJdbcDatabase.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/db/jdbc/TestStreamingJdbcDatabase.kt @@ -144,7 +144,7 @@ internal class TestStreamingJdbcDatabase { // This check assumes that FetchSizeConstants.TARGET_BUFFER_BYTE_SIZE = 200 MB. // Update this check if the buffer size constant is changed. Assertions.assertEquals(2, fetchSizes.size) - val sortedSizes = fetchSizes.sorted().toList() + val sortedSizes = fetchSizes.sorted() Assertions.assertTrue(sortedSizes[0] < FetchSizeConstants.INITIAL_SAMPLE_SIZE) Assertions.assertEquals(FetchSizeConstants.INITIAL_SAMPLE_SIZE, sortedSizes[1]) } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt index 7b49b5d410ace..4d9e03cb450b6 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/AsyncStreamConsumerTest.kt @@ -548,12 +548,7 @@ class AsyncStreamConsumerTest { ) // captures the output of all the workers, since our records could come out in any of them. - val actualRecords = - argumentCaptor.allValues - .stream() // flatten those results into a single list for the simplicity of - // comparison - .flatMap { s: Stream<*> -> s } - .toList() + val actualRecords = argumentCaptor.allValues.flatMap { it.toList() } val expRecords = allRecords.map { m: AirbyteMessage -> diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManagerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManagerTest.kt index 8a15090c84850..896e6c04b408d 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManagerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/async/state/GlobalAsyncStateManagerTest.kt @@ -225,7 +225,7 @@ class GlobalAsyncStateManagerTest { val stateWithStats2 = emittedStatesFromDestination.associateWith { it.state?.destinationStats } assertEquals( - listOf( + setOf( attachDestinationStateStats( Jsons.deserialize( STREAM1_STATE_MESSAGE1.serialized, @@ -234,7 +234,7 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats2.keys.toList(), + stateWithStats2.keys, ) assertEquals(listOf(expectedDestinationStats), stateWithStats2.values.toList()) } @@ -267,7 +267,7 @@ class GlobalAsyncStateManagerTest { emittedStatesFromDestination.associateWith { it.state?.destinationStats } // assertEquals( - listOf( + setOf( attachDestinationStateStats( Jsons.deserialize( GLOBAL_STATE_MESSAGE1.serialized, @@ -276,7 +276,7 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats.keys.toList(), + stateWithStats.keys, ) assertEquals(listOf(expectedDestinationStats), stateWithStats.values.toList()) @@ -329,7 +329,7 @@ class GlobalAsyncStateManagerTest { val stateWithStats = emittedStatesFromDestination.associateWith { it.state?.destinationStats } assertEquals( - listOf( + setOf( attachDestinationStateStats( Jsons.deserialize( GLOBAL_STATE_MESSAGE1.serialized, @@ -338,7 +338,7 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats.keys.toList(), + stateWithStats.keys, ) assertEquals(listOf(expectedDestinationStats), stateWithStats.values.toList()) } @@ -361,7 +361,7 @@ class GlobalAsyncStateManagerTest { val stateWithStats = emittedStatesFromDestination.associateWith { it.state?.destinationStats } assertEquals( - listOf( + setOf( attachDestinationStateStats( Jsons.deserialize( GLOBAL_STATE_MESSAGE1.serialized, @@ -370,7 +370,7 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats.keys.toList(), + stateWithStats.keys, ) assertEquals(listOf(expectedDestinationStats), stateWithStats.values.toList()) @@ -387,7 +387,7 @@ class GlobalAsyncStateManagerTest { val stateWithStats2 = emittedStatesFromDestination.associateWith { it.state?.destinationStats } assertEquals( - listOf( + setOf( attachDestinationStateStats( Jsons.deserialize( GLOBAL_STATE_MESSAGE2.serialized, @@ -396,7 +396,7 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats2.keys.toList(), + stateWithStats2.keys, ) assertEquals(listOf(expectedDestinationStats), stateWithStats2.values.toList()) } @@ -419,7 +419,7 @@ class GlobalAsyncStateManagerTest { val stateWithStats = emittedStatesFromDestination.associateWith { it.state?.destinationStats } assertEquals( - listOf( + setOf( attachDestinationStateStats( Jsons.deserialize( GLOBAL_STATE_MESSAGE1.serialized, @@ -428,7 +428,7 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats.keys.toList(), + stateWithStats.keys, ) assertEquals(listOf(expectedDestinationStats), stateWithStats.values.toList()) emittedStatesFromDestination.clear() @@ -443,7 +443,7 @@ class GlobalAsyncStateManagerTest { val stateWithStats2 = emittedStatesFromDestination.associateWith { it.state?.destinationStats } assertEquals( - listOf( + setOf( attachDestinationStateStats( Jsons.deserialize( GLOBAL_STATE_MESSAGE2.serialized, @@ -452,7 +452,7 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats2, ), ), - stateWithStats2.keys.toList(), + stateWithStats2.keys, ) assertEquals( listOf(expectedDestinationStats2), @@ -471,7 +471,7 @@ class GlobalAsyncStateManagerTest { val stateWithStats3 = emittedStatesFromDestination.associateWith { it.state?.destinationStats } assertEquals( - listOf( + setOf( attachDestinationStateStats( Jsons.deserialize( GLOBAL_STATE_MESSAGE3.serialized, @@ -480,7 +480,7 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats3.keys.toList(), + stateWithStats3.keys, ) assertEquals(listOf(expectedDestinationStats), stateWithStats3.values.toList()) } @@ -506,7 +506,7 @@ class GlobalAsyncStateManagerTest { val stateWithStats = emittedStatesFromDestination.associateWith { it.state?.destinationStats } assertEquals( - listOf( + setOf( attachDestinationStateStats( Jsons.deserialize( GLOBAL_STATE_MESSAGE1.serialized, @@ -515,7 +515,7 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats.keys.toList(), + stateWithStats.keys, ) assertEquals(listOf(expectedDestinationStats), stateWithStats.values.toList()) emittedStatesFromDestination.clear() @@ -533,7 +533,7 @@ class GlobalAsyncStateManagerTest { val stateWithStats2 = emittedStatesFromDestination.associateWith { it.state?.destinationStats } assertEquals( - listOf( + setOf( attachDestinationStateStats( Jsons.deserialize( GLOBAL_STATE_MESSAGE2.serialized, @@ -542,7 +542,7 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats2.keys.toList(), + stateWithStats2.keys, ) assertEquals(listOf(expectedDestinationStats), stateWithStats2.values.toList()) } @@ -567,7 +567,7 @@ class GlobalAsyncStateManagerTest { val stateWithStats = emittedStatesFromDestination.associateWith { it.state?.destinationStats } assertEquals( - listOf( + setOf( attachDestinationStateStats( Jsons.deserialize( STREAM1_STATE_MESSAGE1.serialized, @@ -576,7 +576,7 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats.keys.toList(), + stateWithStats.keys, ) assertEquals(listOf(expectedDestinationStats), stateWithStats.values.toList()) @@ -609,7 +609,7 @@ class GlobalAsyncStateManagerTest { val stateWithStats = emittedStatesFromDestination.associateWith { it.state?.destinationStats } assertEquals( - listOf( + setOf( attachDestinationStateStats( Jsons.deserialize( STREAM1_STATE_MESSAGE1.serialized, @@ -618,7 +618,7 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats.keys.toList(), + stateWithStats.keys, ) assertEquals(listOf(expectedDestinationStats), stateWithStats.values.toList()) @@ -636,7 +636,7 @@ class GlobalAsyncStateManagerTest { val stateWithStats2 = emittedStatesFromDestination.associateWith { it.state?.destinationStats } assertEquals( - listOf( + setOf( attachDestinationStateStats( Jsons.deserialize( STREAM1_STATE_MESSAGE2.serialized, @@ -645,7 +645,7 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats2, ), ), - stateWithStats2.keys.toList(), + stateWithStats2.keys, ) assertEquals( listOf(expectedDestinationStats2), @@ -671,7 +671,7 @@ class GlobalAsyncStateManagerTest { val stateWithStats = emittedStatesFromDestination.associateWith { it.state?.destinationStats } assertEquals( - listOf( + setOf( attachDestinationStateStats( Jsons.deserialize( STREAM1_STATE_MESSAGE1.serialized, @@ -680,7 +680,7 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats.keys.toList(), + stateWithStats.keys, ) assertEquals(listOf(expectedDestinationStats), stateWithStats.values.toList()) emittedStatesFromDestination.clear() @@ -695,7 +695,7 @@ class GlobalAsyncStateManagerTest { emittedStatesFromDestination.associateWith { it.state?.destinationStats } val expectedDestinationStats2 = AirbyteStateStats().withRecordCount(0.0) assertEquals( - listOf( + setOf( attachDestinationStateStats( Jsons.deserialize( STREAM1_STATE_MESSAGE2.serialized, @@ -704,7 +704,7 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats2, ), ), - stateWithStats2.keys.toList(), + stateWithStats2.keys, ) assertEquals( listOf(expectedDestinationStats2), @@ -724,7 +724,7 @@ class GlobalAsyncStateManagerTest { emittedStatesFromDestination.associateWith { it.state?.destinationStats } val expectedDestinationStats3 = AirbyteStateStats().withRecordCount(10.0) assertEquals( - listOf( + setOf( attachDestinationStateStats( Jsons.deserialize( STREAM1_STATE_MESSAGE3.serialized, @@ -733,7 +733,7 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats3, ), ), - stateWithStats3.keys.toList(), + stateWithStats3.keys, ) assertEquals( listOf(expectedDestinationStats3), @@ -761,7 +761,7 @@ class GlobalAsyncStateManagerTest { val stateWithStats = emittedStatesFromDestination.associateWith { it.state?.destinationStats } assertEquals( - listOf( + setOf( attachDestinationStateStats( Jsons.deserialize( STREAM1_STATE_MESSAGE1.serialized, @@ -770,7 +770,7 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats, ), ), - stateWithStats.keys.toList(), + stateWithStats.keys, ) assertEquals(listOf(expectedDestinationStats), stateWithStats.values.toList()) emittedStatesFromDestination.clear() @@ -794,7 +794,7 @@ class GlobalAsyncStateManagerTest { val stateWithStats2 = emittedStatesFromDestination.associateWith { it.state?.destinationStats } assertEquals( - listOf( + setOf( attachDestinationStateStats( Jsons.deserialize( STREAM2_STATE_MESSAGE.serialized, @@ -803,7 +803,7 @@ class GlobalAsyncStateManagerTest { expectedDestinationStats2, ), ), - stateWithStats2.keys.toList(), + stateWithStats2.keys, ) assertEquals( listOf(expectedDestinationStats2), diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.kt index 2889871675bc5..d8b5be49db9ff 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/destination/buffered_stream_consumer/BufferedStreamConsumerTest.kt @@ -145,9 +145,11 @@ class BufferedStreamConsumerTest { verifyStartAndClose() val expectedRecords = - Lists.newArrayList(expectedRecordsBatch1, expectedRecordsBatch2) - .flatMap { obj: List -> obj } - .toList() + Lists.newArrayList(expectedRecordsBatch1, expectedRecordsBatch2).flatMap { + obj: List -> + obj + } + verifyRecords(STREAM_NAME, SCHEMA_NAME, expectedRecords) Mockito.verify(outputRecordCollector).accept(STATE_MESSAGE1) @@ -580,7 +582,7 @@ class BufferedStreamConsumerTest { Mockito.verify(recordWriter) .accept( AirbyteStreamNameNamespacePair(streamName, namespace), - expectedRecords.map { obj: AirbyteMessage -> obj.record }.toList() + expectedRecords.map { obj: AirbyteMessage -> obj.record } ) } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumerTest.kt b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumerTest.kt index d23ef04559007..9dd4a383c5277 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumerTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/test/kotlin/io/airbyte/cdk/integrations/util/concurrent/ConcurrentStreamConsumerTest.kt @@ -107,7 +107,7 @@ internal class ConcurrentStreamConsumerTest { val concurrentStreamConsumer = ConcurrentStreamConsumer(streamConsumer, streams.size) val partitionSize = concurrentStreamConsumer.parallelism - val partitions = Lists.partition(streams.toList(), partitionSize) + val partitions = Lists.partition(streams, partitionSize) for (partition in partitions) { Assertions.assertDoesNotThrow { concurrentStreamConsumer.accept(partition) } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/ContainerFactory.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/ContainerFactory.kt index 9f4abc795c560..886785d5ff3bc 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/ContainerFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/ContainerFactory.kt @@ -12,7 +12,6 @@ import java.util.concurrent.ConcurrentMap import java.util.concurrent.atomic.AtomicInteger import java.util.function.Consumer import java.util.function.Supplier -import java.util.stream.Stream import kotlin.concurrent.Volatile import org.slf4j.Logger import org.slf4j.LoggerFactory @@ -87,9 +86,7 @@ abstract class ContainerFactory> { fun shared(imageName: String, vararg methods: String): C { return shared( imageName, - Stream.of(*methods) - .map { n: String -> NamedContainerModifierImpl(n, resolveModifierByName(n)) } - .toList() + methods.map { n: String -> NamedContainerModifierImpl(n, resolveModifierByName(n)) } ) } @@ -106,7 +103,7 @@ abstract class ContainerFactory> { ContainerKey( javaClass, DockerImageName.parse(imageName), - namedContainerModifiers.map { it.name() }.toList() + namedContainerModifiers.map { it.name() } ) // We deliberately avoid creating the container itself eagerly during the evaluation of the // map @@ -131,9 +128,7 @@ abstract class ContainerFactory> { fun exclusive(imageName: String, vararg methods: String): C { return exclusive( imageName, - Stream.of(*methods) - .map { n: String -> NamedContainerModifierImpl(n, resolveModifierByName(n)) } - .toList() + methods.map { n: String -> NamedContainerModifierImpl(n, resolveModifierByName(n)) } ) } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt index ea9b0f3c7cf46..f71d93947ef08 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt @@ -184,16 +184,13 @@ protected constructor(val container: C) : AutoCloseable { protected fun execInContainer(cmds: Stream) { val cmd = cmds.toList() - if (cmd!!.isEmpty()) { + if (cmd.isEmpty()) { return } try { LOGGER!!.info( formatLogLine( - String.format( - "executing command %s", - Strings.join(cmd.toList().asIterable(), " ") - ) + String.format("executing command %s", Strings.join(cmd.asIterable(), " ")) ) ) val exec = container.execInContainer(*cmd.toTypedArray()) diff --git a/airbyte-cdk/java/airbyte-cdk/datastore-bigquery/src/main/kotlin/io/airbyte/cdk/db/bigquery/BigQueryDatabase.kt b/airbyte-cdk/java/airbyte-cdk/datastore-bigquery/src/main/kotlin/io/airbyte/cdk/db/bigquery/BigQueryDatabase.kt index d123206000e54..aeb4cb550a445 100644 --- a/airbyte-cdk/java/airbyte-cdk/datastore-bigquery/src/main/kotlin/io/airbyte/cdk/db/bigquery/BigQueryDatabase.kt +++ b/airbyte-cdk/java/airbyte-cdk/datastore-bigquery/src/main/kotlin/io/airbyte/cdk/db/bigquery/BigQueryDatabase.kt @@ -98,14 +98,12 @@ constructor( @Throws(Exception::class) override fun unsafeQuery(sql: String?, vararg params: String): Stream { val parameterValueList = - Arrays.stream(params) - .map { param: String -> - QueryParameterValue.newBuilder() - .setValue(param) - .setType(StandardSQLTypeName.STRING) - .build() - } - .toList() + params.map { param: String -> + QueryParameterValue.newBuilder() + .setValue(param) + .setType(StandardSQLTypeName.STRING) + .build() + } return query(sql, parameterValueList) } diff --git a/airbyte-cdk/java/airbyte-cdk/datastore-mongo/src/main/kotlin/io/airbyte/cdk/db/mongodb/MongoUtils.kt b/airbyte-cdk/java/airbyte-cdk/datastore-mongo/src/main/kotlin/io/airbyte/cdk/db/mongodb/MongoUtils.kt index e2929a4b96739..3d8d35c9b1eea 100644 --- a/airbyte-cdk/java/airbyte-cdk/datastore-mongo/src/main/kotlin/io/airbyte/cdk/db/mongodb/MongoUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/datastore-mongo/src/main/kotlin/io/airbyte/cdk/db/mongodb/MongoUtils.kt @@ -127,9 +127,10 @@ object MongoUtils { val field = node.data if (node.hasChildren()) { val subFields = - node.children!! - .map { obj: TreeNode> -> nodeToCommonField(obj) } - .toList() + node.children!!.map { obj: TreeNode> -> + nodeToCommonField(obj) + } + return CommonField(field.name, field.type, subFields) } else { return CommonField(field.name, field.type) @@ -294,17 +295,15 @@ object MongoUtils { ): List>> { val allkeys = HashSet(getFieldsName(collection)) - return allkeys - .map { key: String -> - val types = getTypes(collection, key) - val type = getUniqueType(types) - val fieldNode = TreeNode(CommonField(transformName(types, key), type)) - if (type == BsonType.DOCUMENT) { - setSubFields(collection, fieldNode, key) - } - fieldNode + return allkeys.map { key: String -> + val types = getTypes(collection, key) + val type = getUniqueType(types) + val fieldNode = TreeNode(CommonField(transformName(types, key), type)) + if (type == BsonType.DOCUMENT) { + setSubFields(collection, fieldNode, key) } - .toList() + fieldNode + } } /** diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt index 80cfc583e27dc..83974a6cc9b50 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcDestinationHandler.kt @@ -207,11 +207,10 @@ abstract class JdbcDestinationHandler( } val initialStates = - streamConfigs - .map { streamConfig: StreamConfig -> - retrieveState(destinationStatesFuture, streamConfig) - } - .toList() + streamConfigs.map { streamConfig: StreamConfig -> + retrieveState(destinationStatesFuture, streamConfig) + } + val states = CompletableFutures.allOf(initialStates).toCompletableFuture().join() return getResultsOrLogAndThrowFirst("Failed to retrieve initial state", states) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt index a74a857fcb655..fc4dbd5629794 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/jdbc/typing_deduping/JdbcSqlGenerator.kt @@ -172,11 +172,10 @@ constructor( metaColumns: Map> ): List> { val fields = - metaColumns.entries - .map { metaColumn: Map.Entry?> -> - DSL.field(DSL.quotedName(metaColumn.key), metaColumn.value) - } - .toList() + metaColumns.entries.map { metaColumn: Map.Entry?> -> + DSL.field(DSL.quotedName(metaColumn.key), metaColumn.value) + } + val dataFields = columns.entries .map { column: Map.Entry -> @@ -220,11 +219,10 @@ constructor( useExpensiveSaferCasting: Boolean ): List> { val fields = - metaColumns.entries - .map { metaColumn: Map.Entry?> -> - DSL.field(DSL.quotedName(metaColumn.key), metaColumn.value) - } - .toList() + metaColumns.entries.map { metaColumn: Map.Entry?> -> + DSL.field(DSL.quotedName(metaColumn.key), metaColumn.value) + } + // Use originalName with non-sanitized characters when extracting data from _airbyte_data val dataFields = extractRawDataFields(columns, useExpensiveSaferCasting) dataFields.addAll(fields) diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt index 8ba9994e885ef..ac4b6b0e97c60 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/DestinationAcceptanceTest.kt @@ -428,13 +428,9 @@ abstract class DestinationAcceptanceTest { ) val configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog) val messages: List = - MoreResources.readResource(messagesFilename) - .trim() - .lines() - .map { - Jsons.deserialize(it, io.airbyte.protocol.models.v0.AirbyteMessage::class.java) - } - .toList() + MoreResources.readResource(messagesFilename).trim().lines().map { + Jsons.deserialize(it, io.airbyte.protocol.models.v0.AirbyteMessage::class.java) + } val config = getConfig() val defaultSchema = getDefaultSchema(config) @@ -458,13 +454,9 @@ abstract class DestinationAcceptanceTest { ) val configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog) val messages: List = - MoreResources.readResource(messagesFilename) - .trim() - .lines() - .map { - Jsons.deserialize(it, io.airbyte.protocol.models.v0.AirbyteMessage::class.java) - } - .toList() + MoreResources.readResource(messagesFilename).trim().lines().map { + Jsons.deserialize(it, io.airbyte.protocol.models.v0.AirbyteMessage::class.java) + } val largeNumberRecords = Collections.nCopies(400, messages) @@ -482,7 +474,6 @@ abstract class DestinationAcceptanceTest { else message.toString() } ) - .toList() val config = getConfig() runSyncAndVerifyStateOutput(config, largeNumberRecords, configuredCatalog, false) @@ -522,7 +513,7 @@ abstract class DestinationAcceptanceTest { io.airbyte.protocol.models.v0.AirbyteMessage::class.java ) } - .toList() + val config = getConfig() runSyncAndVerifyStateOutput(config, firstSyncMessages, configuredCatalog, false) @@ -714,7 +705,7 @@ abstract class DestinationAcceptanceTest { .trim() .lines() .map { Jsons.deserialize(it, AirbyteMessage::class.java) } - .toList() + val config = getConfig() runSyncAndVerifyStateOutput(config, firstSyncMessages, configuredCatalog, false) val secondSyncMessages: List = @@ -848,14 +839,13 @@ abstract class DestinationAcceptanceTest { // We expect all the of messages to be missing the removed column after normalization. val expectedMessages = - messages - .map { message: io.airbyte.protocol.models.v0.AirbyteMessage -> - if (message.record != null) { - (message.record.data as ObjectNode).remove("HKD") - } - message + messages.map { message: io.airbyte.protocol.models.v0.AirbyteMessage -> + if (message.record != null) { + (message.record.data as ObjectNode).remove("HKD") } - .toList() + message + } + assertSameMessages(expectedMessages, actualMessages, true) } @@ -1024,7 +1014,6 @@ abstract class DestinationAcceptanceTest { it.record.data["NZD"].asText() (it.record.emittedAt == latestMessagesOnly[key]!!.record.emittedAt) } - .toList() val defaultSchema = getDefaultSchema(config) retrieveRawRecordsAndAssertSameMessages( @@ -1695,14 +1684,14 @@ abstract class DestinationAcceptanceTest { val streamName = stream.name val schema = if (stream.namespace != null) stream.namespace else defaultSchema!! val msgList = - retrieveRecords(testEnv, streamName, schema, stream.jsonSchema) - .map { data: JsonNode -> - AirbyteRecordMessage() - .withStream(streamName) - .withNamespace(schema) - .withData(data) - } - .toList() + retrieveRecords(testEnv, streamName, schema, stream.jsonSchema).map { data: JsonNode + -> + AirbyteRecordMessage() + .withStream(streamName) + .withNamespace(schema) + .withData(data) + } + actualMessages.addAll(msgList) } @@ -1726,7 +1715,6 @@ abstract class DestinationAcceptanceTest { if (pruneAirbyteInternalFields) safePrune(recordMessage) else recordMessage } .map { obj: AirbyteRecordMessage -> obj.data } - .toList() val actualProcessed = actual @@ -1734,7 +1722,6 @@ abstract class DestinationAcceptanceTest { if (pruneAirbyteInternalFields) safePrune(recordMessage) else recordMessage } .map { obj: AirbyteRecordMessage -> obj.data } - .toList() _testDataComparator.assertSameData(expectedProcessed, actualProcessed) } @@ -1750,11 +1737,11 @@ abstract class DestinationAcceptanceTest { val streamName = stream.name val msgList = - retrieveNormalizedRecords(testEnv, streamName, defaultSchema) - .map { data: JsonNode -> - AirbyteRecordMessage().withStream(streamName).withData(data) - } - .toList() + retrieveNormalizedRecords(testEnv, streamName, defaultSchema).map { data: JsonNode + -> + AirbyteRecordMessage().withStream(streamName).withData(data) + } + actualMessages.addAll(msgList) } return actualMessages diff --git a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt index 6830476dba83e..05b21c9334128 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/destination/typing_deduping/JdbcSqlGeneratorIntegrationTest.kt @@ -56,32 +56,28 @@ abstract class JdbcSqlGeneratorIntegrationTest DSL.field(DSL.quotedName(columnName)) } - .toList() + columnNames.map { columnName: String -> DSL.field(DSL.quotedName(columnName)) } ) for (record in records) { insert = insert.values( - columnNames - .map { fieldName: String -> - // Convert this field to a string. Pretty naive implementation. - val column = record[fieldName] - val columnAsString = - if (column == null) { - null - } else if (column.isTextual) { - column.asText() - } else { - column.toString() - } - if (Arrays.asList(*columnsToParseJson).contains(fieldName)) { - return@map toJsonValue(columnAsString) + columnNames.map { fieldName: String -> + // Convert this field to a string. Pretty naive implementation. + val column = record[fieldName] + val columnAsString = + if (column == null) { + null + } else if (column.isTextual) { + column.asText() } else { - return@map DSL.`val`(columnAsString) + column.toString() } + if (Arrays.asList(*columnsToParseJson).contains(fieldName)) { + return@map toJsonValue(columnAsString) + } else { + return@map DSL.`val`(columnAsString) } - .toList() + } ) } database.execute(insert.getSQL(ParamType.INLINED)) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt index de8399a0213d6..2d8eed56d3746 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt @@ -61,6 +61,7 @@ class AirbyteFileOffsetBackingStore( private fun updateStateForDebezium2_1(mapAsString: Map): Map { val updatedMap: MutableMap = LinkedHashMap() if (mapAsString.size > 0) { + // We're getting the 1st of a map. Something fishy going on here val key = mapAsString.keys.toList()[0] val i = key.indexOf('[') val i1 = key.lastIndexOf(']') diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt index cb978c73cc714..e83cdd7d0d568 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt @@ -314,12 +314,10 @@ abstract class AbstractJdbcSource( f.get(INTERNAL_COLUMN_NAME).asText(), datatype ) {} - } - .toList(), + }, cursorFields = extractCursorFields(fields) ) } - .toList() } private fun extractCursorFields(fields: List): List { @@ -328,7 +326,6 @@ abstract class AbstractJdbcSource( isCursorType(sourceOperations.getDatabaseFieldType(field)) } .map { it.get(INTERNAL_COLUMN_NAME).asText() } - .toList() } protected fun excludeNotAccessibleTables( @@ -756,7 +753,6 @@ abstract class AbstractJdbcSource( ) } .map { `object`: ConfiguredAirbyteStream -> Jsons.clone(`object`) } - .toList() } companion object { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt index 4832cef3507d3..19e8ba0e212e8 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt @@ -302,11 +302,9 @@ protected constructor(driverClassName: String) : val discoveredTables = discoverInternal(database) return (if (systemNameSpaces.isEmpty()) discoveredTables else - discoveredTables - .filter { table: TableInfo> -> - !systemNameSpaces.contains(table.nameSpace) && !systemViews.contains(table.name) - } - .toList()) + discoveredTables.filter { table: TableInfo> -> + !systemNameSpaces.contains(table.nameSpace) && !systemViews.contains(table.name) + }) } protected fun getFullRefreshIterators( @@ -421,7 +419,6 @@ protected constructor(driverClassName: String) : table.fields .map { obj: CommonField -> obj.name } .filter { o: String -> selectedFieldsInCatalog.contains(o) } - .toList() val iterator: AutoCloseableIterator // checks for which sync mode we're using based on the configured airbytestream diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt index 26d82375e1973..59023764e39d1 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt @@ -46,7 +46,7 @@ object DbSourceDiscoverUtil { toField(commonField, airbyteTypeConverter) } .distinct() - .toList() + val currentJsonSchema = CatalogHelpers.fieldsToJsonSchema(fields) val catalogSchema = stream.jsonSchema val currentSchemaProperties = currentJsonSchema["properties"] @@ -85,37 +85,35 @@ object DbSourceDiscoverUtil { airbyteTypeConverter: Function ): AirbyteCatalog { val tableInfoFieldList = - tableInfos - .map { t: TableInfo> -> - // some databases return multiple copies of the same record for a column (e.g. - // redshift) because - // they have at least once delivery guarantees. we want to dedupe these, but - // first we check that the - // records are actually the same and provide a good error message if they are - // not. - assertColumnsWithSameNameAreSame(t.nameSpace, t.name, t.fields) - val fields = - t.fields - .map { commonField: CommonField -> - toField(commonField, airbyteTypeConverter) - } - .distinct() - .toList() - val fullyQualifiedTableName = getFullyQualifiedTableName(t.nameSpace, t.name) - val primaryKeys = - fullyQualifiedTableNameToPrimaryKeys.getOrDefault( - fullyQualifiedTableName, - emptyList() - ) - TableInfo( - nameSpace = t.nameSpace, - name = t.name, - fields = fields, - primaryKeys = primaryKeys, - cursorFields = t.cursorFields + tableInfos.map { t: TableInfo> -> + // some databases return multiple copies of the same record for a column (e.g. + // redshift) because + // they have at least once delivery guarantees. we want to dedupe these, but + // first we check that the + // records are actually the same and provide a good error message if they are + // not. + assertColumnsWithSameNameAreSame(t.nameSpace, t.name, t.fields) + val fields = + t.fields + .map { commonField: CommonField -> + toField(commonField, airbyteTypeConverter) + } + .distinct() + + val fullyQualifiedTableName = getFullyQualifiedTableName(t.nameSpace, t.name) + val primaryKeys = + fullyQualifiedTableNameToPrimaryKeys.getOrDefault( + fullyQualifiedTableName, + emptyList() ) - } - .toList() + TableInfo( + nameSpace = t.nameSpace, + name = t.name, + fields = fields, + primaryKeys = primaryKeys, + cursorFields = t.cursorFields + ) + } val streams = tableInfoFieldList @@ -124,7 +122,7 @@ object DbSourceDiscoverUtil { tableInfo.primaryKeys .filter { obj: String -> Objects.nonNull(obj) } .map { listOf(it) } - .toList() + CatalogHelpers.createAirbyteStream( tableInfo.name, tableInfo.nameSpace, @@ -157,11 +155,10 @@ object DbSourceDiscoverUtil { !commonField.properties.isEmpty() ) { val properties = - commonField.properties - .map { commField: CommonField -> - toField(commField, airbyteTypeConverter) - } - .toList() + commonField.properties.map { commField: CommonField -> + toField(commField, airbyteTypeConverter) + } + return Field.of( commonField.name, airbyteTypeConverter.apply(commonField.type), diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbReadUtil.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbReadUtil.kt index 4e1a2a7fdf8d0..3a489c4f2c0ea 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbReadUtil.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbReadUtil.kt @@ -26,7 +26,6 @@ object RelationalDbReadUtil { ) } .map { `object`: ConfiguredAirbyteStream -> Jsons.clone(`object`) } - .toList() } @JvmStatic @@ -48,7 +47,6 @@ object RelationalDbReadUtil { ) } .map { `object`: ConfiguredAirbyteStream -> Jsons.clone(`object`) } - .toList() } @JvmStatic diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt index db5d05b22abe3..33c08d2d433e6 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/GlobalStateManager.kt @@ -167,7 +167,6 @@ class GlobalStateManager( .withName(s.streamName) ) } - .toList() } else { return@Supplier listOf() } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateGeneratorUtils.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateGeneratorUtils.kt index 5283189c58cb0..817a9245ae5f7 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateGeneratorUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateGeneratorUtils.kt @@ -100,7 +100,6 @@ object StateGeneratorUtils { generateStreamState(e.key, e.value) } .filter { s: AirbyteStreamState -> isValidStreamDescriptor(s.streamDescriptor) } - .toList() } /** @@ -123,7 +122,6 @@ object StateGeneratorUtils { .map { e: Map.Entry -> generateDbStreamState(e.key, e.value) } - .toList() ) } @@ -203,17 +201,15 @@ object StateGeneratorUtils { AirbyteGlobalState() .withSharedState(Jsons.jsonNode(dbState.cdcState)) .withStreamStates( - dbState.streams - .map { s: DbStreamState -> - AirbyteStreamState() - .withStreamDescriptor( - StreamDescriptor() - .withName(s.streamName) - .withNamespace(s.streamNamespace) - ) - .withStreamState(Jsons.jsonNode(s)) - } - .toList() + dbState.streams.map { s: DbStreamState -> + AirbyteStreamState() + .withStreamDescriptor( + StreamDescriptor() + .withName(s.streamName) + .withNamespace(s.streamNamespace) + ) + .withStreamState(Jsons.jsonNode(s)) + } ) return AirbyteStateMessage() .withType(AirbyteStateMessage.AirbyteStateType.GLOBAL) @@ -230,22 +226,20 @@ object StateGeneratorUtils { fun convertLegacyStateToStreamState( airbyteStateMessage: AirbyteStateMessage ): List { - return Jsons.`object`(airbyteStateMessage.data, DbState::class.java)!! - .streams - .map { s: DbStreamState -> - AirbyteStateMessage() - .withType(AirbyteStateMessage.AirbyteStateType.STREAM) - .withStream( - AirbyteStreamState() - .withStreamDescriptor( - StreamDescriptor() - .withNamespace(s.streamNamespace) - .withName(s.streamName) - ) - .withStreamState(Jsons.jsonNode(s)) - ) - } - .toList() + return Jsons.`object`(airbyteStateMessage.data, DbState::class.java)!!.streams.map { + s: DbStreamState -> + AirbyteStateMessage() + .withType(AirbyteStateMessage.AirbyteStateType.STREAM) + .withStream( + AirbyteStreamState() + .withStreamDescriptor( + StreamDescriptor() + .withNamespace(s.streamNamespace) + .withName(s.streamName) + ) + .withStreamState(Jsons.jsonNode(s)) + ) + } } fun convertStateMessage( diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt index fdd11857631bc..1e34647d779d9 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt @@ -34,7 +34,7 @@ open class StreamStateManager ) : AbstractStateManager( catalog, - Supplier { rawAirbyteStateMessages.map { it.stream }.toList() }, + Supplier { rawAirbyteStateMessages.map { it.stream } }, StateGeneratorUtils.CURSOR_FUNCTION, StateGeneratorUtils.CURSOR_FIELD_FUNCTION, StateGeneratorUtils.CURSOR_RECORD_COUNT_FUNCTION, diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt index 7512780ab0555..3f2a35fd6fa65 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt @@ -311,7 +311,6 @@ abstract class CdcSourceTest> { return messages .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.STATE } .map { obj: AirbyteMessage -> obj.state } - .toList() } protected fun assertExpectedRecords( diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt index 44c80fdbcfa27..f7446c5dd74fe 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt @@ -1016,7 +1016,7 @@ abstract class JdbcSourceAcceptanceTest> { firstSyncActualMessages .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.RECORD } .map { r: AirbyteMessage -> r.record.data[COL_NAME].asText() } - .toList() + // some databases don't make insertion order guarantee when equal ordering value if ( testdb.databaseDriver == DatabaseDriver.TERADATA || @@ -1066,7 +1066,7 @@ abstract class JdbcSourceAcceptanceTest> { secondSyncActualMessages .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.RECORD } .map { r: AirbyteMessage -> r.record.data[COL_NAME].asText() } - .toList() + Assertions.assertEquals(listOf("c"), secondSyncNames) // 3rd sync has records with duplicated cursors @@ -1120,7 +1120,6 @@ abstract class JdbcSourceAcceptanceTest> { thirdSyncActualMessages .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.RECORD } .map { r: AirbyteMessage -> r.record.data[COL_NAME].asText() } - .toList() // teradata doesn't make insertion order guarantee when equal ordering value if (testdb.databaseDriver == DatabaseDriver.TERADATA) { @@ -1342,47 +1341,41 @@ abstract class JdbcSourceAcceptanceTest> { states: List, numRecords: Long ): List { - return states - .map { s: DbStreamState -> - AirbyteMessage() - .withType(AirbyteMessage.Type.STATE) - .withState( - AirbyteStateMessage() - .withType(AirbyteStateMessage.AirbyteStateType.STREAM) - .withStream( - AirbyteStreamState() - .withStreamDescriptor( - StreamDescriptor() - .withNamespace(s.streamNamespace) - .withName(s.streamName) - ) - .withStreamState(Jsons.jsonNode(s)) - ) - .withData(Jsons.jsonNode(DbState().withCdc(false).withStreams(states))) - .withSourceStats( - AirbyteStateStats().withRecordCount(numRecords.toDouble()) - ) - ) - } - .toList() + return states.map { s: DbStreamState -> + AirbyteMessage() + .withType(AirbyteMessage.Type.STATE) + .withState( + AirbyteStateMessage() + .withType(AirbyteStateMessage.AirbyteStateType.STREAM) + .withStream( + AirbyteStreamState() + .withStreamDescriptor( + StreamDescriptor() + .withNamespace(s.streamNamespace) + .withName(s.streamName) + ) + .withStreamState(Jsons.jsonNode(s)) + ) + .withData(Jsons.jsonNode(DbState().withCdc(false).withStreams(states))) + .withSourceStats(AirbyteStateStats().withRecordCount(numRecords.toDouble())) + ) + } } protected open fun createState(states: List): List { - return states - .map { s: DbStreamState -> - AirbyteStateMessage() - .withType(AirbyteStateMessage.AirbyteStateType.STREAM) - .withStream( - AirbyteStreamState() - .withStreamDescriptor( - StreamDescriptor() - .withNamespace(s.streamNamespace) - .withName(s.streamName) - ) - .withStreamState(Jsons.jsonNode(s)) - ) - } - .toList() + return states.map { s: DbStreamState -> + AirbyteStateMessage() + .withType(AirbyteStateMessage.AirbyteStateType.STREAM) + .withStream( + AirbyteStreamState() + .withStreamDescriptor( + StreamDescriptor() + .withNamespace(s.streamNamespace) + .withName(s.streamName) + ) + .withStreamState(Jsons.jsonNode(s)) + ) + } } @Throws(SQLException::class) @@ -1604,20 +1597,16 @@ abstract class JdbcSourceAcceptanceTest> { if (s.stream.streamState[field] != null) s.stream.streamState[field].asText() else "" } - .toList() } protected fun filterRecords(messages: List): List { - return messages - .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.RECORD } - .toList() + return messages.filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.RECORD } } protected fun extractStateMessage(messages: List): List { return messages .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.STATE } .map { obj: AirbyteMessage -> obj.state } - .toList() } protected fun extractStateMessage( @@ -1630,7 +1619,6 @@ abstract class JdbcSourceAcceptanceTest> { r.state.stream.streamDescriptor.name == streamName } .map { obj: AirbyteMessage -> obj.state } - .toList() } protected fun createRecord( diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt index 5846fdcfeaffc..533b0007749e5 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt @@ -129,9 +129,8 @@ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() { val allMessages = runRead(catalog) val recordMessages = - allMessages - .filter { m: AirbyteMessage -> m.type == AirbyteMessage.Type.RECORD } - .toList() + allMessages.filter { m: AirbyteMessage -> m.type == AirbyteMessage.Type.RECORD } + val expectedValues: MutableMap?> = HashMap() val missedValuesByStream: MutableMap> = HashMap() val unexpectedValuesByStream: MutableMap> = HashMap() @@ -272,32 +271,30 @@ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() { get() = ConfiguredAirbyteCatalog() .withStreams( - testDataHolders - .map { test: TestDataHolder -> - ConfiguredAirbyteStream() - .withSyncMode(SyncMode.INCREMENTAL) - .withCursorField(Lists.newArrayList(idColumnName)) - .withDestinationSyncMode(DestinationSyncMode.APPEND) - .withStream( - CatalogHelpers.createAirbyteStream( - String.format("%s", test.nameWithTestPrefix), - String.format("%s", nameSpace), - Field.of(idColumnName, JsonSchemaType.INTEGER), - Field.of(testColumnName, test.airbyteType) - ) - .withSourceDefinedCursor(true) - .withSourceDefinedPrimaryKey( - java.util.List.of(java.util.List.of(idColumnName)) - ) - .withSupportedSyncModes( - Lists.newArrayList( - SyncMode.FULL_REFRESH, - SyncMode.INCREMENTAL - ) + testDataHolders.map { test: TestDataHolder -> + ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withCursorField(Lists.newArrayList(idColumnName)) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream( + CatalogHelpers.createAirbyteStream( + String.format("%s", test.nameWithTestPrefix), + String.format("%s", nameSpace), + Field.of(idColumnName, JsonSchemaType.INTEGER), + Field.of(testColumnName, test.airbyteType) + ) + .withSourceDefinedCursor(true) + .withSourceDefinedPrimaryKey( + java.util.List.of(java.util.List.of(idColumnName)) + ) + .withSupportedSyncModes( + Lists.newArrayList( + SyncMode.FULL_REFRESH, + SyncMode.INCREMENTAL ) - ) - } - .toList() + ) + ) + } ) /** @@ -393,7 +390,6 @@ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() { return messages .filter { r: AirbyteMessage -> r.type == AirbyteMessage.Type.STATE } .map { obj: AirbyteMessage -> obj.state } - .toList() } companion object { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/PythonSourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/PythonSourceAcceptanceTest.kt index 849e0376ba4f8..bb96403fdf15e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/PythonSourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/PythonSourceAcceptanceTest.kt @@ -52,10 +52,11 @@ class PythonSourceAcceptanceTest : SourceAcceptanceTest() { Streams.stream( runExecutable(Command.GET_REGEX_TESTS).withArray("tests").elements() ) - .map { obj: JsonNode -> obj.textValue() } .toList() + .map { obj: JsonNode -> obj.textValue() } + val stringMessages = - allMessages.map { `object`: AirbyteMessage -> Jsons.serialize(`object`) }.toList() + allMessages.map { `object`: AirbyteMessage -> Jsons.serialize(`object`) } LOGGER.info("Running " + regexTests.size + " regex tests...") regexTests.forEach( Consumer { regex: String -> diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/SourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/SourceAcceptanceTest.kt index 08e4653793a2e..df12f2f877076 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/SourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/SourceAcceptanceTest.kt @@ -225,9 +225,9 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() { val configuredCatalog = withSourceDefinedCursors(configuredCatalog) // only sync incremental streams configuredCatalog.streams = - configuredCatalog.streams - .filter { s: ConfiguredAirbyteStream -> s.syncMode == SyncMode.INCREMENTAL } - .toList() + configuredCatalog.streams.filter { s: ConfiguredAirbyteStream -> + s.syncMode == SyncMode.INCREMENTAL + } val airbyteMessages = runRead(configuredCatalog, state) val recordMessages = filterRecords(airbyteMessages) @@ -235,7 +235,7 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() { airbyteMessages .filter { m: AirbyteMessage -> m.type == AirbyteMessage.Type.STATE } .map { obj: AirbyteMessage -> obj.state } - .toList() + Assertions.assertFalse( recordMessages.isEmpty(), "Expected the first incremental sync to produce records" @@ -396,7 +396,7 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() { actual .map { m: AirbyteRecordMessage -> this.pruneEmittedAt(m) } .map { m: AirbyteRecordMessage -> this.pruneCdcMetadata(m) } - .toList() + Assertions.assertEquals(prunedExpected.size, prunedActual.size, message) Assertions.assertTrue(prunedExpected.containsAll(prunedActual), message) Assertions.assertTrue(prunedActual.containsAll(prunedExpected), message) @@ -436,7 +436,6 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() { return messages .filter { m: AirbyteMessage -> m.type == AirbyteMessage.Type.RECORD } .map { obj: AirbyteMessage -> obj.record } - .toList() } @JvmStatic diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/enums/Enums.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/enums/Enums.kt index 9bee126061c8d..d8f1f2c30ebef 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/enums/Enums.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/enums/Enums.kt @@ -62,7 +62,7 @@ class Enums { ies: List, oe: Class ): List { - return ies.map { convertTo(it, oe) }.toList() + return ies.map { convertTo(it, oe) } } } } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonPaths.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonPaths.kt index 8553edb6a9ce2..10495a84ed008 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonPaths.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonPaths.kt @@ -167,9 +167,9 @@ object JsonPaths { * specifically that said, we do expect that there will be no duplicates in the returned list. */ fun getPaths(json: JsonNode?, jsonPath: String): List { - return getInternal(GET_PATHS_CONFIGURATION, json, jsonPath) - .map { obj: JsonNode -> obj.asText() } - .toList() + return getInternal(GET_PATHS_CONFIGURATION, json, jsonPath).map { obj: JsonNode -> + obj.asText() + } } /** diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonSchemas.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonSchemas.kt index 79bd68f348d59..8e978239fb6a4 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonSchemas.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/JsonSchemas.kt @@ -150,7 +150,7 @@ object JsonSchemas { traverseJsonSchema(jsonSchema) { node: JsonNode?, path: List -> mapper.apply(node, path).ifPresent { e: T -> collector.add(e) } } - return collector.toList() // make list unmodifiable + return collector // make list unmodifiable } /** @@ -300,9 +300,10 @@ object JsonSchemas { fun getType(jsonSchema: JsonNode): List { if (jsonSchema.has(JSON_SCHEMA_TYPE_KEY)) { return if (jsonSchema[JSON_SCHEMA_TYPE_KEY].isArray) { - MoreIterators.toList(jsonSchema[JSON_SCHEMA_TYPE_KEY].iterator()) - .map { obj: JsonNode -> obj.asText() } - .toList() + MoreIterators.toList(jsonSchema[JSON_SCHEMA_TYPE_KEY].iterator()).map { + obj: JsonNode -> + obj.asText() + } } else { java.util.List.of(jsonSchema[JSON_SCHEMA_TYPE_KEY].asText()) } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/JsonSchemaValidator.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/JsonSchemaValidator.kt index 695ceae76b23f..9ab5247b4e0b3 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/JsonSchemaValidator.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/validation/json/JsonSchemaValidator.kt @@ -93,15 +93,13 @@ class JsonSchemaValidator @VisibleForTesting constructor(private val baseUri: UR } fun getValidationMessageArgs(schemaJson: JsonNode, objectJson: JsonNode): List> { - return validateInternal(schemaJson, objectJson) - .map { obj: ValidationMessage -> obj.arguments } - .toList() + return validateInternal(schemaJson, objectJson).map { obj: ValidationMessage -> + obj.arguments + } } fun getValidationMessagePaths(schemaJson: JsonNode, objectJson: JsonNode): List { - return validateInternal(schemaJson, objectJson) - .map { obj: ValidationMessage -> obj.path } - .toList() + return validateInternal(schemaJson, objectJson).map { obj: ValidationMessage -> obj.path } } @Throws(JsonValidationException::class) diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/concurrency/CompletableFuturesTest.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/concurrency/CompletableFuturesTest.kt index e2b95daef178c..2914cae0aa166 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/concurrency/CompletableFuturesTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/concurrency/CompletableFuturesTest.kt @@ -26,7 +26,7 @@ internal class CompletableFuturesTest { val allOfResult = CompletableFutures.allOf(futures).toCompletableFuture() val result = allOfResult.join() - val success = result.filter { obj: Either -> obj.isRight() }.toList() + val success = result.filter { obj: Either -> obj.isRight() } Assertions.assertEquals( success, Arrays.asList( @@ -41,7 +41,7 @@ internal class CompletableFuturesTest { result .filter { obj: Either -> obj.isLeft() } .map { either: Either -> either.left!!.cause!!.message } - .toList() + Assertions.assertEquals(failureMessages, mutableListOf("Fail 5", "Fail 6")) } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/json/JsonPathsTest.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/json/JsonPathsTest.kt index ca7f6c5926135..6955f4dd0dfda 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/json/JsonPathsTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/commons/json/JsonPathsTest.kt @@ -15,21 +15,15 @@ internal class JsonPathsTest { fun testGetValues() { Assertions.assertEquals( listOf(0, 1, 2), - JsonPaths.getValues(JSON_NODE, LIST_ALL_QUERY) - .map { obj: JsonNode -> obj.asInt() } - .toList() + JsonPaths.getValues(JSON_NODE, LIST_ALL_QUERY).map { obj: JsonNode -> obj.asInt() } ) Assertions.assertEquals( listOf(1), - JsonPaths.getValues(JSON_NODE, LIST_ONE_QUERY) - .map { obj: JsonNode -> obj.asInt() } - .toList() + JsonPaths.getValues(JSON_NODE, LIST_ONE_QUERY).map { obj: JsonNode -> obj.asInt() } ) Assertions.assertEquals( listOf(10), - JsonPaths.getValues(JSON_NODE, NESTED_FIELD_QUERY) - .map { obj: JsonNode -> obj.asInt() } - .toList() + JsonPaths.getValues(JSON_NODE, NESTED_FIELD_QUERY).map { obj: JsonNode -> obj.asInt() } ) Assertions.assertEquals( JSON_NODE["two"], diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/workers/internal/DefaultAirbyteStreamFactoryTest.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/workers/internal/DefaultAirbyteStreamFactoryTest.kt index 257dabcf235f9..039560b8d627a 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/workers/internal/DefaultAirbyteStreamFactoryTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/test/kotlin/io/airbyte/workers/internal/DefaultAirbyteStreamFactoryTest.kt @@ -14,7 +14,6 @@ import java.io.InputStream import java.io.InputStreamReader import java.nio.charset.StandardCharsets import java.util.* -import java.util.stream.Stream import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Disabled @@ -38,10 +37,10 @@ internal class DefaultAirbyteStreamFactoryTest { fun testValid() { val record1 = AirbyteMessageUtils.createRecordMessage(STREAM_NAME, FIELD_NAME, "green") - val messageStream = stringToMessageStream(Jsons.serialize(record1)) - val expectedStream = Stream.of(record1) + val messageStream = stringToMessageList(Jsons.serialize(record1)) + val expectedStream = listOf(record1) - Assertions.assertEquals(expectedStream.toList(), messageStream.toList()) + Assertions.assertEquals(expectedStream, messageStream) Mockito.verifyNoInteractions(logger) } @@ -49,9 +48,9 @@ internal class DefaultAirbyteStreamFactoryTest { fun testLoggingLine() { val invalidRecord = "invalid line" - val messageStream = stringToMessageStream(invalidRecord) + val messageStream = stringToMessageList(invalidRecord) - Assertions.assertEquals(emptyList(), messageStream.toList()) + Assertions.assertEquals(emptyList(), messageStream) Mockito.verify(logger).info(ArgumentMatchers.anyString()) Mockito.verifyNoMoreInteractions(logger) } @@ -61,9 +60,9 @@ internal class DefaultAirbyteStreamFactoryTest { val logMessage = AirbyteMessageUtils.createLogMessage(AirbyteLogMessage.Level.WARN, "warning") - val messageStream = stringToMessageStream(Jsons.serialize(logMessage)) + val messageStream = stringToMessageList(Jsons.serialize(logMessage)) - Assertions.assertEquals(emptyList(), messageStream.toList()) + Assertions.assertEquals(emptyList(), messageStream) Mockito.verify(logger).warn("warning") Mockito.verifyNoMoreInteractions(logger) } @@ -74,9 +73,9 @@ internal class DefaultAirbyteStreamFactoryTest { Mockito.`when`(protocolPredicate!!.test(Jsons.deserialize(invalidRecord))).thenReturn(false) - val messageStream = stringToMessageStream(invalidRecord) + val messageStream = stringToMessageList(invalidRecord) - Assertions.assertEquals(emptyList(), messageStream.toList()) + Assertions.assertEquals(emptyList(), messageStream) Mockito.verify(logger).error(ArgumentMatchers.anyString(), ArgumentMatchers.anyString()) Mockito.verifyNoMoreInteractions(logger) } @@ -87,9 +86,9 @@ internal class DefaultAirbyteStreamFactoryTest { Mockito.`when`(protocolPredicate!!.test(Jsons.deserialize(invalidRecord))).thenReturn(true) - val messageStream = stringToMessageStream(invalidRecord) + val messageStream = stringToMessageList(invalidRecord) - Assertions.assertEquals(emptyList(), messageStream.toList()) + Assertions.assertEquals(emptyList(), messageStream) Mockito.verify(logger).error(ArgumentMatchers.anyString(), ArgumentMatchers.anyString()) Mockito.verifyNoMoreInteractions(logger) } @@ -123,14 +122,14 @@ internal class DefaultAirbyteStreamFactoryTest { val inputString = Jsons.serialize(record1) + Jsons.serialize(record2) - val messageStream = stringToMessageStream(inputString) + val messageStream = stringToMessageList(inputString) - Assertions.assertEquals(emptyList(), messageStream.toList()) + Assertions.assertEquals(emptyList(), messageStream) Mockito.verify(logger).error(ArgumentMatchers.anyString(), ArgumentMatchers.anyString()) Mockito.verifyNoMoreInteractions(logger) } - private fun stringToMessageStream(inputString: String): Stream { + private fun stringToMessageList(inputString: String): List { val inputStream: InputStream = ByteArrayInputStream(inputString.toByteArray(StandardCharsets.UTF_8)) val bufferedReader = BufferedReader(InputStreamReader(inputStream, StandardCharsets.UTF_8)) @@ -141,6 +140,7 @@ internal class DefaultAirbyteStreamFactoryTest { Optional.empty() ) .create(bufferedReader) + .toList() } companion object { diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/CatalogClientConverters.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/CatalogClientConverters.kt index fbefb4d4b0fa0..d26d784eb5f87 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/CatalogClientConverters.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/CatalogClientConverters.kt @@ -26,15 +26,13 @@ object CatalogClientConverters { fun toAirbyteProtocol(catalog: AirbyteCatalog): io.airbyte.protocol.models.AirbyteCatalog { val protoCatalog = io.airbyte.protocol.models.AirbyteCatalog() val airbyteStream = - catalog.streams - .map { stream: AirbyteStreamAndConfiguration -> - try { - return@map toConfiguredProtocol(stream.stream, stream.config) - } catch (e: JsonValidationException) { - return@map null - } + catalog.streams.map { stream: AirbyteStreamAndConfiguration -> + try { + return@map toConfiguredProtocol(stream.stream, stream.config) + } catch (e: JsonValidationException) { + return@map null } - .toList() + } protoCatalog.withStreams(airbyteStream) return protoCatalog @@ -140,7 +138,6 @@ object CatalogClientConverters { .stream(s) .config(generateDefaultConfiguration(s)) } - .toList() ) } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/FailureHelper.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/FailureHelper.kt index 1e9d4f401d976..2c00077e0d2fa 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/FailureHelper.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/testFixtures/kotlin/io/airbyte/workers/helper/FailureHelper.kt @@ -158,7 +158,7 @@ object FailureHelper { } val compareByTimestamp = Comparator.comparing { obj: FailureReason -> obj.timestamp } val compareByTraceAndTimestamp = compareByIsTrace.thenComparing(compareByTimestamp) - return failures.sortedWith(compareByTraceAndTimestamp).toList() + return failures.sortedWith(compareByTraceAndTimestamp) } enum class ConnectorCommand(private val value: String) { diff --git a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.kt index f9f066a5d2343..95d8d98fc5036 100644 --- a/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/gcs-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/gcs/GcsAvroParquetDestinationAcceptanceTest.kt @@ -124,9 +124,9 @@ abstract class GcsAvroParquetDestinationAcceptanceTest(fileUploadFormat: FileUpl protected fun getTypes(record: GenericData.Record): Map> { val fieldList = - record.schema.fields - .filter { field: Schema.Field -> !field.name().startsWith("_airbyte") } - .toList() + record.schema.fields.filter { field: Schema.Field -> + !field.name().startsWith("_airbyte") + } return if (fieldList.size == 1) { fieldList.associate { diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt index 2677109491412..07b3d83db9b7c 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/S3StorageOperations.kt @@ -320,7 +320,7 @@ open class S3StorageOperations( obj.key, ) } - .toList() + cleanUpObjects(bucket, keysToDelete) logger.info { "Storage bucket $objectPath has been cleaned-up (${keysToDelete.size} objects matching $regexFormat were deleted)..." @@ -371,7 +371,7 @@ open class S3StorageOperations( obj.key, ) } - .toList() + cleanUpObjects(bucket, keysToDelete) logger.info { "Storage bucket $objectPath has been cleaned-up (${keysToDelete.size} objects were deleted)..." @@ -391,7 +391,7 @@ open class S3StorageOperations( if (keysToDelete.isNotEmpty()) { logger.info { "Deleting objects ${keysToDelete.map { obj: DeleteObjectsRequest.KeyVersion -> obj.key } - .toList().joinToString(separator = ", ")}" + .joinToString(separator = ", ")}" } s3Client.deleteObjects(DeleteObjectsRequest(bucket).withKeys(keysToDelete)) } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroNameTransformer.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroNameTransformer.kt index f526f95fb8d42..8b5750019d7e5 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroNameTransformer.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/AvroNameTransformer.kt @@ -5,7 +5,6 @@ package io.airbyte.cdk.integrations.destination.s3.avro import io.airbyte.cdk.integrations.destination.StandardNameTransformer -import java.util.Arrays import java.util.Locale /** @@ -30,13 +29,12 @@ class AvroNameTransformer : StandardNameTransformer() { override fun getNamespace(namespace: String): String { val tokens = namespace.split("\\.".toRegex()).dropLastWhile { it.isEmpty() }.toTypedArray() - return Arrays.stream(tokens) + return tokens .map { name: String -> this.getIdentifier( name, ) } - .toList() .joinToString(separator = ".") } } diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonSchemaType.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonSchemaType.kt index 70ebb36ff07fc..f355ea6c4bb6e 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonSchemaType.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonSchemaType.kt @@ -71,26 +71,24 @@ enum class JsonSchemaType { // Match by Type + airbyteType if (jsonSchemaAirbyteType != null) { matchSchemaType = - Arrays.stream(entries.toTypedArray()) + entries + .toTypedArray() .filter { type: JsonSchemaType -> jsonSchemaType == type.jsonSchemaType } .filter { type: JsonSchemaType -> jsonSchemaAirbyteType == type.jsonSchemaAirbyteType } - .toList() } // Match by Type are no results already if (matchSchemaType == null || matchSchemaType.isEmpty()) { matchSchemaType = - Arrays.stream(entries.toTypedArray()) - .filter { format: JsonSchemaType -> - jsonSchemaType == format.jsonSchemaType && - format.jsonSchemaAirbyteType == null - } - .toList() + entries.toTypedArray().filter { format: JsonSchemaType -> + jsonSchemaType == format.jsonSchemaType && + format.jsonSchemaAirbyteType == null + } } - require(!matchSchemaType!!.isEmpty()) { + require(!matchSchemaType.isEmpty()) { String.format( "Unexpected jsonSchemaType - %s and jsonSchemaAirbyteType - %s", jsonSchemaType, diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonToAvroSchemaConverter.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonToAvroSchemaConverter.kt index 254146e413ce5..ea7642afe533b 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonToAvroSchemaConverter.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/avro/JsonToAvroSchemaConverter.kt @@ -358,7 +358,6 @@ class JsonToAvroSchemaConverter { ) } .distinct() - .toList() return mergeRecordSchemas(fieldName, fieldNamespace, schemas, appendExtraProps) } @@ -490,7 +489,7 @@ class JsonToAvroSchemaConverter { .filter { s: Schema -> s != NULL_SCHEMA } } .distinct() - .toList() + val subfieldNamespace: String = if (fieldNamespace == null) fieldName else ("$fieldNamespace.$fieldName") // recursively merge schemas of a subfield because they may include multiple record @@ -611,7 +610,7 @@ class JsonToAvroSchemaConverter { type, )) } - return Schema.createUnion(unionTypes.filter { removeTimestampType.test(it) }.toList()) + return Schema.createUnion(unionTypes.filter { removeTimestampType.test(it) }) } companion object { @@ -628,9 +627,9 @@ class JsonToAvroSchemaConverter { @Suppress("DEPRECATION") fun getNonNullTypes(fieldName: String?, fieldDefinition: JsonNode): List { - return getTypes(fieldName, fieldDefinition) - .filter { type: JsonSchemaType -> type != JsonSchemaType.NULL } - .toList() + return getTypes(fieldName, fieldDefinition).filter { type: JsonSchemaType -> + type != JsonSchemaType.NULL + } } /** When no type or $ref are specified, it will default to string. */ @@ -645,13 +644,11 @@ class JsonToAvroSchemaConverter { val airbyteType: String? = fieldDefinition.get(AIRBYTE_TYPE)?.asText() if (typeProperty != null && typeProperty.isArray) { - return MoreIterators.toList(typeProperty.elements()) - .map { s: JsonNode -> - JsonSchemaType.fromJsonSchemaType( - s.asText(), - ) - } - .toList() + return MoreIterators.toList(typeProperty.elements()).map { s: JsonNode -> + JsonSchemaType.fromJsonSchemaType( + s.asText(), + ) + } } if (hasTextValue(typeProperty)) { diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/RootLevelFlatteningSheetGenerator.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/RootLevelFlatteningSheetGenerator.kt index 5a9957463abee..cbcd4c96e468a 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/RootLevelFlatteningSheetGenerator.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/main/kotlin/io/airbyte/cdk/integrations/destination/s3/csv/RootLevelFlatteningSheetGenerator.kt @@ -19,7 +19,6 @@ class RootLevelFlatteningSheetGenerator(jsonSchema: JsonNode) : jsonSchema["properties"].fieldNames(), ) .sorted() - .toList() override fun getHeaderRow(): List { val headers: MutableList = diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBufferTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBufferTest.kt index c13f44a96a879..1b9c9446a7a31 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBufferTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/test/kotlin/io/airbyte/cdk/integrations/destination/s3/parquet/ParquetSerializedBufferTest.kt @@ -21,7 +21,6 @@ import java.io.FileOutputStream import java.io.InputStream import java.nio.file.Files import java.util.UUID -import java.util.stream.Stream import org.apache.avro.generic.GenericData import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -42,7 +41,7 @@ class ParquetSerializedBufferTest { "column2" to "string value", "another field" to true, "nested_column" to mapOf("array_column" to listOf(1, 2, 3)), - "string_array_column" to Stream.of("test_string", null).toList(), + "string_array_column" to listOf("test_string", null), "datetime_with_timezone" to "2022-05-12T15:35:44.192950Z", ), ) diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3AvroParquetDestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3AvroParquetDestinationAcceptanceTest.kt index c6fb4112902e3..94a8a5960b454 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3AvroParquetDestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3AvroParquetDestinationAcceptanceTest.kt @@ -98,11 +98,9 @@ protected constructor(fileUploadFormat: FileUploadFormat) : @Throws(IOException::class) private fun readMessagesFromFile(messagesFilename: String): List { - return MoreResources.readResource(messagesFilename) - .trim() - .lines() - .map { record -> Jsons.deserialize(record, AirbyteMessage::class.java) } - .toList() + return MoreResources.readResource(messagesFilename).trim().lines().map { record -> + Jsons.deserialize(record, AirbyteMessage::class.java) + } } @Throws(Exception::class) @@ -113,9 +111,9 @@ protected constructor(fileUploadFormat: FileUploadFormat) : protected fun getTypes(record: GenericData.Record): Map> { val fieldList = - record.schema.fields - .filter { field: Schema.Field -> !field.name().startsWith("_airbyte") } - .toList() + record.schema.fields.filter { field: Schema.Field -> + !field.name().startsWith("_airbyte") + } return if (fieldList.size == 1) { fieldList.associate { diff --git a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt index 3e41cbadea440..2fbf1213db214 100644 --- a/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/s3-destinations/src/testFixtures/kotlin/io/airbyte/cdk/integrations/destination/s3/S3DestinationAcceptanceTest.kt @@ -91,12 +91,12 @@ protected constructor(protected val outputFormat: FileUploadFormat) : Destinatio .objectSummaries .filter { o: S3ObjectSummary -> o.key.contains("$streamNameStr/") } .sortedWith(Comparator.comparingLong { o: S3ObjectSummary -> o.lastModified.time }) - .toList() + LOGGER.info( "All objects: {}", - objectSummaries - .map { o: S3ObjectSummary -> String.format("%s/%s", o.bucketName, o.key) } - .toList(), + objectSummaries.map { o: S3ObjectSummary -> + String.format("%s/%s", o.bucketName, o.key) + }, ) return objectSummaries } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.kt index 70eb45d0474af..e0d7412c4c21e 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/AirbyteType.kt @@ -107,11 +107,10 @@ interface AirbyteType { // Recurse into a schema that forces a specific one of each option val options = - typeOptions - .map { typeOption: String -> - fromJsonSchema(getTrimmedJsonSchema(schema, typeOption)) - } - .toList() + typeOptions.map { typeOption: String -> + fromJsonSchema(getTrimmedJsonSchema(schema, typeOption)) + } + return Union(options) } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt index bc02373a3bc78..dc4c1f5aa5d85 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/CatalogParser.kt @@ -138,9 +138,7 @@ constructor( "Only top-level primary keys are supported" } val primaryKey = - stream.primaryKey - .map { key: List -> sqlGenerator.buildColumnId(key[0]) } - .toList() + stream.primaryKey.map { key: List -> sqlGenerator.buildColumnId(key[0]) } require(stream.cursorField.size <= 1) { "Only top-level cursors are supported" } val cursor: Optional = diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.kt index 9ab701481a34d..0bca4473a5677 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/DefaultTyperDeduper.kt @@ -122,7 +122,7 @@ class DefaultTyperDeduper( val prepareTablesFutureResult = CompletableFutures.allOf( - destinationInitialStatuses.map { this.prepareTablesFuture(it) }.toList() + destinationInitialStatuses.map { this.prepareTablesFuture(it) } ) .toCompletableFuture() .join() diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/FutureUtils.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/FutureUtils.kt index 0e09e029a9aca..4fa0d2d62a454 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/FutureUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/FutureUtils.kt @@ -36,7 +36,7 @@ object FutureUtils { .map { obj: CompletableFuture> -> obj.join() } .filter { obj: Optional -> obj.isPresent } .map { obj: Optional -> obj.get() } - .toList() + logAllAndThrowFirst(initialMessage, exceptions) } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/Sql.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/Sql.kt index 642c9ac77df0b..990faa1d005ae 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/Sql.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/main/kotlin/io/airbyte/integrations/base/destination/typing_deduping/Sql.kt @@ -4,7 +4,6 @@ package io.airbyte.integrations.base.destination.typing_deduping import java.util.function.Consumer -import java.util.stream.Stream /** * Represents a list of SQL transactions, where each transaction consists of one or more SQL @@ -25,27 +24,25 @@ data class Sql(val transactions: List>) { * @return A list of SQL strings, each of which represents a transaction. */ fun asSqlStrings(begin: String?, commit: String?): List { - return transactions - .map { transaction: List -> - // If there's only one statement, we don't need to wrap it in a transaction. - if (transaction.size == 1) { - return@map transaction[0] - } - val builder = StringBuilder() - builder.append(begin) - builder.append(";\n") - transaction.forEach( - Consumer { statement: String -> - builder.append(statement) - // No semicolon - statements already end with a semicolon - builder.append("\n") - } - ) - builder.append(commit) - builder.append(";\n") - builder.toString() + return transactions.map { transaction: List -> + // If there's only one statement, we don't need to wrap it in a transaction. + if (transaction.size == 1) { + return@map transaction[0] } - .toList() + val builder = StringBuilder() + builder.append(begin) + builder.append(";\n") + transaction.forEach( + Consumer { statement: String -> + builder.append(statement) + // No semicolon - statements already end with a semicolon + builder.append("\n") + } + ) + builder.append(commit) + builder.append(";\n") + builder.toString() + } } init { @@ -74,12 +71,12 @@ data class Sql(val transactions: List>) { /** Execute each statement as its own transaction. */ @JvmStatic fun separately(statements: List): Sql { - return create(statements.map { listOf(it) }.toList()) + return create(statements.map { listOf(it) }) } @JvmStatic fun separately(vararg statements: String): Sql { - return separately(Stream.of(*statements).toList()) + return separately(statements.asList()) } /** @@ -118,10 +115,8 @@ data class Sql(val transactions: List>) { } statement } - .toList() } .filter { transaction: List -> !transaction.isEmpty() } - .toList() ) } } diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt index e25decc7c75cd..cbb03cdef0566 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.kt @@ -940,7 +940,7 @@ abstract class BaseTypingDedupingTest { } private fun repeatList(n: Int, list: List): List { - return Collections.nCopies(n, list).flatMap { obj: List -> obj }.toList() + return Collections.nCopies(n, list).flatMap { obj: List -> obj } } @Throws(Exception::class) @@ -1133,7 +1133,6 @@ abstract class BaseTypingDedupingTest { .filter { line: String -> !line.isEmpty() } .filter { line: String -> !line.startsWith("//") } .map { jsonString: String -> Jsons.deserializeExact(jsonString) } - .toList() } @Throws(IOException::class) diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.kt b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.kt index 7c77f9d2f7068..cbe3e8968f678 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.kt +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/kotlin/io/airbyte/integrations/base/destination/typing_deduping/RecordDiffer.kt @@ -90,10 +90,8 @@ constructor( fun diffRawTableRecords(expectedRecords: List, actualRecords: List) { val diff = diffRecords( - expectedRecords - .map { record: JsonNode -> this.copyWithLiftedData(record) } - .toList(), - actualRecords.map { record: JsonNode -> this.copyWithLiftedData(record) }.toList(), + expectedRecords.map { record: JsonNode -> this.copyWithLiftedData(record) }, + actualRecords.map { record: JsonNode -> this.copyWithLiftedData(record) }, rawRecordIdentityComparator, rawRecordSortComparator, rawRecordIdentityExtractor, @@ -277,7 +275,7 @@ constructor( "Row had incorrect data: " + recordIdExtractor.apply(expectedRecord) + "\n" // Iterate through each column in the expected record and compare it to the actual record's // value. - for (column in Streams.stream(expectedRecord.fieldNames()).sorted().toList()) { + for (column in Streams.stream(expectedRecord.fieldNames()).sorted()) { // For all other columns, we can just compare their values directly. val expectedValue = expectedRecord[column] val actualValue = actualRecord[column] @@ -317,7 +315,7 @@ constructor( columnNames: Map ): LinkedHashMap { val extraFields = LinkedHashMap() - for (column in Streams.stream(actualRecord.fieldNames()).sorted().toList()) { + for (column in Streams.stream(actualRecord.fieldNames()).sorted()) { // loaded_at and raw_id are generated dynamically, so we just ignore them. val isLoadedAt = getMetadataColumnName(columnNames, "_airbyte_loaded_at") == column val isRawId = getMetadataColumnName(columnNames, "_airbyte_raw_id") == column