From 13a2eae48b9332936684a82700192a0aad10231e Mon Sep 17 00:00:00 2001
From: Tyler Jewell
Date: Sun, 12 Apr 2020 20:42:48 -0500
Subject: [PATCH 1/3] added validate_schema parameter to read_parquet

---
 awswrangler/s3.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/awswrangler/s3.py b/awswrangler/s3.py
index 308f5e878..16a43ae57 100644
--- a/awswrangler/s3.py
+++ b/awswrangler/s3.py
@@ -1200,6 +1200,7 @@ def _read_parquet_init(
     use_threads: bool = True,
     boto3_session: Optional[boto3.Session] = None,
     s3_additional_kwargs: Optional[Dict[str, str]] = None,
+    validate_schema: bool = True,
 ) -> pyarrow.parquet.ParquetDataset:
     """Encapsulate all initialization before the use of the pyarrow.parquet.ParquetDataset."""
     if dataset is False:
@@ -1212,7 +1213,7 @@ def _read_parquet_init(
     fs: s3fs.S3FileSystem = _utils.get_fs(session=boto3_session, s3_additional_kwargs=s3_additional_kwargs)
     cpus: int = _utils.ensure_cpu_count(use_threads=use_threads)
     data: pyarrow.parquet.ParquetDataset = pyarrow.parquet.ParquetDataset(
-        path_or_paths=path_or_paths, filesystem=fs, metadata_nthreads=cpus, filters=filters, read_dictionary=categories
+        path_or_paths=path_or_paths, filesystem=fs, metadata_nthreads=cpus, filters=filters, read_dictionary=categories, validate_schema=validate_schema
     )
     return data
 
@@ -1227,6 +1228,7 @@ def read_parquet(
     use_threads: bool = True,
     boto3_session: Optional[boto3.Session] = None,
     s3_additional_kwargs: Optional[Dict[str, str]] = None,
+    validate_schema: bool = True
 ) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
     """Read Apache Parquet file(s) from from a received S3 prefix or list of S3 objects paths.
 
@@ -1261,6 +1263,10 @@ def read_parquet(
     s3_additional_kwargs:
         Forward to s3fs, useful for server side encryption
         https://s3fs.readthedocs.io/en/latest/#serverside-encryption
+    validate_schema:
+        Check that individual file schemas are all the same / compatible. Schemas within a
+        folder prefix should all be the same. Disable if you have schemas that differ
+        and want to skip this check.
 
     Returns
     -------
@@ -1306,6 +1312,7 @@ def read_parquet(
         use_threads=use_threads,
         boto3_session=boto3_session,
         s3_additional_kwargs=s3_additional_kwargs,
+        validate_schema=validate_schema
     )
     if chunked is False:
         return _read_parquet(data=data, columns=columns, categories=categories, use_threads=use_threads)

From 4b586ed3fc1aae3e9ef55d5c767c5e91137ee75c Mon Sep 17 00:00:00 2001
From: Tyler Jewell
Date: Sun, 12 Apr 2020 21:13:21 -0500
Subject: [PATCH 2/3] fix lint issue

---
 awswrangler/s3.py | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/awswrangler/s3.py b/awswrangler/s3.py
index 16a43ae57..c24ba27ee 100644
--- a/awswrangler/s3.py
+++ b/awswrangler/s3.py
@@ -1213,7 +1213,12 @@ def _read_parquet_init(
     fs: s3fs.S3FileSystem = _utils.get_fs(session=boto3_session, s3_additional_kwargs=s3_additional_kwargs)
     cpus: int = _utils.ensure_cpu_count(use_threads=use_threads)
     data: pyarrow.parquet.ParquetDataset = pyarrow.parquet.ParquetDataset(
-        path_or_paths=path_or_paths, filesystem=fs, metadata_nthreads=cpus, filters=filters, read_dictionary=categories, validate_schema=validate_schema
+        path_or_paths=path_or_paths,
+        filesystem=fs,
+        metadata_nthreads=cpus,
+        filters=filters,
+        read_dictionary=categories,
+        validate_schema=validate_schema,
     )
     return data
 
@@ -1228,7 +1233,7 @@ def read_parquet(
     use_threads: bool = True,
     boto3_session: Optional[boto3.Session] = None,
     s3_additional_kwargs: Optional[Dict[str, str]] = None,
-    validate_schema: bool = True
+    validate_schema: bool = True,
 ) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
     """Read Apache Parquet file(s) from from a received S3 prefix or list of S3 objects paths.
 
@@ -1312,7 +1317,7 @@ def read_parquet(
         use_threads=use_threads,
         boto3_session=boto3_session,
         s3_additional_kwargs=s3_additional_kwargs,
-        validate_schema=validate_schema
+        validate_schema=validate_schema,
     )
     if chunked is False:
         return _read_parquet(data=data, columns=columns, categories=categories, use_threads=use_threads)

From b23448bf0fd1cf0df1b6960008b08b2ff9bd8d0c Mon Sep 17 00:00:00 2001
From: igorborgest
Date: Sun, 12 Apr 2020 23:38:27 -0300
Subject: [PATCH 3/3] Add tests and promote feature for validate_schema on
 s3.read_parquet #167

---
 awswrangler/s3.py                          | 22 ++++++++++--------
 testing/test_awswrangler/test_data_lake.py | 27 ++++++++++++++++++++++
 2 files changed, 40 insertions(+), 9 deletions(-)

diff --git a/awswrangler/s3.py b/awswrangler/s3.py
index c24ba27ee..baec3683b 100644
--- a/awswrangler/s3.py
+++ b/awswrangler/s3.py
@@ -1196,11 +1196,11 @@ def _read_parquet_init(
     path: Union[str, List[str]],
     filters: Optional[Union[List[Tuple], List[List[Tuple]]]] = None,
     categories: List[str] = None,
+    validate_schema: bool = True,
     dataset: bool = False,
     use_threads: bool = True,
     boto3_session: Optional[boto3.Session] = None,
     s3_additional_kwargs: Optional[Dict[str, str]] = None,
-    validate_schema: bool = True,
 ) -> pyarrow.parquet.ParquetDataset:
     """Encapsulate all initialization before the use of the pyarrow.parquet.ParquetDataset."""
     if dataset is False:
@@ -1227,13 +1227,13 @@ def read_parquet(
     path: Union[str, List[str]],
     filters: Optional[Union[List[Tuple], List[List[Tuple]]]] = None,
     columns: Optional[List[str]] = None,
+    validate_schema: bool = True,
     chunked: bool = False,
     dataset: bool = False,
     categories: List[str] = None,
     use_threads: bool = True,
     boto3_session: Optional[boto3.Session] = None,
     s3_additional_kwargs: Optional[Dict[str, str]] = None,
-    validate_schema: bool = True,
 ) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
     """Read Apache Parquet file(s) from from a received S3 prefix or list of S3 objects paths.
 
@@ -1251,7 +1251,11 @@ def read_parquet(
     filters: Union[List[Tuple], List[List[Tuple]]], optional
         List of filters to apply, like ``[[('x', '=', 0), ...], ...]``.
     columns : List[str], optional
-        Names of columns to read from the file(s)
+        Names of columns to read from the file(s).
+    validate_schema:
+        Check that individual file schemas are all the same / compatible. Schemas within a
+        folder prefix should all be the same. Disable if you have schemas that differ
+        and want to skip this check.
     chunked : bool
         If True will break the data in smaller DataFrames (Non deterministic number of lines).
         Otherwise return a single DataFrame with the whole data.
@@ -1268,10 +1272,6 @@ def read_parquet(
     s3_additional_kwargs:
         Forward to s3fs, useful for server side encryption
         https://s3fs.readthedocs.io/en/latest/#serverside-encryption
-    validate_schema:
-        Check that individual file schemas are all the same / compatible. Schemas within a
-        folder prefix should all be the same. Disable if you have schemas that differ
-        and want to skip this check.
 
     Returns
     -------
@@ -1320,7 +1320,9 @@ def read_parquet(
         validate_schema=validate_schema,
     )
     if chunked is False:
-        return _read_parquet(data=data, columns=columns, categories=categories, use_threads=use_threads)
+        return _read_parquet(
+            data=data, columns=columns, categories=categories, use_threads=use_threads, validate_schema=validate_schema
+        )
     return _read_parquet_chunked(data=data, columns=columns, categories=categories, use_threads=use_threads)
 
 
@@ -1329,6 +1331,7 @@ def _read_parquet(
     columns: Optional[List[str]] = None,
     categories: List[str] = None,
     use_threads: bool = True,
+    validate_schema: bool = True,
 ) -> pd.DataFrame:
     tables: List[pa.Table] = []
     for piece in data.pieces:
@@ -1336,7 +1339,8 @@ def _read_parquet(
             columns=columns, use_threads=use_threads, partitions=data.partitions, use_pandas_metadata=False
         )
         tables.append(table)
-    table = pa.lib.concat_tables(tables)
+    promote: bool = not validate_schema
+    table = pa.lib.concat_tables(tables, promote=promote)
     return table.to_pandas(
         use_threads=use_threads,
         split_blocks=True,
diff --git a/testing/test_awswrangler/test_data_lake.py b/testing/test_awswrangler/test_data_lake.py
index 7bcbd24ec..6f21647ae 100644
--- a/testing/test_awswrangler/test_data_lake.py
+++ b/testing/test_awswrangler/test_data_lake.py
@@ -664,3 +664,30 @@ def test_category(bucket, database):
     ensure_data_types_category(df2)
     wr.s3.delete_objects(path=paths)
     assert wr.catalog.delete_table_if_exists(database=database, table="test_category") is True
+
+
+def test_parquet_validate_schema(bucket, database):
+    path = f"s3://{bucket}/test_parquet_file_validate/"
+    wr.s3.delete_objects(path=path)
+
+    df = pd.DataFrame({"id": [1, 2, 3]})
+    path_file = f"s3://{bucket}/test_parquet_file_validate/0.parquet"
+    wr.s3.to_parquet(df=df, path=path_file)
+    wr.s3.wait_objects_exist(paths=[path_file])
+
+    df2 = pd.DataFrame({"id2": [1, 2, 3], "val": ["foo", "boo", "bar"]})
+    path_file2 = f"s3://{bucket}/test_parquet_file_validate/1.parquet"
+    wr.s3.to_parquet(df=df2, path=path_file2)
+    wr.s3.wait_objects_exist(paths=[path_file2])
+
+    df3 = wr.s3.read_parquet(path=path, validate_schema=False)
+    assert len(df3.index) == 6
+    assert len(df3.columns) == 3
+
+    with pytest.raises(ValueError):
+        wr.s3.read_parquet(path=path, validate_schema=True)
+
+    with pytest.raises(ValueError):
+        wr.s3.store_parquet_metadata(path=path, database=database, table="test_parquet_validate_schema", dataset=True)
+
+    wr.s3.delete_objects(path=path)
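
Illustrative usage of the new parameter once the three patches above are applied (a minimal sketch, not part of the patch; the bucket name and key prefix are placeholders):

    import pandas as pd

    import awswrangler as wr

    # Placeholder prefix: replace "my-bucket" with a real bucket.
    path = "s3://my-bucket/mixed_schemas/"

    # Two Parquet files with different schemas under the same prefix.
    wr.s3.to_parquet(df=pd.DataFrame({"id": [1, 2, 3]}), path=path + "0.parquet")
    wr.s3.to_parquet(df=pd.DataFrame({"id2": [4, 5], "val": ["foo", "boo"]}), path=path + "1.parquet")

    # Default behaviour (validate_schema=True) raises ValueError because the schemas differ.
    # With validation disabled, the tables are concatenated with promote=True, so columns
    # missing from one file are filled with nulls.
    df = wr.s3.read_parquet(path=path, validate_schema=False)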