diff --git a/awswrangler/glue.py b/awswrangler/glue.py index 8bb2da7ac..d77c9125f 100644 --- a/awswrangler/glue.py +++ b/awswrangler/glue.py @@ -1,8 +1,11 @@ -from typing import Dict, Optional +from typing import Dict, Optional, Any, Iterator, List from math import ceil +from itertools import islice import re import logging +from pandas import DataFrame # type: ignore + from awswrangler import data_types from awswrangler.athena import Athena from awswrangler.exceptions import UnsupportedFileFormat, InvalidSerDe, ApiError, UnsupportedType, UndetectedType, InvalidTable, InvalidArguments @@ -55,7 +58,29 @@ def metadata_to_glue(self, mode="append", compression=None, cast_columns=None, - extra_args=None): + extra_args=None, + description: Optional[str] = None, + parameters: Optional[Dict[str, str]] = None, + columns_comments: Optional[Dict[str, str]] = None) -> None: + """ + + :param dataframe: Pandas Dataframe + :param objects_paths: Files paths on S3 + :param preserve_index: Should preserve index on S3? + :param partition_cols: partitions names + :param mode: "append", "overwrite", "overwrite_partitions" + :param cast_columns: Dictionary of columns names and Athena/Glue types to be casted. (E.g. {"col name": "bigint", "col2 name": "int"}) (Only for "parquet" file_format) + :param database: AWS Glue Database name + :param table: AWS Glue table name + :param path: AWS S3 path (E.g. s3://bucket-name/folder_name/ + :param file_format: "csv" or "parquet" + :param compression: None, gzip, snappy, etc + :param extra_args: Extra arguments specific for each file formats (E.g. "sep" for CSV) + :param description: Table description + :param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]]) + :param columns_comments: Columns names and the related comments (Optional[Dict[str, str]]) + :return: None + """ indexes_position = "left" if file_format == "csv" else "right" schema, partition_cols_schema = Glue._build_schema(dataframe=dataframe, partition_cols=partition_cols, @@ -75,7 +100,10 @@ def metadata_to_glue(self, path=path, file_format=file_format, compression=compression, - extra_args=extra_args) + extra_args=extra_args, + description=description, + parameters=parameters, + columns_comments=columns_comments) if partition_cols: partitions_tuples = Glue._parse_partitions_tuples(objects_paths=objects_paths, partition_cols=partition_cols) @@ -111,7 +139,26 @@ def create_table(self, file_format, compression, partition_cols_schema=None, - extra_args=None): + extra_args=None, + description: Optional[str] = None, + parameters: Optional[Dict[str, str]] = None, + columns_comments: Optional[Dict[str, str]] = None) -> None: + """ + Create Glue table (Catalog) + + :param database: AWS Glue Database name + :param table: AWS Glue table name + :param schema: Table schema + :param path: AWS S3 path (E.g. s3://bucket-name/folder_name/ + :param file_format: "csv" or "parquet" + :param compression: None, gzip, snappy, etc + :param partition_cols_schema: Partitions schema + :param extra_args: Extra arguments specific for each file formats (E.g. 
"sep" for CSV) + :param description: Table description + :param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]]) + :param columns_comments: Columns names and the related comments (Optional[Dict[str, str]]) + :return: None + """ if file_format == "parquet": table_input = Glue.parquet_table_definition(table, partition_cols_schema, schema, path, compression) elif file_format == "csv": @@ -123,6 +170,20 @@ def create_table(self, extra_args=extra_args) else: raise UnsupportedFileFormat(file_format) + if description is not None: + table_input["Description"] = description + if parameters is not None: + for k, v in parameters.items(): + table_input["Parameters"][k] = v + if columns_comments is not None: + for col in table_input["StorageDescriptor"]["Columns"]: + name = col["Name"] + if name in columns_comments: + col["Comment"] = columns_comments[name] + for par in table_input["PartitionKeys"]: + name = par["Name"] + if name in columns_comments: + par["Comment"] = columns_comments[name] self._client_glue.create_table(DatabaseName=database, TableInput=table_input) def add_partitions(self, database, table, partition_paths, file_format, compression, extra_args=None): @@ -390,3 +451,182 @@ def get_table_location(self, database: str, table: str): return res["Table"]["StorageDescriptor"]["Location"] except KeyError: raise InvalidTable(f"{database}.{table}") + + def get_databases(self, catalog_id: Optional[str] = None) -> Iterator[Dict[str, Any]]: + """ + Get an iterator of databases + + :param catalog_id: The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default. + :return: Iterator[Dict[str, Any]] of Databases + """ + paginator = self._client_glue.get_paginator("get_databases") + if catalog_id is None: + response_iterator = paginator.paginate() + else: + response_iterator = paginator.paginate(CatalogId=catalog_id) + for page in response_iterator: + for db in page["DatabaseList"]: + yield db + + def get_tables(self, + catalog_id: Optional[str] = None, + database: Optional[str] = None, + name_contains: Optional[str] = None, + name_prefix: Optional[str] = None, + name_suffix: Optional[str] = None) -> Iterator[Dict[str, Any]]: + """ + Get an iterator of tables + + :param catalog_id: The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default. 
+ :param database: Filter a specific database + :param name_contains: Select by a specific string on table name + :param name_prefix: Select by a specific prefix on table name + :param name_suffix: Select by a specific suffix on table name + :return: Iterator[Dict[str, Any]] of Tables + """ + paginator = self._client_glue.get_paginator("get_tables") + args: Dict[str, str] = {} + if catalog_id is not None: + args["CatalogId"] = catalog_id + if (name_prefix is not None) and (name_suffix is not None) and (name_contains is not None): + args["Expression"] = f"{name_prefix}.*{name_contains}.*{name_suffix}" + elif (name_prefix is not None) and (name_suffix is not None): + args["Expression"] = f"{name_prefix}.*{name_suffix}" + elif name_contains is not None: + args["Expression"] = f".*{name_contains}.*" + elif name_prefix is not None: + args["Expression"] = f"{name_prefix}.*" + elif name_suffix is not None: + args["Expression"] = f".*{name_suffix}" + if database is not None: + databases = [database] + else: + databases = [x["Name"] for x in self.get_databases(catalog_id=catalog_id)] + for db in databases: + args["DatabaseName"] = db + response_iterator = paginator.paginate(**args) + for page in response_iterator: + for tbl in page["TableList"]: + yield tbl + + def tables(self, + limit: int = 100, + catalog_id: Optional[str] = None, + database: Optional[str] = None, + search_text: Optional[str] = None, + name_contains: Optional[str] = None, + name_prefix: Optional[str] = None, + name_suffix: Optional[str] = None) -> DataFrame: + """ + Get a Dataframe with tables filtered by a search term, prefix, suffix. + + :param limit: Max number of tables + :param catalog_id: The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default. 
+ :param database: Glue database name + :param search_text: Select only tables with the given string in table's properties + :param name_contains: Select by a specific string on table name + :param name_prefix: Select only tables with the given string in the name prefix + :param name_suffix: Select only tables with the given string in the name suffix + :return: Pandas Dataframe filled by formatted infos + """ + if search_text is None: + table_iter = self.get_tables(catalog_id=catalog_id, + database=database, + name_contains=name_contains, + name_prefix=name_prefix, + name_suffix=name_suffix) + tables: List[Dict[str, Any]] = list(islice(table_iter, limit)) + else: + tables = list(self.search_tables(text=search_text, catalog_id=catalog_id)) + if database is not None: + tables = [x for x in tables if x["DatabaseName"] == database] + if name_contains is not None: + tables = [x for x in tables if name_contains in x["Name"]] + if name_prefix is not None: + tables = [x for x in tables if x["Name"].startswith(name_prefix)] + if name_suffix is not None: + tables = [x for x in tables if x["Name"].endswith(name_suffix)] + tables = tables[:limit] + + df_dict: Dict[str, List] = {"Database": [], "Table": [], "Description": [], "Columns": [], "Partitions": []} + for table in tables: + df_dict["Database"].append(table["DatabaseName"]) + df_dict["Table"].append(table["Name"]) + if "Description" in table: + df_dict["Description"].append(table["Description"]) + else: + df_dict["Description"].append("") + df_dict["Columns"].append(", ".join([x["Name"] for x in table["StorageDescriptor"]["Columns"]])) + df_dict["Partitions"].append(", ".join([x["Name"] for x in table["PartitionKeys"]])) + return DataFrame(data=df_dict) + + def search_tables(self, text: str, catalog_id: Optional[str] = None): + """ + Get iterator of tables filtered by a search string. + + :param text: Select only tables with the given string in table's properties. + :param catalog_id: The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default. + :return: Iterator of tables + """ + args: Dict[str, Any] = {"SearchText": text} + if catalog_id is not None: + args["CatalogId"] = catalog_id + response = self._client_glue.search_tables(**args) + for tbl in response["TableList"]: + yield tbl + while "NextToken" in response: + args["NextToken"] = response["NextToken"] + response = self._client_glue.search_tables(**args) + for tbl in response["TableList"]: + yield tbl + + def databases(self, limit: int = 100, catalog_id: Optional[str] = None) -> DataFrame: + """ + Get iterator of databases. + + :param limit: Max number of tables + :param catalog_id: The ID of the Data Catalog from which to retrieve Databases. If none is provided, the AWS account ID is used by default. + :return: Pandas Dataframe filled by formatted infos + """ + database_iter = self.get_databases(catalog_id=catalog_id) + dbs = islice(database_iter, limit) + df_dict: Dict[str, List] = {"Database": [], "Description": []} + for db in dbs: + df_dict["Database"].append(db["Name"]) + if "Description" in db: + df_dict["Description"].append(db["Description"]) + else: + df_dict["Description"].append("") + return DataFrame(data=df_dict) + + def table(self, database: str, name: str, catalog_id: Optional[str] = None) -> DataFrame: + """ + Get table details as Pandas Dataframe + + :param database: Glue database name + :param name: Table name + :param catalog_id: The ID of the Data Catalog from which to retrieve Databases. 
If none is provided, the AWS account ID is used by default. + :return: Pandas Dataframe filled by formatted infos + """ + if catalog_id is None: + table: Dict[str, Any] = self._client_glue.get_table(DatabaseName=database, Name=name)["Table"] + else: + table = self._client_glue.get_table(CatalogId=catalog_id, DatabaseName=database, Name=name)["Table"] + df_dict: Dict[str, List] = {"Column Name": [], "Type": [], "Partition": [], "Comment": []} + for col in table["StorageDescriptor"]["Columns"]: + df_dict["Column Name"].append(col["Name"]) + df_dict["Type"].append(col["Type"]) + df_dict["Partition"].append(False) + if "Comment" in table: + df_dict["Comment"].append(table["Comment"]) + else: + df_dict["Comment"].append("") + for col in table["PartitionKeys"]: + df_dict["Column Name"].append(col["Name"]) + df_dict["Type"].append(col["Type"]) + df_dict["Partition"].append(True) + if "Comment" in table: + df_dict["Comment"].append(table["Comment"]) + else: + df_dict["Comment"].append("") + return DataFrame(data=df_dict) diff --git a/awswrangler/pandas.py b/awswrangler/pandas.py index 6224d8c63..eda9569fd 100644 --- a/awswrangler/pandas.py +++ b/awswrangler/pandas.py @@ -627,21 +627,22 @@ def _apply_dates_to_generator(generator, parse_dates): df[col] = df[col].dt.date.replace(to_replace={pd.NaT: None}) yield df - def to_csv( - self, - dataframe, - path, - sep=",", - serde="OpenCSVSerDe", - database: Optional[str] = None, - table=None, - partition_cols=None, - preserve_index=True, - mode="append", - procs_cpu_bound=None, - procs_io_bound=None, - inplace=True, - ): + def to_csv(self, + dataframe, + path, + sep=",", + serde="OpenCSVSerDe", + database: Optional[str] = None, + table=None, + partition_cols=None, + preserve_index=True, + mode="append", + procs_cpu_bound=None, + procs_io_bound=None, + inplace=True, + description: Optional[str] = None, + parameters: Optional[Dict[str, str]] = None, + columns_comments: Optional[Dict[str, str]] = None): """ Write a Pandas Dataframe as CSV files on S3 Optionally writes metadata on AWS Glue. @@ -658,6 +659,9 @@ def to_csv( :param procs_cpu_bound: Number of cores used for CPU bound tasks :param procs_io_bound: Number of cores used for I/O bound tasks :param inplace: True is cheapest (CPU and Memory) but False leaves your DataFrame intact + :param description: Table description + :param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]]) + :param columns_comments: Columns names and the related comments (Optional[Dict[str, str]]) :return: List of objects written on S3 """ if serde not in Pandas.VALID_CSV_SERDES: @@ -675,7 +679,10 @@ def to_csv( procs_cpu_bound=procs_cpu_bound, procs_io_bound=procs_io_bound, extra_args=extra_args, - inplace=inplace) + inplace=inplace, + description=description, + parameters=parameters, + columns_comments=columns_comments) def to_parquet(self, dataframe, @@ -689,7 +696,10 @@ def to_parquet(self, procs_cpu_bound=None, procs_io_bound=None, cast_columns=None, - inplace=True): + inplace=True, + description: Optional[str] = None, + parameters: Optional[Dict[str, str]] = None, + columns_comments: Optional[Dict[str, str]] = None): """ Write a Pandas Dataframe as parquet files on S3 Optionally writes metadata on AWS Glue. @@ -706,6 +716,9 @@ def to_parquet(self, :param procs_io_bound: Number of cores used for I/O bound tasks :param cast_columns: Dictionary of columns names and Athena/Glue types to be casted (E.g. 
{"col name": "bigint", "col2 name": "int"}) :param inplace: True is cheapest (CPU and Memory) but False leaves your DataFrame intact + :param description: Table description + :param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]]) + :param columns_comments: Columns names and the related comments (Optional[Dict[str, str]]) :return: List of objects written on S3 """ return self.to_s3(dataframe=dataframe, @@ -720,7 +733,10 @@ def to_parquet(self, procs_cpu_bound=procs_cpu_bound, procs_io_bound=procs_io_bound, cast_columns=cast_columns, - inplace=inplace) + inplace=inplace, + description=description, + parameters=parameters, + columns_comments=columns_comments) def to_s3(self, dataframe: pd.DataFrame, @@ -736,7 +752,10 @@ def to_s3(self, procs_io_bound=None, cast_columns=None, extra_args=None, - inplace: bool = True) -> List[str]: + inplace: bool = True, + description: Optional[str] = None, + parameters: Optional[Dict[str, str]] = None, + columns_comments: Optional[Dict[str, str]] = None) -> List[str]: """ Write a Pandas Dataframe on S3 Optionally writes metadata on AWS Glue. @@ -755,6 +774,9 @@ def to_s3(self, :param cast_columns: Dictionary of columns names and Athena/Glue types to be casted. (E.g. {"col name": "bigint", "col2 name": "int"}) (Only for "parquet" file_format) :param extra_args: Extra arguments specific for each file formats (E.g. "sep" for CSV) :param inplace: True is cheapest (CPU and Memory) but False leaves your DataFrame intact + :param description: Table description + :param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]]) + :param columns_comments: Columns names and the related comments (Optional[Dict[str, str]]) :return: List of objects written on S3 """ if partition_cols is None: @@ -810,7 +832,10 @@ def to_s3(self, mode=mode, compression=compression, cast_columns=cast_columns, - extra_args=extra_args) + extra_args=extra_args, + description=description, + parameters=parameters, + columns_comments=columns_comments) return objects_paths def data_to_s3(self, diff --git a/awswrangler/spark.py b/awswrangler/spark.py index 4636634da..c98766722 100644 --- a/awswrangler/spark.py +++ b/awswrangler/spark.py @@ -164,7 +164,10 @@ def create_glue_table(self, sep=",", partition_by=None, load_partitions=True, - replace_if_exists=True): + replace_if_exists=True, + description: Optional[str] = None, + parameters: Optional[Dict[str, str]] = None, + columns_comments: Optional[Dict[str, str]] = None): """ Create a Glue metadata table pointing for some dataset stored on AWS S3. @@ -179,6 +182,9 @@ def create_glue_table(self, :param table: Glue table name. 
If not passed, extracted from the path :param load_partitions: Load partitions after the table creation :param replace_if_exists: Drop table and recreates that if already exists + :param description: Table description + :param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]]) + :param columns_comments: Columns names and the related comments (Optional[Dict[str, str]]) :return: None """ file_format = file_format.lower() @@ -210,7 +216,10 @@ def create_glue_table(self, path=path, file_format=file_format, compression=compression, - extra_args=extra_args) + extra_args=extra_args, + description=description, + parameters=parameters, + columns_comments=columns_comments) if load_partitions: self._session.athena.repair_table(database=database, table=table) diff --git a/demo/getting_started.ipynb b/demo/getting_started.ipynb index e53740434..9e3caa07f 100644 --- a/demo/getting_started.ipynb +++ b/demo/getting_started.ipynb @@ -28,6 +28,275 @@ "source": [ "wr.__version__" ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
DatabaseDescription
0awswrangler_testAWS Data Wrangler Test Arena - Glue Database
1default
\n", + "
" + ], + "text/plain": [ + " Database Description\n", + "0 awswrangler_test AWS Data Wrangler Test Arena - Glue Database\n", + "1 default " + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "wr.glue.databases()" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
DatabaseTableDescriptionColumnsPartitions
0awswrangler_testtest2col1, datecol
1awswrangler_testtest_move_objects_0id, valuename, date
2awswrangler_testtest_table_with_dotwith_spaces, with_dash, accent, with_dot, came...camel_case
3defaultnew_tabledispatching_base_num, pickup_date, locationid,...
4defaulttaxisdispatching_base_num, pickup_date, locationid,...year
5defaulttestid, name, value, date
6defaulttest2id, value, datename
7defaulttest3id, name, value, date
8defaulttest4id, value, datename
\n", + "
" + ], + "text/plain": [ + " Database Table Description \\\n", + "0 awswrangler_test test2 \n", + "1 awswrangler_test test_move_objects_0 \n", + "2 awswrangler_test test_table_with_dot \n", + "3 default new_table \n", + "4 default taxis \n", + "5 default test \n", + "6 default test2 \n", + "7 default test3 \n", + "8 default test4 \n", + "\n", + " Columns Partitions \n", + "0 col1, datecol \n", + "1 id, value name, date \n", + "2 with_spaces, with_dash, accent, with_dot, came... camel_case \n", + "3 dispatching_base_num, pickup_date, locationid,... \n", + "4 dispatching_base_num, pickup_date, locationid,... year \n", + "5 id, name, value, date \n", + "6 id, value, date name \n", + "7 id, name, value, date \n", + "8 id, value, date name " + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "wr.glue.tables()" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
Column NameTypePartitionComment
0col1stringFalse
1datecoldateFalse
\n", + "
" + ], + "text/plain": [ + " Column Name Type Partition Comment\n", + "0 col1 string False \n", + "1 datecol date False " + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "wr.glue.table(database=\"awswrangler_test\", name=\"test2\")" + ] } ], "metadata": { diff --git a/testing/test_awswrangler/test_glue.py b/testing/test_awswrangler/test_glue.py index 2b13e44cb..17957db7d 100644 --- a/testing/test_awswrangler/test_glue.py +++ b/testing/test_awswrangler/test_glue.py @@ -80,3 +80,65 @@ def test_get_table_python_types(session, database, table): assert ptypes["value"] == float assert ptypes["name"] == str assert ptypes["date"] == str + + +def test_get_databases(session, database): + dbs = list(session.glue.get_databases()) + assert len(dbs) > 0 + for db in dbs: + if db["Name"] == database: + assert db["Description"] == "AWS Data Wrangler Test Arena - Glue Database" + + +def test_get_tables(session, table): + tables = list(session.glue.get_tables()) + assert len(tables) > 0 + for tbl in tables: + if tbl["Name"] == table: + assert tbl["TableType"] == "EXTERNAL_TABLE" + + +def test_get_tables_database(session, database): + tables = list(session.glue.get_tables(database=database)) + assert len(tables) > 0 + for tbl in tables: + assert tbl["DatabaseName"] == database + + +def test_get_tables_search(session, table): + tables = list(session.glue.search_tables(text="parquet")) + assert len(tables) > 0 + for tbl in tables: + if tbl["Name"] == table: + assert tbl["TableType"] == "EXTERNAL_TABLE" + + +def test_get_tables_prefix(session, table): + tables = list(session.glue.get_tables(prefix=table[:-1])) + assert len(tables) > 0 + for tbl in tables: + if tbl["Name"] == table: + assert tbl["TableType"] == "EXTERNAL_TABLE" + + +def test_get_tables_suffix(session, table): + tables = list(session.glue.get_tables(suffix=table[1:])) + assert len(tables) > 0 + for tbl in tables: + if tbl["Name"] == table: + assert tbl["TableType"] == "EXTERNAL_TABLE" + + +def test_glue_utils(session, database, table): + assert len(session.glue.databases().index) > 0 + assert len(session.glue.tables().index) > 0 + assert len(session.glue.table(database=database, name=table).index) > 0 + + +def test_glue_tables_full(session, database, table): + assert len( + session.glue.tables(database=database, + search_text="parquet", + name_contains=table[1:-1], + name_prefix=table[0], + name_suffix=table[-1]).index) > 0 diff --git a/testing/test_awswrangler/test_pandas.py b/testing/test_awswrangler/test_pandas.py index f41605a89..381e80c5f 100644 --- a/testing/test_awswrangler/test_pandas.py +++ b/testing/test_awswrangler/test_pandas.py @@ -1777,3 +1777,40 @@ def test_read_csv_list_iterator(bucket, sample, row_num): total_count += count wr.s3.delete_listed_objects(objects_paths=paths) assert total_count == row_num * n + + +def test_to_csv_metadata( + session, + bucket, + database, +): + session.glue.delete_table_if_exists(table="test_to_csv_metadata", database=database) + assert len(session.glue.tables(database=database, search_text="boo bar").index) == 0 + dataframe = pd.read_csv("data_samples/nano.csv") + session.pandas.to_csv(dataframe=dataframe, + database=database, + path=f"s3://{bucket}/test_to_csv_metadata/", + preserve_index=False, + mode="overwrite", + sep="|", + description="foo boo bar", + parameters={ + "123": "345", + "678": "910" + }, + columns_comments={ + "name": "zoo", + "value": "zaa" + }) + dataframe2 = None + for counter in range(10): + sleep(1) + dataframe2 = 
session.pandas.read_sql_athena(ctas_approach=False, + sql="select * from test_to_csv_metadata", + database=database) + if len(dataframe.index) == len(dataframe2.index): + break + assert len(dataframe.index) == len(dataframe2.index) + assert len(list(dataframe.columns)) == len(list(dataframe2.columns)) + assert len(session.glue.tables(database=database, search_text="boo bar").index) == 1 + assert len(session.glue.tables(database=database, search_text="value").index) > 0
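
For reference, a minimal usage sketch of what this change enables, mirroring the notebook and test calls above. The bucket, database, and table names are placeholders, and `wr.pandas` / `wr.glue` stand in for the session attributes (`session.pandas`, `session.glue`) used in the tests:

import pandas as pd
import awswrangler as wr

df = pd.DataFrame({"id": [1, 2], "name": ["foo", "boo"], "value": [1.0, 2.0]})

# Write the DataFrame as CSV and register it in the Glue Catalog, attaching a
# table description, key/value parameters, and per-column comments in one call.
wr.pandas.to_csv(
    dataframe=df,
    path="s3://my-bucket/my_table/",   # placeholder path
    database="awswrangler_test",       # placeholder database
    table="my_table",                  # placeholder table name
    preserve_index=False,
    mode="overwrite",
    description="Example table written by AWS Data Wrangler",
    parameters={"source": "example", "owner": "data-eng"},
    columns_comments={"id": "row identifier", "value": "measured value"},
)

# Explore the Glue Catalog with the new helpers; each call returns a Pandas DataFrame.
print(wr.glue.databases(limit=10))
print(wr.glue.tables(database="awswrangler_test", name_prefix="my_"))
print(wr.glue.table(database="awswrangler_test", name="my_table"))

The same keyword arguments (description, parameters, columns_comments) flow through to_parquet and Spark.create_glue_table into Glue.metadata_to_glue/create_table, so the metadata lands on the catalog entry regardless of which write path is used.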