diff --git a/awswrangler/cloudwatchlogs.py b/awswrangler/cloudwatchlogs.py
index a11a001eb..3a1208930 100644
--- a/awswrangler/cloudwatchlogs.py
+++ b/awswrangler/cloudwatchlogs.py
@@ -32,6 +32,7 @@ def start_query(self,
         :param limit: The maximum number of log events to return in the query.
         :return: Query ID
         """
+        logger.debug(f"log_group_names: {log_group_names}")
         start_timestamp = int(1000 * start_time.timestamp())
         end_timestamp = int(1000 * end_time.timestamp())
         logger.debug(f"start_timestamp: {start_timestamp}")
diff --git a/awswrangler/glue.py b/awswrangler/glue.py
index 0d5c55c5a..610fcbdac 100644
--- a/awswrangler/glue.py
+++ b/awswrangler/glue.py
@@ -109,7 +109,8 @@ def metadata_to_glue(self,
                          partition_cols=None,
                          preserve_index=True,
                          mode="append",
-                         cast_columns=None):
+                         cast_columns=None,
+                         extra_args=None):
         schema = Glue._build_schema(dataframe=dataframe,
                                     partition_cols=partition_cols,
                                     preserve_index=preserve_index,
@@ -120,14 +121,13 @@ def metadata_to_glue(self,
             self.delete_table_if_exists(database=database, table=table)
         exists = self.does_table_exists(database=database, table=table)
         if not exists:
-            self.create_table(
-                database=database,
-                table=table,
-                schema=schema,
-                partition_cols=partition_cols,
-                path=path,
-                file_format=file_format,
-            )
+            self.create_table(database=database,
+                              table=table,
+                              schema=schema,
+                              partition_cols=partition_cols,
+                              path=path,
+                              file_format=file_format,
+                              extra_args=extra_args)
         if partition_cols:
             partitions_tuples = Glue._parse_partitions_tuples(
                 objects_paths=objects_paths, partition_cols=partition_cols)
@@ -157,13 +157,17 @@ def create_table(self,
                      schema,
                      path,
                      file_format,
-                     partition_cols=None):
+                     partition_cols=None,
+                     extra_args=None):
         if file_format == "parquet":
             table_input = Glue.parquet_table_definition(
                 table, partition_cols, schema, path)
         elif file_format == "csv":
-            table_input = Glue.csv_table_definition(table, partition_cols,
-                                                    schema, path)
+            table_input = Glue.csv_table_definition(table,
+                                                    partition_cols,
+                                                    schema,
+                                                    path,
+                                                    extra_args=extra_args)
         else:
             raise UnsupportedFileFormat(file_format)
         self._client_glue.create_table(DatabaseName=database,
@@ -229,7 +233,8 @@ def _parse_table_name(path):
         return path.rpartition("/")[2]
 
     @staticmethod
-    def csv_table_definition(table, partition_cols, schema, path):
+    def csv_table_definition(table, partition_cols, schema, path, extra_args):
+        sep = extra_args["sep"] if extra_args and "sep" in extra_args else ","
         if not partition_cols:
             partition_cols = []
         return {
@@ -245,7 +250,7 @@ def csv_table_definition(table, partition_cols, schema, path):
                 "classification": "csv",
                 "compressionType": "none",
                 "typeOfData": "file",
-                "delimiter": ",",
+                "delimiter": sep,
                 "columnsOrdered": "true",
                 "areColumnsQuoted": "false",
             },
@@ -262,7 +267,7 @@ def csv_table_definition(table, partition_cols, schema, path):
                 "NumberOfBuckets": -1,
                 "SerdeInfo": {
                     "Parameters": {
-                        "field.delim": ","
+                        "field.delim": sep
                     },
                     "SerializationLibrary":
                     "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
diff --git a/awswrangler/pandas.py b/awswrangler/pandas.py
index 9ec26ebab..909c172fe 100644
--- a/awswrangler/pandas.py
+++ b/awswrangler/pandas.py
@@ -433,6 +433,7 @@ def to_csv(
             self,
             dataframe,
             path,
+            sep=",",
             database=None,
             table=None,
             partition_cols=None,
@@ -447,6 +448,7 @@ def to_csv(
 
         :param dataframe: Pandas Dataframe
         :param path: AWS S3 path (E.g. s3://bucket-name/folder_name/
+        :param sep: Same as pandas.to_csv()
         :param database: AWS Glue Database name
         :param table: AWS Glue table name
         :param partition_cols: List of columns names that will be partitions on S3
@@ -456,18 +458,18 @@ def to_csv(
         :param procs_io_bound: Number of cores used for I/O bound tasks
         :return: List of objects written on S3
         """
-        return self.to_s3(
-            dataframe=dataframe,
-            path=path,
-            file_format="csv",
-            database=database,
-            table=table,
-            partition_cols=partition_cols,
-            preserve_index=preserve_index,
-            mode=mode,
-            procs_cpu_bound=procs_cpu_bound,
-            procs_io_bound=procs_io_bound,
-        )
+        extra_args = {"sep": sep}
+        return self.to_s3(dataframe=dataframe,
+                          path=path,
+                          file_format="csv",
+                          database=database,
+                          table=table,
+                          partition_cols=partition_cols,
+                          preserve_index=preserve_index,
+                          mode=mode,
+                          procs_cpu_bound=procs_cpu_bound,
+                          procs_io_bound=procs_io_bound,
+                          extra_args=extra_args)
 
     def to_parquet(self,
                    dataframe,
@@ -519,7 +521,8 @@ def to_s3(self,
               mode="append",
               procs_cpu_bound=None,
               procs_io_bound=None,
-              cast_columns=None):
+              cast_columns=None,
+              extra_args=None):
         """
         Write a Pandas Dataframe on S3
         Optionally writes metadata on AWS Glue.
@@ -535,6 +538,7 @@ def to_s3(self,
         :param procs_cpu_bound: Number of cores used for CPU bound tasks
         :param procs_io_bound: Number of cores used for I/O bound tasks
         :param cast_columns: Dictionary of columns indexes and Arrow types to be casted. (E.g. {2: "int64", 5: "int32"}) (Only for "parquet" file_format)
+        :param extra_args: Extra arguments specific for each file formats (E.g. "sep" for CSV)
         :return: List of objects written on S3
         """
         if dataframe.empty:
@@ -554,7 +558,8 @@ def to_s3(self,
                                        mode=mode,
                                        procs_cpu_bound=procs_cpu_bound,
                                        procs_io_bound=procs_io_bound,
-                                       cast_columns=cast_columns)
+                                       cast_columns=cast_columns,
+                                       extra_args=extra_args)
         if database:
             self._session.glue.metadata_to_glue(dataframe=dataframe,
                                                 path=path,
@@ -565,7 +570,8 @@ def to_s3(self,
                                                 preserve_index=preserve_index,
                                                 file_format=file_format,
                                                 mode=mode,
-                                                cast_columns=cast_columns)
+                                                cast_columns=cast_columns,
+                                                extra_args=extra_args)
         return objects_paths
 
     def data_to_s3(self,
@@ -577,7 +583,8 @@ def data_to_s3(self,
                    mode="append",
                    procs_cpu_bound=None,
                    procs_io_bound=None,
-                   cast_columns=None):
+                   cast_columns=None,
+                   extra_args=None):
         if not procs_cpu_bound:
             procs_cpu_bound = self._session.procs_cpu_bound
         if not procs_io_bound:
@@ -601,7 +608,8 @@ def data_to_s3(self,
                 target=self._data_to_s3_dataset_writer_remote,
                 args=(send_pipe, dataframe.iloc[bounder[0]:bounder[1], :],
                       path, partition_cols, preserve_index,
-                      self._session.primitives, file_format, cast_columns),
+                      self._session.primitives, file_format, cast_columns,
+                      extra_args),
             )
             proc.daemon = False
             proc.start()
@@ -619,7 +627,8 @@ def data_to_s3(self,
                 preserve_index=preserve_index,
                 session_primitives=self._session.primitives,
                 file_format=file_format,
-                cast_columns=cast_columns)
+                cast_columns=cast_columns,
+                extra_args=extra_args)
         if mode == "overwrite_partitions" and partition_cols:
             if procs_io_bound > procs_cpu_bound:
                 num_procs = floor(
@@ -639,7 +648,8 @@ def _data_to_s3_dataset_writer(dataframe,
                                    preserve_index,
                                    session_primitives,
                                    file_format,
-                                   cast_columns=None):
+                                   cast_columns=None,
+                                   extra_args=None):
         objects_paths = []
         if not partition_cols:
             object_path = Pandas._data_to_s3_object_writer(
@@ -648,7 +658,8 @@ def _data_to_s3_dataset_writer(dataframe,
                 preserve_index=preserve_index,
                 session_primitives=session_primitives,
                 file_format=file_format,
-                cast_columns=cast_columns)
+                cast_columns=cast_columns,
+                extra_args=extra_args)
             objects_paths.append(object_path)
         else:
             for keys, subgroup in dataframe.groupby(partition_cols):
@@ -665,21 +676,21 @@ def _data_to_s3_dataset_writer(dataframe,
                     preserve_index=preserve_index,
                     session_primitives=session_primitives,
                     file_format=file_format,
-                    cast_columns=cast_columns)
+                    cast_columns=cast_columns,
+                    extra_args=extra_args)
                 objects_paths.append(object_path)
         return objects_paths
 
     @staticmethod
-    def _data_to_s3_dataset_writer_remote(
-            send_pipe,
-            dataframe,
-            path,
-            partition_cols,
-            preserve_index,
-            session_primitives,
-            file_format,
-            cast_columns=None,
-    ):
+    def _data_to_s3_dataset_writer_remote(send_pipe,
+                                          dataframe,
+                                          path,
+                                          partition_cols,
+                                          preserve_index,
+                                          session_primitives,
+                                          file_format,
+                                          cast_columns=None,
+                                          extra_args=None):
         send_pipe.send(
             Pandas._data_to_s3_dataset_writer(
                 dataframe=dataframe,
@@ -688,7 +699,8 @@ def _data_to_s3_dataset_writer_remote(
                 preserve_index=preserve_index,
                 session_primitives=session_primitives,
                 file_format=file_format,
-                cast_columns=cast_columns))
+                cast_columns=cast_columns,
+                extra_args=extra_args))
         send_pipe.close()
 
     @staticmethod
@@ -697,7 +709,8 @@ def _data_to_s3_object_writer(dataframe,
                                   preserve_index,
                                   session_primitives,
                                   file_format,
-                                  cast_columns=None):
+                                  cast_columns=None,
+                                  extra_args=None):
         fs = s3.get_fs(session_primitives=session_primitives)
         fs = pyarrow.filesystem._ensure_filesystem(fs)
         s3.mkdir_if_not_exists(fs, path)
@@ -713,27 +726,40 @@ def _data_to_s3_object_writer(dataframe,
                 path=object_path,
                 preserve_index=preserve_index,
                 fs=fs,
-                cast_columns=cast_columns)
+                cast_columns=cast_columns,
+                extra_args=extra_args)
         elif file_format == "csv":
-            Pandas.write_csv_dataframe(
-                dataframe=dataframe,
-                path=object_path,
-                preserve_index=preserve_index,
-                fs=fs,
-            )
+            Pandas.write_csv_dataframe(dataframe=dataframe,
+                                       path=object_path,
+                                       preserve_index=preserve_index,
+                                       fs=fs,
+                                       extra_args=extra_args)
         return object_path
 
     @staticmethod
-    def write_csv_dataframe(dataframe, path, preserve_index, fs):
+    def write_csv_dataframe(dataframe,
+                            path,
+                            preserve_index,
+                            fs,
+                            extra_args=None):
+        csv_extra_args = {}
+        if extra_args and "sep" in extra_args:
+            csv_extra_args["sep"] = extra_args["sep"]
         csv_buffer = bytes(
-            dataframe.to_csv(None, header=False, index=preserve_index),
-            "utf-8")
+            dataframe.to_csv(None,
+                             header=False,
+                             index=preserve_index,
+                             **csv_extra_args), "utf-8")
         with fs.open(path, "wb") as f:
             f.write(csv_buffer)
 
     @staticmethod
-    def write_parquet_dataframe(dataframe, path, preserve_index, fs,
-                                cast_columns):
+    def write_parquet_dataframe(dataframe,
+                                path,
+                                preserve_index,
+                                fs,
+                                cast_columns,
+                                extra_args=None):
         if not cast_columns:
             cast_columns = {}
         casted_in_pandas = []
diff --git a/building/build-docs.sh b/building/build-docs.sh
index 40b01e850..a35ae0d43 100755
--- a/building/build-docs.sh
+++ b/building/build-docs.sh
@@ -1,4 +1,5 @@
 #!/bin/bash
+set -e
 
 cd ..
 sphinx-apidoc --separate -f -H "API Reference" -o docs/source/api awswrangler/
diff --git a/building/build-glue-egg.sh b/building/build-glue-egg.sh
index 3dcf196c6..ace7f390b 100755
--- a/building/build-glue-egg.sh
+++ b/building/build-glue-egg.sh
@@ -1,4 +1,5 @@
 #!/bin/bash
+set -e
 
 cd ..
 rm -rf *.egg-info build dist/*.egg
diff --git a/building/build-image.sh b/building/build-image.sh
index b3e95c9bb..82446cc80 100755
--- a/building/build-image.sh
+++ b/building/build-image.sh
@@ -1,4 +1,5 @@
 #!/bin/bash
+set -e
 
 cp ../requirements.txt .
 cp ../requirements-dev.txt .
diff --git a/building/build-lambda-layer.sh b/building/build-lambda-layer.sh
index c934c7173..55cbf3ae1 100755
--- a/building/build-lambda-layer.sh
+++ b/building/build-lambda-layer.sh
@@ -1,4 +1,5 @@
 #!/bin/bash
+set -e
 
 # Go to home
 cd ~
diff --git a/building/deploy-source.sh b/building/deploy-source.sh
index 875b4c770..392b11f64 100755
--- a/building/deploy-source.sh
+++ b/building/deploy-source.sh
@@ -1,4 +1,5 @@
 #!/bin/bash
+set -e
 
 cd ..
 rm -rf *.egg-info dist/*.tar.gz
diff --git a/building/publish.sh b/building/publish.sh
index 6518f32a7..91286ae5b 100755
--- a/building/publish.sh
+++ b/building/publish.sh
@@ -1,4 +1,5 @@
 #!/bin/bash
+set -e
 
 cd ..
 rm -fr build dist .egg awswrangler.egg-info
diff --git a/testing/run-tests.sh b/testing/run-tests.sh
index 68d0487f3..2d315ab5b 100755
--- a/testing/run-tests.sh
+++ b/testing/run-tests.sh
@@ -1,5 +1,7 @@
 #!/bin/bash
 
+set -e
+
 cd ..
 rm -rf *.pytest_cache
 yapf --in-place --recursive setup.py awswrangler testing/test_awswrangler
diff --git a/testing/test_awswrangler/test_athena.py b/testing/test_awswrangler/test_athena.py
index dffa08888..0f6020129 100644
--- a/testing/test_awswrangler/test_athena.py
+++ b/testing/test_awswrangler/test_athena.py
@@ -40,7 +40,7 @@ def database(cloudformation_outputs):
 def test_query_cancelled(session, database):
     client_athena = boto3.client("athena")
     query_execution_id = session.athena.run_query(query="""
-SELECT 
+SELECT
 rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(),
 rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(),
 rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(), rand(),
diff --git a/testing/test_awswrangler/test_cloudwatchlogs.py b/testing/test_awswrangler/test_cloudwatchlogs.py
index 2ed6e038a..ed5080497 100644
--- a/testing/test_awswrangler/test_cloudwatchlogs.py
+++ b/testing/test_awswrangler/test_cloudwatchlogs.py
@@ -1,5 +1,6 @@
 import logging
 from datetime import datetime
+from time import sleep
 
 import pytest
 import boto3
@@ -63,6 +64,7 @@ def logstream(cloudformation_outputs, loggroup):
     if token:
         args["sequenceToken"] = token
     client.put_log_events(**args)
+    sleep(120)
     yield logstream
 
 
diff --git a/testing/test_awswrangler/test_pandas.py b/testing/test_awswrangler/test_pandas.py
index 71bb45d41..c33303e4d 100644
--- a/testing/test_awswrangler/test_pandas.py
+++ b/testing/test_awswrangler/test_pandas.py
@@ -99,6 +99,7 @@ def logstream(cloudformation_outputs, loggroup):
     if token:
         args["sequenceToken"] = token
     client.put_log_events(**args)
+    sleep(120)
     yield logstream
 
 
@@ -235,6 +236,13 @@ def test_to_s3(
             break
         sleep(1)
     assert factor * len(dataframe.index) == len(dataframe2.index)
+    if preserve_index:
+        assert (len(list(dataframe.columns)) + 1) == len(
+            list(dataframe2.columns))
+    else:
+        assert len(list(dataframe.columns)) == len(list(dataframe2.columns))
+    assert dataframe[dataframe["id"] == 0].iloc[0]["name"] == dataframe2[
+        dataframe2["id"] == 0].iloc[0]["name"]
 
 
 def test_to_parquet_with_cast(
@@ -261,8 +269,9 @@ def test_to_parquet_with_cast(
             break
         sleep(1)
     assert len(dataframe.index) == len(dataframe2.index)
-    print(dataframe2)
-    print(dataframe2.dtypes)
+    assert len(list(dataframe.columns)) == len(list(dataframe2.columns))
+    assert dataframe[dataframe["id"] == 0].iloc[0]["name"] == dataframe2[
+        dataframe2["id"] == 0].iloc[0]["name"]
 
 
 @pytest.mark.parametrize("sample, row_num, max_result_size", [
@@ -297,6 +306,8 @@ def test_read_sql_athena_iterator(session, bucket, database, sample, row_num,
     total_count = 0
     for dataframe in dataframe_iter:
         total_count += len(dataframe.index)
+        assert len(list(dataframe.columns)) == len(
+            list(dataframe_sample.columns))
         print(dataframe)
         if total_count == row_num:
             break
@@ -392,6 +403,7 @@ def test_etl_complex(session, bucket, database, max_result_size):
     for df in df_iter:
         count += len(df.index)
         for row in df.itertuples():
+            assert len(list(dataframe.columns)) == len(list(df.columns))
             assert isinstance(row.my_timestamp, datetime)
             assert isinstance(row.my_date, date)
             assert isinstance(row.my_float, float)
@@ -429,6 +441,9 @@ def test_to_parquet_with_kms(
             break
         sleep(1)
     assert len(dataframe.index) == len(dataframe2.index)
+    assert len(list(dataframe.columns)) == len(list(dataframe2.columns))
+    assert dataframe[dataframe["id"] == 0].iloc[0]["name"] == dataframe2[
+        dataframe2["id"] == 0].iloc[0]["name"]
 
 
 def test_to_parquet_with_empty_dataframe(session, bucket, database):
@@ -449,3 +464,28 @@ def test_read_log_query(session, loggroup, logstream):
     )
     assert len(dataframe.index) == 5
     assert len(dataframe.columns) == 3
+
+
+def test_to_csv_with_sep(
+        session,
+        bucket,
+        database,
+):
+    dataframe = pandas.read_csv("data_samples/nano.csv")
+    session.pandas.to_csv(dataframe=dataframe,
+                          database=database,
+                          path=f"s3://{bucket}/test/",
+                          preserve_index=False,
+                          mode="overwrite",
+                          sep="|")
+    dataframe2 = None
+    for counter in range(10):
+        dataframe2 = session.pandas.read_sql_athena(sql="select * from test",
+                                                    database=database)
+        if len(dataframe.index) == len(dataframe2.index):
+            break
+        sleep(1)
+    assert len(dataframe.index) == len(dataframe2.index)
+    assert len(list(dataframe.columns)) == len(list(dataframe2.columns))
+    assert dataframe[dataframe["id"] == 0].iloc[0]["name"] == dataframe2[
+        dataframe2["id"] == 0].iloc[0]["name"]
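
Usage sketch for the new "sep" argument, not part of the patch itself: it mirrors the test_to_csv_with_sep test above. The bucket and database names are placeholders, and the Session/to_csv calls assume the existing awswrangler API shown in the diff.

    import pandas
    import awswrangler

    # Placeholder data; any Pandas DataFrame works here.
    df = pandas.DataFrame({"id": [1, 2], "name": ["foo", "boo"]})

    session = awswrangler.Session()
    # "sep" is forwarded to pandas.DataFrame.to_csv() for the objects written
    # to S3 and, when a database is given, becomes the "delimiter" /
    # "field.delim" of the Glue table definition created by metadata_to_glue().
    session.pandas.to_csv(dataframe=df,
                          database="my_database",
                          path="s3://my-bucket/my_table/",
                          preserve_index=False,
                          mode="overwrite",
                          sep="|")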