From 73f3b880daadb70107a7086ef23518b4284090b1 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Thu, 16 Apr 2020 16:18:01 -0300 Subject: [PATCH 1/6] Add s3.copy_objects() #186 --- awswrangler/_utils.py | 7 ++ awswrangler/s3.py | 97 ++++++++++++++++++++++ testing/test_awswrangler/test_data_lake.py | 47 +++++++++++ 3 files changed, 151 insertions(+) diff --git a/awswrangler/_utils.py b/awswrangler/_utils.py index 793f166fb..61b576f24 100644 --- a/awswrangler/_utils.py +++ b/awswrangler/_utils.py @@ -28,6 +28,13 @@ def client(service_name: str, session: Optional[boto3.Session] = None) -> boto3. ) +def resource(service_name: str, session: Optional[boto3.Session] = None) -> boto3.resource: + """Create a valid boto3.resource.""" + return ensure_session(session=session).resource( + service_name=service_name, use_ssl=True, config=botocore.config.Config(retries={"max_attempts": 15}) + ) + + def parse_path(path: str) -> Tuple[str, str]: """Split a full S3 path in bucket and key strings. diff --git a/awswrangler/s3.py b/awswrangler/s3.py index bdf3af1b8..6f80cb384 100644 --- a/awswrangler/s3.py +++ b/awswrangler/s3.py @@ -16,6 +16,7 @@ import pyarrow.lib # type: ignore import pyarrow.parquet # type: ignore import s3fs # type: ignore +from boto3.s3.transfer import TransferConfig # type: ignore from pandas.io.common import infer_compression # type: ignore from awswrangler import _data_types, _utils, catalog, exceptions @@ -2051,3 +2052,99 @@ def read_parquet_table( boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs, ) + + +def copy_objects( + paths: List[str], + source_path: str, + target_path: str, + mode: str = "append", + use_threads: bool = True, + boto3_session: Optional[boto3.Session] = None, +) -> List[str]: + """Copy a list of S3 objects to another S3 directory. + + Note + ---- + In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + + Parameters + ---------- + paths : List[str] + List of S3 objects paths (e.g. [s3://bucket/dir0/key0, s3://bucket/dir0/key1]). + source_path : str, + S3 Path for the source directory. + target_path : str, + S3 Path for the target directory. + mode: str, optional + ``append`` (Default), ``overwrite``, ``overwrite_partitions``. + use_threads : bool + True to enable concurrent requests, False to disable multiple threads. + If enabled os.cpu_count() will be used as the max number of threads. + boto3_session : boto3.Session(), optional + Boto3 Session. The default boto3 session will be used if boto3_session receive None. + + Returns + ------- + List[str] + List of new objects paths. + + Examples + -------- + >>> import awswrangler as wr + >>> wr.s3.copy_objects( + ... paths=["s3://bucket0/dir0/key0", "s3://bucket0/dir0/key1"]) + ... source_path="s3://bucket0/dir0/", + ... target_path="s3://bucket1/dir1/", + ... mode="append" + ... 
) + ["s3://bucket1/dir1/key0", "s3://bucket1/dir1/key1"] + + """ + _logger.debug(f"len(paths): {len(paths)}") + if len(paths) < 1: + return [] + source_path = source_path[:-1] if source_path[-1] == "/" else source_path + target_path = target_path[:-1] if target_path[-1] == "/" else target_path + session: boto3.Session = _utils.ensure_session(session=boto3_session) + if mode == "overwrite": + _logger.debug(f"Deleting to overwrite: {target_path}/") + delete_objects(path=f"{target_path}/", use_threads=use_threads, boto3_session=session) + elif mode == "overwrite_partitions": + paths_wo_prefix: List[str] = [x.replace(f"{source_path}/", "") for x in paths] + paths_wo_filename: List[str] = [f"{x.rpartition('/')[0]}/" for x in paths_wo_prefix] + partitions_paths: List[str] = list(set(paths_wo_filename)) + target_partitions_paths = [f"{target_path}/{x}" for x in partitions_paths] + for path in target_partitions_paths: + _logger.debug(f"Deleting to overwrite_partitions: {path}") + delete_objects(path=path, use_threads=use_threads, boto3_session=session) + elif mode != "append": + raise exceptions.InvalidArgumentValue(f"{mode} is a invalid mode option.") + + batch: List[Tuple[str, str]] = [] + new_objects: List[str] = [] + for path in paths: + path_wo_prefix: str = path.replace(f"{source_path}/", "") + path_final: str = f"{target_path}/{path_wo_prefix}" + new_objects.append(path_final) + batch.append((path, path_final)) + _logger.debug(f"len(new_objects): {len(new_objects)}") + _copy_objects(batch=batch, use_threads=use_threads, boto3_session=session) + return new_objects + + +def _copy_objects(batch: List[Tuple[str, str]], use_threads: bool, boto3_session: boto3.Session) -> None: + _logger.debug(f"len(batch): {len(batch)}") + client_s3: boto3.client = _utils.client(service_name="s3", session=boto3_session) + resource_s3: boto3.resource = _utils.resource(service_name="s3", session=boto3_session) + for source, target in batch: + source_bucket, source_key = _utils.parse_path(path=source) + copy_source: Dict[str, str] = {"Bucket": source_bucket, "Key": source_key} + target_bucket, target_key = _utils.parse_path(path=target) + resource_s3.meta.client.copy( + CopySource=copy_source, + Bucket=target_bucket, + Key=target_key, + SourceClient=client_s3, + Config=TransferConfig(num_download_attempts=15, use_threads=use_threads), + ) diff --git a/testing/test_awswrangler/test_data_lake.py b/testing/test_awswrangler/test_data_lake.py index 3f3555152..55a06715f 100644 --- a/testing/test_awswrangler/test_data_lake.py +++ b/testing/test_awswrangler/test_data_lake.py @@ -1010,3 +1010,50 @@ def test_parquet_char_length(bucket, database, external_schema): wr.s3.delete_objects(path=path) assert wr.catalog.delete_table_if_exists(database=database, table=table) is True + + +def test_copy(bucket): + path = f"s3://{bucket}/test_copy/" + df = pd.DataFrame({"id": [1, 2, 3], "par": [1, 2, 3]}) + paths = wr.s3.to_parquet(df=df, path=path, dataset=True, partition_cols=["par"], mode="overwrite")["paths"] + wr.s3.wait_objects_exist(paths=paths) + df = wr.s3.read_parquet(path=path, dataset=True) + assert df.id.sum() == 6 + assert df.par.astype("Int64").sum() == 6 + + path2 = f"s3://{bucket}/test_copy2/" + df = pd.DataFrame({"id": [1, 2, 3], "par": [1, 2, 3]}) + paths = wr.s3.to_parquet(df=df, path=path2, dataset=True, partition_cols=["par"], mode="overwrite")["paths"] + wr.s3.wait_objects_exist(paths=paths) + paths = wr.s3.copy_objects(paths, source_path=path2, target_path=path, mode="append", use_threads=True) + 
wr.s3.wait_objects_exist(paths=paths, use_threads=False) + df = wr.s3.read_parquet(path=path, dataset=True) + assert df.id.sum() == 12 + assert df.par.astype("Int64").sum() == 12 + + paths = wr.s3.copy_objects( + wr.s3.list_objects(path2), source_path=path2, target_path=path, mode="overwrite", use_threads=False + ) + wr.s3.wait_objects_exist(paths=paths, use_threads=False) + df = wr.s3.read_parquet(path=path, dataset=True) + assert df.id.sum() == 6 + assert df.par.astype("Int64").sum() == 6 + + df = pd.DataFrame({"id": [4], "par": [3]}) + paths = wr.s3.to_parquet(df=df, path=path2, dataset=True, partition_cols=["par"], mode="overwrite")["paths"] + wr.s3.wait_objects_exist(paths=paths) + paths = wr.s3.copy_objects( + paths, source_path=path2, target_path=path, mode="overwrite_partitions", use_threads=True + ) + wr.s3.wait_objects_exist(paths=paths, use_threads=False) + df = wr.s3.read_parquet(path=path, dataset=True) + assert df.id.sum() == 7 + assert df.par.astype("Int64").sum() == 6 + + with pytest.raises(wr.exceptions.InvalidArgumentValue): + wr.s3.copy_objects(["foo"], source_path="boo", target_path="bar", mode="WRONG") + + assert len(wr.s3.copy_objects([], source_path="boo", target_path="bar")) == 0 + + wr.s3.delete_objects(path=path) + wr.s3.delete_objects(path=path2) From f480e74091f822f3220ecdfc939046c4b4fc24f1 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Thu, 16 Apr 2020 20:17:03 +0000 Subject: [PATCH 2/6] Add merge tutorial --- tutorials/13 - Merging Datasets on S3.ipynb | 539 ++++++++++++++++++++ 1 file changed, 539 insertions(+) create mode 100644 tutorials/13 - Merging Datasets on S3.ipynb diff --git a/tutorials/13 - Merging Datasets on S3.ipynb b/tutorials/13 - Merging Datasets on S3.ipynb new file mode 100644 index 000000000..d430599a2 --- /dev/null +++ b/tutorials/13 - Merging Datasets on S3.ipynb @@ -0,0 +1,539 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "[![AWS Data Wrangler](_static/logo.png \"AWS Data Wrangler\")](https://github.com/awslabs/aws-data-wrangler)\n", + "\n", + "# 13 - Merging Datasets on S3\n", + "\n", + "Wrangler has 3 different copy modes to store Parquet Datasets on Amazon S3.\n", + "\n", + "- **append** (Default)\n", + "\n", + " Only adds new files without any delete.\n", + " \n", + "- **overwrite**\n", + "\n", + " Deletes everything in the target directory and then add new files.\n", + " \n", + "- **overwrite_partitions** (Partition Upsert)\n", + "\n", + " Only deletes the paths of partitions that should be updated and then writes the new partitions files. It's like a \"partition Upsert\"." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "from datetime import date\n", + "import awswrangler as wr\n", + "import pandas as pd" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Enter your bucket name:" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdin", + "output_type": "stream", + "text": [ + " ··········································\n" + ] + } + ], + "source": [ + "import getpass\n", + "bucket = getpass.getpass()\n", + "path1 = f\"s3://{bucket}/dataset1/\"\n", + "path2 = f\"s3://{bucket}/dataset2/\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Creating Dataset 1" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
idvaluedate
01foo2020-01-01
12boo2020-01-02
\n", + "
" + ], + "text/plain": [ + " id value date\n", + "0 1 foo 2020-01-01\n", + "1 2 boo 2020-01-02" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df = pd.DataFrame({\n", + " \"id\": [1, 2],\n", + " \"value\": [\"foo\", \"boo\"],\n", + " \"date\": [date(2020, 1, 1), date(2020, 1, 2)]\n", + "})\n", + "\n", + "wr.s3.to_parquet(\n", + " df=df,\n", + " path=path1,\n", + " dataset=True,\n", + " mode=\"overwrite\",\n", + " partition_cols=[\"date\"]\n", + ")\n", + "\n", + "wr.s3.read_parquet(path1, dataset=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Creating Dataset 2" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
idvaluedate
02xoo2020-01-02
13bar2020-01-03
\n", + "
" + ], + "text/plain": [ + " id value date\n", + "0 2 xoo 2020-01-02\n", + "1 3 bar 2020-01-03" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df = pd.DataFrame({\n", + " \"id\": [2, 3],\n", + " \"value\": [\"xoo\", \"bar\"],\n", + " \"date\": [date(2020, 1, 2), date(2020, 1, 3)]\n", + "})\n", + "\n", + "dataset2_files = wr.s3.to_parquet(\n", + " df=df,\n", + " path=path2,\n", + " dataset=True,\n", + " mode=\"overwrite\",\n", + " partition_cols=[\"date\"]\n", + ")[\"paths\"]\n", + "\n", + "wr.s3.read_parquet(path2, dataset=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Merging (Dataset 2 -> Dataset 1) (APPEND)" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
idvaluedate
01foo2020-01-01
12xoo2020-01-02
22boo2020-01-02
33bar2020-01-03
\n", + "
" + ], + "text/plain": [ + " id value date\n", + "0 1 foo 2020-01-01\n", + "1 2 xoo 2020-01-02\n", + "2 2 boo 2020-01-02\n", + "3 3 bar 2020-01-03" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "wr.s3.copy_objects(\n", + " paths=dataset2_files,\n", + " source_path=path2,\n", + " target_path=path1,\n", + " mode=\"append\"\n", + ")\n", + "\n", + "wr.s3.read_parquet(path1, dataset=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Merging (Dataset 2 -> Dataset 1) (OVERWRITE_PARTITIONS)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
idvaluedate
01foo2020-01-01
12xoo2020-01-02
23bar2020-01-03
\n", + "
" + ], + "text/plain": [ + " id value date\n", + "0 1 foo 2020-01-01\n", + "1 2 xoo 2020-01-02\n", + "2 3 bar 2020-01-03" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "wr.s3.copy_objects(\n", + " paths=dataset2_files,\n", + " source_path=path2,\n", + " target_path=path1,\n", + " mode=\"overwrite_partitions\"\n", + ")\n", + "\n", + "wr.s3.read_parquet(path1, dataset=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Merging (Dataset 2 -> Dataset 1) (OVERWRITE)" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
idvaluedate
02xoo2020-01-02
13bar2020-01-03
\n", + "
" + ], + "text/plain": [ + " id value date\n", + "0 2 xoo 2020-01-02\n", + "1 3 bar 2020-01-03" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "wr.s3.copy_objects(\n", + " paths=dataset2_files,\n", + " source_path=path2,\n", + " target_path=path1,\n", + " mode=\"overwrite\"\n", + ")\n", + "\n", + "wr.s3.read_parquet(path1, dataset=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Cleaning Up" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "wr.s3.delete_objects(path1)\n", + "wr.s3.delete_objects(path2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "conda_python3", + "language": "python", + "name": "conda_python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.5" + }, + "pycharm": { + "stem_cell": { + "cell_type": "raw", + "metadata": { + "collapsed": false + }, + "source": [] + } + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} From e6d808d16c850ac534cf80ce6a9c350e9d786f42 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Thu, 16 Apr 2020 17:41:21 -0300 Subject: [PATCH 3/6] Add s3.copy_objects() on the docs #186 --- README.md | 1 + docs/source/api.rst | 1 + 2 files changed, 2 insertions(+) diff --git a/README.md b/README.md index c4ad94921..a278d199e 100644 --- a/README.md +++ b/README.md @@ -78,6 +78,7 @@ df = wr.athena.read_sql_query("SELECT * FROM my_table", database="my_db") - [10 - Parquet Crawler](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/10%20-%20Parquet%20Crawler.ipynb) - [11 - CSV Datasets](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/11%20-%20CSV%20Datasets.ipynb) - [12 - CSV Crawler](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/12%20-%20CSV%20Crawler.ipynb) + - [13 - Merging Datasets on S3](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/13%20-%20Merging%20Datasets%20on%20S3.ipynb) - [**API Reference**](https://aws-data-wrangler.readthedocs.io/en/latest/api.html) - [Amazon S3](https://aws-data-wrangler.readthedocs.io/en/latest/api.html#amazon-s3) - [AWS Glue Catalog](https://aws-data-wrangler.readthedocs.io/en/latest/api.html#aws-glue-catalog) diff --git a/docs/source/api.rst b/docs/source/api.rst index abb7cb16f..1565faaae 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -29,6 +29,7 @@ Amazon S3 to_parquet wait_objects_exist wait_objects_not_exist + copy_objects AWS Glue Catalog ---------------- From 537afe5818197d025dcde8c98f199631be8cc8dc Mon Sep 17 00:00:00 2001 From: igorborgest Date: Fri, 17 Apr 2020 10:13:16 -0300 Subject: [PATCH 4/6] Splitting up and creating two functions, copy_objects() and merge_datasets() --- awswrangler/_utils.py | 2 +- awswrangler/athena.py | 4 +- awswrangler/db.py | 10 +- awswrangler/s3.py | 117 ++++++++++++++++----- docs/source/api.rst | 1 + testing/test_awswrangler/test_data_lake.py | 43 ++++++-- 6 files changed, 129 insertions(+), 48 deletions(-) diff --git a/awswrangler/_utils.py b/awswrangler/_utils.py index 61b576f24..21a27d37e 100644 --- a/awswrangler/_utils.py +++ b/awswrangler/_utils.py @@ -69,7 +69,7 @@ def ensure_cpu_count(use_threads: bool = True) -> 
int: Note ---- - In case of `use_threads=True` the number of process that could be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that could be spawned will be get from os.cpu_count(). Parameters ---------- diff --git a/awswrangler/athena.py b/awswrangler/athena.py index 41047d428..1933606ad 100644 --- a/awswrangler/athena.py +++ b/awswrangler/athena.py @@ -369,7 +369,7 @@ def read_sql_query( # pylint: disable=too-many-branches,too-many-locals Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -605,7 +605,7 @@ def read_sql_table( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- diff --git a/awswrangler/db.py b/awswrangler/db.py index fb17c40cf..491fe7784 100644 --- a/awswrangler/db.py +++ b/awswrangler/db.py @@ -438,7 +438,7 @@ def copy_to_redshift( # pylint: disable=too-many-arguments Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -576,7 +576,7 @@ def copy_files_to_redshift( # pylint: disable=too-many-locals,too-many-argument Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -798,7 +798,7 @@ def write_redshift_copy_manifest( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -908,7 +908,7 @@ def unload_redshift( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -1024,7 +1024,7 @@ def unload_redshift_to_files( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- diff --git a/awswrangler/s3.py b/awswrangler/s3.py index 6f80cb384..a1dff75bb 100644 --- a/awswrangler/s3.py +++ b/awswrangler/s3.py @@ -176,7 +176,7 @@ def delete_objects( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -248,7 +248,7 @@ def describe_objects( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). 
Parameters ---------- @@ -328,7 +328,7 @@ def size_objects( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -396,7 +396,7 @@ def to_csv( # pylint: disable=too-many-arguments Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -426,9 +426,9 @@ def to_csv( # pylint: disable=too-many-arguments List of column names that will be used to create partitions. Only takes effect if dataset=True. mode: str, optional ``append`` (Default), ``overwrite``, ``overwrite_partitions``. Only takes effect if dataset=True. - database : str + database : str, optional Glue/Athena catalog: Database name. - table : str + table : str, optional Glue/Athena catalog: Table name. dtype: Dict[str, str], optional Dictionary of columns names and Athena/Glue types to be casted. @@ -808,7 +808,7 @@ def to_parquet( # pylint: disable=too-many-arguments Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -836,9 +836,9 @@ def to_parquet( # pylint: disable=too-many-arguments List of column names that will be used to create partitions. Only takes effect if dataset=True. mode: str, optional ``append`` (Default), ``overwrite``, ``overwrite_partitions``. Only takes effect if dataset=True. - database : str + database : str, optional Glue/Athena catalog: Database name. - table : str + table : str, optional Glue/Athena catalog: Table name. dtype: Dict[str, str], optional Dictionary of columns names and Athena/Glue types to be casted. @@ -1153,7 +1153,7 @@ def read_csv( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -1236,7 +1236,7 @@ def read_fwf( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -1319,7 +1319,7 @@ def read_json( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -1524,7 +1524,7 @@ def read_parquet( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -1671,7 +1671,7 @@ def read_parquet_metadata( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). 
Parameters ---------- @@ -1743,7 +1743,7 @@ def store_parquet_metadata( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -1843,7 +1843,7 @@ def wait_objects_exist( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -1895,7 +1895,7 @@ def wait_objects_not_exist( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -1981,7 +1981,7 @@ def read_parquet_table( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -2054,24 +2054,27 @@ def read_parquet_table( ) -def copy_objects( - paths: List[str], +def merge_datasets( source_path: str, target_path: str, mode: str = "append", use_threads: bool = True, boto3_session: Optional[boto3.Session] = None, ) -> List[str]: - """Copy a list of S3 objects to another S3 directory. + """Merge a source dataset into a target dataset. + + Note + ---- + If you are merging tables (S3 datasets + Glue Catalog metadata), + remember that you will also need to update your partitions metadata in some cases. + (e.g. wr.athena.repair_table(table='...', database='...')) Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- - paths : List[str] - List of S3 objects paths (e.g. [s3://bucket/dir0/key0, s3://bucket/dir0/key1]). source_path : str, S3 Path for the source directory. target_path : str, @@ -2092,8 +2095,7 @@ def copy_objects( Examples -------- >>> import awswrangler as wr - >>> wr.s3.copy_objects( - ... paths=["s3://bucket0/dir0/key0", "s3://bucket0/dir0/key1"]) + >>> wr.s3.merge_datasets( ... source_path="s3://bucket0/dir0/", ... target_path="s3://bucket1/dir1/", ... 
mode="append" @@ -2101,12 +2103,15 @@ def copy_objects( ["s3://bucket1/dir1/key0", "s3://bucket1/dir1/key1"] """ - _logger.debug(f"len(paths): {len(paths)}") - if len(paths) < 1: - return [] source_path = source_path[:-1] if source_path[-1] == "/" else source_path target_path = target_path[:-1] if target_path[-1] == "/" else target_path session: boto3.Session = _utils.ensure_session(session=boto3_session) + + paths: List[str] = list_objects(path=f"{source_path}/", boto3_session=session) + _logger.debug(f"len(paths): {len(paths)}") + if len(paths) < 1: + return [] + if mode == "overwrite": _logger.debug(f"Deleting to overwrite: {target_path}/") delete_objects(path=f"{target_path}/", use_threads=use_threads, boto3_session=session) @@ -2121,6 +2126,60 @@ def copy_objects( elif mode != "append": raise exceptions.InvalidArgumentValue(f"{mode} is a invalid mode option.") + new_objects: List[str] = copy_objects(paths=paths, source_path=source_path, target_path=target_path, use_threads=use_threads, boto3_session=session) + _logger.debug(f"len(new_objects): {len(new_objects)}") + return new_objects + + +def copy_objects( + paths: List[str], + source_path: str, + target_path: str, + use_threads: bool = True, + boto3_session: Optional[boto3.Session] = None, +) -> List[str]: + """Copy a list of S3 objects to another S3 directory. + + Note + ---- + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). + + Parameters + ---------- + paths : List[str] + List of S3 objects paths (e.g. [s3://bucket/dir0/key0, s3://bucket/dir0/key1]). + source_path : str, + S3 Path for the source directory. + target_path : str, + S3 Path for the target directory. + use_threads : bool + True to enable concurrent requests, False to disable multiple threads. + If enabled os.cpu_count() will be used as the max number of threads. + boto3_session : boto3.Session(), optional + Boto3 Session. The default boto3 session will be used if boto3_session receive None. + + Returns + ------- + List[str] + List of new objects paths. + + Examples + -------- + >>> import awswrangler as wr + >>> wr.s3.copy_objects( + ... paths=["s3://bucket0/dir0/key0", "s3://bucket0/dir0/key1"]) + ... source_path="s3://bucket0/dir0/", + ... target_path="s3://bucket1/dir1/", + ... 
) + ["s3://bucket1/dir1/key0", "s3://bucket1/dir1/key1"] + + """ + _logger.debug(f"len(paths): {len(paths)}") + if len(paths) < 1: + return [] + source_path = source_path[:-1] if source_path[-1] == "/" else source_path + target_path = target_path[:-1] if target_path[-1] == "/" else target_path + session: boto3.Session = _utils.ensure_session(session=boto3_session) batch: List[Tuple[str, str]] = [] new_objects: List[str] = [] for path in paths: diff --git a/docs/source/api.rst b/docs/source/api.rst index 1565faaae..897fc7a3e 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -30,6 +30,7 @@ Amazon S3 wait_objects_exist wait_objects_not_exist copy_objects + merge_datasets AWS Glue Catalog ---------------- diff --git a/testing/test_awswrangler/test_data_lake.py b/testing/test_awswrangler/test_data_lake.py index 55a06715f..d5fccd7b4 100644 --- a/testing/test_awswrangler/test_data_lake.py +++ b/testing/test_awswrangler/test_data_lake.py @@ -1012,8 +1012,8 @@ def test_parquet_char_length(bucket, database, external_schema): assert wr.catalog.delete_table_if_exists(database=database, table=table) is True -def test_copy(bucket): - path = f"s3://{bucket}/test_copy/" +def test_merge(bucket): + path = f"s3://{bucket}/test_merge/" df = pd.DataFrame({"id": [1, 2, 3], "par": [1, 2, 3]}) paths = wr.s3.to_parquet(df=df, path=path, dataset=True, partition_cols=["par"], mode="overwrite")["paths"] wr.s3.wait_objects_exist(paths=paths) @@ -1021,19 +1021,17 @@ def test_copy(bucket): assert df.id.sum() == 6 assert df.par.astype("Int64").sum() == 6 - path2 = f"s3://{bucket}/test_copy2/" + path2 = f"s3://{bucket}/test_merge2/" df = pd.DataFrame({"id": [1, 2, 3], "par": [1, 2, 3]}) paths = wr.s3.to_parquet(df=df, path=path2, dataset=True, partition_cols=["par"], mode="overwrite")["paths"] wr.s3.wait_objects_exist(paths=paths) - paths = wr.s3.copy_objects(paths, source_path=path2, target_path=path, mode="append", use_threads=True) + paths = wr.s3.merge_datasets(source_path=path2, target_path=path, mode="append", use_threads=True) wr.s3.wait_objects_exist(paths=paths, use_threads=False) df = wr.s3.read_parquet(path=path, dataset=True) assert df.id.sum() == 12 assert df.par.astype("Int64").sum() == 12 - paths = wr.s3.copy_objects( - wr.s3.list_objects(path2), source_path=path2, target_path=path, mode="overwrite", use_threads=False - ) + paths = wr.s3.merge_datasets(source_path=path2, target_path=path, mode="overwrite", use_threads=False) wr.s3.wait_objects_exist(paths=paths, use_threads=False) df = wr.s3.read_parquet(path=path, dataset=True) assert df.id.sum() == 6 @@ -1042,16 +1040,39 @@ def test_copy(bucket): df = pd.DataFrame({"id": [4], "par": [3]}) paths = wr.s3.to_parquet(df=df, path=path2, dataset=True, partition_cols=["par"], mode="overwrite")["paths"] wr.s3.wait_objects_exist(paths=paths) - paths = wr.s3.copy_objects( - paths, source_path=path2, target_path=path, mode="overwrite_partitions", use_threads=True - ) + paths = wr.s3.merge_datasets(source_path=path2, target_path=path, mode="overwrite_partitions", use_threads=True) wr.s3.wait_objects_exist(paths=paths, use_threads=False) df = wr.s3.read_parquet(path=path, dataset=True) assert df.id.sum() == 7 assert df.par.astype("Int64").sum() == 6 with pytest.raises(wr.exceptions.InvalidArgumentValue): - wr.s3.copy_objects(["foo"], source_path="boo", target_path="bar", mode="WRONG") + wr.s3.merge_datasets(source_path=path, target_path="bar", mode="WRONG") + + assert len(wr.s3.merge_datasets(source_path=f"s3://{bucket}/empty/", target_path="bar")) == 0 + 
+ wr.s3.delete_objects(path=path) + wr.s3.delete_objects(path=path2) + + +def test_copy(bucket): + path = f"s3://{bucket}/test_copy/" + df = pd.DataFrame({"id": [1, 2, 3], "par": [1, 2, 3]}) + paths = wr.s3.to_parquet(df=df, path=path, dataset=True, partition_cols=["par"], mode="overwrite")["paths"] + wr.s3.wait_objects_exist(paths=paths) + df = wr.s3.read_parquet(path=path, dataset=True) + assert df.id.sum() == 6 + assert df.par.astype("Int64").sum() == 6 + + path2 = f"s3://{bucket}/test_copy2/" + df = pd.DataFrame({"id": [1, 2, 3], "par": [1, 2, 3]}) + paths = wr.s3.to_parquet(df=df, path=path2, dataset=True, partition_cols=["par"], mode="overwrite")["paths"] + wr.s3.wait_objects_exist(paths=paths) + paths = wr.s3.copy_objects(paths, source_path=path2, target_path=path, use_threads=True) + wr.s3.wait_objects_exist(paths=paths, use_threads=False) + df = wr.s3.read_parquet(path=path, dataset=True) + assert df.id.sum() == 12 + assert df.par.astype("Int64").sum() == 12 assert len(wr.s3.copy_objects([], source_path="boo", target_path="bar")) == 0 From b6b8bfc54f13b3b98589b57b501a948a32bf3f40 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Fri, 17 Apr 2020 10:27:18 -0300 Subject: [PATCH 5/6] Fixing Flake8 compatibility --- awswrangler/s3.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/awswrangler/s3.py b/awswrangler/s3.py index a1dff75bb..f728937db 100644 --- a/awswrangler/s3.py +++ b/awswrangler/s3.py @@ -2126,7 +2126,9 @@ def merge_datasets( elif mode != "append": raise exceptions.InvalidArgumentValue(f"{mode} is a invalid mode option.") - new_objects: List[str] = copy_objects(paths=paths, source_path=source_path, target_path=target_path, use_threads=use_threads, boto3_session=session) + new_objects: List[str] = copy_objects( + paths=paths, source_path=source_path, target_path=target_path, use_threads=use_threads, boto3_session=session + ) _logger.debug(f"len(new_objects): {len(new_objects)}") return new_objects From 30f3ea0c4423b37bdc0ea4eaba8bd4f1537999e2 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Fri, 17 Apr 2020 13:54:55 +0000 Subject: [PATCH 6/6] Updating tutorial 13 --- tutorials/13 - Merging Datasets on S3.ipynb | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tutorials/13 - Merging Datasets on S3.ipynb b/tutorials/13 - Merging Datasets on S3.ipynb index d430599a2..03f823043 100644 --- a/tutorials/13 - Merging Datasets on S3.ipynb +++ b/tutorials/13 - Merging Datasets on S3.ipynb @@ -310,8 +310,7 @@ } ], "source": [ - "wr.s3.copy_objects(\n", - " paths=dataset2_files,\n", + "wr.s3.merge_datasets(\n", " source_path=path2,\n", " target_path=path1,\n", " mode=\"append\"\n", @@ -394,8 +393,7 @@ } ], "source": [ - "wr.s3.copy_objects(\n", - " paths=dataset2_files,\n", + "wr.s3.merge_datasets(\n", " source_path=path2,\n", " target_path=path1,\n", " mode=\"overwrite_partitions\"\n", @@ -471,8 +469,7 @@ } ], "source": [ - "wr.s3.copy_objects(\n", - " paths=dataset2_files,\n", + "wr.s3.merge_datasets(\n", " source_path=path2,\n", " target_path=path1,\n", " mode=\"overwrite\"\n",
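
Taken together, the series settles on two S3 helpers: wr.s3.copy_objects() copies an explicit list of keys with no delete semantics, while wr.s3.merge_datasets() lists a whole source prefix and applies the append / overwrite / overwrite_partitions modes before copying. Below is a minimal end-to-end sketch of that final API; the bucket name, prefixes and DataFrame are hypothetical placeholders, not taken from the patches above.

    import awswrangler as wr
    import pandas as pd

    bucket = "my-bucket"                     # hypothetical bucket name
    staging = f"s3://{bucket}/staging/"      # hypothetical source prefix
    curated = f"s3://{bucket}/curated/"      # hypothetical target prefix

    # Write a partitioned Parquet dataset under the staging prefix.
    df = pd.DataFrame({"id": [1, 2, 3], "par": [1, 2, 3]})
    wr.s3.to_parquet(df=df, path=staging, dataset=True,
                     partition_cols=["par"], mode="overwrite")

    # merge_datasets() lists the source prefix itself, applies the chosen
    # mode ("append", "overwrite" or "overwrite_partitions") on the target,
    # and returns the paths of the newly copied objects.
    new_paths = wr.s3.merge_datasets(
        source_path=staging,
        target_path=curated,
        mode="overwrite_partitions",
    )

    # copy_objects() is the lower-level call: it copies an explicit list of
    # keys from one prefix to another and never deletes anything in the target.
    wr.s3.copy_objects(
        paths=new_paths,
        source_path=curated,
        target_path=f"s3://{bucket}/backup/",   # hypothetical backup prefix
    )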