diff --git a/awswrangler/redshift/_read.py b/awswrangler/redshift/_read.py
index 0906dc4de..288245213 100644
--- a/awswrangler/redshift/_read.py
+++ b/awswrangler/redshift/_read.py
@@ -224,6 +224,7 @@ def unload_to_files(
     aws_session_token: Optional[str] = None,
     region: Optional[str] = None,
     unload_format: Optional[Literal["CSV", "PARQUET"]] = None,
+    parallel: bool = True,
     max_file_size: Optional[float] = None,
     kms_key_id: Optional[str] = None,
     manifest: bool = False,
@@ -265,6 +266,11 @@ def unload_to_files(
     unload_format: str, optional
         Format of the unloaded S3 objects from the query.
         Valid values: "CSV", "PARQUET". Case sensitive. Defaults to PARQUET.
+    parallel: bool
+        Whether to unload to multiple files in parallel. Defaults to True.
+        By default, UNLOAD writes data in parallel to multiple files, according to the number of
+        slices in the cluster. If parallel is False, UNLOAD writes to one or more data files serially,
+        sorted absolutely according to the ORDER BY clause, if one is used.
     max_file_size : float, optional
         Specifies the maximum size (MB) of files that UNLOAD creates in Amazon S3.
         Specify a decimal value between 5.0 MB and 6200.0 MB. If None, the default
@@ -305,6 +311,7 @@ def unload_to_files(
         partition_str: str = f"\nPARTITION BY ({','.join(partition_cols)})" if partition_cols else ""
         manifest_str: str = "\nmanifest" if manifest is True else ""
         region_str: str = f"\nREGION AS '{region}'" if region is not None else ""
+        parallel_str: str = "\nPARALLEL ON" if parallel else "\nPARALLEL OFF"
         if not max_file_size and engine.get() == EngineEnum.RAY:
             _logger.warning(
                 "Unload `MAXFILESIZE` is not specified. "
@@ -330,7 +337,7 @@ def unload_to_files(
             f"TO '{path}'\n"
             f"{auth_str}"
             "ALLOWOVERWRITE\n"
-            "PARALLEL ON\n"
+            f"{parallel_str}\n"
             f"FORMAT {format_str}\n"
             "ENCRYPTED"
             f"{kms_key_id_str}"
@@ -362,6 +369,7 @@ def unload(
     dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable",
     chunked: Union[bool, int] = False,
     keep_files: bool = False,
+    parallel: bool = True,
     use_threads: Union[bool, int] = True,
     boto3_session: Optional[boto3.Session] = None,
     s3_additional_kwargs: Optional[Dict[str, str]] = None,
@@ -433,6 +441,11 @@ def unload(
         used to encrypt data files on Amazon S3.
     keep_files : bool
         Should keep stage files?
+    parallel: bool
+        Whether to unload to multiple files in parallel. Defaults to True.
+        By default, UNLOAD writes data in parallel to multiple files, according to the number of
+        slices in the cluster. If parallel is False, UNLOAD writes to one or more data files serially,
+        sorted absolutely according to the ORDER BY clause, if one is used.
     dtype_backend: str, optional
         Which dtype_backend to use, e.g. whether a DataFrame should have NumPy arrays,
         nullable dtypes are used for all dtypes that have a nullable implementation when
@@ -487,6 +500,7 @@ def unload(
         max_file_size=max_file_size,
         kms_key_id=kms_key_id,
         manifest=False,
+        parallel=parallel,
         boto3_session=boto3_session,
     )
     if chunked is False:
diff --git a/awswrangler/redshift/_read.pyi b/awswrangler/redshift/_read.pyi
index fc1b6ae76..31bf908f3 100644
--- a/awswrangler/redshift/_read.pyi
+++ b/awswrangler/redshift/_read.pyi
@@ -100,6 +100,7 @@ def unload_to_files(
     aws_session_token: Optional[str] = ...,
     region: Optional[str] = ...,
     unload_format: Optional[Literal["CSV", "PARQUET"]] = ...,
+    parallel: bool = ...,
     max_file_size: Optional[float] = ...,
     kms_key_id: Optional[str] = ...,
     manifest: bool = ...,
@@ -121,6 +122,7 @@ def unload(
     dtype_backend: Literal["numpy_nullable", "pyarrow"] = ...,
     chunked: Literal[False] = ...,
     keep_files: bool = ...,
+    parallel: bool = ...,
     use_threads: Union[bool, int] = ...,
     boto3_session: Optional[boto3.Session] = ...,
     s3_additional_kwargs: Optional[Dict[str, str]] = ...,
@@ -142,6 +144,7 @@ def unload(
     dtype_backend: Literal["numpy_nullable", "pyarrow"] = ...,
     chunked: Literal[True],
     keep_files: bool = ...,
+    parallel: bool = ...,
     use_threads: Union[bool, int] = ...,
     boto3_session: Optional[boto3.Session] = ...,
     s3_additional_kwargs: Optional[Dict[str, str]] = ...,
@@ -163,6 +166,7 @@ def unload(
     dtype_backend: Literal["numpy_nullable", "pyarrow"] = ...,
     chunked: bool,
     keep_files: bool = ...,
+    parallel: bool = ...,
     use_threads: Union[bool, int] = ...,
     boto3_session: Optional[boto3.Session] = ...,
     s3_additional_kwargs: Optional[Dict[str, str]] = ...,
@@ -184,6 +188,7 @@ def unload(
     dtype_backend: Literal["numpy_nullable", "pyarrow"] = ...,
     chunked: Union[bool, int],
     keep_files: bool = ...,
+    parallel: bool = ...,
     use_threads: Union[bool, int] = ...,
     boto3_session: Optional[boto3.Session] = ...,
     s3_additional_kwargs: Optional[Dict[str, str]] = ...,
diff --git a/tests/unit/test_redshift.py b/tests/unit/test_redshift.py
index 39ba0b0ff..805d864ff 100644
--- a/tests/unit/test_redshift.py
+++ b/tests/unit/test_redshift.py
@@ -649,6 +649,7 @@ def test_spectrum_decimal_cast(
     [None, {"ServerSideEncryption": "AES256"}, {"ServerSideEncryption": "aws:kms", "SSEKMSKeyId": None}],
 )
 @pytest.mark.parametrize("use_threads", [True, False])
+@pytest.mark.parametrize("parallel", [True, False])
 def test_copy_unload_kms(
     path: str,
     redshift_table: str,
@@ -656,6 +657,7 @@ def test_copy_unload_kms(
     databases_parameters: Dict[str, Any],
     kms_key_id: str,
     use_threads: bool,
+    parallel: bool,
     s3_additional_kwargs: Optional[Dict[str, Any]],
 ) -> None:
     df = pd.DataFrame({"id": [1, 2, 3]})
@@ -678,6 +680,7 @@ def test_copy_unload_kms(
         iam_role=databases_parameters["redshift"]["role"],
         path=path,
         keep_files=False,
+        parallel=parallel,
         use_threads=use_threads,
         s3_additional_kwargs=s3_additional_kwargs,
     )
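
For reference, a minimal usage sketch of the new `parallel` flag once the patch above is applied. The Glue connection name, table, bucket, and IAM role ARN below are placeholders, not values from this change; `wr.redshift.connect`, `unload_to_files`, and `unload` are existing awswrangler APIs.

import awswrangler as wr

con = wr.redshift.connect("my-glue-redshift-connection")  # placeholder connection name

# parallel=False -> PARALLEL OFF: UNLOAD writes files serially,
# sorted by the ORDER BY clause if one is used.
wr.redshift.unload_to_files(
    sql="SELECT * FROM public.my_table ORDER BY id",
    path="s3://my-bucket/stage/",
    con=con,
    iam_role="arn:aws:iam::111111111111:role/my-redshift-unload-role",
    unload_format="PARQUET",
    parallel=False,
)

# Default behaviour (parallel=True) -> PARALLEL ON: one file per cluster slice.
df = wr.redshift.unload(
    sql="SELECT * FROM public.my_table",
    path="s3://my-bucket/stage/",
    con=con,
    iam_role="arn:aws:iam::111111111111:role/my-redshift-unload-role",
    keep_files=False,
    parallel=True,
)

con.close()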