From a1f62710e1b89672b2722f9f2505d65d0135ad27 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Wed, 1 Jan 2020 14:42:53 -0300 Subject: [PATCH 1/4] Adding glue catalog utilities --- awswrangler/glue.py | 129 +++++++++++- demo/getting_started.ipynb | 269 ++++++++++++++++++++++++++ testing/test_awswrangler/test_glue.py | 47 +++++ 3 files changed, 444 insertions(+), 1 deletion(-) diff --git a/awswrangler/glue.py b/awswrangler/glue.py index 8bb2da7ac..ddfd3856f 100644 --- a/awswrangler/glue.py +++ b/awswrangler/glue.py @@ -1,8 +1,11 @@ -from typing import Dict, Optional +from typing import Dict, Optional, Any, Iterator from math import ceil +from itertools import islice import re import logging +from pandas import DataFrame + from awswrangler import data_types from awswrangler.athena import Athena from awswrangler.exceptions import UnsupportedFileFormat, InvalidSerDe, ApiError, UnsupportedType, UndetectedType, InvalidTable, InvalidArguments @@ -390,3 +393,127 @@ def get_table_location(self, database: str, table: str): return res["Table"]["StorageDescriptor"]["Location"] except KeyError: raise InvalidTable(f"{database}.{table}") + + def get_databases(self, catalog_id: Optional[str] = None) -> Iterator[Dict[str, Any]]: + """ + Get an iterator of databases + + :param catalog_id: The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default. + :return: Iterator[Dict[str, Any]] of Databases + """ + paginator = self._client_glue.get_paginator("get_databases") + if catalog_id is None: + response_iterator = paginator.paginate() + else: + response_iterator = paginator.paginate(CatalogId=catalog_id) + for page in response_iterator: + for db in page["DatabaseList"]: + yield db + + def get_tables(self, catalog_id: Optional[str] = None, database: Optional[str] = None, search: Optional[str] = None, prefix: Optional[str] = None, suffix: Optional[str] = None) -> Iterator[Dict[str, Any]]: + """ + Get an iterator of tables + + :param catalog_id: The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default. 
+        :param database: Filter a specific database
+        :param search: Select by a specific string on table name
+        :param prefix: Select by a specific prefix on table name
+        :param suffix: Select by a specific suffix on table name
+        :return: Iterator[Dict[str, Any]] of Tables
+        """
+        paginator = self._client_glue.get_paginator("get_tables")
+        args: Dict[str, str] = {}
+        if catalog_id is not None:
+            args["CatalogId"] = catalog_id
+        if (prefix is not None) and (suffix is not None) and (search is not None):
+            args["Expression"] = f"{prefix}.*{search}.*{suffix}"
+        elif (prefix is not None) and (suffix is not None):
+            args["Expression"] = f"{prefix}.*{suffix}"
+        elif search is not None:
+            args["Expression"] = f".*{search}.*"
+        elif prefix is not None:
+            args["Expression"] = f"{prefix}.*"
+        elif suffix is not None:
+            args["Expression"] = f".*{suffix}"
+        if database is not None:
+            databases = [database]
+        else:
+            databases = [x["Name"] for x in self.get_databases(catalog_id=catalog_id)]
+        for db in databases:
+            args["DatabaseName"] = db
+            response_iterator = paginator.paginate(**args)
+            for page in response_iterator:
+                for tbl in page["TableList"]:
+                    yield tbl
+
+    def tables(self, limit: int = 100, catalog_id: Optional[str] = None, database: Optional[str] = None, search: Optional[str] = None, prefix: Optional[str] = None, suffix: Optional[str] = None) -> DataFrame:
+        table_iter = self.get_tables(catalog_id=catalog_id, database=database, search=search, prefix=prefix, suffix=suffix)
+        tables = islice(table_iter, limit)
+        df_dict = {
+            "Database": [],
+            "Table": [],
+            "Description": [],
+            "Columns": [],
+            "Partitions": []
+        }
+        for table in tables:
+            df_dict["Database"].append(table["DatabaseName"])
+            df_dict["Table"].append(table["Name"])
+            if "Description" in table:
+                df_dict["Description"].append(table["Description"])
+            else:
+                df_dict["Description"].append("")
+            df_dict["Columns"].append(", ".join([x["Name"] for x in table["StorageDescriptor"]["Columns"]]))
+            df_dict["Partitions"].append(", ".join([x["Name"] for x in table["PartitionKeys"]]))
+        return DataFrame(data=df_dict)
+
+    def databases(self, limit: int = 100, catalog_id: Optional[str] = None) -> DataFrame:
+        database_iter = self.get_databases(catalog_id=catalog_id)
+        dbs = islice(database_iter, limit)
+        df_dict = {
+            "Database": [],
+            "Description": []
+        }
+        for db in dbs:
+            df_dict["Database"].append(db["Name"])
+            if "Description" in db:
+                df_dict["Description"].append(db["Description"])
+            else:
+                df_dict["Description"].append("")
+        return DataFrame(data=df_dict)
+
+    def table(self, database: str, name: str, catalog_id: Optional[str] = None) -> DataFrame:
+        if catalog_id is None:
+            table: Dict[str, Any] = self._client_glue.get_table(
+                DatabaseName=database,
+                Name=name
+            )["Table"]
+        else:
+            table = self._client_glue.get_table(
+                CatalogId=catalog_id,
+                DatabaseName=database,
+                Name=name
+            )["Table"]
+        df_dict = {
+            "Column Name": [],
+            "Type": [],
+            "Partition": [],
+            "Comment": []
+        }
+        for col in table["StorageDescriptor"]["Columns"]:
+            df_dict["Column Name"].append(col["Name"])
+            df_dict["Type"].append(col["Type"])
+            df_dict["Partition"].append(False)
+            if "Comment" in col:
+                df_dict["Comment"].append(col["Comment"])
+            else:
+                df_dict["Comment"].append("")
+        for col in table["PartitionKeys"]:
+            df_dict["Column Name"].append(col["Name"])
+            df_dict["Type"].append(col["Type"])
+            df_dict["Partition"].append(True)
+            if "Comment" in col:
+                df_dict["Comment"].append(col["Comment"])
+            else:
+                df_dict["Comment"].append("")
+        return DataFrame(data=df_dict)
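The hunk above exposes the catalog at two levels: lazy iterators (get_databases, get_tables) that page through Glue with boto3 paginators, and eager Pandas summaries (databases, tables, table) capped via islice. Note how get_tables folds the prefix/search/suffix filters into a single Glue "Expression" regex. A minimal usage sketch, assuming an already-configured awswrangler Session; the database name and filter strings below are illustrative, not from this patch:

    import awswrangler

    session = awswrangler.Session()  # credentials/region resolved by boto3's default chain

    # Lazy: iterates catalog pages; prefix="sales_" becomes the Expression "sales_.*"
    for tbl in session.glue.get_tables(database="my_db", prefix="sales_"):
        print(tbl["Name"], tbl["TableType"])

    # Eager: at most `limit` rows, summarized in a Pandas DataFrame
    df = session.glue.tables(limit=50, suffix="_2019")
    print(df[["Database", "Table", "Partitions"]])
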
diff --git a/demo/getting_started.ipynb b/demo/getting_started.ipynb
index e53740434..9e3caa07f 100644
--- a/demo/getting_started.ipynb
+++ b/demo/getting_started.ipynb
@@ -28,6 +28,275 @@
    "source": [
     "wr.__version__"
    ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/html": [
+       "<div>\n",
+       "<!-- styled HTML table omitted: it renders the same Database / Description rows as the text/plain output below -->\n",
+       "</div>\n",
+       "
" + ], + "text/plain": [ + " Database Description\n", + "0 awswrangler_test AWS Data Wrangler Test Arena - Glue Database\n", + "1 default " + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "wr.glue.databases()" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
DatabaseTableDescriptionColumnsPartitions
0awswrangler_testtest2col1, datecol
1awswrangler_testtest_move_objects_0id, valuename, date
2awswrangler_testtest_table_with_dotwith_spaces, with_dash, accent, with_dot, came...camel_case
3defaultnew_tabledispatching_base_num, pickup_date, locationid,...
4defaulttaxisdispatching_base_num, pickup_date, locationid,...year
5defaulttestid, name, value, date
6defaulttest2id, value, datename
7defaulttest3id, name, value, date
8defaulttest4id, value, datename
\n", + "
" + ], + "text/plain": [ + " Database Table Description \\\n", + "0 awswrangler_test test2 \n", + "1 awswrangler_test test_move_objects_0 \n", + "2 awswrangler_test test_table_with_dot \n", + "3 default new_table \n", + "4 default taxis \n", + "5 default test \n", + "6 default test2 \n", + "7 default test3 \n", + "8 default test4 \n", + "\n", + " Columns Partitions \n", + "0 col1, datecol \n", + "1 id, value name, date \n", + "2 with_spaces, with_dash, accent, with_dot, came... camel_case \n", + "3 dispatching_base_num, pickup_date, locationid,... \n", + "4 dispatching_base_num, pickup_date, locationid,... year \n", + "5 id, name, value, date \n", + "6 id, value, date name \n", + "7 id, name, value, date \n", + "8 id, value, date name " + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "wr.glue.tables()" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
Column NameTypePartitionComment
0col1stringFalse
1datecoldateFalse
\n", + "
" + ], + "text/plain": [ + " Column Name Type Partition Comment\n", + "0 col1 string False \n", + "1 datecol date False " + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "wr.glue.table(database=\"awswrangler_test\", name=\"test2\")" + ] } ], "metadata": { diff --git a/testing/test_awswrangler/test_glue.py b/testing/test_awswrangler/test_glue.py index 2b13e44cb..c978de894 100644 --- a/testing/test_awswrangler/test_glue.py +++ b/testing/test_awswrangler/test_glue.py @@ -80,3 +80,50 @@ def test_get_table_python_types(session, database, table): assert ptypes["value"] == float assert ptypes["name"] == str assert ptypes["date"] == str + + +def test_get_databases(session, database): + dbs = list(session.glue.get_databases()) + assert len(dbs) > 0 + for db in dbs: + if db["Name"] == database: + assert db["Description"] == "AWS Data Wrangler Test Arena - Glue Database" + + +def test_get_tables(session, table): + tables = list(session.glue.get_tables()) + assert len(tables) > 0 + for tbl in tables: + if tbl["Name"] == table: + assert tbl["TableType"] == "EXTERNAL_TABLE" + + +def test_get_tables_database(session, database): + tables = list(session.glue.get_tables(database=database)) + assert len(tables) > 0 + for tbl in tables: + assert tbl["DatabaseName"] == database + + +def test_get_tables_search(session, table): + tables = list(session.glue.get_tables(search=table[1:-1])) + assert len(tables) > 0 + for tbl in tables: + if tbl["Name"] == table: + assert tbl["TableType"] == "EXTERNAL_TABLE" + + +def test_get_tables_prefix(session, table): + tables = list(session.glue.get_tables(prefix=table[:-1])) + assert len(tables) > 0 + for tbl in tables: + if tbl["Name"] == table: + assert tbl["TableType"] == "EXTERNAL_TABLE" + + +def test_get_tables_suffix(session, table): + tables = list(session.glue.get_tables(suffix=table[1:])) + assert len(tables) > 0 + for tbl in tables: + if tbl["Name"] == table: + assert tbl["TableType"] == "EXTERNAL_TABLE" From 4daad90d64c4d5709878a2466a5fb794e5596e67 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Wed, 1 Jan 2020 21:07:44 -0300 Subject: [PATCH 2/4] Documenting --- awswrangler/glue.py | 83 +++++++++++++++++---------- testing/test_awswrangler/test_glue.py | 6 ++ 2 files changed, 58 insertions(+), 31 deletions(-) diff --git a/awswrangler/glue.py b/awswrangler/glue.py index ddfd3856f..239702e80 100644 --- a/awswrangler/glue.py +++ b/awswrangler/glue.py @@ -1,10 +1,10 @@ -from typing import Dict, Optional, Any, Iterator +from typing import Dict, Optional, Any, Iterator, List from math import ceil from itertools import islice import re import logging -from pandas import DataFrame +from pandas import DataFrame # type: ignore from awswrangler import data_types from awswrangler.athena import Athena @@ -410,7 +410,12 @@ def get_databases(self, catalog_id: Optional[str] = None) -> Iterator[Dict[str, for db in page["DatabaseList"]: yield db - def get_tables(self, catalog_id: Optional[str] = None, database: Optional[str] = None, search: Optional[str] = None, prefix: Optional[str] = None, suffix: Optional[str] = None) -> Iterator[Dict[str, Any]]: + def get_tables(self, + catalog_id: Optional[str] = None, + database: Optional[str] = None, + search: Optional[str] = None, + prefix: Optional[str] = None, + suffix: Optional[str] = None) -> Iterator[Dict[str, Any]]: """ Get an iterator of tables @@ -446,16 +451,32 @@ def get_tables(self, catalog_id: Optional[str] = None, database: Optional[str] = for tbl in 
page["TableList"]: yield tbl - def tables(self, limit: int = 100, catalog_id: Optional[str] = None, database: Optional[str] = None, search: Optional[str] = None, prefix: Optional[str] = None, suffix: Optional[str] = None) -> DataFrame: - table_iter = self.get_tables(catalog_id=catalog_id, database=database, search=search, prefix=prefix, suffix=suffix) + def tables(self, + limit: int = 100, + catalog_id: Optional[str] = None, + database: Optional[str] = None, + search: Optional[str] = None, + prefix: Optional[str] = None, + suffix: Optional[str] = None) -> DataFrame: + """ + Get iterator of tables filtered by a search term, prefix, suffix. + + :param limit: Max number of tables + :param catalog_id: The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default. + :param database: Glue database name + :param search: Select only tables with the given string in the name. + :param prefix: Select only tables with the given string in the name prefix. + :param suffix: Select only tables with the given string in the name suffix. + + :return: Pandas Dataframe filled by formatted infos + """ + table_iter = self.get_tables(catalog_id=catalog_id, + database=database, + search=search, + prefix=prefix, + suffix=suffix) tables = islice(table_iter, limit) - df_dict = { - "Database": [], - "Table": [], - "Description": [], - "Columns": [], - "Partitions": [] - } + df_dict: Dict[str, List] = {"Database": [], "Table": [], "Description": [], "Columns": [], "Partitions": []} for table in tables: df_dict["Database"].append(table["DatabaseName"]) df_dict["Table"].append(table["Name"]) @@ -468,12 +489,16 @@ def tables(self, limit: int = 100, catalog_id: Optional[str] = None, database: O return DataFrame(data=df_dict) def databases(self, limit: int = 100, catalog_id: Optional[str] = None) -> DataFrame: + """ + Get iterator of databases. + + :param limit: Max number of tables + :param catalog_id: The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default. + :return: Pandas Dataframe filled by formatted infos + """ database_iter = self.get_databases(catalog_id=catalog_id) dbs = islice(database_iter, limit) - df_dict = { - "Database": [], - "Description": [] - } + df_dict: Dict[str, List] = {"Database": [], "Description": []} for db in dbs: df_dict["Database"].append(db["Name"]) if "Description" in db: @@ -483,23 +508,19 @@ def databases(self, limit: int = 100, catalog_id: Optional[str] = None) -> DataF return DataFrame(data=df_dict) def table(self, database: str, name: str, catalog_id: Optional[str] = None) -> DataFrame: + """ + Get table details as Pandas Dataframe + + :param database: Glue database name + :param name: Table name + :param catalog_id: The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default. 
+ :return: Pandas Dataframe filled by formatted infos + """ if catalog_id is None: - table: Dict[str, Any] = self._client_glue.get_table( - DatabaseName=database, - Name=name - )["Table"] + table: Dict[str, Any] = self._client_glue.get_table(DatabaseName=database, Name=name)["Table"] else: - table = self._client_glue.get_table( - CatalogId=catalog_id, - DatabaseName=database, - Name=name - )["Table"] - df_dict = { - "Column Name": [], - "Type": [], - "Partition": [], - "Comment": [] - } + table = self._client_glue.get_table(CatalogId=catalog_id, DatabaseName=database, Name=name)["Table"] + df_dict: Dict[str, List] = {"Column Name": [], "Type": [], "Partition": [], "Comment": []} for col in table["StorageDescriptor"]["Columns"]: df_dict["Column Name"].append(col["Name"]) df_dict["Type"].append(col["Type"]) diff --git a/testing/test_awswrangler/test_glue.py b/testing/test_awswrangler/test_glue.py index c978de894..ff3f45dd0 100644 --- a/testing/test_awswrangler/test_glue.py +++ b/testing/test_awswrangler/test_glue.py @@ -127,3 +127,9 @@ def test_get_tables_suffix(session, table): for tbl in tables: if tbl["Name"] == table: assert tbl["TableType"] == "EXTERNAL_TABLE" + + +def test_glue_utils(session, database, table): + assert len(session.glue.databases().index) > 1 + assert len(session.glue.tables().index) > 1 + assert len(session.glue.table(database=database, name=table).index) > 1 From d7f1d197c8e4b8ea6b1c0691118af64e1a6a1371 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Wed, 1 Jan 2020 22:05:18 -0300 Subject: [PATCH 3/4] Add searching feature to Glue Catalog --- awswrangler/glue.py | 94 ++++++++++++++++++--------- testing/test_awswrangler/test_glue.py | 11 +++- 2 files changed, 74 insertions(+), 31 deletions(-) diff --git a/awswrangler/glue.py b/awswrangler/glue.py index 239702e80..e6558e9d1 100644 --- a/awswrangler/glue.py +++ b/awswrangler/glue.py @@ -413,33 +413,33 @@ def get_databases(self, catalog_id: Optional[str] = None) -> Iterator[Dict[str, def get_tables(self, catalog_id: Optional[str] = None, database: Optional[str] = None, - search: Optional[str] = None, - prefix: Optional[str] = None, - suffix: Optional[str] = None) -> Iterator[Dict[str, Any]]: + name_contains: Optional[str] = None, + name_prefix: Optional[str] = None, + name_suffix: Optional[str] = None) -> Iterator[Dict[str, Any]]: """ Get an iterator of tables :param catalog_id: The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default. 
:param database: Filter a specific database - :param search: Select by a specific string on table name - :param prefix: Select by a specific prefix on table name - :param suffix: Select by a specific suffix on table name + :param name_contains: Select by a specific string on table name + :param name_prefix: Select by a specific prefix on table name + :param name_suffix: Select by a specific suffix on table name :return: Iterator[Dict[str, Any]] of Tables """ paginator = self._client_glue.get_paginator("get_tables") args: Dict[str, str] = {} if catalog_id is not None: args["CatalogId"] = catalog_id - if (prefix is not None) and (suffix is not None) and (search is not None): - args["Expression"] = f"{prefix}.*{search}.*{suffix}" - elif (prefix is not None) and (suffix is not None): - args["Expression"] = f"{prefix}.*{suffix}" - elif search is not None: - args["Expression"] = f".*{search}.*" - elif prefix is not None: - args["Expression"] = f"{prefix}.*" - elif suffix is not None: - args["Expression"] = f".*{suffix}" + if (name_prefix is not None) and (name_suffix is not None) and (name_contains is not None): + args["Expression"] = f"{name_prefix}.*{name_contains}.*{name_suffix}" + elif (name_prefix is not None) and (name_suffix is not None): + args["Expression"] = f"{name_prefix}.*{name_suffix}" + elif name_contains is not None: + args["Expression"] = f".*{name_contains}.*" + elif name_prefix is not None: + args["Expression"] = f"{name_prefix}.*" + elif name_suffix is not None: + args["Expression"] = f".*{name_suffix}" if database is not None: databases = [database] else: @@ -455,27 +455,41 @@ def tables(self, limit: int = 100, catalog_id: Optional[str] = None, database: Optional[str] = None, - search: Optional[str] = None, - prefix: Optional[str] = None, - suffix: Optional[str] = None) -> DataFrame: + search_text: Optional[str] = None, + name_contains: Optional[str] = None, + name_prefix: Optional[str] = None, + name_suffix: Optional[str] = None) -> DataFrame: """ - Get iterator of tables filtered by a search term, prefix, suffix. + Get a Dataframe with tables filtered by a search term, prefix, suffix. :param limit: Max number of tables :param catalog_id: The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default. :param database: Glue database name - :param search: Select only tables with the given string in the name. - :param prefix: Select only tables with the given string in the name prefix. - :param suffix: Select only tables with the given string in the name suffix. 
- + :param search_text: Select only tables with the given string in table's properties + :param name_contains: Select by a specific string on table name + :param name_prefix: Select only tables with the given string in the name prefix + :param name_suffix: Select only tables with the given string in the name suffix :return: Pandas Dataframe filled by formatted infos """ - table_iter = self.get_tables(catalog_id=catalog_id, - database=database, - search=search, - prefix=prefix, - suffix=suffix) - tables = islice(table_iter, limit) + if search_text is None: + table_iter = self.get_tables(catalog_id=catalog_id, + database=database, + name_contains=name_contains, + name_prefix=name_prefix, + name_suffix=name_suffix) + tables: List[Dict[str, Any]] = list(islice(table_iter, limit)) + else: + tables = list(self.search_tables(text=search_text, catalog_id=catalog_id)) + if database is not None: + tables = [x for x in tables if x["DatabaseName"] == database] + if name_contains is not None: + tables = [x for x in tables if name_contains in x["Name"]] + if name_prefix is not None: + tables = [x for x in tables if x["Name"].startswith(name_prefix)] + if name_suffix is not None: + tables = [x for x in tables if x["Name"].endswith(name_suffix)] + tables = tables[:limit] + df_dict: Dict[str, List] = {"Database": [], "Table": [], "Description": [], "Columns": [], "Partitions": []} for table in tables: df_dict["Database"].append(table["DatabaseName"]) @@ -488,6 +502,26 @@ def tables(self, df_dict["Partitions"].append(", ".join([x["Name"] for x in table["PartitionKeys"]])) return DataFrame(data=df_dict) + def search_tables(self, text: str, catalog_id: Optional[str] = None): + """ + Get iterator of tables filtered by a search string. + + :param text: Select only tables with the given string in table's properties. + :param catalog_id: The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default. + :return: Iterator of tables + """ + args: Dict[str, Any] = {"SearchText": text} + if catalog_id is not None: + args["CatalogId"] = catalog_id + response = self._client_glue.search_tables(**args) + for tbl in response["TableList"]: + yield tbl + while "NextToken" in response: + args["NextToken"] = response["NextToken"] + response = self._client_glue.search_tables(**args) + for tbl in response["TableList"]: + yield tbl + def databases(self, limit: int = 100, catalog_id: Optional[str] = None) -> DataFrame: """ Get iterator of databases. 
diff --git a/testing/test_awswrangler/test_glue.py b/testing/test_awswrangler/test_glue.py index ff3f45dd0..fc5d85347 100644 --- a/testing/test_awswrangler/test_glue.py +++ b/testing/test_awswrangler/test_glue.py @@ -106,7 +106,7 @@ def test_get_tables_database(session, database): def test_get_tables_search(session, table): - tables = list(session.glue.get_tables(search=table[1:-1])) + tables = list(session.glue.search_tables(text="parquet")) assert len(tables) > 0 for tbl in tables: if tbl["Name"] == table: @@ -133,3 +133,12 @@ def test_glue_utils(session, database, table): assert len(session.glue.databases().index) > 1 assert len(session.glue.tables().index) > 1 assert len(session.glue.table(database=database, name=table).index) > 1 + + +def test_glue_tables_full(session, database, table): + assert len( + session.glue.tables(database=database, + search_text="parquet", + name_contains=table[1:-1], + name_prefix=table[0], + name_suffix=table[-1]).index) > 1 From 54823ad44bcf1f2bf861ef1a3fca47057d71b9bb Mon Sep 17 00:00:00 2001 From: igorborgest Date: Wed, 1 Jan 2020 22:59:30 -0300 Subject: [PATCH 4/4] Add Description, parameters and column's comments as args to Glue tables --- awswrangler/glue.py | 64 ++++++++++++++++++++++-- awswrangler/pandas.py | 65 +++++++++++++++++-------- awswrangler/spark.py | 13 ++++- testing/test_awswrangler/test_glue.py | 8 +-- testing/test_awswrangler/test_pandas.py | 37 ++++++++++++++ 5 files changed, 158 insertions(+), 29 deletions(-) diff --git a/awswrangler/glue.py b/awswrangler/glue.py index e6558e9d1..d77c9125f 100644 --- a/awswrangler/glue.py +++ b/awswrangler/glue.py @@ -58,7 +58,29 @@ def metadata_to_glue(self, mode="append", compression=None, cast_columns=None, - extra_args=None): + extra_args=None, + description: Optional[str] = None, + parameters: Optional[Dict[str, str]] = None, + columns_comments: Optional[Dict[str, str]] = None) -> None: + """ + + :param dataframe: Pandas Dataframe + :param objects_paths: Files paths on S3 + :param preserve_index: Should preserve index on S3? + :param partition_cols: partitions names + :param mode: "append", "overwrite", "overwrite_partitions" + :param cast_columns: Dictionary of columns names and Athena/Glue types to be casted. (E.g. {"col name": "bigint", "col2 name": "int"}) (Only for "parquet" file_format) + :param database: AWS Glue Database name + :param table: AWS Glue table name + :param path: AWS S3 path (E.g. s3://bucket-name/folder_name/ + :param file_format: "csv" or "parquet" + :param compression: None, gzip, snappy, etc + :param extra_args: Extra arguments specific for each file formats (E.g. 
"sep" for CSV) + :param description: Table description + :param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]]) + :param columns_comments: Columns names and the related comments (Optional[Dict[str, str]]) + :return: None + """ indexes_position = "left" if file_format == "csv" else "right" schema, partition_cols_schema = Glue._build_schema(dataframe=dataframe, partition_cols=partition_cols, @@ -78,7 +100,10 @@ def metadata_to_glue(self, path=path, file_format=file_format, compression=compression, - extra_args=extra_args) + extra_args=extra_args, + description=description, + parameters=parameters, + columns_comments=columns_comments) if partition_cols: partitions_tuples = Glue._parse_partitions_tuples(objects_paths=objects_paths, partition_cols=partition_cols) @@ -114,7 +139,26 @@ def create_table(self, file_format, compression, partition_cols_schema=None, - extra_args=None): + extra_args=None, + description: Optional[str] = None, + parameters: Optional[Dict[str, str]] = None, + columns_comments: Optional[Dict[str, str]] = None) -> None: + """ + Create Glue table (Catalog) + + :param database: AWS Glue Database name + :param table: AWS Glue table name + :param schema: Table schema + :param path: AWS S3 path (E.g. s3://bucket-name/folder_name/ + :param file_format: "csv" or "parquet" + :param compression: None, gzip, snappy, etc + :param partition_cols_schema: Partitions schema + :param extra_args: Extra arguments specific for each file formats (E.g. "sep" for CSV) + :param description: Table description + :param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]]) + :param columns_comments: Columns names and the related comments (Optional[Dict[str, str]]) + :return: None + """ if file_format == "parquet": table_input = Glue.parquet_table_definition(table, partition_cols_schema, schema, path, compression) elif file_format == "csv": @@ -126,6 +170,20 @@ def create_table(self, extra_args=extra_args) else: raise UnsupportedFileFormat(file_format) + if description is not None: + table_input["Description"] = description + if parameters is not None: + for k, v in parameters.items(): + table_input["Parameters"][k] = v + if columns_comments is not None: + for col in table_input["StorageDescriptor"]["Columns"]: + name = col["Name"] + if name in columns_comments: + col["Comment"] = columns_comments[name] + for par in table_input["PartitionKeys"]: + name = par["Name"] + if name in columns_comments: + par["Comment"] = columns_comments[name] self._client_glue.create_table(DatabaseName=database, TableInput=table_input) def add_partitions(self, database, table, partition_paths, file_format, compression, extra_args=None): diff --git a/awswrangler/pandas.py b/awswrangler/pandas.py index 6224d8c63..eda9569fd 100644 --- a/awswrangler/pandas.py +++ b/awswrangler/pandas.py @@ -627,21 +627,22 @@ def _apply_dates_to_generator(generator, parse_dates): df[col] = df[col].dt.date.replace(to_replace={pd.NaT: None}) yield df - def to_csv( - self, - dataframe, - path, - sep=",", - serde="OpenCSVSerDe", - database: Optional[str] = None, - table=None, - partition_cols=None, - preserve_index=True, - mode="append", - procs_cpu_bound=None, - procs_io_bound=None, - inplace=True, - ): + def to_csv(self, + dataframe, + path, + sep=",", + serde="OpenCSVSerDe", + database: Optional[str] = None, + table=None, + partition_cols=None, + preserve_index=True, + mode="append", + procs_cpu_bound=None, + procs_io_bound=None, + inplace=True, + description: Optional[str] = None, + parameters: 
Optional[Dict[str, str]] = None, + columns_comments: Optional[Dict[str, str]] = None): """ Write a Pandas Dataframe as CSV files on S3 Optionally writes metadata on AWS Glue. @@ -658,6 +659,9 @@ def to_csv( :param procs_cpu_bound: Number of cores used for CPU bound tasks :param procs_io_bound: Number of cores used for I/O bound tasks :param inplace: True is cheapest (CPU and Memory) but False leaves your DataFrame intact + :param description: Table description + :param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]]) + :param columns_comments: Columns names and the related comments (Optional[Dict[str, str]]) :return: List of objects written on S3 """ if serde not in Pandas.VALID_CSV_SERDES: @@ -675,7 +679,10 @@ def to_csv( procs_cpu_bound=procs_cpu_bound, procs_io_bound=procs_io_bound, extra_args=extra_args, - inplace=inplace) + inplace=inplace, + description=description, + parameters=parameters, + columns_comments=columns_comments) def to_parquet(self, dataframe, @@ -689,7 +696,10 @@ def to_parquet(self, procs_cpu_bound=None, procs_io_bound=None, cast_columns=None, - inplace=True): + inplace=True, + description: Optional[str] = None, + parameters: Optional[Dict[str, str]] = None, + columns_comments: Optional[Dict[str, str]] = None): """ Write a Pandas Dataframe as parquet files on S3 Optionally writes metadata on AWS Glue. @@ -706,6 +716,9 @@ def to_parquet(self, :param procs_io_bound: Number of cores used for I/O bound tasks :param cast_columns: Dictionary of columns names and Athena/Glue types to be casted (E.g. {"col name": "bigint", "col2 name": "int"}) :param inplace: True is cheapest (CPU and Memory) but False leaves your DataFrame intact + :param description: Table description + :param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]]) + :param columns_comments: Columns names and the related comments (Optional[Dict[str, str]]) :return: List of objects written on S3 """ return self.to_s3(dataframe=dataframe, @@ -720,7 +733,10 @@ def to_parquet(self, procs_cpu_bound=procs_cpu_bound, procs_io_bound=procs_io_bound, cast_columns=cast_columns, - inplace=inplace) + inplace=inplace, + description=description, + parameters=parameters, + columns_comments=columns_comments) def to_s3(self, dataframe: pd.DataFrame, @@ -736,7 +752,10 @@ def to_s3(self, procs_io_bound=None, cast_columns=None, extra_args=None, - inplace: bool = True) -> List[str]: + inplace: bool = True, + description: Optional[str] = None, + parameters: Optional[Dict[str, str]] = None, + columns_comments: Optional[Dict[str, str]] = None) -> List[str]: """ Write a Pandas Dataframe on S3 Optionally writes metadata on AWS Glue. @@ -755,6 +774,9 @@ def to_s3(self, :param cast_columns: Dictionary of columns names and Athena/Glue types to be casted. (E.g. {"col name": "bigint", "col2 name": "int"}) (Only for "parquet" file_format) :param extra_args: Extra arguments specific for each file formats (E.g. 
"sep" for CSV) :param inplace: True is cheapest (CPU and Memory) but False leaves your DataFrame intact + :param description: Table description + :param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]]) + :param columns_comments: Columns names and the related comments (Optional[Dict[str, str]]) :return: List of objects written on S3 """ if partition_cols is None: @@ -810,7 +832,10 @@ def to_s3(self, mode=mode, compression=compression, cast_columns=cast_columns, - extra_args=extra_args) + extra_args=extra_args, + description=description, + parameters=parameters, + columns_comments=columns_comments) return objects_paths def data_to_s3(self, diff --git a/awswrangler/spark.py b/awswrangler/spark.py index 4636634da..c98766722 100644 --- a/awswrangler/spark.py +++ b/awswrangler/spark.py @@ -164,7 +164,10 @@ def create_glue_table(self, sep=",", partition_by=None, load_partitions=True, - replace_if_exists=True): + replace_if_exists=True, + description: Optional[str] = None, + parameters: Optional[Dict[str, str]] = None, + columns_comments: Optional[Dict[str, str]] = None): """ Create a Glue metadata table pointing for some dataset stored on AWS S3. @@ -179,6 +182,9 @@ def create_glue_table(self, :param table: Glue table name. If not passed, extracted from the path :param load_partitions: Load partitions after the table creation :param replace_if_exists: Drop table and recreates that if already exists + :param description: Table description + :param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]]) + :param columns_comments: Columns names and the related comments (Optional[Dict[str, str]]) :return: None """ file_format = file_format.lower() @@ -210,7 +216,10 @@ def create_glue_table(self, path=path, file_format=file_format, compression=compression, - extra_args=extra_args) + extra_args=extra_args, + description=description, + parameters=parameters, + columns_comments=columns_comments) if load_partitions: self._session.athena.repair_table(database=database, table=table) diff --git a/testing/test_awswrangler/test_glue.py b/testing/test_awswrangler/test_glue.py index fc5d85347..17957db7d 100644 --- a/testing/test_awswrangler/test_glue.py +++ b/testing/test_awswrangler/test_glue.py @@ -130,9 +130,9 @@ def test_get_tables_suffix(session, table): def test_glue_utils(session, database, table): - assert len(session.glue.databases().index) > 1 - assert len(session.glue.tables().index) > 1 - assert len(session.glue.table(database=database, name=table).index) > 1 + assert len(session.glue.databases().index) > 0 + assert len(session.glue.tables().index) > 0 + assert len(session.glue.table(database=database, name=table).index) > 0 def test_glue_tables_full(session, database, table): @@ -141,4 +141,4 @@ def test_glue_tables_full(session, database, table): search_text="parquet", name_contains=table[1:-1], name_prefix=table[0], - name_suffix=table[-1]).index) > 1 + name_suffix=table[-1]).index) > 0 diff --git a/testing/test_awswrangler/test_pandas.py b/testing/test_awswrangler/test_pandas.py index f41605a89..381e80c5f 100644 --- a/testing/test_awswrangler/test_pandas.py +++ b/testing/test_awswrangler/test_pandas.py @@ -1777,3 +1777,40 @@ def test_read_csv_list_iterator(bucket, sample, row_num): total_count += count wr.s3.delete_listed_objects(objects_paths=paths) assert total_count == row_num * n + + +def test_to_csv_metadata( + session, + bucket, + database, +): + session.glue.delete_table_if_exists(table="test_to_csv_metadata", database=database) + assert 
len(session.glue.tables(database=database, search_text="boo bar").index) == 0 + dataframe = pd.read_csv("data_samples/nano.csv") + session.pandas.to_csv(dataframe=dataframe, + database=database, + path=f"s3://{bucket}/test_to_csv_metadata/", + preserve_index=False, + mode="overwrite", + sep="|", + description="foo boo bar", + parameters={ + "123": "345", + "678": "910" + }, + columns_comments={ + "name": "zoo", + "value": "zaa" + }) + dataframe2 = None + for counter in range(10): + sleep(1) + dataframe2 = session.pandas.read_sql_athena(ctas_approach=False, + sql="select * from test_to_csv_metadata", + database=database) + if len(dataframe.index) == len(dataframe2.index): + break + assert len(dataframe.index) == len(dataframe2.index) + assert len(list(dataframe.columns)) == len(list(dataframe2.columns)) + assert len(session.glue.tables(database=database, search_text="boo bar").index) == 1 + assert len(session.glue.tables(database=database, search_text="value").index) > 0
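
Taken together, the new description, parameters and columns_comments arguments flow from Pandas.to_csv/to_parquet through to_s3 and Glue.metadata_to_glue into Glue.create_table, which writes them onto the table's Glue definition. A minimal end-to-end sketch modeled on test_to_csv_metadata above; the bucket and database names are illustrative, and the Session is assumed to be configured like the test fixtures:

    import pandas as pd
    import awswrangler

    session = awswrangler.Session()
    df = pd.DataFrame({"name": ["foo"], "value": [1.0]})
    # table name is derived from the last path segment ("my_table"), as in the test above
    session.pandas.to_csv(dataframe=df,
                          database="my_database",
                          path="s3://my-bucket/my_table/",
                          preserve_index=False,
                          mode="overwrite",
                          description="Example table loaded by AWS Data Wrangler",
                          parameters={"owner": "data-team"},
                          columns_comments={"name": "item name", "value": "item value"})

    # The metadata then shows up in the catalog views from the earlier patches:
    print(session.glue.table(database="my_database", name="my_table"))
    print(session.glue.tables(database="my_database", search_text="Data Wrangler"))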