diff --git a/README.md b/README.md index c4ad94921..a278d199e 100644 --- a/README.md +++ b/README.md @@ -78,6 +78,7 @@ df = wr.athena.read_sql_query("SELECT * FROM my_table", database="my_db") - [10 - Parquet Crawler](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/10%20-%20Parquet%20Crawler.ipynb) - [11 - CSV Datasets](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/11%20-%20CSV%20Datasets.ipynb) - [12 - CSV Crawler](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/12%20-%20CSV%20Crawler.ipynb) + - [13 - Merging Datasets on S3](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/13%20-%20Merging%20Datasets%20on%20S3.ipynb) - [**API Reference**](https://aws-data-wrangler.readthedocs.io/en/latest/api.html) - [Amazon S3](https://aws-data-wrangler.readthedocs.io/en/latest/api.html#amazon-s3) - [AWS Glue Catalog](https://aws-data-wrangler.readthedocs.io/en/latest/api.html#aws-glue-catalog) diff --git a/awswrangler/_utils.py b/awswrangler/_utils.py index 793f166fb..21a27d37e 100644 --- a/awswrangler/_utils.py +++ b/awswrangler/_utils.py @@ -28,6 +28,13 @@ def client(service_name: str, session: Optional[boto3.Session] = None) -> boto3. ) +def resource(service_name: str, session: Optional[boto3.Session] = None) -> boto3.resource: + """Create a valid boto3.resource.""" + return ensure_session(session=session).resource( + service_name=service_name, use_ssl=True, config=botocore.config.Config(retries={"max_attempts": 15}) + ) + + def parse_path(path: str) -> Tuple[str, str]: """Split a full S3 path in bucket and key strings. @@ -62,7 +69,7 @@ def ensure_cpu_count(use_threads: bool = True) -> int: Note ---- - In case of `use_threads=True` the number of process that could be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that could be spawned will be get from os.cpu_count(). Parameters ---------- diff --git a/awswrangler/athena.py b/awswrangler/athena.py index 41047d428..1933606ad 100644 --- a/awswrangler/athena.py +++ b/awswrangler/athena.py @@ -369,7 +369,7 @@ def read_sql_query( # pylint: disable=too-many-branches,too-many-locals Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -605,7 +605,7 @@ def read_sql_table( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- diff --git a/awswrangler/db.py b/awswrangler/db.py index fb17c40cf..491fe7784 100644 --- a/awswrangler/db.py +++ b/awswrangler/db.py @@ -438,7 +438,7 @@ def copy_to_redshift( # pylint: disable=too-many-arguments Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -576,7 +576,7 @@ def copy_files_to_redshift( # pylint: disable=too-many-locals,too-many-argument Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). 
Parameters ---------- @@ -798,7 +798,7 @@ def write_redshift_copy_manifest( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -908,7 +908,7 @@ def unload_redshift( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -1024,7 +1024,7 @@ def unload_redshift_to_files( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- diff --git a/awswrangler/s3.py b/awswrangler/s3.py index bdf3af1b8..f728937db 100644 --- a/awswrangler/s3.py +++ b/awswrangler/s3.py @@ -16,6 +16,7 @@ import pyarrow.lib # type: ignore import pyarrow.parquet # type: ignore import s3fs # type: ignore +from boto3.s3.transfer import TransferConfig # type: ignore from pandas.io.common import infer_compression # type: ignore from awswrangler import _data_types, _utils, catalog, exceptions @@ -175,7 +176,7 @@ def delete_objects( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -247,7 +248,7 @@ def describe_objects( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -327,7 +328,7 @@ def size_objects( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -395,7 +396,7 @@ def to_csv( # pylint: disable=too-many-arguments Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -425,9 +426,9 @@ def to_csv( # pylint: disable=too-many-arguments List of column names that will be used to create partitions. Only takes effect if dataset=True. mode: str, optional ``append`` (Default), ``overwrite``, ``overwrite_partitions``. Only takes effect if dataset=True. - database : str + database : str, optional Glue/Athena catalog: Database name. - table : str + table : str, optional Glue/Athena catalog: Table name. dtype: Dict[str, str], optional Dictionary of columns names and Athena/Glue types to be casted. @@ -807,7 +808,7 @@ def to_parquet( # pylint: disable=too-many-arguments Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -835,9 +836,9 @@ def to_parquet( # pylint: disable=too-many-arguments List of column names that will be used to create partitions. Only takes effect if dataset=True. 
mode: str, optional ``append`` (Default), ``overwrite``, ``overwrite_partitions``. Only takes effect if dataset=True. - database : str + database : str, optional Glue/Athena catalog: Database name. - table : str + table : str, optional Glue/Athena catalog: Table name. dtype: Dict[str, str], optional Dictionary of columns names and Athena/Glue types to be casted. @@ -1152,7 +1153,7 @@ def read_csv( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -1235,7 +1236,7 @@ def read_fwf( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -1318,7 +1319,7 @@ def read_json( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -1523,7 +1524,7 @@ def read_parquet( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -1670,7 +1671,7 @@ def read_parquet_metadata( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -1742,7 +1743,7 @@ def store_parquet_metadata( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -1842,7 +1843,7 @@ def wait_objects_exist( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -1894,7 +1895,7 @@ def wait_objects_not_exist( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -1980,7 +1981,7 @@ def read_parquet_table( Note ---- - In case of `use_threads=True` the number of process that will be spawned will be get from os.cpu_count(). + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). Parameters ---------- @@ -2051,3 +2052,160 @@ def read_parquet_table( boto3_session=boto3_session, s3_additional_kwargs=s3_additional_kwargs, ) + + +def merge_datasets( + source_path: str, + target_path: str, + mode: str = "append", + use_threads: bool = True, + boto3_session: Optional[boto3.Session] = None, +) -> List[str]: + """Merge a source dataset into a target dataset. + + Note + ---- + If you are merging tables (S3 datasets + Glue Catalog metadata), + remember that you will also need to update your partitions metadata in some cases. + (e.g. 
wr.athena.repair_table(table='...', database='...')) + + Note + ---- + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). + + Parameters + ---------- + source_path : str, + S3 Path for the source directory. + target_path : str, + S3 Path for the target directory. + mode: str, optional + ``append`` (Default), ``overwrite``, ``overwrite_partitions``. + use_threads : bool + True to enable concurrent requests, False to disable multiple threads. + If enabled os.cpu_count() will be used as the max number of threads. + boto3_session : boto3.Session(), optional + Boto3 Session. The default boto3 session will be used if boto3_session receive None. + + Returns + ------- + List[str] + List of new objects paths. + + Examples + -------- + >>> import awswrangler as wr + >>> wr.s3.merge_datasets( + ... source_path="s3://bucket0/dir0/", + ... target_path="s3://bucket1/dir1/", + ... mode="append" + ... ) + ["s3://bucket1/dir1/key0", "s3://bucket1/dir1/key1"] + + """ + source_path = source_path[:-1] if source_path[-1] == "/" else source_path + target_path = target_path[:-1] if target_path[-1] == "/" else target_path + session: boto3.Session = _utils.ensure_session(session=boto3_session) + + paths: List[str] = list_objects(path=f"{source_path}/", boto3_session=session) + _logger.debug(f"len(paths): {len(paths)}") + if len(paths) < 1: + return [] + + if mode == "overwrite": + _logger.debug(f"Deleting to overwrite: {target_path}/") + delete_objects(path=f"{target_path}/", use_threads=use_threads, boto3_session=session) + elif mode == "overwrite_partitions": + paths_wo_prefix: List[str] = [x.replace(f"{source_path}/", "") for x in paths] + paths_wo_filename: List[str] = [f"{x.rpartition('/')[0]}/" for x in paths_wo_prefix] + partitions_paths: List[str] = list(set(paths_wo_filename)) + target_partitions_paths = [f"{target_path}/{x}" for x in partitions_paths] + for path in target_partitions_paths: + _logger.debug(f"Deleting to overwrite_partitions: {path}") + delete_objects(path=path, use_threads=use_threads, boto3_session=session) + elif mode != "append": + raise exceptions.InvalidArgumentValue(f"{mode} is a invalid mode option.") + + new_objects: List[str] = copy_objects( + paths=paths, source_path=source_path, target_path=target_path, use_threads=use_threads, boto3_session=session + ) + _logger.debug(f"len(new_objects): {len(new_objects)}") + return new_objects + + +def copy_objects( + paths: List[str], + source_path: str, + target_path: str, + use_threads: bool = True, + boto3_session: Optional[boto3.Session] = None, +) -> List[str]: + """Copy a list of S3 objects to another S3 directory. + + Note + ---- + In case of `use_threads=True` the number of threads that will be spawned will be get from os.cpu_count(). + + Parameters + ---------- + paths : List[str] + List of S3 objects paths (e.g. [s3://bucket/dir0/key0, s3://bucket/dir0/key1]). + source_path : str, + S3 Path for the source directory. + target_path : str, + S3 Path for the target directory. + use_threads : bool + True to enable concurrent requests, False to disable multiple threads. + If enabled os.cpu_count() will be used as the max number of threads. + boto3_session : boto3.Session(), optional + Boto3 Session. The default boto3 session will be used if boto3_session receive None. + + Returns + ------- + List[str] + List of new objects paths. + + Examples + -------- + >>> import awswrangler as wr + >>> wr.s3.copy_objects( + ... paths=["s3://bucket0/dir0/key0", "s3://bucket0/dir0/key1"]) + ... 
source_path="s3://bucket0/dir0/", + ... target_path="s3://bucket1/dir1/", + ... ) + ["s3://bucket1/dir1/key0", "s3://bucket1/dir1/key1"] + + """ + _logger.debug(f"len(paths): {len(paths)}") + if len(paths) < 1: + return [] + source_path = source_path[:-1] if source_path[-1] == "/" else source_path + target_path = target_path[:-1] if target_path[-1] == "/" else target_path + session: boto3.Session = _utils.ensure_session(session=boto3_session) + batch: List[Tuple[str, str]] = [] + new_objects: List[str] = [] + for path in paths: + path_wo_prefix: str = path.replace(f"{source_path}/", "") + path_final: str = f"{target_path}/{path_wo_prefix}" + new_objects.append(path_final) + batch.append((path, path_final)) + _logger.debug(f"len(new_objects): {len(new_objects)}") + _copy_objects(batch=batch, use_threads=use_threads, boto3_session=session) + return new_objects + + +def _copy_objects(batch: List[Tuple[str, str]], use_threads: bool, boto3_session: boto3.Session) -> None: + _logger.debug(f"len(batch): {len(batch)}") + client_s3: boto3.client = _utils.client(service_name="s3", session=boto3_session) + resource_s3: boto3.resource = _utils.resource(service_name="s3", session=boto3_session) + for source, target in batch: + source_bucket, source_key = _utils.parse_path(path=source) + copy_source: Dict[str, str] = {"Bucket": source_bucket, "Key": source_key} + target_bucket, target_key = _utils.parse_path(path=target) + resource_s3.meta.client.copy( + CopySource=copy_source, + Bucket=target_bucket, + Key=target_key, + SourceClient=client_s3, + Config=TransferConfig(num_download_attempts=15, use_threads=use_threads), + ) diff --git a/docs/source/api.rst b/docs/source/api.rst index abb7cb16f..897fc7a3e 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -29,6 +29,8 @@ Amazon S3 to_parquet wait_objects_exist wait_objects_not_exist + copy_objects + merge_datasets AWS Glue Catalog ---------------- diff --git a/testing/test_awswrangler/test_data_lake.py b/testing/test_awswrangler/test_data_lake.py index 3f3555152..d5fccd7b4 100644 --- a/testing/test_awswrangler/test_data_lake.py +++ b/testing/test_awswrangler/test_data_lake.py @@ -1010,3 +1010,71 @@ def test_parquet_char_length(bucket, database, external_schema): wr.s3.delete_objects(path=path) assert wr.catalog.delete_table_if_exists(database=database, table=table) is True + + +def test_merge(bucket): + path = f"s3://{bucket}/test_merge/" + df = pd.DataFrame({"id": [1, 2, 3], "par": [1, 2, 3]}) + paths = wr.s3.to_parquet(df=df, path=path, dataset=True, partition_cols=["par"], mode="overwrite")["paths"] + wr.s3.wait_objects_exist(paths=paths) + df = wr.s3.read_parquet(path=path, dataset=True) + assert df.id.sum() == 6 + assert df.par.astype("Int64").sum() == 6 + + path2 = f"s3://{bucket}/test_merge2/" + df = pd.DataFrame({"id": [1, 2, 3], "par": [1, 2, 3]}) + paths = wr.s3.to_parquet(df=df, path=path2, dataset=True, partition_cols=["par"], mode="overwrite")["paths"] + wr.s3.wait_objects_exist(paths=paths) + paths = wr.s3.merge_datasets(source_path=path2, target_path=path, mode="append", use_threads=True) + wr.s3.wait_objects_exist(paths=paths, use_threads=False) + df = wr.s3.read_parquet(path=path, dataset=True) + assert df.id.sum() == 12 + assert df.par.astype("Int64").sum() == 12 + + paths = wr.s3.merge_datasets(source_path=path2, target_path=path, mode="overwrite", use_threads=False) + wr.s3.wait_objects_exist(paths=paths, use_threads=False) + df = wr.s3.read_parquet(path=path, dataset=True) + assert df.id.sum() == 6 + assert 
df.par.astype("Int64").sum() == 6 + + df = pd.DataFrame({"id": [4], "par": [3]}) + paths = wr.s3.to_parquet(df=df, path=path2, dataset=True, partition_cols=["par"], mode="overwrite")["paths"] + wr.s3.wait_objects_exist(paths=paths) + paths = wr.s3.merge_datasets(source_path=path2, target_path=path, mode="overwrite_partitions", use_threads=True) + wr.s3.wait_objects_exist(paths=paths, use_threads=False) + df = wr.s3.read_parquet(path=path, dataset=True) + assert df.id.sum() == 7 + assert df.par.astype("Int64").sum() == 6 + + with pytest.raises(wr.exceptions.InvalidArgumentValue): + wr.s3.merge_datasets(source_path=path, target_path="bar", mode="WRONG") + + assert len(wr.s3.merge_datasets(source_path=f"s3://{bucket}/empty/", target_path="bar")) == 0 + + wr.s3.delete_objects(path=path) + wr.s3.delete_objects(path=path2) + + +def test_copy(bucket): + path = f"s3://{bucket}/test_copy/" + df = pd.DataFrame({"id": [1, 2, 3], "par": [1, 2, 3]}) + paths = wr.s3.to_parquet(df=df, path=path, dataset=True, partition_cols=["par"], mode="overwrite")["paths"] + wr.s3.wait_objects_exist(paths=paths) + df = wr.s3.read_parquet(path=path, dataset=True) + assert df.id.sum() == 6 + assert df.par.astype("Int64").sum() == 6 + + path2 = f"s3://{bucket}/test_copy2/" + df = pd.DataFrame({"id": [1, 2, 3], "par": [1, 2, 3]}) + paths = wr.s3.to_parquet(df=df, path=path2, dataset=True, partition_cols=["par"], mode="overwrite")["paths"] + wr.s3.wait_objects_exist(paths=paths) + paths = wr.s3.copy_objects(paths, source_path=path2, target_path=path, use_threads=True) + wr.s3.wait_objects_exist(paths=paths, use_threads=False) + df = wr.s3.read_parquet(path=path, dataset=True) + assert df.id.sum() == 12 + assert df.par.astype("Int64").sum() == 12 + + assert len(wr.s3.copy_objects([], source_path="boo", target_path="bar")) == 0 + + wr.s3.delete_objects(path=path) + wr.s3.delete_objects(path=path2) diff --git a/tutorials/13 - Merging Datasets on S3.ipynb b/tutorials/13 - Merging Datasets on S3.ipynb new file mode 100644 index 000000000..03f823043 --- /dev/null +++ b/tutorials/13 - Merging Datasets on S3.ipynb @@ -0,0 +1,536 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "[![AWS Data Wrangler](_static/logo.png \"AWS Data Wrangler\")](https://github.com/awslabs/aws-data-wrangler)\n", + "\n", + "# 13 - Merging Datasets on S3\n", + "\n", + "Wrangler has 3 different copy modes to store Parquet Datasets on Amazon S3.\n", + "\n", + "- **append** (Default)\n", + "\n", + " Only adds new files without any delete.\n", + " \n", + "- **overwrite**\n", + "\n", + " Deletes everything in the target directory and then add new files.\n", + " \n", + "- **overwrite_partitions** (Partition Upsert)\n", + "\n", + " Only deletes the paths of partitions that should be updated and then writes the new partitions files. It's like a \"partition Upsert\"." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "from datetime import date\n", + "import awswrangler as wr\n", + "import pandas as pd" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Enter your bucket name:" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdin", + "output_type": "stream", + "text": [ + " ··········································\n" + ] + } + ], + "source": [ + "import getpass\n", + "bucket = getpass.getpass()\n", + "path1 = f\"s3://{bucket}/dataset1/\"\n", + "path2 = f\"s3://{bucket}/dataset2/\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Creating Dataset 1" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
idvaluedate
01foo2020-01-01
12boo2020-01-02
\n", + "
" + ], + "text/plain": [ + " id value date\n", + "0 1 foo 2020-01-01\n", + "1 2 boo 2020-01-02" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df = pd.DataFrame({\n", + " \"id\": [1, 2],\n", + " \"value\": [\"foo\", \"boo\"],\n", + " \"date\": [date(2020, 1, 1), date(2020, 1, 2)]\n", + "})\n", + "\n", + "wr.s3.to_parquet(\n", + " df=df,\n", + " path=path1,\n", + " dataset=True,\n", + " mode=\"overwrite\",\n", + " partition_cols=[\"date\"]\n", + ")\n", + "\n", + "wr.s3.read_parquet(path1, dataset=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Creating Dataset 2" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
idvaluedate
02xoo2020-01-02
13bar2020-01-03
\n", + "
" + ], + "text/plain": [ + " id value date\n", + "0 2 xoo 2020-01-02\n", + "1 3 bar 2020-01-03" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df = pd.DataFrame({\n", + " \"id\": [2, 3],\n", + " \"value\": [\"xoo\", \"bar\"],\n", + " \"date\": [date(2020, 1, 2), date(2020, 1, 3)]\n", + "})\n", + "\n", + "dataset2_files = wr.s3.to_parquet(\n", + " df=df,\n", + " path=path2,\n", + " dataset=True,\n", + " mode=\"overwrite\",\n", + " partition_cols=[\"date\"]\n", + ")[\"paths\"]\n", + "\n", + "wr.s3.read_parquet(path2, dataset=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Merging (Dataset 2 -> Dataset 1) (APPEND)" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
idvaluedate
01foo2020-01-01
12xoo2020-01-02
22boo2020-01-02
33bar2020-01-03
\n", + "
" + ], + "text/plain": [ + " id value date\n", + "0 1 foo 2020-01-01\n", + "1 2 xoo 2020-01-02\n", + "2 2 boo 2020-01-02\n", + "3 3 bar 2020-01-03" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "wr.s3.merge_datasets(\n", + " source_path=path2,\n", + " target_path=path1,\n", + " mode=\"append\"\n", + ")\n", + "\n", + "wr.s3.read_parquet(path1, dataset=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Merging (Dataset 2 -> Dataset 1) (OVERWRITE_PARTITIONS)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
idvaluedate
01foo2020-01-01
12xoo2020-01-02
23bar2020-01-03
\n", + "
" + ], + "text/plain": [ + " id value date\n", + "0 1 foo 2020-01-01\n", + "1 2 xoo 2020-01-02\n", + "2 3 bar 2020-01-03" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "wr.s3.merge_datasets(\n", + " source_path=path2,\n", + " target_path=path1,\n", + " mode=\"overwrite_partitions\"\n", + ")\n", + "\n", + "wr.s3.read_parquet(path1, dataset=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Merging (Dataset 2 -> Dataset 1) (OVERWRITE)" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
idvaluedate
02xoo2020-01-02
13bar2020-01-03
\n", + "
" + ], + "text/plain": [ + " id value date\n", + "0 2 xoo 2020-01-02\n", + "1 3 bar 2020-01-03" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "wr.s3.merge_datasets(\n", + " source_path=path2,\n", + " target_path=path1,\n", + " mode=\"overwrite\"\n", + ")\n", + "\n", + "wr.s3.read_parquet(path1, dataset=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Cleaning Up" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "wr.s3.delete_objects(path1)\n", + "wr.s3.delete_objects(path2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "conda_python3", + "language": "python", + "name": "conda_python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.5" + }, + "pycharm": { + "stem_cell": { + "cell_type": "raw", + "metadata": { + "collapsed": false + }, + "source": [] + } + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}