awswrangler/redshift/_read.py (15 additions, 1 deletion)
@@ -224,6 +224,7 @@ def unload_to_files(
aws_session_token: Optional[str] = None,
region: Optional[str] = None,
unload_format: Optional[Literal["CSV", "PARQUET"]] = None,
+parallel: bool = True,
max_file_size: Optional[float] = None,
kms_key_id: Optional[str] = None,
manifest: bool = False,
@@ -265,6 +266,11 @@ def unload_to_files(
unload_format: str, optional
Format of the unloaded S3 objects from the query.
Valid values: "CSV", "PARQUET". Case sensitive. Defaults to PARQUET.
+parallel: bool
+Whether to unload to multiple files in parallel. Defaults to True.
+By default, UNLOAD writes data in parallel to multiple files, according to the number of
+slices in the cluster. If parallel is False, UNLOAD writes to one or more data files serially,
+sorted absolutely according to the ORDER BY clause, if one is used.
max_file_size : float, optional
Specifies the maximum size (MB) of files that UNLOAD creates in Amazon S3.
Specify a decimal value between 5.0 MB and 6200.0 MB. If None, the default
@@ -305,6 +311,7 @@ def unload_to_files(
partition_str: str = f"\nPARTITION BY ({','.join(partition_cols)})" if partition_cols else ""
manifest_str: str = "\nmanifest" if manifest is True else ""
region_str: str = f"\nREGION AS '{region}'" if region is not None else ""
+parallel_str: str = "\nPARALLEL ON" if parallel else "\nPARALLEL OFF"
if not max_file_size and engine.get() == EngineEnum.RAY:
_logger.warning(
"Unload `MAXFILESIZE` is not specified. "
@@ -330,7 +337,7 @@
f"TO '{path}'\n"
f"{auth_str}"
"ALLOWOVERWRITE\n"
"PARALLEL ON\n"
f"{parallel_str}\n"
f"FORMAT {format_str}\n"
"ENCRYPTED"
f"{kms_key_id_str}"
@@ -362,6 +369,7 @@ def unload(
dtype_backend: Literal["numpy_nullable", "pyarrow"] = "numpy_nullable",
chunked: Union[bool, int] = False,
keep_files: bool = False,
+parallel: bool = True,
use_threads: Union[bool, int] = True,
boto3_session: Optional[boto3.Session] = None,
s3_additional_kwargs: Optional[Dict[str, str]] = None,
@@ -433,6 +441,11 @@ def unload(
used to encrypt data files on Amazon S3.
keep_files : bool
Should keep stage files?
+parallel: bool
+Whether to unload to multiple files in parallel. Defaults to True.
+By default, UNLOAD writes data in parallel to multiple files, according to the number of
+slices in the cluster. If parallel is False, UNLOAD writes to one or more data files serially,
+sorted absolutely according to the ORDER BY clause, if one is used.
dtype_backend: str, optional
Which dtype_backend to use, e.g. whether a DataFrame should have NumPy arrays,
nullable dtypes are used for all dtypes that have a nullable implementation when
@@ -487,6 +500,7 @@ def unload(
max_file_size=max_file_size,
kms_key_id=kms_key_id,
manifest=False,
+parallel=parallel,
boto3_session=boto3_session,
)
if chunked is False:
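
Outside the diff itself, a minimal usage sketch of the new keyword from the caller's side; the cluster, bucket, and role are hypothetical, and only the parallel argument is new relative to the existing API:

import awswrangler as wr
import redshift_connector

# Hypothetical connection details; any working redshift_connector connection will do.
con = redshift_connector.connect(
    host="my-cluster.abc123.us-east-1.redshift.amazonaws.com",
    database="dev",
    user="awsuser",
    password="***",
)

# parallel=False asks Redshift to emit PARALLEL OFF, so the unloaded objects are written
# serially and keep the query's ORDER BY across files.
df = wr.redshift.unload(
    sql="SELECT * FROM public.my_table ORDER BY id",
    con=con,
    path="s3://my-bucket/unload/",
    iam_role="arn:aws:iam::123456789012:role/my-redshift-role",
    parallel=False,
    keep_files=False,
)

unload_to_files accepts the same keyword when only the S3 objects are needed rather than a DataFrame.
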
awswrangler/redshift/_read.pyi (5 additions, 0 deletions)
@@ -100,6 +100,7 @@ def unload_to_files(
aws_session_token: Optional[str] = ...,
region: Optional[str] = ...,
unload_format: Optional[Literal["CSV", "PARQUET"]] = ...,
+parallel: bool = ...,
max_file_size: Optional[float] = ...,
kms_key_id: Optional[str] = ...,
manifest: bool = ...,
@@ -121,6 +122,7 @@ def unload(
dtype_backend: Literal["numpy_nullable", "pyarrow"] = ...,
chunked: Literal[False] = ...,
keep_files: bool = ...,
+parallel: bool = ...,
use_threads: Union[bool, int] = ...,
boto3_session: Optional[boto3.Session] = ...,
s3_additional_kwargs: Optional[Dict[str, str]] = ...,
@@ -142,6 +144,7 @@ def unload(
dtype_backend: Literal["numpy_nullable", "pyarrow"] = ...,
chunked: Literal[True],
keep_files: bool = ...,
+parallel: bool = ...,
use_threads: Union[bool, int] = ...,
boto3_session: Optional[boto3.Session] = ...,
s3_additional_kwargs: Optional[Dict[str, str]] = ...,
@@ -163,6 +166,7 @@ def unload(
dtype_backend: Literal["numpy_nullable", "pyarrow"] = ...,
chunked: bool,
keep_files: bool = ...,
+parallel: bool = ...,
use_threads: Union[bool, int] = ...,
boto3_session: Optional[boto3.Session] = ...,
s3_additional_kwargs: Optional[Dict[str, str]] = ...,
@@ -184,6 +188,7 @@ def unload(
dtype_backend: Literal["numpy_nullable", "pyarrow"] = ...,
chunked: Union[bool, int],
keep_files: bool = ...,
+parallel: bool = ...,
use_threads: Union[bool, int] = ...,
boto3_session: Optional[boto3.Session] = ...,
s3_additional_kwargs: Optional[Dict[str, str]] = ...,
tests/unit/test_redshift.py (3 additions, 0 deletions)
@@ -649,13 +649,15 @@ def test_spectrum_decimal_cast(
[None, {"ServerSideEncryption": "AES256"}, {"ServerSideEncryption": "aws:kms", "SSEKMSKeyId": None}],
)
@pytest.mark.parametrize("use_threads", [True, False])
@pytest.mark.parametrize("parallel", [True, False])
def test_copy_unload_kms(
path: str,
redshift_table: str,
redshift_con: redshift_connector.Connection,
databases_parameters: Dict[str, Any],
kms_key_id: str,
use_threads: bool,
+parallel: bool,
s3_additional_kwargs: Optional[Dict[str, Any]],
) -> None:
df = pd.DataFrame({"id": [1, 2, 3]})
@@ -678,6 +680,7 @@ def test_copy_unload_kms(
iam_role=databases_parameters["redshift"]["role"],
path=path,
keep_files=False,
+parallel=parallel,
use_threads=use_threads,
s3_additional_kwargs=s3_additional_kwargs,
)