Merged
Changes from all commits (25 commits)
cc41448
Add tags in the CloudFormation template.
igorborgest May 13, 2020
1e97689
Update requirements.sh
igorborgest May 13, 2020
babeff4
raise Exception if delete_objects failed
bryanyang0528 May 15, 2020
8f5211f
Merge pull request #238 from bryanyang0528/fixed_delete_fail_but_no_e…
igorborgest May 15, 2020
10091b7
Refactoring the parquet crawler
igorborgest May 15, 2020
2243aac
100% test coverage.
igorborgest May 15, 2020
ee6b05f
Add user-friendly msgs for Athena errors. #239
igorborgest May 15, 2020
c6e5a19
Fix Flake8 validation issues.
igorborgest May 15, 2020
7d2106f
Fix DataFrame sanitize for single files in to_parquet(). #240
igorborgest May 16, 2020
6330e37
Standardize SQLAlchemy types.
igorborgest May 17, 2020
a0d0777
First schema evolution proposal for parquet datasets. #232
igorborgest May 18, 2020
6604c06
Add test_store_parquet_metadata_modes()
igorborgest May 19, 2020
9c36b70
Bumping s3fs micro version. #236
igorborgest May 19, 2020
c40e443
Merge pull request #243 from awslabs/schema-evolution
igorborgest May 19, 2020
78144ab
Fix typos in install doc
KeltonKarboviak May 19, 2020
4347d57
Merge pull request #244 from KeltonKarboviak/patch-1
igorborgest May 19, 2020
c8ea710
_utils: Make sure boto3's default session is used
mrshu May 19, 2020
144a8f4
Merge pull request #245 from mrshu/mrshu/use-boto3-default-session
igorborgest May 19, 2020
54b3386
Add tests to the new default session. #245
igorborgest May 19, 2020
3869b87
Merge pull request #246 from awslabs/default-session
igorborgest May 19, 2020
69894a2
Bumping version to 1.2.0
igorborgest May 19, 2020
2e2641b
Add Schema Evolution tutorial
igorborgest May 19, 2020
de6246f
Update docs
igorborgest May 19, 2020
585ce8e
Update tests timeout to 15 min.
igorborgest May 20, 2020
c9ec52c
Bumping Apache Arrow version on lambda layer.
igorborgest May 20, 2020
2 changes: 1 addition & 1 deletion .github/workflows/static-checking.yml
```diff
@@ -25,7 +25,7 @@ jobs:
         with:
           python-version: ${{ matrix.python-version }}
       - name: Setup Environment
-        run: ./setup-dev-env.sh
+        run: ./requirements.sh
       - name: CloudFormation Lint
         run: cfn-lint -t testing/cloudformation.yaml
       - name: Documentation Lint
```
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
```diff
@@ -82,7 +82,7 @@ We may ask you to sign a [Contributor License Agreement (CLA)](http://en.wikiped
 
 * Then run the command bellow to install all dependencies:
 
-`./setup-dev-env.sh`
+`./requirements.sh`
 
 * Go to the ``testing`` directory
 
```
12 changes: 3 additions & 9 deletions README.md
```diff
@@ -1,17 +1,9 @@
 # AWS Data Wrangler
 *Pandas on AWS*
 
----
-
-**NOTE**
-
-Due the new major version `1.0.0` with breaking changes, please make sure that all your old projects has dependencies frozen on the desired version (e.g. `pip install awswrangler==0.3.2`). You can always read the legacy docs [here](https://aws-data-wrangler.readthedocs.io/en/legacy/).
-
----
-
 ![AWS Data Wrangler](docs/source/_static/logo2.png?raw=true "AWS Data Wrangler")
 
-[![Release](https://img.shields.io/badge/release-1.1.2-brightgreen.svg)](https://pypi.org/project/awswrangler/)
+[![Release](https://img.shields.io/badge/release-1.2.0-brightgreen.svg)](https://pypi.org/project/awswrangler/)
 [![Python Version](https://img.shields.io/badge/python-3.6%20%7C%203.7%20%7C%203.8-brightgreen.svg)](https://anaconda.org/conda-forge/awswrangler)
 [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
 [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
@@ -84,6 +76,7 @@ df = wr.db.read_sql_query("SELECT * FROM external_schema.my_table", con=engine)
 - [11 - CSV Datasets](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/11%20-%20CSV%20Datasets.ipynb)
 - [12 - CSV Crawler](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/12%20-%20CSV%20Crawler.ipynb)
 - [13 - Merging Datasets on S3](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/13%20-%20Merging%20Datasets%20on%20S3.ipynb)
+- [14 - Schema Evolution](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/14%20-%20Schema%20Evolution.ipynb)
 - [15 - EMR](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/15%20-%20EMR.ipynb)
 - [16 - EMR & Docker](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/16%20-%20EMR%20%26%20Docker.ipynb)
 - [**API Reference**](https://aws-data-wrangler.readthedocs.io/en/latest/api.html)
@@ -95,3 +88,4 @@ df = wr.db.read_sql_query("SELECT * FROM external_schema.my_table", con=engine)
 - [CloudWatch Logs](https://aws-data-wrangler.readthedocs.io/en/latest/api.html#cloudwatch-logs)
 - [**License**](https://github.com/awslabs/aws-data-wrangler/blob/master/LICENSE)
 - [**Contributing**](https://github.com/awslabs/aws-data-wrangler/blob/master/CONTRIBUTING.md)
+- [**Legacy Docs** (pre-1.0.0)](https://aws-data-wrangler.readthedocs.io/en/legacy/)
```
2 changes: 1 addition & 1 deletion awswrangler/__metadata__.py
```diff
@@ -7,5 +7,5 @@
 
 __title__ = "awswrangler"
 __description__ = "Pandas on AWS."
-__version__ = "1.1.2"
+__version__ = "1.2.0"
 __license__ = "Apache License 2.0"
```
35 changes: 18 additions & 17 deletions awswrangler/_data_types.py
```diff
@@ -376,25 +376,11 @@ def athena_types_from_pyarrow_schema(
     _logger.debug("columns_types: %s", columns_types)
     partitions_types: Optional[Dict[str, str]] = None
     if partitions is not None:
-        partitions_types = {p.name: pyarrow2athena(p.dictionary.type) for p in partitions}
+        partitions_types = {p.name: pyarrow2athena(p.dictionary.type) for p in partitions}  # pragma: no cover
     _logger.debug("partitions_types: %s", partitions_types)
     return columns_types, partitions_types
 
 
-def athena_partitions_from_pyarrow_partitions(
-    path: str, partitions: pyarrow.parquet.ParquetPartitions
-) -> Dict[str, List[str]]:
-    """Extract the related Athena partitions values from any PyArrow Partitions."""
-    path = path if path[-1] == "/" else f"{path}/"
-    partitions_values: Dict[str, List[str]] = {}
-    names: List[str] = [p.name for p in partitions]
-    for values in zip(*[p.keys for p in partitions]):
-        suffix: str = "/".join([f"{n}={v}" for n, v in zip(names, values)])
-        suffix = suffix if suffix[-1] == "/" else f"{suffix}/"
-        partitions_values[f"{path}{suffix}"] = list(values)
-    return partitions_values
-
-
 def cast_pandas_with_athena_types(df: pd.DataFrame, dtype: Dict[str, str]) -> pd.DataFrame:
     """Cast columns in a Pandas DataFrame."""
     for col, athena_type in dtype.items():
@@ -410,10 +396,25 @@ def cast_pandas_with_athena_types(df: pd.DataFrame, dtype: Dict[str, str]) -> pd
             df[col] = (
                 df[col]
                 .astype("string")
-                .apply(lambda x: Decimal(str(x)) if str(x) not in ("", "none", " ", "<NA>") else None)
+                .apply(lambda x: Decimal(str(x)) if str(x) not in ("", "none", "None", " ", "<NA>") else None)
             )
+        elif pandas_type == "string":
+            curr_type: str = str(df[col].dtypes)
+            if curr_type.startswith("int") or curr_type.startswith("float"):
+                df[col] = df[col].astype(str).astype("string")
+            else:
+                df[col] = df[col].astype("string")
         else:
-            df[col] = df[col].astype(pandas_type)
+            try:
+                df[col] = df[col].astype(pandas_type)
+            except TypeError as ex:
+                if "object cannot be converted to an IntegerDtype" not in str(ex):
+                    raise ex  # pragma: no cover
+                df[col] = (
+                    df[col]
+                    .apply(lambda x: int(x) if str(x) not in ("", "none", "None", " ", "<NA>") else None)
+                    .astype(pandas_type)
+                )
     return df
```
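The new `try`/`except` fallback above exists because pandas (in the 1.0.x releases current at the time) cannot cast an object column holding numeric strings and missing values straight to a nullable integer dtype. A minimal sketch of the failure and the workaround, with made-up data:

```python
import pandas as pd

# Object column mixing numeric strings and a missing value (made-up data).
s = pd.Series(["1", "2", None], dtype="object")

try:
    s.astype("Int64")  # direct cast to the nullable integer dtype
except TypeError as ex:
    print(ex)  # "object cannot be converted to an IntegerDtype"

# The fallback added above: map each value to int (or None) first,
# then cast to the nullable dtype.
clean = s.apply(lambda x: int(x) if str(x) not in ("", "none", "None", " ", "<NA>") else None)
print(clean.astype("Int64"))  # 1, 2, <NA>
```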
64 changes: 62 additions & 2 deletions awswrangler/_utils.py
```diff
@@ -3,6 +3,7 @@
 import logging
 import math
 import os
+import random
 from typing import Any, Dict, Generator, List, Optional, Tuple
 
 import boto3  # type: ignore
@@ -11,14 +12,20 @@
 import psycopg2  # type: ignore
 import s3fs  # type: ignore
 
-logger: logging.Logger = logging.getLogger(__name__)
+from awswrangler import exceptions
+
+_logger: logging.Logger = logging.getLogger(__name__)
 
 
 def ensure_session(session: Optional[boto3.Session] = None) -> boto3.Session:
     """Ensure that a valid boto3.Session will be returned."""
     if session is not None:
         return session
-    return boto3.Session()
+    # Ensure the boto3's default session is used so that its parameters can be
+    # set via boto3.setup_default_session()
+    if boto3.DEFAULT_SESSION is not None:
+        return boto3.DEFAULT_SESSION
+    return boto3.Session()  # pragma: no cover
 
 
 def client(service_name: str, session: Optional[boto3.Session] = None) -> boto3.client:
```
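A quick sketch of what this change enables (bucket name and region below are placeholders): parameters set once through `boto3.setup_default_session()` now flow into any Wrangler call that is not given an explicit session.

```python
import boto3
import awswrangler as wr

# Configure boto3's default session once; the region here is a placeholder.
boto3.setup_default_session(region_name="us-east-1")

# No boto3_session argument: ensure_session() now picks up the default
# session configured above instead of creating a fresh one.
df = wr.s3.read_parquet(path="s3://my-bucket/my-dataset/")
```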
```diff
@@ -124,6 +131,8 @@ def chunkify(lst: List[Any], num_chunks: int = 1, max_length: Optional[int] = No
     [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]]
 
     """
+    if not lst:
+        return []  # pragma: no cover
     n: int = num_chunks if max_length is None else int(math.ceil((float(len(lst)) / float(max_length))))
     np_chunks = np.array_split(lst, n)
     return [arr.tolist() for arr in np_chunks if len(arr) > 0]
```
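For reference, a short illustration of `chunkify` with the new empty-input guard (the function is internal; it is imported here only for demonstration):

```python
from awswrangler._utils import chunkify

print(chunkify([], num_chunks=3))               # [] thanks to the new guard
print(chunkify(list(range(10)), max_length=4))  # [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
```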
```diff
@@ -179,3 +188,54 @@ def get_region_from_subnet(subnet_id: str, boto3_session: Optional[boto3.Session
     session: boto3.Session = ensure_session(session=boto3_session)
     client_ec2: boto3.client = client(service_name="ec2", session=session)
     return client_ec2.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["AvailabilityZone"][:9]
+
+
+def extract_partitions_from_paths(
+    path: str, paths: List[str]
+) -> Tuple[Optional[Dict[str, str]], Optional[Dict[str, List[str]]]]:
+    """Extract partitions from Amazon S3 paths."""
+    path = path if path.endswith("/") else f"{path}/"
+    partitions_types: Dict[str, str] = {}
+    partitions_values: Dict[str, List[str]] = {}
+    for p in paths:
+        if path not in p:
+            raise exceptions.InvalidArgumentValue(
+                f"Object {p} is not under the root path ({path})."
+            )  # pragma: no cover
+        path_wo_filename: str = p.rpartition("/")[0] + "/"
+        if path_wo_filename not in partitions_values:
+            path_wo_prefix: str = p.replace(f"{path}/", "")
+            dirs: List[str] = [x for x in path_wo_prefix.split("/") if (x != "") and ("=" in x)]
+            if dirs:
+                values_tups: List[Tuple[str, str]] = [tuple(x.split("=")[:2]) for x in dirs]  # type: ignore
+                values_dics: Dict[str, str] = dict(values_tups)
+                p_values: List[str] = list(values_dics.values())
+                p_types: Dict[str, str] = {x: "string" for x in values_dics.keys()}
+                if not partitions_types:
+                    partitions_types = p_types
+                if p_values:
+                    partitions_types = p_types
+                    partitions_values[path_wo_filename] = p_values
+                elif p_types != partitions_types:  # pragma: no cover
+                    raise exceptions.InvalidSchemaConvergence(
+                        f"At least two different partitions schema detected: {partitions_types} and {p_types}"
+                    )
+    if not partitions_types:
+        return None, None
+    return partitions_types, partitions_values
+
+
+def list_sampling(lst: List[Any], sampling: float) -> List[Any]:
+    """Random List sampling."""
+    if sampling > 1.0 or sampling <= 0.0:  # pragma: no cover
+        raise exceptions.InvalidArgumentValue(f"Argument <sampling> must be [0.0 < value <= 1.0]. {sampling} received.")
+    _len: int = len(lst)
+    if _len == 0:
+        return []  # pragma: no cover
+    num_samples: int = int(round(_len * sampling))
+    num_samples = _len if num_samples > _len else num_samples
+    num_samples = 1 if num_samples < 1 else num_samples
+    _logger.debug("_len: %s", _len)
+    _logger.debug("sampling: %s", sampling)
+    _logger.debug("num_samples: %s", num_samples)
+    return random.sample(population=lst, k=num_samples)
```
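To make the new helper concrete, here is a hypothetical run of `extract_partitions_from_paths` over Hive-style partition paths (bucket and keys are invented; the helper is internal):

```python
from awswrangler._utils import extract_partitions_from_paths

# Partition columns are inferred from "name=value" path components.
types, values = extract_partitions_from_paths(
    path="s3://my-bucket/dataset/",
    paths=[
        "s3://my-bucket/dataset/year=2020/month=05/file0.parquet",
        "s3://my-bucket/dataset/year=2020/month=06/file1.parquet",
    ],
)
print(types)   # {'year': 'string', 'month': 'string'}
print(values)  # {'s3://my-bucket/dataset/year=2020/month=05/': ['2020', '05'],
               #  's3://my-bucket/dataset/year=2020/month=06/': ['2020', '06']}
```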
19 changes: 17 additions & 2 deletions awswrangler/athena.py
```diff
@@ -282,6 +282,8 @@ def repair_table(
 
     """
     query = f"MSCK REPAIR TABLE `{table}`;"
+    if (database is not None) and (not database.startswith("`")):
+        database = f"`{database}`"
     session: boto3.Session = _utils.ensure_session(session=boto3_session)
     query_id = start_query_execution(
         sql=query,
```
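A sketch of the behavior this fixes, with hypothetical names: a database whose name needs quoting no longer breaks the `MSCK REPAIR TABLE` call.

```python
import awswrangler as wr

# The database argument is now wrapped in backticks automatically, so a
# name like this one (hypothetical) is passed to Athena safely.
wr.athena.repair_table(table="my_table", database="my-database-with-hyphens")
```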
```diff
@@ -492,7 +494,7 @@ def read_sql_query(  # pylint: disable=too-many-branches,too-many-locals,too-man
     path: str = f"{_s3_output}/{name}"
     ext_location: str = "\n" if wg_config["enforced"] is True else f",\n    external_location = '{path}'\n"
     sql = (
-        f"CREATE TABLE {name}\n"
+        f'CREATE TABLE "{name}"\n'
         f"WITH(\n"
         f"    format = 'Parquet',\n"
         f"    parquet_compression = 'SNAPPY'"
```
```diff
@@ -512,7 +514,20 @@
         boto3_session=session,
     )
     _logger.debug("query_id: %s", query_id)
-    query_response: Dict[str, Any] = wait_query(query_execution_id=query_id, boto3_session=session)
+    try:
+        query_response: Dict[str, Any] = wait_query(query_execution_id=query_id, boto3_session=session)
+    except exceptions.QueryFailed as ex:
+        if ctas_approach is True:
+            if "Column name not specified" in str(ex):
+                raise exceptions.InvalidArgumentValue(
+                    "Please, define all columns names in your query. (E.g. 'SELECT MAX(col1) AS max_col1, ...')"
+                )
+            if "Column type is unknown" in str(ex):
+                raise exceptions.InvalidArgumentValue(
+                    "Please, define all columns types in your query. "
+                    "(E.g. 'SELECT CAST(NULL AS INTEGER) AS MY_COL, ...')"
+                )
+        raise ex  # pragma: no cover
     if query_response["QueryExecution"]["Status"]["State"] in ["FAILED", "CANCELLED"]:  # pragma: no cover
         reason: str = query_response["QueryExecution"]["Status"]["StateChangeReason"]
         message_error: str = f"Query error: {reason}"
```
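The two new messages point at queries like this hypothetical one, where every projected column gets an explicit name and a concrete type so the CTAS path (`ctas_approach=True`) can build the temporary table:

```python
import awswrangler as wr

# Table and database names are placeholders. Aliasing MAX(col1) and casting
# the NULL column avoid the "Column name not specified" and "Column type is
# unknown" failures that the new error messages explain.
df = wr.athena.read_sql_query(
    sql="SELECT MAX(col1) AS max_col1, CAST(NULL AS INTEGER) AS null_col FROM my_table",
    database="my_database",
    ctas_approach=True,
)
```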
Expand Down
Loading