From 5f369594f12a3a305dd86b64af69f2585de91bf1 Mon Sep 17 00:00:00 2001
From: igorborgest
Date: Sun, 2 Feb 2020 13:45:02 -0300
Subject: [PATCH 1/2] Add support for Redshift Spectrum on Glue metadata

---
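Note: the glue.py hunks below fill in the Hive InputFormat/OutputFormat attributes
that Redshift Spectrum needs when it resolves partitions through the Glue Catalog.
A minimal sketch of the round trip the new tests exercise — the bucket and Glue
database names here are made-up placeholders, not values from the test stack:

    import pandas as pd
    import awswrangler as wr

    df = pd.DataFrame({"id": [1, 2, 3], "par_int": [0, 1, 0]})

    # Write partitioned Parquet to S3 and register the table and its
    # partitions (with the corrected Input/OutputFormat) in the Glue Catalog.
    wr.pandas.to_parquet(dataframe=df,
                         path="s3://my-test-bucket/spectrum_demo/",  # placeholder bucket
                         database="my_glue_database",                # placeholder Glue database
                         partition_cols=["par_int"],
                         preserve_index=False)

    # Read the same table back through Redshift Spectrum, via an external
    # schema pointing at the Glue database (see the external_schema fixture).
    conn = wr.redshift.get_connection("aws-data-wrangler-redshift")
    with conn.cursor() as cursor:
        cursor.execute("SELECT * FROM aws_data_wrangler_external.spectrum_demo")
        print(cursor.fetchall())
    conn.close()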
 awswrangler/glue.py                       |   4 +-
 awswrangler/redshift.py                   |   4 +-
 testing/deploy-cloudformation.sh          |  12 +-
 testing/parameters.properties             |   4 +-
 testing/template.yaml                     |  68 +++++++---
 testing/test_awswrangler/test_aurora.py   |  14 +--
 testing/test_awswrangler/test_pandas.py   |  36 +++---
 testing/test_awswrangler/test_redshift.py | 145 ++++++++++++++++++----
 8 files changed, 217 insertions(+), 70 deletions(-)

diff --git a/awswrangler/glue.py b/awswrangler/glue.py
index 435441da3..168b6f810 100644
--- a/awswrangler/glue.py
+++ b/awswrangler/glue.py
@@ -379,6 +379,7 @@ def csv_partition_definition(partition, compression, extra_args=None):
         return {
             "StorageDescriptor": {
                 "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
+                "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
                 "Location": partition[0],
                 "Compressed": compressed,
                 "SerdeInfo": {
@@ -440,7 +441,8 @@ def parquet_partition_definition(partition, compression):
         compressed = False if compression is None else True
         return {
             "StorageDescriptor": {
-                "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
+                "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
+                "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
                 "Location": partition[0],
                 "Compressed": compressed,
                 "SerdeInfo": {
diff --git a/awswrangler/redshift.py b/awswrangler/redshift.py
index 3efe7d00b..7fe606bf8 100644
--- a/awswrangler/redshift.py
+++ b/awswrangler/redshift.py
@@ -44,7 +44,7 @@ def _validate_connection(database,
                          password,
                          tcp_keepalive=True,
                          application_name="aws-data-wrangler-validation",
-                         validation_timeout=5):
+                         validation_timeout=10):
     conn = pg8000.connect(database=database,
                           host=host,
                           port=int(port),
@@ -66,7 +66,7 @@ def generate_connection(database,
                         application_name="aws-data-wrangler",
                         connection_timeout=1_200_000,
                         statement_timeout=1_200_000,
-                        validation_timeout=5):
+                        validation_timeout=10):
     """
     Generates a valid connection object to be passed to the load_table method
 
diff --git a/testing/deploy-cloudformation.sh b/testing/deploy-cloudformation.sh
index c9c20628f..0b28d1748 100755
--- a/testing/deploy-cloudformation.sh
+++ b/testing/deploy-cloudformation.sh
@@ -2,7 +2,11 @@
 set -e
 
 aws cloudformation deploy \
---template-file template.yaml \
---stack-name aws-data-wrangler-test \
---capabilities CAPABILITY_IAM \
---parameter-overrides $(cat parameters.properties)
+  --template-file template.yaml \
+  --stack-name aws-data-wrangler-test \
+  --capabilities CAPABILITY_IAM \
+  --parameter-overrides $(cat parameters.properties)
+
+aws cloudformation update-termination-protection \
+  --enable-termination-protection \
+  --stack-name aws-data-wrangler-test
diff --git a/testing/parameters.properties b/testing/parameters.properties
index 871fb6171..8e2ba42f2 100644
--- a/testing/parameters.properties
+++ b/testing/parameters.properties
@@ -2,5 +2,5 @@ VpcId=VPC_ID
 SubnetId=SUBNET_ID
 SubnetId2=SUBNET_ID2
 SubnetAz=AVAILABILITY_ZONE
-Password=REDSHIFT_PASSWORD
-TestUser=AWS_USER_THAT_WILL_RUN_THE_TESTS_ON_CLI
\ No newline at end of file
+DatabasesPassword=REDSHIFT_PASSWORD
+AWSUserForTests=AWS_USER_THAT_WILL_RUN_THE_TESTS_ON_CLI
\ No newline at end of file
diff --git a/testing/template.yaml b/testing/template.yaml
index 3581382a7..0b95a4aac 100644
--- a/testing/template.yaml
+++ b/testing/template.yaml
@@ -16,10 +16,10 @@ Parameters:
   SubnetAz:
     Type: String
     Description: Subnet AZ
-  Password:
+  DatabasesPassword:
     Type: String
-    Description: Redshift Password
-  TestUser:
+    Description: Password for all databases
+  AWSUserForTests:
     Type: String
     Description: AWS User that will running the tests on the CLI
@@ -49,7 +49,7 @@ Resources:
         - Sid: "Allow administration of the key"
           Effect: "Allow"
           Principal:
-            AWS: !Join ["", ["arn:aws:iam::", !Ref "AWS::AccountId", ":user/", !Ref TestUser]]
+            AWS: !Join ["", ["arn:aws:iam::", !Ref "AWS::AccountId", ":user/", !Ref AWSUserForTests]]
           Action:
             - "kms:Create*"
             - "kms:Describe*"
@@ -95,7 +95,7 @@ Resources:
               - sts:AssumeRole
       Path: "/"
       Policies:
-        - PolicyName: S3GetAndList
+        - PolicyName: Root
          PolicyDocument:
            Version: 2012-10-17
            Statement:
@@ -107,6 +107,30 @@ Resources:
               Resource:
                 - !Join ['', ['arn:aws:s3:::', !Ref Bucket]]
                 - !Join ['', ['arn:aws:s3:::', !Ref Bucket, /*]]
+            - Effect: Allow
+              Action:
+                - "lakeformation:GrantPermissions"
+              Resource: "*"
+            - Effect: Allow
+              Action:
+                - "glue:SearchTables"
+                - "glue:GetConnections"
+                - "glue:GetDataCatalogEncryptionSettings"
+                - "glue:GetTables"
+                - "glue:GetTableVersions"
+                - "glue:GetPartitions"
+                - "glue:DeleteTableVersion"
+                - "glue:BatchGetPartition"
+                - "glue:GetDatabases"
+                - "glue:GetTags"
+                - "glue:GetTable"
+                - "glue:GetDatabase"
+                - "glue:GetPartition"
+                - "glue:GetTableVersion"
+                - "glue:GetConnection"
+                - "glue:GetUserDefinedFunction"
+                - "glue:GetUserDefinedFunctions"
+              Resource: "*"
 
   RedshiftSubnetGroup:
     Type: AWS::Redshift::ClusterSubnetGroup
@@ -140,7 +164,7 @@ Resources:
     Properties:
       DBName: test
       MasterUsername: test
-      MasterUserPassword: !Ref Password
+      MasterUserPassword: !Ref DatabasesPassword
       NodeType: dc2.large
       ClusterType: single-node
      VpcSecurityGroupIds:
@@ -223,7 +247,7 @@ Resources:
       Engine: aurora-postgresql
       DBClusterIdentifier : postgres-cluster-wrangler
       MasterUsername: test
-      MasterUserPassword: !Ref Password
+      MasterUserPassword: !Ref DatabasesPassword
       BackupRetentionPeriod: 1
       DBSubnetGroupName: !Ref RdsSubnetGroup
       VpcSecurityGroupIds:
@@ -264,12 +288,13 @@ Resources:
       Engine: aurora-mysql
       DBClusterIdentifier: mysql-cluster-wrangler
       MasterUsername: test
-      MasterUserPassword: !Ref Password
+      MasterUserPassword: !Ref DatabasesPassword
       BackupRetentionPeriod: 1
       DBSubnetGroupName: !Ref RdsSubnetGroup
       VpcSecurityGroupIds:
         - !Ref DatabaseSecurityGroup
       DBClusterParameterGroupName: !Ref MysqlParameterGroup
+      DatabaseName: test
       AssociatedRoles:
         - RoleArn: !GetAtt AuroraRole.Arn
@@ -277,6 +302,7 @@ Resources:
     Type: AWS::RDS::DBInstance
     Properties:
       Engine: aurora-mysql
+#      DBName: test
       DBInstanceIdentifier: mysql-instance-wrangler
       DBClusterIdentifier: !Ref AuroraClusterMysql
       DBInstanceClass: db.t3.medium
@@ -285,6 +311,9 @@ Resources:
 
   RedshiftGlueConnection:
     Type: AWS::Glue::Connection
+    DependsOn:
+      - DatabaseSecurityGroup
+      - Redshift
     Properties:
       CatalogId: !Ref AWS::AccountId
       ConnectionInput:
@@ -310,12 +339,15 @@ Resources:
             ],
           ],
           "USERNAME": test,
-          "PASSWORD": !Ref Password,
+          "PASSWORD": !Ref DatabasesPassword,
         }
       Name: "aws-data-wrangler-redshift"
 
   PostgresGlueConnection:
     Type: AWS::Glue::Connection
+    DependsOn:
+      - DatabaseSecurityGroup
+      - AuroraInstancePostgres
     Properties:
       CatalogId: !Ref AWS::AccountId
       ConnectionInput:
@@ -341,12 +373,15 @@ Resources:
             ],
           ],
           "USERNAME": test,
-          "PASSWORD": !Ref Password,
+          "PASSWORD": !Ref DatabasesPassword,
         }
       Name: "aws-data-wrangler-postgres"
 
   MysqlGlueConnection:
     Type: AWS::Glue::Connection
+    DependsOn:
+      - DatabaseSecurityGroup
+      - AuroraInstanceMysql
     Properties:
       CatalogId: !Ref AWS::AccountId
       ConnectionInput:
@@ -372,7 +407,7 @@ Resources:
             ],
           ],
           "USERNAME": test,
-          "PASSWORD": !Ref Password,
+          "PASSWORD": !Ref DatabasesPassword,
         }
       Name: "aws-data-wrangler-mysql"
@@ -398,12 +433,12 @@ Outputs:
     Description: Name of the S3 Bucket used for tests.
   RedshiftAddress:
     Value: !GetAtt Redshift.Endpoint.Address
-    Description: Redshift Password.
+    Description: Redshift address.
   RedshiftPort:
     Value: !GetAtt Redshift.Endpoint.Port
     Description: Redshift Endpoint Port.
-  Password:
-    Value: !Ref Password
+  DatabasesPassword:
+    Value: !Ref DatabasesPassword
     Description: Password.
   RedshiftRole:
     Value: !GetAtt RedshiftRole.Arn
@@ -434,4 +469,7 @@ Outputs:
     Description: Mysql Address
   DynamoDbTableARN:
     Value: !GetAtt DynamoDBTable.Arn
-    Description: DynamoDB table name
\ No newline at end of file
+    Description: DynamoDB table name
+  Region:
+    Value: !Ref AWS::Region
+    Description: AWS Region
\ No newline at end of file
cloudformation_outputs.get("PostgresAddress") else: raise Exception("You must deploy the test infrastructure using SAM!") - if "Password" in cloudformation_outputs: - postgres_parameters["Password"] = cloudformation_outputs.get("Password") + if "DatabasesPassword" in cloudformation_outputs: + postgres_parameters["DatabasesPassword"] = cloudformation_outputs.get("DatabasesPassword") else: raise Exception("You must deploy the test infrastructure using SAM!") conn = Aurora.generate_connection(database="postgres", host=postgres_parameters["PostgresAddress"], port=3306, user="test", - password=postgres_parameters["Password"], + password=postgres_parameters["DatabasesPassword"], engine="postgres") with conn.cursor() as cursor: sql = "CREATE EXTENSION IF NOT EXISTS aws_s3 CASCADE" @@ -125,15 +125,15 @@ def mysql_parameters(cloudformation_outputs): mysql_parameters["MysqlAddress"] = cloudformation_outputs.get("MysqlAddress") else: raise Exception("You must deploy the test infrastructure using SAM!") - if "Password" in cloudformation_outputs: - mysql_parameters["Password"] = cloudformation_outputs.get("Password") + if "DatabasesPassword" in cloudformation_outputs: + mysql_parameters["DatabasesPassword"] = cloudformation_outputs.get("DatabasesPassword") else: raise Exception("You must deploy the test infrastructure using SAM!") conn = Aurora.generate_connection(database="mysql", host=mysql_parameters["MysqlAddress"], port=3306, user="test", - password=mysql_parameters["Password"], + password=mysql_parameters["DatabasesPassword"], engine="mysql") with conn.cursor() as cursor: sql = "CREATE DATABASE IF NOT EXISTS test" @@ -1660,7 +1660,7 @@ def test_aurora_mysql_load_simple(bucket, mysql_parameters): host=mysql_parameters["MysqlAddress"], port=3306, user="test", - password=mysql_parameters["Password"], + password=mysql_parameters["DatabasesPassword"], engine="mysql") path = f"s3://{bucket}/test_aurora_mysql_load_simple" wr.pandas.to_aurora(dataframe=df, @@ -1688,7 +1688,7 @@ def test_aurora_postgres_load_simple(bucket, postgres_parameters): host=postgres_parameters["PostgresAddress"], port=3306, user="test", - password=postgres_parameters["Password"], + password=postgres_parameters["DatabasesPassword"], engine="postgres") path = f"s3://{bucket}/test_aurora_postgres_load_simple" wr.pandas.to_aurora( @@ -1719,7 +1719,7 @@ def test_aurora_mysql_unload_simple(bucket, mysql_parameters): host=mysql_parameters["MysqlAddress"], port=3306, user="test", - password=mysql_parameters["Password"], + password=mysql_parameters["DatabasesPassword"], engine="mysql") path = f"s3://{bucket}/test_aurora_mysql_unload_simple" wr.pandas.to_aurora(dataframe=df, @@ -1794,7 +1794,7 @@ def test_aurora_mysql_load_append(bucket, mysql_parameters): host=mysql_parameters["MysqlAddress"], port=3306, user="test", - password=mysql_parameters["Password"], + password=mysql_parameters["DatabasesPassword"], engine="mysql") path = f"s3://{bucket}/test_aurora_mysql_load_append" @@ -1844,7 +1844,7 @@ def test_aurora_postgres_load_append(bucket, postgres_parameters): host=postgres_parameters["PostgresAddress"], port=3306, user="test", - password=postgres_parameters["Password"], + password=postgres_parameters["DatabasesPassword"], engine="postgres") path = f"s3://{bucket}/test_aurora_postgres_load_append" @@ -1951,7 +1951,7 @@ def test_aurora_postgres_load_special(bucket, postgres_parameters): host=postgres_parameters["PostgresAddress"], port=3306, user="test", - password=postgres_parameters["Password"], + 
diff --git a/testing/test_awswrangler/test_pandas.py b/testing/test_awswrangler/test_pandas.py
index 5b5cbee74..bd174402e 100644
--- a/testing/test_awswrangler/test_pandas.py
+++ b/testing/test_awswrangler/test_pandas.py
@@ -100,15 +100,15 @@ def postgres_parameters(cloudformation_outputs):
         postgres_parameters["PostgresAddress"] = cloudformation_outputs.get("PostgresAddress")
     else:
         raise Exception("You must deploy the test infrastructure using SAM!")
-    if "Password" in cloudformation_outputs:
-        postgres_parameters["Password"] = cloudformation_outputs.get("Password")
+    if "DatabasesPassword" in cloudformation_outputs:
+        postgres_parameters["DatabasesPassword"] = cloudformation_outputs.get("DatabasesPassword")
     else:
         raise Exception("You must deploy the test infrastructure using SAM!")
     conn = Aurora.generate_connection(database="postgres",
                                       host=postgres_parameters["PostgresAddress"],
                                       port=3306,
                                       user="test",
-                                      password=postgres_parameters["Password"],
+                                      password=postgres_parameters["DatabasesPassword"],
                                       engine="postgres")
     with conn.cursor() as cursor:
         sql = "CREATE EXTENSION IF NOT EXISTS aws_s3 CASCADE"
@@ -125,15 +125,15 @@ def mysql_parameters(cloudformation_outputs):
         mysql_parameters["MysqlAddress"] = cloudformation_outputs.get("MysqlAddress")
     else:
         raise Exception("You must deploy the test infrastructure using SAM!")
-    if "Password" in cloudformation_outputs:
-        mysql_parameters["Password"] = cloudformation_outputs.get("Password")
+    if "DatabasesPassword" in cloudformation_outputs:
+        mysql_parameters["DatabasesPassword"] = cloudformation_outputs.get("DatabasesPassword")
     else:
         raise Exception("You must deploy the test infrastructure using SAM!")
     conn = Aurora.generate_connection(database="mysql",
                                       host=mysql_parameters["MysqlAddress"],
                                       port=3306,
                                       user="test",
-                                      password=mysql_parameters["Password"],
+                                      password=mysql_parameters["DatabasesPassword"],
                                       engine="mysql")
     with conn.cursor() as cursor:
         sql = "CREATE DATABASE IF NOT EXISTS test"
@@ -1660,7 +1660,7 @@ def test_aurora_mysql_load_simple(bucket, mysql_parameters):
                                       host=mysql_parameters["MysqlAddress"],
                                       port=3306,
                                       user="test",
-                                      password=mysql_parameters["Password"],
+                                      password=mysql_parameters["DatabasesPassword"],
                                       engine="mysql")
     path = f"s3://{bucket}/test_aurora_mysql_load_simple"
     wr.pandas.to_aurora(dataframe=df,
@@ -1688,7 +1688,7 @@ def test_aurora_postgres_load_simple(bucket, postgres_parameters):
                                       host=postgres_parameters["PostgresAddress"],
                                       port=3306,
                                       user="test",
-                                      password=postgres_parameters["Password"],
+                                      password=postgres_parameters["DatabasesPassword"],
                                       engine="postgres")
     path = f"s3://{bucket}/test_aurora_postgres_load_simple"
     wr.pandas.to_aurora(
@@ -1719,7 +1719,7 @@ def test_aurora_mysql_unload_simple(bucket, mysql_parameters):
                                       host=mysql_parameters["MysqlAddress"],
                                       port=3306,
                                       user="test",
-                                      password=mysql_parameters["Password"],
+                                      password=mysql_parameters["DatabasesPassword"],
                                       engine="mysql")
     path = f"s3://{bucket}/test_aurora_mysql_unload_simple"
     wr.pandas.to_aurora(dataframe=df,
@@ -1794,7 +1794,7 @@ def test_aurora_mysql_load_append(bucket, mysql_parameters):
                                       host=mysql_parameters["MysqlAddress"],
                                       port=3306,
                                       user="test",
-                                      password=mysql_parameters["Password"],
+                                      password=mysql_parameters["DatabasesPassword"],
                                       engine="mysql")
     path = f"s3://{bucket}/test_aurora_mysql_load_append"
 
@@ -1844,7 +1844,7 @@ def test_aurora_postgres_load_append(bucket, postgres_parameters):
                                       host=postgres_parameters["PostgresAddress"],
                                       port=3306,
                                       user="test",
-                                      password=postgres_parameters["Password"],
+                                      password=postgres_parameters["DatabasesPassword"],
                                       engine="postgres")
     path = f"s3://{bucket}/test_aurora_postgres_load_append"
 
@@ -1951,7 +1951,7 @@ def test_aurora_postgres_load_special(bucket, postgres_parameters):
                                       host=postgres_parameters["PostgresAddress"],
                                       port=3306,
                                       user="test",
-                                      password=postgres_parameters["Password"],
+                                      password=postgres_parameters["DatabasesPassword"],
                                       engine="postgres")
     with conn.cursor() as cursor:
         cursor.execute("SELECT * FROM public.test_aurora_postgres_special")
@@ -2003,7 +2003,7 @@ def test_aurora_mysql_load_special(bucket, mysql_parameters):
                                       host=mysql_parameters["MysqlAddress"],
                                       port=3306,
                                       user="test",
-                                      password=mysql_parameters["Password"],
+                                      password=mysql_parameters["DatabasesPassword"],
                                       engine="mysql")
     with conn.cursor() as cursor:
         cursor.execute("SELECT * FROM test.test_aurora_mysql_special")
@@ -2103,7 +2103,7 @@ def test_aurora_postgres_load_special2(bucket, postgres_parameters):
                                       host=postgres_parameters["PostgresAddress"],
                                       port=3306,
                                       user="test",
-                                      password=postgres_parameters["Password"],
+                                      password=postgres_parameters["DatabasesPassword"],
                                       engine="postgres")
     with conn.cursor() as cursor:
         cursor.execute("SELECT count(*) FROM public.test_aurora_postgres_load_special2")
@@ -2161,7 +2161,7 @@ def test_aurora_mysql_load_special2(bucket, mysql_parameters):
                                       host=mysql_parameters["MysqlAddress"],
                                       port=3306,
                                       user="test",
-                                      password=mysql_parameters["Password"],
+                                      password=mysql_parameters["DatabasesPassword"],
                                       engine="mysql")
     with conn.cursor() as cursor:
         cursor.execute("SELECT count(*) FROM test.test_aurora_mysql_load_special2")
@@ -2228,7 +2228,7 @@ def test_aurora_postgres_load_columns(bucket, postgres_parameters):
                                       host=postgres_parameters["PostgresAddress"],
                                       port=3306,
                                       user="test",
-                                      password=postgres_parameters["Password"],
+                                      password=postgres_parameters["DatabasesPassword"],
                                       engine="postgres")
     path = f"s3://{bucket}/test_aurora_postgres_load_columns"
     wr.pandas.to_aurora(dataframe=df,
@@ -2272,7 +2272,7 @@ def test_aurora_mysql_load_columns(bucket, mysql_parameters):
                                       host=mysql_parameters["MysqlAddress"],
                                       port=3306,
                                       user="test",
-                                      password=mysql_parameters["Password"],
+                                      password=mysql_parameters["DatabasesPassword"],
                                       engine="mysql")
     path = f"s3://{bucket}/test_aurora_mysql_load_columns"
     wr.pandas.to_aurora(dataframe=df,
@@ -2322,7 +2322,7 @@ def test_aurora_mysql_unload_null(bucket, mysql_parameters):
                                       host=mysql_parameters["MysqlAddress"],
                                       port=3306,
                                       user="test",
-                                      password=mysql_parameters["Password"],
+                                      password=mysql_parameters["DatabasesPassword"],
                                       engine="mysql")
     path = f"s3://{bucket}/test_aurora_mysql_unload_complex"
     wr.pandas.to_aurora(dataframe=df,
"RedshiftPort" in cloudformation_outputs: @@ -64,6 +65,51 @@ def redshift_parameters(cloudformation_outputs): yield redshift_parameters +@pytest.fixture(scope="module") +def glue_database(cloudformation_outputs): + if "GlueDatabaseName" in cloudformation_outputs: + database = cloudformation_outputs["GlueDatabaseName"] + else: + raise Exception("You must deploy the test infrastructure using Cloudformation!") + yield database + tables = wr.glue.tables(database=database)["Table"].tolist() + for t in tables: + print(f"Dropping: {database}.{t}...") + wr.glue.delete_table_if_exists(database=database, table=t) + + +@pytest.fixture(scope="module") +def external_schema(cloudformation_outputs, redshift_parameters, glue_database): + if "Region" in cloudformation_outputs: + region = cloudformation_outputs.get("Region") + else: + raise Exception("You must deploy the test infrastructure using SAM!") + sql1 = f""" + CREATE EXTERNAL SCHEMA IF NOT EXISTS aws_data_wrangler_external FROM data catalog + DATABASE '{glue_database}' + IAM_ROLE '{redshift_parameters.get("RedshiftRole")}' + REGION '{region}'; + """ + sql2 = f""" + GRANT ALL + ON EXTERNAL SCHEMA aws_data_wrangler_external + TO IAM_ROLE '{redshift_parameters.get("RedshiftRole")}' + """ + sql3 = f""" + GRANT ALL + ON EXTERNAL SCHEMA aws_data_wrangler_external + TO PUBLIC + """ + conn = wr.redshift.get_connection("aws-data-wrangler-redshift") + conn.autocommit = True + with conn.cursor() as cursor: + cursor.execute(sql1) + cursor.execute(sql2) + cursor.execute(sql3) + conn.close() + yield "aws_data_wrangler_external" + + @pytest.mark.parametrize( "sample_name,mode,factor,diststyle,distkey,sortstyle,sortkey", [ @@ -91,7 +137,7 @@ def test_to_redshift_pandas(session, bucket, redshift_parameters, sample_name, m host=redshift_parameters.get("RedshiftAddress"), port=redshift_parameters.get("RedshiftPort"), user="test", - password=redshift_parameters.get("Password"), + password=redshift_parameters.get("DatabasesPassword"), ) path = f"s3://{bucket}/redshift-load/" session.pandas.to_redshift( @@ -159,7 +205,7 @@ def test_to_redshift_pandas_glue(session, bucket, redshift_parameters, sample_na host=redshift_parameters.get("RedshiftAddress"), port=redshift_parameters.get("RedshiftPort"), user="test", - password=redshift_parameters.get("Password"), + password=redshift_parameters.get("DatabasesPassword"), ) cursor = con.cursor() cursor.execute("SELECT * from public.test") @@ -184,7 +230,7 @@ def test_to_redshift_pandas_cast(session, bucket, redshift_parameters): host=redshift_parameters.get("RedshiftAddress"), port=redshift_parameters.get("RedshiftPort"), user="test", - password=redshift_parameters.get("Password"), + password=redshift_parameters.get("DatabasesPassword"), ) path = f"s3://{bucket}/redshift-load/" session.pandas.to_redshift(dataframe=df, @@ -223,7 +269,7 @@ def test_to_redshift_pandas_exceptions(session, bucket, redshift_parameters, sam host=redshift_parameters.get("RedshiftAddress"), port=redshift_parameters.get("RedshiftPort"), user="test", - password=redshift_parameters.get("Password"), + password=redshift_parameters.get("DatabasesPassword"), ) path = f"s3://{bucket}/redshift-load/" with pytest.raises(exc): @@ -277,7 +323,7 @@ def test_to_redshift_spark(session, bucket, redshift_parameters, sample_name, mo host=redshift_parameters.get("RedshiftAddress"), port=redshift_parameters.get("RedshiftPort"), user="test", - password=redshift_parameters.get("Password"), + password=redshift_parameters.get("DatabasesPassword"), ) session.spark.to_redshift( 
@@ -379,7 +425,7 @@ def test_stress_to_redshift_spark_big(session, bucket, redshift_parameters):
         host=redshift_parameters.get("RedshiftAddress"),
         port=redshift_parameters.get("RedshiftPort"),
         user="test",
-        password=redshift_parameters.get("Password"),
+        password=redshift_parameters.get("DatabasesPassword"),
     )
     session.spark.to_redshift(
         dataframe=dataframe,
@@ -414,7 +460,7 @@ def test_to_redshift_spark_exceptions(session, bucket, redshift_parameters, samp
         host=redshift_parameters.get("RedshiftAddress"),
         port=redshift_parameters.get("RedshiftPort"),
         user="test",
-        password=redshift_parameters.get("Password"),
+        password=redshift_parameters.get("DatabasesPassword"),
    )
     with pytest.raises(exc):
         assert session.spark.to_redshift(
@@ -453,7 +499,7 @@ def test_connection_timeout(redshift_parameters):
             host=redshift_parameters.get("RedshiftAddress"),
             port=12345,
             user="test",
-            password=redshift_parameters.get("Password"),
+            password=redshift_parameters.get("DatabasesPassword"),
         )
@@ -463,7 +509,7 @@ def test_connection_with_different_port_types(redshift_parameters):
         host=redshift_parameters.get("RedshiftAddress"),
         port=str(redshift_parameters.get("RedshiftPort")),
         user="test",
-        password=redshift_parameters.get("Password"),
+        password=redshift_parameters.get("DatabasesPassword"),
     )
     conn.close()
     conn = Redshift.generate_connection(
@@ -471,7 +517,7 @@ def test_connection_with_different_port_types(redshift_parameters):
         database="test",
         host=redshift_parameters.get("RedshiftAddress"),
         port=float(redshift_parameters.get("RedshiftPort")),
         user="test",
-        password=redshift_parameters.get("Password"),
+        password=redshift_parameters.get("DatabasesPassword"),
     )
     conn.close()
@@ -488,7 +534,7 @@ def test_to_redshift_pandas_decimal(session, bucket, redshift_parameters):
         host=redshift_parameters.get("RedshiftAddress"),
         port=redshift_parameters.get("RedshiftPort"),
         user="test",
-        password=redshift_parameters.get("Password"),
+        password=redshift_parameters.get("DatabasesPassword"),
     )
     path = f"s3://{bucket}/redshift-load/"
     session.pandas.to_redshift(
@@ -533,7 +579,7 @@ def test_to_redshift_spark_decimal(session, bucket, redshift_parameters):
         host=redshift_parameters.get("RedshiftAddress"),
         port=redshift_parameters.get("RedshiftPort"),
         user="test",
-        password=redshift_parameters.get("Password"),
+        password=redshift_parameters.get("DatabasesPassword"),
     )
     path = f"s3://{bucket}/redshift-load2/"
     session.spark.to_redshift(
@@ -572,7 +618,7 @@ def test_to_parquet(session, bucket, redshift_parameters):
         host=redshift_parameters.get("RedshiftAddress"),
         port=redshift_parameters.get("RedshiftPort"),
         user="test",
-        password=redshift_parameters.get("Password"),
+        password=redshift_parameters.get("DatabasesPassword"),
     )
     path = f"s3://{bucket}/test_to_parquet/"
     session.pandas.to_redshift(
@@ -609,7 +655,7 @@ def test_read_sql_redshift_pandas(session, bucket, redshift_parameters, sample_n
         host=redshift_parameters.get("RedshiftAddress"),
         port=redshift_parameters.get("RedshiftPort"),
         user="test",
-        password=redshift_parameters.get("Password"),
+        password=redshift_parameters.get("DatabasesPassword"),
     )
     path = f"s3://{bucket}/test_read_sql_redshift_pandas/"
     session.pandas.to_redshift(
@@ -639,7 +685,7 @@ def test_read_sql_redshift_pandas2(session, bucket, redshift_parameters):
         host=redshift_parameters.get("RedshiftAddress"),
         port=redshift_parameters.get("RedshiftPort"),
         user="test",
-        password=redshift_parameters.get("Password"),
+        password=redshift_parameters.get("DatabasesPassword"),
     )
     path = f"s3://{bucket}/test_read_sql_redshift_pandas2/"
     session.pandas.to_redshift(
@@ -669,7 +715,7 @@ def test_to_redshift_pandas_upsert(session, bucket, redshift_parameters):
         host=redshift_parameters.get("RedshiftAddress"),
         port=redshift_parameters.get("RedshiftPort"),
         user="test",
-        password=redshift_parameters.get("Password"),
+        password=redshift_parameters.get("DatabasesPassword"),
     )
     df = pd.DataFrame({"id": list((range(1_000))),
                        "val": list(["foo" if i % 2 == 0 else "boo" for i in range(1_000)])})
@@ -775,3 +821,60 @@ def test_read_sql_redshift_pandas_empty(session, bucket, redshift_parameters):
         connection="aws-data-wrangler-redshift",
         temp_s3_path=path2)
     assert df2.equals(pd.DataFrame())
+
+
+def test_spectrum_parquet(bucket, glue_database, external_schema):
+    df = pd.DataFrame({
+        "id": [1, 2, 3, 4, 5],
+        "col_str": ["foo", None, "bar", None, "xoo"],
+        "par_int": [0, 1, 0, 1, 1]
+    })
+    path = f"s3://{bucket}/test_spectrum_parquet/"
+    wr.pandas.to_parquet(
+        dataframe=df,
+        path=path,
+        database=glue_database,
+        mode="overwrite",
+        preserve_index=False,
+        partition_cols=["par_int"],
+        procs_cpu_bound=1
+    )
+    sleep(1)
+    conn = wr.redshift.get_connection("aws-data-wrangler-redshift")
+    with conn.cursor() as cursor:
+        cursor = cursor.execute(f"SELECT * FROM {external_schema}.test_spectrum_parquet")
+        rows = cursor.fetchall()
+    conn.commit()
+    conn.close()
+    assert len(rows) == len(df.index)
+    for row in rows:
+        assert len(row) == len(df.columns)
+
+
+def test_spectrum_csv(bucket, glue_database, external_schema):
+    df = pd.DataFrame({
+        "id": [1, 2, 3, 4, 5],
+        "col_str": ["foo", None, "bar", None, "xoo"],
+        "par_int": [0, 1, 0, 1, 1]
+    })
+    path = f"s3://{bucket}/test_spectrum_csv/"
+    wr.pandas.to_csv(
+        dataframe=df,
+        path=path,
+        database=glue_database,
+        mode="overwrite",
+        preserve_index=False,
+        partition_cols=["par_int"],
+        procs_cpu_bound=1
+    )
+    sleep(1)
+    conn = wr.redshift.get_connection("aws-data-wrangler-redshift")
+    with conn.cursor() as cursor:
+        cursor = cursor.execute(f"SELECT * FROM {external_schema}.test_spectrum_csv")
+        rows = cursor.fetchall()
+    conn.commit()
+    conn.close()
+    print(rows)
+    assert len(rows) == len(df.index)
+    for row in rows:
+        assert len(row) == len(df.columns)
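Note: both Spectrum tests sleep(1) between cataloging and querying, since the
external schema reflects the Glue Catalog with a small delay. One way to inspect
what Redshift actually sees through an external schema is the standard
svv_external_tables system view — a sketch reusing the Glue connection name from
the tests above (the schema name comes from the external_schema fixture):

    import awswrangler as wr

    conn = wr.redshift.get_connection("aws-data-wrangler-redshift")
    with conn.cursor() as cursor:
        # Lists every table reachable through external schemas, including
        # the input/output formats registered in the Glue Catalog.
        cursor.execute("SELECT schemaname, tablename, location, input_format, output_format "
                       "FROM svv_external_tables "
                       "WHERE schemaname = 'aws_data_wrangler_external'")
        for row in cursor.fetchall():
            print(row)
    conn.close()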
From 3998cca92bb3e8893c0e6f4cd83638be75c513c9 Mon Sep 17 00:00:00 2001
From: igorborgest
Date: Sun, 2 Feb 2020 13:52:07 -0300
Subject: [PATCH 2/2] Updating pd_additional_kwargs docstring

---
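Note: pd_additional_kwargs is the catch-all that the read_csv family forwards
untouched to pandas.read_csv. A short usage sketch (bucket and key are
placeholders):

    import awswrangler as wr

    session = wr.Session()
    # sep and dtype are not awswrangler parameters; they are collected by
    # **pd_additional_kwargs and handed straight to pandas.read_csv.
    df = session.pandas.read_csv(path="s3://my-test-bucket/data.csv",
                                 sep=";",
                                 dtype={"id": "Int64"})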
 awswrangler/pandas.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/awswrangler/pandas.py b/awswrangler/pandas.py
index 392375081..0705a5224 100644
--- a/awswrangler/pandas.py
+++ b/awswrangler/pandas.py
@@ -63,7 +63,7 @@ def read_csv(self, path: str, max_result_size: Optional[int] = None, **pd_additi
 
         :param path: Amazon S3 path (e.g. s3://bucket_name/key_name)
         :param max_result_size: Max number of bytes on each request to S3. It offers functionality similar to chunksize in pandas.read_csv(), but with higher performance
-        :param **pd_additional_kwargs: Additional parameters forwarded to pandas.read_csv
+        :param pd_additional_kwargs: Additional parameters forwarded to pandas.read_csv
         :return: Pandas Dataframe or Iterator of Pandas Dataframes if max_result_size != None
         """
@@ -1583,7 +1583,7 @@ def read_csv_list(
         :param paths: List of Amazon S3 paths (e.g. ['s3://bucket_name/key_name1', 's3://bucket_name/key_name2'])
         :param max_result_size: Max number of bytes on each request to S3. It offers functionality similar to chunksize in pandas.read_csv(), but with higher performance
         :param procs_cpu_bound: Number of cores used for CPU bound tasks
-        :param **pd_additional_kwargs: Additional parameters forwarded to pandas.read_csv
+        :param pd_additional_kwargs: Additional parameters forwarded to pandas.read_csv
         :return: Pandas Dataframe or Iterator of Pandas Dataframes if max_result_size != None
         """
         if max_result_size is not None:
@@ -1636,7 +1636,7 @@ def _read_csv_list_iterator(self, paths: List[str], max_result_size=None, **pd_a
 
         :param paths: List of Amazon S3 paths (e.g. ['s3://bucket_name/key_name1', 's3://bucket_name/key_name2'])
         :param max_result_size: Max number of bytes on each request to S3. It offers functionality similar to chunksize in pandas.read_csv(), but with higher performance
-        :param **pd_additional_kwargs: Additional parameters forwarded to pandas.read_csv
+        :param pd_additional_kwargs: Additional parameters forwarded to pandas.read_csv
         :return: Iterator of iterators of Pandas Dataframes
         """
         for path in paths:
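Note: dropping the leading ** from the ":param" fields matters for the rendered
docs: Sphinx matches the field name against the bare parameter name, and a stray
** in reST can be mis-parsed as unterminated strong-emphasis markup. A
hypothetical minimal example of the corrected convention:

    def load(path, **extra_kwargs):
        """Load something from a path.

        :param path: where to read from
        :param extra_kwargs: keyword arguments forwarded verbatim to the underlying reader
        """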