[file-based cdk]: add config option to limit number of files for schema discovery #39317

Merged (13 commits) on Jul 11, 2024
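In practice the new option is set per stream in the source configuration. A minimal sketch of a stream entry using it (field names taken from the test scenario added in this PR; the limit of 3 is just an example, and the surrounding source config shape varies by connector):

stream_config = {
    "name": "stream1",
    "format": {"filetype": "csv"},
    "globs": ["*"],
    "validation_policy": "Emit Record",
    # Use only the 3 most recently modified files when inferring this stream's schema.
    "recent_n_files_to_read_for_schema_discovery": 3,
}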
@@ -64,6 +64,12 @@ class FileBasedStreamConfig(BaseModel):
description="When enabled, syncs will not validate or structure records against the stream's schema.",
default=False,
)
recent_n_files_to_read_for_schema_discovery: Optional[int] = Field(
title="Files To Read For Schema Discover",
description="The number of resent files which will be used to discover the schema for this stream.",
default=None,
gt=0,
)

@validator("input_schema", pre=True)
def validate_input_schema(cls, v: Optional[str]) -> Optional[str]:
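As a quick illustration of the Field constraints above, here is a minimal, self-contained sketch (a stand-in model, not the CDK's FileBasedStreamConfig itself) showing that the option is optional and that the gt=0 constraint rejects non-positive values at config validation time:

from typing import Optional

from pydantic import BaseModel, Field, ValidationError


class StreamConfigSketch(BaseModel):
    # Mirrors the new field: optional, and strictly greater than zero when set.
    recent_n_files_to_read_for_schema_discovery: Optional[int] = Field(default=None, gt=0)


print(StreamConfigSketch().recent_n_files_to_read_for_schema_discovery)  # None: no limit, existing behaviour
print(StreamConfigSketch(recent_n_files_to_read_for_schema_discovery=3))  # infer the schema from the 3 newest files

try:
    StreamConfigSketch(recent_n_files_to_read_for_schema_discovery=0)
except ValidationError as exc:
    print(exc)  # 0 violates gt=0 and is rejected before any sync runs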
@@ -32,7 +32,7 @@ class AbstractFileBasedStream(Stream):
files in the stream.
- A DiscoveryPolicy that controls the number of concurrent requests sent to the source
during discover, and the number of files used for schema discovery.
- A dictionary of FileType:Parser that holds all of the file types that can be handled
- A dictionary of FileType:Parser that holds all the file types that can be handled
by the stream.
"""

@@ -70,7 +70,7 @@ def list_files(self) -> List[RemoteFile]:
List all files that belong to the stream.
The output of this method is cached so we don't need to list the files more than once.
This means we won't pick up changes to the files during a sync. This meethod uses the
This means we won't pick up changes to the files during a sync. This method uses the
get_files method which is implemented by the concrete stream class.
"""
return list(self.get_files())
@@ -191,30 +191,40 @@ def _get_raw_json_schema(self) -> JsonSchema:
return schemaless_schema
else:
files = self.list_files()
total_n_files = len(files)

if total_n_files == 0:
self.logger.warning(msg=f"No files were identified in the stream {self.name}. Setting default schema for the stream.")
return schemaless_schema

max_n_files_for_schema_inference = self._discovery_policy.get_max_n_files_for_schema_inference(self.get_parser())
if total_n_files > max_n_files_for_schema_inference:
# Use the most recent files for schema inference, so we pick up schema changes during discovery.
files = sorted(files, key=lambda x: x.last_modified, reverse=True)[:max_n_files_for_schema_inference]
self.logger.warn(
msg=f"Refusing to infer schema for all {total_n_files} files; using {max_n_files_for_schema_inference} files."
first_n_files = len(files)

if self.config.recent_n_files_to_read_for_schema_discovery:
self.logger.info(
msg=(
f"Only first {self.config.recent_n_files_to_read_for_schema_discovery} files will be used to infer schema "
f"for stream {self.name} due to limitation in config."
)
)
first_n_files = self.config.recent_n_files_to_read_for_schema_discovery

inferred_schema = self.infer_schema(files)
if first_n_files == 0:
self.logger.warning(msg=f"No files were identified in the stream {self.name}. Setting default schema for the stream.")
return schemaless_schema

if not inferred_schema:
raise InvalidSchemaError(
FileBasedSourceError.INVALID_SCHEMA_ERROR,
details=f"Empty schema. Please check that the files are valid for format {self.config.format}",
stream=self.name,
)
max_n_files_for_schema_inference = self._discovery_policy.get_max_n_files_for_schema_inference(self.get_parser())

if first_n_files > max_n_files_for_schema_inference:
# Use the most recent files for schema inference, so we pick up schema changes during discovery.
self.logger.warning(msg=f"Refusing to infer schema for {first_n_files} files; using {max_n_files_for_schema_inference} files.")
first_n_files = max_n_files_for_schema_inference

files = sorted(files, key=lambda x: x.last_modified, reverse=True)[:first_n_files]

inferred_schema = self.infer_schema(files)

if not inferred_schema:
raise InvalidSchemaError(
FileBasedSourceError.INVALID_SCHEMA_ERROR,
details=f"Empty schema. Please check that the files are valid for format {self.config.format}",
stream=self.name,
)

schema = {"type": "object", "properties": inferred_schema}
schema = {"type": "object", "properties": inferred_schema}

return schema
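To make the control flow above easier to follow, here is a condensed, standalone restatement of the file-selection logic (simplified names, with logging and the empty-stream early return omitted; a sketch, not the CDK code itself):

from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class FileSketch:
    # Simplified stand-in for the CDK's RemoteFile.
    uri: str
    last_modified: datetime


def files_for_schema_inference(
    files: List[FileSketch],
    config_limit: Optional[int],  # recent_n_files_to_read_for_schema_discovery
    policy_limit: int,  # DiscoveryPolicy.get_max_n_files_for_schema_inference(parser)
) -> List[FileSketch]:
    n = len(files)
    if config_limit:
        n = config_limit  # the user-supplied cap is applied first
    n = min(n, policy_limit)  # the discovery policy still bounds the final count
    # Most recently modified files first, so recent schema changes are picked up.
    return sorted(files, key=lambda f: f.last_modified, reverse=True)[:n]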

@@ -419,6 +419,12 @@
"default": False,
"type": "boolean",
},
"recent_n_files_to_read_for_schema_discovery": {
"title": "Files To Read For Schema Discover",
"description": "The number of resent files which will be used to discover the schema for this stream.",
"exclusiveMinimum": 0,
"type": "integer",
}
},
"required": ["name", "format"],
},
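The "exclusiveMinimum": 0 entry in the expected spec above is what Pydantic emits for a gt=0 field constraint. A quick way to confirm the mapping (a sketch assuming Pydantic v1's .schema(), in line with the @validator usage in the config class above):

from typing import Optional

from pydantic import BaseModel, Field


class Sketch(BaseModel):
    recent_n_files_to_read_for_schema_discovery: Optional[int] = Field(default=None, gt=0)


print(Sketch.schema()["properties"]["recent_n_files_to_read_for_schema_discovery"])
# Expected shape under Pydantic v1: {'title': ..., 'exclusiveMinimum': 0, 'type': 'integer'}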
@@ -815,6 +821,144 @@
)
).build()

multi_csv_stream_n_file_exceeds_config_limit_for_inference = (
TestScenarioBuilder[InMemoryFilesSource]()
.set_name("multi_csv_stream_n_file_exceeds_config_limit_for_inference")
.set_config(
{
"streams": [
{
"name": "stream1",
"format": {"filetype": "csv"},
"globs": ["*"],
"validation_policy": "Emit Record",
"recent_n_files_to_read_for_schema_discovery": 3,
}
]
}
)
.set_source_builder(
FileBasedSourceBuilder()
.set_files(
{
"a.csv": {
"contents": [
("col1", "col2"),
("val11a", "val12a"),
("val21a", "val22a"),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
"b.csv": {
"contents": [
("col1", "col2", "col3"),
("val11b", "val12b", "val13b"),
("val21b", "val22b", "val23b"),
],
"last_modified": "2023-06-05T03:54:07.000Z",
},
"c.csv": {
"contents": [
("col1", "col2", "col3", "col4"),
("val11c", "val12c", "val13c", "val14c"),
("val21c", "val22c", "val23c", "val24c"),
],
"last_modified": "2023-06-06T03:54:07.000Z",
},
}
)
.set_file_type("csv")
)
.set_expected_catalog(
{
"streams": [
{
"default_cursor_field": ["_ab_source_file_last_modified"],
"json_schema": {
"type": "object",
"properties": {
"col1": {"type": ["null", "string"]},
"col2": {"type": ["null", "string"]},
"col3": {"type": ["null", "string"]},
"col4": {"type": ["null", "string"]},
"_ab_source_file_last_modified": {"type": "string"},
"_ab_source_file_url": {"type": "string"},
},
},
"name": "stream1",
"source_defined_cursor": True,
"supported_sync_modes": ["full_refresh", "incremental"],
"is_resumable": True,
}
]
}
)
.set_expected_records(
[
{
"data": {
"col1": "val11a",
"col2": "val12a",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "a.csv",
},
"stream": "stream1",
},
{
"data": {
"col1": "val21a",
"col2": "val22a",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "a.csv",
},
"stream": "stream1",
},
{
"data": {
"col1": "val11b",
"col2": "val12b",
"col3": "val13b",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "b.csv",
},
"stream": "stream1",
},
{
"data": {
"col1": "val21b",
"col2": "val22b",
"col3": "val23b",
"_ab_source_file_last_modified": "2023-06-05T03:54:07.000000Z",
"_ab_source_file_url": "b.csv",
},
"stream": "stream1",
},
{
"data": {
"col1": "val11c",
"col2": "val12c",
"col3": "val13c",
"col4": "val14c",
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z",
"_ab_source_file_url": "c.csv",
},
"stream": "stream1",
},
{
"data": {
"col1": "val21c",
"col2": "val22c",
"col3": "val23c",
"col4": "val24c",
"_ab_source_file_last_modified": "2023-06-06T03:54:07.000000Z",
"_ab_source_file_url": "c.csv",
},
"stream": "stream1",
},
]
)
).build()

invalid_csv_scenario: TestScenario[InMemoryFilesSource] = (
TestScenarioBuilder[InMemoryFilesSource]()
.set_name("invalid_csv_scenario") # too many values for the number of headers
@@ -181,6 +181,7 @@ def test_override_max_n_files_for_schema_inference_is_respected(self) -> None:
self._parser.infer_schema.return_value = {"data": {"type": "string"}}
files = [RemoteFile(uri=f"file{i}", last_modified=self._NOW) for i in range(10)]
self._stream_reader.get_matching_files.return_value = files
self._stream.config.recent_n_files_to_read_for_schema_discovery = 3

schema = self._stream.get_json_schema()

@@ -77,6 +77,7 @@
invalid_csv_multi_scenario,
invalid_csv_scenario,
multi_csv_scenario,
multi_csv_stream_n_file_exceeds_config_limit_for_inference,
multi_csv_stream_n_file_exceeds_limit_for_inference,
multi_stream_custom_format,
schemaless_csv_multi_stream_scenario,
@@ -167,6 +168,7 @@
single_csv_scenario,
multi_csv_scenario,
multi_csv_stream_n_file_exceeds_limit_for_inference,
multi_csv_stream_n_file_exceeds_config_limit_for_inference,
single_csv_input_state_is_earlier_scenario,
single_csv_no_input_state_scenario,
single_csv_input_state_is_later_scenario,