diff --git a/awswrangler/s3/_write.py b/awswrangler/s3/_write.py
index e94a71288..a1915bdb1 100644
--- a/awswrangler/s3/_write.py
+++ b/awswrangler/s3/_write.py
@@ -47,7 +47,7 @@ def _validate_args(
     table: Optional[str],
     database: Optional[str],
     dataset: bool,
-    path: str,
+    path: Optional[str],
     partition_cols: Optional[List[str]],
     bucketing_info: Optional[Tuple[List[str], int]],
     mode: Optional[str],
@@ -58,6 +58,8 @@ def _validate_args(
     if df.empty is True:
         raise exceptions.EmptyDataFrame()
     if dataset is False:
+        if path is None:
+            raise exceptions.InvalidArgumentValue("If dataset is False, the `path` argument must be passed.")
         if path.endswith("/"):
             raise exceptions.InvalidArgumentValue(
                 "If <dataset=False>, the argument <path> should be a file path, not a directory."
@@ -79,6 +81,10 @@ def _validate_args(
             "Arguments database and table must be passed together. If you want to store your dataset metadata in "
             "the Glue Catalog, please ensure you are passing both."
         )
+    elif all(x is None for x in [path, database, table]):
+        raise exceptions.InvalidArgumentCombination(
+            "You must specify a `path` if dataset is True and database/table are not enabled."
+        )
     elif bucketing_info and bucketing_info[1] <= 0:
         raise exceptions.InvalidArgumentValue(
             "Please pass a value greater than 1 for the number of buckets for bucketing."
diff --git a/awswrangler/s3/_write_parquet.py b/awswrangler/s3/_write_parquet.py
index 29bf9b694..dd6f1778b 100644
--- a/awswrangler/s3/_write_parquet.py
+++ b/awswrangler/s3/_write_parquet.py
@@ -198,7 +198,7 @@ def _to_parquet(
 @apply_configs
 def to_parquet(  # pylint: disable=too-many-arguments,too-many-locals
     df: pd.DataFrame,
-    path: str,
+    path: Optional[str] = None,
     index: bool = False,
     compression: Optional[str] = "snappy",
     max_rows_by_file: Optional[int] = None,
@@ -252,8 +252,9 @@ def to_parquet(  # pylint: disable=too-many-arguments,too-many-locals
     ----------
     df: pandas.DataFrame
         Pandas DataFrame https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
-    path : str
+    path : str, optional
         S3 path (for file e.g. ``s3://bucket/prefix/filename.parquet``) (for dataset e.g. ``s3://bucket/prefix``).
+        Required if dataset=False or when dataset=True and creating a new dataset.
     index : bool
         True to store the DataFrame index in file, otherwise False to ignore it.
     compression: str, optional
@@ -511,6 +512,19 @@ def to_parquet(  # pylint: disable=too-many-arguments,too-many-locals
         catalog_table_input = catalog._get_table_input(  # pylint: disable=protected-access
             database=database, table=table, boto3_session=session, catalog_id=catalog_id
         )
+        catalog_path = catalog_table_input["StorageDescriptor"]["Location"] if catalog_table_input else None
+        if path is None:
+            if catalog_path:
+                path = catalog_path
+            else:
+                raise exceptions.InvalidArgumentValue(
+                    "Glue table does not exist in the catalog. Please pass the `path` argument to create it."
+                )
+        elif path and catalog_path:
+            if path.rstrip("/") != catalog_path.rstrip("/"):
+                raise exceptions.InvalidArgumentValue(
+                    f"The specified path: {path}, does not match the existing Glue catalog table path: {catalog_path}"
+                )
     df = _apply_dtype(df=df, dtype=dtype, catalog_table_input=catalog_table_input, mode=mode)
     schema: pa.Schema = _data_types.pyarrow_schema_from_pandas(
         df=df, index=index, ignore_cols=partition_cols, dtype=dtype
@@ -545,7 +559,7 @@ def to_parquet(  # pylint: disable=too-many-arguments,too-many-locals
             func=_to_parquet,
             concurrent_partitioning=concurrent_partitioning,
             df=df,
-            path_root=path,
+            path_root=path,  # type: ignore
             index=index,
             compression=compression,
             compression_ext=compression_ext,
@@ -565,7 +579,7 @@ def to_parquet(  # pylint: disable=too-many-arguments,too-many-locals
             catalog._create_parquet_table(  # pylint: disable=protected-access
                 database=database,
                 table=table,
-                path=path,
+                path=path,  # type: ignore
                 columns_types=columns_types,
                 partitions_types=partitions_types,
                 bucketing_info=bucketing_info,
diff --git a/awswrangler/s3/_write_text.py b/awswrangler/s3/_write_text.py
index 809d2bc92..cfb6813d7 100644
--- a/awswrangler/s3/_write_text.py
+++ b/awswrangler/s3/_write_text.py
@@ -72,9 +72,9 @@ def _to_text(
 
 
 @apply_configs
-def to_csv(  # pylint: disable=too-many-arguments,too-many-locals,too-many-statements
+def to_csv(  # pylint: disable=too-many-arguments,too-many-locals,too-many-statements,too-many-branches
     df: pd.DataFrame,
-    path: str,
+    path: Optional[str] = None,
     sep: str = ",",
     index: bool = True,
     columns: Optional[List[str]] = None,
@@ -137,8 +137,9 @@ def to_csv(  # pylint: disable=too-many-arguments,too-many-locals,too-many-state
     ----------
     df: pandas.DataFrame
         Pandas DataFrame https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html
-    path : str
-        Amazon S3 path (e.g. s3://bucket/filename.csv).
+    path : str, optional
+        Amazon S3 path (e.g. s3://bucket/prefix/filename.csv) (for dataset e.g. ``s3://bucket/prefix``).
+        Required if dataset=False or when creating a new dataset.
     sep : str
         String of length 1. Field delimiter for the output file.
     index : bool
@@ -414,6 +415,19 @@ def to_csv(  # pylint: disable=too-many-arguments,too-many-locals,too-many-state
         catalog_table_input = catalog._get_table_input(  # pylint: disable=protected-access
             database=database, table=table, boto3_session=session, catalog_id=catalog_id
         )
+        catalog_path = catalog_table_input["StorageDescriptor"]["Location"] if catalog_table_input else None
+        if path is None:
+            if catalog_path:
+                path = catalog_path
+            else:
+                raise exceptions.InvalidArgumentValue(
+                    "Glue table does not exist in the catalog. Please pass the `path` argument to create it."
+                )
+        elif path and catalog_path:
+            if path.rstrip("/") != catalog_path.rstrip("/"):
+                raise exceptions.InvalidArgumentValue(
+                    f"The specified path: {path}, does not match the existing Glue catalog table path: {catalog_path}"
+                )
         if pandas_kwargs.get("compression") not in ("gzip", "bz2", None):
             raise exceptions.InvalidArgumentCombination(
                 "If database and table are given, you must use one of these compressions: gzip, bz2 or None."
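
Note: the catalog-path resolution block above appears verbatim in both to_parquet and to_csv. As a reading aid only (not part of this diff, and using a hypothetical helper name), the rule it implements distills to:

    from typing import Optional

    def _resolve_write_path(path: Optional[str], catalog_path: Optional[str]) -> str:
        # No user-supplied path: fall back to the location registered in the Glue catalog.
        if path is None:
            if catalog_path is None:
                # The diff raises exceptions.InvalidArgumentValue here.
                raise ValueError("Glue table does not exist in the catalog. Please pass the `path` argument to create it.")
            return catalog_path
        # A user-supplied path must agree with the catalog location (ignoring a trailing slash).
        if catalog_path is not None and path.rstrip("/") != catalog_path.rstrip("/"):
            raise ValueError(f"The specified path: {path}, does not match the existing Glue catalog table path: {catalog_path}")
        return path
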
@@ -421,6 +435,7 @@ def to_csv(  # pylint: disable=too-many-arguments,too-many-locals,too-many-state
 
     df = _apply_dtype(df=df, dtype=dtype, catalog_table_input=catalog_table_input, mode=mode)
 
+    paths: List[str] = []
     if dataset is False:
         pandas_kwargs["sep"] = sep
         pandas_kwargs["index"] = index
@@ -434,7 +449,7 @@ def to_csv(  # pylint: disable=too-many-arguments,too-many-locals,too-many-state
             s3_additional_kwargs=s3_additional_kwargs,
             **pandas_kwargs,
         )
-        paths = [path]
+        paths = [path]  # type: ignore
     else:
         if database and table:
             quoting: Optional[int] = csv.QUOTE_NONE
@@ -461,7 +476,7 @@ def to_csv(  # pylint: disable=too-many-arguments,too-many-locals,too-many-state
             func=_to_text,
             concurrent_partitioning=concurrent_partitioning,
             df=df,
-            path_root=path,
+            path_root=path,  # type: ignore
             index=index,
             sep=sep,
             compression=compression,
@@ -486,7 +501,7 @@ def to_csv(  # pylint: disable=too-many-arguments,too-many-locals,too-many-state
             catalog._create_csv_table(  # pylint: disable=protected-access
                 database=database,
                 table=table,
-                path=path,
+                path=path,  # type: ignore
                 columns_types=columns_types,
                 partitions_types=partitions_types,
                 bucketing_info=bucketing_info,
diff --git a/tests/test__routines.py b/tests/test__routines.py
index fb08e8d12..f2a61ba0e 100644
--- a/tests/test__routines.py
+++ b/tests/test__routines.py
@@ -44,7 +44,6 @@ def test_routine_0(glue_database, glue_table, path, use_threads, concurrent_part
     df = pd.DataFrame({"c1": [None, 1, None]}, dtype="Int16")
     wr.s3.to_parquet(
         df=df,
-        path=path,
         dataset=True,
         mode="overwrite",
         database=glue_database,
@@ -101,7 +100,6 @@ def test_routine_0(glue_database, glue_table, path, use_threads, concurrent_part
     df = pd.DataFrame({"c2": ["a", None, "b"], "c1": [None, None, None]})
     wr.s3.to_parquet(
         df=df,
-        path=path,
         dataset=True,
         mode="append",
         database=glue_database,
@@ -162,7 +160,6 @@ def test_routine_0(glue_database, glue_table, path, use_threads, concurrent_part
     df = pd.DataFrame({"c0": ["foo", None], "c1": [0, 1]})
     wr.s3.to_parquet(
         df=df,
-        path=path,
         dataset=True,
         mode="overwrite",
         database=glue_database,
@@ -223,7 +220,6 @@ def test_routine_0(glue_database, glue_table, path, use_threads, concurrent_part
     df = pd.DataFrame({"c0": [1, 2], "c1": ["1", "3"], "c2": [True, False]})
     wr.s3.to_parquet(
         df=df,
-        path=path,
         dataset=True,
         mode="overwrite_partitions",
         database=glue_database,
diff --git a/tests/test_athena_csv.py b/tests/test_athena_csv.py
index b5f008ce6..f62613421 100644
--- a/tests/test_athena_csv.py
+++ b/tests/test_athena_csv.py
@@ -49,7 +49,6 @@ def test_to_csv_modes(glue_database, glue_table, path, use_threads, concurrent_p
     df = pd.DataFrame({"c1": [0, 1, 2]}, dtype="Int16")
     wr.s3.to_csv(
         df=df,
-        path=path,
         dataset=True,
         mode="overwrite",
         database=glue_database,
@@ -106,7 +105,6 @@ def test_to_csv_modes(glue_database, glue_table, path, use_threads, concurrent_p
     df = pd.DataFrame({"c0": ["foo", "boo"], "c1": [0, 1]})
     wr.s3.to_csv(
         df=df,
-        path=path,
         dataset=True,
         mode="overwrite",
         database=glue_database,
diff --git a/tests/test_s3.py b/tests/test_s3.py
index 9c2c9c026..4db1acf2a 100644
--- a/tests/test_s3.py
+++ b/tests/test_s3.py
@@ -103,6 +103,23 @@ def mock_make_api_call(self, operation_name, kwarg):
         wr.s3.delete_objects(path=[path])
 
 
+def test_missing_or_wrong_path(path, glue_database, glue_table):
+    # Missing path
+    df = pd.DataFrame({"FooBoo": [1, 2, 3]})
+    with pytest.raises(wr.exceptions.InvalidArgumentValue):
+        wr.s3.to_parquet(df=df)
+    with pytest.raises(wr.exceptions.InvalidArgumentCombination):
+        wr.s3.to_parquet(df=df, dataset=True)
+    with pytest.raises(wr.exceptions.InvalidArgumentValue):
+        wr.s3.to_parquet(df=df, dataset=True, database=glue_database, table=glue_table)
+
+    # Wrong path
+    wr.s3.to_parquet(df=df, path=path, dataset=True, database=glue_database, table=glue_table)
+    wrong_path = "s3://bucket/prefix"
+    with pytest.raises(wr.exceptions.InvalidArgumentValue):
+        wr.s3.to_parquet(df=df, path=wrong_path, dataset=True, database=glue_database, table=glue_table)
+
+
 def test_s3_empty_dfs():
     df = pd.DataFrame()
     with pytest.raises(wr.exceptions.EmptyDataFrame):
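
For reference, the behavior exercised by test_missing_or_wrong_path looks like this from user code (a sketch; the bucket, database, and table names below are placeholders):

    import pandas as pd
    import awswrangler as wr

    df = pd.DataFrame({"c0": [1, 2, 3]})

    # Creating a new dataset: the Glue table does not exist yet, so `path` is required.
    wr.s3.to_parquet(
        df=df, path="s3://my-bucket/prefix", dataset=True,
        database="my_db", table="my_table", mode="overwrite",
    )

    # Existing table: `path` may now be omitted and is resolved from the Glue catalog.
    wr.s3.to_parquet(df=df, dataset=True, database="my_db", table="my_table", mode="append")

    # A `path` that disagrees with the catalog location raises InvalidArgumentValue.
    wr.s3.to_parquet(
        df=df, path="s3://other-bucket/prefix", dataset=True,
        database="my_db", table="my_table",
    )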