From 3f14923204ca3422985050a9bde8a4e6bd185046 Mon Sep 17 00:00:00 2001 From: Bryan Date: Wed, 20 May 2020 17:51:45 +0800 Subject: [PATCH 01/26] add test cases for s3 delete objects --- testing/test_awswrangler/test_moto.py | 33 +++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/testing/test_awswrangler/test_moto.py b/testing/test_awswrangler/test_moto.py index 2adc7aec8..4c8c0f8b9 100644 --- a/testing/test_awswrangler/test_moto.py +++ b/testing/test_awswrangler/test_moto.py @@ -1,4 +1,7 @@ import boto3 +import mock +from botocore.exceptions import ClientError +import botocore import moto import pytest @@ -52,6 +55,36 @@ def test_parquet(s3): assert len(df.columns) == 18 +def test_s3_delete_object_success(s3): + path = "s3://bucket/test.parquet" + wr.s3.to_parquet(df=get_df_list(), path=path, index=False, dataset=True, partition_cols=["par0", "par1"]) + df = wr.s3.read_parquet(path=path, dataset=True) + ensure_data_types(df, has_list=True) + + wr.s3.delete_objects(path=path) + with pytest.raises(OSError): + wr.s3.read_parquet(path=path, dataset=True) + + +def test_s3_raise_delete_object_exception_success(s3): + path = "s3://bucket/test.parquet" + wr.s3.to_parquet(df=get_df_list(), path=path, index=False, dataset=True, partition_cols=["par0", "par1"]) + df = wr.s3.read_parquet(path=path, dataset=True) + ensure_data_types(df, has_list=True) + + call = botocore.client.BaseClient._make_api_call + + def mock_make_api_call(self, operation_name, kwarg): + if operation_name == 'DeleteObjects': + parsed_response = {'Error': {'Code': '500', 'Message': 'Test Error'}} + raise ClientError(parsed_response, operation_name) + return call(self, operation_name, kwarg) + + with mock.patch('botocore.client.BaseClient._make_api_call', new=mock_make_api_call): + with pytest.raises(ClientError): + wr.s3.delete_objects(path=path) + + def test_emr(s3, emr, sts, subnet): session = boto3.Session(region_name="us-west-1") cluster_id = wr.emr.create_cluster( From 53db236e34ae4e3968a7c80e8aef3ed8d8fafe90 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Wed, 20 May 2020 19:09:29 -0300 Subject: [PATCH 02/26] Add private subnet on cloudformation.yaml --- testing/cloudformation.sh | 7 +-- testing/cloudformation.yaml | 63 +++++++++++++++++++++++++-- testing/test_awswrangler/test_moto.py | 10 ++--- 3 files changed, 67 insertions(+), 13 deletions(-) diff --git a/testing/cloudformation.sh b/testing/cloudformation.sh index e10912aed..e80655935 100755 --- a/testing/cloudformation.sh +++ b/testing/cloudformation.sh @@ -7,14 +7,11 @@ cfn-flip -c -l -n cloudformation.yaml temp.yaml cfn-lint -t temp.yaml mv temp.yaml cloudformation.yaml -read -rp "Databases password (e.g. 
123456Ab): " password +read -rp "Databases password [123456Ab]: " password +password=${password:-123456Ab} aws cloudformation deploy \ --template-file cloudformation.yaml \ --stack-name aws-data-wrangler \ --capabilities CAPABILITY_IAM \ --parameter-overrides DatabasesPassword="$password" - -aws cloudformation update-termination-protection \ - --enable-termination-protection \ - --stack-name aws-data-wrangler diff --git a/testing/cloudformation.yaml b/testing/cloudformation.yaml index 70f5bb279..6c3f36e35 100644 --- a/testing/cloudformation.yaml +++ b/testing/cloudformation.yaml @@ -56,6 +56,20 @@ Resources: - Fn::GetAZs: '' CidrBlock: 10.19.230.0/24 MapPublicIpOnLaunch: true + PrivateSubnet: + Type: AWS::EC2::Subnet + Properties: + Tags: + - Key: Env + Value: aws-data-wrangler + VpcId: + Ref: VPC + AvailabilityZone: + Fn::Select: + - 0 + - Fn::GetAZs: '' + CidrBlock: 10.19.231.0/24 + MapPublicIpOnLaunch: false PublicRouteTable: Type: AWS::EC2::RouteTable Properties: @@ -87,6 +101,49 @@ Resources: Ref: PublicRouteTable SubnetId: Ref: PublicSubnet2 + NatGatewayEIP: + Type: AWS::EC2::EIP + DependsOn: InternetGatewayAttachment + Properties: + Tags: + - Key: Env + Value: aws-data-wrangler + Domain: vpc + NatGateway: + Type: AWS::EC2::NatGateway + Properties: + Tags: + - Key: Env + Value: aws-data-wrangler + AllocationId: + Fn::GetAtt: + - NatGatewayEIP + - AllocationId + SubnetId: + Ref: PublicSubnet1 + PrivateRouteTable: + Type: AWS::EC2::RouteTable + Properties: + Tags: + - Key: Env + Value: aws-data-wrangler + VpcId: + Ref: VPC + DefaultPrivateRoute: + Type: AWS::EC2::Route + Properties: + RouteTableId: + Ref: PrivateRouteTable + DestinationCidrBlock: 0.0.0.0/0 + NatGatewayId: + Ref: NatGateway + PrivateSubnetRouteTableAssociation: + Type: AWS::EC2::SubnetRouteTableAssociation + Properties: + RouteTableId: + Ref: PrivateRouteTable + SubnetId: + Ref: PrivateSubnet KmsKeyAlias: Type: AWS::KMS::Alias Properties: @@ -449,7 +506,7 @@ Resources: SecurityGroupIdList: - Ref: DatabaseSecurityGroup SubnetId: - Ref: PublicSubnet1 + Ref: PrivateSubnet ConnectionProperties: JDBC_CONNECTION_URL: Fn::Sub: jdbc:redshift://${Redshift.Endpoint.Address}:${Redshift.Endpoint.Port}/test @@ -473,7 +530,7 @@ Resources: SecurityGroupIdList: - Ref: DatabaseSecurityGroup SubnetId: - Ref: PublicSubnet1 + Ref: PrivateSubnet ConnectionProperties: JDBC_CONNECTION_URL: Fn::Sub: jdbc:postgresql://${AuroraInstancePostgresql.Endpoint.Address}:${AuroraInstancePostgresql.Endpoint.Port}/postgres @@ -497,7 +554,7 @@ Resources: SecurityGroupIdList: - Ref: DatabaseSecurityGroup SubnetId: - Ref: PublicSubnet1 + Ref: PrivateSubnet ConnectionProperties: JDBC_CONNECTION_URL: Fn::Sub: jdbc:mysql://${AuroraInstanceMysql.Endpoint.Address}:${AuroraInstanceMysql.Endpoint.Port}/test diff --git a/testing/test_awswrangler/test_moto.py b/testing/test_awswrangler/test_moto.py index 4c8c0f8b9..ba8d605f4 100644 --- a/testing/test_awswrangler/test_moto.py +++ b/testing/test_awswrangler/test_moto.py @@ -1,9 +1,9 @@ import boto3 -import mock -from botocore.exceptions import ClientError import botocore +import mock import moto import pytest +from botocore.exceptions import ClientError import awswrangler as wr @@ -75,12 +75,12 @@ def test_s3_raise_delete_object_exception_success(s3): call = botocore.client.BaseClient._make_api_call def mock_make_api_call(self, operation_name, kwarg): - if operation_name == 'DeleteObjects': - parsed_response = {'Error': {'Code': '500', 'Message': 'Test Error'}} + if operation_name == "DeleteObjects": + parsed_response 
= {"Error": {"Code": "500", "Message": "Test Error"}} raise ClientError(parsed_response, operation_name) return call(self, operation_name, kwarg) - with mock.patch('botocore.client.BaseClient._make_api_call', new=mock_make_api_call): + with mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call): with pytest.raises(ClientError): wr.s3.delete_objects(path=path) From 0ecf533b0f82e7e7d09ef761bab72f691ab0dc88 Mon Sep 17 00:00:00 2001 From: jiajie Date: Thu, 21 May 2020 20:35:27 +0800 Subject: [PATCH 03/26] Fix get_region_from_subnet bug When using region like 'ap-south-1', get_region_from_subnet(***) in _utils.py return 'ap-south-' as region name. Add get_region_from_session fix it. --- awswrangler/_utils.py | 7 +++++++ awswrangler/emr.py | 14 ++++++++------ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/awswrangler/_utils.py b/awswrangler/_utils.py index e4a6a16dd..7aae75b26 100644 --- a/awswrangler/_utils.py +++ b/awswrangler/_utils.py @@ -187,9 +187,16 @@ def get_region_from_subnet(subnet_id: str, boto3_session: Optional[boto3.Session """Extract region from Subnet ID.""" session: boto3.Session = ensure_session(session=boto3_session) client_ec2: boto3.client = client(service_name="ec2", session=session) + # This is wrong, when using region ap-south-1 return client_ec2.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["AvailabilityZone"][:9] +def get_region_from_session(boto3_session: Optional[boto3.Session] = None) -> str: + """Extract region from session.""" + session: boto3.Session = ensure_session(session=boto3_session) + return session.region_name + + def extract_partitions_from_paths( path: str, paths: List[str] ) -> Tuple[Optional[Dict[str, str]], Optional[Dict[str, List[str]]]]: diff --git a/awswrangler/emr.py b/awswrangler/emr.py index 5a93d752d..bf20193cc 100644 --- a/awswrangler/emr.py +++ b/awswrangler/emr.py @@ -53,7 +53,7 @@ def _get_default_logging_path( _account_id = account_id if (region is None) and (subnet_id is not None): boto3_session = _utils.ensure_session(session=boto3_session) - _region: str = _utils.get_region_from_subnet(subnet_id=subnet_id, boto3_session=boto3_session) + _region: str = _utils.get_region_from_session(boto3_session=boto3_session) elif (region is None) and (subnet_id is None): raise exceptions.InvalidArgumentCombination("You must pass region or subnet_id or both.") else: @@ -63,7 +63,7 @@ def _get_default_logging_path( def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-statements account_id: str = _utils.get_account_id(boto3_session=pars["boto3_session"]) - region: str = _utils.get_region_from_subnet(subnet_id=pars["subnet_id"], boto3_session=pars["boto3_session"]) + region: str = _utils.get_region_from_session(boto3_session=pars["boto3_session"]) # S3 Logging path if pars.get("logging_s3_path") is None: @@ -155,6 +155,7 @@ def _build_cluster_args(**pars): # pylint: disable=too-many-branches,too-many-s ], } ) + if spark_env is not None: args["Configurations"].append( { @@ -934,7 +935,8 @@ def submit_ecr_credentials_refresh( session: boto3.Session = _utils.ensure_session(session=boto3_session) client_s3: boto3.client = _utils.client(service_name="s3", session=session) bucket, key = _utils.parse_path(path=path_script) - client_s3.put_object(Body=_get_ecr_credentials_refresh_content().encode(encoding="utf-8"), Bucket=bucket, Key=key) + region: str = _utils.get_region_from_session(boto3_session=boto3_session) + 
client_s3.put_object(Body=_get_ecr_credentials_refresh_content(region).encode(encoding="utf-8"), Bucket=bucket, Key=key) command: str = f"spark-submit --deploy-mode cluster {path_script}" name: str = "ECR Credentials Refresh" step: Dict[str, Any] = build_step( @@ -946,14 +948,14 @@ def submit_ecr_credentials_refresh( return response["StepIds"][0] -def _get_ecr_credentials_refresh_content() -> str: - return """ +def _get_ecr_credentials_refresh_content(region) -> str: + return f""" import subprocess from pyspark.sql import SparkSession spark = SparkSession.builder.appName("ECR Setup Job").getOrCreate() COMMANDS = [ - "sudo -s eval $(aws ecr get-login --region us-east-1 --no-include-email)", + "sudo -s eval $(aws ecr get-login --region {region} --no-include-email)", "sudo hdfs dfs -put -f /root/.docker/config.json /user/hadoop/" ] From fa0e0b18888a840dc60d02d88771089e0fdfec00 Mon Sep 17 00:00:00 2001 From: jiajie Date: Thu, 21 May 2020 21:12:15 +0800 Subject: [PATCH 04/26] Fix awswrangler/emr.py:939:121: E501 line too long (124 > 120 characters) --- awswrangler/emr.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/awswrangler/emr.py b/awswrangler/emr.py index bf20193cc..e8114c698 100644 --- a/awswrangler/emr.py +++ b/awswrangler/emr.py @@ -936,7 +936,8 @@ def submit_ecr_credentials_refresh( client_s3: boto3.client = _utils.client(service_name="s3", session=session) bucket, key = _utils.parse_path(path=path_script) region: str = _utils.get_region_from_session(boto3_session=boto3_session) - client_s3.put_object(Body=_get_ecr_credentials_refresh_content(region).encode(encoding="utf-8"), Bucket=bucket, Key=key) + client_s3.put_object( + Body=_get_ecr_credentials_refresh_content(region).encode(encoding="utf-8"), Bucket=bucket, Key=key) command: str = f"spark-submit --deploy-mode cluster {path_script}" name: str = "ECR Credentials Refresh" step: Dict[str, Any] = build_step( From dbb06ca0fcda02fefe495e895297426e3b70b5b4 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Thu, 21 May 2020 11:52:00 -0300 Subject: [PATCH 05/26] Decreasing MySQL size for tests. --- testing/cloudformation.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing/cloudformation.yaml b/testing/cloudformation.yaml index 6c3f36e35..355de2a05 100644 --- a/testing/cloudformation.yaml +++ b/testing/cloudformation.yaml @@ -486,7 +486,7 @@ Resources: DBInstanceIdentifier: mysql-instance-wrangler DBClusterIdentifier: Ref: AuroraClusterMysql - DBInstanceClass: db.t3.medium + DBInstanceClass: db.t3.small DBSubnetGroupName: Ref: RdsSubnetGroup PubliclyAccessible: true From 5227f4a08eebf54b8c963b87f234597fb2cf1075 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Thu, 21 May 2020 12:37:27 -0300 Subject: [PATCH 06/26] Add moto test (Minimal test) to GitHub Actions. 
--- .github/workflows/moto-test.yml | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 .github/workflows/moto-test.yml diff --git a/.github/workflows/moto-test.yml b/.github/workflows/moto-test.yml new file mode 100644 index 000000000..c39827790 --- /dev/null +++ b/.github/workflows/moto-test.yml @@ -0,0 +1,30 @@ +name: Moto Test + +on: + push: + branches: + - master + - dev + pull_request: + branches: + - master + - dev + +jobs: + build: + + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [3.6] + + steps: + - uses: actions/checkout@v2 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v1 + with: + python-version: ${{ matrix.python-version }} + - name: Setup Environment + run: ./requirements.sh + - name: Run moto tests (Minimal tests) + run: pytest --timeout=10 testing/test_awswrangler/test_moto.py From 89e0895a2e85271c209fc3127e228730bf0d7eff Mon Sep 17 00:00:00 2001 From: igorborgest Date: Thu, 21 May 2020 12:58:55 -0300 Subject: [PATCH 07/26] Removing pytest timeout from GitHub actions. --- .github/workflows/moto-test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/moto-test.yml b/.github/workflows/moto-test.yml index c39827790..6be757222 100644 --- a/.github/workflows/moto-test.yml +++ b/.github/workflows/moto-test.yml @@ -27,4 +27,4 @@ jobs: - name: Setup Environment run: ./requirements.sh - name: Run moto tests (Minimal tests) - run: pytest --timeout=10 testing/test_awswrangler/test_moto.py + run: pytest testing/test_awswrangler/test_moto.py From 3ab86c5e0008e70d31628eb812fec04cffb91f01 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Thu, 21 May 2020 13:03:38 -0300 Subject: [PATCH 08/26] fixing get_region_from_subnet(). #252 --- awswrangler/_utils.py | 16 +++++++++++----- awswrangler/emr.py | 12 ++++-------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/awswrangler/_utils.py b/awswrangler/_utils.py index 7aae75b26..b869d78c1 100644 --- a/awswrangler/_utils.py +++ b/awswrangler/_utils.py @@ -183,18 +183,24 @@ def get_account_id(boto3_session: Optional[boto3.Session] = None) -> str: return client(service_name="sts", session=session).get_caller_identity().get("Account") -def get_region_from_subnet(subnet_id: str, boto3_session: Optional[boto3.Session] = None) -> str: +def get_region_from_subnet(subnet_id: str, boto3_session: Optional[boto3.Session] = None) -> str: # pragma: no cover """Extract region from Subnet ID.""" session: boto3.Session = ensure_session(session=boto3_session) client_ec2: boto3.client = client(service_name="ec2", session=session) - # This is wrong, when using region ap-south-1 - return client_ec2.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["AvailabilityZone"][:9] + return client_ec2.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["AvailabilityZone"][:-1] -def get_region_from_session(boto3_session: Optional[boto3.Session] = None) -> str: +def get_region_from_session(boto3_session: Optional[boto3.Session] = None, default_region: Optional[str] = None) -> str: """Extract region from session.""" session: boto3.Session = ensure_session(session=boto3_session) - return session.region_name + region: Optional[str] = session.region_name + if region is not None: + return region + if default_region is not None: # pragma: no cover + return default_region + raise exceptions.InvalidArgument( + "There is no region_name defined on boto3, please configure it." 
+ ) # pragma: no cover def extract_partitions_from_paths( diff --git a/awswrangler/emr.py b/awswrangler/emr.py index e8114c698..9e67361b3 100644 --- a/awswrangler/emr.py +++ b/awswrangler/emr.py @@ -52,7 +52,6 @@ def _get_default_logging_path( else: _account_id = account_id if (region is None) and (subnet_id is not None): - boto3_session = _utils.ensure_session(session=boto3_session) _region: str = _utils.get_region_from_session(boto3_session=boto3_session) elif (region is None) and (subnet_id is None): raise exceptions.InvalidArgumentCombination("You must pass region or subnet_id or both.") @@ -857,11 +856,7 @@ def build_step( if region is not None: # pragma: no cover _region: str = region else: - session: boto3.Session = _utils.ensure_session(session=boto3_session) - if session.region_name is not None: - _region = session.region_name - else: # pragma: no cover - _region = "us-east-1" + _region = _utils.get_region_from_session(boto3_session=boto3_session, default_region="us-east-1") jar = f"s3://{_region}.elasticmapreduce/libs/script-runner/script-runner.jar" step: Dict[str, Any] = { "Name": name, @@ -937,7 +932,8 @@ def submit_ecr_credentials_refresh( bucket, key = _utils.parse_path(path=path_script) region: str = _utils.get_region_from_session(boto3_session=boto3_session) client_s3.put_object( - Body=_get_ecr_credentials_refresh_content(region).encode(encoding="utf-8"), Bucket=bucket, Key=key) + Body=_get_ecr_credentials_refresh_content(region=region).encode(encoding="utf-8"), Bucket=bucket, Key=key + ) command: str = f"spark-submit --deploy-mode cluster {path_script}" name: str = "ECR Credentials Refresh" step: Dict[str, Any] = build_step( @@ -949,7 +945,7 @@ def submit_ecr_credentials_refresh( return response["StepIds"][0] -def _get_ecr_credentials_refresh_content(region) -> str: +def _get_ecr_credentials_refresh_content(region: str) -> str: return f""" import subprocess from pyspark.sql import SparkSession From 5c96f27efbbc5677da7039b9b20353c2823a1655 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Thu, 21 May 2020 13:17:42 -0300 Subject: [PATCH 09/26] Removing parallelism from pytest on GitHub actions. --- .github/workflows/moto-test.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/moto-test.yml b/.github/workflows/moto-test.yml index 6be757222..a46d0c691 100644 --- a/.github/workflows/moto-test.yml +++ b/.github/workflows/moto-test.yml @@ -26,5 +26,5 @@ jobs: python-version: ${{ matrix.python-version }} - name: Setup Environment run: ./requirements.sh - - name: Run moto tests (Minimal tests) - run: pytest testing/test_awswrangler/test_moto.py + - name: Run Moto test (Minimal test) + run: pytest -n 1 testing/test_awswrangler/test_moto.py From 6aa1a441a9c635a79decffe9f99e43404f1aa53c Mon Sep 17 00:00:00 2001 From: Bryan Date: Fri, 22 May 2020 00:23:03 +0800 Subject: [PATCH 10/26] Add test cases for get bukcet, object exists, list dirs and list files. 
Refactor the scope of mock s3 --- testing/test_awswrangler/test_moto.py | 40 +++++++++++++++++++++++++-- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/testing/test_awswrangler/test_moto.py b/testing/test_awswrangler/test_moto.py index ba8d605f4..9e754d47a 100644 --- a/testing/test_awswrangler/test_moto.py +++ b/testing/test_awswrangler/test_moto.py @@ -10,11 +10,12 @@ from ._utils import ensure_data_types, get_df_csv, get_df_list -@pytest.fixture(scope="module") +@pytest.fixture(scope="function") def s3(): with moto.mock_s3(): - boto3.resource("s3").create_bucket(Bucket="bucket") - yield True + s3 = boto3.resource("s3") + s3.create_bucket(Bucket="bucket") + yield s3 @pytest.fixture(scope="module") @@ -38,6 +39,39 @@ def subnet(): yield subnet.id +def test_get_bucket_region_succeed(s3): + region = wr.s3.get_bucket_region('bucket', boto3_session=boto3.Session()) + assert region == 'us-east-1' + + +def test_object_not_exist_succeed(s3): + result = wr.s3.does_object_exist('s3://bucket/test.csv') + assert result is False + + +def test_object_exist_succeed(s3): + path = "s3://bucket/test.csv" + wr.s3.to_csv(df=get_df_csv(), path=path, index=False) + result = wr.s3.does_object_exist(path) + assert result is True + + +def test_list_directories_succeed(s3): + path = "s3://bucket" + s3_object1 = s3.Object("bucket", "foo/foo.tmp") + s3_object2 = s3.Object("bucket", "bar/bar.tmp") + s3_object1.put(Body=b'foo') + s3_object2.put(Body=b'bar') + + dirs = wr.s3.list_directories(path) + files = wr.s3.list_objects(path) + + assert sorted(dirs) == sorted(["s3://bucket/foo/", + "s3://bucket/bar/"]) + assert sorted(files) == sorted(["s3://bucket/foo/foo.tmp", + "s3://bucket/bar/bar.tmp"]) + + def test_csv(s3): path = "s3://bucket/test.csv" wr.s3.to_csv(df=get_df_csv(), path=path, index=False) From fd695d4a7a7627b57685cdf7680bb3096395fb4e Mon Sep 17 00:00:00 2001 From: igorborgest Date: Thu, 21 May 2020 13:26:35 -0300 Subject: [PATCH 11/26] Remove Moto Test from GitHub Action --- .github/workflows/moto-test.yml | 30 ------------------------------ 1 file changed, 30 deletions(-) delete mode 100644 .github/workflows/moto-test.yml diff --git a/.github/workflows/moto-test.yml b/.github/workflows/moto-test.yml deleted file mode 100644 index a46d0c691..000000000 --- a/.github/workflows/moto-test.yml +++ /dev/null @@ -1,30 +0,0 @@ -name: Moto Test - -on: - push: - branches: - - master - - dev - pull_request: - branches: - - master - - dev - -jobs: - build: - - runs-on: ubuntu-latest - strategy: - matrix: - python-version: [3.6] - - steps: - - uses: actions/checkout@v2 - - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v1 - with: - python-version: ${{ matrix.python-version }} - - name: Setup Environment - run: ./requirements.sh - - name: Run Moto test (Minimal test) - run: pytest -n 1 testing/test_awswrangler/test_moto.py From ed7a260c5ef7576bc54a685807dd637da1649352 Mon Sep 17 00:00:00 2001 From: Bryan Date: Sat, 23 May 2020 22:45:58 +0800 Subject: [PATCH 12/26] add tests for describe objects --- testing/test_awswrangler/test_moto.py | 48 +++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/testing/test_awswrangler/test_moto.py b/testing/test_awswrangler/test_moto.py index 9e754d47a..3bb8bca52 100644 --- a/testing/test_awswrangler/test_moto.py +++ b/testing/test_awswrangler/test_moto.py @@ -72,6 +72,54 @@ def test_list_directories_succeed(s3): "s3://bucket/bar/bar.tmp"]) +def test_describe_one_object_succeed(s3): + bucket = "bucket" + key = 
"foo/foo.tmp" + s3_object = s3.Object(bucket, key) + s3_object.put(Body=b'foo') + + desc = wr.s3.describe_objects("s3://{}/{}".format(bucket, key)) + print(desc) + assert isinstance(desc, dict) + assert list(desc.keys()) == ['s3://bucket/foo/foo.tmp'] + + +def test_describe_list_of_objects_succeed(s3): + bucket = "bucket" + keys = ["foo/foo.tmp", + "bar/bar.tmp"] + + for key in keys: + s3_object = s3.Object(bucket, key) + s3_object.put(Body=b'test') + + desc = wr.s3.describe_objects( + ["s3://{}/{}".format(bucket, key) for key in keys] + ) + + assert isinstance(desc, dict) + assert sorted(list(desc.keys())) == sorted(["s3://bucket/foo/foo.tmp", + "s3://bucket/bar/bar.tmp"]) + + +def test_describe_list_of_objects_under_same_prefix_succeed(s3): + bucket = "bucket" + keys = ["foo/foo.tmp", + "bar/bar.tmp"] + + for key in keys: + s3_object = s3.Object(bucket, key) + s3_object.put(Body=b'test') + + desc = wr.s3.describe_objects( + "s3://{}".format(bucket) + ) + + assert isinstance(desc, dict) + assert sorted(list(desc.keys())) == sorted(["s3://bucket/foo/foo.tmp", + "s3://bucket/bar/bar.tmp"]) + + def test_csv(s3): path = "s3://bucket/test.csv" wr.s3.to_csv(df=get_df_csv(), path=path, index=False) From 3951348c6e4f56f22648138d94238b138227b067 Mon Sep 17 00:00:00 2001 From: Bryan Date: Sat, 23 May 2020 22:48:20 +0800 Subject: [PATCH 13/26] refactor naming --- awswrangler/s3.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/awswrangler/s3.py b/awswrangler/s3.py index b68a1e433..8af32e33c 100644 --- a/awswrangler/s3.py +++ b/awswrangler/s3.py @@ -355,8 +355,8 @@ def describe_objects( cpus: int = _utils.ensure_cpu_count(use_threads=use_threads) with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor: resp_list = list(executor.map(_describe_object, paths, repeat(wait_time), repeat(client_s3))) - desc_list: Dict[str, Dict[str, Any]] = dict(resp_list) - return desc_list + desc_dict: Dict[str, Dict[str, Any]] = dict(resp_list) + return desc_dict def _describe_object( From 524f59e5c0514da3bebf71ace47cde9d20a4cbac Mon Sep 17 00:00:00 2001 From: Bryan Date: Sat, 23 May 2020 22:59:49 +0800 Subject: [PATCH 14/26] add test for no object --- testing/test_awswrangler/test_moto.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/testing/test_awswrangler/test_moto.py b/testing/test_awswrangler/test_moto.py index 3bb8bca52..fde0d78cb 100644 --- a/testing/test_awswrangler/test_moto.py +++ b/testing/test_awswrangler/test_moto.py @@ -72,6 +72,14 @@ def test_list_directories_succeed(s3): "s3://bucket/bar/bar.tmp"]) +def test_describe_no_object_succeed(s3): + + desc = wr.s3.describe_objects("s3://bucket") + + assert isinstance(desc, dict) + assert desc == {} + + def test_describe_one_object_succeed(s3): bucket = "bucket" key = "foo/foo.tmp" @@ -79,7 +87,7 @@ def test_describe_one_object_succeed(s3): s3_object.put(Body=b'foo') desc = wr.s3.describe_objects("s3://{}/{}".format(bucket, key)) - print(desc) + assert isinstance(desc, dict) assert list(desc.keys()) == ['s3://bucket/foo/foo.tmp'] From 4977fbd4bdc592644dc0e9eac128e78a807e3aa4 Mon Sep 17 00:00:00 2001 From: Bryan Date: Sun, 24 May 2020 11:34:24 +0800 Subject: [PATCH 15/26] refactor naming --- awswrangler/s3.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/awswrangler/s3.py b/awswrangler/s3.py index 8af32e33c..807e57f75 100644 --- a/awswrangler/s3.py +++ b/awswrangler/s3.py @@ -425,8 +425,8 @@ def size_objects( desc_list: Dict[str, Dict[str, Any]] = describe_objects( 
path=path, wait_time=wait_time, use_threads=use_threads, boto3_session=boto3_session ) - size_list: Dict[str, Optional[int]] = {k: d.get("ContentLength", None) for k, d in desc_list.items()} - return size_list + size_dict: Dict[str, Optional[int]] = {k: d.get("ContentLength", None) for k, d in desc_list.items()} + return size_dictg def to_csv( # pylint: disable=too-many-arguments From ce12472c1ae8357b9cc9b383b4d8747079a096df Mon Sep 17 00:00:00 2001 From: Bryan Date: Mon, 25 May 2020 22:47:10 +0800 Subject: [PATCH 16/26] add test cases --- testing/test_awswrangler/test_moto.py | 95 +++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/testing/test_awswrangler/test_moto.py b/testing/test_awswrangler/test_moto.py index fde0d78cb..6a9606847 100644 --- a/testing/test_awswrangler/test_moto.py +++ b/testing/test_awswrangler/test_moto.py @@ -39,6 +39,14 @@ def subnet(): yield subnet.id +def get_content_md5(desc: dict): + result = desc\ + .get('ResponseMetadata') \ + .get('HTTPHeaders') \ + .get('content-md5') + return result + + def test_get_bucket_region_succeed(s3): region = wr.s3.get_bucket_region('bucket', boto3_session=boto3.Session()) assert region == 'us-east-1' @@ -128,6 +136,93 @@ def test_describe_list_of_objects_under_same_prefix_succeed(s3): "s3://bucket/bar/bar.tmp"]) +def test_size_objects_without_object_succeed(s3): + size = wr.s3.size_objects("s3://bucket") + + assert isinstance(size, dict) + assert size == {} + + +def test_size_list_of_objects_succeed(s3): + bucket = "bucket" + s3_object1 = s3.Object(bucket, "foo/foo.tmp") + s3_object2 = s3.Object(bucket, "bar/bar.tmp") + s3_object1.put(Body=b'foofoo') + s3_object2.put(Body=b'bar') + + size = wr.s3.size_objects("s3://{}".format(bucket)) + + assert isinstance(size, dict) + assert size == {"s3://bucket/foo/foo.tmp": 6, + "s3://bucket/bar/bar.tmp": 3} + + +def test_copy_one_object_without_replace_filename_succeed(s3): + bucket = "bucket" + key = "foo/foo.tmp" + s3_object = s3.Object(bucket, key) + s3_object.put(Body=b'foo') + + wr.s3.copy_objects(paths=["s3://{}/{}".format(bucket, key)], + source_path="s3://{}/foo".format(bucket), + target_path="s3://bucket/bar") + + desc_source = wr.s3.describe_objects("s3://bucket/foo/foo.tmp") + desc_target = wr.s3.describe_objects("s3://bucket/bar/foo.tmp") + + assert get_content_md5(desc_target.get('s3://bucket/bar/foo.tmp')) == \ + get_content_md5(desc_source.get('s3://bucket/foo/foo.tmp')) + + +def test_copy_one_object_with_replace_filename_succeed(s3): + bucket = "bucket" + key = "foo/foo.tmp" + s3_object = s3.Object(bucket, key) + s3_object.put(Body=b'foo') + + wr.s3.copy_objects(paths=["s3://{}/{}".format(bucket, key)], + source_path="s3://{}/foo".format(bucket), + target_path="s3://bucket/bar", + replace_filenames={"foo.tmp": "bar.tmp"}) + + desc_source = wr.s3.describe_objects("s3://bucket/foo/foo.tmp") + desc_target = wr.s3.describe_objects("s3://bucket/bar/bar.tmp") + + assert get_content_md5(desc_target.get('s3://bucket/bar/bar.tmp')) ==\ + get_content_md5(desc_source.get('s3://bucket/foo/foo.tmp')) + + +def test_copy_objects_without_replace_filename_succeed(s3): + bucket = "bucket" + keys = ["foo/foo1.tmp", + "foo/foo2.tmp", + "foo/foo3.tmp"] + + for key in keys: + s3_object = s3.Object(bucket, key) + s3_object.put(Body=b'foo') + + wr.s3.copy_objects(paths=["s3://{}/{}".format(bucket, key) for key in keys], + source_path="s3://{}/foo".format(bucket), + target_path="s3://bucket/bar") + + desc_source = wr.s3.describe_objects( + "s3://{}/foo".format(bucket) + ) + 
desc_target = wr.s3.describe_objects( + "s3://{}/bar".format(bucket) + ) + + assert isinstance(desc_target, dict) + assert len(desc_source) == 3 + assert len(desc_target) == 3 + assert sorted(list(desc_target.keys())) == sorted( + ["s3://bucket/bar/foo1.tmp", + "s3://bucket/bar/foo2.tmp", + "s3://bucket/bar/foo3.tmp"] + ) + + def test_csv(s3): path = "s3://bucket/test.csv" wr.s3.to_csv(df=get_df_csv(), path=path, index=False) From ac4944112460c426a3a16b733ac9afec48965d06 Mon Sep 17 00:00:00 2001 From: Bryan Date: Mon, 25 May 2020 22:51:12 +0800 Subject: [PATCH 17/26] fixed typo --- awswrangler/s3.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/awswrangler/s3.py b/awswrangler/s3.py index 807e57f75..39b4efd39 100644 --- a/awswrangler/s3.py +++ b/awswrangler/s3.py @@ -426,7 +426,7 @@ def size_objects( path=path, wait_time=wait_time, use_threads=use_threads, boto3_session=boto3_session ) size_dict: Dict[str, Optional[int]] = {k: d.get("ContentLength", None) for k, d in desc_list.items()} - return size_dictg + return size_dict def to_csv( # pylint: disable=too-many-arguments From 83dc5ad4eebce6dced280fd2c9746eda362ebe67 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Tue, 26 May 2020 23:14:02 -0300 Subject: [PATCH 18/26] Fix bug to write partitions in reverse order. --- awswrangler/_data_types.py | 10 +++++----- testing/test_awswrangler/test_data_lake.py | 14 ++++++++++++++ 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/awswrangler/_data_types.py b/awswrangler/_data_types.py index 8cd84e7b8..fd972753a 100644 --- a/awswrangler/_data_types.py +++ b/awswrangler/_data_types.py @@ -341,12 +341,12 @@ def athena_types_from_pandas_partitioned( df=df, index=index, dtype=dtype, index_left=index_left ) columns_types: Dict[str, str] = {} + for col, typ in athena_columns_types.items(): + if col not in partitions: + columns_types[col] = typ partitions_types: Dict[str, str] = {} - for k, v in athena_columns_types.items(): - if k in partitions: - partitions_types[k] = v - else: - columns_types[k] = v + for par in partitions: + partitions_types[par] = athena_columns_types[par] return columns_types, partitions_types diff --git a/testing/test_awswrangler/test_data_lake.py b/testing/test_awswrangler/test_data_lake.py index e4f733876..86ce13e4b 100644 --- a/testing/test_awswrangler/test_data_lake.py +++ b/testing/test_awswrangler/test_data_lake.py @@ -2128,3 +2128,17 @@ def test_store_parquet_metadata_modes(database, table, path, external_schema): assert len(df3.columns) == 3 assert len(df3.index) == 4 assert df3.c1.astype(int).sum() == 6 + + +@pytest.mark.parametrize("partition_cols", [None, ["c1"], ["c2"], ["c1", "c2"], ["c2", "c1"]]) +def test_to_parquet_reverse_partitions(database, table, path, partition_cols): + df = pd.DataFrame({"c0": [0, 1, 2], "c1": [3, 4, 5], "c2": [6, 7, 8]}) + paths = wr.s3.to_parquet( + df=df, path=path, dataset=True, database=database, table=table, partition_cols=partition_cols + )["paths"] + wr.s3.wait_objects_exist(paths=paths, use_threads=False) + df2 = wr.athena.read_sql_table(table=table, database=database) + assert df.shape == df2.shape + assert df.c0.sum() == df2.c0.sum() + assert df.c1.sum() == df2.c1.sum() + assert df.c2.sum() == df2.c2.sum() From 8cb058cfefcddc92775ae186ee869b8fc035cb4e Mon Sep 17 00:00:00 2001 From: igorborgest Date: Tue, 26 May 2020 23:21:17 -0300 Subject: [PATCH 19/26] Add black code style to test_moto.py. 
#254 --- testing/test_awswrangler/test_moto.py | 114 +++++++++++--------------- 1 file changed, 50 insertions(+), 64 deletions(-) diff --git a/testing/test_awswrangler/test_moto.py b/testing/test_awswrangler/test_moto.py index 6a9606847..01bde208f 100644 --- a/testing/test_awswrangler/test_moto.py +++ b/testing/test_awswrangler/test_moto.py @@ -40,20 +40,17 @@ def subnet(): def get_content_md5(desc: dict): - result = desc\ - .get('ResponseMetadata') \ - .get('HTTPHeaders') \ - .get('content-md5') + result = desc.get("ResponseMetadata").get("HTTPHeaders").get("content-md5") return result def test_get_bucket_region_succeed(s3): - region = wr.s3.get_bucket_region('bucket', boto3_session=boto3.Session()) - assert region == 'us-east-1' + region = wr.s3.get_bucket_region("bucket", boto3_session=boto3.Session()) + assert region == "us-east-1" def test_object_not_exist_succeed(s3): - result = wr.s3.does_object_exist('s3://bucket/test.csv') + result = wr.s3.does_object_exist("s3://bucket/test.csv") assert result is False @@ -68,16 +65,14 @@ def test_list_directories_succeed(s3): path = "s3://bucket" s3_object1 = s3.Object("bucket", "foo/foo.tmp") s3_object2 = s3.Object("bucket", "bar/bar.tmp") - s3_object1.put(Body=b'foo') - s3_object2.put(Body=b'bar') + s3_object1.put(Body=b"foo") + s3_object2.put(Body=b"bar") dirs = wr.s3.list_directories(path) files = wr.s3.list_objects(path) - assert sorted(dirs) == sorted(["s3://bucket/foo/", - "s3://bucket/bar/"]) - assert sorted(files) == sorted(["s3://bucket/foo/foo.tmp", - "s3://bucket/bar/bar.tmp"]) + assert sorted(dirs) == sorted(["s3://bucket/foo/", "s3://bucket/bar/"]) + assert sorted(files) == sorted(["s3://bucket/foo/foo.tmp", "s3://bucket/bar/bar.tmp"]) def test_describe_no_object_succeed(s3): @@ -92,48 +87,40 @@ def test_describe_one_object_succeed(s3): bucket = "bucket" key = "foo/foo.tmp" s3_object = s3.Object(bucket, key) - s3_object.put(Body=b'foo') + s3_object.put(Body=b"foo") desc = wr.s3.describe_objects("s3://{}/{}".format(bucket, key)) assert isinstance(desc, dict) - assert list(desc.keys()) == ['s3://bucket/foo/foo.tmp'] + assert list(desc.keys()) == ["s3://bucket/foo/foo.tmp"] def test_describe_list_of_objects_succeed(s3): bucket = "bucket" - keys = ["foo/foo.tmp", - "bar/bar.tmp"] + keys = ["foo/foo.tmp", "bar/bar.tmp"] for key in keys: s3_object = s3.Object(bucket, key) - s3_object.put(Body=b'test') + s3_object.put(Body=b"test") - desc = wr.s3.describe_objects( - ["s3://{}/{}".format(bucket, key) for key in keys] - ) + desc = wr.s3.describe_objects(["s3://{}/{}".format(bucket, key) for key in keys]) assert isinstance(desc, dict) - assert sorted(list(desc.keys())) == sorted(["s3://bucket/foo/foo.tmp", - "s3://bucket/bar/bar.tmp"]) + assert sorted(list(desc.keys())) == sorted(["s3://bucket/foo/foo.tmp", "s3://bucket/bar/bar.tmp"]) def test_describe_list_of_objects_under_same_prefix_succeed(s3): bucket = "bucket" - keys = ["foo/foo.tmp", - "bar/bar.tmp"] + keys = ["foo/foo.tmp", "bar/bar.tmp"] for key in keys: s3_object = s3.Object(bucket, key) - s3_object.put(Body=b'test') + s3_object.put(Body=b"test") - desc = wr.s3.describe_objects( - "s3://{}".format(bucket) - ) + desc = wr.s3.describe_objects("s3://{}".format(bucket)) assert isinstance(desc, dict) - assert sorted(list(desc.keys())) == sorted(["s3://bucket/foo/foo.tmp", - "s3://bucket/bar/bar.tmp"]) + assert sorted(list(desc.keys())) == sorted(["s3://bucket/foo/foo.tmp", "s3://bucket/bar/bar.tmp"]) def test_size_objects_without_object_succeed(s3): @@ -147,79 +134,78 @@ def 
test_size_list_of_objects_succeed(s3): bucket = "bucket" s3_object1 = s3.Object(bucket, "foo/foo.tmp") s3_object2 = s3.Object(bucket, "bar/bar.tmp") - s3_object1.put(Body=b'foofoo') - s3_object2.put(Body=b'bar') + s3_object1.put(Body=b"foofoo") + s3_object2.put(Body=b"bar") size = wr.s3.size_objects("s3://{}".format(bucket)) assert isinstance(size, dict) - assert size == {"s3://bucket/foo/foo.tmp": 6, - "s3://bucket/bar/bar.tmp": 3} + assert size == {"s3://bucket/foo/foo.tmp": 6, "s3://bucket/bar/bar.tmp": 3} def test_copy_one_object_without_replace_filename_succeed(s3): bucket = "bucket" key = "foo/foo.tmp" s3_object = s3.Object(bucket, key) - s3_object.put(Body=b'foo') + s3_object.put(Body=b"foo") - wr.s3.copy_objects(paths=["s3://{}/{}".format(bucket, key)], - source_path="s3://{}/foo".format(bucket), - target_path="s3://bucket/bar") + wr.s3.copy_objects( + paths=["s3://{}/{}".format(bucket, key)], + source_path="s3://{}/foo".format(bucket), + target_path="s3://bucket/bar", + ) desc_source = wr.s3.describe_objects("s3://bucket/foo/foo.tmp") desc_target = wr.s3.describe_objects("s3://bucket/bar/foo.tmp") - assert get_content_md5(desc_target.get('s3://bucket/bar/foo.tmp')) == \ - get_content_md5(desc_source.get('s3://bucket/foo/foo.tmp')) + assert get_content_md5(desc_target.get("s3://bucket/bar/foo.tmp")) == get_content_md5( + desc_source.get("s3://bucket/foo/foo.tmp") + ) def test_copy_one_object_with_replace_filename_succeed(s3): bucket = "bucket" key = "foo/foo.tmp" s3_object = s3.Object(bucket, key) - s3_object.put(Body=b'foo') + s3_object.put(Body=b"foo") - wr.s3.copy_objects(paths=["s3://{}/{}".format(bucket, key)], - source_path="s3://{}/foo".format(bucket), - target_path="s3://bucket/bar", - replace_filenames={"foo.tmp": "bar.tmp"}) + wr.s3.copy_objects( + paths=["s3://{}/{}".format(bucket, key)], + source_path="s3://{}/foo".format(bucket), + target_path="s3://bucket/bar", + replace_filenames={"foo.tmp": "bar.tmp"}, + ) desc_source = wr.s3.describe_objects("s3://bucket/foo/foo.tmp") desc_target = wr.s3.describe_objects("s3://bucket/bar/bar.tmp") - assert get_content_md5(desc_target.get('s3://bucket/bar/bar.tmp')) ==\ - get_content_md5(desc_source.get('s3://bucket/foo/foo.tmp')) + assert get_content_md5(desc_target.get("s3://bucket/bar/bar.tmp")) == get_content_md5( + desc_source.get("s3://bucket/foo/foo.tmp") + ) def test_copy_objects_without_replace_filename_succeed(s3): bucket = "bucket" - keys = ["foo/foo1.tmp", - "foo/foo2.tmp", - "foo/foo3.tmp"] + keys = ["foo/foo1.tmp", "foo/foo2.tmp", "foo/foo3.tmp"] for key in keys: s3_object = s3.Object(bucket, key) - s3_object.put(Body=b'foo') - - wr.s3.copy_objects(paths=["s3://{}/{}".format(bucket, key) for key in keys], - source_path="s3://{}/foo".format(bucket), - target_path="s3://bucket/bar") + s3_object.put(Body=b"foo") - desc_source = wr.s3.describe_objects( - "s3://{}/foo".format(bucket) - ) - desc_target = wr.s3.describe_objects( - "s3://{}/bar".format(bucket) + wr.s3.copy_objects( + paths=["s3://{}/{}".format(bucket, key) for key in keys], + source_path="s3://{}/foo".format(bucket), + target_path="s3://bucket/bar", ) + desc_source = wr.s3.describe_objects("s3://{}/foo".format(bucket)) + desc_target = wr.s3.describe_objects("s3://{}/bar".format(bucket)) + assert isinstance(desc_target, dict) assert len(desc_source) == 3 assert len(desc_target) == 3 assert sorted(list(desc_target.keys())) == sorted( - ["s3://bucket/bar/foo1.tmp", - "s3://bucket/bar/foo2.tmp", - "s3://bucket/bar/foo3.tmp"] + ["s3://bucket/bar/foo1.tmp", 
"s3://bucket/bar/foo2.tmp", "s3://bucket/bar/foo3.tmp"] ) From 7b5b4897332109f9dfb686f186ddbb98d7b28f32 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Tue, 26 May 2020 23:35:18 -0300 Subject: [PATCH 20/26] Fix dtype argument documentation. --- awswrangler/s3.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/awswrangler/s3.py b/awswrangler/s3.py index 39b4efd39..fa913ffb1 100644 --- a/awswrangler/s3.py +++ b/awswrangler/s3.py @@ -502,19 +502,18 @@ def to_csv( # pylint: disable=too-many-arguments Glue/Athena catalog: Database name. table : str, optional Glue/Athena catalog: Table name. - dtype: Dict[str, str], optional + dtype : Dict[str, str], optional Dictionary of columns names and Athena/Glue types to be casted. Useful when you have columns with undetermined or mixed data types. - Only takes effect if dataset=True. (e.g. {'col name': 'bigint', 'col2 name': 'int'}) - description: str, optional + description : str, optional Glue/Athena catalog: Table description - parameters: Dict[str, str], optional + parameters : Dict[str, str], optional Glue/Athena catalog: Key/value pairs to tag the table. - columns_comments: Dict[str, str], optional + columns_comments : Dict[str, str], optional Glue/Athena catalog: Columns names and the related comments (e.g. {'col0': 'Column 0.', 'col1': 'Column 1.', 'col2': 'Partition.'}). - pandas_kwargs: + pandas_kwargs : keyword arguments forwarded to pandas.DataFrame.to_csv() https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_csv.html @@ -925,16 +924,15 @@ def to_parquet( # pylint: disable=too-many-arguments Glue/Athena catalog: Database name. table : str, optional Glue/Athena catalog: Table name. - dtype: Dict[str, str], optional + dtype : Dict[str, str], optional Dictionary of columns names and Athena/Glue types to be casted. Useful when you have columns with undetermined or mixed data types. - Only takes effect if dataset=True. (e.g. {'col name': 'bigint', 'col2 name': 'int'}) - description: str, optional + description : str, optional Glue/Athena catalog: Table description - parameters: Dict[str, str], optional + parameters : Dict[str, str], optional Glue/Athena catalog: Key/value pairs to tag the table. - columns_comments: Dict[str, str], optional + columns_comments : Dict[str, str], optional Glue/Athena catalog: Columns names and the related comments (e.g. {'col0': 'Column 0.', 'col1': 'Column 1.', 'col2': 'Partition.'}). From cfe558ed5a524a735ae73afe670d62f6cc680432 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Wed, 27 May 2020 00:08:23 -0300 Subject: [PATCH 21/26] Bumping version to 1.3.0. 
--- README.md | 2 +- awswrangler/__metadata__.py | 2 +- testing/test_awswrangler/test_metadata.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 16e17b916..ad1fa927e 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ ![AWS Data Wrangler](docs/source/_static/logo2.png?raw=true "AWS Data Wrangler") -[![Release](https://img.shields.io/badge/release-1.2.0-brightgreen.svg)](https://pypi.org/project/awswrangler/) +[![Release](https://img.shields.io/badge/release-1.3.0-brightgreen.svg)](https://pypi.org/project/awswrangler/) [![Python Version](https://img.shields.io/badge/python-3.6%20%7C%203.7%20%7C%203.8-brightgreen.svg)](https://anaconda.org/conda-forge/awswrangler) [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) diff --git a/awswrangler/__metadata__.py b/awswrangler/__metadata__.py index 4dd4a4921..26d9ff44e 100644 --- a/awswrangler/__metadata__.py +++ b/awswrangler/__metadata__.py @@ -7,5 +7,5 @@ __title__ = "awswrangler" __description__ = "Pandas on AWS." -__version__ = "1.2.0" +__version__ = "1.3.0" __license__ = "Apache License 2.0" diff --git a/testing/test_awswrangler/test_metadata.py b/testing/test_awswrangler/test_metadata.py index 89b2a8b0c..3dff3f55a 100644 --- a/testing/test_awswrangler/test_metadata.py +++ b/testing/test_awswrangler/test_metadata.py @@ -2,7 +2,7 @@ def test_metadata(): - assert wr.__version__ == "1.2.0" + assert wr.__version__ == "1.3.0" assert wr.__title__ == "awswrangler" assert wr.__description__ == "Pandas on AWS." assert wr.__license__ == "Apache License 2.0" From 5ca29745e3f10550dc60bb2e906c44eadfa7f8c5 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Wed, 27 May 2020 00:25:21 -0300 Subject: [PATCH 22/26] Bumping dependencies versions. --- requirements-dev.txt | 12 ++++++------ requirements.sh | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/requirements-dev.txt b/requirements-dev.txt index 27e89bc97..4850658c4 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,22 +1,22 @@ awscli>=1.18.0,<2.0.0 black~=19.3b0 pylint~=2.5.2 -flake8~=3.8.1 +flake8~=3.8.2 mypy~=0.770 isort~=4.3.21 pydocstyle~=5.0.2 doc8~=0.8.0 -tox~=3.15.0 +tox~=3.15.1 pytest~=5.4.2 -pytest-cov~=2.8.1 +pytest-cov~=2.9.0 pytest-xdist~=1.32.0 pytest-timeout~=1.3.4 scikit-learn~=0.23.1 -cfn-lint~=0.32.0 +cfn-lint~=0.32.1 cfn-flip~=1.2.3 twine~=3.1.1 wheel~=0.34.2 -sphinx~=3.0.3 +sphinx~=3.0.4 sphinx_bootstrap_theme~=0.7.1 moto~=1.3.14 -jupyterlab~=2.1.2 \ No newline at end of file +jupyterlab~=2.1.3 \ No newline at end of file diff --git a/requirements.sh b/requirements.sh index 27bf47f50..a21ed10a8 100755 --- a/requirements.sh +++ b/requirements.sh @@ -2,5 +2,5 @@ set -ex pip install --upgrade pip -pip install -r requirements-dev.txt -pip install -e . +pip install --upgrade -r requirements-dev.txt +pip install --upgrade -e . From f6aaf6f00e18ffe4e8789fb4a5bcdccfe76ac798 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Wed, 27 May 2020 00:26:28 -0300 Subject: [PATCH 23/26] Increasing micro versions range for SQLAlchemy. 
#259 --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 3e7192b4f..c8eedd045 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,4 +7,4 @@ s3fs~=0.4.2 psycopg2-binary~=2.8.0 pymysql~=0.9.0 sqlalchemy-redshift~=0.7.0 -SQLAlchemy==1.3.13 \ No newline at end of file +SQLAlchemy>=1.3.10,<1.3.16 \ No newline at end of file From 5e3a5335da3459f89f7265297142705a927f399d Mon Sep 17 00:00:00 2001 From: igorborgest Date: Wed, 27 May 2020 13:06:02 -0300 Subject: [PATCH 24/26] Add cast for nest types for wr.s3.to_parquet. #263 --- awswrangler/_data_types.py | 23 +++++++---- testing/test_awswrangler/test_data_lake.py | 46 ++++++++++++++++++++-- 2 files changed, 59 insertions(+), 10 deletions(-) diff --git a/awswrangler/_data_types.py b/awswrangler/_data_types.py index fd972753a..50cc0e372 100644 --- a/awswrangler/_data_types.py +++ b/awswrangler/_data_types.py @@ -21,7 +21,7 @@ def athena2pyarrow(dtype: str) -> pa.DataType: # pylint: disable=too-many-return-statements """Athena to PyArrow data types conversion.""" - dtype = dtype.lower() + dtype = dtype.lower().replace(" ", "") if dtype == "tinyint": return pa.int8() if dtype == "smallint": @@ -47,6 +47,12 @@ def athena2pyarrow(dtype: str) -> pa.DataType: # pylint: disable=too-many-retur if dtype.startswith("decimal"): precision, scale = dtype.replace("decimal(", "").replace(")", "").split(sep=",") return pa.decimal128(precision=int(precision), scale=int(scale)) + if dtype.startswith("array"): + return pa.large_list(athena2pyarrow(dtype=dtype[6:-1])) + if dtype.startswith("struct"): + return pa.struct([(f.split(":", 1)[0], athena2pyarrow(f.split(":", 1)[1])) for f in dtype[7:-1].split(",")]) + if dtype.startswith("map"): # pragma: no cover + return pa.map_(athena2pyarrow(dtype[4:-1].split(",", 1)[0]), athena2pyarrow(dtype[4:-1].split(",", 1)[1])) raise exceptions.UnsupportedType(f"Unsupported Athena type: {dtype}") # pragma: no cover @@ -77,8 +83,6 @@ def athena2pandas(dtype: str) -> str: # pylint: disable=too-many-branches,too-m return "decimal" if dtype in ("binary", "varbinary"): return "bytes" - if dtype == "array": # pragma: no cover - return "list" raise exceptions.UnsupportedType(f"Unsupported Athena type: {dtype}") # pragma: no cover @@ -143,9 +147,9 @@ def pyarrow2athena(dtype: pa.DataType) -> str: # pylint: disable=too-many-branc if pa.types.is_list(dtype): return f"array<{pyarrow2athena(dtype=dtype.value_type)}>" if pa.types.is_struct(dtype): - return f"struct<{', '.join([f'{f.name}:{pyarrow2athena(dtype=f.type)}' for f in dtype])}>" + return f"struct<{','.join([f'{f.name}:{pyarrow2athena(dtype=f.type)}' for f in dtype])}>" if pa.types.is_map(dtype): # pragma: no cover - return f"map<{pyarrow2athena(dtype=dtype.key_type)},{pyarrow2athena(dtype=dtype.item_type)}>" + return f"map<{pyarrow2athena(dtype=dtype.key_type)}, {pyarrow2athena(dtype=dtype.item_type)}>" if dtype == pa.null(): raise exceptions.UndetectedType("We can not infer the data type from an entire null object column") raise exceptions.UnsupportedType(f"Unsupported Pyarrow type: {dtype}") # pragma: no cover @@ -321,7 +325,7 @@ def athena_types_from_pandas( athena_columns_types: Dict[str, str] = {} for k, v in pa_columns_types.items(): if v is None: - athena_columns_types[k] = casts[k] + athena_columns_types[k] = casts[k].replace(" ", "") else: athena_columns_types[k] = pyarrow2athena(dtype=v) _logger.debug("athena_columns_types: %s", athena_columns_types) @@ -384,7 +388,12 @@ def 
athena_types_from_pyarrow_schema( def cast_pandas_with_athena_types(df: pd.DataFrame, dtype: Dict[str, str]) -> pd.DataFrame: """Cast columns in a Pandas DataFrame.""" for col, athena_type in dtype.items(): - if col in df.columns: + if ( + (col in df.columns) + and (not athena_type.startswith("array")) + and (not athena_type.startswith("struct")) + and (not athena_type.startswith("map")) + ): pandas_type: str = athena2pandas(dtype=athena_type) if pandas_type == "datetime64": df[col] = pd.to_datetime(df[col]) diff --git a/testing/test_awswrangler/test_data_lake.py b/testing/test_awswrangler/test_data_lake.py index 86ce13e4b..9dfbaa23c 100644 --- a/testing/test_awswrangler/test_data_lake.py +++ b/testing/test_awswrangler/test_data_lake.py @@ -1295,9 +1295,7 @@ def test_athena_encryption( assert len(df2.columns) == 2 -def test_athena_nested(bucket, database): - table = "test_athena_nested" - path = f"s3://{bucket}/{table}/" +def test_athena_nested(path, database, table): df = pd.DataFrame( { "c0": [[1, 2, 3], [4, 5, 6]], @@ -2142,3 +2140,45 @@ def test_to_parquet_reverse_partitions(database, table, path, partition_cols): assert df.c0.sum() == df2.c0.sum() assert df.c1.sum() == df2.c1.sum() assert df.c2.sum() == df2.c2.sum() + + +def test_to_parquet_nested_append(database, table, path): + df = pd.DataFrame( + { + "c0": [[1, 2, 3], [4, 5, 6]], + "c1": [[[1, 2], [3, 4]], [[5, 6], [7, 8]]], + "c2": [[["a", "b"], ["c", "d"]], [["e", "f"], ["g", "h"]]], + "c3": [[], [[[[[[[[1]]]]]]]]], + "c4": [{"a": 1}, {"a": 1}], + "c5": [{"a": {"b": {"c": [1, 2]}}}, {"a": {"b": {"c": [3, 4]}}}], + } + ) + paths = wr.s3.to_parquet(df=df, path=path, dataset=True, database=database, table=table)["paths"] + wr.s3.wait_objects_exist(paths=paths, use_threads=False) + df2 = wr.athena.read_sql_query(sql=f"SELECT c0, c1, c2, c4 FROM {table}", database=database) + assert len(df2.index) == 2 + assert len(df2.columns) == 4 + paths = wr.s3.to_parquet(df=df, path=path, dataset=True, database=database, table=table)["paths"] + wr.s3.wait_objects_exist(paths=paths, use_threads=False) + df2 = wr.athena.read_sql_query(sql=f"SELECT c0, c1, c2, c4 FROM {table}", database=database) + assert len(df2.index) == 4 + assert len(df2.columns) == 4 + + +def test_to_parquet_nested_cast(database, table, path): + df = pd.DataFrame({"c0": [[1, 2, 3], [4, 5, 6]], "c1": [[], []], "c2": [{"a": 1, "b": 2}, {"a": 3, "b": 4}]}) + paths = wr.s3.to_parquet( + df=df, + path=path, + dataset=True, + database=database, + table=table, + dtype={"c0": "array", "c1": "array", "c2": "struct"}, + )["paths"] + wr.s3.wait_objects_exist(paths=paths, use_threads=False) + df = pd.DataFrame({"c0": [[1, 2, 3], [4, 5, 6]], "c1": [["a"], ["b"]], "c2": [{"a": 1, "b": 2}, {"a": 3, "b": 4}]}) + paths = wr.s3.to_parquet(df=df, path=path, dataset=True, database=database, table=table)["paths"] + wr.s3.wait_objects_exist(paths=paths, use_threads=False) + df2 = wr.athena.read_sql_query(sql=f"SELECT c0, c2 FROM {table}", database=database) + assert len(df2.index) == 4 + assert len(df2.columns) == 2 From ec855027f0a122fff22c20a900e444b608646911 Mon Sep 17 00:00:00 2001 From: igorborgest Date: Wed, 27 May 2020 18:17:19 -0300 Subject: [PATCH 25/26] Add support to Partition Projection. 
:rocket: --- .pylintrc | 8 +- awswrangler/catalog.py | 129 ++++++++++++- awswrangler/s3.py | 135 +++++++++++++- .../test_athena_projections.py | 170 ++++++++++++++++++ testing/test_awswrangler/test_data_lake.py | 30 ++-- 5 files changed, 439 insertions(+), 33 deletions(-) create mode 100644 testing/test_awswrangler/test_athena_projections.py diff --git a/.pylintrc b/.pylintrc index 4f41cb3fb..d9490f9f8 100644 --- a/.pylintrc +++ b/.pylintrc @@ -331,7 +331,7 @@ indent-string=' ' max-line-length=120 # Maximum number of lines in a module. -max-module-lines=2500 +max-module-lines=4000 # List of optional constructs for which whitespace checking is disabled. `dict- # separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}. @@ -547,7 +547,7 @@ valid-metaclass-classmethod-first-arg=cls [DESIGN] # Maximum number of arguments for function / method. -max-args=15 +max-args=20 # Maximum number of attributes for a class (see R0902). max-attributes=7 @@ -556,10 +556,10 @@ max-attributes=7 max-bool-expr=5 # Maximum number of branch for function / method body. -max-branches=15 +max-branches=20 # Maximum number of locals for function / method body. -max-locals=30 +max-locals=35 # Maximum number of parents for a class (see R0901). max-parents=7 diff --git a/awswrangler/catalog.py b/awswrangler/catalog.py index 4a8d6b2d3..057f9e4a5 100644 --- a/awswrangler/catalog.py +++ b/awswrangler/catalog.py @@ -94,6 +94,12 @@ def create_parquet_table( columns_comments: Optional[Dict[str, str]] = None, mode: str = "overwrite", catalog_versioning: bool = False, + projection_enabled: bool = False, + projection_types: Optional[Dict[str, str]] = None, + projection_ranges: Optional[Dict[str, str]] = None, + projection_values: Optional[Dict[str, str]] = None, + projection_intervals: Optional[Dict[str, str]] = None, + projection_digits: Optional[Dict[str, str]] = None, boto3_session: Optional[boto3.Session] = None, ) -> None: """Create a Parquet Table (Metadata Only) in the AWS Glue Catalog. @@ -124,6 +130,29 @@ def create_parquet_table( 'overwrite' to recreate any possible existing table or 'append' to keep any possible existing table. catalog_versioning : bool If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it. + projection_enabled : bool + Enable Partition Projection on Athena (https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html) + projection_types : Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections types. + Valid types: "enum", "integer", "date", "injected" + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': 'enum', 'col2_name': 'integer'}) + projection_ranges: Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections ranges. + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': '0,10', 'col2_name': '-1,8675309'}) + projection_values: Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections values. + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': 'A,B,Unknown', 'col2_name': 'foo,boo,bar'}) + projection_intervals: Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections intervals. + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. 
{'col_name': '1', 'col2_name': '5'}) + projection_digits: Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections digits. + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': '1', 'col2_name': '2'}) boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. @@ -150,6 +179,7 @@ def create_parquet_table( """ table = sanitize_table_name(table=table) partitions_types = {} if partitions_types is None else partitions_types + session: boto3.Session = _utils.ensure_session(session=boto3_session) cat_table_input: Optional[Dict[str, Any]] = _get_table_input(database=database, table=table, boto3_session=session) table_input: Dict[str, Any] @@ -188,6 +218,13 @@ def create_parquet_table( boto3_session=session, table_input=table_input, table_exist=table_exist, + partitions_types=partitions_types, + projection_enabled=projection_enabled, + projection_types=projection_types, + projection_ranges=projection_ranges, + projection_values=projection_values, + projection_intervals=projection_intervals, + projection_digits=projection_digits, ) @@ -903,6 +940,12 @@ def create_csv_table( catalog_versioning: bool = False, sep: str = ",", boto3_session: Optional[boto3.Session] = None, + projection_enabled: bool = False, + projection_types: Optional[Dict[str, str]] = None, + projection_ranges: Optional[Dict[str, str]] = None, + projection_values: Optional[Dict[str, str]] = None, + projection_intervals: Optional[Dict[str, str]] = None, + projection_digits: Optional[Dict[str, str]] = None, ) -> None: """Create a CSV Table (Metadata Only) in the AWS Glue Catalog. @@ -934,6 +977,29 @@ def create_csv_table( If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it. sep : str String of length 1. Field delimiter for the output file. + projection_enabled : bool + Enable Partition Projection on Athena (https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html) + projection_types : Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections types. + Valid types: "enum", "integer", "date", "injected" + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': 'enum', 'col2_name': 'integer'}) + projection_ranges: Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections ranges. + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': '0,10', 'col2_name': '-1,8675309'}) + projection_values: Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections values. + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': 'A,B,Unknown', 'col2_name': 'foo,boo,bar'}) + projection_intervals: Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections intervals. + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': '1', 'col2_name': '5'}) + projection_digits: Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections digits. + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': '1', 'col2_name': '2'}) boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. 
@@ -980,27 +1046,77 @@ def create_csv_table( boto3_session=session, table_input=table_input, table_exist=does_table_exist(database=database, table=table, boto3_session=session), + partitions_types=partitions_types, + projection_enabled=projection_enabled, + projection_types=projection_types, + projection_ranges=projection_ranges, + projection_values=projection_values, + projection_intervals=projection_intervals, + projection_digits=projection_digits, ) -def _create_table( +def _create_table( # pylint: disable=too-many-branches,too-many-statements database: str, table: str, description: Optional[str], parameters: Optional[Dict[str, str]], - columns_comments: Optional[Dict[str, str]], mode: str, catalog_versioning: bool, boto3_session: Optional[boto3.Session], table_input: Dict[str, Any], table_exist: bool, + projection_enabled: bool, + partitions_types: Optional[Dict[str, str]] = None, + columns_comments: Optional[Dict[str, str]] = None, + projection_types: Optional[Dict[str, str]] = None, + projection_ranges: Optional[Dict[str, str]] = None, + projection_values: Optional[Dict[str, str]] = None, + projection_intervals: Optional[Dict[str, str]] = None, + projection_digits: Optional[Dict[str, str]] = None, ): + # Description if description is not None: table_input["Description"] = description - if parameters is not None: - for k, v in parameters.items(): - table_input["Parameters"][k] = v - if columns_comments is not None: + + # Parameters & Projection + parameters = parameters if parameters else {} + partitions_types = partitions_types if partitions_types else {} + projection_types = projection_types if projection_types else {} + projection_ranges = projection_ranges if projection_ranges else {} + projection_values = projection_values if projection_values else {} + projection_intervals = projection_intervals if projection_intervals else {} + projection_digits = projection_digits if projection_digits else {} + projection_types = {sanitize_column_name(k): v for k, v in projection_types.items()} + projection_ranges = {sanitize_column_name(k): v for k, v in projection_ranges.items()} + projection_values = {sanitize_column_name(k): v for k, v in projection_values.items()} + projection_intervals = {sanitize_column_name(k): v for k, v in projection_intervals.items()} + projection_digits = {sanitize_column_name(k): v for k, v in projection_digits.items()} + for k, v in partitions_types.items(): + if v == "date": + table_input["Parameters"][f"projection.{k}.format"] = "yyyy-MM-dd" + elif v == "timestamp": + table_input["Parameters"][f"projection.{k}.format"] = "yyyy-MM-dd HH:mm:ss" + table_input["Parameters"][f"projection.{k}.interval.unit"] = "SECONDS" + table_input["Parameters"][f"projection.{k}.interval"] = "1" + for k, v in projection_types.items(): + table_input["Parameters"][f"projection.{k}.type"] = v + for k, v in projection_ranges.items(): + table_input["Parameters"][f"projection.{k}.range"] = v + for k, v in projection_values.items(): + table_input["Parameters"][f"projection.{k}.values"] = v + for k, v in projection_intervals.items(): + table_input["Parameters"][f"projection.{k}.interval"] = str(v) + for k, v in projection_digits.items(): + table_input["Parameters"][f"projection.{k}.digits"] = str(v) + parameters["projection.enabled"] = "true" if projection_enabled is True else "false" + for k, v in parameters.items(): + table_input["Parameters"][k] = v + + # Column comments + columns_comments = columns_comments if columns_comments else {} + columns_comments = {sanitize_column_name(k): v 
for k, v in columns_comments.items()} + if columns_comments: for col in table_input["StorageDescriptor"]["Columns"]: name: str = col["Name"] if name in columns_comments: @@ -1009,6 +1125,7 @@ def _create_table( name = par["Name"] if name in columns_comments: par["Comment"] = columns_comments[name] + session: boto3.Session = _utils.ensure_session(session=boto3_session) client_glue: boto3.client = _utils.client(service_name="glue", session=session) skip_archive: bool = not catalog_versioning diff --git a/awswrangler/s3.py b/awswrangler/s3.py index fa913ffb1..b13ccff63 100644 --- a/awswrangler/s3.py +++ b/awswrangler/s3.py @@ -448,6 +448,13 @@ def to_csv( # pylint: disable=too-many-arguments description: Optional[str] = None, parameters: Optional[Dict[str, str]] = None, columns_comments: Optional[Dict[str, str]] = None, + regular_partitions: bool = True, + projection_enabled: bool = False, + projection_types: Optional[Dict[str, str]] = None, + projection_ranges: Optional[Dict[str, str]] = None, + projection_values: Optional[Dict[str, str]] = None, + projection_intervals: Optional[Dict[str, str]] = None, + projection_digits: Optional[Dict[str, str]] = None, **pandas_kwargs, ) -> Dict[str, Union[List[str], Dict[str, List[str]]]]: """Write CSV file or dataset on Amazon S3. @@ -513,6 +520,34 @@ def to_csv( # pylint: disable=too-many-arguments columns_comments : Dict[str, str], optional Glue/Athena catalog: Columns names and the related comments (e.g. {'col0': 'Column 0.', 'col1': 'Column 1.', 'col2': 'Partition.'}). + regular_partitions : bool + Create regular partitions (Non projected partitions) on Glue Catalog. + Disable when you will work only with Partition Projection. + Keep enabled even when working with projections is useful to keep + Redshift Spectrum working with the regular partitions. + projection_enabled : bool + Enable Partition Projection on Athena (https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html) + projection_types : Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections types. + Valid types: "enum", "integer", "date", "injected" + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': 'enum', 'col2_name': 'integer'}) + projection_ranges: Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections ranges. + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': '0,10', 'col2_name': '-1,8675309'}) + projection_values: Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections values. + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': 'A,B,Unknown', 'col2_name': 'foo,boo,bar'}) + projection_intervals: Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections intervals. + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': '1', 'col2_name': '5'}) + projection_digits: Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections digits. + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. 
{'col_name': '1', 'col2_name': '2'}) pandas_kwargs : keyword arguments forwarded to pandas.DataFrame.to_csv() https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_csv.html @@ -630,12 +665,10 @@ def to_csv( # pylint: disable=too-many-arguments # Sanitize table to respect Athena's standards partition_cols = partition_cols if partition_cols else [] dtype = dtype if dtype else {} - columns_comments = columns_comments if columns_comments else {} partitions_values: Dict[str, List[str]] = {} df = catalog.sanitize_dataframe_columns_names(df=df) partition_cols = [catalog.sanitize_column_name(p) for p in partition_cols] dtype = {catalog.sanitize_column_name(k): v.lower() for k, v in dtype.items()} - columns_comments = {catalog.sanitize_column_name(k): v for k, v in columns_comments.items()} session: boto3.Session = _utils.ensure_session(session=boto3_session) fs: s3fs.S3FileSystem = _utils.get_fs(session=session, s3_additional_kwargs=s3_additional_kwargs) @@ -698,8 +731,14 @@ def to_csv( # pylint: disable=too-many-arguments mode=mode, catalog_versioning=catalog_versioning, sep=sep, + projection_enabled=projection_enabled, + projection_types=projection_types, + projection_ranges=projection_ranges, + projection_values=projection_values, + projection_intervals=projection_intervals, + projection_digits=projection_digits, ) - if partitions_values: + if partitions_values and (regular_partitions is True): _logger.debug("partitions_values:\n%s", partitions_values) catalog.add_csv_partitions( database=database, table=table, partitions_values=partitions_values, boto3_session=session, sep=sep @@ -873,6 +912,13 @@ def to_parquet( # pylint: disable=too-many-arguments description: Optional[str] = None, parameters: Optional[Dict[str, str]] = None, columns_comments: Optional[Dict[str, str]] = None, + regular_partitions: bool = True, + projection_enabled: bool = False, + projection_types: Optional[Dict[str, str]] = None, + projection_ranges: Optional[Dict[str, str]] = None, + projection_values: Optional[Dict[str, str]] = None, + projection_intervals: Optional[Dict[str, str]] = None, + projection_digits: Optional[Dict[str, str]] = None, ) -> Dict[str, Union[List[str], Dict[str, List[str]]]]: """Write Parquet file or dataset on Amazon S3. @@ -935,6 +981,34 @@ def to_parquet( # pylint: disable=too-many-arguments columns_comments : Dict[str, str], optional Glue/Athena catalog: Columns names and the related comments (e.g. {'col0': 'Column 0.', 'col1': 'Column 1.', 'col2': 'Partition.'}). + regular_partitions : bool + Create regular partitions (Non projected partitions) on Glue Catalog. + Disable when you will work only with Partition Projection. + Keep enabled even when working with projections is useful to keep + Redshift Spectrum working with the regular partitions. + projection_enabled : bool + Enable Partition Projection on Athena (https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html) + projection_types : Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections types. + Valid types: "enum", "integer", "date", "injected" + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': 'enum', 'col2_name': 'integer'}) + projection_ranges: Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections ranges. + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. 
{'col_name': '0,10', 'col2_name': '-1,8675309'}) + projection_values: Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections values. + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': 'A,B,Unknown', 'col2_name': 'foo,boo,bar'}) + projection_intervals: Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections intervals. + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': '1', 'col2_name': '5'}) + projection_digits: Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections digits. + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': '1', 'col2_name': '2'}) Returns ------- @@ -1052,12 +1126,10 @@ def to_parquet( # pylint: disable=too-many-arguments # Sanitize table to respect Athena's standards partition_cols = partition_cols if partition_cols else [] dtype = dtype if dtype else {} - columns_comments = columns_comments if columns_comments else {} partitions_values: Dict[str, List[str]] = {} df = catalog.sanitize_dataframe_columns_names(df=df) partition_cols = [catalog.sanitize_column_name(p) for p in partition_cols] dtype = {catalog.sanitize_column_name(k): v.lower() for k, v in dtype.items()} - columns_comments = {catalog.sanitize_column_name(k): v for k, v in columns_comments.items()} df = catalog.drop_duplicated_columns(df=df) session: boto3.Session = _utils.ensure_session(session=boto3_session) @@ -1128,8 +1200,14 @@ def to_parquet( # pylint: disable=too-many-arguments boto3_session=session, mode=mode, catalog_versioning=catalog_versioning, + projection_enabled=projection_enabled, + projection_types=projection_types, + projection_ranges=projection_ranges, + projection_values=projection_values, + projection_intervals=projection_intervals, + projection_digits=projection_digits, ) - if partitions_values: + if partitions_values and (regular_partitions is True): _logger.debug("partitions_values:\n%s", partitions_values) catalog.add_parquet_partitions( database=database, @@ -1934,7 +2012,7 @@ def _read_parquet_metadata_file(path: str, use_threads: bool, boto3_session: bot return _data_types.athena_types_from_pyarrow_schema(schema=data.schema.to_arrow_schema(), partitions=None)[0] -def store_parquet_metadata( +def store_parquet_metadata( # pylint: disable=too-many-arguments path: str, database: str, table: str, @@ -1948,6 +2026,13 @@ def store_parquet_metadata( compression: Optional[str] = None, mode: str = "overwrite", catalog_versioning: bool = False, + regular_partitions: bool = True, + projection_enabled: bool = False, + projection_types: Optional[Dict[str, str]] = None, + projection_ranges: Optional[Dict[str, str]] = None, + projection_values: Optional[Dict[str, str]] = None, + projection_intervals: Optional[Dict[str, str]] = None, + projection_digits: Optional[Dict[str, str]] = None, boto3_session: Optional[boto3.Session] = None, ) -> Tuple[Dict[str, str], Optional[Dict[str, str]], Optional[Dict[str, List[str]]]]: """Infer and store parquet metadata on AWS Glue Catalog. @@ -2002,6 +2087,34 @@ def store_parquet_metadata( 'overwrite' to recreate any possible existing table or 'append' to keep any possible existing table. catalog_versioning : bool If True and `mode="overwrite"`, creates an archived version of the table catalog before updating it. + regular_partitions : bool + Create regular partitions (Non projected partitions) on Glue Catalog. 
+ Disable when you will work only with Partition Projection. + Keep enabled even when working with projections is useful to keep + Redshift Spectrum working with the regular partitions. + projection_enabled : bool + Enable Partition Projection on Athena (https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html) + projection_types : Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections types. + Valid types: "enum", "integer", "date", "injected" + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': 'enum', 'col2_name': 'integer'}) + projection_ranges: Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections ranges. + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': '0,10', 'col2_name': '-1,8675309'}) + projection_values: Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections values. + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': 'A,B,Unknown', 'col2_name': 'foo,boo,bar'}) + projection_intervals: Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections intervals. + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': '1', 'col2_name': '5'}) + projection_digits: Optional[Dict[str, str]] + Dictionary of partitions names and Athena projections digits. + https://docs.aws.amazon.com/athena/latest/ug/partition-projection-supported-types.html + (e.g. {'col_name': '1', 'col2_name': '2'}) boto3_session : boto3.Session(), optional Boto3 Session. The default boto3 session will be used if boto3_session receive None. @@ -2050,9 +2163,15 @@ def store_parquet_metadata( columns_comments=columns_comments, mode=mode, catalog_versioning=catalog_versioning, + projection_enabled=projection_enabled, + projection_types=projection_types, + projection_ranges=projection_ranges, + projection_values=projection_values, + projection_intervals=projection_intervals, + projection_digits=projection_digits, boto3_session=session, ) - if (partitions_types is not None) and (partitions_values is not None): + if (partitions_types is not None) and (partitions_values is not None) and (regular_partitions is True): catalog.add_parquet_partitions( database=database, table=table, diff --git a/testing/test_awswrangler/test_athena_projections.py b/testing/test_awswrangler/test_athena_projections.py new file mode 100644 index 000000000..66506ac37 --- /dev/null +++ b/testing/test_awswrangler/test_athena_projections.py @@ -0,0 +1,170 @@ +import logging +import time + +import boto3 +import pandas as pd +import pytest + +import awswrangler as wr + +from ._utils import CFN_VALID_STATUS, dt, get_time_str_with_random_suffix, ts + +logging.basicConfig(level=logging.INFO, format="[%(asctime)s][%(levelname)s][%(name)s][%(funcName)s] %(message)s") +logging.getLogger("awswrangler").setLevel(logging.DEBUG) +logging.getLogger("botocore.credentials").setLevel(logging.CRITICAL) + + +@pytest.fixture(scope="module") +def cloudformation_outputs(): + response = boto3.client("cloudformation").describe_stacks(StackName="aws-data-wrangler") + stack = [x for x in response.get("Stacks") if x["StackStatus"] in CFN_VALID_STATUS][0] + outputs = {} + for output in stack.get("Outputs"): + outputs[output.get("OutputKey")] = output.get("OutputValue") + yield outputs + + +@pytest.fixture(scope="module") +def 
region(cloudformation_outputs): + yield cloudformation_outputs["Region"] + + +@pytest.fixture(scope="module") +def bucket(cloudformation_outputs): + yield cloudformation_outputs["BucketName"] + + +@pytest.fixture(scope="module") +def database(cloudformation_outputs): + yield cloudformation_outputs["GlueDatabaseName"] + + +@pytest.fixture(scope="module") +def external_schema(cloudformation_outputs, database): + region = cloudformation_outputs.get("Region") + sql = f""" + CREATE EXTERNAL SCHEMA IF NOT EXISTS aws_data_wrangler_external FROM data catalog + DATABASE '{database}' + IAM_ROLE '{cloudformation_outputs["RedshiftRole"]}' + REGION '{region}'; + """ + engine = wr.catalog.get_engine(connection="aws-data-wrangler-redshift") + with engine.connect() as con: + con.execute(sql) + yield "aws_data_wrangler_external" + + +@pytest.fixture(scope="function") +def path(bucket): + s3_path = f"s3://{bucket}/{get_time_str_with_random_suffix()}/" + print(f"S3 Path: {s3_path}") + time.sleep(1) + objs = wr.s3.list_objects(s3_path) + wr.s3.delete_objects(path=objs) + wr.s3.wait_objects_not_exist(objs) + yield s3_path + time.sleep(1) + objs = wr.s3.list_objects(s3_path) + wr.s3.delete_objects(path=objs) + wr.s3.wait_objects_not_exist(objs) + + +@pytest.fixture(scope="function") +def table(database): + name = f"tbl_{get_time_str_with_random_suffix()}" + print(f"Table name: {name}") + wr.catalog.delete_table_if_exists(database=database, table=name) + yield name + wr.catalog.delete_table_if_exists(database=database, table=name) + + +def test_to_parquet_projection_integer(database, table, path): + df = pd.DataFrame({"c0": [0, 1, 2], "c1": [0, 1, 2], "c2": [0, 100, 200], "c3": [0, 1, 2]}) + paths = wr.s3.to_parquet( + df=df, + path=path, + dataset=True, + database=database, + table=table, + partition_cols=["c1", "c2", "c3"], + regular_partitions=False, + projection_enabled=True, + projection_types={"c1": "integer", "c2": "integer", "c3": "integer"}, + projection_ranges={"c1": "0,2", "c2": "0,200", "c3": "0,2"}, + projection_intervals={"c2": "100"}, + projection_digits={"c3": "1"}, + )["paths"] + wr.s3.wait_objects_exist(paths=paths, use_threads=False) + df2 = wr.athena.read_sql_table(table, database) + assert df.shape == df2.shape + assert df.c0.sum() == df2.c0.sum() + assert df.c1.sum() == df2.c1.sum() + assert df.c2.sum() == df2.c2.sum() + assert df.c3.sum() == df2.c3.sum() + + +def test_to_parquet_projection_enum(database, table, path): + df = pd.DataFrame({"c0": [0, 1, 2], "c1": [1, 2, 3], "c2": ["foo", "boo", "bar"]}) + paths = wr.s3.to_parquet( + df=df, + path=path, + dataset=True, + database=database, + table=table, + partition_cols=["c1", "c2"], + regular_partitions=False, + projection_enabled=True, + projection_types={"c1": "enum", "c2": "enum"}, + projection_values={"c1": "1,2,3", "c2": "foo,boo,bar"}, + )["paths"] + wr.s3.wait_objects_exist(paths=paths, use_threads=False) + df2 = wr.athena.read_sql_table(table, database) + assert df.shape == df2.shape + assert df.c0.sum() == df2.c0.sum() + assert df.c1.sum() == df2.c1.sum() + + +def test_to_parquet_projection_date(database, table, path): + df = pd.DataFrame( + { + "c0": [0, 1, 2], + "c1": [dt("2020-01-01"), dt("2020-01-02"), dt("2020-01-03")], + "c2": [ts("2020-01-01 01:01:01.0"), ts("2020-01-01 01:01:02.0"), ts("2020-01-01 01:01:03.0")], + } + ) + paths = wr.s3.to_parquet( + df=df, + path=path, + dataset=True, + database=database, + table=table, + partition_cols=["c1", "c2"], + regular_partitions=False, + projection_enabled=True, + 
projection_types={"c1": "date", "c2": "date"}, + projection_ranges={"c1": "2020-01-01,2020-01-03", "c2": "2020-01-01 01:01:00,2020-01-01 01:01:03"}, + )["paths"] + wr.s3.wait_objects_exist(paths=paths, use_threads=False) + df2 = wr.athena.read_sql_table(table, database) + print(df2) + assert df.shape == df2.shape + assert df.c0.sum() == df2.c0.sum() + + +def test_to_parquet_projection_injected(database, table, path): + df = pd.DataFrame({"c0": [0, 1, 2], "c1": ["foo", "boo", "bar"], "c2": ["0", "1", "2"]}) + paths = wr.s3.to_parquet( + df=df, + path=path, + dataset=True, + database=database, + table=table, + partition_cols=["c1", "c2"], + regular_partitions=False, + projection_enabled=True, + projection_types={"c1": "injected", "c2": "injected"}, + )["paths"] + wr.s3.wait_objects_exist(paths=paths, use_threads=False) + df2 = wr.athena.read_sql_query(f"SELECT * FROM {table} WHERE c1='foo' AND c2='0'", database) + assert df2.shape == (1, 3) + assert df2.c0.iloc[0] == 0 diff --git a/testing/test_awswrangler/test_data_lake.py b/testing/test_awswrangler/test_data_lake.py index 9dfbaa23c..bcd6e3f6d 100644 --- a/testing/test_awswrangler/test_data_lake.py +++ b/testing/test_awswrangler/test_data_lake.py @@ -1708,7 +1708,7 @@ def test_to_parquet_modes(database, table, path, external_schema): assert df.shape == df2.shape assert df.c0.sum() == df2.c0.sum() parameters = wr.catalog.get_table_parameters(database, table) - assert len(parameters) == 5 + assert len(parameters) >= 5 assert parameters["num_cols"] == str(len(df2.columns)) assert parameters["num_rows"] == str(len(df2.index)) assert wr.catalog.get_table_description(database, table) == "c0" @@ -1734,7 +1734,7 @@ def test_to_parquet_modes(database, table, path, external_schema): assert df.shape == df2.shape assert df.c1.sum() == df2.c1.sum() parameters = wr.catalog.get_table_parameters(database, table) - assert len(parameters) == 5 + assert len(parameters) >= 5 assert parameters["num_cols"] == str(len(df2.columns)) assert parameters["num_rows"] == str(len(df2.index)) assert wr.catalog.get_table_description(database, table) == "c1" @@ -1761,7 +1761,7 @@ def test_to_parquet_modes(database, table, path, external_schema): assert len(df.index) * 2 == len(df2.index) assert df.c1.sum() + 1 == df2.c1.sum() parameters = wr.catalog.get_table_parameters(database, table) - assert len(parameters) == 5 + assert len(parameters) >= 5 assert parameters["num_cols"] == str(len(df2.columns)) assert parameters["num_rows"] == str(len(df2.index)) assert wr.catalog.get_table_description(database, table) == "c1" @@ -1788,7 +1788,7 @@ def test_to_parquet_modes(database, table, path, external_schema): assert len(df2.index) == 9 assert df2.c1.sum() == 3 parameters = wr.catalog.get_table_parameters(database, table) - assert len(parameters) == 5 + assert len(parameters) >= 5 assert parameters["num_cols"] == "2" assert parameters["num_rows"] == "9" assert wr.catalog.get_table_description(database, table) == "c1+c2" @@ -1816,7 +1816,7 @@ def test_to_parquet_modes(database, table, path, external_schema): assert len(df2.index) == 10 assert df2.c1.sum() == 4 parameters = wr.catalog.get_table_parameters(database, table) - assert len(parameters) == 5 + assert len(parameters) >= 5 assert parameters["num_cols"] == "3" assert parameters["num_rows"] == "10" assert wr.catalog.get_table_description(database, table) == "c1+c2+c3" @@ -1850,7 +1850,7 @@ def test_to_parquet_modes(database, table, path, external_schema): assert df.shape == df2.shape assert df.c1.sum() == df2.c1.sum() 
parameters = wr.catalog.get_table_parameters(database, table) - assert len(parameters) == 5 + assert len(parameters) >= 5 assert parameters["num_cols"] == "2" assert parameters["num_rows"] == "2" assert wr.catalog.get_table_description(database, table) == "c0+c1" @@ -1879,7 +1879,7 @@ def test_to_parquet_modes(database, table, path, external_schema): assert len(df2.index) == 3 assert df2.c1.sum() == 3 parameters = wr.catalog.get_table_parameters(database, table) - assert len(parameters) == 5 + assert len(parameters) >= 5 assert parameters["num_cols"] == "2" assert parameters["num_rows"] == "3" assert wr.catalog.get_table_description(database, table) == "c0+c1" @@ -1908,7 +1908,7 @@ def test_to_parquet_modes(database, table, path, external_schema): assert len(df2.index) == 4 assert df2.c1.sum() == 6 parameters = wr.catalog.get_table_parameters(database, table) - assert len(parameters) == 5 + assert len(parameters) >= 5 assert parameters["num_cols"] == "3" assert parameters["num_rows"] == "4" assert wr.catalog.get_table_description(database, table) == "c0+c1+c2" @@ -1944,7 +1944,7 @@ def test_store_parquet_metadata_modes(database, table, path, external_schema): assert df.shape == df2.shape assert df.c0.sum() == df2.c0.sum() parameters = wr.catalog.get_table_parameters(database, table) - assert len(parameters) == 5 + assert len(parameters) >= 5 assert parameters["num_cols"] == str(len(df2.columns)) assert parameters["num_rows"] == str(len(df2.index)) assert wr.catalog.get_table_description(database, table) == "c0" @@ -1970,7 +1970,7 @@ def test_store_parquet_metadata_modes(database, table, path, external_schema): assert df.shape == df2.shape assert df.c1.sum() == df2.c1.sum() parameters = wr.catalog.get_table_parameters(database, table) - assert len(parameters) == 5 + assert len(parameters) >= 5 assert parameters["num_cols"] == str(len(df2.columns)) assert parameters["num_rows"] == str(len(df2.index)) assert wr.catalog.get_table_description(database, table) == "c1" @@ -1997,7 +1997,7 @@ def test_store_parquet_metadata_modes(database, table, path, external_schema): assert len(df.index) * 2 == len(df2.index) assert df.c1.sum() + 1 == df2.c1.sum() parameters = wr.catalog.get_table_parameters(database, table) - assert len(parameters) == 5 + assert len(parameters) >= 5 assert parameters["num_cols"] == str(len(df2.columns)) assert parameters["num_rows"] == str(len(df2.index)) assert wr.catalog.get_table_description(database, table) == "c1" @@ -2025,7 +2025,7 @@ def test_store_parquet_metadata_modes(database, table, path, external_schema): assert len(df2.index) == 9 assert df2.c1.sum() == 4 parameters = wr.catalog.get_table_parameters(database, table) - assert len(parameters) == 5 + assert len(parameters) >= 5 assert parameters["num_cols"] == "2" assert parameters["num_rows"] == "9" assert wr.catalog.get_table_description(database, table) == "c1+c2" @@ -2052,7 +2052,7 @@ def test_store_parquet_metadata_modes(database, table, path, external_schema): assert df.shape == df2.shape assert df.c1.sum() == df2.c1.astype(int).sum() parameters = wr.catalog.get_table_parameters(database, table) - assert len(parameters) == 5 + assert len(parameters) >= 5 assert parameters["num_cols"] == "2" assert parameters["num_rows"] == "2" assert wr.catalog.get_table_description(database, table) == "c0+c1" @@ -2082,7 +2082,7 @@ def test_store_parquet_metadata_modes(database, table, path, external_schema): assert len(df2.index) == 3 assert df2.c1.astype(int).sum() == 3 parameters = wr.catalog.get_table_parameters(database, 
table) - assert len(parameters) == 5 + assert len(parameters) >= 5 assert parameters["num_cols"] == "2" assert parameters["num_rows"] == "3" assert wr.catalog.get_table_description(database, table) == "c0+c1" @@ -2112,7 +2112,7 @@ def test_store_parquet_metadata_modes(database, table, path, external_schema): assert len(df2.index) == 4 assert df2.c1.astype(int).sum() == 6 parameters = wr.catalog.get_table_parameters(database, table) - assert len(parameters) == 5 + assert len(parameters) >= 5 assert parameters["num_cols"] == "3" assert parameters["num_rows"] == "4" assert wr.catalog.get_table_description(database, table) == "c0+c1+c2" From a660e5980109b2bb970798bb82c4229b9033e67c Mon Sep 17 00:00:00 2001 From: igorborgest Date: Wed, 27 May 2020 21:25:56 -0300 Subject: [PATCH 26/26] Add Partition Projection tutorial (17). --- README.md | 1 + ...ojections.py => test_athena_projection.py} | 0 tutorials/17 - Partition Projection.ipynb | 802 ++++++++++++++++++ 3 files changed, 803 insertions(+) rename testing/test_awswrangler/{test_athena_projections.py => test_athena_projection.py} (100%) create mode 100644 tutorials/17 - Partition Projection.ipynb diff --git a/README.md b/README.md index ad1fa927e..0f01a2d7c 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,7 @@ df = wr.db.read_sql_query("SELECT * FROM external_schema.my_table", con=engine) - [14 - Schema Evolution](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/14%20-%20Schema%20Evolution.ipynb) - [15 - EMR](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/15%20-%20EMR.ipynb) - [16 - EMR & Docker](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/16%20-%20EMR%20%26%20Docker.ipynb) + - [17 - Partition Projection](https://github.com/awslabs/aws-data-wrangler/blob/master/tutorials/17%20-%20Partition%20Projection.ipynb) - [**API Reference**](https://aws-data-wrangler.readthedocs.io/en/latest/api.html) - [Amazon S3](https://aws-data-wrangler.readthedocs.io/en/latest/api.html#amazon-s3) - [AWS Glue Catalog](https://aws-data-wrangler.readthedocs.io/en/latest/api.html#aws-glue-catalog) diff --git a/testing/test_awswrangler/test_athena_projections.py b/testing/test_awswrangler/test_athena_projection.py similarity index 100% rename from testing/test_awswrangler/test_athena_projections.py rename to testing/test_awswrangler/test_athena_projection.py diff --git a/tutorials/17 - Partition Projection.ipynb b/tutorials/17 - Partition Projection.ipynb new file mode 100644 index 000000000..deb9452e2 --- /dev/null +++ b/tutorials/17 - Partition Projection.ipynb @@ -0,0 +1,802 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "[![AWS Data Wrangler](_static/logo.png \"AWS Data Wrangler\")](https://github.com/awslabs/aws-data-wrangler)\n", + "\n", + "# 17 - Partition Projection\n", + "\n", + "https://docs.aws.amazon.com/athena/latest/ug/partition-projection.html" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import awswrangler as wr\n", + "import pandas as pd\n", + "from datetime import datetime\n", + "import getpass" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Enter your bucket name:" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdin", + "output_type": "stream", + "text": [ + " ············\n" + ] + } + ], + "source": [ + "bucket = getpass.getpass()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + 
"source": [ + "## Integer projection" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
valueyearmonthday
0120191025
1220201126
2320211227
\n", + "
" + ], + "text/plain": [ + " value year month day\n", + "0 1 2019 10 25\n", + "1 2 2020 11 26\n", + "2 3 2021 12 27" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df = pd.DataFrame({\n", + " \"value\": [1, 2, 3],\n", + " \"year\": [2019, 2020, 2021],\n", + " \"month\": [10, 11, 12],\n", + " \"day\": [25, 26, 27]\n", + "})\n", + "\n", + "df" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "wr.s3.to_parquet(\n", + " df=df,\n", + " path=f\"s3://{bucket}/table_integer/\",\n", + " dataset=True,\n", + " partition_cols=[\"year\", \"month\", \"day\"],\n", + " database=\"default\",\n", + " table=\"table_integer\",\n", + " projection_enabled=True,\n", + " projection_types={\n", + " \"year\": \"integer\",\n", + " \"month\": \"integer\",\n", + " \"day\": \"integer\"\n", + " },\n", + " projection_ranges={\n", + " \"year\": \"2000,2025\",\n", + " \"month\": \"1,12\",\n", + " \"day\": \"1,31\"\n", + " },\n", + ");" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
valueyearmonthday
0220201126
1320211227
2120191025
\n", + "
" + ], + "text/plain": [ + " value year month day\n", + "0 2 2020 11 26\n", + "1 3 2021 12 27\n", + "2 1 2019 10 25" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "wr.athena.read_sql_query(f\"SELECT * FROM table_integer\", database=\"default\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Enum projection" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
valuecity
01São Paulo
12Tokio
23Seattle
\n", + "
" + ], + "text/plain": [ + " value city\n", + "0 1 São Paulo\n", + "1 2 Tokio\n", + "2 3 Seattle" + ] + }, + "execution_count": 6, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df = pd.DataFrame({\n", + " \"value\": [1, 2, 3],\n", + " \"city\": [\"São Paulo\", \"Tokio\", \"Seattle\"],\n", + "})\n", + "\n", + "df" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "wr.s3.to_parquet(\n", + " df=df,\n", + " path=f\"s3://{bucket}/table_enum/\",\n", + " dataset=True,\n", + " partition_cols=[\"city\"],\n", + " database=\"default\",\n", + " table=\"table_enum\",\n", + " projection_enabled=True,\n", + " projection_types={\n", + " \"city\": \"enum\",\n", + " },\n", + " projection_values={\n", + " \"city\": \"São Paulo,Tokio,Seattle\"\n", + " },\n", + ");" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
valuecity
02Tokio
11São Paulo
23Seattle
\n", + "
" + ], + "text/plain": [ + " value city\n", + "0 2 Tokio\n", + "1 1 São Paulo\n", + "2 3 Seattle" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "wr.athena.read_sql_query(f\"SELECT * FROM table_enum\", database=\"default\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Date projection" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
valuedtts
012020-01-012020-01-01 00:00:00
122020-01-022020-01-01 00:00:01
232020-01-032020-01-01 00:00:02
\n", + "
" + ], + "text/plain": [ + " value dt ts\n", + "0 1 2020-01-01 2020-01-01 00:00:00\n", + "1 2 2020-01-02 2020-01-01 00:00:01\n", + "2 3 2020-01-03 2020-01-01 00:00:02" + ] + }, + "execution_count": 9, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "ts = lambda x: datetime.strptime(x, \"%Y-%m-%d %H:%M:%S\")\n", + "dt = lambda x: datetime.strptime(x, \"%Y-%m-%d\").date()\n", + "\n", + "df = pd.DataFrame({\n", + " \"value\": [1, 2, 3],\n", + " \"dt\": [dt(\"2020-01-01\"), dt(\"2020-01-02\"), dt(\"2020-01-03\")],\n", + " \"ts\": [ts(\"2020-01-01 00:00:00\"), ts(\"2020-01-01 00:00:01\"), ts(\"2020-01-01 00:00:02\")],\n", + "})\n", + "\n", + "df" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "wr.s3.to_parquet(\n", + " df=df,\n", + " path=f\"s3://{bucket}/table_date/\",\n", + " dataset=True,\n", + " partition_cols=[\"dt\", \"ts\"],\n", + " database=\"default\",\n", + " table=\"table_date\",\n", + " projection_enabled=True,\n", + " projection_types={\n", + " \"dt\": \"date\",\n", + " \"ts\": \"date\",\n", + " },\n", + " projection_ranges={\n", + " \"dt\": \"2020-01-01,2020-01-03\",\n", + " \"ts\": \"2020-01-01 00:00:00,2020-01-01 00:00:02\"\n", + " },\n", + ");" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
valuedtts
032020-01-032020-01-01 00:00:02
112020-01-012020-01-01 00:00:00
222020-01-022020-01-01 00:00:01
\n", + "
" + ], + "text/plain": [ + " value dt ts\n", + "0 3 2020-01-03 2020-01-01 00:00:02\n", + "1 1 2020-01-01 2020-01-01 00:00:00\n", + "2 2 2020-01-02 2020-01-01 00:00:01" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "wr.athena.read_sql_query(f\"SELECT * FROM table_date\", database=\"default\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Injected projection" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
valueuuid
01761e2488-a078-11ea-bb37-0242ac130002
12b89ed095-8179-4635-9537-88592c0f6bc3
2387adc586-ce88-4f0a-b1c8-bf8e00d32249
\n", + "
" + ], + "text/plain": [ + " value uuid\n", + "0 1 761e2488-a078-11ea-bb37-0242ac130002\n", + "1 2 b89ed095-8179-4635-9537-88592c0f6bc3\n", + "2 3 87adc586-ce88-4f0a-b1c8-bf8e00d32249" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df = pd.DataFrame({\n", + " \"value\": [1, 2, 3],\n", + " \"uuid\": [\"761e2488-a078-11ea-bb37-0242ac130002\", \"b89ed095-8179-4635-9537-88592c0f6bc3\", \"87adc586-ce88-4f0a-b1c8-bf8e00d32249\"],\n", + "})\n", + "\n", + "df" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [], + "source": [ + "wr.s3.to_parquet(\n", + " df=df,\n", + " path=f\"s3://{bucket}/table_injected/\",\n", + " dataset=True,\n", + " partition_cols=[\"uuid\"],\n", + " database=\"default\",\n", + " table=\"table_injected\",\n", + " projection_enabled=True,\n", + " projection_types={\n", + " \"uuid\": \"injected\",\n", + " }\n", + ");" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
valueuuid
02b89ed095-8179-4635-9537-88592c0f6bc3
\n", + "
" + ], + "text/plain": [ + " value uuid\n", + "0 2 b89ed095-8179-4635-9537-88592c0f6bc3" + ] + }, + "execution_count": 14, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "wr.athena.read_sql_query(\n", + " sql=f\"SELECT * FROM table_injected WHERE uuid='b89ed095-8179-4635-9537-88592c0f6bc3'\",\n", + " database=\"default\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Cleaning Up" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [], + "source": [ + "wr.s3.delete_objects(f\"s3://{bucket}/table_integer/\")\n", + "wr.s3.delete_objects(f\"s3://{bucket}/table_enum/\")\n", + "wr.s3.delete_objects(f\"s3://{bucket}/table_date/\")\n", + "wr.s3.delete_objects(f\"s3://{bucket}/table_injected/\")" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [], + "source": [ + "wr.catalog.delete_table_if_exists(table=\"table_integer\", database=\"default\")\n", + "wr.catalog.delete_table_if_exists(table=\"table_enum\", database=\"default\")\n", + "wr.catalog.delete_table_if_exists(table=\"table_date\", database=\"default\")\n", + "wr.catalog.delete_table_if_exists(table=\"table_injected\", database=\"default\");" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.6.10" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}