diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/file_based_stream_config.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/file_based_stream_config.py index 69b8e7073d93d8..b9dd1887f4f1e9 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/file_based_stream_config.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/config/file_based_stream_config.py @@ -64,6 +64,12 @@ class FileBasedStreamConfig(BaseModel): description="When enabled, syncs will not validate or structure records against the stream's schema.", default=False, ) + recent_n_files_to_read_for_schema_discovery: Optional[int] = Field( + title="Files To Read For Schema Discover", + description="The number of resent files which will be used to discover the schema for this stream.", + default=None, + gt=0, + ) @validator("input_schema", pre=True) def validate_input_schema(cls, v: Optional[str]) -> Optional[str]: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py index 69a44845dd85f5..a905b5a316a2f7 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py @@ -32,7 +32,7 @@ class AbstractFileBasedStream(Stream): files in the stream. - A DiscoveryPolicy that controls the number of concurrent requests sent to the source during discover, and the number of files used for schema discovery. - - A dictionary of FileType:Parser that holds all of the file types that can be handled + - A dictionary of FileType:Parser that holds all the file types that can be handled by the stream. """ @@ -70,7 +70,7 @@ def list_files(self) -> List[RemoteFile]: List all files that belong to the stream. The output of this method is cached so we don't need to list the files more than once. - This means we won't pick up changes to the files during a sync. This meethod uses the + This means we won't pick up changes to the files during a sync. This method uses the get_files method which is implemented by the concrete stream class. """ return list(self.get_files()) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py index df5dde692cb188..8d6b81c6968fa0 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/stream/default_file_based_stream.py @@ -191,30 +191,40 @@ def _get_raw_json_schema(self) -> JsonSchema: return schemaless_schema else: files = self.list_files() - total_n_files = len(files) - - if total_n_files == 0: - self.logger.warning(msg=f"No files were identified in the stream {self.name}. Setting default schema for the stream.") - return schemaless_schema - - max_n_files_for_schema_inference = self._discovery_policy.get_max_n_files_for_schema_inference(self.get_parser()) - if total_n_files > max_n_files_for_schema_inference: - # Use the most recent files for schema inference, so we pick up schema changes during discovery. - files = sorted(files, key=lambda x: x.last_modified, reverse=True)[:max_n_files_for_schema_inference] - self.logger.warn( - msg=f"Refusing to infer schema for all {total_n_files} files; using {max_n_files_for_schema_inference} files." + first_n_files = len(files) + + if self.config.recent_n_files_to_read_for_schema_discovery: + self.logger.info( + msg=( + f"Only first {self.config.recent_n_files_to_read_for_schema_discovery} files will be used to infer schema " + f"for stream {self.name} due to limitation in config." + ) ) + first_n_files = self.config.recent_n_files_to_read_for_schema_discovery - inferred_schema = self.infer_schema(files) + if first_n_files == 0: + self.logger.warning(msg=f"No files were identified in the stream {self.name}. Setting default schema for the stream.") + return schemaless_schema - if not inferred_schema: - raise InvalidSchemaError( - FileBasedSourceError.INVALID_SCHEMA_ERROR, - details=f"Empty schema. Please check that the files are valid for format {self.config.format}", - stream=self.name, - ) + max_n_files_for_schema_inference = self._discovery_policy.get_max_n_files_for_schema_inference(self.get_parser()) + + if first_n_files > max_n_files_for_schema_inference: + # Use the most recent files for schema inference, so we pick up schema changes during discovery. + self.logger.warning(msg=f"Refusing to infer schema for {first_n_files} files; using {max_n_files_for_schema_inference} files.") + first_n_files = max_n_files_for_schema_inference + + files = sorted(files, key=lambda x: x.last_modified, reverse=True)[:first_n_files] + + inferred_schema = self.infer_schema(files) + + if not inferred_schema: + raise InvalidSchemaError( + FileBasedSourceError.INVALID_SCHEMA_ERROR, + details=f"Empty schema. Please check that the files are valid for format {self.config.format}", + stream=self.name, + ) - schema = {"type": "object", "properties": inferred_schema} + schema = {"type": "object", "properties": inferred_schema} return schema diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/csv_scenarios.py b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/csv_scenarios.py index afe521c2a3d388..7a8549a69f41ac 100644 --- a/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/csv_scenarios.py +++ b/airbyte-cdk/python/unit_tests/sources/file_based/scenarios/csv_scenarios.py @@ -419,6 +419,12 @@ "default": False, "type": "boolean", }, + "recent_n_files_to_read_for_schema_discovery": { + "title": "Files To Read For Schema Discover", + "description": "The number of resent files which will be used to discover the schema for this stream.", + "exclusiveMinimum": 0, + "type": "integer", + } }, "required": ["name", "format"], }, @@ -815,6 +821,144 @@ ) ).build() +multi_csv_stream_n_file_exceeds_config_limit_for_inference = ( + TestScenarioBuilder[InMemoryFilesSource]() + .set_name("multi_csv_stream_n_file_exceeds_config_limit_for_inference") + .set_config( + { + "streams": [ + { + "name": "stream1", + "format": {"filetype": "csv"}, + "globs": ["*"], + "validation_policy": "Emit Record", + "recent_n_files_to_read_for_schema_discovery": 3, + } + ] + } + ) + .set_source_builder( + FileBasedSourceBuilder() + .set_files( + { + "a.csv": { + "contents": [ + ("col1", "col2"), + ("val11a", "val12a"), + ("val21a", "val22a"), + ], + "last_modified": "2023-06-05T03:54:07.000Z", + }, + "b.csv": { + "contents": [ + ("col1", "col2", "col3"), + ("val11b", "val12b", "val13b"), + ("val21b", "val22b", "val23b"), + ], + "last_modified": "2023-06-05T03:54:07.000Z", + }, + "c.csv": { + "contents": [ + ("col1", "col2", "col3", "col4"), + ("val11c", "val12c", "val13c", "val14c"), + ("val21c", "val22c", "val23c", "val24c"), + ], + "last_modified": "2023-06-06T03:54:07.000Z", + }, + } + ) + .set_file_type("csv") + ) + .set_expected_catalog( + { + "streams": [ + { + "default_cursor_field": ["_ab_source_file_last_modified"], + "json_schema": { + "type": "object", + "properties": { + "col1": {"type": ["null", "string"]}, + "col2": {"type": ["null", "string"]}, + "col3": {"type": ["null", "string"]}, + "col4": {"type": ["null", "string"]}, + "_ab_source_file_last_modified": {"type": "string"}, + "_ab_source_file_url": {"type": "string"}, + }, + }, + "name": "stream1", + "source_defined_cursor": True, + "supported_sync_modes": ["full_refresh", "incremental"], + "is_resumable": True, + } + ] + } + ) + .set_expected_records( + [ + { + "data": { + "col1": "val11a", + "col2": "val12a", + "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", + "_ab_source_file_url": "a.csv", + }, + "stream": "stream1", + }, + { + "data": { + "col1": "val21a", + "col2": "val22a", + "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", + "_ab_source_file_url": "a.csv", + }, + "stream": "stream1", + }, + { + "data": { + "col1": "val11b", + "col2": "val12b", + "col3": "val13b", + "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", + "_ab_source_file_url": "b.csv", + }, + "stream": "stream1", + }, + { + "data": { + "col1": "val21b", + "col2": "val22b", + "col3": "val23b", + "_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z", + "_ab_source_file_url": "b.csv", + }, + "stream": "stream1", + }, + { + "data": { + "col1": "val11c", + "col2": "val12c", + "col3": "val13c", + "col4": "val14c", + "_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z", + "_ab_source_file_url": "c.csv", + }, + "stream": "stream1", + }, + { + "data": { + "col1": "val21c", + "col2": "val22c", + "col3": "val23c", + "col4": "val24c", + "_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z", + "_ab_source_file_url": "c.csv", + }, + "stream": "stream1", + }, + ] + ) +).build() + invalid_csv_scenario: TestScenario[InMemoryFilesSource] = ( TestScenarioBuilder[InMemoryFilesSource]() .set_name("invalid_csv_scenario") # too many values for the number of headers diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/stream/test_default_file_based_stream.py b/airbyte-cdk/python/unit_tests/sources/file_based/stream/test_default_file_based_stream.py index e93eb6bbfc5efa..e199f0a07566cc 100644 --- a/airbyte-cdk/python/unit_tests/sources/file_based/stream/test_default_file_based_stream.py +++ b/airbyte-cdk/python/unit_tests/sources/file_based/stream/test_default_file_based_stream.py @@ -181,6 +181,7 @@ def test_override_max_n_files_for_schema_inference_is_respected(self) -> None: self._parser.infer_schema.return_value = {"data": {"type": "string"}} files = [RemoteFile(uri=f"file{i}", last_modified=self._NOW) for i in range(10)] self._stream_reader.get_matching_files.return_value = files + self._stream.config.recent_n_files_to_read_for_schema_discovery = 3 schema = self._stream.get_json_schema() diff --git a/airbyte-cdk/python/unit_tests/sources/file_based/test_file_based_scenarios.py b/airbyte-cdk/python/unit_tests/sources/file_based/test_file_based_scenarios.py index 6d83674ec220ab..da332c246a1ef0 100644 --- a/airbyte-cdk/python/unit_tests/sources/file_based/test_file_based_scenarios.py +++ b/airbyte-cdk/python/unit_tests/sources/file_based/test_file_based_scenarios.py @@ -77,6 +77,7 @@ invalid_csv_multi_scenario, invalid_csv_scenario, multi_csv_scenario, + multi_csv_stream_n_file_exceeds_config_limit_for_inference, multi_csv_stream_n_file_exceeds_limit_for_inference, multi_stream_custom_format, schemaless_csv_multi_stream_scenario, @@ -167,6 +168,7 @@ single_csv_scenario, multi_csv_scenario, multi_csv_stream_n_file_exceeds_limit_for_inference, + multi_csv_stream_n_file_exceeds_config_limit_for_inference, single_csv_input_state_is_earlier_scenario, single_csv_no_input_state_scenario, single_csv_input_state_is_later_scenario,