awswrangler/glue.py (3 additions, 1 deletion)

@@ -379,6 +379,7 @@ def csv_partition_definition(partition, compression, extra_args=None):
     return {
         "StorageDescriptor": {
             "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
+            "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
             "Location": partition[0],
             "Compressed": compressed,
             "SerdeInfo": {
@@ -440,7 +441,8 @@ def parquet_partition_definition(partition, compression):
     compressed = False if compression is None else True
     return {
         "StorageDescriptor": {
-            "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
+            "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
+            "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
             "Location": partition[0],
             "Compressed": compressed,
             "SerdeInfo": {
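Why this matters: a Glue partition's StorageDescriptor needs a matching InputFormat/OutputFormat pair, and the Parquet partitions were previously being registered with the CSV TextInputFormat. For context, a minimal sketch of registering a partition with the corrected CSV descriptor through boto3 (the database, table, S3 location, partition value, column list, and LazySimpleSerDe choice are all illustrative assumptions, not taken from this PR):

    import boto3

    glue = boto3.client("glue")

    # Mirrors csv_partition_definition() after this change: the text input
    # format is now paired with Hive's text output format.
    partition_input = {
        "Values": ["2019-11-25"],  # hypothetical partition value
        "StorageDescriptor": {
            "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
            "Location": "s3://my-bucket/my-table/dt=2019-11-25/",  # hypothetical
            "Compressed": False,
            "Columns": [{"Name": "value", "Type": "string"}],
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
                "Parameters": {"field.delim": ","},
            },
        },
    }

    glue.create_partition(
        DatabaseName="my_database",  # hypothetical
        TableName="my_table",        # hypothetical
        PartitionInput=partition_input,
    )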
awswrangler/pandas.py (3 additions, 3 deletions)

@@ -63,7 +63,7 @@ def read_csv(self, path: str, max_result_size: Optional[int] = None, **pd_additional_kwargs):
 
         :param path: Amazon S3 path (e.g. s3://bucket_name/key_name)
         :param max_result_size: Max number of bytes on each request to S3. It offers functionality similar to chunksize in pandas.read_csv(), but with higher performance
-        :param **pd_additional_kwargs: Additional parameters forwarded to pandas.read_csv
+        :param pd_additional_kwargs: Additional parameters forwarded to pandas.read_csv
         :return: Pandas Dataframe or Iterator of Pandas Dataframes if max_result_size != None
         """
 
@@ -1583,7 +1583,7 @@ def read_csv_list(
         :param paths: List of Amazon S3 paths (e.g. ['s3://bucket_name/key_name1', 's3://bucket_name/key_name2'])
         :param max_result_size: Max number of bytes on each request to S3. It offers functionality similar to chunksize in pandas.read_csv(), but with higher performance
         :param procs_cpu_bound: Number of cores used for CPU bound tasks
-        :param **pd_additional_kwargs: Additional parameters forwarded to pandas.read_csv
+        :param pd_additional_kwargs: Additional parameters forwarded to pandas.read_csv
         :return: Pandas Dataframe or Iterator of Pandas Dataframes if max_result_size != None
         """
         if max_result_size is not None:
@@ -1636,7 +1636,7 @@ def _read_csv_list_iterator(self, paths: List[str], max_result_size=None, **pd_additional_kwargs):
 
         :param paths: List of Amazon S3 paths (e.g. ['s3://bucket_name/key_name1', 's3://bucket_name/key_name2'])
        :param max_result_size: Max number of bytes on each request to S3. It offers functionality similar to chunksize in pandas.read_csv(), but with higher performance
-        :param **pd_additional_kwargs: Additional parameters forwarded to pandas.read_csv
+        :param pd_additional_kwargs: Additional parameters forwarded to pandas.read_csv
         :return: Iterator of iterators of Pandas Dataframes
         """
         for path in paths:
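The three docstring edits are cosmetic: a Sphinx :param: field should name the parameter without the ** prefix, otherwise the rendered docs show a bogus parameter name. For readers unfamiliar with the API being documented, a hedged usage sketch (the 0.x Session-style invocation is assumed, and the bucket and key are invented):

    import awswrangler

    session = awswrangler.Session()

    # pd_additional_kwargs (here sep and dtype) are forwarded verbatim
    # to pandas.read_csv.
    df = session.pandas.read_csv(
        path="s3://my-bucket/data.csv",  # hypothetical path
        sep=";",
        dtype={"id": "int64"},
    )

    # With max_result_size set, the same call returns an iterator of
    # DataFrames instead of a single DataFrame.
    for chunk in session.pandas.read_csv(path="s3://my-bucket/data.csv",
                                         max_result_size=128 * 1024 * 1024):
        print(len(chunk))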
awswrangler/redshift.py (2 additions, 2 deletions)

@@ -44,7 +44,7 @@ def _validate_connection(database,
                          password,
                          tcp_keepalive=True,
                          application_name="aws-data-wrangler-validation",
-                         validation_timeout=5):
+                         validation_timeout=10):
     conn = pg8000.connect(database=database,
                           host=host,
                           port=int(port),
@@ -66,7 +66,7 @@ def generate_connection(database,
                         application_name="aws-data-wrangler",
                         connection_timeout=1_200_000,
                         statement_timeout=1_200_000,
-                        validation_timeout=5):
+                        validation_timeout=10):
     """
     Generates a valid connection object to be passed to the load_table method
 
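Doubling the default validation_timeout from 5 to 10 seconds gives the pg8000 handshake inside _validate_connection more headroom against slow cluster endpoints. A hedged sketch of calling generate_connection with the new default (the endpoint and password are invented, and exposing generate_connection as a static helper on the Redshift class is an assumption):

    from awswrangler import Redshift

    conn = Redshift.generate_connection(
        database="test",
        host="my-cluster.abc123.us-east-1.redshift.amazonaws.com",  # hypothetical
        port=5439,
        user="test",
        password="my-password",  # hypothetical
        validation_timeout=10,   # the new default, shown explicitly here
    )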
testing/deploy-cloudformation.sh (8 additions, 4 deletions)

@@ -2,7 +2,11 @@
 set -e
 
 aws cloudformation deploy \
---template-file template.yaml \
---stack-name aws-data-wrangler-test \
---capabilities CAPABILITY_IAM \
---parameter-overrides $(cat parameters.properties)
+  --template-file template.yaml \
+  --stack-name aws-data-wrangler-test \
+  --capabilities CAPABILITY_IAM \
+  --parameter-overrides $(cat parameters.properties)
+
+aws cloudformation update-termination-protection \
+  --enable-termination-protection \
+  --stack-name aws-data-wrangler-test
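Besides re-indenting the continuation lines, the script now enables termination protection so the test stack cannot be torn down by an accidental delete-stack. For anyone driving this from Python rather than the CLI, the boto3 equivalent of the new call would be roughly:

    import boto3

    cloudformation = boto3.client("cloudformation")

    # Same effect as `aws cloudformation update-termination-protection ...`
    cloudformation.update_termination_protection(
        EnableTerminationProtection=True,
        StackName="aws-data-wrangler-test",
    )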
testing/parameters.properties (2 additions, 2 deletions)

@@ -2,5 +2,5 @@ VpcId=VPC_ID
 SubnetId=SUBNET_ID
 SubnetId2=SUBNET_ID2
 SubnetAz=AVAILABILITY_ZONE
-Password=REDSHIFT_PASSWORD
-TestUser=AWS_USER_THAT_WILL_RUN_THE_TESTS_ON_CLI
+DatabasesPassword=REDSHIFT_PASSWORD
+AWSUserForTests=AWS_USER_THAT_WILL_RUN_THE_TESTS_ON_CLI
testing/template.yaml (53 additions, 15 deletions)

@@ -16,10 +16,10 @@ Parameters:
   SubnetAz:
     Type: String
     Description: Subnet AZ
-  Password:
+  DatabasesPassword:
     Type: String
-    Description: Redshift Password
-  TestUser:
+    Description: Password for all databases
+  AWSUserForTests:
     Type: String
     Description: AWS User that will running the tests on the CLI
 
@@ -49,7 +49,7 @@ Resources:
         - Sid: "Allow administration of the key"
           Effect: "Allow"
           Principal:
-            AWS: !Join ["", ["arn:aws:iam::", !Ref "AWS::AccountId", ":user/", !Ref TestUser]]
+            AWS: !Join ["", ["arn:aws:iam::", !Ref "AWS::AccountId", ":user/", !Ref AWSUserForTests]]
           Action:
             - "kms:Create*"
             - "kms:Describe*"
@@ -95,7 +95,7 @@ Resources:
             - sts:AssumeRole
       Path: "/"
       Policies:
-        - PolicyName: S3GetAndList
+        - PolicyName: Root
           PolicyDocument:
             Version: 2012-10-17
             Statement:
@@ -107,6 +107,30 @@
               Resource:
                 - !Join ['', ['arn:aws:s3:::', !Ref Bucket]]
                 - !Join ['', ['arn:aws:s3:::', !Ref Bucket, /*]]
+            - Effect: Allow
+              Action:
+                - "lakeformation:GrantPermissions"
+              Resource: "*"
+            - Effect: Allow
+              Action:
+                - "glue:SearchTables"
+                - "glue:GetConnections"
+                - "glue:GetDataCatalogEncryptionSettings"
+                - "glue:GetTables"
+                - "glue:GetTableVersions"
+                - "glue:GetPartitions"
+                - "glue:DeleteTableVersion"
+                - "glue:BatchGetPartition"
+                - "glue:GetDatabases"
+                - "glue:GetTags"
+                - "glue:GetTable"
+                - "glue:GetDatabase"
+                - "glue:GetPartition"
+                - "glue:GetTableVersion"
+                - "glue:GetConnection"
+                - "glue:GetUserDefinedFunction"
+                - "glue:GetUserDefinedFunctions"
+              Resource: "*"
 
   RedshiftSubnetGroup:
     Type: AWS::Redshift::ClusterSubnetGroup
@@ -140,7 +164,7 @@
     Properties:
       DBName: test
       MasterUsername: test
-      MasterUserPassword: !Ref Password
+      MasterUserPassword: !Ref DatabasesPassword
       NodeType: dc2.large
       ClusterType: single-node
      VpcSecurityGroupIds:
@@ -223,7 +247,7 @@ Resources:
       Engine: aurora-postgresql
       DBClusterIdentifier : postgres-cluster-wrangler
       MasterUsername: test
-      MasterUserPassword: !Ref Password
+      MasterUserPassword: !Ref DatabasesPassword
       BackupRetentionPeriod: 1
       DBSubnetGroupName: !Ref RdsSubnetGroup
       VpcSecurityGroupIds:
@@ -264,19 +288,21 @@
       Engine: aurora-mysql
       DBClusterIdentifier: mysql-cluster-wrangler
       MasterUsername: test
-      MasterUserPassword: !Ref Password
+      MasterUserPassword: !Ref DatabasesPassword
       BackupRetentionPeriod: 1
       DBSubnetGroupName: !Ref RdsSubnetGroup
       VpcSecurityGroupIds:
         - !Ref DatabaseSecurityGroup
       DBClusterParameterGroupName: !Ref MysqlParameterGroup
       DatabaseName: test
+      AssociatedRoles:
+        - RoleArn: !GetAtt AuroraRole.Arn
 
   AuroraInstanceMysql:
     Type: AWS::RDS::DBInstance
     Properties:
       Engine: aurora-mysql
       # DBName: test
       DBInstanceIdentifier: mysql-instance-wrangler
       DBClusterIdentifier: !Ref AuroraClusterMysql
       DBInstanceClass: db.t3.medium
@@ -285,6 +311,9 @@
 
   RedshiftGlueConnection:
     Type: AWS::Glue::Connection
+    DependsOn:
+      - DatabaseSecurityGroup
+      - Redshift
     Properties:
       CatalogId: !Ref AWS::AccountId
       ConnectionInput:
@@ -310,12 +339,15 @@
               ],
             ],
           "USERNAME": test,
-          "PASSWORD": !Ref Password,
+          "PASSWORD": !Ref DatabasesPassword,
         }
       Name: "aws-data-wrangler-redshift"
 
   PostgresGlueConnection:
     Type: AWS::Glue::Connection
+    DependsOn:
+      - DatabaseSecurityGroup
+      - AuroraInstancePostgres
     Properties:
       CatalogId: !Ref AWS::AccountId
       ConnectionInput:
@@ -341,12 +373,15 @@
               ],
             ],
           "USERNAME": test,
-          "PASSWORD": !Ref Password,
+          "PASSWORD": !Ref DatabasesPassword,
         }
       Name: "aws-data-wrangler-postgres"
 
   MysqlGlueConnection:
     Type: AWS::Glue::Connection
+    DependsOn:
+      - DatabaseSecurityGroup
+      - AuroraInstanceMysql
     Properties:
       CatalogId: !Ref AWS::AccountId
       ConnectionInput:
@@ -372,7 +407,7 @@
               ],
             ],
           "USERNAME": test,
-          "PASSWORD": !Ref Password,
+          "PASSWORD": !Ref DatabasesPassword,
         }
       Name: "aws-data-wrangler-mysql"
 
@@ -398,12 +433,12 @@ Outputs:
     Description: Name of the S3 Bucket used for tests.
   RedshiftAddress:
     Value: !GetAtt Redshift.Endpoint.Address
-    Description: Redshift Password.
+    Description: Redshift address.
   RedshiftPort:
     Value: !GetAtt Redshift.Endpoint.Port
     Description: Redshift Endpoint Port.
-  Password:
-    Value: !Ref Password
+  DatabasesPassword:
+    Value: !Ref DatabasesPassword
     Description: Password.
   RedshiftRole:
     Value: !GetAtt RedshiftRole.Arn
@@ -434,4 +469,7 @@ Outputs:
     Description: Mysql Address
   DynamoDbTableARN:
     Value: !GetAtt DynamoDBTable.Arn
-    Description: DynamoDB table name
\ No newline at end of file
+    Description: DynamoDB table name
+  Region:
+    Value: !Ref AWS::Region
+    Description: AWS Region
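The renamed DatabasesPassword output and the new Region output are what the test suite below reads. A minimal sketch of collecting the stack outputs into a dict with boto3 (the stack name is taken from the deploy script above):

    import boto3

    client = boto3.client("cloudformation")
    stack = client.describe_stacks(StackName="aws-data-wrangler-test")["Stacks"][0]

    # e.g. outputs["DatabasesPassword"], outputs["Region"], outputs["MysqlAddress"]
    outputs = {o["OutputKey"]: o["OutputValue"] for o in stack.get("Outputs", [])}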
testing/test_awswrangler/test_aurora.py (7 additions, 7 deletions)

@@ -26,8 +26,8 @@ def postgres_parameters(cloudformation_outputs):
         postgres_parameters["PostgresAddress"] = cloudformation_outputs.get("PostgresAddress")
     else:
         raise Exception("You must deploy the test infrastructure using SAM!")
-    if "Password" in cloudformation_outputs:
-        postgres_parameters["Password"] = cloudformation_outputs.get("Password")
+    if "DatabasesPassword" in cloudformation_outputs:
+        postgres_parameters["DatabasesPassword"] = cloudformation_outputs.get("DatabasesPassword")
     else:
         raise Exception("You must deploy the test infrastructure using SAM!")
     yield postgres_parameters
@@ -40,8 +40,8 @@ def mysql_parameters(cloudformation_outputs):
         mysql_parameters["MysqlAddress"] = cloudformation_outputs.get("MysqlAddress")
     else:
         raise Exception("You must deploy the test infrastructure using SAM!")
-    if "Password" in cloudformation_outputs:
-        mysql_parameters["Password"] = cloudformation_outputs.get("Password")
+    if "DatabasesPassword" in cloudformation_outputs:
+        mysql_parameters["DatabasesPassword"] = cloudformation_outputs.get("DatabasesPassword")
     else:
         raise Exception("You must deploy the test infrastructure using SAM!")
     yield mysql_parameters
@@ -52,7 +52,7 @@ def test_postgres_connection(postgres_parameters):
         host=postgres_parameters["PostgresAddress"],
         port=3306,
         user="test",
-        password=postgres_parameters["Password"],
+        password=postgres_parameters["DatabasesPassword"],
         engine="postgres")
     cursor = conn.cursor()
     cursor.execute("SELECT 1 + 2, 3 + 4")
@@ -68,7 +68,7 @@ def test_mysql_connection(mysql_parameters):
         host=mysql_parameters["MysqlAddress"],
         port=3306,
         user="test",
-        password=mysql_parameters["Password"],
+        password=mysql_parameters["DatabasesPassword"],
         engine="mysql")
     cursor = conn.cursor()
     cursor.execute("SELECT 1 + 2, 3 + 4")
@@ -85,5 +85,5 @@ def test_invalid_engine(mysql_parameters):
         host=mysql_parameters["MysqlAddress"],
         port=3306,
         user="test",
-        password=mysql_parameters["Password"],
+        password=mysql_parameters["DatabasesPassword"],
         engine="foo")
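The cloudformation_outputs fixture these tests rely on is not part of this diff; a plausible conftest.py sketch, wrapping the describe_stacks call shown above, could look like this (the stack name and fixture scope are assumptions):

    import boto3
    import pytest


    @pytest.fixture(scope="module")
    def cloudformation_outputs():
        # Expose the stack outputs (MysqlAddress, PostgresAddress,
        # DatabasesPassword, ...) as a plain dict keyed by OutputKey.
        client = boto3.client("cloudformation")
        stack = client.describe_stacks(StackName="aws-data-wrangler-test")["Stacks"][0]
        yield {o["OutputKey"]: o["OutputValue"] for o in stack.get("Outputs", [])}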