Merged
34 commits
3f14923
add test cases for s3 delete objects
bryanyang0528 May 20, 2020
9c18f95
Merge pull request #248 from bryanyang0528/add_test_raise_s3_exception
igorborgest May 20, 2020
53db236
Add private subnet on cloudformation.yaml
igorborgest May 20, 2020
0ecf533
Fix get_region_from_subnet bug
May 21, 2020
fa0e0b1
Fix awswrangler/emr.py:939:121: E501 line too long (124 > 120 charact…
May 21, 2020
8de64d8
Merge pull request #253 from jiajie999/master
igorborgest May 21, 2020
dbb06ca
Decreasing MySQL size for tests.
igorborgest May 21, 2020
5227f4a
Add moto test (Minimal test) to GitHub Actions.
igorborgest May 21, 2020
89e0895
Removing pytest timeout from GitHub actions.
igorborgest May 21, 2020
3ab86c5
fixing get_region_from_subnet(). #252
igorborgest May 21, 2020
5c96f27
Removing parallelism from pytest on GitHub actions.
igorborgest May 21, 2020
6aa1a44
Add test cases for get bukcet, object exists, list dirs and list file…
bryanyang0528 May 21, 2020
fd695d4
Remove Moto Test from GitHub Action
igorborgest May 21, 2020
e4ac084
Merge pull request #255 from bryanyang0528/add_test_cases_for_s3_list…
igorborgest May 21, 2020
ed7a260
add tests for describe objects
bryanyang0528 May 23, 2020
3951348
refactor naming
bryanyang0528 May 23, 2020
524f59e
add test for no object
bryanyang0528 May 23, 2020
73b89c5
Merge pull request #257 from bryanyang0528/fixed_confusing_variable
igorborgest May 23, 2020
6658422
Merge pull request #258 from bryanyang0528/add_test_for_descibe_objects
igorborgest May 23, 2020
4977fbd
refactor naming
bryanyang0528 May 24, 2020
ce12472
add test cases
bryanyang0528 May 25, 2020
ac49441
fixed typo
bryanyang0528 May 25, 2020
dc2eb62
Merge pull request #262 from bryanyang0528/refactor_naming
igorborgest May 25, 2020
ae5b0d5
Merge pull request #260 from bryanyang0528/add_test_for_object_relate…
igorborgest May 25, 2020
83dc5ad
Fix bug to write partitions in reverse order.
igorborgest May 27, 2020
8cb058c
Add black code style to test_moto.py. #254
igorborgest May 27, 2020
3979587
Merge pull request #264 from awslabs/partitions-order
igorborgest May 27, 2020
7b5b489
Fix dtype argument documentation.
igorborgest May 27, 2020
cfe558e
Bumping version to 1.3.0.
igorborgest May 27, 2020
5ca2974
Bumping dependencies versions.
igorborgest May 27, 2020
f6aaf6f
Increasing micro versions range for SQLAlchemy. #259
igorborgest May 27, 2020
5e3a533
Add cast for nest types for wr.s3.to_parquet. #263
igorborgest May 27, 2020
ec85502
Add support to Partition Projection. :rocket:
igorborgest May 27, 2020
a660e59
Add Partition Projection tutorial (17).
igorborgest May 28, 2020
8 changes: 4 additions & 4 deletions .pylintrc
@@ -331,7 +331,7 @@ indent-string=' '
max-line-length=120

# Maximum number of lines in a module.
max-module-lines=2500
max-module-lines=4000

# List of optional constructs for which whitespace checking is disabled. `dict-
# separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}.
@@ -547,7 +547,7 @@ valid-metaclass-classmethod-first-arg=cls
[DESIGN]

# Maximum number of arguments for function / method.
max-args=15
max-args=20

# Maximum number of attributes for a class (see R0902).
max-attributes=7
@@ -556,10 +556,10 @@ max-attributes=7
max-bool-expr=5

# Maximum number of branch for function / method body.
max-branches=15
max-branches=20

# Maximum number of locals for function / method body.
max-locals=30
max-locals=35

# Maximum number of parents for a class (see R0901).
max-parents=7
3 changes: 2 additions & 1 deletion README.md
@@ -3,7 +3,7 @@

![AWS Data Wrangler](docs/source/_static/logo2.png?raw=true "AWS Data Wrangler")

[![Release](https://img.shields.io/badge/release-1.2.0-brightgreen.svg)](https://pypi.org/project/awswrangler/)
[![Release](https://img.shields.io/badge/release-1.3.0-brightgreen.svg)](https://pypi.org/project/awswrangler/)
[![Python Version](https://img.shields.io/badge/python-3.6%20%7C%203.7%20%7C%203.8-brightgreen.svg)](https://anaconda.org/conda-forge/awswrangler)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
@@ -79,6 +79,7 @@ df = wr.db.read_sql_query("SELECT * FROM external_schema.my_table", con=engine)
- [14 - Schema Evolution](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/14%20-%20Schema%20Evolution.ipynb)
- [15 - EMR](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/15%20-%20EMR.ipynb)
- [16 - EMR & Docker](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/16%20-%20EMR%20%26%20Docker.ipynb)
- [17 - Partition Projection](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/17%20-%20Partition%20Projection.ipynb)
- [**API Reference**](https://aws-data-wrangler.readthedocs.io/en/latest/api.html)
- [Amazon S3](https://aws-data-wrangler.readthedocs.io/en/latest/api.html#amazon-s3)
- [AWS Glue Catalog](https://aws-data-wrangler.readthedocs.io/en/latest/api.html#aws-glue-catalog)
2 changes: 1 addition & 1 deletion awswrangler/__metadata__.py
@@ -7,5 +7,5 @@

__title__ = "awswrangler"
__description__ = "Pandas on AWS."
__version__ = "1.2.0"
__version__ = "1.3.0"
__license__ = "Apache License 2.0"
33 changes: 21 additions & 12 deletions awswrangler/_data_types.py
@@ -21,7 +21,7 @@

def athena2pyarrow(dtype: str) -> pa.DataType: # pylint: disable=too-many-return-statements
"""Athena to PyArrow data types conversion."""
dtype = dtype.lower()
dtype = dtype.lower().replace(" ", "")
if dtype == "tinyint":
return pa.int8()
if dtype == "smallint":
@@ -47,6 +47,12 @@ def athena2pyarrow(dtype: str) -> pa.DataType: # pylint: disable=too-many-retur
if dtype.startswith("decimal"):
precision, scale = dtype.replace("decimal(", "").replace(")", "").split(sep=",")
return pa.decimal128(precision=int(precision), scale=int(scale))
if dtype.startswith("array"):
return pa.large_list(athena2pyarrow(dtype=dtype[6:-1]))
if dtype.startswith("struct"):
return pa.struct([(f.split(":", 1)[0], athena2pyarrow(f.split(":", 1)[1])) for f in dtype[7:-1].split(",")])
if dtype.startswith("map"): # pragma: no cover
return pa.map_(athena2pyarrow(dtype[4:-1].split(",", 1)[0]), athena2pyarrow(dtype[4:-1].split(",", 1)[1]))
raise exceptions.UnsupportedType(f"Unsupported Athena type: {dtype}") # pragma: no cover
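For readers following the new `array` and `struct` branches above, here is a minimal pure-Python sketch of the string handling they rely on, without PyArrow. The helper names (`normalize`, `array_item_type`, `struct_fields`) are hypothetical and not part of awswrangler; only the slicing offsets and the whitespace stripping are taken from the diff.

```python
from typing import List, Tuple


def normalize(dtype: str) -> str:
    """Lower-case the type and drop spaces, as the patched athena2pyarrow does."""
    return dtype.lower().replace(" ", "")


def array_item_type(dtype: str) -> str:
    """Return the item type of 'array<...>' by skipping the 6-char prefix and the final '>'."""
    return dtype[6:-1]


def struct_fields(dtype: str) -> List[Tuple[str, str]]:
    """Split 'struct<name:type,...>' into (name, type) pairs.

    Like the diff, this splits on every comma, so it only handles structs
    whose field types contain no commas themselves.
    """
    body = dtype[7:-1]  # skip the 7-char prefix 'struct<' and the final '>'
    pairs = []
    for field in body.split(","):
        name, field_type = field.split(":", 1)
        pairs.append((name, field_type))
    return pairs


print(normalize("STRUCT<a: int, b: string>"))
print(array_item_type(normalize("ARRAY<bigint>")))
print(struct_fields(normalize("struct<a: int, b: string>")))
```

Because the split is flat, a field type that itself contains a comma (for example `decimal(10,2)` or a nested multi-field struct) would likely need a depth-aware split; the sketch does not attempt that.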


@@ -77,8 +83,6 @@ def athena2pandas(dtype: str) -> str: # pylint: disable=too-many-branches,too-m
return "decimal"
if dtype in ("binary", "varbinary"):
return "bytes"
if dtype == "array": # pragma: no cover
return "list"
raise exceptions.UnsupportedType(f"Unsupported Athena type: {dtype}") # pragma: no cover


@@ -143,9 +147,9 @@ def pyarrow2athena(dtype: pa.DataType) -> str: # pylint: disable=too-many-branc
if pa.types.is_list(dtype):
return f"array<{pyarrow2athena(dtype=dtype.value_type)}>"
if pa.types.is_struct(dtype):
return f"struct<{', '.join([f'{f.name}:{pyarrow2athena(dtype=f.type)}' for f in dtype])}>"
return f"struct<{','.join([f'{f.name}:{pyarrow2athena(dtype=f.type)}' for f in dtype])}>"
if pa.types.is_map(dtype): # pragma: no cover
return f"map<{pyarrow2athena(dtype=dtype.key_type)},{pyarrow2athena(dtype=dtype.item_type)}>"
return f"map<{pyarrow2athena(dtype=dtype.key_type)}, {pyarrow2athena(dtype=dtype.item_type)}>"
if dtype == pa.null():
raise exceptions.UndetectedType("We can not infer the data type from an entire null object column")
raise exceptions.UnsupportedType(f"Unsupported Pyarrow type: {dtype}") # pragma: no cover
@@ -321,7 +325,7 @@ def athena_types_from_pandas(
athena_columns_types: Dict[str, str] = {}
for k, v in pa_columns_types.items():
if v is None:
athena_columns_types[k] = casts[k]
athena_columns_types[k] = casts[k].replace(" ", "")
else:
athena_columns_types[k] = pyarrow2athena(dtype=v)
_logger.debug("athena_columns_types: %s", athena_columns_types)
@@ -341,12 +345,12 @@ def athena_types_from_pandas_partitioned(
df=df, index=index, dtype=dtype, index_left=index_left
)
columns_types: Dict[str, str] = {}
for col, typ in athena_columns_types.items():
if col not in partitions:
columns_types[col] = typ
partitions_types: Dict[str, str] = {}
for k, v in athena_columns_types.items():
if k in partitions:
partitions_types[k] = v
else:
columns_types[k] = v
for par in partitions:
partitions_types[par] = athena_columns_types[par]
return columns_types, partitions_types
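The rewritten loop builds `partitions_types` by walking `partitions` rather than the inferred column mapping, so the result follows the order of the `partitions` argument instead of the DataFrame column order. This is presumably the change behind commit 83dc5ad. A small sketch with plain dicts follows; `split_columns` and the sample data are illustrative only.

```python
from typing import Dict, List, Tuple


def split_columns(
    athena_columns_types: Dict[str, str], partitions: List[str]
) -> Tuple[Dict[str, str], Dict[str, str]]:
    """Separate regular columns from partition columns, keeping the declared partition order."""
    columns_types = {col: typ for col, typ in athena_columns_types.items() if col not in partitions}
    partitions_types = {par: athena_columns_types[par] for par in partitions}
    return columns_types, partitions_types


# The inferred column order (day before year) differs from the requested partition order.
inferred = {"value": "double", "day": "int", "year": "int"}
columns, parts = split_columns(inferred, partitions=["year", "day"])
print(list(columns))
print(list(parts))
```

The sketch relies on dicts preserving insertion order, which Python guarantees from 3.7 onward.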


@@ -384,7 +388,12 @@ def athena_types_from_pyarrow_schema(
def cast_pandas_with_athena_types(df: pd.DataFrame, dtype: Dict[str, str]) -> pd.DataFrame:
"""Cast columns in a Pandas DataFrame."""
for col, athena_type in dtype.items():
if col in df.columns:
if (
(col in df.columns)
and (not athena_type.startswith("array"))
and (not athena_type.startswith("struct"))
and (not athena_type.startswith("map"))
):
pandas_type: str = athena2pandas(dtype=athena_type)
if pandas_type == "datetime64":
df[col] = pd.to_datetime(df[col])
17 changes: 15 additions & 2 deletions awswrangler/_utils.py
@@ -183,11 +183,24 @@ def get_account_id(boto3_session: Optional[boto3.Session] = None) -> str:
return client(service_name="sts", session=session).get_caller_identity().get("Account")


def get_region_from_subnet(subnet_id: str, boto3_session: Optional[boto3.Session] = None) -> str:
def get_region_from_subnet(subnet_id: str, boto3_session: Optional[boto3.Session] = None) -> str: # pragma: no cover
"""Extract region from Subnet ID."""
session: boto3.Session = ensure_session(session=boto3_session)
client_ec2: boto3.client = client(service_name="ec2", session=session)
return client_ec2.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["AvailabilityZone"][:9]
return client_ec2.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["AvailabilityZone"][:-1]
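Why `[:-1]` rather than `[:9]`: for a standard Availability Zone, the zone name is the region name plus a one-letter suffix, and region names vary in length. A quick check with plain strings shows the difference. The zone names are ordinary examples, no API call is made, and `region_from_az` is a hypothetical helper.

```python
def region_from_az(availability_zone: str) -> str:
    """Drop the trailing zone letter from a standard Availability Zone name."""
    return availability_zone[:-1]


for az in ("us-east-1a", "eu-west-1c", "ap-southeast-2b"):
    old = az[:9]  # fixed-width slice used before the fix
    new = region_from_az(az)
    print(f"{az}: old={old!r} new={new!r}")
```

The fixed-width slice happens to work for nine-character regions such as `us-east-1`, which may be why the bug went unnoticed until a longer region name was used.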


def get_region_from_session(boto3_session: Optional[boto3.Session] = None, default_region: Optional[str] = None) -> str:
"""Extract region from session."""
session: boto3.Session = ensure_session(session=boto3_session)
region: Optional[str] = session.region_name
if region is not None:
return region
if default_region is not None: # pragma: no cover
return default_region
raise exceptions.InvalidArgument(
"There is no region_name defined on boto3, please configure it."
) # pragma: no cover
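The lookup order in `get_region_from_session` can be sketched without boto3 as follows. `resolve_region` is a hypothetical stand-in that takes the session's region as a plain argument, and `ValueError` substitutes for awswrangler's `exceptions.InvalidArgument`.

```python
from typing import Optional


def resolve_region(session_region: Optional[str], default_region: Optional[str] = None) -> str:
    """Prefer the session's region, then the caller's default, otherwise fail loudly."""
    if session_region is not None:
        return session_region
    if default_region is not None:
        return default_region
    # ValueError stands in for awswrangler's exceptions.InvalidArgument.
    raise ValueError("There is no region_name defined on boto3, please configure it.")


print(resolve_region("us-east-1", default_region="eu-west-1"))
print(resolve_region(None, default_region="eu-west-1"))
```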


def extract_partitions_from_paths(
129 changes: 123 additions & 6 deletions awswrangler/catalog.py
@@ -94,6 +94,12 @@ def create_parquet_table(
columns_comments: Optional[Dict[str, str]] = None,
mode: str = "overwrite",
catalog_versioning: bool = False,
projection_enabled: bool = False,
projection_types: Optional[Dict[str, str]] = None,
projection_ranges: Optional[Dict[str, str]] = None,
projection_values: Optional[Dict[str, str]] = None,
projection_intervals: Optional[Dict[str, str]] = None,
projection_digits: Optional[Dict[str, str]] = None,
boto3_session: Optional[boto3.Session] = None,
) -> None:
"""Create a Parquet Table (Metadata Only) in the AWS Glue Catalog.
@@ -124,6 +130,29 @@ def create_parquet_table(
'overwrite' to recreate any possible existing table or 'append' to keep any possible existing table.
catalog_versioning : bool
If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
projection_enabled : bool
Enable Partition Projection on Athena (https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html)
projection_types : Optional[Dict[str, str]]
Dictionary of partitions names and Athena projections types.
Valid types: "enum", "integer", "date", "injected"
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
(e.g. {'col_name': 'enum', 'col2_name': 'integer'})
projection_ranges: Optional[Dict[str, str]]
Dictionary of partitions names and Athena projections ranges.
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
(e.g. {'col_name': '0,10', 'col2_name': '-1,8675309'})
projection_values: Optional[Dict[str, str]]
Dictionary of partitions names and Athena projections values.
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
(e.g. {'col_name': 'A,B,Unknown', 'col2_name': 'foo,boo,bar'})
projection_intervals: Optional[Dict[str, str]]
Dictionary of partitions names and Athena projections intervals.
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
(e.g. {'col_name': '1', 'col2_name': '5'})
projection_digits: Optional[Dict[str, str]]
Dictionary of partitions names and Athena projections digits.
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
(e.g. {'col_name': '1', 'col2_name': '2'})
boto3_session : boto3.Session(), optional
Boto3 Session. The default boto3 session will be used if boto3_session receive None.

@@ -150,6 +179,7 @@ def create_parquet_table(
"""
table = sanitize_table_name(table=table)
partitions_types = {} if partitions_types is None else partitions_types

session: boto3.Session = _utils.ensure_session(session=boto3_session)
cat_table_input: Optional[Dict[str, Any]] = _get_table_input(database=database, table=table, boto3_session=session)
table_input: Dict[str, Any]
@@ -188,6 +218,13 @@ def create_parquet_table(
boto3_session=session,
table_input=table_input,
table_exist=table_exist,
partitions_types=partitions_types,
projection_enabled=projection_enabled,
projection_types=projection_types,
projection_ranges=projection_ranges,
projection_values=projection_values,
projection_intervals=projection_intervals,
projection_digits=projection_digits,
)


@@ -903,6 +940,12 @@ def create_csv_table(
catalog_versioning: bool = False,
sep: str = ",",
boto3_session: Optional[boto3.Session] = None,
projection_enabled: bool = False,
projection_types: Optional[Dict[str, str]] = None,
projection_ranges: Optional[Dict[str, str]] = None,
projection_values: Optional[Dict[str, str]] = None,
projection_intervals: Optional[Dict[str, str]] = None,
projection_digits: Optional[Dict[str, str]] = None,
) -> None:
"""Create a CSV Table (Metadata Only) in the AWS Glue Catalog.

@@ -934,6 +977,29 @@ def create_csv_table(
If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it.
sep : str
String of length 1. Field delimiter for the output file.
projection_enabled : bool
Enable Partition Projection on Athena (https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html)
projection_types : Optional[Dict[str, str]]
Dictionary of partitions names and Athena projections types.
Valid types: "enum", "integer", "date", "injected"
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
(e.g. {'col_name': 'enum', 'col2_name': 'integer'})
projection_ranges: Optional[Dict[str, str]]
Dictionary of partitions names and Athena projections ranges.
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
(e.g. {'col_name': '0,10', 'col2_name': '-1,8675309'})
projection_values: Optional[Dict[str, str]]
Dictionary of partitions names and Athena projections values.
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
(e.g. {'col_name': 'A,B,Unknown', 'col2_name': 'foo,boo,bar'})
projection_intervals: Optional[Dict[str, str]]
Dictionary of partitions names and Athena projections intervals.
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
(e.g. {'col_name': '1', 'col2_name': '5'})
projection_digits: Optional[Dict[str, str]]
Dictionary of partitions names and Athena projections digits.
https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html
(e.g. {'col_name': '1', 'col2_name': '2'})
boto3_session : boto3.Session(), optional
Boto3 Session. The default boto3 session will be used if boto3_session receive None.

@@ -980,27 +1046,77 @@ def create_csv_table(
boto3_session=session,
table_input=table_input,
table_exist=does_table_exist(database=database, table=table, boto3_session=session),
partitions_types=partitions_types,
projection_enabled=projection_enabled,
projection_types=projection_types,
projection_ranges=projection_ranges,
projection_values=projection_values,
projection_intervals=projection_intervals,
projection_digits=projection_digits,
)


def _create_table(
def _create_table( # pylint: disable=too-many-branches,too-many-statements
database: str,
table: str,
description: Optional[str],
parameters: Optional[Dict[str, str]],
columns_comments: Optional[Dict[str, str]],
mode: str,
catalog_versioning: bool,
boto3_session: Optional[boto3.Session],
table_input: Dict[str, Any],
table_exist: bool,
projection_enabled: bool,
partitions_types: Optional[Dict[str, str]] = None,
columns_comments: Optional[Dict[str, str]] = None,
projection_types: Optional[Dict[str, str]] = None,
projection_ranges: Optional[Dict[str, str]] = None,
projection_values: Optional[Dict[str, str]] = None,
projection_intervals: Optional[Dict[str, str]] = None,
projection_digits: Optional[Dict[str, str]] = None,
):
# Description
if description is not None:
table_input["Description"] = description
if parameters is not None:
for k, v in parameters.items():
table_input["Parameters"][k] = v
if columns_comments is not None:

# Parameters & Projection
parameters = parameters if parameters else {}
partitions_types = partitions_types if partitions_types else {}
projection_types = projection_types if projection_types else {}
projection_ranges = projection_ranges if projection_ranges else {}
projection_values = projection_values if projection_values else {}
projection_intervals = projection_intervals if projection_intervals else {}
projection_digits = projection_digits if projection_digits else {}
projection_types = {sanitize_column_name(k): v for k, v in projection_types.items()}
projection_ranges = {sanitize_column_name(k): v for k, v in projection_ranges.items()}
projection_values = {sanitize_column_name(k): v for k, v in projection_values.items()}
projection_intervals = {sanitize_column_name(k): v for k, v in projection_intervals.items()}
projection_digits = {sanitize_column_name(k): v for k, v in projection_digits.items()}
for k, v in partitions_types.items():
if v == "date":
table_input["Parameters"][f"projection.{k}.format"] = "yyyy-MM-dd"
elif v == "timestamp":
table_input["Parameters"][f"projection.{k}.format"] = "yyyy-MM-dd HH:mm:ss"
table_input["Parameters"][f"projection.{k}.interval.unit"] = "SECONDS"
table_input["Parameters"][f"projection.{k}.interval"] = "1"
for k, v in projection_types.items():
table_input["Parameters"][f"projection.{k}.type"] = v
for k, v in projection_ranges.items():
table_input["Parameters"][f"projection.{k}.range"] = v
for k, v in projection_values.items():
table_input["Parameters"][f"projection.{k}.values"] = v
for k, v in projection_intervals.items():
table_input["Parameters"][f"projection.{k}.interval"] = str(v)
for k, v in projection_digits.items():
table_input["Parameters"][f"projection.{k}.digits"] = str(v)
parameters["projection.enabled"] = "true" if projection_enabled is True else "false"
for k, v in parameters.items():
table_input["Parameters"][k] = v

# Column comments
columns_comments = columns_comments if columns_comments else {}
columns_comments = {sanitize_column_name(k): v for k, v in columns_comments.items()}
if columns_comments:
for col in table_input["StorageDescriptor"]["Columns"]:
name: str = col["Name"]
if name in columns_comments:
@@ -1009,6 +1125,7 @@ def _create_table(
name = par["Name"]
if name in columns_comments:
par["Comment"] = columns_comments[name]

session: boto3.Session = _utils.ensure_session(session=boto3_session)
client_glue: boto3.client = _utils.client(service_name="glue", session=session)
skip_archive: bool = not catalog_versioning
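To make the parameter mapping in `_create_table` easier to review, here is a simplified, standalone sketch of how the per-partition dictionaries flatten into `projection.<column>.<setting>` table parameters. `projection_parameters` and the column names are hypothetical. The sketch leaves out `sanitize_column_name`, the `date`/`timestamp` format defaults, and the merge with user-supplied `parameters` that the real function performs.

```python
from typing import Dict, Optional


def projection_parameters(
    enabled: bool,
    types: Optional[Dict[str, str]] = None,
    ranges: Optional[Dict[str, str]] = None,
    values: Optional[Dict[str, str]] = None,
    intervals: Optional[Dict[str, str]] = None,
    digits: Optional[Dict[str, str]] = None,
) -> Dict[str, str]:
    """Flatten per-partition projection settings into Glue-style table parameters."""
    params: Dict[str, str] = {}
    groups = {"type": types, "range": ranges, "values": values, "interval": intervals, "digits": digits}
    for suffix, mapping in groups.items():
        for column, setting in (mapping or {}).items():
            params[f"projection.{column}.{suffix}"] = str(setting)
    params["projection.enabled"] = "true" if enabled else "false"
    return params


params = projection_parameters(
    enabled=True,
    types={"region": "enum", "year": "integer"},
    values={"region": "A,B,Unknown"},
    ranges={"year": "0,10"},
)
for key in sorted(params):
    print(key, "=", params[key])
```

Note that, as in the diff, `projection.enabled` is always written, so a table created with `projection_enabled=False` carries an explicit `"false"` rather than no key at all.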