diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java index 4ea0529a85ce7..35415ed3d0575 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java @@ -30,8 +30,8 @@ public static void main(final String[] args) throws Exception { @Override public AirbyteMessageConsumer getConsumer(final JsonNode config, - final ConfiguredAirbyteCatalog catalog, - final Consumer outputRecordCollector) { + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector) { final DatabricksDestinationConfig databricksConfig = DatabricksDestinationConfig.get(config); return CopyConsumerFactory.create( outputRecordCollector, diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java index 43911c4bd7ff7..ee48d8dd7151d 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopier.java @@ -66,15 +66,15 @@ public abstract class S3StreamCopier implements StreamCopier { private final StagingFilenameGenerator filenameGenerator; public S3StreamCopier(final String stagingFolder, - final DestinationSyncMode destSyncMode, - final String schema, - final String streamName, - final String s3FileName, - final AmazonS3 client, - final JdbcDatabase db, - final S3DestinationConfig s3Config, - final ExtendedNameTransformer nameTransformer, - final SqlOperations sqlOperations) { + final DestinationSyncMode destSyncMode, + final String schema, + final String streamName, + final String s3FileName, + final AmazonS3 client, + final JdbcDatabase db, + final S3DestinationConfig s3Config, + final ExtendedNameTransformer nameTransformer, + final SqlOperations sqlOperations) { this.destSyncMode = destSyncMode; this.schemaName = schema; this.streamName = streamName; @@ -224,10 +224,10 @@ private void closeAndWaitForUpload() throws IOException { } public abstract void copyS3CsvFileIntoTable(JdbcDatabase database, - String s3FileLocation, - String schema, - String tableName, - S3DestinationConfig s3Config) + String s3FileLocation, + String schema, + String tableName, + S3DestinationConfig s3Config) throws SQLException; } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java index 2c03497f42fa0..5d206562ae020 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/s3/S3StreamCopierFactory.java @@ -22,12 +22,12 @@ public abstract class S3StreamCopierFactory implements StreamCopierFactory - * Creating multiple files per stream currently has the naive approach of one file per batch on a stream up to the max limit of (26 * 26 * 26) 17576 - * files. Each batch is randomly prefixed by 3 Alpha characters and on a collision the batch is appended to the existing file. + * Creating multiple files per stream currently has the naive approach of one file per batch on a + * stream up to the max limit of (26 * 26 * 26) 17576 files. Each batch is randomly prefixed by 3 + * Alpha characters and on a collision the batch is appended to the existing file. */ public class RedshiftCopyS3Destination extends CopyDestination { @Override public AirbyteMessageConsumer getConsumer(final JsonNode config, - final ConfiguredAirbyteCatalog catalog, - final Consumer outputRecordCollector) + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector) throws Exception { return CopyConsumerFactory.create( outputRecordCollector, diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java index edce6fd09828d..41a450389d000 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopier.java @@ -31,14 +31,14 @@ public class RedshiftStreamCopier extends S3StreamCopier { private String manifestFilePath = null; public RedshiftStreamCopier(final String stagingFolder, - final DestinationSyncMode destSyncMode, - final String schema, - final String streamName, - final AmazonS3 client, - final JdbcDatabase db, - final S3DestinationConfig s3Config, - final ExtendedNameTransformer nameTransformer, - final SqlOperations sqlOperations) { + final DestinationSyncMode destSyncMode, + final String schema, + final String streamName, + final AmazonS3 client, + final JdbcDatabase db, + final S3DestinationConfig s3Config, + final ExtendedNameTransformer nameTransformer, + final SqlOperations sqlOperations) { super(stagingFolder, destSyncMode, schema, streamName, Strings.addRandomSuffix("", "", FILE_PREFIX_LENGTH) + "_" + streamName, client, db, s3Config, nameTransformer, sqlOperations); objectMapper = new ObjectMapper(); @@ -56,11 +56,11 @@ public void copyStagingFileToTemporaryTable() { @Override public void copyS3CsvFileIntoTable( - final JdbcDatabase database, - final String s3FileLocation, - final String schema, - final String tableName, - final S3DestinationConfig s3Config) { + final JdbcDatabase database, + final String s3FileLocation, + final String schema, + final String tableName, + final S3DestinationConfig s3Config) { throw new RuntimeException("Redshift Stream Copier should not copy individual files without use of a manifest"); } diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierFactory.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierFactory.java index d665eb7adb013..2f24bdae1fe23 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierFactory.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStreamCopierFactory.java @@ -17,14 +17,14 @@ public class RedshiftStreamCopierFactory extends S3StreamCopierFactory { @Override public StreamCopier create(final String stagingFolder, - final DestinationSyncMode syncMode, - final String schema, - final String streamName, - final AmazonS3 s3Client, - final JdbcDatabase db, - final S3DestinationConfig s3Config, - final ExtendedNameTransformer nameTransformer, - final SqlOperations sqlOperations) + final DestinationSyncMode syncMode, + final String schema, + final String streamName, + final AmazonS3 s3Client, + final JdbcDatabase db, + final S3DestinationConfig s3Config, + final ExtendedNameTransformer nameTransformer, + final SqlOperations sqlOperations) throws Exception { return new RedshiftStreamCopier(stagingFolder, syncMode, schema, streamName, s3Client, db, s3Config, nameTransformer, sqlOperations); } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Destination.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Destination.java index 1724c0600b3f7..7ebebacc99d10 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Destination.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Destination.java @@ -46,14 +46,15 @@ public AirbyteConnectionStatus check(final JsonNode config) { @Override public AirbyteMessageConsumer getConsumer(final JsonNode config, - final ConfiguredAirbyteCatalog configuredCatalog, - final Consumer outputRecordCollector) { + final ConfiguredAirbyteCatalog configuredCatalog, + final Consumer outputRecordCollector) { final S3WriterFactory formatterFactory = new ProductionWriterFactory(); return new S3Consumer(S3DestinationConfig.getS3DestinationConfig(config), configuredCatalog, formatterFactory, outputRecordCollector); } /** - * Note that this method completely ignores s3Config.getBucketPath(), in favor of the bucketPath parameter. + * Note that this method completely ignores s3Config.getBucketPath(), in favor of the bucketPath + * parameter. */ public static void attemptS3WriteAndDelete(final S3DestinationConfig s3Config, final String bucketPath) { attemptS3WriteAndDelete(s3Config, bucketPath, s3Config.getS3Client()); @@ -72,4 +73,5 @@ private static void attemptWriteAndDeleteS3Object(final S3DestinationConfig s3Co s3.putObject(s3Bucket, outputTableName, "check-content"); s3.deleteObject(s3Bucket, outputTableName); } + } diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java index c1d6f18d99ac0..12fba68b1c952 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3DestinationConfig.java @@ -14,8 +14,9 @@ import com.fasterxml.jackson.databind.JsonNode; /** - * An S3 configuration. Typical usage sets at most one of {@code bucketPath} (necessary for more delicate data syncing to S3) and {@code partSize} - * (used by certain bulk-load database operations). + * An S3 configuration. Typical usage sets at most one of {@code bucketPath} (necessary for more + * delicate data syncing to S3) and {@code partSize} (used by certain bulk-load database + * operations). */ public class S3DestinationConfig { @@ -34,28 +35,29 @@ public class S3DestinationConfig { private final S3FormatConfig formatConfig; /** - * The part size should not matter in any use case that depends on this constructor. So the default 10 MB is used. + * The part size should not matter in any use case that depends on this constructor. So the default + * 10 MB is used. */ public S3DestinationConfig( - final String endpoint, - final String bucketName, - final String bucketPath, - final String bucketRegion, - final String accessKeyId, - final String secretAccessKey, - final S3FormatConfig formatConfig) { + final String endpoint, + final String bucketName, + final String bucketPath, + final String bucketRegion, + final String accessKeyId, + final String secretAccessKey, + final S3FormatConfig formatConfig) { this(endpoint, bucketName, bucketPath, bucketRegion, accessKeyId, secretAccessKey, DEFAULT_PART_SIZE_MB, formatConfig); } public S3DestinationConfig( - final String endpoint, - final String bucketName, - final String bucketPath, - final String bucketRegion, - final String accessKeyId, - final String secretAccessKey, - final Integer partSize, - final S3FormatConfig formatConfig) { + final String endpoint, + final String bucketName, + final String bucketPath, + final String bucketRegion, + final String accessKeyId, + final String secretAccessKey, + final Integer partSize, + final S3FormatConfig formatConfig) { this.endpoint = endpoint; this.bucketName = bucketName; this.bucketPath = bucketPath; @@ -75,7 +77,8 @@ public static S3DestinationConfig getS3DestinationConfig(final JsonNode config) if (config.get("s3_bucket_path") != null) { bucketPath = config.get("s3_bucket_path").asText(); } - // In the "normal" S3 destination, this is never null. However, the Redshift and Snowflake copy destinations don't set a Format config. + // In the "normal" S3 destination, this is never null. However, the Redshift and Snowflake copy + // destinations don't set a Format config. S3FormatConfig format = null; if (config.get("format") != null) { format = S3FormatConfigs.getS3FormatConfig(config); @@ -88,8 +91,7 @@ public static S3DestinationConfig getS3DestinationConfig(final JsonNode config) config.get("access_key_id").asText(), config.get("secret_access_key").asText(), partSize, - format - ); + format); } public String getEndpoint() { @@ -145,4 +147,5 @@ public AmazonS3 getS3Client() { .withCredentials(new AWSStaticCredentialsProvider(awsCreds)) .build(); } + } diff --git a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/S3DestinationTest.java b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/S3DestinationTest.java index b06bfcfbc190b..6b385831b7913 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/S3DestinationTest.java +++ b/airbyte-integrations/connectors/destination-s3/src/test/java/io/airbyte/integrations/destination/s3/S3DestinationTest.java @@ -1,3 +1,7 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + package io.airbyte.integrations.destination.s3; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -28,8 +32,7 @@ public void setup() { "fake-region", "fake-accessKeyId", "fake-secretAccessKey", - null - ); + null); } @Test @@ -49,4 +52,5 @@ public void createsThenDeletesTestFile() { verifyNoMoreInteractions(s3); } + } diff --git a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/json_conversion_test_cases.json b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/json_conversion_test_cases.json index 80870077f1f95..5c3f3c9cecd76 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/json_conversion_test_cases.json +++ b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/json_conversion_test_cases.json @@ -749,11 +749,7 @@ }, { "name": "date_field", - "type": [ - "null", - { "type": "int", "logicalType": "date" }, - "string" - ], + "type": ["null", { "type": "int", "logicalType": "date" }, "string"], "default": null }, { diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyS3Destination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyS3Destination.java index b09107bb5448b..0e58e705c6690 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyS3Destination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeCopyS3Destination.java @@ -21,8 +21,8 @@ public class SnowflakeCopyS3Destination extends CopyDestination { @Override public AirbyteMessageConsumer getConsumer(final JsonNode config, - final ConfiguredAirbyteCatalog catalog, - final Consumer outputRecordCollector) { + final ConfiguredAirbyteCatalog catalog, + final Consumer outputRecordCollector) { return CopyConsumerFactory.create( outputRecordCollector, getDatabase(config), diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java index d23b5421ff34c..997d8838cee3b 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopier.java @@ -22,25 +22,25 @@ public class SnowflakeS3StreamCopier extends S3StreamCopier { private static final int FILE_PREFIX_LENGTH = 5; public SnowflakeS3StreamCopier(final String stagingFolder, - final DestinationSyncMode destSyncMode, - final String schema, - final String streamName, - final AmazonS3 client, - final JdbcDatabase db, - final S3DestinationConfig s3Config, - final ExtendedNameTransformer nameTransformer, - final SqlOperations sqlOperations) { + final DestinationSyncMode destSyncMode, + final String schema, + final String streamName, + final AmazonS3 client, + final JdbcDatabase db, + final S3DestinationConfig s3Config, + final ExtendedNameTransformer nameTransformer, + final SqlOperations sqlOperations) { super(stagingFolder, destSyncMode, schema, streamName, Strings.addRandomSuffix("", "", FILE_PREFIX_LENGTH) + "_" + streamName, client, db, s3Config, nameTransformer, sqlOperations); } @Override public void copyS3CsvFileIntoTable( - final JdbcDatabase database, - final String s3FileLocation, - final String schema, - final String tableName, - final S3DestinationConfig s3Config) + final JdbcDatabase database, + final String s3FileLocation, + final String schema, + final String tableName, + final S3DestinationConfig s3Config) throws SQLException { final var copyQuery = String.format( "COPY INTO %s.%s FROM '%s' " diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierFactory.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierFactory.java index 767c475d429f4..dcf2697a97908 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierFactory.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeS3StreamCopierFactory.java @@ -17,14 +17,14 @@ public class SnowflakeS3StreamCopierFactory extends S3StreamCopierFactory { @Override public StreamCopier create(final String stagingFolder, - final DestinationSyncMode syncMode, - final String schema, - final String streamName, - final AmazonS3 s3Client, - final JdbcDatabase db, - final S3DestinationConfig s3Config, - final ExtendedNameTransformer nameTransformer, - final SqlOperations sqlOperations) + final DestinationSyncMode syncMode, + final String schema, + final String streamName, + final AmazonS3 s3Client, + final JdbcDatabase db, + final S3DestinationConfig s3Config, + final ExtendedNameTransformer nameTransformer, + final SqlOperations sqlOperations) throws Exception { return new SnowflakeS3StreamCopier(stagingFolder, syncMode, schema, streamName, s3Client, db, s3Config, nameTransformer, sqlOperations); } diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py b/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py index ce0fd5e57be11..ac125c0c61eab 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/source.py @@ -74,9 +74,7 @@ def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> query = query.get("query") if is_manager_account and self.is_metrics_in_custom_query(query): - raise Exception( - f"Metrics are not available for manager account. Check fields in your custom query: {query}" - ) + raise Exception(f"Metrics are not available for manager account. Check fields in your custom query: {query}") if CustomQuery.cursor_field in query: raise Exception(f"Custom query should not contain {CustomQuery.cursor_field}") @@ -91,10 +89,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: account_info = self.get_account_info(google_api) time_zone = self.get_time_zone(account_info) incremental_stream_config = dict( - api=google_api, - conversion_window_days=config["conversion_window_days"], - start_date=config["start_date"], - time_zone=time_zone + api=google_api, conversion_window_days=config["conversion_window_days"], start_date=config["start_date"], time_zone=time_zone ) streams = [ @@ -113,12 +108,14 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: # Metrics streams cannot be requested for a manager account. if not self.is_manager_account(account_info): - streams.extend([ - UserLocationReport(**incremental_stream_config), - AccountPerformanceReport(**incremental_stream_config), - DisplayTopicsPerformanceReport(**incremental_stream_config), - DisplayKeywordPerformanceReport(**incremental_stream_config), - ShoppingPerformanceReport(**incremental_stream_config), - AdGroupAdReport(**incremental_stream_config) - ]) + streams.extend( + [ + UserLocationReport(**incremental_stream_config), + AccountPerformanceReport(**incremental_stream_config), + DisplayTopicsPerformanceReport(**incremental_stream_config), + DisplayKeywordPerformanceReport(**incremental_stream_config), + ShoppingPerformanceReport(**incremental_stream_config), + AdGroupAdReport(**incremental_stream_config), + ] + ) return streams diff --git a/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py b/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py index 80a81df4d2acf..e988cae63ba0a 100644 --- a/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py +++ b/airbyte-integrations/connectors/source-google-ads/source_google_ads/streams.py @@ -84,7 +84,9 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite days_of_data_storage=self.days_of_data_storage, ) - def get_date_params(self, stream_slice: Mapping[str, Any], cursor_field: str, end_date: pendulum.datetime = None, time_unit: str = "months"): + def get_date_params( + self, stream_slice: Mapping[str, Any], cursor_field: str, end_date: pendulum.datetime = None, time_unit: str = "months" + ): end_date = end_date or pendulum.yesterday(tz=self.time_zone) start_date = pendulum.parse(stream_slice.get(cursor_field)) if start_date > pendulum.now(): diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_google_ads.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_google_ads.py index 20d803a90b993..0c05d6bf70704 100644 --- a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_google_ads.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_google_ads.py @@ -116,7 +116,7 @@ def test_get_date_params(): conversion_window_days=mock_conversion_window_days, start_date=mock_start_date, api=MockGoogleAdsClient(SAMPLE_CONFIG), - time_zone="local" + time_zone="local", ) start_date, end_date = IncrementalGoogleAdsStream(**incremental_stream_config).get_date_params( @@ -127,9 +127,9 @@ def test_get_date_params(): def test_get_date_params_with_time_zone(): - time_zone_chatham = Timezone('Pacific/Chatham') # UTC+12:45 + time_zone_chatham = Timezone("Pacific/Chatham") # UTC+12:45 mock_start_date_chatham = pendulum.today(tz=time_zone_chatham).subtract(days=1).to_date_string() - time_zone_honolulu = Timezone('Pacific/Honolulu') # UTC-10:00 + time_zone_honolulu = Timezone("Pacific/Honolulu") # UTC-10:00 mock_start_date_honolulu = pendulum.today(tz=time_zone_honolulu).subtract(days=1).to_date_string() mock_conversion_window_days = 14 @@ -138,16 +138,13 @@ def test_get_date_params_with_time_zone(): conversion_window_days=mock_conversion_window_days, start_date=mock_start_date_chatham, api=MockGoogleAdsClient(SAMPLE_CONFIG), - time_zone=time_zone_chatham + time_zone=time_zone_chatham, ) start_date_chatham, end_date_chatham = IncrementalGoogleAdsStream(**incremental_stream_config).get_date_params( stream_slice={"segments.date": mock_start_date_chatham}, cursor_field="segments.date" ) - incremental_stream_config.update({ - 'start_date': mock_start_date_honolulu, - 'time_zone': time_zone_honolulu - }) + incremental_stream_config.update({"start_date": mock_start_date_honolulu, "time_zone": time_zone_honolulu}) start_date_honolulu, end_date_honolulu = IncrementalGoogleAdsStream(**incremental_stream_config).get_date_params( stream_slice={"segments.date": mock_start_date_honolulu}, cursor_field="segments.date" ) diff --git a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_source.py b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_source.py index bc49ff8a9ed03..20f9afcbd6ac2 100644 --- a/airbyte-integrations/connectors/source-google-ads/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-google-ads/unit_tests/test_source.py @@ -63,8 +63,9 @@ def test_time_zone(): # this requires the config because instantiating a stream creates a google client. TODO refactor so client can be mocked. def test_get_updated_state(config): google_api = GoogleAds(credentials=config["credentials"], customer_id=config["customer_id"]) - client = AdGroupAdReport(start_date=config["start_date"], api=google_api, - conversion_window_days=config["conversion_window_days"], time_zone="local") + client = AdGroupAdReport( + start_date=config["start_date"], api=google_api, conversion_window_days=config["conversion_window_days"], time_zone="local" + ) current_state_stream = {} latest_record = {"segments.date": "2020-01-01"} @@ -87,7 +88,7 @@ def get_instance_from_config(config, query): conversion_window_days=conversion_window_days, start_date=start_date, custom_query_config={"query": query, "table_name": "whatever_table"}, - time_zone="local" + time_zone="local", ) return instance diff --git a/airbyte-integrations/connectors/source-intercom/integration_tests/integration_test.py b/airbyte-integrations/connectors/source-intercom/integration_tests/integration_test.py index 6b564c21b3134..1bf0f2c761fb4 100644 --- a/airbyte-integrations/connectors/source-intercom/integration_tests/integration_test.py +++ b/airbyte-integrations/connectors/source-intercom/integration_tests/integration_test.py @@ -3,16 +3,16 @@ # import json -import pytest import time -from airbyte_cdk import AirbyteLogger -from airbyte_cdk.models import SyncMode from copy import deepcopy from pathlib import Path -from requests.exceptions import HTTPError from typing import Mapping from unittest.mock import patch +import pytest +from airbyte_cdk import AirbyteLogger +from airbyte_cdk.models import SyncMode +from requests.exceptions import HTTPError from source_intercom.source import Companies, ConversationParts, SourceIntercom, VersionApiAuthenticator LOGGER = AirbyteLogger() @@ -32,15 +32,15 @@ def stream_attributes() -> Mapping[str, str]: @pytest.mark.parametrize( "version,not_supported_streams,custom_companies_data_field", ( - (1.0, ["company_segments", "company_attributes", "contact_attributes"], "companies"), - (1.1, ["company_segments", "company_attributes", "contact_attributes"], "companies"), - (1.2, ["company_segments", "company_attributes", "contact_attributes"], "companies"), - (1.3, ["company_segments", "company_attributes", "contact_attributes"], "companies"), - (1.4, ["company_segments"], "companies"), - (2.0, [], "data"), - (2.1, [], "data"), - (2.2, [], "data"), - (2.3, [], "data"), + (1.0, ["company_segments", "company_attributes", "contact_attributes"], "companies"), + (1.1, ["company_segments", "company_attributes", "contact_attributes"], "companies"), + (1.2, ["company_segments", "company_attributes", "contact_attributes"], "companies"), + (1.3, ["company_segments", "company_attributes", "contact_attributes"], "companies"), + (1.4, ["company_segments"], "companies"), + (2.0, [], "data"), + (2.1, [], "data"), + (2.2, [], "data"), + (2.3, [], "data"), ), ) def test_supported_versions(stream_attributes, version, not_supported_streams, custom_companies_data_field): diff --git a/airbyte-integrations/connectors/source-microsoft-teams/source_microsoft_teams/spec.json b/airbyte-integrations/connectors/source-microsoft-teams/source_microsoft_teams/spec.json index 43bd97f19294d..230b1f9af530a 100644 --- a/airbyte-integrations/connectors/source-microsoft-teams/source_microsoft_teams/spec.json +++ b/airbyte-integrations/connectors/source-microsoft-teams/source_microsoft_teams/spec.json @@ -20,7 +20,12 @@ { "type": "object", "title": "Authenticate via Microsoft (OAuth 2.0)", - "required": ["tenant_id", "client_id", "client_secret", "refresh_token"], + "required": [ + "tenant_id", + "client_id", + "client_secret", + "refresh_token" + ], "additionalProperties": false, "properties": { "auth_type": { diff --git a/airbyte-integrations/connectors/source-mssql/mssql-script.sql b/airbyte-integrations/connectors/source-mssql/mssql-script.sql index 270ce92df65f5..2c55ca5203c66 100644 --- a/airbyte-integrations/connectors/source-mssql/mssql-script.sql +++ b/airbyte-integrations/connectors/source-mssql/mssql-script.sql @@ -1,139 +1,303 @@ -create procedure table_copy(@tablecount int) -as -begin -set nocount on; +CREATE + PROCEDURE table_copy( + @tablecount INT + ) AS BEGIN +SET + nocount ON; + + DECLARE @v_max_table INT; + +DECLARE @v_counter_table INT; -DECLARE @v_max_table int; -DECLARE @v_counter_table int; DECLARE @tnamee VARCHAR(255); -set @v_max_table = @tablecount; -set @v_counter_table = 1; - -while @v_counter_table < @v_max_table begin -set @tnamee = concat('SELECT * INTO test_', @v_counter_table, ' FROM test;'); -EXEC (@tnamee); -set @v_counter_table = @v_counter_table + 1; -end; - -end; -go -- - -create procedure insert_rows( @allrows int, @insertcount int, @value NVARCHAR(max)) -as -begin -set nocount on; - -DECLARE @dummyIpsum varchar(255) -DECLARE @fieldText NVARCHAR(max) -set @fieldText = @value -DECLARE @vmax int; -DECLARE @vmaxx int; -DECLARE @vmaxoneinsert int; -DECLARE @counter int; -DECLARE @lastinsertcounter int; -DECLARE @lastinsert int; -DECLARE @fullloop int; -DECLARE @fullloopcounter int; -set @dummyIpsum = '''dummy_ipsum''' -set @vmax = @allrows; -set @vmaxx = @allrows; -set @vmaxoneinsert = @insertcount; -set @counter = 1; -set @lastinsertcounter = 1; -set @lastinsert = 0; -set @fullloop = 0; -set @fullloopcounter = 0; - -while @vmaxx <= @vmaxoneinsert begin - set @vmaxoneinsert = @vmaxx; - set @fullloop = @fullloop + 1; - set @vmaxx = @vmaxx + 1; -end; - -while @vmax > @vmaxoneinsert begin - set @fullloop = @fullloop + 1; - set @vmax = @vmax - @vmaxoneinsert; - set @lastinsert = @vmax; -end; +SET +@v_max_table = @tablecount; +SET +@v_counter_table = 1; + +while @v_counter_table < @v_max_table BEGIN +SET +@tnamee = concat( + 'SELECT * INTO test_', + @v_counter_table, + ' FROM test;' +); + +EXEC(@tnamee); +SET +@v_counter_table = @v_counter_table + 1; +END; +END; + +GO -- +CREATE + PROCEDURE insert_rows( + @allrows INT, + @insertcount INT, + @value NVARCHAR(MAX) + ) AS BEGIN +SET + nocount ON; + + DECLARE @dummyIpsum VARCHAR(255) DECLARE @fieldText NVARCHAR(MAX) +SET + @fieldText = @value DECLARE @vmax INT; + +DECLARE @vmaxx INT; + +DECLARE @vmaxoneinsert INT; + +DECLARE @counter INT; + +DECLARE @lastinsertcounter INT; + +DECLARE @lastinsert INT; + +DECLARE @fullloop INT; + +DECLARE @fullloopcounter INT; +SET +@dummyIpsum = '''dummy_ipsum''' +SET +@vmax = @allrows; +SET +@vmaxx = @allrows; +SET +@vmaxoneinsert = @insertcount; +SET +@counter = 1; +SET +@lastinsertcounter = 1; +SET +@lastinsert = 0; +SET +@fullloop = 0; +SET +@fullloopcounter = 0; + +while @vmaxx <= @vmaxoneinsert BEGIN +SET +@vmaxoneinsert = @vmaxx; +SET +@fullloop = @fullloop + 1; +SET +@vmaxx = @vmaxx + 1; +END; + +while @vmax > @vmaxoneinsert BEGIN +SET +@fullloop = @fullloop + 1; +SET +@vmax = @vmax - @vmaxoneinsert; +SET +@lastinsert = @vmax; +END; DECLARE @insertTable NVARCHAR(MAX) -set @insertTable = CONVERT(NVARCHAR(max), 'insert into test (varchar1, varchar2, varchar3, varchar4, varchar5, longblobfield, timestampfield) values ('); -while @counter < @vmaxoneinsert begin - set @insertTable = CONVERT(NVARCHAR(max), concat(@insertTable, @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @fieldText, ', CURRENT_TIMESTAMP), (')); - set @counter = @counter + 1; -end; -set @insertTable = CONVERT(NVARCHAR(max), concat(@insertTable, @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @fieldText, ', CURRENT_TIMESTAMP);')); - -while @vmax < 1 begin - set @fullloop = 0 - set @vmax = 1 -end; - -while @fullloopcounter < @fullloop begin - EXEC (@insertTable); - set @fullloopcounter = @fullloopcounter + 1; -end; - -DECLARE @insertTableLasted NVARCHAR(max); -set @insertTableLasted = CONVERT(NVARCHAR(max), 'insert into test (varchar1, varchar2, varchar3, varchar4, varchar5, longblobfield, timestampfield) values ('); -while @lastinsertcounter < @lastinsert begin - set @insertTableLasted = CONVERT(NVARCHAR(max), concat(@insertTableLasted, @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @fieldText, ', CURRENT_TIMESTAMP), (')); - set @lastinsertcounter = @lastinsertcounter + 1; -end; - -set @insertTableLasted = CONVERT(NVARCHAR(max), concat(@insertTableLasted, @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @fieldText, ', CURRENT_TIMESTAMP);')); - -while @lastinsert > 0 begin - EXEC (@insertTableLasted); - set @lastinsert = 0; -end; - -end; -go -- - -create procedure table_create(@val int) -as -begin -set nocount on; - --- SQLINES LICENSE FOR EVALUATION USE ONLY -create table test -( -id int check (id > 0) not null identity primary key, -varchar1 varchar(255), -varchar2 varchar(255), -varchar3 varchar(255), -varchar4 varchar(255), -varchar5 varchar(255), -longblobfield nvarchar(max), -timestampfield datetime2(0) +SET +@insertTable = CONVERT( + NVARCHAR(MAX), + 'insert into test (varchar1, varchar2, varchar3, varchar4, varchar5, longblobfield, timestampfield) values (' +); + +while @counter < @vmaxoneinsert BEGIN +SET +@insertTable = CONVERT( + NVARCHAR(MAX), + concat( + @insertTable, + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @fieldText, + ', CURRENT_TIMESTAMP), (' + ) +); +SET +@counter = @counter + 1; +END; +SET +@insertTable = CONVERT( + NVARCHAR(MAX), + concat( + @insertTable, + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @fieldText, + ', CURRENT_TIMESTAMP);' + ) ); -DECLARE @extraSmallText NVARCHAR(max); -DECLARE @smallText NVARCHAR(max); -DECLARE @regularText NVARCHAR(max); -DECLARE @largeText NVARCHAR(max); -set @extraSmallText = '''test weight 50b - 50b text, 50b text, 50b text''' -set @smallText = CONCAT('''test weight 500b - ', REPLICATE('some text, some text, ', 20), '''') -set @regularText = CONCAT('''test weight 10kb - ', REPLICATE('some text, some text, some text, some text, ', 295), 'some text''') -set @largeText = CONCAT('''test weight 100kb - ', REPLICATE('some text, some text, some text, some text, ', 2225), 'some text''') --- TODO: change the following @allrows to control the number of records with different sizes +while @vmax < 1 BEGIN +SET +@fullloop = 0 +SET +@vmax = 1 +END; + +while @fullloopcounter < @fullloop BEGIN EXEC(@insertTable); +SET +@fullloopcounter = @fullloopcounter + 1; +END; + +DECLARE @insertTableLasted NVARCHAR(MAX); +SET +@insertTableLasted = CONVERT( + NVARCHAR(MAX), + 'insert into test (varchar1, varchar2, varchar3, varchar4, varchar5, longblobfield, timestampfield) values (' +); + +while @lastinsertcounter < @lastinsert BEGIN +SET +@insertTableLasted = CONVERT( + NVARCHAR(MAX), + concat( + @insertTableLasted, + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @fieldText, + ', CURRENT_TIMESTAMP), (' + ) +); +SET +@lastinsertcounter = @lastinsertcounter + 1; +END; +SET +@insertTableLasted = CONVERT( + NVARCHAR(MAX), + concat( + @insertTableLasted, + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @fieldText, + ', CURRENT_TIMESTAMP);' + ) +); + +while @lastinsert > 0 BEGIN EXEC(@insertTableLasted); +SET +@lastinsert = 0; +END; +END; + +GO -- +CREATE + PROCEDURE table_create( + @val INT + ) AS BEGIN +SET + nocount ON; + + -- SQLINES LICENSE FOR EVALUATION USE ONLY +CREATE + TABLE + test( + id INT CHECK( + id > 0 + ) NOT NULL IDENTITY PRIMARY KEY, + varchar1 VARCHAR(255), + varchar2 VARCHAR(255), + varchar3 VARCHAR(255), + varchar4 VARCHAR(255), + varchar5 VARCHAR(255), + longblobfield nvarchar(MAX), + timestampfield datetime2(0) + ); + +DECLARE @extraSmallText NVARCHAR(MAX); + +DECLARE @smallText NVARCHAR(MAX); + +DECLARE @regularText NVARCHAR(MAX); + +DECLARE @largeText NVARCHAR(MAX); +SET +@extraSmallText = '''test weight 50b - 50b text, 50b text, 50b text''' +SET +@smallText = CONCAT( + '''test weight 500b - ', + REPLICATE( + 'some text, some text, ', + 20 + ), + '''' +) +SET +@regularText = CONCAT( + '''test weight 10kb - ', + REPLICATE( + 'some text, some text, some text, some text, ', + 295 + ), + 'some text''' +) +SET +@largeText = CONCAT( + '''test weight 100kb - ', + REPLICATE( + 'some text, some text, some text, some text, ', + 2225 + ), + 'some text''' +) -- TODO: change the following @allrows to control the number of records with different sizes -- number of 50B records -EXEC insert_rows @allrows = 0, @insertcount = 998, @value = @extraSmallText --- number of 500B records -EXEC insert_rows @allrows = 0, @insertcount = 998, @value = @smallText --- number of 10KB records -EXEC insert_rows @allrows = 0, @insertcount = 998, @value = @regularText --- number of 100KB records -EXEC insert_rows @allrows = 0, @insertcount = 98, @value = @largeText -end; -go -- - -EXEC table_create @val = 0 -drop procedure if exists insert_rows; -drop procedure if exists table_create; +EXEC insert_rows @allrows = 0, +@insertcount = 998, +@value = @extraSmallText -- number of 500B records +EXEC insert_rows @allrows = 0, +@insertcount = 998, +@value = @smallText -- number of 10KB records +EXEC insert_rows @allrows = 0, +@insertcount = 998, +@value = @regularText -- number of 100KB records +EXEC insert_rows @allrows = 0, +@insertcount = 98, +@value = @largeText +END; + +GO -- +EXEC table_create @val = 0 DROP + PROCEDURE IF EXISTS insert_rows; + +DROP + PROCEDURE IF EXISTS table_create; -- TODO: change the value to control the number of tables EXEC table_copy @tablecount = 1; -drop procedure if exists table_copy; -exec sp_rename 'test', 'test_0'; + +DROP + PROCEDURE IF EXISTS table_copy; + +EXEC sp_rename 'test', +'test_0'; diff --git a/airbyte-integrations/connectors/source-mysql/mysql-script.sql b/airbyte-integrations/connectors/source-mysql/mysql-script.sql index f127c057b4c1c..f4de7715e15e2 100644 --- a/airbyte-integrations/connectors/source-mysql/mysql-script.sql +++ b/airbyte-integrations/connectors/source-mysql/mysql-script.sql @@ -1,134 +1,280 @@ -delimiter # -create procedure table_copy(in tablecount int) -begin - -set @v_max_table = tablecount; -set @v_counter_table = 1; +delimiter # CREATE + PROCEDURE table_copy( + IN tablecount INT + ) BEGIN +SET + @v_max_table = tablecount; +SET +@v_counter_table = 1; while @v_counter_table < @v_max_table do -set @tnamee = concat('create table IF NOT EXISTS test_', @v_counter_table, ' SELECT * FROM test;'); -PREPARE stmt from @tnamee; -EXECUTE stmt; -DEALLOCATE PREPARE stmt; -set @v_counter_table = @v_counter_table + 1; -end while; -commit; +SET +@tnamee = concat( + 'create table IF NOT EXISTS test_', + @v_counter_table, + ' SELECT * FROM test;' +); -end # +PREPARE stmt +FROM +@tnamee; -delimiter ; +EXECUTE stmt; -delimiter # -create procedure insert_rows(in allrows int, in insertcount int, in value longblob) -begin +DEALLOCATE PREPARE stmt; +SET +@v_counter_table = @v_counter_table + 1; +END while; + +COMMIT; +END # delimiter; -set @dummyIpsum = '\'dummy_ipsum\''; -set @fieldText = value; -set @vmax = allrows; -set @vmaxx = allrows; -set @vmaxoneinsert = insertcount; -set @counter = 1; -set @lastinsertcounter = 1; -set @lastinsert = 0; -set @fullloop = 0; -set @fullloopcounter = 0; +delimiter # CREATE + PROCEDURE insert_rows( + IN allrows INT, + IN insertcount INT, + IN value longblob + ) BEGIN +SET + @dummyIpsum = '\' dummy_ipsum\''; +SET +@fieldText = value; +SET +@vmax = allrows; +SET +@vmaxx = allrows; +SET +@vmaxoneinsert = insertcount; +SET +@counter = 1; +SET +@lastinsertcounter = 1; +SET +@lastinsert = 0; +SET +@fullloop = 0; +SET +@fullloopcounter = 0; while @vmaxx <= @vmaxoneinsert do - set @vmaxoneinsert = @vmaxx; - set @fullloop = @fullloop + 1; - set @vmaxx = @vmaxx + 1; -end while; -commit; +SET +@vmaxoneinsert = @vmaxx; +SET +@fullloop = @fullloop + 1; +SET +@vmaxx = @vmaxx + 1; +END while; + +COMMIT; while @vmax > @vmaxoneinsert do - set @fullloop = @fullloop + 1; - set @vmax = @vmax - @vmaxoneinsert; - set @lastinsert = @vmax; -end while; -commit; +SET +@fullloop = @fullloop + 1; +SET +@vmax = @vmax - @vmaxoneinsert; +SET +@lastinsert = @vmax; +END while; + +COMMIT; +SET +@insertTable = concat('insert into test (varchar1, varchar2, varchar3, varchar4, varchar5, longblobfield, timestampfield) values ('); -set @insertTable = concat('insert into test (varchar1, varchar2, varchar3, varchar4, varchar5, longblobfield, timestampfield) values ('); while @counter < @vmaxoneinsert do - set @insertTable = concat(@insertTable, @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @fieldText, ', CURRENT_TIMESTAMP), ('); - set @counter = @counter + 1; -end while; -commit; -set @insertTable = concat(@insertTable, @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @fieldText, ', CURRENT_TIMESTAMP);'); +SET +@insertTable = concat( + @insertTable, + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @fieldText, + ', CURRENT_TIMESTAMP), (' +); +SET +@counter = @counter + 1; +END while; + +COMMIT; +SET +@insertTable = concat( + @insertTable, + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @fieldText, + ', CURRENT_TIMESTAMP);' +); while @vmax < 1 do - set @fullloop = 0; - set @vmax = 1; -end while; -commit; - -while @fullloopcounter < @fullloop do - PREPARE runinsert from @insertTable; - EXECUTE runinsert; - DEALLOCATE PREPARE runinsert; - set @fullloopcounter = @fullloopcounter + 1; -end while; -commit; - -set @insertTableLasted = concat('insert into test (varchar1, varchar2, varchar3, varchar4, varchar5, longblobfield, timestampfield) values ('); +SET +@fullloop = 0; +SET +@vmax = 1; +END while; + +COMMIT; + +while @fullloopcounter < @fullloop do PREPARE runinsert +FROM +@insertTable; + +EXECUTE runinsert; + +DEALLOCATE PREPARE runinsert; +SET +@fullloopcounter = @fullloopcounter + 1; +END while; + +COMMIT; +SET +@insertTableLasted = concat('insert into test (varchar1, varchar2, varchar3, varchar4, varchar5, longblobfield, timestampfield) values ('); + while @lastinsertcounter < @lastinsert do - set @insertTableLasted = concat(@insertTableLasted, @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @fieldText, ', CURRENT_TIMESTAMP), ('); - set @lastinsertcounter = @lastinsertcounter + 1; -end while; -commit; -set @insertTableLasted = concat(@insertTableLasted, @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @dummyIpsum, ', ', @fieldText, ', CURRENT_TIMESTAMP);'); - -while @lastinsert > 0 do - PREPARE runinsert from @insertTableLasted; - EXECUTE runinsert; - DEALLOCATE PREPARE runinsert; - set @lastinsert = 0; -end while; -commit; - -end # - -delimiter ; - -delimiter # -create procedure table_create() -begin - -create table test -( -id int unsigned not null auto_increment primary key, -varchar1 varchar(255), -varchar2 varchar(255), -varchar3 varchar(255), -varchar4 varchar(255), -varchar5 varchar(255), -longblobfield longblob, -timestampfield timestamp -) -engine=innodb; - -set @extraSmallText = '\'test weight 50b - some text, some text, some text\''; -set @smallText = CONCAT('\'test weight 500b - ', REPEAT('some text, some text, ', 20), '\''); -set @regularText = CONCAT('\'test weight 10kb - ', REPEAT('some text, some text, ', 590), '\''); -set @largeText = CONCAT('\'test weight 100kb - ', REPEAT('some text, some text, ', 4450), '\''); +SET +@insertTableLasted = concat( + @insertTableLasted, + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @fieldText, + ', CURRENT_TIMESTAMP), (' +); +SET +@lastinsertcounter = @lastinsertcounter + 1; +END while; + +COMMIT; +SET +@insertTableLasted = concat( + @insertTableLasted, + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @dummyIpsum, + ', ', + @fieldText, + ', CURRENT_TIMESTAMP);' +); + +while @lastinsert > 0 do PREPARE runinsert +FROM +@insertTableLasted; + +EXECUTE runinsert; + +DEALLOCATE PREPARE runinsert; +SET +@lastinsert = 0; +END while; + +COMMIT; +END # delimiter; + +delimiter # CREATE + PROCEDURE table_create() BEGIN CREATE + TABLE + test( + id INT unsigned NOT NULL auto_increment PRIMARY KEY, + varchar1 VARCHAR(255), + varchar2 VARCHAR(255), + varchar3 VARCHAR(255), + varchar4 VARCHAR(255), + varchar5 VARCHAR(255), + longblobfield longblob, + timestampfield TIMESTAMP + ) engine = innodb; +SET +@extraSmallText = '\' test weight 50 b - SOME text, +SOME text, +SOME text\''; +SET +@smallText = CONCAT( + '\' test weight 500 b - ', REPEAT(' SOME text, + SOME text, + ', 20), ' \'' +); +SET +@regularText = CONCAT( + '\' test weight 10 kb - ', REPEAT(' SOME text, + SOME text, + ', 590), ' \'' +); +SET +@largeText = CONCAT( + '\' test weight 100 kb - ', REPEAT(' SOME text, + SOME text, + ', 4450), ' \'' +); -- TODO: change the following @allrows to control the number of records with different sizes -- number of 50B records -call insert_rows(0, 5000000, @extraSmallText); +CALL insert_rows( + 0, + 5000000, + @extraSmallText +); + -- number of 500B records -call insert_rows(0, 50000, @smallText); +CALL insert_rows( + 0, + 50000, + @smallText +); + -- number of 10KB records -call insert_rows(0, 5000, @regularText); +CALL insert_rows( + 0, + 5000, + @regularText +); + -- number of 100KB records -call insert_rows(0, 50, @largeText); -end # +CALL insert_rows( + 0, + 50, + @largeText +); +END # delimiter; -delimiter ; +CALL table_create(); -call table_create(); -drop procedure if exists table_create; -drop procedure if exists insert_rows; +DROP + PROCEDURE IF EXISTS table_create; + +DROP + PROCEDURE IF EXISTS insert_rows; -- TODO: change the value to control the number of tables -call table_copy(1); -drop procedure if exists table_copy; -ALTER TABLE test RENAME test_0; +CALL table_copy(1); + +DROP + PROCEDURE IF EXISTS table_copy; + +ALTER TABLE + test RENAME test_0; diff --git a/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleContainer.java b/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleContainer.java index 05f24cecf8136..f5b99dc57d3b5 100644 --- a/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleContainer.java +++ b/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleContainer.java @@ -1,197 +1,199 @@ -package io.airbyte.integrations.source.oracle_strict_encrypt; +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.source.oracle_strict_encrypt; -import org.apache.commons.lang3.StringUtils; -import org.testcontainers.containers.JdbcDatabaseContainer; -import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; -import org.testcontainers.utility.DockerImageName; +import static java.time.temporal.ChronoUnit.SECONDS; +import static java.util.Collections.singleton; import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Set; import java.util.concurrent.Future; - -import static java.time.temporal.ChronoUnit.SECONDS; -import static java.util.Collections.singleton; +import org.apache.commons.lang3.StringUtils; +import org.testcontainers.containers.JdbcDatabaseContainer; +import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy; +import org.testcontainers.utility.DockerImageName; public class OracleContainer extends JdbcDatabaseContainer { - public static final String NAME = "oracle"; - private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("gvenzl/oracle-xe"); - - static final String DEFAULT_TAG = "18.4.0-slim"; - static final String IMAGE = DEFAULT_IMAGE_NAME.getUnversionedPart(); - - private static final int ORACLE_PORT = 1521; - private static final int APEX_HTTP_PORT = 8080; - - private static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240; - private static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 120; - - // Container defaults - static final String DEFAULT_DATABASE_NAME = "xepdb1"; - static final String DEFAULT_SID = "xe"; - static final String DEFAULT_SYSTEM_USER = "system"; - static final String DEFAULT_SYS_USER = "sys"; - - // Test container defaults - static final String APP_USER = "test"; - static final String APP_USER_PASSWORD = "test"; - - // Restricted user and database names - private static final List ORACLE_SYSTEM_USERS = Arrays.asList(DEFAULT_SYSTEM_USER, DEFAULT_SYS_USER); - - private String databaseName = DEFAULT_DATABASE_NAME; - private String username = APP_USER; - private String password = APP_USER_PASSWORD; - private boolean usingSid = false; - - public OracleContainer() { - this(DEFAULT_IMAGE_NAME.withTag(DEFAULT_TAG)); - } - - public OracleContainer(final String dockerImageName) { - this(DockerImageName.parse(dockerImageName)); - } - - public OracleContainer(final DockerImageName dockerImageName) { - super(dockerImageName); - dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME); - preconfigure(); - } - - public OracleContainer(final Future dockerImageName) { - super(dockerImageName); - preconfigure(); - } - - private void preconfigure() { - this.waitStrategy = new LogMessageWaitStrategy() - .withRegEx(".*DATABASE IS READY TO USE!.*\\s") - .withTimes(1) - .withStartupTimeout(Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, SECONDS)); - - withConnectTimeoutSeconds(DEFAULT_CONNECT_TIMEOUT_SECONDS); - addExposedPorts(ORACLE_PORT, APEX_HTTP_PORT); - } - - @Override - protected void waitUntilContainerStarted() { - getWaitStrategy().waitUntilReady(this); - } - - @Override - public Set getLivenessCheckPortNumbers() { - return singleton(getMappedPort(ORACLE_PORT)); - } - - @Override - public String getDriverClassName() { - return "oracle.jdbc.OracleDriver"; - } - - @Override - public String getJdbcUrl() { - return isUsingSid() ? "jdbc:oracle:thin:" + "@" + getHost() + ":" + getOraclePort() + ":" + getSid() - : "jdbc:oracle:thin:" + "@" + getHost() + ":" + getOraclePort() + "/" + getDatabaseName(); - } - - @Override - public String getUsername() { - // An application user is tied to the database, and therefore not authenticated to connect to SID. - return isUsingSid() ? DEFAULT_SYSTEM_USER : username; - } - - @Override - public String getPassword() { - return password; - } - - @Override - public String getDatabaseName() { - return databaseName; - } - - protected boolean isUsingSid() { - return usingSid; - } - - @Override - public OracleContainer withUsername(final String username) { - if (StringUtils.isEmpty(username)) { - throw new IllegalArgumentException("Username cannot be null or empty"); - } - if (ORACLE_SYSTEM_USERS.contains(username.toLowerCase())) { - throw new IllegalArgumentException("Username cannot be one of " + ORACLE_SYSTEM_USERS); - } - this.username = username; - return self(); - } - - @Override - public OracleContainer withPassword(final String password) { - if (StringUtils.isEmpty(password)) { - throw new IllegalArgumentException("Password cannot be null or empty"); - } - this.password = password; - return self(); - } - - @Override - public OracleContainer withDatabaseName(final String databaseName) { - if (StringUtils.isEmpty(databaseName)) { - throw new IllegalArgumentException("Database name cannot be null or empty"); - } - - if (DEFAULT_DATABASE_NAME.equals(databaseName.toLowerCase())) { - throw new IllegalArgumentException("Database name cannot be set to " + DEFAULT_DATABASE_NAME); - } - - this.databaseName = databaseName; - return self(); - } - - public OracleContainer usingSid() { - this.usingSid = true; - return self(); - } - - @Override - public OracleContainer withUrlParam(final String paramName, final String paramValue) { - throw new UnsupportedOperationException("The Oracle Database driver does not support this"); - } - - @SuppressWarnings("SameReturnValue") - public String getSid() { - return DEFAULT_SID; - } - - public Integer getOraclePort() { - return getMappedPort(ORACLE_PORT); - } - - @SuppressWarnings("unused") - public Integer getWebPort() { - return getMappedPort(APEX_HTTP_PORT); - } - - @Override - public String getTestQueryString() { - return "SELECT 1 FROM DUAL"; - } - - @Override - protected void configure() { - withEnv("ORACLE_PASSWORD", password); - - // Only set ORACLE_DATABASE if different than the default. - if (databaseName != DEFAULT_DATABASE_NAME) { - withEnv("ORACLE_DATABASE", databaseName); - } - - withEnv("APP_USER", username); - withEnv("APP_USER_PASSWORD", password); - } + public static final String NAME = "oracle"; + private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("gvenzl/oracle-xe"); + + static final String DEFAULT_TAG = "18.4.0-slim"; + static final String IMAGE = DEFAULT_IMAGE_NAME.getUnversionedPart(); + + private static final int ORACLE_PORT = 1521; + private static final int APEX_HTTP_PORT = 8080; + + private static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240; + private static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 120; + + // Container defaults + static final String DEFAULT_DATABASE_NAME = "xepdb1"; + static final String DEFAULT_SID = "xe"; + static final String DEFAULT_SYSTEM_USER = "system"; + static final String DEFAULT_SYS_USER = "sys"; + + // Test container defaults + static final String APP_USER = "test"; + static final String APP_USER_PASSWORD = "test"; + + // Restricted user and database names + private static final List ORACLE_SYSTEM_USERS = Arrays.asList(DEFAULT_SYSTEM_USER, DEFAULT_SYS_USER); + + private String databaseName = DEFAULT_DATABASE_NAME; + private String username = APP_USER; + private String password = APP_USER_PASSWORD; + private boolean usingSid = false; + + public OracleContainer() { + this(DEFAULT_IMAGE_NAME.withTag(DEFAULT_TAG)); + } + + public OracleContainer(final String dockerImageName) { + this(DockerImageName.parse(dockerImageName)); + } + + public OracleContainer(final DockerImageName dockerImageName) { + super(dockerImageName); + dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME); + preconfigure(); + } + + public OracleContainer(final Future dockerImageName) { + super(dockerImageName); + preconfigure(); + } + + private void preconfigure() { + this.waitStrategy = new LogMessageWaitStrategy() + .withRegEx(".*DATABASE IS READY TO USE!.*\\s") + .withTimes(1) + .withStartupTimeout(Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, SECONDS)); + + withConnectTimeoutSeconds(DEFAULT_CONNECT_TIMEOUT_SECONDS); + addExposedPorts(ORACLE_PORT, APEX_HTTP_PORT); + } + + @Override + protected void waitUntilContainerStarted() { + getWaitStrategy().waitUntilReady(this); + } + + @Override + public Set getLivenessCheckPortNumbers() { + return singleton(getMappedPort(ORACLE_PORT)); + } + + @Override + public String getDriverClassName() { + return "oracle.jdbc.OracleDriver"; + } + + @Override + public String getJdbcUrl() { + return isUsingSid() ? "jdbc:oracle:thin:" + "@" + getHost() + ":" + getOraclePort() + ":" + getSid() + : "jdbc:oracle:thin:" + "@" + getHost() + ":" + getOraclePort() + "/" + getDatabaseName(); + } + + @Override + public String getUsername() { + // An application user is tied to the database, and therefore not authenticated to connect to SID. + return isUsingSid() ? DEFAULT_SYSTEM_USER : username; + } + + @Override + public String getPassword() { + return password; + } + + @Override + public String getDatabaseName() { + return databaseName; + } + + protected boolean isUsingSid() { + return usingSid; + } + + @Override + public OracleContainer withUsername(final String username) { + if (StringUtils.isEmpty(username)) { + throw new IllegalArgumentException("Username cannot be null or empty"); + } + if (ORACLE_SYSTEM_USERS.contains(username.toLowerCase())) { + throw new IllegalArgumentException("Username cannot be one of " + ORACLE_SYSTEM_USERS); + } + this.username = username; + return self(); + } + + @Override + public OracleContainer withPassword(final String password) { + if (StringUtils.isEmpty(password)) { + throw new IllegalArgumentException("Password cannot be null or empty"); + } + this.password = password; + return self(); + } + + @Override + public OracleContainer withDatabaseName(final String databaseName) { + if (StringUtils.isEmpty(databaseName)) { + throw new IllegalArgumentException("Database name cannot be null or empty"); + } + + if (DEFAULT_DATABASE_NAME.equals(databaseName.toLowerCase())) { + throw new IllegalArgumentException("Database name cannot be set to " + DEFAULT_DATABASE_NAME); + } + + this.databaseName = databaseName; + return self(); + } + + public OracleContainer usingSid() { + this.usingSid = true; + return self(); + } + + @Override + public OracleContainer withUrlParam(final String paramName, final String paramValue) { + throw new UnsupportedOperationException("The Oracle Database driver does not support this"); + } + + @SuppressWarnings("SameReturnValue") + public String getSid() { + return DEFAULT_SID; + } + + public Integer getOraclePort() { + return getMappedPort(ORACLE_PORT); + } + + @SuppressWarnings("unused") + public Integer getWebPort() { + return getMappedPort(APEX_HTTP_PORT); + } + + @Override + public String getTestQueryString() { + return "SELECT 1 FROM DUAL"; + } + + @Override + protected void configure() { + withEnv("ORACLE_PASSWORD", password); + + // Only set ORACLE_DATABASE if different than the default. + if (databaseName != DEFAULT_DATABASE_NAME) { + withEnv("ORACLE_DATABASE", databaseName); + } + + withEnv("APP_USER", username); + withEnv("APP_USER_PASSWORD", password); + } } diff --git a/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleSourceNneAcceptanceTest.java b/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleSourceNneAcceptanceTest.java index 8e9ff7d3e83c6..bf1458aa9af77 100644 --- a/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleSourceNneAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleSourceNneAcceptanceTest.java @@ -47,7 +47,7 @@ public void testEncrytion() throws SQLException { final List collect = database.query(network_service_banner).collect(Collectors.toList()); assertTrue(collect.get(2).get("NETWORK_SERVICE_BANNER").asText() - .contains(algorithm + " Encryption")); + .contains(algorithm + " Encryption")); } @Test diff --git a/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleStrictEncryptSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleStrictEncryptSourceAcceptanceTest.java index 99553d7bf38b1..44c381069cc9a 100644 --- a/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleStrictEncryptSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleStrictEncryptSourceAcceptanceTest.java @@ -21,7 +21,6 @@ import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaPrimitive; import io.airbyte.protocol.models.SyncMode; - import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -37,9 +36,9 @@ public class OracleStrictEncryptSourceAcceptanceTest extends SourceAcceptanceTes @Override protected void setupEnvironment(final TestDestinationEnv environment) throws Exception { container = new OracleContainer() - .withUsername("test") - .withPassword("oracle") - .usingSid();; + .withUsername("test") + .withPassword("oracle") + .usingSid();; container.start(); config = Jsons.jsonNode(ImmutableMap.builder() diff --git a/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/AbstractSshOracleSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/AbstractSshOracleSourceAcceptanceTest.java index d252ac53e596b..cd13e75b49582 100644 --- a/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/AbstractSshOracleSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/AbstractSshOracleSourceAcceptanceTest.java @@ -78,10 +78,10 @@ private void startTestContainers() { private void initAndStartJdbcContainer() { db = new OracleContainer() - .withUsername("test") - .withPassword("oracle") - .usingSid() - .withNetwork(sshBastionContainer.getNetWork());; + .withUsername("test") + .withPassword("oracle") + .usingSid() + .withNetwork(sshBastionContainer.getNetWork());; db.start(); } diff --git a/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleContainer.java b/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleContainer.java index 0f879c6c41936..c0f000fcad042 100644 --- a/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleContainer.java +++ b/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleContainer.java @@ -1,5 +1,8 @@ -package io.airbyte.integrations.source.oracle; +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.integrations.source.oracle; import static java.time.temporal.ChronoUnit.SECONDS; import static java.util.Collections.singleton; @@ -16,181 +19,181 @@ public class OracleContainer extends JdbcDatabaseContainer { - public static final String NAME = "oracle"; - private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("gvenzl/oracle-xe"); - - static final String DEFAULT_TAG = "18.4.0-slim"; - static final String IMAGE = DEFAULT_IMAGE_NAME.getUnversionedPart(); - - private static final int ORACLE_PORT = 1521; - private static final int APEX_HTTP_PORT = 8080; - - private static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240; - private static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 120; - - // Container defaults - static final String DEFAULT_DATABASE_NAME = "xepdb1"; - static final String DEFAULT_SID = "xe"; - static final String DEFAULT_SYSTEM_USER = "system"; - static final String DEFAULT_SYS_USER = "sys"; - - // Test container defaults - static final String APP_USER = "test"; - static final String APP_USER_PASSWORD = "test"; - - // Restricted user and database names - private static final List ORACLE_SYSTEM_USERS = Arrays.asList(DEFAULT_SYSTEM_USER, DEFAULT_SYS_USER); - - private String databaseName = DEFAULT_DATABASE_NAME; - private String username = APP_USER; - private String password = APP_USER_PASSWORD; - private boolean usingSid = false; - - public OracleContainer() { - this(DEFAULT_IMAGE_NAME.withTag(DEFAULT_TAG)); - } - - public OracleContainer(final String dockerImageName) { - this(DockerImageName.parse(dockerImageName)); - } - - public OracleContainer(final DockerImageName dockerImageName) { - super(dockerImageName); - dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME); - preconfigure(); - } - - public OracleContainer(final Future dockerImageName) { - super(dockerImageName); - preconfigure(); - } - - private void preconfigure() { - this.waitStrategy = new LogMessageWaitStrategy() - .withRegEx(".*DATABASE IS READY TO USE!.*\\s") - .withTimes(1) - .withStartupTimeout(Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, SECONDS)); - - withConnectTimeoutSeconds(DEFAULT_CONNECT_TIMEOUT_SECONDS); - addExposedPorts(ORACLE_PORT, APEX_HTTP_PORT); - } - - @Override - protected void waitUntilContainerStarted() { - getWaitStrategy().waitUntilReady(this); - } - - @Override - public Set getLivenessCheckPortNumbers() { - return singleton(getMappedPort(ORACLE_PORT)); - } - - @Override - public String getDriverClassName() { - return "oracle.jdbc.OracleDriver"; - } - - @Override - public String getJdbcUrl() { - return isUsingSid() ? "jdbc:oracle:thin:" + "@" + getHost() + ":" + getOraclePort() + ":" + getSid() - : "jdbc:oracle:thin:" + "@" + getHost() + ":" + getOraclePort() + "/" + getDatabaseName(); - } - - @Override - public String getUsername() { - // An application user is tied to the database, and therefore not authenticated to connect to SID. - return isUsingSid() ? DEFAULT_SYSTEM_USER : username; - } - - @Override - public String getPassword() { - return password; - } - - @Override - public String getDatabaseName() { - return databaseName; - } - - protected boolean isUsingSid() { - return usingSid; - } - - @Override - public OracleContainer withUsername(final String username) { - if (StringUtils.isEmpty(username)) { - throw new IllegalArgumentException("Username cannot be null or empty"); - } - if (ORACLE_SYSTEM_USERS.contains(username.toLowerCase())) { - throw new IllegalArgumentException("Username cannot be one of " + ORACLE_SYSTEM_USERS); - } - this.username = username; - return self(); - } - - @Override - public OracleContainer withPassword(final String password) { - if (StringUtils.isEmpty(password)) { - throw new IllegalArgumentException("Password cannot be null or empty"); - } - this.password = password; - return self(); - } - - @Override - public OracleContainer withDatabaseName(final String databaseName) { - if (StringUtils.isEmpty(databaseName)) { - throw new IllegalArgumentException("Database name cannot be null or empty"); - } - - if (DEFAULT_DATABASE_NAME.equals(databaseName.toLowerCase())) { - throw new IllegalArgumentException("Database name cannot be set to " + DEFAULT_DATABASE_NAME); - } - - this.databaseName = databaseName; - return self(); - } - - public OracleContainer usingSid() { - this.usingSid = true; - return self(); - } - - @Override - public OracleContainer withUrlParam(final String paramName, final String paramValue) { - throw new UnsupportedOperationException("The Oracle Database driver does not support this"); - } - - @SuppressWarnings("SameReturnValue") - public String getSid() { - return DEFAULT_SID; - } - - public Integer getOraclePort() { - return getMappedPort(ORACLE_PORT); - } - - @SuppressWarnings("unused") - public Integer getWebPort() { - return getMappedPort(APEX_HTTP_PORT); - } - - @Override - public String getTestQueryString() { - return "SELECT 1 FROM DUAL"; - } - - @Override - protected void configure() { - withEnv("ORACLE_PASSWORD", password); - - // Only set ORACLE_DATABASE if different than the default. - if (databaseName != DEFAULT_DATABASE_NAME) { - withEnv("ORACLE_DATABASE", databaseName); - } - - withEnv("APP_USER", username); - withEnv("APP_USER_PASSWORD", password); - } + public static final String NAME = "oracle"; + private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("gvenzl/oracle-xe"); + + static final String DEFAULT_TAG = "18.4.0-slim"; + static final String IMAGE = DEFAULT_IMAGE_NAME.getUnversionedPart(); + + private static final int ORACLE_PORT = 1521; + private static final int APEX_HTTP_PORT = 8080; + + private static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240; + private static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 120; + + // Container defaults + static final String DEFAULT_DATABASE_NAME = "xepdb1"; + static final String DEFAULT_SID = "xe"; + static final String DEFAULT_SYSTEM_USER = "system"; + static final String DEFAULT_SYS_USER = "sys"; + + // Test container defaults + static final String APP_USER = "test"; + static final String APP_USER_PASSWORD = "test"; + + // Restricted user and database names + private static final List ORACLE_SYSTEM_USERS = Arrays.asList(DEFAULT_SYSTEM_USER, DEFAULT_SYS_USER); + + private String databaseName = DEFAULT_DATABASE_NAME; + private String username = APP_USER; + private String password = APP_USER_PASSWORD; + private boolean usingSid = false; + + public OracleContainer() { + this(DEFAULT_IMAGE_NAME.withTag(DEFAULT_TAG)); + } + + public OracleContainer(final String dockerImageName) { + this(DockerImageName.parse(dockerImageName)); + } + + public OracleContainer(final DockerImageName dockerImageName) { + super(dockerImageName); + dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME); + preconfigure(); + } + + public OracleContainer(final Future dockerImageName) { + super(dockerImageName); + preconfigure(); + } + + private void preconfigure() { + this.waitStrategy = new LogMessageWaitStrategy() + .withRegEx(".*DATABASE IS READY TO USE!.*\\s") + .withTimes(1) + .withStartupTimeout(Duration.of(DEFAULT_STARTUP_TIMEOUT_SECONDS, SECONDS)); + + withConnectTimeoutSeconds(DEFAULT_CONNECT_TIMEOUT_SECONDS); + addExposedPorts(ORACLE_PORT, APEX_HTTP_PORT); + } + + @Override + protected void waitUntilContainerStarted() { + getWaitStrategy().waitUntilReady(this); + } + + @Override + public Set getLivenessCheckPortNumbers() { + return singleton(getMappedPort(ORACLE_PORT)); + } + + @Override + public String getDriverClassName() { + return "oracle.jdbc.OracleDriver"; + } + + @Override + public String getJdbcUrl() { + return isUsingSid() ? "jdbc:oracle:thin:" + "@" + getHost() + ":" + getOraclePort() + ":" + getSid() + : "jdbc:oracle:thin:" + "@" + getHost() + ":" + getOraclePort() + "/" + getDatabaseName(); + } + + @Override + public String getUsername() { + // An application user is tied to the database, and therefore not authenticated to connect to SID. + return isUsingSid() ? DEFAULT_SYSTEM_USER : username; + } + + @Override + public String getPassword() { + return password; + } + + @Override + public String getDatabaseName() { + return databaseName; + } + + protected boolean isUsingSid() { + return usingSid; + } + + @Override + public OracleContainer withUsername(final String username) { + if (StringUtils.isEmpty(username)) { + throw new IllegalArgumentException("Username cannot be null or empty"); + } + if (ORACLE_SYSTEM_USERS.contains(username.toLowerCase())) { + throw new IllegalArgumentException("Username cannot be one of " + ORACLE_SYSTEM_USERS); + } + this.username = username; + return self(); + } + + @Override + public OracleContainer withPassword(final String password) { + if (StringUtils.isEmpty(password)) { + throw new IllegalArgumentException("Password cannot be null or empty"); + } + this.password = password; + return self(); + } + + @Override + public OracleContainer withDatabaseName(final String databaseName) { + if (StringUtils.isEmpty(databaseName)) { + throw new IllegalArgumentException("Database name cannot be null or empty"); + } + + if (DEFAULT_DATABASE_NAME.equals(databaseName.toLowerCase())) { + throw new IllegalArgumentException("Database name cannot be set to " + DEFAULT_DATABASE_NAME); + } + + this.databaseName = databaseName; + return self(); + } + + public OracleContainer usingSid() { + this.usingSid = true; + return self(); + } + + @Override + public OracleContainer withUrlParam(final String paramName, final String paramValue) { + throw new UnsupportedOperationException("The Oracle Database driver does not support this"); + } + + @SuppressWarnings("SameReturnValue") + public String getSid() { + return DEFAULT_SID; + } + + public Integer getOraclePort() { + return getMappedPort(ORACLE_PORT); + } + + @SuppressWarnings("unused") + public Integer getWebPort() { + return getMappedPort(APEX_HTTP_PORT); + } + + @Override + public String getTestQueryString() { + return "SELECT 1 FROM DUAL"; + } + + @Override + protected void configure() { + withEnv("ORACLE_PASSWORD", password); + + // Only set ORACLE_DATABASE if different than the default. + if (databaseName != DEFAULT_DATABASE_NAME) { + withEnv("ORACLE_DATABASE", databaseName); + } + + withEnv("APP_USER", username); + withEnv("APP_USER_PASSWORD", password); + } } diff --git a/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceAcceptanceTest.java index 9ec1ee8814e34..286760b44cf85 100644 --- a/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceAcceptanceTest.java @@ -35,9 +35,9 @@ public class OracleSourceAcceptanceTest extends SourceAcceptanceTest { @Override protected void setupEnvironment(final TestDestinationEnv environment) throws Exception { container = new OracleContainer() - .withUsername("test") - .withPassword("oracle") - .usingSid(); + .withUsername("test") + .withPassword("oracle") + .usingSid(); container.start(); config = Jsons.jsonNode(ImmutableMap.builder() diff --git a/airbyte-integrations/connectors/source-postgres/1-create-copy-tables-procedure.sql b/airbyte-integrations/connectors/source-postgres/1-create-copy-tables-procedure.sql index 3d339e5ff0fa2..861714f0b9fc7 100644 --- a/airbyte-integrations/connectors/source-postgres/1-create-copy-tables-procedure.sql +++ b/airbyte-integrations/connectors/source-postgres/1-create-copy-tables-procedure.sql @@ -1,14 +1,25 @@ -create or replace procedure copy_table(tablecount int) -language plpgsql -as $$ -declare v_max_table int; v_counter_table int; v_tnamee VARCHAR(255); -begin - v_max_table := tablecount; - v_counter_table := 1; - while v_counter_table < v_max_table loop - EXECUTE format('create table test_%s as (select * from test t)', v_counter_table); - v_counter_table := v_counter_table + 1; - end loop; - commit; -end;$$ +CREATE + OR replace PROCEDURE copy_table( + tablecount INT + ) LANGUAGE plpgsql AS $$ DECLARE v_max_table INT; +v_counter_table INT; + +v_tnamee VARCHAR(255); + +BEGIN v_max_table := tablecount; + +v_counter_table := 1; + +while v_counter_table < v_max_table loop EXECUTE format( + 'create table test_%s as (select * from test t)', + v_counter_table +); + +v_counter_table := v_counter_table + 1; +END loop; + +COMMIT; +END; + +$$ diff --git a/airbyte-integrations/connectors/source-postgres/2-create-insert-rows-to-table-procedure.sql b/airbyte-integrations/connectors/source-postgres/2-create-insert-rows-to-table-procedure.sql index af00b332233df..6f8b07fc6b123 100644 --- a/airbyte-integrations/connectors/source-postgres/2-create-insert-rows-to-table-procedure.sql +++ b/airbyte-integrations/connectors/source-postgres/2-create-insert-rows-to-table-procedure.sql @@ -1,68 +1,168 @@ -create or replace procedure insert_rows(allrows int, insertcount int, value text) -language plpgsql -as $$ -declare dummyIpsum varchar(255); fieldText text; vmax int; vmaxx int; vmaxoneinsert int; counter int; -declare lastinsertcounter int; lastinsert int; fullloop int; fullloopcounter int; insertTable text; insertTableLasted text; - -begin - fieldText := value; - dummyIpsum = '''dummy_ipsum'''; - vmax = allrows; - vmaxx = allrows; - vmaxoneinsert = insertcount; - counter = 1; - lastinsertcounter = 1; - lastinsert = 0; - fullloop = 0; - fullloopcounter = 0; - - while vmaxx <= vmaxoneinsert loop - vmaxoneinsert := vmaxx; - fullloop := fullloop + 1; - vmaxx := vmaxx + 1; - end loop; - commit; - - while vmax > vmaxoneinsert loop - fullloop := fullloop + 1; - vmax := vmax - vmaxoneinsert; - lastinsert := vmax; - end loop; - commit; - - insertTable := 'insert into test (varchar1, varchar2, varchar3, varchar4, varchar5, longblobfield, timestampfield) values ('; - while counter < vmaxoneinsert loop - insertTable := concat(insertTable, dummyIpsum, ', ', dummyIpsum, ', ', dummyIpsum, ', ', dummyIpsum, ', ', dummyIpsum, ', ', fieldText, ', CURRENT_TIMESTAMP), ('); - counter := counter + 1; - end loop; - commit; - insertTable := concat(insertTable, dummyIpsum, ', ', dummyIpsum, ', ', dummyIpsum, ', ', dummyIpsum, ', ', dummyIpsum, ', ', fieldText, ', CURRENT_TIMESTAMP);'); - - while vmax < 1 loop - fullloop := 0; - vmax := 1; - end loop; - commit; - - while fullloopcounter < fullloop loop - EXECUTE insertTable; - fullloopcounter := fullloopcounter + 1; - end loop; - commit; - - insertTableLasted := 'insert into test (varchar1, varchar2, varchar3, varchar4, varchar5, longblobfield, timestampfield) values ('; - while lastinsertcounter < lastinsert loop - insertTableLasted := concat(insertTableLasted, dummyIpsum, ', ', dummyIpsum, ', ', dummyIpsum, ', ', dummyIpsum, ', ', dummyIpsum, ', ', fieldText, ', CURRENT_TIMESTAMP), ('); - lastinsertcounter := lastinsertcounter + 1; - end loop; - commit; - insertTableLasted := concat(insertTableLasted, dummyIpsum, ', ', dummyIpsum, ', ', dummyIpsum, ', ', dummyIpsum, ', ', dummyIpsum, ', ', fieldText, ', CURRENT_TIMESTAMP);'); - - while lastinsert > 0 loop - EXECUTE insertTableLasted; - lastinsert := 0; - end loop; - commit; -end;$$ +CREATE + OR replace PROCEDURE insert_rows( + allrows INT, + insertcount INT, + value text + ) LANGUAGE plpgsql AS $$ DECLARE dummyIpsum VARCHAR(255); +fieldText text; +vmax INT; + +vmaxx INT; + +vmaxoneinsert INT; + +counter INT; + +DECLARE lastinsertcounter INT; + +lastinsert INT; + +fullloop INT; + +fullloopcounter INT; + +insertTable text; + +insertTableLasted text; + +BEGIN fieldText := value; + +dummyIpsum = '''dummy_ipsum'''; + +vmax = allrows; + +vmaxx = allrows; + +vmaxoneinsert = insertcount; + +counter = 1; + +lastinsertcounter = 1; + +lastinsert = 0; + +fullloop = 0; + +fullloopcounter = 0; + +while vmaxx <= vmaxoneinsert loop vmaxoneinsert := vmaxx; + +fullloop := fullloop + 1; + +vmaxx := vmaxx + 1; +END loop; + +COMMIT; + +while vmax > vmaxoneinsert loop fullloop := fullloop + 1; + +vmax := vmax - vmaxoneinsert; + +lastinsert := vmax; +END loop; + +COMMIT; + +insertTable := 'insert into test (varchar1, varchar2, varchar3, varchar4, varchar5, longblobfield, timestampfield) values ('; + +while counter < vmaxoneinsert loop insertTable := concat( + insertTable, + dummyIpsum, + ', ', + dummyIpsum, + ', ', + dummyIpsum, + ', ', + dummyIpsum, + ', ', + dummyIpsum, + ', ', + fieldText, + ', CURRENT_TIMESTAMP), (' +); + +counter := counter + 1; +END loop; + +COMMIT; + +insertTable := concat( + insertTable, + dummyIpsum, + ', ', + dummyIpsum, + ', ', + dummyIpsum, + ', ', + dummyIpsum, + ', ', + dummyIpsum, + ', ', + fieldText, + ', CURRENT_TIMESTAMP);' +); + +while vmax < 1 loop fullloop := 0; + +vmax := 1; +END loop; + +COMMIT; + +while fullloopcounter < fullloop loop EXECUTE insertTable; + +fullloopcounter := fullloopcounter + 1; +END loop; + +COMMIT; + +insertTableLasted := 'insert into test (varchar1, varchar2, varchar3, varchar4, varchar5, longblobfield, timestampfield) values ('; + +while lastinsertcounter < lastinsert loop insertTableLasted := concat( + insertTableLasted, + dummyIpsum, + ', ', + dummyIpsum, + ', ', + dummyIpsum, + ', ', + dummyIpsum, + ', ', + dummyIpsum, + ', ', + fieldText, + ', CURRENT_TIMESTAMP), (' +); + +lastinsertcounter := lastinsertcounter + 1; +END loop; + +COMMIT; + +insertTableLasted := concat( + insertTableLasted, + dummyIpsum, + ', ', + dummyIpsum, + ', ', + dummyIpsum, + ', ', + dummyIpsum, + ', ', + dummyIpsum, + ', ', + fieldText, + ', CURRENT_TIMESTAMP);' +); + +while lastinsert > 0 loop EXECUTE insertTableLasted; + +lastinsert := 0; +END loop; + +COMMIT; +END; + +$$ diff --git a/airbyte-integrations/connectors/source-postgres/3-run-script.sql b/airbyte-integrations/connectors/source-postgres/3-run-script.sql index a9dfb20a533e7..5eb1655c3f2e3 100644 --- a/airbyte-integrations/connectors/source-postgres/3-run-script.sql +++ b/airbyte-integrations/connectors/source-postgres/3-run-script.sql @@ -1,26 +1,73 @@ -create sequence test_seq; +CREATE + SEQUENCE test_seq; -create table test( -id int check (id > 0) not null default nextval ('test_seq') primary key, -varchar1 varchar(255), -varchar2 varchar(255), -varchar3 varchar(255), -varchar4 varchar(255), -varchar5 varchar(255), -longblobfield bytea, -timestampfield timestamp(0)); +CREATE + TABLE + test( + id INT CHECK( + id > 0 + ) NOT NULL DEFAULT nextval('test_seq') PRIMARY KEY, + varchar1 VARCHAR(255), + varchar2 VARCHAR(255), + varchar3 VARCHAR(255), + varchar4 VARCHAR(255), + varchar5 VARCHAR(255), + longblobfield bytea, + timestampfield TIMESTAMP(0) + ); -- TODO: change the following @allrows to control the number of records with different sizes -- number of 50B records -call insert_rows(0, 500000, '''test weight 50b - some text, some text, some text'''); +CALL insert_rows( + 0, + 500000, + '''test weight 50b - some text, some text, some text''' +); + -- number of 500B records -call insert_rows(0, 50000, CONCAT('''test weight 500b - ', repeat('some text, some text, ', 20), 'some text''')); +CALL insert_rows( + 0, + 50000, + CONCAT( + '''test weight 500b - ', + repeat( + 'some text, some text, ', + 20 + ), + 'some text''' + ) +); + -- number of 10KB records -call insert_rows(0, 5000, CONCAT('''test weight 10kb - ', repeat('some text, some text, some text, some text, ', 295), 'some text''')); +CALL insert_rows( + 0, + 5000, + CONCAT( + '''test weight 10kb - ', + repeat( + 'some text, some text, some text, some text, ', + 295 + ), + 'some text''' + ) +); + -- number of 100KB records -call insert_rows(0, 50, CONCAT('''test weight 100kb - ', repeat('some text, some text, ', 4450), 'some text''')); --- TODO: change the value to control the number of tables -call copy_table(0); -ALTER TABLE test RENAME TO test_0; +CALL insert_rows( + 0, + 50, + CONCAT( + '''test weight 100kb - ', + repeat( + 'some text, some text, ', + 4450 + ), + 'some text''' + ) +); +-- TODO: change the value to control the number of tables +CALL copy_table(0); +ALTER TABLE + test RENAME TO test_0;