From 21b7955f1e84ce5b5f56aaf744ff61671173bcb0 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Fri, 17 Apr 2020 14:01:05 -0300 Subject: [PATCH] Add append mode to create_*_table #188 --- awswrangler/catalog.py | 15 ++++++++------- testing/test_awswrangler/test_data_lake.py | 6 ++++++ 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/awswrangler/catalog.py b/awswrangler/catalog.py index 1ae597869..11eb3ad10 100644 --- a/awswrangler/catalog.py +++ b/awswrangler/catalog.py @@ -87,7 +87,7 @@ def create_parquet_table( table: str, path: str, columns_types: Dict[str, str], - partitions_types: Optional[Dict[str, str]], + partitions_types: Optional[Dict[str, str]] = None, compression: Optional[str] = None, description: Optional[str] = None, parameters: Optional[Dict[str, str]] = None, @@ -120,7 +120,7 @@ def create_parquet_table( columns_comments: Dict[str, str], optional Columns names and the related comments (e.g. {'col0': 'Column 0.', 'col1': 'Column 1.', 'col2': 'Partition.'}). mode: str - Only 'overwrite' available by now. + 'overwrite' to recreate any possible axisting table or 'append' to keep any possible axisting table. boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. @@ -859,7 +859,7 @@ def create_csv_table( table: str, path: str, columns_types: Dict[str, str], - partitions_types: Optional[Dict[str, str]], + partitions_types: Optional[Dict[str, str]] = None, compression: Optional[str] = None, description: Optional[str] = None, parameters: Optional[Dict[str, str]] = None, @@ -893,7 +893,7 @@ def create_csv_table( columns_comments: Dict[str, str], optional Columns names and the related comments (e.g. {'col0': 'Column 0.', 'col1': 'Column 1.', 'col2': 'Partition.'}). mode: str - Only 'overwrite' available by now. + 'overwrite' to recreate any possible axisting table or 'append' to keep any possible axisting table. sep : str String of length 1. Field delimiter for the output file. boto3_session : boto3.Session(), optional @@ -967,10 +967,11 @@ def _create_table( if name in columns_comments: par["Comment"] = columns_comments[name] session: boto3.Session = _utils.ensure_session(session=boto3_session) - if mode == "overwrite": + exist: bool = does_table_exist(database=database, table=table, boto3_session=session) + if (mode == "overwrite") or (exist is False): delete_table_if_exists(database=database, table=table, boto3_session=session) - client_glue: boto3.client = _utils.client(service_name="glue", session=session) - client_glue.create_table(DatabaseName=database, TableInput=table_input) + client_glue: boto3.client = _utils.client(service_name="glue", session=session) + client_glue.create_table(DatabaseName=database, TableInput=table_input) def _csv_table_definition( diff --git a/testing/test_awswrangler/test_data_lake.py b/testing/test_awswrangler/test_data_lake.py index d5fccd7b4..afa2a8307 100644 --- a/testing/test_awswrangler/test_data_lake.py +++ b/testing/test_awswrangler/test_data_lake.py @@ -443,6 +443,9 @@ def test_catalog(bucket, database): partitions_types={"y": "int", "m": "int"}, compression="snappy", ) + wr.catalog.create_parquet_table( + database=database, table="test_catalog", path=path, columns_types={"col0": "string"}, mode="append" + ) assert wr.catalog.does_table_exist(database=database, table="test_catalog") is True assert wr.catalog.delete_table_if_exists(database=database, table="test_catalog") is True assert wr.catalog.delete_table_if_exists(database=database, table="test_catalog") is False @@ -853,6 +856,9 @@ def test_athena_types(bucket, database): partitions_types=partitions_types, columns_types=columns_types, ) + wr.catalog.create_csv_table( + database=database, table="test_athena_types", path=path, columns_types={"col0": "string"}, mode="append" + ) wr.athena.repair_table("test_athena_types", database) assert len(wr.catalog.get_csv_partitions(database, "test_athena_types")) == 3 df2 = wr.athena.read_sql_table("test_athena_types", database)