Merged

Changes from all commits (70 commits)
8fd4966
initial draft
luigift Apr 18, 2020
863ba26
adding Pytorch as a development dependency
igorborgest Apr 19, 2020
2864dc0
Cleaning up initial draft
igorborgest Apr 19, 2020
4fed4c7
Add first test
igorborgest Apr 19, 2020
72c739c
add audio and image dataset
luigift Apr 19, 2020
f72810e
Add label_col to torch.SQLDataset
igorborgest Apr 20, 2020
bf1be07
Updating cartesian product of pytest parameters
igorborgest Apr 20, 2020
1a41d18
Pivoting SQLDataset parser strategy to avoid cast losses.
igorborgest Apr 20, 2020
36c15e4
tested lambda & image datasets
luigift Apr 20, 2020
d4dcfc5
add audio test
luigift Apr 20, 2020
30dc2fa
Add test for torch.AudioS3Dataset
igorborgest Apr 20, 2020
0376aef
Add chunked=INTEGER option to ensure the number of rows per batch #192
igorborgest Apr 22, 2020
5a9a83f
s3 iterable dataset
luigift Apr 22, 2020
60232f4
add tutorial draft
luigift Apr 23, 2020
215fbd5
add torch extras_requirements to setuptools
luigift Apr 23, 2020
0ad9e4b
handle labels in S3IterableDataset
luigift Apr 23, 2020
5e72ddf
clear bucket in S3IterableDataset test
luigift Apr 23, 2020
5b399ac
update setuptools
luigift Apr 23, 2020
2db15b6
update pytorch tutorial
luigift Apr 23, 2020
5e647c6
Update tutorial
igorborgest Apr 23, 2020
b3d9fe2
parallel tests fix
luigift Apr 23, 2020
c091fa8
fix lint
luigift Apr 24, 2020
37b7f1e
update readme
luigift Apr 24, 2020
33d74c4
remove capitalized requirement from docstring
luigift Apr 24, 2020
4b05b36
add torch requirements
luigift Apr 24, 2020
9ce624b
Add support to EMR with Docker
igorborgest Apr 25, 2020
c2db8cd
Add support to EMR with Docker #193
igorborgest Apr 25, 2020
487214f
Merge remote-tracking branch 'origin/emr-6' into emr-6
igorborgest Apr 25, 2020
9611a0a
Improve EMR tutorials #193
igorborgest Apr 25, 2020
3c3ca64
Splitting up the ecr_credentials into an individual function #193
igorborgest Apr 26, 2020
2eefb3a
Small update in the EMR tutorial
igorborgest Apr 26, 2020
86cdb30
fix init and docs
luigift Apr 26, 2020
b3c8c81
update tutorial
luigift Apr 26, 2020
f6927a4
rollback pytorch==1.5.0, due to torchaudio requirement
luigift Apr 27, 2020
f0f154b
Add wr.emr.submit_spark_step
igorborgest Apr 27, 2020
602bceb
Bumping version to 1.1.0
igorborgest Apr 27, 2020
a837658
Merge remote-tracking branch 'origin/master' into dev
igorborgest Apr 27, 2020
aeb8792
Improving the chunksize parser slicer algorithm
igorborgest Apr 27, 2020
5821c9a
Merge pull request #208 from awslabs/emr-6
igorborgest Apr 27, 2020
b34f648
Merge pull request #209 from awslabs/parquet-chunked
igorborgest Apr 27, 2020
797fba5
Update badges on README
igorborgest Apr 27, 2020
d9f107a
Add EMR tutorials to README
igorborgest Apr 27, 2020
3103e34
Merge branch 'dev' into pytorch
igorborgest Apr 27, 2020
7fd449e
Adapting to validations
igorborgest Apr 27, 2020
fd115d8
Bumping dev dependencies
igorborgest Apr 27, 2020
8fad37c
Bumping PyTorch libs versions
igorborgest Apr 27, 2020
85bfade
Replacing all f-string on logging commands
igorborgest Apr 27, 2020
910e3b6
100% test coverage on wr.torch
igorborgest Apr 27, 2020
be0d89e
Merge pull request #210 from awslabs/pytorch
igorborgest Apr 27, 2020
b4f6a36
Revisiting Athena encryption and workgroup #201
igorborgest Apr 28, 2020
a54a578
Merge pull request #212 from awslabs/athena-encryption
igorborgest Apr 28, 2020
2a26d4f
Decrease tox parallelism
igorborgest Apr 29, 2020
5298aaf
Add kms_key_id, max_file_size and region to Redshift Unload
igorborgest Apr 29, 2020
d4b27c6
Add KMS permission to Redshift Role
igorborgest Apr 29, 2020
924b0bb
Add Redshift tests
igorborgest Apr 29, 2020
ad22aea
Insignificant fix in _data_types.py
igorborgest Apr 29, 2020
0e068fe
Parquet chunksize now paginating on Pandas instead of PyArrow
igorborgest Apr 29, 2020
ca133a0
Linting
igorborgest Apr 29, 2020
e93fbfa
Merge pull request #214 from awslabs/redshift-unload
igorborgest Apr 29, 2020
e8660cb
Bumping dependencies versions
igorborgest May 2, 2020
b484ae1
Add support for query UUID columns on PostgreSQL and full NULL column…
igorborgest May 3, 2020
b748a35
Merge pull request #219 from awslabs/uuid-and-null
igorborgest May 3, 2020
08cf244
Add support to write nested types (array and struct).
igorborgest May 4, 2020
97f8763
Merge pull request #220 from awslabs/write-nested-types
igorborgest May 4, 2020
458bf26
Add keep_files and ctas_temp_table_name to wr.athena.read_*(). #203
igorborgest May 4, 2020
10ea9e8
Merge pull request #221 from awslabs/athena-args
igorborgest May 4, 2020
fe6f50b
Removing delete_table operations from catalog._create_table() and add…
igorborgest May 5, 2020
a6ba86c
add replace_filenames argument to wr.s3.copy_objects() #215
igorborgest May 5, 2020
5be05d3
Update README
igorborgest May 5, 2020
12d0f66
Updating requirements
igorborgest May 5, 2020
9 changes: 3 additions & 6 deletions .github/workflows/static-checking.yml
@@ -24,15 +24,12 @@ jobs:
uses: actions/setup-python@v1
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
pip install -r requirements-dev.txt
- name: Setup Environment
run: ./setup-dev-env.sh
- name: CloudFormation Lint
run: cfn-lint -t testing/cloudformation.yaml
- name: Documentation Lint
run: pydocstyle awswrangler/ --add-ignore=D204
run: pydocstyle awswrangler/ --add-ignore=D204,D403
- name: mypy check
run: mypy awswrangler
- name: Flake8 Lint
2 changes: 2 additions & 0 deletions .gitignore
@@ -138,6 +138,8 @@ testing/*parameters-*.properties
testing/*requirements*.txt
testing/coverage/*
building/*requirements*.txt
building/arrow
building/lambda/arrow
/docs/coverage/
/docs/build/
/docs/source/_build/
3 changes: 2 additions & 1 deletion .pylintrc
@@ -141,7 +141,8 @@ disable=print-statement,
comprehension-escape,
C0330,
C0103,
W1202
W1202,
too-few-public-methods

# Enable the message, report, category or checker with the given id(s). You can
# either give multiple identifier separated by comma (,) or put this option
10 changes: 6 additions & 4 deletions README.md
@@ -5,19 +5,18 @@

**NOTE**

We just released a new major version `1.0` with breaking changes. Please make sure that all your old projects has dependencies frozen on the desired version (e.g. `pip install awswrangler==0.3.2`).
Due the new major version `1.*.*` with breaking changes, please make sure that all your old projects has dependencies frozen on the desired version (e.g. `pip install awswrangler==0.3.2`).

---

![AWS Data Wrangler](docs/source/_static/logo2.png?raw=true "AWS Data Wrangler")

[![Release](https://img.shields.io/badge/release-1.0.4-brightgreen.svg)](https://pypi.org/project/awswrangler/)
[![Release](https://img.shields.io/badge/release-1.1.0-brightgreen.svg)](https://pypi.org/project/awswrangler/)
[![Python Version](https://img.shields.io/badge/python-3.6%20%7C%203.7%20%7C%203.8-brightgreen.svg)](https://anaconda.org/conda-forge/awswrangler)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![Checked with mypy](http://www.mypy-lang.org/static/mypy_badge.svg)](http://mypy-lang.org/)
[![Average time to resolve an issue](http://isitmaintained.com/badge/resolution/awslabs/aws-data-wrangler.svg)](http://isitmaintained.com/project/awslabs/aws-data-wrangler "Average time to resolve an issue")

[![Checked with mypy](http://www.mypy-lang.org/static/mypy_badge.svg)](http://mypy-lang.org/)
[![Coverage](https://img.shields.io/badge/coverage-100%25-brightgreen.svg)](https://pypi.org/project/awswrangler/)
![Static Checking](https://github.com/awslabs/aws-data-wrangler/workflows/Static%20Checking/badge.svg?branch=master)
[![Documentation Status](https://readthedocs.org/projects/aws-data-wrangler/badge/?version=latest)](https://aws-data-wrangler.readthedocs.io/?badge=latest)
@@ -85,6 +84,9 @@ df = wr.db.read_sql_query("SELECT * FROM external_schema.my_table", con=engine)
- [11 - CSV Datasets](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/11%20-%20CSV%20Datasets.ipynb)
- [12 - CSV Crawler](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/12%20-%20CSV%20Crawler.ipynb)
- [13 - Merging Datasets on S3](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/13%20-%20Merging%20Datasets%20on%20S3.ipynb)
- [14 - PyTorch](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/14%20-%20PyTorch.ipynb)
- [15 - EMR](https://github.com/awslabs/aws-data-wrangler/blob/dev/tutorials/15%20-%20EMR.ipynb)
- [16 - EMR & Docker](https://github.com/awslabs/aws-data-wrangler/blob/dev/tutorials/16%20-%20EMR%20%26%20Docker.ipynb)
- [**API Reference**](https://aws-data-wrangler.readthedocs.io/en/latest/api.html)
- [Amazon S3](https://aws-data-wrangler.readthedocs.io/en/latest/api.html#amazon-s3)
- [AWS Glue Catalog](https://aws-data-wrangler.readthedocs.io/en/latest/api.html#aws-glue-catalog)
5 changes: 5 additions & 0 deletions awswrangler/__init__.py
@@ -6,8 +6,13 @@
"""

import logging
from importlib.util import find_spec

from awswrangler import athena, catalog, cloudwatch, db, emr, exceptions, s3 # noqa
from awswrangler.__metadata__ import __description__, __license__, __title__, __version__ # noqa
from awswrangler._utils import get_account_id # noqa

if find_spec("torch") and find_spec("torchvision") and find_spec("torchaudio") and find_spec("PIL"):
from awswrangler import torch # noqa

logging.getLogger("awswrangler").addHandler(logging.NullHandler())
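
Note: the guard above imports awswrangler.torch only when torch, torchvision, torchaudio, and PIL can all be resolved, so a base install keeps working without the optional PyTorch stack. A minimal sketch of the same pattern, for illustration only (the tuple of module names simply mirrors the guard; nothing here is new API):

from importlib.util import find_spec

# Probe the import machinery without actually importing the packages.
_OPTIONAL_DEPS = ("torch", "torchvision", "torchaudio", "PIL")
if all(find_spec(name) is not None for name in _OPTIONAL_DEPS):
    from awswrangler import torch  # noqa

find_spec() only consults the module finders, so the check is cheap and never triggers a heavyweight import on its own.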
2 changes: 1 addition & 1 deletion awswrangler/__metadata__.py
@@ -7,5 +7,5 @@

__title__ = "awswrangler"
__description__ = "Pandas on AWS."
__version__ = "1.0.4"
__version__ = "1.1.0"
__license__ = "Apache License 2.0"
83 changes: 65 additions & 18 deletions awswrangler/_data_types.py
@@ -1,8 +1,9 @@
"""Internal (private) Data Types Module."""

import logging
import re
from decimal import Decimal
from typing import Dict, List, Optional, Tuple
from typing import Any, Dict, List, Match, Optional, Sequence, Tuple

import pandas as pd # type: ignore
import pyarrow as pa # type: ignore
@@ -139,8 +140,10 @@ def pyarrow2athena(dtype: pa.DataType) -> str: # pylint: disable=too-many-branc
return f"decimal({dtype.precision},{dtype.scale})"
if pa.types.is_list(dtype):
return f"array<{pyarrow2athena(dtype=dtype.value_type)}>"
if pa.types.is_struct(dtype): # pragma: no cover
return f"struct<{', '.join([f'{f.name}: {pyarrow2athena(dtype=f.type)}' for f in dtype])}>"
if pa.types.is_struct(dtype):
return f"struct<{', '.join([f'{f.name}:{pyarrow2athena(dtype=f.type)}' for f in dtype])}>"
if pa.types.is_map(dtype): # pragma: no cover
return f"map<{pyarrow2athena(dtype=dtype.key_type)},{pyarrow2athena(dtype=dtype.item_type)}>"
if dtype == pa.null():
raise exceptions.UndetectedType("We can not infer the data type from an entire null object column")
raise exceptions.UnsupportedType(f"Unsupported Pyarrow type: {dtype}") # pragma: no cover
@@ -167,7 +170,7 @@ def pyarrow2pandas_extension( # pylint: disable=too-many-branches,too-many-retu

def pyarrow2sqlalchemy( # pylint: disable=too-many-branches,too-many-return-statements
dtype: pa.DataType, db_type: str
) -> VisitableType:
) -> Optional[VisitableType]:
"""Pyarrow to Athena data types conversion."""
if pa.types.is_int8(dtype):
return sqlalchemy.types.SmallInteger
@@ -207,14 +210,14 @@ def pyarrow2sqlalchemy( # pylint: disable=too-many-branches,too-many-return-sta
return sqlalchemy.types.Date
if pa.types.is_binary(dtype):
if db_type == "redshift":
raise exceptions.UnsupportedType(f"Binary columns are not supported for Redshift.") # pragma: no cover
raise exceptions.UnsupportedType("Binary columns are not supported for Redshift.") # pragma: no cover
return sqlalchemy.types.Binary
if pa.types.is_decimal(dtype):
return sqlalchemy.types.Numeric(precision=dtype.precision, scale=dtype.scale)
if pa.types.is_dictionary(dtype):
return pyarrow2sqlalchemy(dtype=dtype.value_type, db_type=db_type)
if dtype == pa.null(): # pragma: no cover
raise exceptions.UndetectedType("We can not infer the data type from an entire null object column")
return None
raise exceptions.UnsupportedType(f"Unsupported Pyarrow type: {dtype}") # pragma: no cover


@@ -243,12 +246,23 @@ def pyarrow_types_from_pandas(
else:
cols.append(name)

# Filling cols_dtypes and indexes
# Filling cols_dtypes
for col in cols:
_logger.debug("Inferring PyArrow type from column: %s", col)
try:
schema: pa.Schema = pa.Schema.from_pandas(df=df[[col]], preserve_index=False)
except pa.ArrowInvalid as ex: # pragma: no cover
cols_dtypes[col] = process_not_inferred_dtype(ex)
else:
cols_dtypes[col] = schema.field(col).type

# Filling indexes
indexes: List[str] = []
for field in pa.Schema.from_pandas(df=df[cols], preserve_index=index):
name = str(field.name)
cols_dtypes[name] = field.type
if (name not in df.columns) and (index is True):
if index is True:
for field in pa.Schema.from_pandas(df=df[[]], preserve_index=True):
name = str(field.name)
_logger.debug("Inferring PyArrow type from index: %s", name)
cols_dtypes[name] = field.type
indexes.append(name)

# Merging Index
@@ -257,10 +271,43 @@
# Filling schema
columns_types: Dict[str, pa.DataType]
columns_types = {n: cols_dtypes[n] for n in sorted_cols}
_logger.debug(f"columns_types: {columns_types}")
_logger.debug("columns_types: %s", columns_types)
return columns_types


def process_not_inferred_dtype(ex: pa.ArrowInvalid) -> pa.DataType:
"""Infer data type from PyArrow inference exception."""
ex_str = str(ex)
_logger.debug("PyArrow was not able to infer data type:\n%s", ex_str)
match: Optional[Match] = re.search(
pattern="Could not convert (.*) with type (.*): did not recognize "
"Python value type when inferring an Arrow data type",
string=ex_str,
)
if match is None:
raise ex # pragma: no cover
groups: Optional[Sequence[str]] = match.groups()
if groups is None:
raise ex # pragma: no cover
if len(groups) != 2:
raise ex # pragma: no cover
_logger.debug("groups: %s", groups)
type_str: str = groups[1]
if type_str == "UUID":
return pa.string()
raise ex # pragma: no cover


def process_not_inferred_array(ex: pa.ArrowInvalid, values: Any) -> pa.Array:
"""Infer `pyarrow.array` from PyArrow inference exception."""
dtype = process_not_inferred_dtype(ex=ex)
if dtype == pa.string():
array: pa.Array = pa.array(obj=[str(x) for x in values], type=dtype, safe=True)
else:
raise ex # pragma: no cover
return array


def athena_types_from_pandas(
df: pd.DataFrame, index: bool, dtype: Optional[Dict[str, str]] = None, index_left: bool = False
) -> Dict[str, str]:
@@ -275,7 +322,7 @@ def athena_types_from_pandas(
athena_columns_types[k] = casts[k]
else:
athena_columns_types[k] = pyarrow2athena(dtype=v)
_logger.debug(f"athena_columns_types: {athena_columns_types}")
_logger.debug("athena_columns_types: %s", athena_columns_types)
return athena_columns_types


@@ -315,7 +362,7 @@ def pyarrow_schema_from_pandas(
if (k in df.columns) and (k not in ignore):
columns_types[k] = athena2pyarrow(v)
columns_types = {k: v for k, v in columns_types.items() if v is not None}
_logger.debug(f"columns_types: {columns_types}")
_logger.debug("columns_types: %s", columns_types)
return pa.schema(fields=columns_types)


@@ -324,11 +371,11 @@ def athena_types_from_pyarrow_schema(
) -> Tuple[Dict[str, str], Optional[Dict[str, str]]]:
"""Extract the related Athena data types from any PyArrow Schema considering possible partitions."""
columns_types: Dict[str, str] = {str(f.name): pyarrow2athena(dtype=f.type) for f in schema}
_logger.debug(f"columns_types: {columns_types}")
_logger.debug("columns_types: %s", columns_types)
partitions_types: Optional[Dict[str, str]] = None
if partitions is not None:
partitions_types = {p.name: pyarrow2athena(p.dictionary.type) for p in partitions}
_logger.debug(f"partitions_types: {partitions_types}")
_logger.debug("partitions_types: %s", partitions_types)
return columns_types, partitions_types


@@ -372,7 +419,7 @@ def sqlalchemy_types_from_pandas(
df: pd.DataFrame, db_type: str, dtype: Optional[Dict[str, VisitableType]] = None
) -> Dict[str, VisitableType]:
"""Extract the related SQLAlchemy data types from any Pandas DataFrame."""
casts: Dict[str, VisitableType] = dtype if dtype else {}
casts: Dict[str, VisitableType] = dtype if dtype is not None else {}
pa_columns_types: Dict[str, Optional[pa.DataType]] = pyarrow_types_from_pandas(
df=df, index=False, ignore_cols=list(casts.keys())
)
@@ -382,5 +429,5 @@ def sqlalchemy_types_from_pandas(
sqlalchemy_columns_types[k] = casts[k]
else:
sqlalchemy_columns_types[k] = pyarrow2sqlalchemy(dtype=v, db_type=db_type)
_logger.debug(f"sqlalchemy_columns_types: {sqlalchemy_columns_types}")
_logger.debug("sqlalchemy_columns_types: %s", sqlalchemy_columns_types)
return sqlalchemy_columns_types
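
The new process_not_inferred_dtype()/process_not_inferred_array() pair gives pyarrow_types_from_pandas() a recovery path when PyArrow cannot infer a column type; today it only recognizes Python uuid.UUID values and re-encodes them as strings. A short reproduction sketch (assuming PyArrow raises ArrowInvalid with the exact message the regex above matches, which can vary across PyArrow versions):

import uuid
import pyarrow as pa

values = [uuid.uuid4(), uuid.uuid4()]
try:
    arr = pa.array(values)  # "did not recognize Python value type..."
except pa.ArrowInvalid:
    # Same recovery as process_not_inferred_array(): serialize UUIDs as strings.
    arr = pa.array([str(v) for v in values], type=pa.string(), safe=True)
assert arr.type == pa.string()

Any other unrecognized type re-raises the original ArrowInvalid, so the fallback cannot silently coerce data it does not understand.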
13 changes: 13 additions & 0 deletions awswrangler/_utils.py
@@ -166,3 +166,16 @@ def ensure_postgresql_casts():
def get_directory(path: str) -> str:
"""Extract directory path."""
return path.rsplit(sep="/", maxsplit=1)[0] + "/"


def get_account_id(boto3_session: Optional[boto3.Session] = None) -> str:
"""Get Account ID."""
session: boto3.Session = ensure_session(session=boto3_session)
return client(service_name="sts", session=session).get_caller_identity().get("Account")


def get_region_from_subnet(subnet_id: str, boto3_session: Optional[boto3.Session] = None) -> str:
"""Extract region from Subnet ID."""
session: boto3.Session = ensure_session(session=boto3_session)
client_ec2: boto3.client = client(service_name="ec2", session=session)
return client_ec2.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["AvailabilityZone"][:9]
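
get_account_id() is re-exported at the package root (see the awswrangler/__init__.py change above), while get_region_from_subnet() stays internal. A brief usage sketch (region name and credentials are placeholders):

import boto3
import awswrangler as wr

session = boto3.Session(region_name="us-east-1")
print(wr.get_account_id(boto3_session=session))  # resolved via STS GetCallerIdentity

One caveat on the design: get_region_from_subnet() derives the region by slicing the first nine characters of the subnet's AvailabilityZone (e.g. "us-east-1a" -> "us-east-1"), which assumes a nine-character region name; longer names such as "ap-southeast-1" would be truncated.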