From ed331e2f87053e9e9a3159fc6ec3f7d2d41444d7 Mon Sep 17 00:00:00 2001
From: Maximilian Speicher
Date: Fri, 2 Apr 2021 11:24:18 +0200
Subject: [PATCH 1/4] WIP: read pq table chunked

---
 awswrangler/_data_types.py      | 14 +++++++-------
 awswrangler/s3/_read.py         |  4 ++--
 awswrangler/s3/_read_parquet.py | 19 ++++++++++++-------
 3 files changed, 21 insertions(+), 16 deletions(-)

diff --git a/awswrangler/_data_types.py b/awswrangler/_data_types.py
index 0f137f8b1..5cd2cd10a 100644
--- a/awswrangler/_data_types.py
+++ b/awswrangler/_data_types.py
@@ -632,24 +632,24 @@ def _cast2date(value: Any) -> Any:
 
 def _cast_pandas_column(df: pd.DataFrame, col: str, current_type: str, desired_type: str) -> pd.DataFrame:
     if desired_type == "datetime64":
-        df[col] = pd.to_datetime(df[col])
+        df.loc[:, col] = pd.to_datetime(df.loc[:, col])
     elif desired_type == "date":
-        df[col] = df[col].apply(lambda x: _cast2date(value=x)).replace(to_replace={pd.NaT: None})
+        df.loc[:, col] = df.loc[:, col].apply(lambda x: _cast2date(value=x)).replace(to_replace={pd.NaT: None})
     elif desired_type == "bytes":
-        df[col] = df[col].astype("string").str.encode(encoding="utf-8").replace(to_replace={pd.NA: None})
+        df.loc[:, col] = df.loc[:, col].astype("string").str.encode(encoding="utf-8").replace(to_replace={pd.NA: None})
     elif desired_type == "decimal":
         # First cast to string
         df = _cast_pandas_column(df=df, col=col, current_type=current_type, desired_type="string")
         # Then cast to decimal
-        df[col] = df[col].apply(lambda x: Decimal(str(x)) if str(x) not in ("", "none", "None", " ", "<NA>") else None)
+        df.loc[:, col] = df.loc[:, col].apply(lambda x: Decimal(str(x)) if str(x) not in ("", "none", "None", " ", "<NA>") else None)
     else:
         try:
-            df[col] = df[col].astype(desired_type)
+            df.loc[:, col] = df.loc[:, col].astype(desired_type)
         except TypeError as ex:
             if "object cannot be converted to an IntegerDtype" not in str(ex):
                 raise ex
-            df[col] = (
-                df[col]
+            df.loc[:, col] = (
+                df.loc[:, col]
                 .apply(lambda x: int(x) if str(x) not in ("", "none", "None", " ", "<NA>") else None)
                 .astype(desired_type)
             )
diff --git a/awswrangler/s3/_read.py b/awswrangler/s3/_read.py
index 14d7d1ef1..1e92f135e 100644
--- a/awswrangler/s3/_read.py
+++ b/awswrangler/s3/_read.py
@@ -118,7 +118,7 @@ def _union(dfs: List[pd.DataFrame], ignore_index: Optional[bool]) -> pd.DataFram
             break
     cats: Tuple[Set[str], ...] = tuple(set(df.select_dtypes(include="category").columns) for df in dfs)
     for col in set.intersection(*cats):
-        cat = union_categoricals([df[col] for df in dfs])
+        cat = union_categoricals([df.loc[:, col] for df in dfs])
         for df in dfs:
-            df[col] = pd.Categorical(df[col].values, categories=cat.categories)
+            df.loc[:, col] = pd.Categorical(df.loc[:, col].values, categories=cat.categories)
     return pd.concat(objs=dfs, sort=False, copy=False, ignore_index=ignore_index)
diff --git a/awswrangler/s3/_read_parquet.py b/awswrangler/s3/_read_parquet.py
index 75286d3e3..90b1904cc 100644
--- a/awswrangler/s3/_read_parquet.py
+++ b/awswrangler/s3/_read_parquet.py
@@ -8,6 +8,7 @@
 import pprint
 import warnings
 from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union, cast
+import functools
 
 import boto3
 import pandas as pd
@@ -339,8 +340,8 @@ def _read_parquet_chunked(
             if next_slice is not None:
                 df = _union(dfs=[next_slice, df], ignore_index=ignore_index)
             while len(df.index) >= chunked:
-                yield df.iloc[:chunked]
-                df = df.iloc[chunked:]
+                yield df.iloc[:chunked, :]
+                df = df.iloc[chunked:, :]
             if df.empty:
                 next_slice = None
             else:
@@ -773,25 +774,29 @@ def read_parquet_table(
         path: str = res["Table"]["StorageDescriptor"]["Location"]
     except KeyError as ex:
         raise exceptions.InvalidTable(f"Missing s3 location for {database}.{table}.") from ex
-    return _data_types.cast_pandas_with_athena_types(
-        df=read_parquet(
+    df = read_parquet(
             path=path,
             path_suffix=filename_suffix,
             path_ignore_suffix=filename_ignore_suffix,
             partition_filter=partition_filter,
             columns=columns,
             validate_schema=validate_schema,
             categories=categories,
             safe=safe,
             map_types=map_types,
             chunked=chunked,
             dataset=True,
             use_threads=use_threads,
             boto3_session=boto3_session,
             s3_additional_kwargs=s3_additional_kwargs,
-        ),
-        dtype=_extract_partitions_dtypes_from_table_details(response=res),
-    )
+        )
+    partial_cast_function = functools.partial(_data_types.cast_pandas_with_athena_types,
+                                              dtype=_extract_partitions_dtypes_from_table_details(response=res))
+
+    if chunked is not False:
+        return map(partial_cast_function, df)
+    else:
+        return partial_cast_function(df)
 
 
 @apply_configs
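
Note: the heart of this first patch is the new return path in read_parquet_table: instead of casting one fully materialized DataFrame, it builds a functools.partial of the cast function and maps it over whatever read_parquet returned, so chunked results are cast lazily, chunk by chunk. A minimal self-contained sketch of that pattern follows; the helper names _cast and _chunks are illustrative stand-ins, not code from this patch:

    import functools
    from typing import Iterator

    import pandas as pd

    def _cast(df: pd.DataFrame, dtype: dict) -> pd.DataFrame:
        # Stand-in for _data_types.cast_pandas_with_athena_types.
        return df.astype(dtype)

    def _chunks() -> Iterator[pd.DataFrame]:
        # Stand-in for the generator returned by read_parquet(..., chunked=True).
        yield pd.DataFrame({"a": ["1", "2"]})
        yield pd.DataFrame({"a": ["3"]})

    partial_cast = functools.partial(_cast, dtype={"a": "int64"})
    # map() is lazy: each chunk is cast only when the caller consumes it,
    # so the chunks are never concatenated into one large frame.
    for chunk in map(partial_cast, _chunks()):
        print(chunk.dtypes)
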
From 7fcade6e237918bd88e76f7e01eff35ae39b2d9b Mon Sep 17 00:00:00 2001
From: Maximilian Speicher
Date: Sat, 3 Apr 2021 11:39:04 +0200
Subject: [PATCH 2/4] Fix pandas SettingWithCopyWarning

---
 awswrangler/_data_types.py      | 14 +++++++-------
 awswrangler/s3/_read.py         |  4 ++--
 awswrangler/s3/_read_parquet.py |  2 +-
 3 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/awswrangler/_data_types.py b/awswrangler/_data_types.py
index 5cd2cd10a..0f137f8b1 100644
--- a/awswrangler/_data_types.py
+++ b/awswrangler/_data_types.py
@@ -632,24 +632,24 @@ def _cast2date(value: Any) -> Any:
 
 def _cast_pandas_column(df: pd.DataFrame, col: str, current_type: str, desired_type: str) -> pd.DataFrame:
     if desired_type == "datetime64":
-        df.loc[:, col] = pd.to_datetime(df.loc[:, col])
+        df[col] = pd.to_datetime(df[col])
     elif desired_type == "date":
-        df.loc[:, col] = df.loc[:, col].apply(lambda x: _cast2date(value=x)).replace(to_replace={pd.NaT: None})
+        df[col] = df[col].apply(lambda x: _cast2date(value=x)).replace(to_replace={pd.NaT: None})
     elif desired_type == "bytes":
-        df.loc[:, col] = df.loc[:, col].astype("string").str.encode(encoding="utf-8").replace(to_replace={pd.NA: None})
+        df[col] = df[col].astype("string").str.encode(encoding="utf-8").replace(to_replace={pd.NA: None})
     elif desired_type == "decimal":
         # First cast to string
         df = _cast_pandas_column(df=df, col=col, current_type=current_type, desired_type="string")
         # Then cast to decimal
-        df.loc[:, col] = df.loc[:, col].apply(lambda x: Decimal(str(x)) if str(x) not in ("", "none", "None", " ", "<NA>") else None)
+        df[col] = df[col].apply(lambda x: Decimal(str(x)) if str(x) not in ("", "none", "None", " ", "<NA>") else None)
     else:
         try:
-            df.loc[:, col] = df.loc[:, col].astype(desired_type)
+            df[col] = df[col].astype(desired_type)
         except TypeError as ex:
             if "object cannot be converted to an IntegerDtype" not in str(ex):
                 raise ex
-            df.loc[:, col] = (
-                df.loc[:, col]
+            df[col] = (
+                df[col]
                 .apply(lambda x: int(x) if str(x) not in ("", "none", "None", " ", "<NA>") else None)
                 .astype(desired_type)
             )
diff --git a/awswrangler/s3/_read.py b/awswrangler/s3/_read.py
index 1e92f135e..14d7d1ef1 100644
--- a/awswrangler/s3/_read.py
+++ b/awswrangler/s3/_read.py
@@ -118,7 +118,7 @@ def _union(dfs: List[pd.DataFrame], ignore_index: Optional[bool]) -> pd.DataFram
             break
     cats: Tuple[Set[str], ...] = tuple(set(df.select_dtypes(include="category").columns) for df in dfs)
     for col in set.intersection(*cats):
-        cat = union_categoricals([df.loc[:, col] for df in dfs])
+        cat = union_categoricals([df[col] for df in dfs])
         for df in dfs:
-            df.loc[:, col] = pd.Categorical(df.loc[:, col].values, categories=cat.categories)
+            df[col] = pd.Categorical(df[col].values, categories=cat.categories)
     return pd.concat(objs=dfs, sort=False, copy=False, ignore_index=ignore_index)
diff --git a/awswrangler/s3/_read_parquet.py b/awswrangler/s3/_read_parquet.py
index 90b1904cc..ccb44ebf7 100644
--- a/awswrangler/s3/_read_parquet.py
+++ b/awswrangler/s3/_read_parquet.py
@@ -340,7 +340,7 @@ def _read_parquet_chunked(
             if next_slice is not None:
                 df = _union(dfs=[next_slice, df], ignore_index=ignore_index)
             while len(df.index) >= chunked:
-                yield df.iloc[:chunked, :]
+                yield df.iloc[:chunked, :].copy()
                 df = df.iloc[chunked:, :]
             if df.empty:
                 next_slice = None
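
Note: this patch reverts the df.loc[:, col] experiment from PATCH 1/4 and instead fixes the warning at its source: _read_parquet_chunked yielded df.iloc[:chunked, :], a slice that pandas may flag as a view of the larger frame, so a caller mutating the chunk triggered SettingWithCopyWarning. Yielding .copy() hands out an independent frame. A small repro sketch of the behavior being fixed (assumed typical pandas 1.x semantics, not code from the patch):

    import pandas as pd

    big = pd.DataFrame({"a": range(6), "b": range(6)})

    view_chunk = big.iloc[:3, :]         # slice flagged as a potential view of `big`
    copy_chunk = big.iloc[:3, :].copy()  # independent chunk, as yielded after this fix

    view_chunk["a"] = 0  # may emit SettingWithCopyWarning (chunk is tied to `big`)
    copy_chunk["a"] = 0  # silent: the copy owns its data
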
From 7b7aef9ef568e21093963bbdfbc1d49178234d50 Mon Sep 17 00:00:00 2001
From: Maximilian Speicher
Date: Sat, 3 Apr 2021 11:43:26 +0200
Subject: [PATCH 3/4] Update checking for iterator

---
 awswrangler/s3/_read_parquet.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/awswrangler/s3/_read_parquet.py b/awswrangler/s3/_read_parquet.py
index ccb44ebf7..dda6d1d4e 100644
--- a/awswrangler/s3/_read_parquet.py
+++ b/awswrangler/s3/_read_parquet.py
@@ -793,10 +793,10 @@ def read_parquet_table(
     partial_cast_function = functools.partial(_data_types.cast_pandas_with_athena_types,
                                               dtype=_extract_partitions_dtypes_from_table_details(response=res))
 
-    if chunked is not False:
-        return map(partial_cast_function, df)
-    else:
+    if isinstance(df, pd.DataFrame):
         return partial_cast_function(df)
+    else:
+        return map(partial_cast_function, df)
 
 
 @apply_configs
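
Note: checking isinstance(df, pd.DataFrame) keys the dispatch off what read_parquet actually returned (a single frame or a generator of frames) rather than re-deriving it from the chunked argument, which may be True or an integer chunk size. The shape of that dispatch, as a standalone sketch (names assumed for illustration):

    from typing import Callable, Iterator, Union

    import pandas as pd

    FrameOrChunks = Union[pd.DataFrame, Iterator[pd.DataFrame]]

    def apply_cast(result: FrameOrChunks, cast: Callable[[pd.DataFrame], pd.DataFrame]) -> FrameOrChunks:
        if isinstance(result, pd.DataFrame):
            return cast(result)   # eager: one frame in, one frame out
        return map(cast, result)  # lazy: defer casting until chunks are consumed
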
From 5871b83a802ce84b9a1a75567f635f062d188fb2 Mon Sep 17 00:00:00 2001
From: Maximilian Speicher
Date: Sat, 3 Apr 2021 11:47:15 +0200
Subject: [PATCH 4/4] Linting

---
 awswrangler/s3/_read_parquet.py | 42 +++++++++++++++++----------------
 1 file changed, 22 insertions(+), 20 deletions(-)

diff --git a/awswrangler/s3/_read_parquet.py b/awswrangler/s3/_read_parquet.py
index dda6d1d4e..103a54e13 100644
--- a/awswrangler/s3/_read_parquet.py
+++ b/awswrangler/s3/_read_parquet.py
@@ -2,13 +2,13 @@
 
 import concurrent.futures
 import datetime
+import functools
 import itertools
 import json
 import logging
 import pprint
 import warnings
 from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Union, cast
-import functools
 
 import boto3
 import pandas as pd
@@ -775,28 +775,30 @@ def read_parquet_table(
     except KeyError as ex:
         raise exceptions.InvalidTable(f"Missing s3 location for {database}.{table}.") from ex
     df = read_parquet(
-            path=path,
-            path_suffix=filename_suffix,
-            path_ignore_suffix=filename_ignore_suffix,
-            partition_filter=partition_filter,
-            columns=columns,
-            validate_schema=validate_schema,
-            categories=categories,
-            safe=safe,
-            map_types=map_types,
-            chunked=chunked,
-            dataset=True,
-            use_threads=use_threads,
-            boto3_session=boto3_session,
-            s3_additional_kwargs=s3_additional_kwargs,
-        )
+        path=path,
+        path_suffix=filename_suffix,
+        path_ignore_suffix=filename_ignore_suffix,
+        partition_filter=partition_filter,
+        columns=columns,
+        validate_schema=validate_schema,
+        categories=categories,
+        safe=safe,
+        map_types=map_types,
+        chunked=chunked,
+        dataset=True,
+        use_threads=use_threads,
+        boto3_session=boto3_session,
+        s3_additional_kwargs=s3_additional_kwargs,
+    )
-    partial_cast_function = functools.partial(_data_types.cast_pandas_with_athena_types,
-                                              dtype=_extract_partitions_dtypes_from_table_details(response=res))
+    partial_cast_function = functools.partial(
+        _data_types.cast_pandas_with_athena_types, dtype=_extract_partitions_dtypes_from_table_details(response=res)
+    )
 
     if isinstance(df, pd.DataFrame):
         return partial_cast_function(df)
-    else:
-        return map(partial_cast_function, df)
+
+    # df is a generator, so map is needed for casting dtypes
+    return map(partial_cast_function, df)
 
 
 @apply_configs
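
Note: with the lint pass applied, the final contract of read_parquet_table is settled: a plain call returns one fully cast DataFrame, while a chunked call returns a lazy iterable whose chunks are cast as they are consumed. A hedged usage sketch, assuming the function is exposed as awswrangler.s3.read_parquet_table and using placeholder Glue database/table names:

    import awswrangler as wr

    # Single-frame read: one DataFrame, partition dtypes already cast.
    df = wr.s3.read_parquet_table(database="my_db", table="my_table")

    # Chunked read: iterate lazily; each chunk arrives already cast.
    for chunk in wr.s3.read_parquet_table(database="my_db", table="my_table", chunked=100_000):
        print(len(chunk))
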