Merged (39 commits)
40c9713
Adding Cepsa to "Who use" section
alvaropc Aug 18, 2020
548bc68
Added sanitize_table_name in _add.py
weishao-aws Aug 24, 2020
6086184
Added sanitize_table_name in _add.py
weishao-aws Aug 24, 2020
dfb5219
Update 002 - Sessions.ipynb
TerrellV Aug 24, 2020
afe06ce
Fix NaN in string columns issue #362 (#363)
isrsal Aug 25, 2020
96c8872
Bumping dev dependencies versions.
igorborgest Aug 25, 2020
f2ae307
Updating string cast.
igorborgest Aug 25, 2020
2bf9208
Improving concurrent table creation/append.
igorborgest Aug 25, 2020
6c803a4
Fix bug to delete more than 25 partitions.
igorborgest Aug 26, 2020
5cb2235
Improve duplicated column names exception message. #355
igorborgest Aug 26, 2020
afc11b5
Improve wr.s3.to_* arguments validation.
igorborgest Aug 26, 2020
6b28181
Improve pandas_kwargs docs/exceptions.
igorborgest Aug 26, 2020
11bd30b
Add "powered by Apache Arrow" in the README.
igorborgest Aug 26, 2020
c79f8b3
Add schema_evolution argument to s3.to_parquet(). #353
igorborgest Aug 26, 2020
0434ec7
Add support for timezone and index for wr.s3.read_parquet().
igorborgest Aug 27, 2020
9132009
Add pytest-timeout.
igorborgest Aug 27, 2020
20a341e
Updated with personal github page.
alvaropc Aug 27, 2020
966cabc
Adjust botocore.config.Config.
igorborgest Aug 27, 2020
3466b98
Apply black code style.
igorborgest Aug 27, 2020
08b4ff4
Merge branch 'dev' into alvaropc-add-cepsa
igorborgest Aug 27, 2020
1db0efb
Merge pull request #354 from alvaropc/alvaropc-add-cepsa
igorborgest Aug 27, 2020
fe93961
Update 'who uses' section.
igorborgest Aug 27, 2020
f1d424d
Fix typos in the docs.
igorborgest Aug 27, 2020
19dd685
Small improvements.
igorborgest Aug 27, 2020
50b80ae
Improve index recovery.
igorborgest Aug 27, 2020
c64e7e0
Add support for multiindex recovery.
igorborgest Aug 27, 2020
83de304
Bumping to version 1.9.0.
igorborgest Aug 27, 2020
2e7a3cb
Improving date cast for wr.s3.to_parquet(). #365
igorborgest Aug 28, 2020
8ee2a77
Updating mypy settings.
igorborgest Aug 28, 2020
fc89ef1
Replace s3fs dependency by builtin mechanisms. (#370)
igorborgest Aug 30, 2020
c0ae07a
S3 filesystem abstraction improvements.
igorborgest Aug 31, 2020
5b39a17
Improving s3 cache strategy.
igorborgest Aug 31, 2020
3f3a9b7
Fix s3 filesystem abstraction bugs.
igorborgest Aug 31, 2020
9710f61
Update get_table_types example (#371)
bppont Aug 31, 2020
3fad112
Improve async writing orchestration (multithreading).
igorborgest Aug 31, 2020
adb1319
Update tutorials.
igorborgest Aug 31, 2020
e178b90
Handling new Athena exception.
igorborgest Aug 31, 2020
2a41fe6
Increasing test timeout.
igorborgest Aug 31, 2020
460f71d
Bumping version to 1.9.0
igorborgest Sep 1, 2020
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/bug_report.md
@@ -13,4 +13,4 @@ A clear and concise description of what the bug is.
**To Reproduce**
Steps to reproduce the behavior. Also add details about Python and Wrangler's version and how the library was installed.

*P.S. Don't attach file. Please, prefer add code snippets directly in the message body.*
*P.S. Don't attach files. Please, prefer add code snippets directly in the message body.*
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/enhancement-request.md
@@ -13,4 +13,4 @@ A clear and concise description of what the problem is. Ex. I'm always frustrate
**Describe the solution you'd like**
A clear and concise description of what you want to happen.

*P.S. Don't attach file. Please, prefer add code snippets directly in the message body.*
*P.S. Don't attach files. Please, prefer add code snippets directly in the message body.*
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/feature_request.md
@@ -13,4 +13,4 @@ A clear and concise description of what the problem is. Ex. I'm always frustrate
**Describe the solution you'd like**
A clear and concise description of what you want to happen.

*P.S. Don't attach file. Please, prefer add code snippets directly in the message body.*
*P.S. Don't attach files. Please, prefer add code snippets directly in the message body.*
2 changes: 1 addition & 1 deletion .github/ISSUE_TEMPLATE/question.md
@@ -7,4 +7,4 @@ assignees: ''

---

*P.S. Don't attach file. Please, prefer add code snippets directly in the message body.*
*P.S. Don't attach files. Please, prefer add code snippets directly in the message body.*
2 changes: 1 addition & 1 deletion .github/workflows/static-checking.yml
@@ -41,4 +41,4 @@ jobs:
- name: Black style
run: black --check --line-length 120 --target-version py36 awswrangler tests
- name: Imports order check (isort)
run: isort -rc --check-only awswrangler tests
run: isort --check-only awswrangler tests
2 changes: 1 addition & 1 deletion .gitignore
@@ -128,7 +128,7 @@ dmypy.json
output/

# Development
dev/
/dev/
metrics/
python/

2 changes: 1 addition & 1 deletion .pylintrc
@@ -332,7 +332,7 @@ indent-string='    '
max-line-length=120

# Maximum number of lines in a module.
max-module-lines=1250
max-module-lines=1500

# List of optional constructs for which whitespace checking is disabled. `dict-
# separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}.
29 changes: 17 additions & 12 deletions README.md
@@ -5,7 +5,7 @@

> An [AWS Professional Service](https://aws.amazon.com/professional-services/) open source initiative | aws-proserve-opensource@amazon.com

[![Release](https://img.shields.io/badge/release-1.8.1-brightgreen.svg)](https://pypi.org/project/awswrangler/)
[![Release](https://img.shields.io/badge/release-1.9.0-brightgreen.svg)](https://pypi.org/project/awswrangler/)
[![Python Version](https://img.shields.io/badge/python-3.6%20%7C%203.7%20%7C%203.8-brightgreen.svg)](https://anaconda.org/conda-forge/awswrangler)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
@@ -15,10 +15,12 @@
![Static Checking](https://github.com/awslabs/aws-data-wrangler/workflows/Static%20Checking/badge.svg?branch=master)
[![Documentation Status](https://readthedocs.org/projects/aws-data-wrangler/badge/?version=latest)](https://aws-data-wrangler.readthedocs.io/?badge=latest)

| Source | Downloads | Page | Installation Command |
|-----------|---------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------|--------------------------------------------|
| **PyPi** | [![PyPI Downloads](https://pepy.tech/badge/awswrangler)](https://pypi.org/project/awswrangler/) | [Link](https://pypi.org/project/awswrangler/) | `pip install awswrangler` |
| **Conda** | [![Conda Downloads](https://img.shields.io/conda/dn/conda-forge/awswrangler.svg)](https://anaconda.org/conda-forge/awswrangler) | [Link](https://anaconda.org/conda-forge/awswrangler) | `conda install -c conda-forge awswrangler` |
| Source | Downloads | Installation Command |
|--------|-----------|----------------------|
| **[PyPi](https://pypi.org/project/awswrangler/)** | [![PyPI Downloads](https://pepy.tech/badge/awswrangler)](https://pypi.org/project/awswrangler/) | `pip install awswrangler` |
| **[Conda](https://anaconda.org/conda-forge/awswrangler)** | [![Conda Downloads](https://img.shields.io/conda/dn/conda-forge/awswrangler.svg)](https://anaconda.org/conda-forge/awswrangler) | `conda install -c conda-forge awswrangler` |

Powered By [<img src="https://arrow.apache.org/img/arrow.png" width="200">](https://arrow.apache.org/powered_by/)

## Table of contents

@@ -121,10 +123,13 @@ Knowing which companies are using this library is important to help prioritize t

Please send a PR with your company name and @githubhandle if you may.

1. [Digio](https://www.digio.com.br/) [[@afonsomy](https://github.com/afonsomy)]
2. [Pier](https://www.pier.digital/) [[@flaviomax](https://github.com/flaviomax)]
3. [M4U](https://www.m4u.com.br/) [[@Thiago-Dantas](https://github.com/Thiago-Dantas)]
4. [Serasa Experian](https://www.serasaexperian.com.br/) [[@andre-marcos-perez](https://github.com/andre-marcos-perez)]
5. [LINE TV](https://www.linetv.tw/) [[@bryanyang0528](https://github.com/bryanyang0528)]
6. [OKRA Technologies](https://okra.ai) [[@JPFrancoia](https://github.com/JPFrancoia), [@schot](https://github.com/schot)]
7. [DNX](https://www.dnx.solutions/) [[@DNXLabs](https://github.com/DNXLabs)]
* [Amazon](https://www.amazon.com/)
* [AWS](https://aws.amazon.com/)
* [Cepsa](https://cepsa.com) [[@alvaropc](https://github.com/alvaropc)]
* [Digio](https://www.digio.com.br/) [[@afonsomy](https://github.com/afonsomy)]
* [DNX](https://www.dnx.solutions/) [[@DNXLabs](https://github.com/DNXLabs)]
* [LINE TV](https://www.linetv.tw/) [[@bryanyang0528](https://github.com/bryanyang0528)]
* [M4U](https://www.m4u.com.br/) [[@Thiago-Dantas](https://github.com/Thiago-Dantas)]
* [OKRA Technologies](https://okra.ai) [[@JPFrancoia](https://github.com/JPFrancoia), [@schot](https://github.com/schot)]
* [Pier](https://www.pier.digital/) [[@flaviomax](https://github.com/flaviomax)]
* [Serasa Experian](https://www.serasaexperian.com.br/) [[@andre-marcos-perez](https://github.com/andre-marcos-perez)]
3 changes: 0 additions & 3 deletions THIRD_PARTY.txt
@@ -296,9 +296,6 @@ Copyright 2013-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
** pandas; version 1.1.0 -- https://pandas.pydata.org/
Copyright (c) 2008-2012, AQR Capital Management, LLC, Lambda Foundry, Inc. and PyData Development Team
All rights reserved.
** s3fs; version 4.2.0 -- https://s3fs.readthedocs.io/en/latest/
Copyright (c) 2016, Continuum Analytics, Inc. and contributors
All rights reserved.
** numpy; version 1.19.1 -- https://numpy.org/
Copyright (c) 2005-2020, NumPy Developers.
All rights reserved.
2 changes: 1 addition & 1 deletion awswrangler/__metadata__.py
@@ -7,5 +7,5 @@

__title__: str = "awswrangler"
__description__: str = "Pandas on AWS."
__version__: str = "1.8.1"
__version__: str = "1.9.0"
__license__: str = "Apache License 2.0"
20 changes: 10 additions & 10 deletions awswrangler/_config.py
@@ -5,7 +5,7 @@
import os
from typing import Any, Callable, Dict, List, NamedTuple, Optional, Tuple, Type, Union, cast

import pandas as pd # type: ignore
import pandas as pd

from awswrangler import _utils, exceptions

@@ -29,7 +29,7 @@ class _ConfigArg(NamedTuple):
"database": _ConfigArg(dtype=str, nullable=True),
"max_cache_query_inspections": _ConfigArg(dtype=int, nullable=False),
"max_cache_seconds": _ConfigArg(dtype=int, nullable=False),
"s3fs_block_size": _ConfigArg(dtype=int, nullable=False, enforced=True),
"s3_block_size": _ConfigArg(dtype=int, nullable=False, enforced=True),
}


@@ -138,8 +138,8 @@ def _apply_type(name: str, value: Any, dtype: Type[Union[str, bool, int]], nulla
exceptions.InvalidArgumentValue(f"{name} configuration does not accept a null value. Please pass {dtype}.")
try:
return dtype(value) if isinstance(value, dtype) is False else value
except ValueError:
raise exceptions.InvalidConfiguration(f"Config {name} must receive a {dtype} value.")
except ValueError as ex:
raise exceptions.InvalidConfiguration(f"Config {name} must receive a {dtype} value.") from ex

@staticmethod
def _is_null(value: _ConfigValueType) -> bool:
@@ -206,13 +206,13 @@ def max_cache_seconds(self, value: int) -> None:
self._set_config_value(key="max_cache_seconds", value=value)

@property
def s3fs_block_size(self) -> int:
"""Property s3fs_block_size."""
return cast(int, self["s3fs_block_size"])
def s3_block_size(self) -> int:
"""Property s3_block_size."""
return cast(int, self["s3_block_size"])

@s3fs_block_size.setter
def s3fs_block_size(self, value: int) -> None:
self._set_config_value(key="s3fs_block_size", value=value)
@s3_block_size.setter
def s3_block_size(self, value: int) -> None:
self._set_config_value(key="s3_block_size", value=value)


def _inject_config_doc(doc: Optional[str], available_configs: Tuple[str, ...]) -> str:
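The `_config.py` hunks above rename the `s3fs_block_size` option to `s3_block_size` in Wrangler's typed config registry: each option is declared with a dtype and nullability, values are cast on assignment, and cast failures are now chained with `from ex`. A minimal self-contained sketch of that pattern (names here are illustrative; only the `s3_block_size` entry and the `from ex` chaining come from the diff):

```python
from typing import Any, Dict, NamedTuple, Type


class ConfigArg(NamedTuple):
    """Declares one config option: its expected type and whether None is allowed."""
    dtype: Type
    nullable: bool


# Registry of known options, mirroring the diff's "s3_block_size" entry.
CONFIG_ARGS: Dict[str, ConfigArg] = {
    "s3_block_size": ConfigArg(dtype=int, nullable=False),
    "database": ConfigArg(dtype=str, nullable=True),
}


def apply_type(name: str, value: Any) -> Any:
    """Validate and cast a config value against its declared ConfigArg."""
    arg = CONFIG_ARGS[name]
    if value is None:
        if not arg.nullable:
            raise ValueError(f"{name} does not accept a null value; pass {arg.dtype}.")
        return None
    try:
        return value if isinstance(value, arg.dtype) else arg.dtype(value)
    except ValueError as ex:
        # Chain the original exception, as the diff now does with `from ex`.
        raise ValueError(f"Config {name} must receive a {arg.dtype} value.") from ex
```

Chaining with `from ex` preserves the original traceback, which is the motivation for the small `except ValueError as ex` change in the hunk above.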
39 changes: 21 additions & 18 deletions awswrangler/_data_types.py
@@ -1,18 +1,20 @@
"""Internal (private) Data Types Module."""

import datetime
import logging
import re
from decimal import Decimal
from typing import Any, Dict, List, Match, Optional, Sequence, Tuple

import pandas as pd # type: ignore
import pyarrow as pa # type: ignore
import pyarrow.parquet # type: ignore
import sqlalchemy # type: ignore
import sqlalchemy.dialects.mysql # type: ignore
import sqlalchemy.dialects.postgresql # type: ignore
import sqlalchemy_redshift.dialect # type: ignore
from sqlalchemy.sql.visitors import VisitableType # type: ignore
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet
import sqlalchemy
import sqlalchemy.dialects.mysql
import sqlalchemy.dialects.postgresql
import sqlalchemy_redshift.dialect
from sqlalchemy.sql.visitors import VisitableType

from awswrangler import _utils, exceptions

@@ -444,27 +446,28 @@ def _normalize_pandas_dtype_name(dtype: str) -> str:
return dtype


def _cast2date(value: Any) -> Any:
if isinstance(value, float) and (np.isnan(value) or np.isinf(value)):
return None
if pd.isna(value) or value is None:
return None
if isinstance(value, datetime.date):
return value
return pd.to_datetime(value).date()


def _cast_pandas_column(df: pd.DataFrame, col: str, current_type: str, desired_type: str) -> pd.DataFrame:
if desired_type == "datetime64":
df[col] = pd.to_datetime(df[col])
elif desired_type == "date":
df[col] = pd.to_datetime(df[col]).dt.date.replace(to_replace={pd.NaT: None})
df[col] = df[col].apply(lambda x: _cast2date(value=x)).replace(to_replace={pd.NaT: None})
elif desired_type == "bytes":
df[col] = df[col].astype("string").str.encode(encoding="utf-8").replace(to_replace={pd.NA: None})
elif desired_type == "decimal":
# First cast to string
df = _cast_pandas_column(df=df, col=col, current_type=current_type, desired_type="string")
# Then cast to decimal
df[col] = df[col].apply(lambda x: Decimal(str(x)) if str(x) not in ("", "none", "None", " ", "<NA>") else None)
elif desired_type == "string":
if current_type.lower().startswith("int") is True:
df[col] = df[col].astype(str).astype("string")
elif current_type.startswith("float") is True:
df[col] = df[col].astype(str).astype("string")
elif current_type in ("object", "category"):
df[col] = df[col].astype(str).astype("string")
else:
df[col] = df[col].astype("string")
else:
try:
df[col] = df[col].astype(desired_type)
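The new `_cast2date` helper above guards against `NaN`/`inf` floats and null markers before delegating to pandas, which is what makes the "date" branch of `_cast_pandas_column` safe for dirty columns (the "Improving date cast" commit, #365). A stdlib-only sketch of the same guard order (the real helper uses `np.isnan`, `pd.isna`, and `pd.to_datetime`; `fromisoformat` here is a simplification that only accepts ISO-8601 strings):

```python
import datetime
import math
from typing import Any, Optional


def cast2date(value: Any) -> Optional[datetime.date]:
    """Best-effort cast to datetime.date, mapping None/NaN/inf to None."""
    if value is None:
        return None
    if isinstance(value, float) and (math.isnan(value) or math.isinf(value)):
        return None
    if isinstance(value, datetime.datetime):
        # datetime is a date subclass, so check it first and truncate explicitly.
        return value.date()
    if isinstance(value, datetime.date):
        return value
    # Fallback for strings; pandas' to_datetime accepts many more formats.
    return datetime.date.fromisoformat(str(value))
```

The key point mirrored from the diff is the ordering: the float `NaN`/`inf` check must run before any generic conversion, because `pd.to_datetime(float("inf"))` is exactly the case the fix targets.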
97 changes: 55 additions & 42 deletions awswrangler/_utils.py
@@ -1,22 +1,22 @@
"""Internal (private) Utilities Module."""

import copy
import itertools
import logging
import math
import os
import random
import time
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Union, cast
from concurrent.futures import FIRST_COMPLETED, Future, wait
from typing import Any, Callable, Dict, Generator, List, Optional, Sequence, Tuple, Union, cast

import boto3 # type: ignore
import botocore.config # type: ignore
import numpy as np # type: ignore
import pandas as pd # type: ignore
import psycopg2 # type: ignore
import s3fs # type: ignore
import boto3
import botocore.config
import numpy as np
import pandas as pd
import psycopg2

from awswrangler import exceptions
from awswrangler._config import apply_configs

_logger: logging.Logger = logging.getLogger(__name__)

@@ -63,14 +63,20 @@ def boto3_from_primitives(primitives: Optional[Boto3PrimitivesType] = None) -> b
def client(service_name: str, session: Optional[boto3.Session] = None) -> boto3.client:
"""Create a valid boto3.client."""
return ensure_session(session=session).client(
service_name=service_name, use_ssl=True, config=botocore.config.Config(retries={"max_attempts": 15})
service_name=service_name,
use_ssl=True,
config=botocore.config.Config(retries={"max_attempts": 10}, connect_timeout=10, max_pool_connections=30),
)


def resource(service_name: str, session: Optional[boto3.Session] = None) -> boto3.resource:
"""Create a valid boto3.resource."""
return ensure_session(session=session).resource(
service_name=service_name, use_ssl=True, config=botocore.config.Config(retries={"max_attempts": 15})
service_name=service_name,
use_ssl=True,
config=botocore.config.Config(
retries={"max_attempts": 10, "mode": "adaptive"}, connect_timeout=10, max_pool_connections=30
),
)


@@ -172,37 +178,6 @@ def chunkify(lst: List[Any], num_chunks: int = 1, max_length: Optional[int] = No
return [arr.tolist() for arr in np_chunks if len(arr) > 0]


@apply_configs
def get_fs(
s3fs_block_size: int,
session: Optional[Union[boto3.Session, Dict[str, Optional[str]]]] = None,
s3_additional_kwargs: Optional[Dict[str, str]] = None,
) -> s3fs.S3FileSystem:
"""Build a S3FileSystem from a given boto3 session."""
fs: s3fs.S3FileSystem = s3fs.S3FileSystem(
anon=False,
use_ssl=True,
default_cache_type="readahead",
default_fill_cache=False,
default_block_size=s3fs_block_size,
config_kwargs={"retries": {"max_attempts": 15}},
session=ensure_session(session=session)._session, # pylint: disable=protected-access
s3_additional_kwargs=s3_additional_kwargs,
use_listings_cache=False,
skip_instance_cache=True,
)
fs.invalidate_cache()
fs.clear_instance_cache()
return fs


def open_file(fs: s3fs.S3FileSystem, **kwargs: Any) -> Any:
"""Open s3fs file with retries to overcome eventual consistency."""
fs.invalidate_cache()
fs.clear_instance_cache()
return try_it(f=fs.open, ex=FileNotFoundError, **kwargs)


def empty_generator() -> Generator[None, None, None]:
"""Empty Generator."""
yield from ()
@@ -276,7 +251,13 @@ def check_duplicated_columns(df: pd.DataFrame) -> Any:
"""Raise an exception if there are duplicated columns names."""
duplicated: List[str] = df.loc[:, df.columns.duplicated()].columns.to_list()
if duplicated:
raise exceptions.InvalidDataFrame(f"There is duplicated column names in your DataFrame: {duplicated}")
raise exceptions.InvalidDataFrame(
f"There are duplicated column names in your DataFrame: {duplicated}. "
f"Note that your columns may have been sanitized and it can be the cause of "
f"the duplicity. Wrangler sanitization removes all special characters and "
f"also converts CamelCase to snake_case. So you must avoid columns like "
f"['MyCol', 'my_col'] in your DataFrame."
)


def try_it(f: Callable[..., Any], ex: Any, base: float = 1.0, max_num_tries: int = 3, **kwargs: Any) -> Any:
@@ -294,3 +275,35 @@ def try_it(f: Callable[..., Any], ex: Any, base: float = 1.0, max_num_tries: int
delay = random.uniform(base, delay * 3)
_logger.error("Retrying %s | Fail number %s/%s | Exception: %s", f, i + 1, max_num_tries, exception)
time.sleep(delay)


def get_even_chunks_sizes(total_size: int, chunk_size: int, upper_bound: bool) -> Tuple[int, ...]:
"""Calculate even chunks sizes (Best effort)."""
round_func: Callable[[float], float] = math.ceil if upper_bound is True else math.floor
num_chunks: int = int(round_func(float(total_size) / float(chunk_size)))
num_chunks = 1 if num_chunks < 1 else num_chunks
base_size: int = int(total_size / num_chunks)
rest: int = total_size % num_chunks
sizes: List[int] = list(itertools.repeat(base_size, num_chunks))
for i in range(rest):
i_cycled: int = i % len(sizes)
sizes[i_cycled] += 1
return tuple(sizes)


def get_running_futures(seq: Sequence[Future]) -> Tuple[Future, ...]: # type: ignore
"""Filter only running futures."""
return tuple(f for f in seq if f.running())


def wait_any_future_available(seq: Sequence[Future]) -> None: # type: ignore
"""Wait until any future became available."""
wait(fs=seq, timeout=None, return_when=FIRST_COMPLETED)


def block_waiting_available_thread(seq: Sequence[Future], max_workers: int) -> None: # type: ignore
"""Block until any thread became available."""
running: Tuple[Future, ...] = get_running_futures(seq=seq) # type: ignore
while len(running) >= max_workers:
wait_any_future_available(seq=running)
running = get_running_futures(seq=running)
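Among the new `_utils` helpers, `get_even_chunks_sizes` spreads a total row count into near-equal chunks: with `upper_bound=True` the requested `chunk_size` acts roughly as a cap on each chunk, with `upper_bound=False` roughly as a floor (best effort in both cases). The function below is reproduced from the diff, lightly restyled, so its behavior can be checked in isolation:

```python
import itertools
import math
from typing import Callable, List, Tuple


def get_even_chunks_sizes(total_size: int, chunk_size: int, upper_bound: bool) -> Tuple[int, ...]:
    """Calculate even chunk sizes (best effort), as introduced in this PR."""
    round_func: Callable[[float], float] = math.ceil if upper_bound else math.floor
    num_chunks: int = max(int(round_func(float(total_size) / float(chunk_size))), 1)
    base_size: int = total_size // num_chunks
    rest: int = total_size % num_chunks
    sizes: List[int] = list(itertools.repeat(base_size, num_chunks))
    # Distribute the remainder one row at a time so sizes differ by at most 1.
    for i in range(rest):
        sizes[i % len(sizes)] += 1
    return tuple(sizes)
```

For example, 10 rows with `chunk_size=4` yield `(4, 3, 3)` when capped and `(5, 5)` when floored, which is what the async writing orchestration needs to keep worker payloads balanced.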
10 changes: 7 additions & 3 deletions awswrangler/athena/_read.py
@@ -7,9 +7,9 @@
import uuid
from typing import Any, Dict, Iterator, List, Match, NamedTuple, Optional, Union

import boto3 # type: ignore
import botocore.exceptions # type: ignore
import pandas as pd # type: ignore
import boto3
import botocore.exceptions
import pandas as pd

from awswrangler import _utils, catalog, exceptions, s3
from awswrangler._config import apply_configs
@@ -365,6 +365,10 @@ def _resolve_query_without_cache_ctas(
)
except botocore.exceptions.ClientError as ex:
error: Dict[str, Any] = ex.response["Error"]
if error["Code"] == "InvalidRequestException" and "Exception parsing query" in error["Message"]:
raise exceptions.InvalidCtasApproachQuery(
"Is not possible to wrap this query into a CTAS statement. Please use ctas_approach=False."
)
if error["Code"] == "InvalidRequestException" and "extraneous input" in error["Message"]:
raise exceptions.InvalidCtasApproachQuery(
"Is not possible to wrap this query into a CTAS statement. Please use ctas_approach=False."
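The added branch in `_resolve_query_without_cache_ctas` treats Athena's "Exception parsing query" `InvalidRequestException` the same way as the pre-existing "extraneous input" case: the query cannot be wrapped in a CTAS statement, so the caller should retry with `ctas_approach=False`. The classification logic amounts to the following sketch (the helper name is hypothetical; the real code raises `exceptions.InvalidCtasApproachQuery` inline from the `ClientError`'s `response["Error"]` dict):

```python
from typing import Any, Dict


def is_unwrappable_ctas_error(error: Dict[str, Any]) -> bool:
    """Return True when an Athena ClientError means the query cannot be wrapped
    in a CTAS statement, i.e. the caller should fall back to ctas_approach=False."""
    if error.get("Code") != "InvalidRequestException":
        return False
    message: str = error.get("Message", "")
    # "Exception parsing query" is the variant newly handled by this PR;
    # "extraneous input" was already handled before it.
    return "Exception parsing query" in message or "extraneous input" in message
```

Matching on the error `Code` plus a message substring is the usual way to distinguish botocore `ClientError` variants, since Athena reports both situations under the same error code.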