Merged

28 commits
9893b74
Initial Commit
jaidisido Jan 27, 2021
ea9986d
Minor - Refactoring Work Units Logic
jaidisido Jan 28, 2021
f235e7d
Major - Checkpoint w/ functional read code/example
jaidisido Jan 29, 2021
df1bfb7
Initial Commit
jaidisido Jan 27, 2021
9630dd9
Minor - Refactoring Work Units Logic
jaidisido Jan 28, 2021
55f624d
Major - Checkpoint w/ functional read code/example
jaidisido Jan 29, 2021
3ec606b
Merge branch 'feature/lf-transactions' of https://github.com/awslabs/…
jaidisido Jan 29, 2021
8a501c9
Minor - Removing unnecessary ensure_session
jaidisido Jan 31, 2021
f3015b2
Minor - Adding changes from comments and review
jaidisido Feb 1, 2021
e42eb44
Minor - Adding Abort, Begin, Commit and Extend transactions
jaidisido Feb 13, 2021
1ca72b1
Merge branch 'main-governed-tables' into feature/lf-transactions
jaidisido Feb 13, 2021
e7ad4c8
Minor - Adding missing functions
jaidisido Feb 13, 2021
ed2f1b1
Merge branch 'feature/lf-transactions' of https://github.com/awslabs/…
jaidisido Feb 13, 2021
dca75f1
Minor - Adding missing @property
jaidisido Feb 13, 2021
cea72c4
Minor - Disable too many public methods
jaidisido Feb 13, 2021
e47b066
Minor - Checkpoint
jaidisido Feb 14, 2021
4970a39
Major - Governed tables write operations tested
jaidisido Feb 15, 2021
be33b0d
Minor - Adding validate flow on branches
jaidisido Feb 15, 2021
424048e
Minor - reducing static checks
jaidisido Feb 15, 2021
e6eca7b
Minor - Adding to_csv code
jaidisido Feb 20, 2021
866084b
Minor - Disabling too-many-branches
jaidisido Feb 20, 2021
65a5b09
Major - Ready for release
jaidisido Feb 22, 2021
78194ed
Minor - Proofreading
jaidisido Feb 23, 2021
e7cc97f
Minor - Removing needless use_threads argument
jaidisido Feb 23, 2021
9bbd007
Minor - Removing the need to specify table_type when table is already…
jaidisido Feb 25, 2021
16788e3
Minor - Fixing _catalog_id call
jaidisido Feb 25, 2021
6e48f61
Minor - Clarifying SQL filter operation
jaidisido Feb 25, 2021
1108178
Minor - Removing type ignore
jaidisido Feb 25, 2021
11 changes: 11 additions & 0 deletions CONTRIBUTING.md
@@ -230,6 +230,17 @@
or

``./cloudformation/delete-databases.sh``

### Enabling Lake Formation
If your feature is related to AWS Lake Formation, a number of additional steps are required to complete testing:

1. In the AWS console, enable Lake Formation by setting your IAM role as an Administrator and by unchecking the boxes in the ``Data Catalog Settings`` section

2. In the ``./cloudformation/base.yaml`` template file, set ``EnableLakeFormation`` to ``True``. Then run ``./deploy-base.sh`` once more to add an AWS Glue database and an S3 bucket registered with Lake Formation

3. Back in the console, in the ``Data Locations`` section, grant your IAM role access to the S3 Lake Formation bucket (``s3://aws-wrangler-base-lakeformation...``)

4. Finally, in the ``Data Permissions`` section, grant your IAM role ``Super`` permissions on both the ``aws_data_wrangler`` and ``aws_data_wrangler_lakeformation`` databases (see the boto3 sketch after this list)
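For reference, steps 3 and 4 can also be scripted with boto3. The sketch below is illustrative only: the role ARN is a placeholder, the bucket ARN keeps the truncated name from step 3, and the console's ``Super`` permission corresponds to ``ALL`` in the API.

```python
import boto3

lf = boto3.client("lakeformation")
role_arn = "arn:aws:iam::111122223333:role/YOUR_ROLE"  # placeholder

# Step 3: data location access on the registered bucket (name truncated above)
lf.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": role_arn},
    Resource={"DataLocation": {"ResourceArn": "arn:aws:s3:::aws-wrangler-base-lakeformation..."}},
    Permissions=["DATA_LOCATION_ACCESS"],
)

# Step 4: the console's "Super" permission maps to ALL in the API
for database in ("aws_data_wrangler", "aws_data_wrangler_lakeformation"):
    lf.grant_permissions(
        Principal={"DataLakePrincipalIdentifier": role_arn},
        Resource={"Database": {"Name": database}},
        Permissions=["ALL"],
    )
```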

## Recommended Visual Studio Code setting

```json
2 changes: 2 additions & 0 deletions awswrangler/__init__.py
@@ -15,6 +15,7 @@
dynamodb,
emr,
exceptions,
lakeformation,
mysql,
postgresql,
quicksight,
@@ -40,6 +41,7 @@
"s3",
"sts",
"redshift",
"lakeformation",
"mysql",
"postgresql",
"secretsmanager",
13 changes: 12 additions & 1 deletion awswrangler/_config.py
@@ -42,12 +42,13 @@ class _ConfigArg(NamedTuple):
"redshift_endpoint_url": _ConfigArg(dtype=str, nullable=True, enforced=True),
"kms_endpoint_url": _ConfigArg(dtype=str, nullable=True, enforced=True),
"emr_endpoint_url": _ConfigArg(dtype=str, nullable=True, enforced=True),
"lakeformation_endpoint_url": _ConfigArg(dtype=str, nullable=True, enforced=True),
# Botocore config
"botocore_config": _ConfigArg(dtype=botocore.config.Config, nullable=True),
}


class _Config: # pylint: disable=too-many-instance-attributes
class _Config: # pylint: disable=too-many-instance-attributes,too-many-public-methods
"""Wrangler's Configuration class."""

def __init__(self) -> None:
@@ -60,6 +61,7 @@ def __init__(self) -> None:
self.redshift_endpoint_url = None
self.kms_endpoint_url = None
self.emr_endpoint_url = None
self.lakeformation_endpoint_url = None
self.botocore_config = None
for name in _CONFIG_ARGS:
self._load_config(name=name)
@@ -342,6 +344,15 @@ def emr_endpoint_url(self) -> Optional[str]:
def emr_endpoint_url(self, value: Optional[str]) -> None:
self._set_config_value(key="emr_endpoint_url", value=value)

@property
def lakeformation_endpoint_url(self) -> Optional[str]:
"""Property lakeformation_endpoint_url."""
return cast(Optional[str], self["lakeformation_endpoint_url"])

@lakeformation_endpoint_url.setter
def lakeformation_endpoint_url(self, value: Optional[str]) -> None:
self._set_config_value(key="lakeformation_endpoint_url", value=value)

@property
def botocore_config(self) -> botocore.config.Config:
"""Property botocore_config."""
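The new entry follows the existing endpoint-URL pattern, so it can be set at runtime through ``wr.config``. A minimal usage sketch (the URL is a placeholder, e.g. a VPC interface endpoint):

```python
import awswrangler as wr

# Route Lake Formation API calls through a custom endpoint.
# The URL below is a placeholder.
wr.config.lakeformation_endpoint_url = "https://lakeformation.us-east-1.amazonaws.com"
```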
2 changes: 2 additions & 0 deletions awswrangler/_utils.py
@@ -87,6 +87,8 @@ def _get_endpoint_url(service_name: str) -> Optional[str]:
endpoint_url = _config.config.kms_endpoint_url
elif service_name == "emr" and _config.config.emr_endpoint_url is not None:
endpoint_url = _config.config.emr_endpoint_url
elif service_name == "lakeformation" and _config.config.lakeformation_endpoint_url is not None:
endpoint_url = _config.config.lakeformation_endpoint_url
return endpoint_url


4 changes: 2 additions & 2 deletions awswrangler/athena/_read.py
@@ -761,8 +761,8 @@ def read_sql_query(

>>> import awswrangler as wr
>>> df = wr.athena.read_sql_query(
... sql="SELECT * FROM my_table WHERE name=:name;",
... params={"name": "filtered_name"}
... sql="SELECT * FROM my_table WHERE name=:name; AND city=:city;",
... params={"name": "'filtered_name'", "city": "'filtered_city'"}
... )

"""
20 changes: 18 additions & 2 deletions awswrangler/catalog/_create.py
@@ -33,6 +33,7 @@ def _create_table( # pylint: disable=too-many-branches,too-many-statements
catalog_versioning: bool,
boto3_session: Optional[boto3.Session],
table_input: Dict[str, Any],
table_type: Optional[str],
table_exist: bool,
projection_enabled: bool,
partitions_types: Optional[Dict[str, str]],
@@ -118,7 +119,8 @@ def _create_table( # pylint: disable=too-many-branches,too-many-statements
f"{mode} is not a valid mode. It must be 'overwrite', 'append' or 'overwrite_partitions'."
)
if table_exist is True and mode == "overwrite":
delete_all_partitions(table=table, database=database, catalog_id=catalog_id, boto3_session=session)
if table_type != "GOVERNED":
delete_all_partitions(table=table, database=database, catalog_id=catalog_id, boto3_session=session)
_logger.debug("Updating table (%s)...", mode)
client_glue.update_table(
**_catalog_id(
@@ -214,6 +216,7 @@ def _create_parquet_table(
table: str,
path: str,
columns_types: Dict[str, str],
table_type: Optional[str],
partitions_types: Optional[Dict[str, str]],
bucketing_info: Optional[Tuple[List[str], int]],
catalog_id: Optional[str],
@@ -253,6 +256,7 @@
table=table,
path=path,
columns_types=columns_types,
table_type=table_type,
partitions_types=partitions_types,
bucketing_info=bucketing_info,
compression=compression,
@@ -269,6 +273,7 @@
catalog_versioning=catalog_versioning,
boto3_session=boto3_session,
table_input=table_input,
table_type=table_type,
table_exist=table_exist,
partitions_types=partitions_types,
projection_enabled=projection_enabled,
@@ -284,8 +289,9 @@
def _create_csv_table(
database: str,
table: str,
path: str,
path: Optional[str],
columns_types: Dict[str, str],
table_type: Optional[str],
partitions_types: Optional[Dict[str, str]],
bucketing_info: Optional[Tuple[List[str], int]],
description: Optional[str],
@@ -324,6 +330,7 @@
table=table,
path=path,
columns_types=columns_types,
table_type=table_type,
partitions_types=partitions_types,
bucketing_info=bucketing_info,
compression=compression,
@@ -342,6 +349,7 @@
catalog_versioning=catalog_versioning,
boto3_session=boto3_session,
table_input=table_input,
table_type=table_type,
table_exist=table_exist,
partitions_types=partitions_types,
projection_enabled=projection_enabled,
@@ -519,6 +527,7 @@ def create_parquet_table(
table: str,
path: str,
columns_types: Dict[str, str],
table_type: Optional[str] = None,
partitions_types: Optional[Dict[str, str]] = None,
bucketing_info: Optional[Tuple[List[str], int]] = None,
catalog_id: Optional[str] = None,
@@ -550,6 +559,8 @@
Amazon S3 path (e.g. s3://bucket/prefix/).
columns_types: Dict[str, str]
Dictionary with keys as column names and values as data types (e.g. {'col0': 'bigint', 'col1': 'double'}).
table_type: str, optional
The type of the Glue Table (EXTERNAL_TABLE, GOVERNED...). Set to EXTERNAL_TABLE if None
partitions_types: Dict[str, str], optional
Dictionary with keys as partition names and values as data types (e.g. {'col2': 'date'}).
bucketing_info: Tuple[List[str], int], optional
@@ -627,6 +638,7 @@ def create_parquet_table(
table=table,
path=path,
columns_types=columns_types,
table_type=table_type,
partitions_types=partitions_types,
bucketing_info=bucketing_info,
catalog_id=catalog_id,
@@ -653,6 +665,7 @@ def create_csv_table(
table: str,
path: str,
columns_types: Dict[str, str],
table_type: Optional[str] = None,
partitions_types: Optional[Dict[str, str]] = None,
bucketing_info: Optional[Tuple[List[str], int]] = None,
compression: Optional[str] = None,
@@ -686,6 +699,8 @@
Amazon S3 path (e.g. s3://bucket/prefix/).
columns_types: Dict[str, str]
Dictionary with keys as column names and values as data types (e.g. {'col0': 'bigint', 'col1': 'double'}).
table_type: str, optional
The type of the Glue Table (EXTERNAL_TABLE, GOVERNED...). Set to EXTERNAL_TABLE if None
partitions_types: Dict[str, str], optional
Dictionary with keys as partition names and values as data types (e.g. {'col2': 'date'}).
bucketing_info: Tuple[List[str], int], optional
@@ -767,6 +782,7 @@
table=table,
path=path,
columns_types=columns_types,
table_type=table_type,
partitions_types=partitions_types,
bucketing_info=bucketing_info,
catalog_id=catalog_id,
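For illustration, creating a governed table with the new parameter might look as follows (a sketch; database, table, and bucket names are placeholders, and omitting ``table_type`` keeps the ``EXTERNAL_TABLE`` default):

```python
import awswrangler as wr

# Create a Lake Formation governed Parquet table; passing table_type=None
# (or omitting it) preserves the classic EXTERNAL_TABLE behaviour.
wr.catalog.create_parquet_table(
    database="aws_data_wrangler",
    table="my_governed_table",
    path="s3://my-bucket/my_governed_table/",  # placeholder bucket
    columns_types={"col0": "bigint", "col1": "double"},
    table_type="GOVERNED",
)
```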
8 changes: 5 additions & 3 deletions awswrangler/catalog/_definitions.py
@@ -31,6 +31,7 @@ def _parquet_table_definition(
table: str,
path: str,
columns_types: Dict[str, str],
table_type: Optional[str],
partitions_types: Dict[str, str],
bucketing_info: Optional[Tuple[List[str], int]],
compression: Optional[str],
@@ -39,7 +40,7 @@
return {
"Name": table,
"PartitionKeys": [{"Name": cname, "Type": dtype} for cname, dtype in partitions_types.items()],
"TableType": "EXTERNAL_TABLE",
"TableType": "EXTERNAL_TABLE" if table_type is None else table_type,
"Parameters": {"classification": "parquet", "compressionType": str(compression).lower(), "typeOfData": "file"},
"StorageDescriptor": {
"Columns": [{"Name": cname, "Type": dtype} for cname, dtype in columns_types.items()],
@@ -98,8 +99,9 @@

def _csv_table_definition(
table: str,
path: str,
path: Optional[str],
columns_types: Dict[str, str],
table_type: Optional[str],
partitions_types: Dict[str, str],
bucketing_info: Optional[Tuple[List[str], int]],
compression: Optional[str],
@@ -120,7 +122,7 @@
return {
"Name": table,
"PartitionKeys": [{"Name": cname, "Type": dtype} for cname, dtype in partitions_types.items()],
"TableType": "EXTERNAL_TABLE",
"TableType": "EXTERNAL_TABLE" if table_type is None else table_type,
"Parameters": parameters,
"StorageDescriptor": {
"Columns": [{"Name": cname, "Type": dtype} for cname, dtype in columns_types.items()],
20 changes: 20 additions & 0 deletions awswrangler/lakeformation/__init__.py
@@ -0,0 +1,20 @@
"""Amazon Lake Formation Module."""

from awswrangler.lakeformation._read import read_sql_query, read_sql_table # noqa
from awswrangler.lakeformation._utils import ( # noqa
abort_transaction,
begin_transaction,
commit_transaction,
extend_transaction,
wait_query,
)

__all__ = [
"read_sql_query",
"read_sql_table",
"abort_transaction",
"begin_transaction",
"commit_transaction",
"extend_transaction",
"wait_query",
]
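Based on the exports above, a typical read wraps the query in an explicit transaction. The sketch below rests on assumptions: that ``begin_transaction`` returns a transaction id string and that the read functions accept ``sql``, ``database``, and ``transaction_id`` keyword arguments; database and table names are placeholders.

```python
import awswrangler as wr

# Read a governed table inside an explicit transaction (sketch; exact
# parameter names may differ from the merged implementation).
transaction_id = wr.lakeformation.begin_transaction(read_only=True)
try:
    df = wr.lakeformation.read_sql_query(
        sql="SELECT * FROM my_governed_table;",
        database="aws_data_wrangler",
        transaction_id=transaction_id,
    )
    wr.lakeformation.commit_transaction(transaction_id=transaction_id)
except Exception:
    wr.lakeformation.abort_transaction(transaction_id=transaction_id)
    raise
```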