diff --git a/.github/workflows/static-checking.yml b/.github/workflows/static-checking.yml index a23a74d99..76291af88 100644 --- a/.github/workflows/static-checking.yml +++ b/.github/workflows/static-checking.yml @@ -25,7 +25,7 @@ jobs: with: python-version: ${{ matrix.python-version }} - name: Setup Environment - run: ./setup-dev-env.sh + run: ./requirements.sh - name: CloudFormation Lint run: cfn-lint -t testing/cloudformation.yaml - name: Documentation Lint diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index b41e294d7..336209ef2 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -82,7 +82,7 @@ We may ask you to sign a [Contributor License Agreement (CLA)](http://en.wikiped * Then run the command bellow to install all dependencies: -`./setup-dev-env.sh` +`./requirements.sh` * Go to the ``testing`` directory diff --git a/README.md b/README.md index 99771e475..16e17b916 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,9 @@ # AWS Data Wrangler *Pandas on AWS* ---- - -**NOTE** - -Due the new major version `1.0.0` with breaking changes, please make sure that all your old projects has dependencies frozen on the desired version (e.g. `pip install awswrangler==0.3.2`). You can always read the legacy docs [here](https://aws-data-wrangler.readthedocs.io/en/legacy/). - ---- - ![AWS Data Wrangler](docs/source/_static/logo2.png?raw=true "AWS Data Wrangler") -[![Release](https://img.shields.io/badge/release-1.1.2-brightgreen.svg)](https://pypi.org/project/awswrangler/) +[![Release](https://img.shields.io/badge/release-1.2.0-brightgreen.svg)](https://pypi.org/project/awswrangler/) [![Python Version](https://img.shields.io/badge/python-3.6%20%7C%203.7%20%7C%203.8-brightgreen.svg)](https://anaconda.org/conda-forge/awswrangler) [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) @@ -84,6 +76,7 @@ df = wr.db.read_sql_query("SELECT * FROM external_schema.my_table", con=engine) - [11 - CSV Datasets](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/11%20-%20CSV%20Datasets.ipynb) - [12 - CSV Crawler](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/12%20-%20CSV%20Crawler.ipynb) - [13 - Merging Datasets on S3](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/13%20-%20Merging%20Datasets%20on%20S3.ipynb) + - [14 - Schema Evolution](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/14%20-%20Schema%20Evolution.ipynb) - [15 - EMR](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/15%20-%20EMR.ipynb) - [16 - EMR & Docker](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/16%20-%20EMR%20%26%20Docker.ipynb) - [**API Reference**](https://aws-data-wrangler.readthedocs.io/en/latest/api.html) @@ -95,3 +88,4 @@ df = wr.db.read_sql_query("SELECT * FROM external_schema.my_table", con=engine) - [CloudWatch Logs](https://aws-data-wrangler.readthedocs.io/en/latest/api.html#cloudwatch-logs) - [**License**](https://github.com/awslabs/aws-data-wrangler/blob/master/LICENSE) - [**Contributing**](https://github.com/awslabs/aws-data-wrangler/blob/master/CONTRIBUTING.md) +- [**Legacy Docs** (pre-1.0.0)](https://aws-data-wrangler.readthedocs.io/en/legacy/) diff --git a/awswrangler/__metadata__.py b/awswrangler/__metadata__.py index 8ec6474ad..4dd4a4921 100644 --- a/awswrangler/__metadata__.py +++ b/awswrangler/__metadata__.py @@ -7,5 +7,5 @@ __title__ = 
"awswrangler" __description__ = "Pandas on AWS." -__version__ = "1.1.2" +__version__ = "1.2.0" __license__ = "Apache License 2.0" diff --git a/awswrangler/_data_types.py b/awswrangler/_data_types.py index 819ce0f98..8cd84e7b8 100644 --- a/awswrangler/_data_types.py +++ b/awswrangler/_data_types.py @@ -376,25 +376,11 @@ def athena_types_from_pyarrow_schema( _logger.debug("columns_types: %s", columns_types) partitions_types: Optional[Dict[str, str]] = None if partitions is not None: - partitions_types = {p.name: pyarrow2athena(p.dictionary.type) for p in partitions} + partitions_types = {p.name: pyarrow2athena(p.dictionary.type) for p in partitions} # pragma: no cover _logger.debug("partitions_types: %s", partitions_types) return columns_types, partitions_types -def athena_partitions_from_pyarrow_partitions( - path: str, partitions: pyarrow.parquet.ParquetPartitions -) -> Dict[str, List[str]]: - """Extract the related Athena partitions values from any PyArrow Partitions.""" - path = path if path[-1] == "/" else f"{path}/" - partitions_values: Dict[str, List[str]] = {} - names: List[str] = [p.name for p in partitions] - for values in zip(*[p.keys for p in partitions]): - suffix: str = "/".join([f"{n}={v}" for n, v in zip(names, values)]) - suffix = suffix if suffix[-1] == "/" else f"{suffix}/" - partitions_values[f"{path}{suffix}"] = list(values) - return partitions_values - - def cast_pandas_with_athena_types(df: pd.DataFrame, dtype: Dict[str, str]) -> pd.DataFrame: """Cast columns in a Pandas DataFrame.""" for col, athena_type in dtype.items(): @@ -410,10 +396,25 @@ def cast_pandas_with_athena_types(df: pd.DataFrame, dtype: Dict[str, str]) -> pd df[col] = ( df[col] .astype("string") - .apply(lambda x: Decimal(str(x)) if str(x) not in ("", "none", " ", "") else None) + .apply(lambda x: Decimal(str(x)) if str(x) not in ("", "none", "None", " ", "") else None) ) + elif pandas_type == "string": + curr_type: str = str(df[col].dtypes) + if curr_type.startswith("int") or curr_type.startswith("float"): + df[col] = df[col].astype(str).astype("string") + else: + df[col] = df[col].astype("string") else: - df[col] = df[col].astype(pandas_type) + try: + df[col] = df[col].astype(pandas_type) + except TypeError as ex: + if "object cannot be converted to an IntegerDtype" not in str(ex): + raise ex # pragma: no cover + df[col] = ( + df[col] + .apply(lambda x: int(x) if str(x) not in ("", "none", "None", " ", "") else None) + .astype(pandas_type) + ) return df diff --git a/awswrangler/_utils.py b/awswrangler/_utils.py index 20c9f27ec..e4a6a16dd 100644 --- a/awswrangler/_utils.py +++ b/awswrangler/_utils.py @@ -3,6 +3,7 @@ import logging import math import os +import random from typing import Any, Dict, Generator, List, Optional, Tuple import boto3 # type: ignore @@ -11,14 +12,20 @@ import psycopg2 # type: ignore import s3fs # type: ignore -logger: logging.Logger = logging.getLogger(__name__) +from awswrangler import exceptions + +_logger: logging.Logger = logging.getLogger(__name__) def ensure_session(session: Optional[boto3.Session] = None) -> boto3.Session: """Ensure that a valid boto3.Session will be returned.""" if session is not None: return session - return boto3.Session() + # Ensure the boto3's default session is used so that its parameters can be + # set via boto3.setup_default_session() + if boto3.DEFAULT_SESSION is not None: + return boto3.DEFAULT_SESSION + return boto3.Session() # pragma: no cover def client(service_name: str, session: Optional[boto3.Session] = None) -> boto3.client: @@ -124,6 
+131,8 @@ def chunkify(lst: List[Any], num_chunks: int = 1, max_length: Optional[int] = No [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]] """ + if not lst: + return [] # pragma: no cover n: int = num_chunks if max_length is None else int(math.ceil((float(len(lst)) / float(max_length)))) np_chunks = np.array_split(lst, n) return [arr.tolist() for arr in np_chunks if len(arr) > 0] @@ -179,3 +188,54 @@ def get_region_from_subnet(subnet_id: str, boto3_session: Optional[boto3.Session session: boto3.Session = ensure_session(session=boto3_session) client_ec2: boto3.client = client(service_name="ec2", session=session) return client_ec2.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["AvailabilityZone"][:9] + + +def extract_partitions_from_paths( + path: str, paths: List[str] +) -> Tuple[Optional[Dict[str, str]], Optional[Dict[str, List[str]]]]: + """Extract partitions from Amazon S3 paths.""" + path = path if path.endswith("/") else f"{path}/" + partitions_types: Dict[str, str] = {} + partitions_values: Dict[str, List[str]] = {} + for p in paths: + if path not in p: + raise exceptions.InvalidArgumentValue( + f"Object {p} is not under the root path ({path})." + ) # pragma: no cover + path_wo_filename: str = p.rpartition("/")[0] + "/" + if path_wo_filename not in partitions_values: + path_wo_prefix: str = p.replace(f"{path}/", "") + dirs: List[str] = [x for x in path_wo_prefix.split("/") if (x != "") and ("=" in x)] + if dirs: + values_tups: List[Tuple[str, str]] = [tuple(x.split("=")[:2]) for x in dirs] # type: ignore + values_dics: Dict[str, str] = dict(values_tups) + p_values: List[str] = list(values_dics.values()) + p_types: Dict[str, str] = {x: "string" for x in values_dics.keys()} + if not partitions_types: + partitions_types = p_types + if p_values: + partitions_types = p_types + partitions_values[path_wo_filename] = p_values + elif p_types != partitions_types: # pragma: no cover + raise exceptions.InvalidSchemaConvergence( + f"At least two different partitions schema detected: {partitions_types} and {p_types}" + ) + if not partitions_types: + return None, None + return partitions_types, partitions_values + + +def list_sampling(lst: List[Any], sampling: float) -> List[Any]: + """Random List sampling.""" + if sampling > 1.0 or sampling <= 0.0: # pragma: no cover + raise exceptions.InvalidArgumentValue(f"Argument must be [0.0 < value <= 1.0]. 
{sampling} received.") + _len: int = len(lst) + if _len == 0: + return [] # pragma: no cover + num_samples: int = int(round(_len * sampling)) + num_samples = _len if num_samples > _len else num_samples + num_samples = 1 if num_samples < 1 else num_samples + _logger.debug("_len: %s", _len) + _logger.debug("sampling: %s", sampling) + _logger.debug("num_samples: %s", num_samples) + return random.sample(population=lst, k=num_samples) diff --git a/awswrangler/athena.py b/awswrangler/athena.py index 671dabd42..8708c8857 100644 --- a/awswrangler/athena.py +++ b/awswrangler/athena.py @@ -282,6 +282,8 @@ def repair_table( """ query = f"MSCK REPAIR TABLE `{table}`;" + if (database is not None) and (not database.startswith("`")): + database = f"`{database}`" session: boto3.Session = _utils.ensure_session(session=boto3_session) query_id = start_query_execution( sql=query, @@ -492,7 +494,7 @@ def read_sql_query( # pylint: disable=too-many-branches,too-many-locals,too-man path: str = f"{_s3_output}/{name}" ext_location: str = "\n" if wg_config["enforced"] is True else f",\n external_location = '{path}'\n" sql = ( - f"CREATE TABLE {name}\n" + f'CREATE TABLE "{name}"\n' f"WITH(\n" f" format = 'Parquet',\n" f" parquet_compression = 'SNAPPY'" @@ -512,7 +514,20 @@ def read_sql_query( # pylint: disable=too-many-branches,too-many-locals,too-man boto3_session=session, ) _logger.debug("query_id: %s", query_id) - query_response: Dict[str, Any] = wait_query(query_execution_id=query_id, boto3_session=session) + try: + query_response: Dict[str, Any] = wait_query(query_execution_id=query_id, boto3_session=session) + except exceptions.QueryFailed as ex: + if ctas_approach is True: + if "Column name not specified" in str(ex): + raise exceptions.InvalidArgumentValue( + "Please, define all columns names in your query. (E.g. 'SELECT MAX(col1) AS max_col1, ...')" + ) + if "Column type is unknown" in str(ex): + raise exceptions.InvalidArgumentValue( + "Please, define all columns types in your query. " + "(E.g. 'SELECT CAST(NULL AS INTEGER) AS MY_COL, ...')" + ) + raise ex # pragma: no cover if query_response["QueryExecution"]["Status"]["State"] in ["FAILED", "CANCELLED"]: # pragma: no cover reason: str = query_response["QueryExecution"]["Status"]["StateChangeReason"] message_error: str = f"Query error: {reason}" diff --git a/awswrangler/catalog.py b/awswrangler/catalog.py index 41ef24460..4a8d6b2d3 100644 --- a/awswrangler/catalog.py +++ b/awswrangler/catalog.py @@ -150,9 +150,33 @@ def create_parquet_table( """ table = sanitize_table_name(table=table) partitions_types = {} if partitions_types is None else partitions_types - table_input: Dict[str, Any] = _parquet_table_definition( - table=table, path=path, columns_types=columns_types, partitions_types=partitions_types, compression=compression - ) + session: boto3.Session = _utils.ensure_session(session=boto3_session) + cat_table_input: Optional[Dict[str, Any]] = _get_table_input(database=database, table=table, boto3_session=session) + table_input: Dict[str, Any] + if (cat_table_input is not None) and (mode in ("append", "overwrite_partitions")): + table_input = cat_table_input + updated: bool = False + cat_cols: Dict[str, str] = {x["Name"]: x["Type"] for x in table_input["StorageDescriptor"]["Columns"]} + for c, t in columns_types.items(): + if c not in cat_cols: + _logger.debug("New column %s with type %s.", c, t) + table_input["StorageDescriptor"]["Columns"].append({"Name": c, "Type": t}) + updated = True + elif t != cat_cols[c]: # Data type change detected! 
+ raise exceptions.InvalidArgumentValue( + f"Data type change detected on column {c}. Old type: {cat_cols[c]}. New type {t}." + ) + if updated is True: + mode = "update" + else: + table_input = _parquet_table_definition( + table=table, + path=path, + columns_types=columns_types, + partitions_types=partitions_types, + compression=compression, + ) + table_exist: bool = cat_table_input is not None _create_table( database=database, table=table, @@ -161,8 +185,9 @@ def create_parquet_table( columns_comments=columns_comments, mode=mode, catalog_versioning=catalog_versioning, - boto3_session=boto3_session, + boto3_session=session, table_input=table_input, + table_exist=table_exist, ) @@ -240,11 +265,12 @@ def add_parquet_partitions( ... ) """ - inputs: List[Dict[str, Any]] = [ - _parquet_partition_definition(location=k, values=v, compression=compression) - for k, v in partitions_values.items() - ] - _add_partitions(database=database, table=table, boto3_session=boto3_session, inputs=inputs) + if partitions_values: + inputs: List[Dict[str, Any]] = [ + _parquet_partition_definition(location=k, values=v, compression=compression) + for k, v in partitions_values.items() + ] + _add_partitions(database=database, table=table, boto3_session=boto3_session, inputs=inputs) def _parquet_partition_definition(location: str, values: List[str], compression: Optional[str]) -> Dict[str, Any]: @@ -265,7 +291,9 @@ def _parquet_partition_definition(location: str, values: List[str], compression: } -def get_table_types(database: str, table: str, boto3_session: Optional[boto3.Session] = None) -> Dict[str, str]: +def get_table_types( + database: str, table: str, boto3_session: Optional[boto3.Session] = None +) -> Optional[Dict[str, str]]: """Get all columns and types from a table. Parameters @@ -279,8 +307,8 @@ def get_table_types(database: str, table: str, boto3_session: Optional[boto3.Ses Returns ------- - Dict[str, str] - A dictionary as {'col name': 'col data type'}. + Optional[Dict[str, str]] + If table exists, a dictionary like {'col name': 'col data type'}. Otherwise None. 
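# --- Illustrative sketch (not part of this patch) ----------------------------
# A minimal example of the schema-evolution behavior introduced above in
# create_parquet_table(): with mode="append" or "overwrite_partitions" the new
# columns of the incoming frame are merged into the existing Glue table, while
# a data type change on an existing column raises InvalidArgumentValue.
# Bucket, database and table names below are placeholders.
import pandas as pd

import awswrangler as wr

wr.s3.to_parquet(
    df=pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]}),
    path="s3://my-bucket/my-prefix/",
    dataset=True,
    mode="overwrite",
    database="my_db",
    table="my_table",
)
# Appending a frame that carries an extra column ("flag") now also evolves the
# catalog schema, so the new column becomes queryable in Athena.
wr.s3.to_parquet(
    df=pd.DataFrame({"id": [3], "value": ["bar"], "flag": [True]}),
    path="s3://my-bucket/my-prefix/",
    dataset=True,
    mode="append",
    database="my_db",
    table="my_table",
)
# -----------------------------------------------------------------------------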
Examples -------- @@ -290,7 +318,10 @@ def get_table_types(database: str, table: str, boto3_session: Optional[boto3.Ses """ client_glue: boto3.client = _utils.client(service_name="glue", session=boto3_session) - response: Dict[str, Any] = client_glue.get_table(DatabaseName=database, Name=table) + try: + response: Dict[str, Any] = client_glue.get_table(DatabaseName=database, Name=table) + except client_glue.exceptions.EntityNotFoundException: + return None dtypes: Dict[str, str] = {} for col in response["Table"]["StorageDescriptor"]["Columns"]: dtypes[col["Name"]] = col["Type"] @@ -639,7 +670,7 @@ def table( def _sanitize_name(name: str) -> str: name = "".join(c for c in unicodedata.normalize("NFD", name) if unicodedata.category(c) != "Mn") # strip accents - name = re.sub("[^A-Za-z0-9_]+", "_", name) # Removing non alphanumeric characters + name = re.sub("[^A-Za-z0-9_]+", "_", name) # Replacing non alphanumeric characters by underscore return re.sub("([a-z0-9])([A-Z])", r"\1_\2", name).lower() # Converting CamelCase to snake_case @@ -937,6 +968,7 @@ def create_csv_table( compression=compression, sep=sep, ) + session: boto3.Session = _utils.ensure_session(session=boto3_session) _create_table( database=database, table=table, @@ -945,8 +977,9 @@ def create_csv_table( columns_comments=columns_comments, mode=mode, catalog_versioning=catalog_versioning, - boto3_session=boto3_session, + boto3_session=session, table_input=table_input, + table_exist=does_table_exist(database=database, table=table, boto3_session=session), ) @@ -960,6 +993,7 @@ def _create_table( catalog_versioning: bool, boto3_session: Optional[boto3.Session], table_input: Dict[str, Any], + table_exist: bool, ): if description is not None: table_input["Description"] = description @@ -977,13 +1011,12 @@ def _create_table( par["Comment"] = columns_comments[name] session: boto3.Session = _utils.ensure_session(session=boto3_session) client_glue: boto3.client = _utils.client(service_name="glue", session=session) - exist: bool = does_table_exist(database=database, table=table, boto3_session=session) - if mode not in ("overwrite", "append", "overwrite_partitions"): # pragma: no cover + skip_archive: bool = not catalog_versioning + if mode not in ("overwrite", "append", "overwrite_partitions", "update"): # pragma: no cover raise exceptions.InvalidArgument( f"{mode} is not a valid mode. It must be 'overwrite', 'append' or 'overwrite_partitions'." 
) - if (exist is True) and (mode == "overwrite"): - skip_archive: bool = not catalog_versioning + if (table_exist is True) and (mode == "overwrite"): partitions_values: List[List[str]] = list( _get_partitions(database=database, table=table, boto3_session=session).values() ) @@ -991,9 +1024,12 @@ def _create_table( DatabaseName=database, TableName=table, PartitionsToDelete=[{"Values": v} for v in partitions_values] ) client_glue.update_table(DatabaseName=database, TableInput=table_input, SkipArchive=skip_archive) - elif (exist is True) and (mode in ("append", "overwrite_partitions")) and (parameters is not None): - upsert_table_parameters(parameters=parameters, database=database, table=table, boto3_session=session) - elif exist is False: + elif (table_exist is True) and (mode in ("append", "overwrite_partitions", "update")): + if parameters is not None: + upsert_table_parameters(parameters=parameters, database=database, table=table, boto3_session=session) + if mode == "update": + client_glue.update_table(DatabaseName=database, TableInput=table_input, SkipArchive=skip_archive) + elif table_exist is False: client_glue.create_table(DatabaseName=database, TableInput=table_input) @@ -1378,6 +1414,88 @@ def get_table_parameters( return parameters +def get_table_description( + database: str, table: str, catalog_id: Optional[str] = None, boto3_session: Optional[boto3.Session] = None +) -> str: + """Get table description. + + Parameters + ---------- + database : str + Database name. + table : str + Table name. + catalog_id : str, optional + The ID of the Data Catalog from which to retrieve Databases. + If none is provided, the AWS account ID is used by default. + boto3_session : boto3.Session(), optional + Boto3 Session. The default boto3 session will be used if boto3_session receive None. + + Returns + ------- + str + Description. + + Examples + -------- + >>> import awswrangler as wr + >>> desc = wr.catalog.get_table_description(database="...", table="...") + + """ + client_glue: boto3.client = _utils.client(service_name="glue", session=boto3_session) + args: Dict[str, str] = {} + if catalog_id is not None: + args["CatalogId"] = catalog_id # pragma: no cover + args["DatabaseName"] = database + args["Name"] = table + response: Dict[str, Any] = client_glue.get_table(**args) + desc: str = response["Table"]["Description"] + return desc + + +def get_columns_comments( + database: str, table: str, catalog_id: Optional[str] = None, boto3_session: Optional[boto3.Session] = None +) -> Dict[str, str]: + """Get all columns comments. + + Parameters + ---------- + database : str + Database name. + table : str + Table name. + catalog_id : str, optional + The ID of the Data Catalog from which to retrieve Databases. + If none is provided, the AWS account ID is used by default. + boto3_session : boto3.Session(), optional + Boto3 Session. The default boto3 session will be used if boto3_session receive None. + + Returns + ------- + Dict[str, str] + Columns comments. e.g. {"col1": "foo boo bar"}. 
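# --- Illustrative sketch (not part of this patch) ----------------------------
# A minimal usage sketch for the two catalog getters added here; database and
# table names are placeholders. (The Examples block just below shows
# get_table_parameters(); get_columns_comments() is the intended call.)
import awswrangler as wr

desc = wr.catalog.get_table_description(database="my_db", table="my_table")
comments = wr.catalog.get_columns_comments(database="my_db", table="my_table")
# comments is a dict such as {"col0": "my int", "y": "year"}, covering both
# regular columns and partition keys.
# -----------------------------------------------------------------------------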
+ + Examples + -------- + >>> import awswrangler as wr + >>> pars = wr.catalog.get_table_parameters(database="...", table="...") + + """ + client_glue: boto3.client = _utils.client(service_name="glue", session=boto3_session) + args: Dict[str, str] = {} + if catalog_id is not None: + args["CatalogId"] = catalog_id # pragma: no cover + args["DatabaseName"] = database + args["Name"] = table + response: Dict[str, Any] = client_glue.get_table(**args) + comments: Dict[str, str] = {} + for c in response["Table"]["StorageDescriptor"]["Columns"]: + comments[c["Name"]] = c["Comment"] + for p in response["Table"]["PartitionKeys"]: + comments[p["Name"]] = p["Comment"] + return comments + + def upsert_table_parameters( parameters: Dict[str, str], database: str, @@ -1464,14 +1582,36 @@ def overwrite_table_parameters( ... table="...") """ + session: boto3.Session = _utils.ensure_session(session=boto3_session) + table_input: Optional[Dict[str, Any]] = _get_table_input( + database=database, table=table, catalog_id=catalog_id, boto3_session=session + ) + if table_input is None: + raise exceptions.InvalidTable(f"Table {table} does not exist on database {database}.") + table_input["Parameters"] = parameters + args2: Dict[str, Union[str, Dict[str, Any]]] = {} + if catalog_id is not None: + args2["CatalogId"] = catalog_id # pragma: no cover + args2["DatabaseName"] = database + args2["TableInput"] = table_input + client_glue: boto3.client = _utils.client(service_name="glue", session=session) + client_glue.update_table(**args2) + return parameters + + +def _get_table_input( + database: str, table: str, boto3_session: Optional[boto3.Session], catalog_id: Optional[str] = None +) -> Optional[Dict[str, str]]: client_glue: boto3.client = _utils.client(service_name="glue", session=boto3_session) args: Dict[str, str] = {} if catalog_id is not None: args["CatalogId"] = catalog_id # pragma: no cover args["DatabaseName"] = database args["Name"] = table - response: Dict[str, Any] = client_glue.get_table(**args) - response["Table"]["Parameters"] = parameters + try: + response: Dict[str, Any] = client_glue.get_table(**args) + except client_glue.exceptions.EntityNotFoundException: + return None if "DatabaseName" in response["Table"]: del response["Table"]["DatabaseName"] if "CreateTime" in response["Table"]: @@ -1482,10 +1622,4 @@ def overwrite_table_parameters( del response["Table"]["CreatedBy"] if "IsRegisteredWithLakeFormation" in response["Table"]: del response["Table"]["IsRegisteredWithLakeFormation"] - args2: Dict[str, Union[str, Dict[str, Any]]] = {} - if catalog_id is not None: - args2["CatalogId"] = catalog_id # pragma: no cover - args2["DatabaseName"] = database - args2["TableInput"] = response["Table"] - client_glue.update_table(**args2) - return parameters + return response["Table"] diff --git a/awswrangler/exceptions.py b/awswrangler/exceptions.py index b7ad39237..c552d8a5d 100644 --- a/awswrangler/exceptions.py +++ b/awswrangler/exceptions.py @@ -83,3 +83,7 @@ class InvalidRedshiftSortkey(Exception): class InvalidRedshiftPrimaryKeys(Exception): """InvalidRedshiftPrimaryKeys exception.""" + + +class InvalidSchemaConvergence(Exception): + """InvalidSchemaMerge exception.""" diff --git a/awswrangler/s3.py b/awswrangler/s3.py index a6de341b4..b68a1e433 100644 --- a/awswrangler/s3.py +++ b/awswrangler/s3.py @@ -289,7 +289,14 @@ def _split_paths_by_bucket(paths: List[str]) -> Dict[str, List[str]]: def _delete_objects(bucket: str, keys: List[str], client_s3: boto3.client) -> None: _logger.debug("len(keys): %s", 
len(keys)) batch: List[Dict[str, str]] = [{"Key": key} for key in keys] - client_s3.delete_objects(Bucket=bucket, Delete={"Objects": batch}) + res = client_s3.delete_objects(Bucket=bucket, Delete={"Objects": batch}) + deleted = res.get("Deleted") + if deleted is not None: + for i in deleted: + _logger.debug("s3://%s/%s has been deleted.", bucket, i.get("Key")) + errors = res.get("Errors") + if errors is not None: # pragma: no cover + raise exceptions.ServiceApiError(errors) def describe_objects( @@ -620,11 +627,18 @@ def to_csv( # pylint: disable=too-many-arguments ) if df.empty is True: raise exceptions.EmptyDataFrame() - session: boto3.Session = _utils.ensure_session(session=boto3_session) + + # Sanitize table to respect Athena's standards partition_cols = partition_cols if partition_cols else [] dtype = dtype if dtype else {} columns_comments = columns_comments if columns_comments else {} partitions_values: Dict[str, List[str]] = {} + df = catalog.sanitize_dataframe_columns_names(df=df) + partition_cols = [catalog.sanitize_column_name(p) for p in partition_cols] + dtype = {catalog.sanitize_column_name(k): v.lower() for k, v in dtype.items()} + columns_comments = {catalog.sanitize_column_name(k): v for k, v in columns_comments.items()} + + session: boto3.Session = _utils.ensure_session(session=boto3_session) fs: s3fs.S3FileSystem = _utils.get_fs(session=session, s3_additional_kwargs=s3_additional_kwargs) if dataset is False: if partition_cols: @@ -646,14 +660,14 @@ def to_csv( # pylint: disable=too-many-arguments mode = "append" if mode is None else mode if columns: df = df[columns] - if (database is not None) and (table is not None): # Normalize table to respect Athena's standards - df = catalog.sanitize_dataframe_columns_names(df=df) - partition_cols = [catalog.sanitize_column_name(p) for p in partition_cols] - dtype = {catalog.sanitize_column_name(k): v.lower() for k, v in dtype.items()} - columns_comments = {catalog.sanitize_column_name(k): v for k, v in columns_comments.items()} - exist: bool = catalog.does_table_exist(database=database, table=table, boto3_session=session) - if (exist is True) and (mode in ("append", "overwrite_partitions")): - for k, v in catalog.get_table_types(database=database, table=table, boto3_session=session).items(): + if ( + (mode in ("append", "overwrite_partitions")) and (database is not None) and (table is not None) + ): # Fetching Catalog Types + catalog_types: Optional[Dict[str, str]] = catalog.get_table_types( + database=database, table=table, boto3_session=session + ) + if catalog_types is not None: + for k, v in catalog_types.items(): dtype[k] = v df = catalog.drop_duplicated_columns(df=df) paths, partitions_values = _to_csv_dataset( @@ -1036,17 +1050,29 @@ def to_parquet( # pylint: disable=too-many-arguments ) if df.empty is True: raise exceptions.EmptyDataFrame() - session: boto3.Session = _utils.ensure_session(session=boto3_session) + + # Sanitize table to respect Athena's standards partition_cols = partition_cols if partition_cols else [] dtype = dtype if dtype else {} columns_comments = columns_comments if columns_comments else {} partitions_values: Dict[str, List[str]] = {} + df = catalog.sanitize_dataframe_columns_names(df=df) + partition_cols = [catalog.sanitize_column_name(p) for p in partition_cols] + dtype = {catalog.sanitize_column_name(k): v.lower() for k, v in dtype.items()} + columns_comments = {catalog.sanitize_column_name(k): v for k, v in columns_comments.items()} + df = catalog.drop_duplicated_columns(df=df) + + session: 
boto3.Session = _utils.ensure_session(session=boto3_session) cpus: int = _utils.ensure_cpu_count(use_threads=use_threads) fs: s3fs.S3FileSystem = _utils.get_fs(session=session, s3_additional_kwargs=s3_additional_kwargs) compression_ext: Optional[str] = _COMPRESSION_2_EXT.get(compression, None) if compression_ext is None: raise exceptions.InvalidCompression(f"{compression} is invalid, please use None, snappy or gzip.") if dataset is False: + if path.endswith("/"): # pragma: no cover + raise exceptions.InvalidArgumentValue( + "If , the argument should be a object path, not a directory." + ) if partition_cols: raise exceptions.InvalidArgumentCombination("Please, pass dataset=True to be able to use partition_cols.") if mode is not None: @@ -1059,21 +1085,20 @@ def to_parquet( # pylint: disable=too-many-arguments ) paths = [ _to_parquet_file( - df=df, path=path, schema=None, index=index, compression=compression, cpus=cpus, fs=fs, dtype={} + df=df, path=path, schema=None, index=index, compression=compression, cpus=cpus, fs=fs, dtype=dtype ) ] else: mode = "append" if mode is None else mode - if (database is not None) and (table is not None): # Normalize table to respect Athena's standards - df = catalog.sanitize_dataframe_columns_names(df=df) - partition_cols = [catalog.sanitize_column_name(p) for p in partition_cols] - dtype = {catalog.sanitize_column_name(k): v.lower() for k, v in dtype.items()} - columns_comments = {catalog.sanitize_column_name(k): v for k, v in columns_comments.items()} - exist: bool = catalog.does_table_exist(database=database, table=table, boto3_session=session) - if (exist is True) and (mode in ("append", "overwrite_partitions")): - for k, v in catalog.get_table_types(database=database, table=table, boto3_session=session).items(): + if ( + (mode in ("append", "overwrite_partitions")) and (database is not None) and (table is not None) + ): # Fetching Catalog Types + catalog_types: Optional[Dict[str, str]] = catalog.get_table_types( + database=database, table=table, boto3_session=session + ) + if catalog_types is not None: + for k, v in catalog_types.items(): dtype[k] = v - df = catalog.drop_duplicated_columns(df=df) paths, partitions_values = _to_parquet_dataset( df=df, path=path, @@ -1467,25 +1492,26 @@ def _read_text( ) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]: if "iterator" in pandas_kwargs: raise exceptions.InvalidArgument("Please, use chunksize instead of iterator.") - paths: List[str] = _path2list(path=path, boto3_session=boto3_session) + session: boto3.Session = _utils.ensure_session(session=boto3_session) + paths: List[str] = _path2list(path=path, boto3_session=session) _logger.debug("paths:\n%s", paths) if chunksize is not None: dfs: Iterator[pd.DataFrame] = _read_text_chunksize( parser_func=parser_func, paths=paths, - boto3_session=boto3_session, + boto3_session=session, chunksize=chunksize, pandas_args=pandas_kwargs, s3_additional_kwargs=s3_additional_kwargs, ) return dfs - if use_threads is False: + if (use_threads is False) or (boto3_session is not None): df: pd.DataFrame = pd.concat( objs=[ _read_text_full( parser_func=parser_func, path=p, - boto3_session=boto3_session, + boto3_session=session, pandas_args=pandas_kwargs, s3_additional_kwargs=s3_additional_kwargs, ) @@ -1502,7 +1528,7 @@ def _read_text( _read_text_full, repeat(parser_func), paths, - repeat(boto3_session), + repeat(None), # Boto3.Session repeat(pandas_kwargs), repeat(s3_additional_kwargs), ), @@ -1525,7 +1551,8 @@ def _read_text_chunksize( _logger.debug("path: %s", path) if 
pandas_args.get("compression", "infer") == "infer": pandas_args["compression"] = infer_compression(path, compression="infer") - with fs.open(path, "rb") as f: + mode: str = "r" if pandas_args.get("compression") is None else "rb" + with fs.open(path, mode) as f: reader: pandas.io.parsers.TextFileReader = parser_func(f, chunksize=chunksize, **pandas_args) for df in reader: yield df @@ -1541,7 +1568,8 @@ def _read_text_full( fs: s3fs.S3FileSystem = _utils.get_fs(session=boto3_session, s3_additional_kwargs=s3_additional_kwargs) if pandas_args.get("compression", "infer") == "infer": pandas_args["compression"] = infer_compression(path, compression="infer") - with fs.open(path, "rb") as f: + mode: str = "r" if pandas_args.get("compression") is None else "rb" + with fs.open(path, mode) as f: return parser_func(f, **pandas_args) @@ -1785,7 +1813,8 @@ def _table2df(table: pa.Table, categories: List[str] = None, use_threads: bool = def read_parquet_metadata( path: Union[str, List[str]], - filters: Optional[Union[List[Tuple], List[List[Tuple]]]] = None, + dtype: Optional[Dict[str, str]] = None, + sampling: float = 1.0, dataset: bool = False, use_threads: bool = True, boto3_session: Optional[boto3.Session] = None, @@ -1803,8 +1832,15 @@ def read_parquet_metadata( ---------- path : Union[str, List[str]] S3 prefix (e.g. s3://bucket/prefix) or list of S3 objects paths (e.g. [s3://bucket/key0, s3://bucket/key1]). - filters: Union[List[Tuple], List[List[Tuple]]], optional - List of filters to apply, like ``[[('x', '=', 0), ...], ...]``. + dtype : Dict[str, str], optional + Dictionary of columns names and Athena/Glue types to be casted. + Useful when you have columns with undetermined data types as partitions columns. + (e.g. {'col name': 'bigint', 'col2 name': 'int'}) + sampling : float + Random sample ratio of files that will have the metadata inspected. + Must be `0.0 < sampling <= 1.0`. + The higher, the more accurate. + The lower, the faster. dataset: bool If True read a parquet dataset instead of simple file(s) loading all the related partitions as columns. use_threads : bool @@ -1837,19 +1873,75 @@ def read_parquet_metadata( ... 
]) """ + return _read_parquet_metadata( + path=path, dtype=dtype, sampling=sampling, dataset=dataset, use_threads=use_threads, boto3_session=boto3_session + )[:2] + + +def _read_parquet_metadata( + path: Union[str, List[str]], + dtype: Optional[Dict[str, str]], + sampling: float, + dataset: bool, + use_threads: bool, + boto3_session: Optional[boto3.Session], +) -> Tuple[Dict[str, str], Optional[Dict[str, str]], Optional[Dict[str, List[str]]]]: + session: boto3.Session = _utils.ensure_session(session=boto3_session) + if dataset is True: + if isinstance(path, str): + _path: Optional[str] = path if path.endswith("/") else f"{path}/" + paths: List[str] = _path2list(path=_path, boto3_session=session) + else: # pragma: no cover + raise exceptions.InvalidArgumentType("Argument must be str if dataset=True.") + else: + if isinstance(path, str): + _path = None + paths = _path2list(path=path, boto3_session=session) + elif isinstance(path, list): + _path = None + paths = path + else: # pragma: no cover + raise exceptions.InvalidArgumentType(f"Argument path must be str or List[str] instead of {type(path)}.") + schemas: List[Dict[str, str]] = [ + _read_parquet_metadata_file(path=x, use_threads=use_threads, boto3_session=session) + for x in _utils.list_sampling(lst=paths, sampling=sampling) + ] + _logger.debug("schemas: %s", schemas) + columns_types: Dict[str, str] = {} + for schema in schemas: + for column, _dtype in schema.items(): + if (column in columns_types) and (columns_types[column] != _dtype): # pragma: no cover + raise exceptions.InvalidSchemaConvergence( + f"Was detect at least 2 different types in column {column} ({columns_types[column]} and {dtype})." + ) + columns_types[column] = _dtype + partitions_types: Optional[Dict[str, str]] = None + partitions_values: Optional[Dict[str, List[str]]] = None + if (dataset is True) and (_path is not None): + partitions_types, partitions_values = _utils.extract_partitions_from_paths(path=_path, paths=paths) + if dtype: + for k, v in dtype.items(): + if columns_types and k in columns_types: + columns_types[k] = v + if partitions_types and k in partitions_types: + partitions_types[k] = v + _logger.debug("columns_types: %s", columns_types) + return columns_types, partitions_types, partitions_values + + +def _read_parquet_metadata_file(path: str, use_threads: bool, boto3_session: boto3.Session) -> Dict[str, str]: data: pyarrow.parquet.ParquetDataset = _read_parquet_init( - path=path, filters=filters, dataset=dataset, use_threads=use_threads, boto3_session=boto3_session - ) - return _data_types.athena_types_from_pyarrow_schema( - schema=data.schema.to_arrow_schema(), partitions=data.partitions + path=path, filters=None, dataset=False, use_threads=use_threads, boto3_session=boto3_session ) + return _data_types.athena_types_from_pyarrow_schema(schema=data.schema.to_arrow_schema(), partitions=None)[0] def store_parquet_metadata( path: str, database: str, table: str, - filters: Optional[Union[List[Tuple], List[List[Tuple]]]] = None, + dtype: Optional[Dict[str, str]] = None, + sampling: float = 1.0, dataset: bool = False, use_threads: bool = True, description: Optional[str] = None, @@ -1885,8 +1977,15 @@ def store_parquet_metadata( Glue/Athena catalog: Database name. table : str Glue/Athena catalog: Table name. - filters: Union[List[Tuple], List[List[Tuple]]], optional - List of filters to apply, like ``[[('x', '=', 0), ...], ...]``. + dtype : Dict[str, str], optional + Dictionary of columns names and Athena/Glue types to be casted. 
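# --- Illustrative sketch (not part of this patch) ----------------------------
# Quick example of the new dtype/sampling arguments on read_parquet_metadata()
# and store_parquet_metadata(); paths and names are placeholders and the
# dataset is assumed to be partitioned by "year". sampling=0.25 inspects a
# random 25% of the files, and dtype forces the "year" partition column
# (otherwise typed as string from the paths) to be reported/registered as int.
import awswrangler as wr

columns_types, partitions_types = wr.s3.read_parquet_metadata(
    path="s3://my-bucket/my-prefix/", dataset=True, sampling=0.25, dtype={"year": "int"}
)
columns_types, partitions_types, partitions_values = wr.s3.store_parquet_metadata(
    path="s3://my-bucket/my-prefix/",
    database="my_db",
    table="my_table",
    dataset=True,
    sampling=0.25,
    dtype={"year": "int"},
)
# -----------------------------------------------------------------------------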
+ Useful when you have columns with undetermined data types as partitions columns. + (e.g. {'col name': 'bigint', 'col2 name': 'int'}) + sampling : float + Random sample ratio of files that will have the metadata inspected. + Must be `0.0 < sampling <= 1.0`. + The higher, the more accurate. + The lower, the faster. dataset: bool If True read a parquet dataset instead of simple file(s) loading all the related partitions as columns. use_threads : bool @@ -1933,13 +2032,15 @@ def store_parquet_metadata( """ session: boto3.Session = _utils.ensure_session(session=boto3_session) - data: pyarrow.parquet.ParquetDataset = _read_parquet_init( - path=path, filters=filters, dataset=dataset, use_threads=use_threads, boto3_session=session - ) - partitions: Optional[pyarrow.parquet.ParquetPartitions] = data.partitions - columns_types, partitions_types = _data_types.athena_types_from_pyarrow_schema( - schema=data.schema.to_arrow_schema(), partitions=partitions + columns_types: Dict[str, str] + partitions_types: Optional[Dict[str, str]] + partitions_values: Optional[Dict[str, List[str]]] + columns_types, partitions_types, partitions_values = _read_parquet_metadata( + path=path, dtype=dtype, sampling=sampling, dataset=dataset, use_threads=use_threads, boto3_session=session ) + _logger.debug("columns_types: %s", columns_types) + _logger.debug("partitions_types: %s", partitions_types) + _logger.debug("partitions_values: %s", partitions_values) catalog.create_parquet_table( database=database, table=table, @@ -1953,16 +2054,14 @@ def store_parquet_metadata( catalog_versioning=catalog_versioning, boto3_session=session, ) - partitions_values: Dict[str, List[str]] = _data_types.athena_partitions_from_pyarrow_partitions( - path=path, partitions=partitions - ) - catalog.add_parquet_partitions( - database=database, - table=table, - partitions_values=partitions_values, - compression=compression, - boto3_session=session, - ) + if (partitions_types is not None) and (partitions_values is not None): + catalog.add_parquet_partitions( + database=database, + table=table, + partitions_values=partitions_values, + compression=compression, + boto3_session=session, + ) return columns_types, partitions_types, partitions_values diff --git a/building/lambda/build-lambda-layer.sh b/building/lambda/build-lambda-layer.sh index c9b642fd0..7cb7ad06b 100644 --- a/building/lambda/build-lambda-layer.sh +++ b/building/lambda/build-lambda-layer.sh @@ -14,7 +14,7 @@ export ARROW_HOME=$(pwd)/dist export LD_LIBRARY_PATH=$(pwd)/dist/lib:$LD_LIBRARY_PATH git clone \ - --branch apache-arrow-0.17.0 \ + --branch apache-arrow-0.17.1 \ --single-branch \ https://github.com/apache/arrow.git diff --git a/docs/source/api.rst b/docs/source/api.rst index 7f36e5aa5..5cd8e9e3c 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -1,5 +1,3 @@ -.. note:: Due the new major version 1.0.0 with breaking changes, please make sure that all your old projects has dependencies frozen on the desired version (e.g. `pip install awswrangler==0.3.2`). You can always check the legacy docs `here `_. - API Reference ============= @@ -11,18 +9,20 @@ Amazon S3 .. 
autosummary:: :toctree: stubs + copy_objects delete_objects describe_objects does_object_exist get_bucket_region - list_objects list_directories + list_objects + merge_datasets read_csv read_fwf read_json read_parquet - read_parquet_table read_parquet_metadata + read_parquet_table size_objects store_parquet_metadata to_csv @@ -30,8 +30,6 @@ Amazon S3 to_parquet wait_objects_exist wait_objects_not_exist - copy_objects - merge_datasets AWS Glue Catalog ---------------- @@ -41,31 +39,33 @@ AWS Glue Catalog .. autosummary:: :toctree: stubs - add_parquet_partitions - create_parquet_table add_csv_partitions + add_parquet_partitions create_csv_table + create_parquet_table databases delete_table_if_exists does_table_exist + drop_duplicated_columns + extract_athena_types + get_columns_comments + get_csv_partitions get_databases + get_engine get_parquet_partitions - get_csv_partitions + get_table_description get_table_location + get_table_parameters get_table_types get_tables - search_tables - table - tables + overwrite_table_parameters sanitize_column_name sanitize_dataframe_columns_names sanitize_table_name - drop_duplicated_columns - get_engine - extract_athena_types - get_table_parameters + search_tables + table + tables upsert_table_parameters - overwrite_table_parameters Amazon Athena ------------- @@ -75,15 +75,15 @@ Amazon Athena .. autosummary:: :toctree: stubs + create_athena_bucket + get_query_columns_types + get_work_group read_sql_query read_sql_table repair_table start_query_execution stop_query_execution wait_query - create_athena_bucket - get_query_columns_types - get_work_group Databases (Redshift, PostgreSQL, MySQL) --------------------------------------- @@ -93,13 +93,13 @@ Databases (Redshift, PostgreSQL, MySQL) .. autosummary:: :toctree: stubs - to_sql - read_sql_query - read_sql_table + copy_files_to_redshift + copy_to_redshift get_engine get_redshift_temp_engine - copy_to_redshift - copy_files_to_redshift + read_sql_query + read_sql_table + to_sql unload_redshift unload_redshift_to_files write_redshift_copy_manifest @@ -112,16 +112,16 @@ EMR .. autosummary:: :toctree: stubs + build_spark_step + build_step create_cluster get_cluster_state - terminate_cluster - submit_step - submit_spark_step + get_step_state submit_ecr_credentials_refresh + submit_spark_step + submit_step submit_steps - build_step - build_spark_step - get_step_state + terminate_cluster CloudWatch Logs --------------- diff --git a/docs/source/index.rst b/docs/source/index.rst index a20e727be..6c0380007 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -1,5 +1,3 @@ -.. note:: Due the new major version 1.0.0 with breaking changes, please make sure that all your old projects has dependencies frozen on the desired version (e.g. `pip install awswrangler==0.3.2`). You can always check the legacy docs `here `_. - Quick Start ----------- @@ -45,3 +43,4 @@ Read The Docs api License Contributing + Legacy Docs (pre-1.0.0) diff --git a/docs/source/install.rst b/docs/source/install.rst index 951171f2b..7cf8ce973 100644 --- a/docs/source/install.rst +++ b/docs/source/install.rst @@ -1,5 +1,3 @@ -.. note:: Due the new major version 1.0.0 with breaking changes, please make sure that all your old projects has dependencies frozen on the desired version (e.g. `pip install awswrangler==0.3.2`). You can always check the legacy docs `here `_. - Install ======= @@ -44,7 +42,7 @@ AWS Glue Wheel 2 - Upload the wheel file to any Amazon S3 location. -3 - Got to your Glue Python Shell job and point to the new file on s3. 
+3 - Go to your Glue Python Shell job and point to the new file on S3. Amazon SageMaker Notebook ------------------------- diff --git a/docs/source/what.rst b/docs/source/what.rst index 52d7d8194..0a169b74d 100644 --- a/docs/source/what.rst +++ b/docs/source/what.rst @@ -1,5 +1,3 @@ -.. note:: Due the new major version 1.0.0 with breaking changes, please make sure that all your old projects has dependencies frozen on the desired version (e.g. `pip install awswrangler==0.3.2`). You can always check the legacy docs `here `_. - What is AWS Data Wrangler? ========================== diff --git a/requirements-dev.txt b/requirements-dev.txt index 19ad71a62..27e89bc97 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,7 +1,7 @@ awscli>=1.18.0,<2.0.0 black~=19.3b0 pylint~=2.5.2 -flake8~=3.7.9 +flake8~=3.8.1 mypy~=0.770 isort~=4.3.21 pydocstyle~=5.0.2 @@ -10,8 +10,9 @@ tox~=3.15.0 pytest~=5.4.2 pytest-cov~=2.8.1 pytest-xdist~=1.32.0 -scikit-learn~=0.22.1 -cfn-lint~=0.30.1 +pytest-timeout~=1.3.4 +scikit-learn~=0.23.1 +cfn-lint~=0.32.0 cfn-flip~=1.2.3 twine~=3.1.1 wheel~=0.34.2 diff --git a/setup-dev-env.sh b/requirements.sh similarity index 100% rename from setup-dev-env.sh rename to requirements.sh diff --git a/requirements.txt b/requirements.txt index 40fe89901..3e7192b4f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,7 @@ botocore>=1.15.0,<2.0.0 numpy~=1.18.0 pandas~=1.0.0 pyarrow~=0.17.0 -s3fs~=0.4.0 +s3fs~=0.4.2 psycopg2-binary~=2.8.0 pymysql~=0.9.0 sqlalchemy-redshift~=0.7.0 diff --git a/testing/cloudformation.yaml b/testing/cloudformation.yaml index b62062112..70f5bb279 100644 --- a/testing/cloudformation.yaml +++ b/testing/cloudformation.yaml @@ -1,6 +1,5 @@ AWSTemplateFormatVersion: 2010-09-09 -Description: | - AWS Data Wrangler Development Infrastructure +Description: AWS Data Wrangler Development Infrastructure Parameters: DatabasesPassword: Type: String @@ -10,17 +9,17 @@ Resources: VPC: Type: AWS::EC2::VPC Properties: + Tags: + - Key: Env + Value: aws-data-wrangler CidrBlock: 10.19.224.0/19 EnableDnsSupport: true EnableDnsHostnames: true - Tags: - - Key: Name - Value: aws-data-wrangler InternetGateway: Type: AWS::EC2::InternetGateway Properties: Tags: - - Key: Name + - Key: Env Value: aws-data-wrangler InternetGatewayAttachment: Type: AWS::EC2::VPCGatewayAttachment @@ -32,6 +31,9 @@ Resources: PublicSubnet1: Type: AWS::EC2::Subnet Properties: + Tags: + - Key: Env + Value: aws-data-wrangler VpcId: Ref: VPC AvailabilityZone: @@ -40,12 +42,12 @@ Resources: - Fn::GetAZs: '' CidrBlock: 10.19.229.0/24 MapPublicIpOnLaunch: true - Tags: - - Key: Name - Value: aws-data-wrangler Public Subnet (AZ1) PublicSubnet2: Type: AWS::EC2::Subnet Properties: + Tags: + - Key: Env + Value: aws-data-wrangler VpcId: Ref: VPC AvailabilityZone: @@ -54,17 +56,14 @@ Resources: - Fn::GetAZs: '' CidrBlock: 10.19.230.0/24 MapPublicIpOnLaunch: true - Tags: - - Key: Name - Value: aws-data-wrangler Public Subnet (AZ2) PublicRouteTable: Type: AWS::EC2::RouteTable Properties: + Tags: + - Key: Env + Value: aws-data-wrangler VpcId: Ref: VPC - Tags: - - Key: Name - Value: aws-data-wrangler Public Routes DefaultPublicRoute: Type: AWS::EC2::Route DependsOn: InternetGatewayAttachment @@ -97,6 +96,9 @@ Resources: KmsKey: Type: AWS::KMS::Key Properties: + Tags: + - Key: Env + Value: aws-data-wrangler Description: Aws Data Wrangler Test Key. 
KeyPolicy: Version: '2012-10-17' @@ -112,7 +114,8 @@ Resources: - Sid: Allow administration of the key Effect: Allow Principal: - AWS: '*' + AWS: + Ref: AWS::AccountId Action: - kms:Create* - kms:Describe* @@ -130,6 +133,9 @@ Resources: Bucket: Type: AWS::S3::Bucket Properties: + Tags: + - Key: Env + Value: aws-data-wrangler PublicAccessBlockConfiguration: BlockPublicAcls: true BlockPublicPolicy: true @@ -146,6 +152,9 @@ Resources: RedshiftRole: Type: AWS::IAM::Role Properties: + Tags: + - Key: Env + Value: aws-data-wrangler AssumeRolePolicyDocument: Version: 2012-10-17 Statement: @@ -205,12 +214,18 @@ Resources: RedshiftSubnetGroup: Type: AWS::Redshift::ClusterSubnetGroup Properties: + Tags: + - Key: Env + Value: aws-data-wrangler Description: AWS Data Wrangler Test Arena - Redshift Subnet Group SubnetIds: - Ref: PublicSubnet1 DatabaseSecurityGroup: Type: AWS::EC2::SecurityGroup Properties: + Tags: + - Key: Env + Value: aws-data-wrangler VpcId: Ref: VPC GroupDescription: AWS Data Wrangler Test Arena - Redshift security group @@ -228,6 +243,9 @@ Resources: Redshift: Type: AWS::Redshift::Cluster Properties: + Tags: + - Key: Env + Value: aws-data-wrangler DBName: test MasterUsername: test MasterUserPassword: @@ -264,6 +282,9 @@ Resources: RdsSubnetGroup: Type: AWS::RDS::DBSubnetGroup Properties: + Tags: + - Key: Env + Value: aws-data-wrangler DBSubnetGroupDescription: RDS Database Subnet Group SubnetIds: - Ref: PublicSubnet1 @@ -271,6 +292,9 @@ Resources: AuroraRole: Type: AWS::IAM::Role Properties: + Tags: + - Key: Env + Value: aws-data-wrangler AssumeRolePolicyDocument: Version: 2012-10-17 Statement: @@ -297,13 +321,20 @@ Resources: PostgresqlParameterGroup: Type: AWS::RDS::DBClusterParameterGroup Properties: + Tags: + - Key: Env + Value: aws-data-wrangler Description: Postgres 11 Family: aurora-postgresql11 Parameters: apg_plan_mgmt.capture_plan_baselines: 'off' AuroraClusterPostgresql: Type: AWS::RDS::DBCluster + DeletionPolicy: Delete Properties: + Tags: + - Key: Env + Value: aws-data-wrangler Engine: aurora-postgresql EngineVersion: '11.6' DBClusterIdentifier: postgresql-cluster-wrangler @@ -325,7 +356,11 @@ Resources: - Arn AuroraInstancePostgresql: Type: AWS::RDS::DBInstance + DeletionPolicy: Delete Properties: + Tags: + - Key: Env + Value: aws-data-wrangler Engine: aurora-postgresql EngineVersion: '11.6' DBInstanceIdentifier: postgresql-instance-wrangler @@ -338,6 +373,9 @@ Resources: MysqlParameterGroup: Type: AWS::RDS::DBClusterParameterGroup Properties: + Tags: + - Key: Env + Value: aws-data-wrangler Description: Mysql 5.7 Family: aurora-mysql5.7 Parameters: @@ -355,7 +393,11 @@ Resources: - Arn AuroraClusterMysql: Type: AWS::RDS::DBCluster + DeletionPolicy: Delete Properties: + Tags: + - Key: Env + Value: aws-data-wrangler Engine: aurora-mysql EngineVersion: '5.7' DBClusterIdentifier: mysql-cluster-wrangler @@ -377,7 +419,11 @@ Resources: - Arn AuroraInstanceMysql: Type: AWS::RDS::DBInstance + DeletionPolicy: Delete Properties: + Tags: + - Key: Env + Value: aws-data-wrangler Engine: aurora-mysql EngineVersion: '5.7' DBInstanceIdentifier: mysql-instance-wrangler @@ -459,21 +505,6 @@ Resources: PASSWORD: Ref: DatabasesPassword Name: aws-data-wrangler-mysql - DynamoDBTable: - Type: AWS::DynamoDB::Table - Properties: - AttributeDefinitions: - - AttributeName: attr_hash - AttributeType: S - - AttributeName: attr_range - AttributeType: S - KeySchema: - - AttributeName: attr_hash - KeyType: HASH - - AttributeName: attr_range - KeyType: RANGE - BillingMode: PAY_PER_REQUEST - TableName: 
aws-data-wrangler Outputs: BucketName: Value: @@ -542,12 +573,6 @@ Outputs: - AuroraInstanceMysql - Endpoint.Address Description: Mysql Address - DynamoDbTableARN: - Value: - Fn::GetAtt: - - DynamoDBTable - - Arn - Description: DynamoDB table name Region: Value: Ref: AWS::Region diff --git a/testing/test_awswrangler/_utils.py b/testing/test_awswrangler/_utils.py index b4c210cd1..40481c689 100644 --- a/testing/test_awswrangler/_utils.py +++ b/testing/test_awswrangler/_utils.py @@ -1,3 +1,4 @@ +import random from datetime import datetime from decimal import Decimal @@ -6,6 +7,8 @@ ts = lambda x: datetime.strptime(x, "%Y-%m-%d %H:%M:%S.%f") # noqa dt = lambda x: datetime.strptime(x, "%Y-%m-%d").date() # noqa +CFN_VALID_STATUS = ["CREATE_COMPLETE", "ROLLBACK_COMPLETE", "UPDATE_COMPLETE", "UPDATE_ROLLBACK_COMPLETE"] + def get_df(): df = pd.DataFrame( @@ -399,3 +402,8 @@ def ensure_data_types_csv(df): assert str(df["par0"].dtype).startswith("Int") if "par1" in df: assert str(df["par1"].dtype) == "string" + + +def get_time_str_with_random_suffix(): + time_str = datetime.utcnow().strftime("%Y%m%d%H%M%S%f") + return f"{time_str}_{random.randrange(16**4):04x}" diff --git a/testing/test_awswrangler/test_cloudwatch.py b/testing/test_awswrangler/test_cloudwatch.py index 6ae4cf673..6ac4e527a 100644 --- a/testing/test_awswrangler/test_cloudwatch.py +++ b/testing/test_awswrangler/test_cloudwatch.py @@ -7,6 +7,8 @@ import awswrangler as wr from awswrangler import exceptions +from ._utils import CFN_VALID_STATUS + logging.basicConfig(level=logging.INFO, format="[%(asctime)s][%(levelname)s][%(name)s][%(funcName)s] %(message)s") logging.getLogger("awswrangler").setLevel(logging.DEBUG) logging.getLogger("botocore.credentials").setLevel(logging.CRITICAL) @@ -15,8 +17,9 @@ @pytest.fixture(scope="module") def cloudformation_outputs(): response = boto3.client("cloudformation").describe_stacks(StackName="aws-data-wrangler") + stack = [x for x in response.get("Stacks") if x["StackStatus"] in CFN_VALID_STATUS][0] outputs = {} - for output in response.get("Stacks")[0].get("Outputs"): + for output in stack.get("Outputs"): outputs[output.get("OutputKey")] = output.get("OutputValue") yield outputs diff --git a/testing/test_awswrangler/test_data_lake.py b/testing/test_awswrangler/test_data_lake.py index dc7ee3868..e4f733876 100644 --- a/testing/test_awswrangler/test_data_lake.py +++ b/testing/test_awswrangler/test_data_lake.py @@ -1,9 +1,11 @@ import bz2 import datetime import gzip +import itertools import logging import lzma import math +import time from io import BytesIO, TextIOWrapper import boto3 @@ -12,8 +14,9 @@ import awswrangler as wr -from ._utils import (ensure_data_types, ensure_data_types_category, ensure_data_types_csv, get_df, get_df_cast, - get_df_category, get_df_csv, get_df_list, get_query_long) +from ._utils import (CFN_VALID_STATUS, ensure_data_types, ensure_data_types_category, ensure_data_types_csv, get_df, + get_df_cast, get_df_category, get_df_csv, get_df_list, get_query_long, + get_time_str_with_random_suffix) logging.basicConfig(level=logging.INFO, format="[%(asctime)s][%(levelname)s][%(name)s][%(funcName)s] %(message)s") logging.getLogger("awswrangler").setLevel(logging.DEBUG) @@ -23,8 +26,9 @@ @pytest.fixture(scope="module") def cloudformation_outputs(): response = boto3.client("cloudformation").describe_stacks(StackName="aws-data-wrangler") + stack = [x for x in response.get("Stacks") if x["StackStatus"] in CFN_VALID_STATUS][0] outputs = {} - for output in 
response.get("Stacks")[0].get("Outputs"): + for output in stack.get("Outputs"): outputs[output.get("OutputKey")] = output.get("OutputValue") yield outputs @@ -58,7 +62,7 @@ def external_schema(cloudformation_outputs, database): IAM_ROLE '{cloudformation_outputs["RedshiftRole"]}' REGION '{region}'; """ - engine = wr.catalog.get_engine(connection=f"aws-data-wrangler-redshift") + engine = wr.catalog.get_engine(connection="aws-data-wrangler-redshift") with engine.connect() as con: con.execute(sql) yield "aws_data_wrangler_external" @@ -157,6 +161,54 @@ def workgroup3(bucket, kms_key): yield wkg_name +@pytest.fixture(scope="function") +def path(bucket): + s3_path = f"s3://{bucket}/{get_time_str_with_random_suffix()}/" + print(f"S3 Path: {s3_path}") + time.sleep(1) + objs = wr.s3.list_objects(s3_path) + wr.s3.delete_objects(path=objs) + wr.s3.wait_objects_not_exist(objs) + yield s3_path + time.sleep(1) + objs = wr.s3.list_objects(s3_path) + wr.s3.delete_objects(path=objs) + wr.s3.wait_objects_not_exist(objs) + + +@pytest.fixture(scope="function") +def table(database): + name = f"tbl_{get_time_str_with_random_suffix()}" + print(f"Table name: {name}") + wr.catalog.delete_table_if_exists(database=database, table=name) + yield name + wr.catalog.delete_table_if_exists(database=database, table=name) + + +@pytest.fixture(scope="function") +def path2(bucket): + s3_path = f"s3://{bucket}/{get_time_str_with_random_suffix()}/" + print(f"S3 Path: {s3_path}") + time.sleep(1) + objs = wr.s3.list_objects(s3_path) + wr.s3.delete_objects(path=objs) + wr.s3.wait_objects_not_exist(objs) + yield s3_path + time.sleep(1) + objs = wr.s3.list_objects(s3_path) + wr.s3.delete_objects(path=objs) + wr.s3.wait_objects_not_exist(objs) + + +@pytest.fixture(scope="function") +def table2(database): + name = f"tbl_{get_time_str_with_random_suffix()}" + print(f"Table name: {name}") + wr.catalog.delete_table_if_exists(database=database, table=name) + yield name + wr.catalog.delete_table_if_exists(database=database, table=name) + + def test_athena_ctas(bucket, database, kms_key): wr.s3.delete_objects(path=f"s3://{bucket}/test_athena_ctas/") wr.s3.delete_objects(path=f"s3://{bucket}/test_athena_ctas_result/") @@ -202,7 +254,7 @@ def test_athena_ctas(bucket, database, kms_key): # keep_files=False wr.s3.delete_objects(path=s3_output) dfs = wr.athena.read_sql_query( - sql=f"SELECT * FROM test_athena_ctas", + sql="SELECT * FROM test_athena_ctas", database=database, ctas_approach=True, chunksize=1, @@ -220,7 +272,7 @@ def test_athena_ctas(bucket, database, kms_key): # keep_files=True wr.s3.delete_objects(path=s3_output) dfs = wr.athena.read_sql_query( - sql=f"SELECT * FROM test_athena_ctas", + sql="SELECT * FROM test_athena_ctas", database=database, ctas_approach=True, chunksize=2, @@ -242,11 +294,11 @@ def test_athena_ctas(bucket, database, kms_key): wr.s3.delete_objects(path=f"s3://{bucket}/test_athena_ctas_result/") -def test_athena(bucket, database, kms_key, workgroup0, workgroup1): - wr.s3.delete_objects(path=f"s3://{bucket}/test_athena/") +def test_athena(path, database, kms_key, workgroup0, workgroup1): + wr.catalog.delete_table_if_exists(database=database, table="__test_athena") paths = wr.s3.to_parquet( df=get_df(), - path=f"s3://{bucket}/test_athena", + path=path, index=True, use_threads=True, dataset=True, @@ -280,10 +332,6 @@ def test_athena(bucket, database, kms_key, workgroup0, workgroup1): ensure_data_types(df=df) wr.athena.repair_table(table="__test_athena", database=database) 
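# --- Illustrative sketch (not part of this patch) ----------------------------
# Two Athena behaviors exercised around here that this patch touches:
# repair_table() now wraps the database name in backticks, and with
# ctas_approach=True read_sql_query() raises InvalidArgumentValue when a
# selected column lacks an explicit name or type. Placeholder sketch of a
# CTAS-safe query (aliased aggregate, casted NULL):
import awswrangler as wr

df = wr.athena.read_sql_query(
    sql="SELECT MAX(col1) AS max_col1, CAST(NULL AS INTEGER) AS my_col FROM my_table",
    database="my_db",
    ctas_approach=True,
)
# -----------------------------------------------------------------------------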
wr.catalog.delete_table_if_exists(database=database, table="__test_athena") - wr.s3.delete_objects(path=paths) - wr.s3.wait_objects_not_exist(paths=paths) - wr.s3.delete_objects(path=f"s3://{bucket}/athena_workgroup0/") - wr.s3.delete_objects(path=f"s3://{bucket}/athena_workgroup1/") def test_csv(bucket): @@ -330,13 +378,15 @@ def test_json(bucket): wr.s3.delete_objects(path=[path0, path1], use_threads=False) -def test_fwf(bucket): +def test_fwf(path): text = "1 Herfelingen27-12-18\n2 Lambusart14-06-18\n3Spormaggiore15-04-18" - path0 = f"s3://{bucket}/test_fwf0.txt" - path1 = f"s3://{bucket}/test_fwf1.txt" client_s3 = boto3.client("s3") - client_s3.put_object(Body=text, Bucket=bucket, Key="test_fwf0.txt") - client_s3.put_object(Body=text, Bucket=bucket, Key="test_fwf1.txt") + path0 = f"{path}/0.txt" + bucket, key = wr._utils.parse_path(path0) + client_s3.put_object(Body=text, Bucket=bucket, Key=key) + path1 = f"{path}/1.txt" + bucket, key = wr._utils.parse_path(path1) + client_s3.put_object(Body=text, Bucket=bucket, Key=key) wr.s3.wait_objects_exist(paths=[path0, path1]) df = wr.s3.read_fwf(path=path0, use_threads=False, widths=[1, 12, 8], names=["id", "name", "date"]) assert len(df.index) == 3 @@ -344,7 +394,6 @@ def test_fwf(bucket): df = wr.s3.read_fwf(path=[path0, path1], use_threads=True, widths=[1, 12, 8], names=["id", "name", "date"]) assert len(df.index) == 6 assert len(df.columns) == 3 - wr.s3.delete_objects(path=[path0, path1], use_threads=False) def test_parquet(bucket): @@ -524,28 +573,27 @@ def test_parquet_catalog_casting(bucket, database): assert wr.catalog.delete_table_if_exists(database=database, table="__test_parquet_catalog_casting") is True -def test_catalog(bucket, database): +def test_catalog(path, database, table): account_id = boto3.client("sts").get_caller_identity().get("Account") - path = f"s3://{bucket}/test_catalog/" - wr.catalog.delete_table_if_exists(database=database, table="test_catalog") - assert wr.catalog.does_table_exist(database=database, table="test_catalog") is False + assert wr.catalog.does_table_exist(database=database, table=table) is False wr.catalog.create_parquet_table( database=database, - table="test_catalog", + table=table, path=path, columns_types={"col0": "int", "col1": "double"}, partitions_types={"y": "int", "m": "int"}, compression="snappy", ) - wr.catalog.create_parquet_table( - database=database, table="test_catalog", path=path, columns_types={"col0": "string"}, mode="append" - ) - assert wr.catalog.does_table_exist(database=database, table="test_catalog") is True - assert wr.catalog.delete_table_if_exists(database=database, table="test_catalog") is True - assert wr.catalog.delete_table_if_exists(database=database, table="test_catalog") is False + with pytest.raises(wr.exceptions.InvalidArgumentValue): + wr.catalog.create_parquet_table( + database=database, table=table, path=path, columns_types={"col0": "string"}, mode="append" + ) + assert wr.catalog.does_table_exist(database=database, table=table) is True + assert wr.catalog.delete_table_if_exists(database=database, table=table) is True + assert wr.catalog.delete_table_if_exists(database=database, table=table) is False wr.catalog.create_parquet_table( database=database, - table="test_catalog", + table=table, path=path, columns_types={"col0": "int", "col1": "double"}, partitions_types={"y": "int", "m": "int"}, @@ -553,22 +601,23 @@ def test_catalog(bucket, database): description="Foo boo bar", parameters={"tag": "test"}, columns_comments={"col0": "my int", "y": "year"}, + 
mode="overwrite", ) wr.catalog.add_parquet_partitions( database=database, - table="test_catalog", + table=table, partitions_values={f"{path}y=2020/m=1/": ["2020", "1"], f"{path}y=2021/m=2/": ["2021", "2"]}, compression="snappy", ) - assert wr.catalog.get_table_location(database=database, table="test_catalog") == path - partitions_values = wr.catalog.get_parquet_partitions(database=database, table="test_catalog") + assert wr.catalog.get_table_location(database=database, table=table) == path + partitions_values = wr.catalog.get_parquet_partitions(database=database, table=table) assert len(partitions_values) == 2 partitions_values = wr.catalog.get_parquet_partitions( - database=database, table="test_catalog", catalog_id=account_id, expression="y = 2021 AND m = 2" + database=database, table=table, catalog_id=account_id, expression="y = 2021 AND m = 2" ) assert len(partitions_values) == 1 assert len(set(partitions_values[f"{path}y=2021/m=2/"]) & {"2021", "2"}) == 2 - dtypes = wr.catalog.get_table_types(database=database, table="test_catalog") + dtypes = wr.catalog.get_table_types(database=database, table=table) assert dtypes["col0"] == "int" assert dtypes["col1"] == "double" assert dtypes["y"] == "int" @@ -579,7 +628,7 @@ def test_catalog(bucket, database): tables = list(wr.catalog.get_tables()) assert len(tables) > 0 for tbl in tables: - if tbl["Name"] == "test_catalog": + if tbl["Name"] == table: assert tbl["TableType"] == "EXTERNAL_TABLE" tables = list(wr.catalog.get_tables(database=database)) assert len(tables) > 0 @@ -589,37 +638,41 @@ def test_catalog(bucket, database): tables = list(wr.catalog.search_tables(text="parquet", catalog_id=account_id)) assert len(tables) > 0 for tbl in tables: - if tbl["Name"] == "test_catalog": + if tbl["Name"] == table: assert tbl["TableType"] == "EXTERNAL_TABLE" # prefix - tables = list(wr.catalog.get_tables(name_prefix="test_cat", catalog_id=account_id)) + tables = list(wr.catalog.get_tables(name_prefix=table[:4], catalog_id=account_id)) assert len(tables) > 0 for tbl in tables: - if tbl["Name"] == "test_catalog": + if tbl["Name"] == table: assert tbl["TableType"] == "EXTERNAL_TABLE" # suffix - tables = list(wr.catalog.get_tables(name_suffix="_catalog", catalog_id=account_id)) + tables = list(wr.catalog.get_tables(name_suffix=table[-4:], catalog_id=account_id)) assert len(tables) > 0 for tbl in tables: - if tbl["Name"] == "test_catalog": + if tbl["Name"] == table: assert tbl["TableType"] == "EXTERNAL_TABLE" # name_contains - tables = list(wr.catalog.get_tables(name_contains="cat", catalog_id=account_id)) + tables = list(wr.catalog.get_tables(name_contains=table[4:-4], catalog_id=account_id)) assert len(tables) > 0 for tbl in tables: - if tbl["Name"] == "test_catalog": + if tbl["Name"] == table: assert tbl["TableType"] == "EXTERNAL_TABLE" # prefix & suffix & name_contains - tables = list(wr.catalog.get_tables(name_prefix="t", name_contains="_", name_suffix="g", catalog_id=account_id)) + tables = list( + wr.catalog.get_tables( + name_prefix=table[0], name_contains=table[3], name_suffix=table[-1], catalog_id=account_id + ) + ) assert len(tables) > 0 for tbl in tables: - if tbl["Name"] == "test_catalog": + if tbl["Name"] == table: assert tbl["TableType"] == "EXTERNAL_TABLE" # prefix & suffix - tables = list(wr.catalog.get_tables(name_prefix="t", name_suffix="g", catalog_id=account_id)) + tables = list(wr.catalog.get_tables(name_prefix=table[0], name_suffix=table[-1], catalog_id=account_id)) assert len(tables) > 0 for tbl in tables: - if tbl["Name"] == 
"test_catalog": + if tbl["Name"] == table: assert tbl["TableType"] == "EXTERNAL_TABLE" # DataFrames assert len(wr.catalog.databases().index) > 0 @@ -629,17 +682,18 @@ def test_catalog(bucket, database): wr.catalog.tables( database=database, search_text="parquet", - name_prefix="t", - name_contains="_", - name_suffix="g", + name_prefix=table[0], + name_contains=table[3], + name_suffix=table[-1], catalog_id=account_id, ).index ) > 0 ) - assert len(wr.catalog.table(database=database, table="test_catalog").index) > 0 - assert len(wr.catalog.table(database=database, table="test_catalog", catalog_id=account_id).index) > 0 - assert wr.catalog.delete_table_if_exists(database=database, table="test_catalog") is True + assert len(wr.catalog.table(database=database, table=table).index) > 0 + assert len(wr.catalog.table(database=database, table=table, catalog_id=account_id).index) > 0 + with pytest.raises(wr.exceptions.InvalidTable): + wr.catalog.overwrite_table_parameters({"foo": "boo"}, database, "fake_table") def test_s3_get_bucket_region(bucket, region): @@ -671,7 +725,7 @@ def test_athena_query_failed(database): def test_athena_read_list(database): with pytest.raises(wr.exceptions.UnsupportedType): - wr.athena.read_sql_query(sql=f"SELECT ARRAY[1, 2, 3]", database=database, ctas_approach=False) + wr.athena.read_sql_query(sql="SELECT ARRAY[1, 2, 3]", database=database, ctas_approach=False) def test_sanitize_names(): @@ -788,15 +842,13 @@ def test_category(bucket, database): assert wr.catalog.delete_table_if_exists(database=database, table="test_category") is True -def test_parquet_validate_schema(bucket, database): - path = f"s3://{bucket}/test_parquet_file_validate/" - wr.s3.delete_objects(path=path) +def test_parquet_validate_schema(path): df = pd.DataFrame({"id": [1, 2, 3]}) - path_file = f"s3://{bucket}/test_parquet_file_validate/0.parquet" + path_file = f"{path}0.parquet" wr.s3.to_parquet(df=df, path=path_file) wr.s3.wait_objects_exist(paths=[path_file]) df2 = pd.DataFrame({"id2": [1, 2, 3], "val": ["foo", "boo", "bar"]}) - path_file2 = f"s3://{bucket}/test_parquet_file_validate/1.parquet" + path_file2 = f"{path}1.parquet" wr.s3.to_parquet(df=df2, path=path_file2) wr.s3.wait_objects_exist(paths=[path_file2], use_threads=False) df3 = wr.s3.read_parquet(path=path, validate_schema=False) @@ -804,9 +856,6 @@ def test_parquet_validate_schema(bucket, database): assert len(df3.columns) == 3 with pytest.raises(ValueError): wr.s3.read_parquet(path=path, validate_schema=True) - with pytest.raises(ValueError): - wr.s3.store_parquet_metadata(path=path, database=database, table="test_parquet_validate_schema", dataset=True) - wr.s3.delete_objects(path=path) def test_csv_dataset(bucket, database): @@ -1074,10 +1123,7 @@ def test_csv_compress(bucket, compression): wr.s3.delete_objects(path=path) -def test_parquet_char_length(bucket, database, external_schema): - path = f"s3://{bucket}/test_parquet_char_length/" - table = "test_parquet_char_length" - +def test_parquet_char_length(path, database, table, external_schema): df = pd.DataFrame( {"id": [1, 2], "cchar": ["foo", "boo"], "date": [datetime.date(2020, 1, 1), datetime.date(2020, 1, 2)]} ) @@ -1108,9 +1154,6 @@ def test_parquet_char_length(bucket, database, external_schema): assert len(df2.columns) == 3 assert df2.id.sum() == 3 - wr.s3.delete_objects(path=path) - assert wr.catalog.delete_table_if_exists(database=database, table=table) is True - def test_merge(bucket): path = f"s3://{bucket}/test_merge/" @@ -1217,8 +1260,10 @@ def test_parquet_chunked(bucket, 
database, col2, chunked): @pytest.mark.parametrize("workgroup", [None, 0, 1, 2, 3]) @pytest.mark.parametrize("encryption", [None, "SSE_S3", "SSE_KMS"]) +# @pytest.mark.parametrize("workgroup", [3]) +# @pytest.mark.parametrize("encryption", [None]) def test_athena_encryption( - bucket, database, kms_key, encryption, workgroup, workgroup0, workgroup1, workgroup2, workgroup3 + path, path2, database, table, table2, kms_key, encryption, workgroup, workgroup0, workgroup1, workgroup2, workgroup3 ): kms_key = None if (encryption == "SSE_S3") or (encryption is None) else kms_key if workgroup == 0: @@ -1229,18 +1274,11 @@ def test_athena_encryption( workgroup = workgroup2 elif workgroup == 3: workgroup = workgroup3 - table = f"test_athena_encryption_{str(encryption).lower()}_{str(workgroup).lower()}" - path = f"s3://{bucket}/{table}/" - wr.s3.delete_objects(path=path) df = pd.DataFrame({"a": [1, 2], "b": ["foo", "boo"]}) paths = wr.s3.to_parquet( df=df, path=path, dataset=True, mode="overwrite", database=database, table=table, s3_additional_kwargs=None )["paths"] wr.s3.wait_objects_exist(paths=paths, use_threads=False) - temp_table = table + "2" - s3_output = f"s3://{bucket}/encryptio_s3_output/" - final_destination = f"{s3_output}{temp_table}/" - wr.s3.delete_objects(path=final_destination) df2 = wr.athena.read_sql_table( table=table, ctas_approach=True, @@ -1249,16 +1287,12 @@ def test_athena_encryption( workgroup=workgroup, kms_key=kms_key, keep_files=True, - ctas_temp_table_name=temp_table, - s3_output=s3_output, + ctas_temp_table_name=table2, + s3_output=path2, ) - assert wr.catalog.does_table_exist(database=database, table=temp_table) is False - assert len(wr.s3.list_objects(path=s3_output)) > 2 - print(df2) + assert wr.catalog.does_table_exist(database=database, table=table2) is False assert len(df2.index) == 2 assert len(df2.columns) == 2 - wr.catalog.delete_table_if_exists(database=database, table=table) - wr.s3.delete_objects(path=paths) def test_athena_nested(bucket, database): @@ -1440,10 +1474,7 @@ def test_parquet_uint64(bucket): wr.s3.delete_objects(path=path) -def test_parquet_overwrite_partition_cols(bucket, database, external_schema): - table = "test_parquet_overwrite_partition_cols" - path = f"s3://{bucket}/{table}/" - wr.s3.delete_objects(path=path) +def test_parquet_overwrite_partition_cols(path, database, table, external_schema): df = pd.DataFrame({"c0": [1, 2, 1, 2], "c1": [1, 2, 1, 2], "c2": [2, 1, 2, 1]}) paths = wr.s3.to_parquet( @@ -1476,9 +1507,6 @@ def test_parquet_overwrite_partition_cols(bucket, database, external_schema): assert df.c1.sum() == 6 assert df.c2.sum() == 6 - wr.s3.delete_objects(path=path) - wr.catalog.delete_table_if_exists(database=database, table=table) - def test_catalog_parameters(bucket, database): table = "test_catalog_parameters" @@ -1538,3 +1566,565 @@ def test_catalog_parameters(bucket, database): wr.s3.delete_objects(path=path) wr.catalog.delete_table_if_exists(database=database, table=table) + + +def test_metadata_partitions(path): + path = f"{path}0.parquet" + df = pd.DataFrame({"c0": [0, 1, 2], "c1": ["3", "4", "5"], "c2": [6.0, 7.0, 8.0]}) + paths = wr.s3.to_parquet(df=df, path=path, dataset=False)["paths"] + wr.s3.wait_objects_exist(paths=paths, use_threads=False) + columns_types, partitions_types = wr.s3.read_parquet_metadata(path=path, dataset=False) + assert len(columns_types) == len(df.columns) + assert columns_types.get("c0") == "bigint" + assert columns_types.get("c1") == "string" + assert columns_types.get("c2") == "double" + + 
+@pytest.mark.parametrize("partition_cols", [None, ["c2"], ["c1", "c2"]]) +def test_metadata_partitions_dataset(path, partition_cols): + df = pd.DataFrame({"c0": [0, 1, 2], "c1": [3, 4, 5], "c2": [6, 7, 8]}) + paths = wr.s3.to_parquet(df=df, path=path, dataset=True, partition_cols=partition_cols)["paths"] + wr.s3.wait_objects_exist(paths=paths, use_threads=False) + columns_types, partitions_types = wr.s3.read_parquet_metadata(path=path, dataset=True) + partitions_types = partitions_types if partitions_types is not None else {} + assert len(columns_types) + len(partitions_types) == len(df.columns) + assert columns_types.get("c0") == "bigint" + assert (columns_types.get("c1") == "bigint") or (partitions_types.get("c1") == "string") + assert (columns_types.get("c1") == "bigint") or (partitions_types.get("c1") == "string") + + +@pytest.mark.parametrize("partition_cols", [None, ["c2"], ["c1", "c2"]]) +def test_store_metadata_partitions_dataset(database, table, path, partition_cols): + df = pd.DataFrame({"c0": [0, 1, 2], "c1": [3, 4, 5], "c2": [6, 7, 8]}) + paths = wr.s3.to_parquet(df=df, path=path, dataset=True, partition_cols=partition_cols)["paths"] + wr.s3.wait_objects_exist(paths=paths, use_threads=False) + wr.s3.store_parquet_metadata(path=path, database=database, table=table, dataset=True) + df2 = wr.athena.read_sql_table(table=table, database=database) + assert len(df.index) == len(df2.index) + assert len(df.columns) == len(df2.columns) + assert df.c0.sum() == df2.c0.sum() + assert df.c1.sum() == df2.c1.astype(int).sum() + assert df.c2.sum() == df2.c2.astype(int).sum() + + +def test_json_chunksize(path): + num_files = 10 + df = pd.DataFrame({"id": [1, 2, 3], "value": ["foo", "boo", "bar"]}) + paths = [f"{path}{i}.json" for i in range(num_files)] + for p in paths: + wr.s3.to_json(df, p, orient="records", lines=True) + wr.s3.wait_objects_exist(paths) + dfs = list(wr.s3.read_json(paths, lines=True, chunksize=1)) + assert len(dfs) == (3 * num_files) + for d in dfs: + assert len(d.columns) == 2 + assert d.id.iloc[0] in (1, 2, 3) + assert d.value.iloc[0] in ("foo", "boo", "bar") + + +def test_parquet_cast_string(path): + df = pd.DataFrame({"id": [1, 2, 3], "value": ["foo", "boo", "bar"]}) + path_file = f"{path}0.parquet" + wr.s3.to_parquet(df, path_file, dtype={"id": "string"}) + wr.s3.wait_objects_exist([path_file]) + df2 = wr.s3.read_parquet(path_file) + assert str(df2.id.dtypes) == "string" + df2["id"] = df2["id"].astype(int) + assert df.shape == df2.shape + for col, row in tuple(itertools.product(df.columns, range(3))): + assert df[col].iloc[row] == df2[col].iloc[row] + + +@pytest.mark.parametrize("partition_cols", [None, ["c2"], ["value", "c2"]]) +def test_parquet_cast_string_dataset(path, partition_cols): + df = pd.DataFrame({"id": [1, 2, 3], "value": ["foo", "boo", "bar"], "c2": [4, 5, 6], "c3": [7.0, 8.0, 9.0]}) + paths = wr.s3.to_parquet( + df, path, dataset=True, partition_cols=partition_cols, dtype={"id": "string", "c3": "string"} + )["paths"] + wr.s3.wait_objects_exist(paths) + df2 = wr.s3.read_parquet(path, dataset=True).sort_values("id", ignore_index=True) + assert str(df2.id.dtypes) == "string" + assert str(df2.c3.dtypes) == "string" + df2["id"] = df2["id"].astype(int) + df2["c3"] = df2["c3"].astype(float) + assert df.shape == df2.shape + for col, row in tuple(itertools.product(df.columns, range(3))): + assert df[col].iloc[row] == df2[col].iloc[row] + + +@pytest.mark.parametrize("partition_cols", [None, ["c2"], ["c1", "c2"]]) +def 
test_store_metadata_partitions_sample_dataset(database, table, path, partition_cols): + num_files = 10 + df = pd.DataFrame({"c0": [0, 1, 2], "c1": [3, 4, 5], "c2": [6, 7, 8]}) + for _ in range(num_files): + paths = wr.s3.to_parquet(df=df, path=path, dataset=True, partition_cols=partition_cols)["paths"] + wr.s3.wait_objects_exist(paths=paths, use_threads=False) + wr.s3.store_parquet_metadata( + path=path, database=database, table=table, dtype={"c1": "bigint", "c2": "smallint"}, sampling=0.25, dataset=True + ) + df2 = wr.athena.read_sql_table(table=table, database=database) + assert len(df.index) * num_files == len(df2.index) + assert len(df.columns) == len(df2.columns) + assert df.c0.sum() * num_files == df2.c0.sum() + assert df.c1.sum() * num_files == df2.c1.sum() + assert df.c2.sum() * num_files == df2.c2.sum() + + +def test_athena_undefined_column(database): + with pytest.raises(wr.exceptions.InvalidArgumentValue): + wr.athena.read_sql_query("SELECT 1", database) + with pytest.raises(wr.exceptions.InvalidArgumentValue): + wr.athena.read_sql_query("SELECT NULL AS my_null", database) + + +def test_to_parquet_file_sanitize(path): + df = pd.DataFrame({"C0": [0, 1], "camelCase": [2, 3], "c**--2": [4, 5]}) + path_file = f"{path}0.parquet" + wr.s3.to_parquet(df, path_file) + wr.s3.wait_objects_exist([path_file]) + df2 = wr.s3.read_parquet(path_file) + assert df.shape == df2.shape + assert list(df2.columns) == ["c0", "camel_case", "c_2"] + assert df2.c0.sum() == 1 + assert df2.camel_case.sum() == 5 + assert df2.c_2.sum() == 9 + + +def test_to_parquet_modes(database, table, path, external_schema): + + # Round 1 - Warm up + df = pd.DataFrame({"c0": [0, None]}, dtype="Int64") + paths = wr.s3.to_parquet( + df=df, + path=path, + dataset=True, + mode="overwrite", + database=database, + table=table, + description="c0", + parameters={"num_cols": str(len(df.columns)), "num_rows": str(len(df.index))}, + columns_comments={"c0": "0"}, + )["paths"] + wr.s3.wait_objects_exist(paths=paths) + df2 = wr.athena.read_sql_table(table, database) + assert df.shape == df2.shape + assert df.c0.sum() == df2.c0.sum() + parameters = wr.catalog.get_table_parameters(database, table) + assert len(parameters) == 5 + assert parameters["num_cols"] == str(len(df2.columns)) + assert parameters["num_rows"] == str(len(df2.index)) + assert wr.catalog.get_table_description(database, table) == "c0" + comments = wr.catalog.get_columns_comments(database, table) + assert len(comments) == len(df.columns) + assert comments["c0"] == "0" + + # Round 2 - Overwrite + df = pd.DataFrame({"c1": [None, 1, None]}, dtype="Int16") + paths = wr.s3.to_parquet( + df=df, + path=path, + dataset=True, + mode="overwrite", + database=database, + table=table, + description="c1", + parameters={"num_cols": str(len(df.columns)), "num_rows": str(len(df.index))}, + columns_comments={"c1": "1"}, + )["paths"] + wr.s3.wait_objects_exist(paths=paths) + df2 = wr.athena.read_sql_table(table, database) + assert df.shape == df2.shape + assert df.c1.sum() == df2.c1.sum() + parameters = wr.catalog.get_table_parameters(database, table) + assert len(parameters) == 5 + assert parameters["num_cols"] == str(len(df2.columns)) + assert parameters["num_rows"] == str(len(df2.index)) + assert wr.catalog.get_table_description(database, table) == "c1" + comments = wr.catalog.get_columns_comments(database, table) + assert len(comments) == len(df.columns) + assert comments["c1"] == "1" + + # Round 3 - Append + df = pd.DataFrame({"c1": [None, 2, None]}, dtype="Int8") + paths = 
wr.s3.to_parquet( + df=df, + path=path, + dataset=True, + mode="append", + database=database, + table=table, + description="c1", + parameters={"num_cols": str(len(df.columns)), "num_rows": str(len(df.index) * 2)}, + columns_comments={"c1": "1"}, + )["paths"] + wr.s3.wait_objects_exist(paths=paths) + df2 = wr.athena.read_sql_table(table, database) + assert len(df.columns) == len(df2.columns) + assert len(df.index) * 2 == len(df2.index) + assert df.c1.sum() + 1 == df2.c1.sum() + parameters = wr.catalog.get_table_parameters(database, table) + assert len(parameters) == 5 + assert parameters["num_cols"] == str(len(df2.columns)) + assert parameters["num_rows"] == str(len(df2.index)) + assert wr.catalog.get_table_description(database, table) == "c1" + comments = wr.catalog.get_columns_comments(database, table) + assert len(comments) == len(df.columns) + assert comments["c1"] == "1" + + # Round 4 - Append + New Column + df = pd.DataFrame({"c2": ["a", None, "b"], "c1": [None, None, None]}) + paths = wr.s3.to_parquet( + df=df, + path=path, + dataset=True, + mode="append", + database=database, + table=table, + description="c1+c2", + parameters={"num_cols": "2", "num_rows": "9"}, + columns_comments={"c1": "1", "c2": "2"}, + )["paths"] + wr.s3.wait_objects_exist(paths=paths) + df2 = wr.athena.read_sql_table(table, database) + assert len(df2.columns) == 2 + assert len(df2.index) == 9 + assert df2.c1.sum() == 3 + parameters = wr.catalog.get_table_parameters(database, table) + assert len(parameters) == 5 + assert parameters["num_cols"] == "2" + assert parameters["num_rows"] == "9" + assert wr.catalog.get_table_description(database, table) == "c1+c2" + comments = wr.catalog.get_columns_comments(database, table) + assert len(comments) == len(df.columns) + assert comments["c1"] == "1" + assert comments["c2"] == "2" + + # Round 5 - Append + New Column + Wrong Types + df = pd.DataFrame({"c2": [1], "c3": [True], "c1": ["1"]}) + paths = wr.s3.to_parquet( + df=df, + path=path, + dataset=True, + mode="append", + database=database, + table=table, + description="c1+c2+c3", + parameters={"num_cols": "3", "num_rows": "10"}, + columns_comments={"c1": "1!", "c2": "2!", "c3": "3"}, + )["paths"] + wr.s3.wait_objects_exist(paths=paths) + df2 = wr.athena.read_sql_table(table, database) + assert len(df2.columns) == 3 + assert len(df2.index) == 10 + assert df2.c1.sum() == 4 + parameters = wr.catalog.get_table_parameters(database, table) + assert len(parameters) == 5 + assert parameters["num_cols"] == "3" + assert parameters["num_rows"] == "10" + assert wr.catalog.get_table_description(database, table) == "c1+c2+c3" + comments = wr.catalog.get_columns_comments(database, table) + assert len(comments) == len(df.columns) + assert comments["c1"] == "1!" + assert comments["c2"] == "2!" 
+ assert comments["c3"] == "3" + engine = wr.catalog.get_engine("aws-data-wrangler-redshift") + df3 = wr.db.read_sql_table(con=engine, table=table, schema=external_schema) + assert len(df3.columns) == 3 + assert len(df3.index) == 10 + assert df3.c1.sum() == 4 + + # Round 6 - Overwrite Partitioned + df = pd.DataFrame({"c0": ["foo", None], "c1": [0, 1]}) + paths = wr.s3.to_parquet( + df=df, + path=path, + dataset=True, + mode="overwrite", + database=database, + table=table, + partition_cols=["c1"], + description="c0+c1", + parameters={"num_cols": "2", "num_rows": "2"}, + columns_comments={"c0": "zero", "c1": "one"}, + )["paths"] + wr.s3.wait_objects_exist(paths=paths) + df2 = wr.athena.read_sql_table(table, database) + assert df.shape == df2.shape + assert df.c1.sum() == df2.c1.sum() + parameters = wr.catalog.get_table_parameters(database, table) + assert len(parameters) == 5 + assert parameters["num_cols"] == "2" + assert parameters["num_rows"] == "2" + assert wr.catalog.get_table_description(database, table) == "c0+c1" + comments = wr.catalog.get_columns_comments(database, table) + assert len(comments) == len(df.columns) + assert comments["c0"] == "zero" + assert comments["c1"] == "one" + + # Round 7 - Overwrite Partitions + df = pd.DataFrame({"c0": [None, None], "c1": [0, 2]}) + paths = wr.s3.to_parquet( + df=df, + path=path, + dataset=True, + mode="overwrite_partitions", + database=database, + table=table, + partition_cols=["c1"], + description="c0+c1", + parameters={"num_cols": "2", "num_rows": "3"}, + columns_comments={"c0": "zero", "c1": "one"}, + )["paths"] + wr.s3.wait_objects_exist(paths=paths) + df2 = wr.athena.read_sql_table(table, database) + assert len(df2.columns) == 2 + assert len(df2.index) == 3 + assert df2.c1.sum() == 3 + parameters = wr.catalog.get_table_parameters(database, table) + assert len(parameters) == 5 + assert parameters["num_cols"] == "2" + assert parameters["num_rows"] == "3" + assert wr.catalog.get_table_description(database, table) == "c0+c1" + comments = wr.catalog.get_columns_comments(database, table) + assert len(comments) == len(df.columns) + assert comments["c0"] == "zero" + assert comments["c1"] == "one" + + # Round 8 - Overwrite Partitions + New Column + Wrong Type + df = pd.DataFrame({"c0": [1, 2], "c1": ["1", "3"], "c2": [True, False]}) + paths = wr.s3.to_parquet( + df=df, + path=path, + dataset=True, + mode="overwrite_partitions", + database=database, + table=table, + partition_cols=["c1"], + description="c0+c1+c2", + parameters={"num_cols": "3", "num_rows": "4"}, + columns_comments={"c0": "zero", "c1": "one", "c2": "two"}, + )["paths"] + wr.s3.wait_objects_exist(paths=paths) + df2 = wr.athena.read_sql_table(table, database) + assert len(df2.columns) == 3 + assert len(df2.index) == 4 + assert df2.c1.sum() == 6 + parameters = wr.catalog.get_table_parameters(database, table) + assert len(parameters) == 5 + assert parameters["num_cols"] == "3" + assert parameters["num_rows"] == "4" + assert wr.catalog.get_table_description(database, table) == "c0+c1+c2" + comments = wr.catalog.get_columns_comments(database, table) + assert len(comments) == len(df.columns) + assert comments["c0"] == "zero" + assert comments["c1"] == "one" + assert comments["c2"] == "two" + engine = wr.catalog.get_engine("aws-data-wrangler-redshift") + df3 = wr.db.read_sql_table(con=engine, table=table, schema=external_schema) + assert len(df3.columns) == 3 + assert len(df3.index) == 4 + assert df3.c1.sum() == 6 + + +def test_store_parquet_metadata_modes(database, table, path, 
external_schema): + + # Round 1 - Warm up + df = pd.DataFrame({"c0": [0, None]}, dtype="Int64") + paths = wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite")["paths"] + wr.s3.wait_objects_exist(paths=paths) + wr.s3.store_parquet_metadata( + path=path, + dataset=True, + mode="overwrite", + database=database, + table=table, + description="c0", + parameters={"num_cols": str(len(df.columns)), "num_rows": str(len(df.index))}, + columns_comments={"c0": "0"}, + ) + df2 = wr.athena.read_sql_table(table, database) + assert df.shape == df2.shape + assert df.c0.sum() == df2.c0.sum() + parameters = wr.catalog.get_table_parameters(database, table) + assert len(parameters) == 5 + assert parameters["num_cols"] == str(len(df2.columns)) + assert parameters["num_rows"] == str(len(df2.index)) + assert wr.catalog.get_table_description(database, table) == "c0" + comments = wr.catalog.get_columns_comments(database, table) + assert len(comments) == len(df.columns) + assert comments["c0"] == "0" + + # Round 2 - Overwrite + df = pd.DataFrame({"c1": [None, 1, None]}, dtype="Int16") + paths = wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite")["paths"] + wr.s3.wait_objects_exist(paths=paths) + wr.s3.store_parquet_metadata( + path=path, + dataset=True, + mode="overwrite", + database=database, + table=table, + description="c1", + parameters={"num_cols": str(len(df.columns)), "num_rows": str(len(df.index))}, + columns_comments={"c1": "1"}, + ) + df2 = wr.athena.read_sql_table(table, database) + assert df.shape == df2.shape + assert df.c1.sum() == df2.c1.sum() + parameters = wr.catalog.get_table_parameters(database, table) + assert len(parameters) == 5 + assert parameters["num_cols"] == str(len(df2.columns)) + assert parameters["num_rows"] == str(len(df2.index)) + assert wr.catalog.get_table_description(database, table) == "c1" + comments = wr.catalog.get_columns_comments(database, table) + assert len(comments) == len(df.columns) + assert comments["c1"] == "1" + + # Round 3 - Append + df = pd.DataFrame({"c1": [None, 2, None]}, dtype="Int16") + paths = wr.s3.to_parquet(df=df, path=path, dataset=True, mode="append")["paths"] + wr.s3.wait_objects_exist(paths=paths) + wr.s3.store_parquet_metadata( + path=path, + dataset=True, + mode="append", + database=database, + table=table, + description="c1", + parameters={"num_cols": str(len(df.columns)), "num_rows": str(len(df.index) * 2)}, + columns_comments={"c1": "1"}, + ) + df2 = wr.athena.read_sql_table(table, database) + assert len(df.columns) == len(df2.columns) + assert len(df.index) * 2 == len(df2.index) + assert df.c1.sum() + 1 == df2.c1.sum() + parameters = wr.catalog.get_table_parameters(database, table) + assert len(parameters) == 5 + assert parameters["num_cols"] == str(len(df2.columns)) + assert parameters["num_rows"] == str(len(df2.index)) + assert wr.catalog.get_table_description(database, table) == "c1" + comments = wr.catalog.get_columns_comments(database, table) + assert len(comments) == len(df.columns) + assert comments["c1"] == "1" + + # Round 4 - Append + New Column + df = pd.DataFrame({"c2": ["a", None, "b"], "c1": [None, 1, None]}) + df["c1"] = df["c1"].astype("Int16") + paths = wr.s3.to_parquet(df=df, path=path, dataset=True, mode="append")["paths"] + wr.s3.wait_objects_exist(paths=paths) + wr.s3.store_parquet_metadata( + path=path, + dataset=True, + mode="append", + database=database, + table=table, + description="c1+c2", + parameters={"num_cols": "2", "num_rows": "9"}, + columns_comments={"c1": "1", "c2": "2"}, + ) + df2 = 
wr.athena.read_sql_table(table, database) + assert len(df2.columns) == 2 + assert len(df2.index) == 9 + assert df2.c1.sum() == 4 + parameters = wr.catalog.get_table_parameters(database, table) + assert len(parameters) == 5 + assert parameters["num_cols"] == "2" + assert parameters["num_rows"] == "9" + assert wr.catalog.get_table_description(database, table) == "c1+c2" + comments = wr.catalog.get_columns_comments(database, table) + assert len(comments) == len(df.columns) + assert comments["c1"] == "1" + assert comments["c2"] == "2" + + # Round 5 - Overwrite Partitioned + df = pd.DataFrame({"c0": ["foo", None], "c1": [0, 1]}) + paths = wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite", partition_cols=["c1"])["paths"] + wr.s3.wait_objects_exist(paths=paths) + wr.s3.store_parquet_metadata( + path=path, + dataset=True, + mode="overwrite", + database=database, + table=table, + description="c0+c1", + parameters={"num_cols": "2", "num_rows": "2"}, + columns_comments={"c0": "zero", "c1": "one"}, + ) + df2 = wr.athena.read_sql_table(table, database) + assert df.shape == df2.shape + assert df.c1.sum() == df2.c1.astype(int).sum() + parameters = wr.catalog.get_table_parameters(database, table) + assert len(parameters) == 5 + assert parameters["num_cols"] == "2" + assert parameters["num_rows"] == "2" + assert wr.catalog.get_table_description(database, table) == "c0+c1" + comments = wr.catalog.get_columns_comments(database, table) + assert len(comments) == len(df.columns) + assert comments["c0"] == "zero" + assert comments["c1"] == "one" + + # Round 6 - Overwrite Partitions + df = pd.DataFrame({"c0": [None, "boo"], "c1": [0, 2]}) + paths = wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite_partitions", partition_cols=["c1"])[ + "paths" + ] + wr.s3.wait_objects_exist(paths=paths) + wr.s3.store_parquet_metadata( + path=path, + dataset=True, + mode="append", + database=database, + table=table, + description="c0+c1", + parameters={"num_cols": "2", "num_rows": "3"}, + columns_comments={"c0": "zero", "c1": "one"}, + ) + df2 = wr.athena.read_sql_table(table, database) + assert len(df2.columns) == 2 + assert len(df2.index) == 3 + assert df2.c1.astype(int).sum() == 3 + parameters = wr.catalog.get_table_parameters(database, table) + assert len(parameters) == 5 + assert parameters["num_cols"] == "2" + assert parameters["num_rows"] == "3" + assert wr.catalog.get_table_description(database, table) == "c0+c1" + comments = wr.catalog.get_columns_comments(database, table) + assert len(comments) == len(df.columns) + assert comments["c0"] == "zero" + assert comments["c1"] == "one" + + # Round 7 - Overwrite Partitions + New Column + df = pd.DataFrame({"c0": ["bar", None], "c1": [1, 3], "c2": [True, False]}) + paths = wr.s3.to_parquet(df=df, path=path, dataset=True, mode="overwrite_partitions", partition_cols=["c1"])[ + "paths" + ] + wr.s3.wait_objects_exist(paths=paths) + wr.s3.store_parquet_metadata( + path=path, + dataset=True, + mode="append", + database=database, + table=table, + description="c0+c1+c2", + parameters={"num_cols": "3", "num_rows": "4"}, + columns_comments={"c0": "zero", "c1": "one", "c2": "two"}, + ) + df2 = wr.athena.read_sql_table(table, database) + assert len(df2.columns) == 3 + assert len(df2.index) == 4 + assert df2.c1.astype(int).sum() == 6 + parameters = wr.catalog.get_table_parameters(database, table) + assert len(parameters) == 5 + assert parameters["num_cols"] == "3" + assert parameters["num_rows"] == "4" + assert wr.catalog.get_table_description(database, table) 
== "c0+c1+c2" + comments = wr.catalog.get_columns_comments(database, table) + assert len(comments) == len(df.columns) + assert comments["c0"] == "zero" + assert comments["c1"] == "one" + assert comments["c2"] == "two" + engine = wr.catalog.get_engine("aws-data-wrangler-redshift") + df3 = wr.db.read_sql_table(con=engine, table=table, schema=external_schema) + assert len(df3.columns) == 3 + assert len(df3.index) == 4 + assert df3.c1.astype(int).sum() == 6 diff --git a/testing/test_awswrangler/test_db.py b/testing/test_awswrangler/test_db.py index 6e5bbd9ce..1e07f8bf9 100644 --- a/testing/test_awswrangler/test_db.py +++ b/testing/test_awswrangler/test_db.py @@ -9,7 +9,7 @@ import awswrangler as wr -from ._utils import ensure_data_types, ensure_data_types_category, get_df, get_df_category +from ._utils import CFN_VALID_STATUS, ensure_data_types, ensure_data_types_category, get_df, get_df_category logging.basicConfig(level=logging.INFO, format="[%(asctime)s][%(levelname)s][%(name)s][%(funcName)s] %(message)s") logging.getLogger("awswrangler").setLevel(logging.DEBUG) @@ -19,8 +19,9 @@ @pytest.fixture(scope="module") def cloudformation_outputs(): response = boto3.client("cloudformation").describe_stacks(StackName="aws-data-wrangler") + stack = [x for x in response.get("Stacks") if x["StackStatus"] in CFN_VALID_STATUS][0] outputs = {} - for output in response.get("Stacks")[0].get("Outputs"): + for output in stack.get("Outputs"): outputs[output.get("OutputKey")] = output.get("OutputValue") yield outputs @@ -70,7 +71,7 @@ def external_schema(cloudformation_outputs, parameters, glue_database): IAM_ROLE '{parameters["redshift"]["role"]}' REGION '{region}'; """ - engine = wr.catalog.get_engine(connection=f"aws-data-wrangler-redshift") + engine = wr.catalog.get_engine(connection="aws-data-wrangler-redshift") with engine.connect() as con: con.execute(sql) yield "aws_data_wrangler_external" @@ -159,7 +160,7 @@ def test_redshift_temp_engine(parameters): def test_postgresql_param(): - engine = wr.catalog.get_engine(connection=f"aws-data-wrangler-postgresql") + engine = wr.catalog.get_engine(connection="aws-data-wrangler-postgresql") df = wr.db.read_sql_query(sql="SELECT %(value)s as col0", con=engine, params={"value": 1}) assert df["col0"].iloc[0] == 1 df = wr.db.read_sql_query(sql="SELECT %s as col0", con=engine, params=[1]) @@ -169,7 +170,7 @@ def test_postgresql_param(): def test_redshift_copy_unload(bucket, parameters): path = f"s3://{bucket}/test_redshift_copy/" df = get_df().drop(["iint8", "binary"], axis=1, inplace=False) - engine = wr.catalog.get_engine(connection=f"aws-data-wrangler-redshift") + engine = wr.catalog.get_engine(connection="aws-data-wrangler-redshift") wr.db.copy_to_redshift( df=df, path=path, @@ -219,7 +220,7 @@ def test_redshift_copy_unload(bucket, parameters): def test_redshift_copy_upsert(bucket, parameters): - engine = wr.catalog.get_engine(connection=f"aws-data-wrangler-redshift") + engine = wr.catalog.get_engine(connection="aws-data-wrangler-redshift") df = pd.DataFrame({"id": list((range(1_000))), "val": list(["foo" if i % 2 == 0 else "boo" for i in range(1_000)])}) df3 = pd.DataFrame( {"id": list((range(1_000, 1_500))), "val": list(["foo" if i % 2 == 0 else "boo" for i in range(500)])} @@ -312,7 +313,7 @@ def test_redshift_copy_upsert(bucket, parameters): ) def test_redshift_exceptions(bucket, parameters, diststyle, distkey, sortstyle, sortkey, exc): df = pd.DataFrame({"id": [1], "name": "joe"}) - engine = wr.catalog.get_engine(connection=f"aws-data-wrangler-redshift") + 
engine = wr.catalog.get_engine(connection="aws-data-wrangler-redshift") path = f"s3://{bucket}/test_redshift_exceptions_{random.randint(0, 1_000_000)}/" with pytest.raises(exc): wr.db.copy_to_redshift( @@ -346,7 +347,7 @@ def test_redshift_spectrum(bucket, glue_database, external_schema): partition_cols=["par_int"], )["paths"] wr.s3.wait_objects_exist(paths=paths, use_threads=False) - engine = wr.catalog.get_engine(connection=f"aws-data-wrangler-redshift") + engine = wr.catalog.get_engine(connection="aws-data-wrangler-redshift") with engine.connect() as con: cursor = con.execute(f"SELECT * FROM {external_schema}.test_redshift_spectrum") rows = cursor.fetchall() @@ -360,7 +361,7 @@ def test_redshift_spectrum(bucket, glue_database, external_schema): def test_redshift_category(bucket, parameters): path = f"s3://{bucket}/test_redshift_category/" df = get_df_category().drop(["binary"], axis=1, inplace=False) - engine = wr.catalog.get_engine(connection=f"aws-data-wrangler-redshift") + engine = wr.catalog.get_engine(connection="aws-data-wrangler-redshift") wr.db.copy_to_redshift( df=df, path=path, @@ -398,7 +399,7 @@ def test_redshift_unload_extras(bucket, parameters, kms_key_id): schema = parameters["redshift"]["schema"] path = f"s3://{bucket}/{table}/" wr.s3.delete_objects(path=path) - engine = wr.catalog.get_engine(connection=f"aws-data-wrangler-redshift") + engine = wr.catalog.get_engine(connection="aws-data-wrangler-redshift") df = pd.DataFrame({"id": [1, 2], "name": ["foo", "boo"]}) wr.db.to_sql(df=df, con=engine, name=table, schema=schema, if_exists="replace", index=False) paths = wr.db.unload_redshift_to_files( @@ -465,7 +466,7 @@ def test_to_sql_cast(parameters, db_type): def test_uuid(parameters): table = "test_uuid" schema = parameters["postgresql"]["schema"] - engine = wr.catalog.get_engine(connection=f"aws-data-wrangler-postgresql") + engine = wr.catalog.get_engine(connection="aws-data-wrangler-postgresql") df = pd.DataFrame( { "id": [1, 2, 3], diff --git a/testing/test_awswrangler/test_emr.py b/testing/test_awswrangler/test_emr.py index 38c725694..e414fc2e5 100644 --- a/testing/test_awswrangler/test_emr.py +++ b/testing/test_awswrangler/test_emr.py @@ -6,6 +6,8 @@ import awswrangler as wr +from ._utils import CFN_VALID_STATUS + logging.basicConfig(level=logging.INFO, format="[%(asctime)s][%(levelname)s][%(name)s][%(funcName)s] %(message)s") logging.getLogger("awswrangler").setLevel(logging.DEBUG) logging.getLogger("botocore.credentials").setLevel(logging.CRITICAL) @@ -14,8 +16,9 @@ @pytest.fixture(scope="module") def cloudformation_outputs(): response = boto3.client("cloudformation").describe_stacks(StackName="aws-data-wrangler") + stack = [x for x in response.get("Stacks") if x["StackStatus"] in CFN_VALID_STATUS][0] outputs = {} - for output in response.get("Stacks")[0].get("Outputs"): + for output in stack.get("Outputs"): outputs[output.get("OutputKey")] = output.get("OutputValue") yield outputs diff --git a/testing/test_awswrangler/test_metadata.py b/testing/test_awswrangler/test_metadata.py index f41b28641..89b2a8b0c 100644 --- a/testing/test_awswrangler/test_metadata.py +++ b/testing/test_awswrangler/test_metadata.py @@ -2,7 +2,7 @@ def test_metadata(): - assert wr.__version__ == "1.1.2" + assert wr.__version__ == "1.2.0" assert wr.__title__ == "awswrangler" assert wr.__description__ == "Pandas on AWS." 
assert wr.__license__ == "Apache License 2.0" diff --git a/testing/test_awswrangler/test_session.py b/testing/test_awswrangler/test_session.py new file mode 100644 index 000000000..172f97bba --- /dev/null +++ b/testing/test_awswrangler/test_session.py @@ -0,0 +1,20 @@ +import logging + +import boto3 + +import awswrangler as wr + +logging.basicConfig(level=logging.INFO, format="[%(asctime)s][%(levelname)s][%(name)s][%(funcName)s] %(message)s") +logging.getLogger("awswrangler").setLevel(logging.DEBUG) +logging.getLogger("botocore.credentials").setLevel(logging.CRITICAL) + + +def test_default_session(): + boto3.setup_default_session(region_name="us-east-1") + assert wr._utils.ensure_session().region_name == "us-east-1" + boto3.setup_default_session(region_name="us-east-2") + assert wr._utils.ensure_session().region_name == "us-east-2" + boto3.setup_default_session(region_name="us-west-1") + assert wr._utils.ensure_session().region_name == "us-west-1" + boto3.setup_default_session(region_name="us-west-2") + assert wr._utils.ensure_session().region_name == "us-west-2" diff --git a/tox.ini b/tox.ini index 018d4cd87..288b563dc 100644 --- a/tox.ini +++ b/tox.ini @@ -5,13 +5,14 @@ envlist = py{37,38,36} deps = pytest pytest-xdist + pytest-timeout moto commands = - pytest -n 8 testing/test_awswrangler + pytest --timeout=900 -n 8 testing/test_awswrangler [testenv:py36] deps = {[testenv]deps} pytest-cov commands = - pytest --cov=awswrangler -n 8 testing/test_awswrangler + pytest --timeout=600 --cov=awswrangler -n 8 testing/test_awswrangler diff --git a/tutorials/01 - Introduction.ipynb b/tutorials/01 - Introduction.ipynb index 78691fd02..c78baf660 100644 --- a/tutorials/01 - Introduction.ipynb +++ b/tutorials/01 - Introduction.ipynb @@ -69,16 +69,16 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 1, "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "'1.0.2'" + "'1.2.0'" ] }, - "execution_count": 2, + "execution_count": 1, "metadata": {}, "output_type": "execute_result" } @@ -88,6 +88,13 @@ "\n", "wr.__version__" ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { diff --git a/tutorials/02 - Sessions.ipynb b/tutorials/02 - Sessions.ipynb index 97b376968..2ff88ad1a 100644 --- a/tutorials/02 - Sessions.ipynb +++ b/tutorials/02 - Sessions.ipynb @@ -19,7 +19,7 @@ "\n", "Wrangler will not store any kind of state internally, and users is in charge of all the Sessions management, if necessary.\n", "\n", - "Most Wrangler functions receive the optional `boto3_session` argument. If None is received, a default boto3 Session will be temporary created to run the function." + "Most Wrangler functions receive the optional `boto3_session` argument. If None is received, the default boto3 Session will be used." 
] }, { @@ -36,7 +36,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Using the default Sessions" + "## Using the default Session" ] }, { @@ -63,7 +63,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "## Using custom Sessions" + "## Customizing and using the default Session" ] }, { @@ -83,15 +83,69 @@ } ], "source": [ - "wr.s3.does_object_exist(\"s3://noaa-ghcn-pds/fake\", boto3_session=boto3.Session(region_name=\"us-east-2\"))" + "boto3.setup_default_session(region_name=\"us-east-2\")\n", + "\n", + "wr.s3.does_object_exist(\"s3://noaa-ghcn-pds/fake\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Using a new custom Session" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": { + "collapsed": false, + "jupyter": { + "outputs_hidden": false + }, + "pycharm": { + "name": "#%%\n" + } + }, + "outputs": [ + { + "data": { + "text/plain": [ + "False" + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "my_session = boto3.Session(region_name=\"us-east-2\")\n", + "\n", + "wr.s3.does_object_exist(\"s3://noaa-ghcn-pds/fake\", boto3_session=my_session)" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 5, "metadata": {}, - "outputs": [], - "source": [] + "outputs": [ + { + "data": { + "text/plain": [ + "False" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "my_session = boto3.Session(region_name=\"us-east-2\")\n", + "\n", + "wr.s3.does_object_exist(\"s3://noaa-ghcn-pds/fake\", boto3_session=my_session)" + ] } ], "metadata": { diff --git a/tutorials/03 - Amazon S3.ipynb b/tutorials/03 - Amazon S3.ipynb index 8fdad9a54..90eff872e 100644 --- a/tutorials/03 - Amazon S3.ipynb +++ b/tutorials/03 - Amazon S3.ipynb @@ -82,7 +82,7 @@ "name": "stdin", "output_type": "stream", "text": [ - " ··········································\n" + " ············\n" ] } ], @@ -109,25 +109,13 @@ "cell_type": "code", "execution_count": 3, "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "{'paths': ['s3://aws-data-wrangler-test-bucket-ql9ou148dw6r/csv/file2.csv'],\n", - " 'partitions_values': {}}" - ] - }, - "execution_count": 3, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "path1 = f\"s3://{bucket}/csv/file1.csv\"\n", "path2 = f\"s3://{bucket}/csv/file2.csv\"\n", "\n", "wr.s3.to_csv(df1, path1, index=False)\n", - "wr.s3.to_csv(df2, path2, index=False)" + "wr.s3.to_csv(df2, path2, index=False);" ] }, { diff --git a/tutorials/04 - Parquet Datasets.ipynb b/tutorials/04 - Parquet Datasets.ipynb index 868cccf53..d9d44325d 100644 --- a/tutorials/04 - Parquet Datasets.ipynb +++ b/tutorials/04 - Parquet Datasets.ipynb @@ -50,7 +50,7 @@ "name": "stdin", "output_type": "stream", "text": [ - " ··········································\n" + " ············\n" ] } ], @@ -184,31 +184,31 @@ " \n", " \n", " 0\n", - " 3\n", - " bar\n", - " 2020-01-03\n", - " \n", - " \n", - " 1\n", " 1\n", " foo\n", " 2020-01-01\n", " \n", " \n", - " 2\n", + " 1\n", " 2\n", " boo\n", " 2020-01-02\n", " \n", + " \n", + " 2\n", + " 3\n", + " bar\n", + " 2020-01-03\n", + " \n", " \n", "\n", "" ], "text/plain": [ " id value date\n", - "0 3 bar 2020-01-03\n", - "1 1 foo 2020-01-01\n", - "2 2 boo 2020-01-02" + "0 1 foo 2020-01-01\n", + "1 2 boo 2020-01-02\n", + "2 3 bar 2020-01-03" ] }, "execution_count": 4, diff --git a/tutorials/14 - Schema Evolution.ipynb b/tutorials/14 - 
Schema Evolution.ipynb new file mode 100644 index 000000000..6dd01a633 --- /dev/null +++ b/tutorials/14 - Schema Evolution.ipynb @@ -0,0 +1,417 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "[![AWS Data Wrangler](_static/logo.png \"AWS Data Wrangler\")](https://github.com/awslabs/aws-data-wrangler)\n", + "\n", + "# 14 - Schema Evolution\n", + "\n", + "Wrangler support new **columns** on Parquet Dataset through:\n", + "\n", + "- [wr.s3.to_parquet()](https://aws-data-wrangler.readthedocs.io/en/latest/stubs/awswrangler.s3.to_parquet.html#awswrangler.s3.to_parquet)\n", + "- [wr.s3.store_parquet_metadata()](https://aws-data-wrangler.readthedocs.io/en/latest/stubs/awswrangler.s3.store_parquet_metadata.html#awswrangler.s3.store_parquet_metadata) i.e. \"Crawler\"" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "from datetime import date\n", + "import awswrangler as wr\n", + "import pandas as pd" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Enter your bucket name:" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdin", + "output_type": "stream", + "text": [ + " ············\n" + ] + } + ], + "source": [ + "import getpass\n", + "bucket = getpass.getpass()\n", + "path = f\"s3://{bucket}/dataset/\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Creating the Dataset" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
idvalue
01foo
12boo
\n", + "
" + ], + "text/plain": [ + " id value\n", + "0 1 foo\n", + "1 2 boo" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df = pd.DataFrame({\n", + " \"id\": [1, 2],\n", + " \"value\": [\"foo\", \"boo\"],\n", + "})\n", + "\n", + "wr.s3.to_parquet(\n", + " df=df,\n", + " path=path,\n", + " dataset=True,\n", + " mode=\"overwrite\",\n", + " database=\"aws_data_wrangler\",\n", + " table=\"my_table\"\n", + ")\n", + "\n", + "wr.s3.read_parquet(path, dataset=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Schema Version 0 on Glue Catalog (AWS Console)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "![Glue Console](_static/glue_catalog_version_0.png \"Glue Console\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Appending with NEW COLUMNS" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
idvaluedateflag
03bar2020-01-03True
14<NA>2020-01-04False
21fooNone<NA>
32booNone<NA>
\n", + "
" + ], + "text/plain": [ + " id value date flag\n", + "0 3 bar 2020-01-03 True\n", + "1 4 2020-01-04 False\n", + "2 1 foo None \n", + "3 2 boo None " + ] + }, + "execution_count": 4, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df = pd.DataFrame({\n", + " \"id\": [3, 4],\n", + " \"value\": [\"bar\", None],\n", + " \"date\": [date(2020, 1, 3), date(2020, 1, 4)],\n", + " \"flag\": [True, False]\n", + "})\n", + "\n", + "wr.s3.to_parquet(\n", + " df=df,\n", + " path=path,\n", + " dataset=True,\n", + " mode=\"append\",\n", + " database=\"aws_data_wrangler\",\n", + " table=\"my_table\",\n", + " catalog_versioning=True # Optional\n", + ")\n", + "\n", + "wr.s3.read_parquet(path, dataset=True, validate_schema=False)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Schema Version 1 on Glue Catalog (AWS Console)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "![Glue Console](_static/glue_catalog_version_1.png \"Glue Console\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Reading from Athena" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
idvaluedateflag
03bar2020-01-03True
14<NA>2020-01-04False
21fooNone<NA>
32booNone<NA>
\n", + "
" + ], + "text/plain": [ + " id value date flag\n", + "0 3 bar 2020-01-03 True\n", + "1 4 2020-01-04 False\n", + "2 1 foo None \n", + "3 2 boo None " + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "wr.athena.read_sql_table(table=\"my_table\", database=\"aws_data_wrangler\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Cleaning Up" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "wr.s3.delete_objects(path)\n", + "wr.catalog.delete_table_if_exists(table=\"my_table\", database=\"aws_data_wrangler\")" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "conda_python3", + "language": "python", + "name": "conda_python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.5" + }, + "pycharm": { + "stem_cell": { + "cell_type": "raw", + "metadata": { + "collapsed": false + }, + "source": [] + } + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/tutorials/_static/glue_catalog_version_0.png b/tutorials/_static/glue_catalog_version_0.png new file mode 100644 index 000000000..51a81ee76 Binary files /dev/null and b/tutorials/_static/glue_catalog_version_0.png differ diff --git a/tutorials/_static/glue_catalog_version_1.png b/tutorials/_static/glue_catalog_version_1.png new file mode 100644 index 000000000..c5646b9b5 Binary files /dev/null and b/tutorials/_static/glue_catalog_version_1.png differ