From 0120518d4662df828020f78f5f1295392399eccd Mon Sep 17 00:00:00 2001 From: Tulio Casagrande Date: Fri, 31 Jan 2020 16:58:41 -0300 Subject: [PATCH] Forward all pandas kwargs --- awswrangler/pandas.py | 590 ++++++------------------------------------ 1 file changed, 77 insertions(+), 513 deletions(-) diff --git a/awswrangler/pandas.py b/awswrangler/pandas.py index 0c2bf6372..392375081 100644 --- a/awswrangler/pandas.py +++ b/awswrangler/pandas.py @@ -57,155 +57,42 @@ def _parse_path(path: str) -> Tuple[str, str]: parts: Tuple[str, str, str] = path2.partition("/") return parts[0], parts[2] - def read_csv( - self, - path: str, - max_result_size: Optional[int] = None, - header="infer", - names=None, - usecols=None, - dtype=None, - sep=",", - thousands=None, - decimal=".", - lineterminator="\n", - quotechar='"', - quoting=csv.QUOTE_MINIMAL, - escapechar=None, - parse_dates: Union[bool, Dict, List] = False, - infer_datetime_format=False, - na_values: Optional[Union[str, List[str]]] = None, - keep_default_na: bool = True, - na_filter: bool = True, - encoding="utf-8", - converters=None, - ): + def read_csv(self, path: str, max_result_size: Optional[int] = None, **pd_additional_kwargs): """ - Read CSV file from AWS S3 using optimized strategies. - Try to mimic as most as possible pandas.read_csv() - https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html - P.S. max_result_size != None tries to mimic the chunksize behaviour in pandas.read_sql() + Read a single CSV file from Amazon S3 using optimized strategies. - :param path: AWS S3 path (E.g. S3://BUCKET_NAME/KEY_NAME) - :param max_result_size: Max number of bytes on each request to S3 - :param header: Same as pandas.read_csv() - :param names: Same as pandas.read_csv() - :param usecols: Same as pandas.read_csv() - :param dtype: Same as pandas.read_csv() - :param sep: Same as pandas.read_csv() - :param thousands: Same as pandas.read_csv() - :param decimal: Same as pandas.read_csv() - :param lineterminator: Same as pandas.read_csv() - :param quotechar: Same as pandas.read_csv() - :param quoting: Same as pandas.read_csv() - :param escapechar: Same as pandas.read_csv() - :param parse_dates: Same as pandas.read_csv() - :param infer_datetime_format: Same as pandas.read_csv() - :param na_values: Same as pandas.read_csv() - :param keep_default_na: Same as pandas.read_csv() - :param na_filter: Same as pandas.read_csv() - :param encoding: Same as pandas.read_csv() - :param converters: Same as pandas.read_csv() + :param path: Amazon S3 path (e.g. s3://bucket_name/key_name) + :param max_result_size: Max number of bytes on each request to S3. It offers functionality similar to chunksize in pandas.read_csv(), but with higher performance + :param **pd_additional_kwargs: Additional parameters forwarded to pandas.read_csv :return: Pandas Dataframe or Iterator of Pandas Dataframes if max_result_size != None """ + + if "chunksize" in pd_additional_kwargs: + raise InvalidParameters( + "chunksize is currently not supported. 
Use max_result_size for a similar functionality") + bucket_name, key_path = self._parse_path(path) if max_result_size is not None: ret = self._read_csv_iterator(bucket_name=bucket_name, key_path=key_path, max_result_size=max_result_size, - header=header, - names=names, - usecols=usecols, - dtype=dtype, - sep=sep, - thousands=thousands, - decimal=decimal, - lineterminator=lineterminator, - quotechar=quotechar, - quoting=quoting, - escapechar=escapechar, - parse_dates=parse_dates, - infer_datetime_format=infer_datetime_format, - na_values=na_values, - keep_default_na=keep_default_na, - na_filter=na_filter, - encoding=encoding, - converters=converters) + **pd_additional_kwargs) else: ret = self._read_csv_once(session_primitives=self._session.primitives, bucket_name=bucket_name, key_path=key_path, - header=header, - names=names, - usecols=usecols, - dtype=dtype, - sep=sep, - thousands=thousands, - decimal=decimal, - lineterminator=lineterminator, - quotechar=quotechar, - quoting=quoting, - escapechar=escapechar, - parse_dates=parse_dates, - infer_datetime_format=infer_datetime_format, - na_values=na_values, - keep_default_na=keep_default_na, - na_filter=na_filter, - encoding=encoding, - converters=converters) + **pd_additional_kwargs) return ret - def _read_csv_iterator( - self, - bucket_name, - key_path, - max_result_size=200_000_000, # 200 MB - header="infer", - names=None, - usecols=None, - dtype=None, - sep=",", - thousands=None, - decimal=".", - lineterminator="\n", - quotechar='"', - quoting=csv.QUOTE_MINIMAL, - escapechar=None, - parse_dates: Union[bool, Dict, List] = False, - infer_datetime_format=False, - na_values: Optional[Union[str, List[str]]] = None, - keep_default_na: bool = True, - na_filter: bool = True, - encoding="utf-8", - converters=None, - ): + def _read_csv_iterator(self, bucket_name, key_path, max_result_size=200_000_000, **pd_additional_kwargs): """ - Read CSV file from AWS S3 using optimized strategies. - Try to mimic as most as possible pandas.read_csv() - https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html + Read a single CSV file from Amazon S3 in batches of max_result_size. :param bucket_name: S3 bucket name - :param key_path: S3 key path (W/o bucket) - :param max_result_size: Max number of bytes on each request to S3 - :param header: Same as pandas.read_csv() - :param names: Same as pandas.read_csv() - :param usecols: Same as pandas.read_csv() - :param dtype: Same as pandas.read_csv() - :param sep: Same as pandas.read_csv() - :param thousands: Same as pandas.read_csv() - :param decimal: Same as pandas.read_csv() - :param lineterminator: Same as pandas.read_csv() - :param quotechar: Same as pandas.read_csv() - :param quoting: Same as pandas.read_csv() - :param escapechar: Same as pandas.read_csv() - :param parse_dates: Same as pandas.read_csv() - :param infer_datetime_format: Same as pandas.read_csv() - :param na_values: Same as pandas.read_csv() - :param keep_default_na: Same as pandas.read_csv() - :param na_filter: Same as pandas.read_csv() - :param encoding: Same as pandas.read_csv() - :param converters: Same as pandas.read_csv() - :return: Pandas Dataframe + :param key_path: S3 key path (w/o bucket) + :param max_result_size: Max number of bytes on each request to S3. 
It offers functionality similar to chunksize in pandas.read_csv(), but with higher performance + :param **pd_additional_kwargs: Additional parameters forwarded to pandas.read_csv + :return: Iterator of Pandas Dataframes """ metadata = S3.head_object_with_retry(client_s3=self._client_s3, bucket=bucket_name, key=key_path) total_size = metadata["ContentLength"] @@ -216,25 +103,13 @@ def _read_csv_iterator( yield self._read_csv_once(session_primitives=self._session.primitives, bucket_name=bucket_name, key_path=key_path, - header=header, - names=names, - usecols=usecols, - dtype=dtype, - sep=sep, - thousands=thousands, - decimal=decimal, - lineterminator=lineterminator, - quotechar=quotechar, - quoting=quoting, - escapechar=escapechar, - parse_dates=parse_dates, - infer_datetime_format=infer_datetime_format, - na_values=na_values, - keep_default_na=keep_default_na, - na_filter=na_filter, - encoding=encoding, - converters=converters) + **pd_additional_kwargs) else: + sep = pd_additional_kwargs.get('sep', ",") + quoting = pd_additional_kwargs.get('quoting', 0) + quotechar = pd_additional_kwargs.get('quotechar', '"') + lineterminator = pd_additional_kwargs.get('lineterminator', "\n") + bounders = calculate_bounders(num_items=total_size, max_size=max_result_size) logger.debug(f"bounders: {bounders}") bounders_len: int = len(bounders) @@ -245,20 +120,13 @@ def _read_csv_iterator( ini -= forgotten_bytes end -= 1 # Range is inclusive, contrary from Python's List - bytes_range = "bytes={}-{}".format(ini, end) + bytes_range = f"bytes={ini}-{end}" logger.debug(f"bytes_range: {bytes_range}") body = self._client_s3.get_object(Bucket=bucket_name, Key=key_path, Range=bytes_range)["Body"].read() chunk_size = len(body) logger.debug(f"chunk_size (bytes): {chunk_size}") - if count == 1: # first chunk - last_char = Pandas._find_terminator(body=body, - sep=sep, - quoting=quoting, - quotechar=quotechar, - lineterminator=lineterminator) - forgotten_bytes = len(body[last_char:]) - elif count == bounders_len: # Last chunk + if count == bounders_len: # Last chunk last_char = chunk_size else: last_char = Pandas._find_terminator(body=body, @@ -268,29 +136,11 @@ def _read_csv_iterator( lineterminator=lineterminator) forgotten_bytes = len(body[last_char:]) - df = pd.read_csv(StringIO(body[:last_char].decode("utf-8")), - header=header, - names=names, - usecols=usecols, - na_values=na_values, - keep_default_na=keep_default_na, - na_filter=na_filter, - sep=sep, - thousands=thousands, - decimal=decimal, - quotechar=quotechar, - quoting=quoting, - escapechar=escapechar, - parse_dates=parse_dates, - infer_datetime_format=infer_datetime_format, - lineterminator=lineterminator, - dtype=dtype, - encoding=encoding, - converters=converters) + df = pd.read_csv(StringIO(body[:last_char].decode("utf-8")), **pd_additional_kwargs) + if count == 1: # saving column names from the first chunk + pd_additional_kwargs["names"] = df.columns + pd_additional_kwargs["header"] = None yield df - if count == 1: # first chunk - names = df.columns - header = None @staticmethod def _extract_terminator_profile(body, sep, quotechar, lineterminator, last_index): @@ -378,55 +228,15 @@ def _find_terminator(body, sep, quoting, quotechar, lineterminator): return index @staticmethod - def _read_csv_once( - session_primitives: "SessionPrimitives", - bucket_name: str, - key_path: str, - header: Optional[str] = "infer", - names=None, - usecols=None, - dtype=None, - sep: str = ",", - thousands=None, - decimal: str = ".", - lineterminator: str = "\n", - quotechar: 
str = '"', - quoting: int = 0, - escapechar=None, - parse_dates: Union[bool, Dict, List] = False, - infer_datetime_format=False, - na_values: Optional[Union[str, List[str]]] = None, - keep_default_na: bool = True, - na_filter: bool = True, - encoding=None, - converters=None, - ): + def _read_csv_once(session_primitives: "SessionPrimitives", bucket_name: str, key_path: str, + **pd_additional_kwargs): """ - Read CSV file from AWS S3 using optimized strategies. - Try to mimic as most as possible pandas.read_csv() - https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html + Read a single CSV file from Amazon S3 using optimized strategies. :param session_primitives: SessionPrimitives() :param bucket_name: S3 bucket name - :param key_path: S3 key path (W/o bucket) - :param header: Same as pandas.read_csv() - :param names: Same as pandas.read_csv() - :param usecols: Same as pandas.read_csv() - :param dtype: Same as pandas.read_csv() - :param sep: Same as pandas.read_csv() - :param thousands: Same as pandas.read_csv() - :param decimal: Same as pandas.read_csv() - :param lineterminator: Same as pandas.read_csv() - :param quotechar: Same as pandas.read_csv() - :param quoting: Same as pandas.read_csv() - :param escapechar: Same as pandas.read_csv() - :param parse_dates: Same as pandas.read_csv() - :param infer_datetime_format: Same as pandas.read_csv() - :param na_values: Same as pandas.read_csv() - :param keep_default_na: Same as pandas.read_csv() - :param na_filter: Same as pandas.read_csv() - :param encoding: Same as pandas.read_csv() - :param converters: Same as pandas.read_csv() + :param key_path: S3 key path (w/o bucket) + :param **pd_additional_kwargs: Additional parameters forwarded to pandas.read_csv :return: Pandas Dataframe """ buff = BytesIO() @@ -434,74 +244,17 @@ def _read_csv_once( client_s3 = session.boto3_session.client(service_name="s3", use_ssl=True, config=session.botocore_config) client_s3.download_fileobj(Bucket=bucket_name, Key=key_path, Fileobj=buff) buff.seek(0), - dataframe = pd.read_csv( - buff, - header=header, - names=names, - usecols=usecols, - na_values=na_values, - keep_default_na=keep_default_na, - na_filter=na_filter, - sep=sep, - thousands=thousands, - decimal=decimal, - quotechar=quotechar, - quoting=quoting, - escapechar=escapechar, - parse_dates=parse_dates, - infer_datetime_format=infer_datetime_format, - lineterminator=lineterminator, - dtype=dtype, - encoding=encoding, - converters=converters, - ) + dataframe = pd.read_csv(buff, **pd_additional_kwargs) buff.close() return dataframe @staticmethod - def _read_csv_once_remote(send_pipe: mp.connection.Connection, - session_primitives: "SessionPrimitives", - bucket_name: str, - key_path: str, - header: str = "infer", - names=None, - usecols=None, - dtype=None, - sep: str = ",", - thousands=None, - decimal: str = ".", - lineterminator: str = "\n", - quotechar: str = '"', - quoting: int = 0, - escapechar=None, - parse_dates: Union[bool, Dict, List] = False, - infer_datetime_format=False, - na_values: Optional[Union[str, List[str]]] = None, - keep_default_na: bool = True, - na_filter: bool = True, - encoding=None, - converters=None): + def _read_csv_once_remote(send_pipe: mp.connection.Connection, session_primitives: "SessionPrimitives", + bucket_name: str, key_path: str, **pd_additional_kwargs): df: pd.DataFrame = Pandas._read_csv_once(session_primitives=session_primitives, bucket_name=bucket_name, key_path=key_path, - header=header, - names=names, - usecols=usecols, - dtype=dtype, - sep=sep, - 
thousands=thousands, - decimal=decimal, - lineterminator=lineterminator, - quotechar=quotechar, - quoting=quoting, - escapechar=escapechar, - parse_dates=parse_dates, - infer_datetime_format=infer_datetime_format, - na_values=na_values, - keep_default_na=keep_default_na, - na_filter=na_filter, - encoding=encoding, - converters=converters) + **pd_additional_kwargs) send_pipe.send(df) send_pipe.close() @@ -605,7 +358,7 @@ def read_sql_athena(self, :param sql: SQL Query :param database: Glue/Athena Database - :param s3_output: AWS S3 path + :param s3_output: Amazon S3 path :param workgroup: The name of the workgroup in which the query is being started. (By default uses de Session() workgroup) :param encryption: None|'SSE_S3'|'SSE_KMS'|'CSE_KMS' :param kms_key: For SSE-KMS and CSE-KMS , this is the KMS key ARN or ID. @@ -758,7 +511,7 @@ def to_csv(self, Optionally writes metadata on AWS Glue. :param dataframe: Pandas Dataframe - :param path: AWS S3 path (E.g. s3://bucket-name/folder_name/ + :param path: Amazon S3 path (e.g. s3://bucket_name/folder_name/) :param sep: Same as pandas.to_csv() :param na_rep: Same as pandas.to_csv() :param columns: Same as pandas.to_csv() @@ -828,7 +581,7 @@ def to_parquet(self, Optionally writes metadata on AWS Glue. :param dataframe: Pandas Dataframe - :param path: AWS S3 path (E.g. s3://bucket-name/folder_name/) + :param path: Amazon S3 path (e.g. s3://bucket_name/folder_name/) :param database: AWS Glue Database name :param table: AWS Glue table name :param partition_cols: List of columns names that will be partitions on S3 @@ -837,7 +590,7 @@ def to_parquet(self, :param compression: None, snappy, gzip, lzo :param procs_cpu_bound: Number of cores used for CPU bound tasks :param procs_io_bound: Number of cores used for I/O bound tasks - :param cast_columns: Dictionary of columns names and Athena/Glue types to be casted (E.g. {"col name": "bigint", "col2 name": "int"}) + :param cast_columns: Dictionary of columns names and Athena/Glue types to be casted (e.g. {"col name": "bigint", "col2 name": "int"}) :param inplace: True is cheapest (CPU and Memory) but False leaves your DataFrame intact :param description: Table description :param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]]) @@ -884,7 +637,7 @@ def to_s3(self, Optionally writes metadata on AWS Glue. :param dataframe: Pandas Dataframe - :param path: AWS S3 path (E.g. s3://bucket-name/folder_name/ + :param path: Amazon S3 path (e.g. s3://bucket_name/folder_name/) :param file_format: "csv" or "parquet" :param database: AWS Glue Database name :param table: AWS Glue table name @@ -894,8 +647,8 @@ def to_s3(self, :param compression: None, gzip, snappy, etc :param procs_cpu_bound: Number of cores used for CPU bound tasks :param procs_io_bound: Number of cores used for I/O bound tasks - :param cast_columns: Dictionary of columns names and Athena/Glue types to be casted. (E.g. {"col name": "bigint", "col2 name": "int"}) (Only for "parquet" file_format) - :param extra_args: Extra arguments specific for each file formats (E.g. "sep" for CSV) + :param cast_columns: Dictionary of columns names and Athena/Glue types to be casted. (e.g. {"col name": "bigint", "col2 name": "int"}) (Only for "parquet" file_format) + :param extra_args: Extra arguments specific for each file formats (e.g. 
"sep" for CSV) :param inplace: True is cheapest (CPU and Memory) but False leaves your DataFrame intact :param description: Table description :param parameters: Key/value pairs to tag the table (Optional[Dict[str, str]]) @@ -1262,7 +1015,7 @@ def to_redshift( Load Pandas Dataframe as a Table on Amazon Redshift :param dataframe: Pandas Dataframe - :param path: S3 path to write temporary files (E.g. s3://BUCKET_NAME/ANY_NAME/) + :param path: S3 path to write temporary files (e.g. s3://bucket_name/any_name/) :param connection: Glue connection name (str) OR a PEP 249 compatible connection (Can be generated with Redshift.generate_connection()) :param schema: The Redshift Schema for the table :param table: The name of the desired Redshift table @@ -1274,7 +1027,7 @@ def to_redshift( :param primary_keys: Primary keys :param preserve_index: Should we preserve the Dataframe index? :param mode: append, overwrite or upsert - :param cast_columns: Dictionary of columns names and Redshift types to be casted. (E.g. {"col name": "SMALLINT", "col2 name": "FLOAT4"}) + :param cast_columns: Dictionary of columns names and Redshift types to be casted. (e.g. {"col name": "SMALLINT", "col2 name": "FLOAT4"}) :return: None """ if cast_columns is None: @@ -1400,7 +1153,7 @@ def read_parquet(self, """ Read parquet data from S3 - :param path: AWS S3 path or List of paths (E.g. s3://bucket-name/folder_name/) + :param path: Amazon S3 path or List of paths (e.g. s3://bucket_name/folder_name/) :param columns: Names of columns to read from the file :param filters: List of filters to apply, like ``[[('x', '=', 0), ...], ...]``. :param procs_cpu_bound: Number of cores used for CPU bound tasks @@ -1493,7 +1246,7 @@ def _read_parquet_paths(session_primitives: "SessionPrimitives", Read parquet data from S3 :param session_primitives: SessionPrimitives() - :param path: AWS S3 path or List of paths (E.g. s3://bucket-name/folder_name/) + :param path: Amazon S3 path or List of paths (e.g. s3://bucket_name/folder_name/) :param columns: Names of columns to read from the file :param filters: List of filters to apply, like ``[[('x', '=', 0), ...], ...]``. :param procs_cpu_bound: Number of cores used for CPU bound tasks @@ -1538,7 +1291,7 @@ def _read_parquet_path(session_primitives: "SessionPrimitives", Read parquet data from S3 :param session_primitives: SessionPrimitives() - :param path: AWS S3 path (E.g. s3://bucket-name/folder_name/) + :param path: Amazon S3 path (e.g. s3://bucket_name/folder_name/) :param columns: Names of columns to read from the file :param filters: List of filters to apply, like ``[[('x', '=', 0), ...], ...]``. :param procs_cpu_bound: Number of cores used for CPU bound tasks @@ -1614,7 +1367,7 @@ def read_sql_redshift(self, :param sql: SQL Query :param iam_role: AWS IAM role with the related permissions :param connection: Glue connection name (str) OR a PEP 249 compatible connection (Can be generated with Redshift.generate_connection()) - :param temp_s3_path: AWS S3 path to write temporary data (e.g. s3://...) (Default uses the Athena's results bucket) + :param temp_s3_path: Amazon S3 path to write temporary data (e.g. s3://...) (Default uses the Athena's results bucket) :param procs_cpu_bound: Number of cores used for CPU bound tasks """ guid: str = pa.compat.guid() @@ -1682,7 +1435,7 @@ def to_aurora(self, :param schema: The Redshift Schema for the table :param table: The name of the desired Redshift table :param engine: "mysql" or "postgres" - :param temp_s3_path: S3 path to write temporary files (E.g. 
s3://BUCKET_NAME/ANY_NAME/) + :param temp_s3_path: S3 path to write temporary files (e.g. s3://bucket_name/any_name/) :param preserve_index: Should we preserve the Dataframe index? :param mode: append or overwrite :param columns: List of columns to load @@ -1771,7 +1524,7 @@ def read_sql_aurora(self, :param sql: SQL Query :param connection: Glue connection name (str) OR a PEP 249 compatible connection (Can be generated with Aurora.generate_connection()) :param col_names: List of column names. Default (None) is use columns IDs as column names. - :param temp_s3_path: AWS S3 path to write temporary data (e.g. s3://...) (Default uses the Athena's results bucket) + :param temp_s3_path: Amazon S3 path to write temporary data (e.g. s3://...) (Default uses the Athena's results bucket) :param engine: Only "mysql" by now :param max_result_size: Max number of bytes on each request to S3 :return: Pandas Dataframe or Iterator of Pandas Dataframes if max_result_size != None @@ -1821,76 +1574,20 @@ def read_csv_list( self, paths: List[str], max_result_size=None, - header: Optional[str] = "infer", - names=None, - usecols=None, - dtype=None, - sep=",", - thousands=None, - decimal=".", - lineterminator="\n", - quotechar='"', - quoting=csv.QUOTE_MINIMAL, - escapechar=None, - parse_dates: Union[bool, Dict, List] = False, - infer_datetime_format=False, - na_values: Optional[Union[str, List[str]]] = None, - keep_default_na: bool = True, - na_filter: bool = True, - encoding="utf-8", - converters=None, procs_cpu_bound: Optional[int] = None, + **pd_additional_kwargs, ) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]: """ - Read CSV files from AWS S3 using optimized strategies. - Try to mimic as most as possible pandas.read_csv() - https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html - P.S. max_result_size != None tries to mimic the chunksize behaviour in pandas.read_sql() + Read a list of CSV files from Amazon S3 using optimized strategies. - :param paths: AWS S3 paths (E.g. S3://BUCKET_NAME/KEY_NAME) - :param max_result_size: Max number of bytes on each request to S3 - :param header: Same as pandas.read_csv() - :param names: Same as pandas.read_csv() - :param usecols: Same as pandas.read_csv() - :param dtype: Same as pandas.read_csv() - :param sep: Same as pandas.read_csv() - :param thousands: Same as pandas.read_csv() - :param decimal: Same as pandas.read_csv() - :param lineterminator: Same as pandas.read_csv() - :param quotechar: Same as pandas.read_csv() - :param quoting: Same as pandas.read_csv() - :param escapechar: Same as pandas.read_csv() - :param parse_dates: Same as pandas.read_csv() - :param infer_datetime_format: Same as pandas.read_csv() - :param na_values: Same as pandas.read_csv() - :param keep_default_na: Same as pandas.read_csv() - :param na_filter: Same as pandas.read_csv() - :param encoding: Same as pandas.read_csv() - :param converters: Same as pandas.read_csv() + :param paths: List of Amazon S3 paths (e.g. ['s3://bucket_name/key_name1', 's3://bucket_name/key_name2']) + :param max_result_size: Max number of bytes on each request to S3. 
It offers functionality similar to chunksize in pandas.read_csv(), but with higher performance :param procs_cpu_bound: Number of cores used for CPU bound tasks + :param **pd_additional_kwargs: Additional parameters forwarded to pandas.read_csv :return: Pandas Dataframe or Iterator of Pandas Dataframes if max_result_size != None """ if max_result_size is not None: - return self._read_csv_list_iterator(paths=paths, - max_result_size=max_result_size, - header=header, - names=names, - usecols=usecols, - dtype=dtype, - sep=sep, - thousands=thousands, - decimal=decimal, - lineterminator=lineterminator, - quotechar=quotechar, - quoting=quoting, - escapechar=escapechar, - parse_dates=parse_dates, - infer_datetime_format=infer_datetime_format, - na_values=na_values, - keep_default_na=keep_default_na, - na_filter=na_filter, - encoding=encoding, - converters=converters) + return self._read_csv_list_iterator(paths=paths, max_result_size=max_result_size, **pd_additional_kwargs) else: procs_cpu_bound = procs_cpu_bound if procs_cpu_bound is not None else self._session.procs_cpu_bound if self._session.procs_cpu_bound is not None else 1 logger.debug(f"procs_cpu_bound: {procs_cpu_bound}") @@ -1902,24 +1599,7 @@ def read_csv_list( df: pd.DataFrame = self._read_csv_once(session_primitives=self._session.primitives, bucket_name=bucket_name, key_path=key_path, - header=header, - names=names, - usecols=usecols, - dtype=dtype, - sep=sep, - thousands=thousands, - decimal=decimal, - lineterminator=lineterminator, - quotechar=quotechar, - quoting=quoting, - escapechar=escapechar, - parse_dates=parse_dates, - infer_datetime_format=infer_datetime_format, - na_values=na_values, - keep_default_na=keep_default_na, - na_filter=na_filter, - encoding=encoding, - converters=converters) + **pd_additional_kwargs) else: procs = [] receive_pipes = [] @@ -1928,12 +1608,9 @@ def read_csv_list( receive_pipe, send_pipe = mp.Pipe() bucket_name, key_path = Pandas._parse_path(path) logger.debug(f"launching path: {path}") - proc = mp.Process( - target=self._read_csv_once_remote, - args=(send_pipe, session_primitives, bucket_name, key_path, header, names, usecols, dtype, sep, - thousands, decimal, lineterminator, quotechar, quoting, escapechar, parse_dates, - infer_datetime_format, na_values, keep_default_na, na_filter, encoding, converters), - ) + proc = mp.Process(target=self._read_csv_once_remote, + args=(send_pipe, session_primitives, bucket_name, key_path), + kwargs=pd_additional_kwargs) proc.daemon = False proc.start() procs.append(proc) @@ -1953,55 +1630,13 @@ def read_csv_list( logger.debug("Concatenation done!") return df - def _read_csv_list_iterator( - self, - paths: List[str], - max_result_size=None, - header="infer", - names=None, - usecols=None, - dtype=None, - sep=",", - thousands=None, - decimal=".", - lineterminator="\n", - quotechar='"', - quoting=csv.QUOTE_MINIMAL, - escapechar=None, - parse_dates: Union[bool, Dict, List] = False, - infer_datetime_format=False, - na_values: Optional[Union[str, List[str]]] = None, - keep_default_na: bool = True, - na_filter: bool = True, - encoding="utf-8", - converters=None, - ): + def _read_csv_list_iterator(self, paths: List[str], max_result_size=None, **pd_additional_kwargs): """ - Read CSV files from AWS S3 using optimized strategies. - Try to mimic as most as possible pandas.read_csv() - https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html - P.S. 
Try to mimic the chunksize behaviour in pandas.read_sql() + Read a list of CSV files from Amazon S3 using optimized strategies. - :param paths: AWS S3 paths (E.g. S3://BUCKET_NAME/KEY_NAME) - :param max_result_size: Max number of bytes on each request to S3 - :param header: Same as pandas.read_csv() - :param names: Same as pandas.read_csv() - :param usecols: Same as pandas.read_csv() - :param dtype: Same as pandas.read_csv() - :param sep: Same as pandas.read_csv() - :param thousands: Same as pandas.read_csv() - :param decimal: Same as pandas.read_csv() - :param lineterminator: Same as pandas.read_csv() - :param quotechar: Same as pandas.read_csv() - :param quoting: Same as pandas.read_csv() - :param escapechar: Same as pandas.read_csv() - :param parse_dates: Same as pandas.read_csv() - :param infer_datetime_format: Same as pandas.read_csv() - :param na_values: Same as pandas.read_csv() - :param keep_default_na: Same as pandas.read_csv() - :param na_filter: Same as pandas.read_csv() - :param encoding: Same as pandas.read_csv() - :param converters: Same as pandas.read_csv() + :param paths: List of Amazon S3 paths (e.g. ['s3://bucket_name/key_name1', 's3://bucket_name/key_name2']) + :param max_result_size: Max number of bytes on each request to S3. It offers functionality similar to chunksize in pandas.read_csv(), but with higher performance + :param **pd_additional_kwargs: Additional parameters forwarded to pandas.read_csv :return: Iterator of iterators of Pandas Dataframes """ for path in paths: @@ -2010,98 +1645,27 @@ def _read_csv_list_iterator( yield from self._read_csv_iterator(bucket_name=bucket_name, key_path=key_path, max_result_size=max_result_size, - header=header, - names=names, - usecols=usecols, - dtype=dtype, - sep=sep, - thousands=thousands, - decimal=decimal, - lineterminator=lineterminator, - quotechar=quotechar, - quoting=quoting, - escapechar=escapechar, - parse_dates=parse_dates, - infer_datetime_format=infer_datetime_format, - na_values=na_values, - keep_default_na=keep_default_na, - na_filter=na_filter, - encoding=encoding, - converters=converters) + **pd_additional_kwargs) def read_csv_prefix( self, path_prefix: str, max_result_size=None, - header: Optional[str] = "infer", - names=None, - usecols=None, - dtype=None, - sep=",", - thousands=None, - decimal=".", - lineterminator="\n", - quotechar='"', - quoting=csv.QUOTE_MINIMAL, - escapechar=None, - parse_dates: Union[bool, Dict, List] = False, - infer_datetime_format=False, - na_values: Optional[Union[str, List[str]]] = None, - keep_default_na: bool = True, - na_filter: bool = True, - encoding="utf-8", - converters=None, procs_cpu_bound: Optional[int] = None, + **pd_additional_kwargs, ) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]: """ - Read CSV files from AWS S3 PREFIX using optimized strategies. - Try to mimic as most as possible pandas.read_csv() - https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html - P.S. max_result_size != None tries to mimic the chunksize behaviour in pandas.read_sql() + Read all CSV files from a given Amazon S3 prefix using optimized strategies. - :param path_prefix: AWS S3 path prefix (E.g. 
S3://BUCKET_NAME/PREFIX)
-        :param max_result_size: Max number of bytes on each request to S3
-        :param header: Same as pandas.read_csv()
-        :param names: Same as pandas.read_csv()
-        :param usecols: Same as pandas.read_csv()
-        :param dtype: Same as pandas.read_csv()
-        :param sep: Same as pandas.read_csv()
-        :param thousands: Same as pandas.read_csv()
-        :param decimal: Same as pandas.read_csv()
-        :param lineterminator: Same as pandas.read_csv()
-        :param quotechar: Same as pandas.read_csv()
-        :param quoting: Same as pandas.read_csv()
-        :param escapechar: Same as pandas.read_csv()
-        :param parse_dates: Same as pandas.read_csv()
-        :param infer_datetime_format: Same as pandas.read_csv()
-        :param na_values: Same as pandas.read_csv()
-        :param keep_default_na: Same as pandas.read_csv()
-        :param na_filter: Same as pandas.read_csv()
-        :param encoding: Same as pandas.read_csv()
-        :param converters: Same as pandas.read_csv()
+        :param path_prefix: Amazon S3 prefix (e.g. s3://bucket_name/prefix)
+        :param max_result_size: Max number of bytes on each request to S3. It offers functionality similar to chunksize in pandas.read_csv(), but with higher performance
         :param procs_cpu_bound: Number of cores used for CPU bound tasks
+        :param **pd_additional_kwargs: Additional parameters forwarded to pandas.read_csv
         :return: Pandas Dataframe or Iterator of Pandas Dataframes if max_result_size != None
         """
         paths: List[str] = self._session.s3.list_objects(path=path_prefix)
         paths = [p for p in paths if not p.endswith("/")]
         return self.read_csv_list(paths=paths,
                                   max_result_size=max_result_size,
-                                  header=header,
-                                  names=names,
-                                  usecols=usecols,
-                                  dtype=dtype,
-                                  sep=sep,
-                                  thousands=thousands,
-                                  decimal=decimal,
-                                  lineterminator=lineterminator,
-                                  quotechar=quotechar,
-                                  quoting=quoting,
-                                  escapechar=escapechar,
-                                  parse_dates=parse_dates,
-                                  infer_datetime_format=infer_datetime_format,
-                                  na_values=na_values,
-                                  keep_default_na=keep_default_na,
-                                  na_filter=na_filter,
-                                  encoding=encoding,
-                                  converters=converters,
-                                  procs_cpu_bound=procs_cpu_bound)
+                                  procs_cpu_bound=procs_cpu_bound,
+                                  **pd_additional_kwargs)
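
With every pandas keyword now forwarded, callers are no longer limited to the subset of pandas.read_csv() arguments the wrapper used to re-declare. A minimal usage sketch of the refactored entry points follows; it assumes the Session-based API this module is attached to, and the bucket, key, and column names are purely illustrative:

    import awswrangler

    session = awswrangler.Session()

    # Any pandas.read_csv() keyword is forwarded verbatim via **pd_additional_kwargs.
    df = session.pandas.read_csv(path="s3://bucket_name/key_name.csv",
                                 sep="|",
                                 dtype={"id": "int64"},
                                 parse_dates=["created_at"])

    # max_result_size switches to batched reads and yields one DataFrame per byte range.
    for chunk in session.pandas.read_csv(path="s3://bucket_name/key_name.csv",
                                         max_result_size=200_000_000,
                                         sep="|"):
        print(len(chunk.index))

    # chunksize is explicitly rejected in favour of max_result_size:
    # session.pandas.read_csv(path="s3://bucket_name/key_name.csv", chunksize=1000)
    # -> raises InvalidParameters

read_csv_list() and read_csv_prefix() accept the same forwarded keywords, plus procs_cpu_bound for the multiprocessing path.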
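
For reviewers of the iterator path: the batching strategy behind max_result_size is unchanged in spirit — fetch inclusive byte ranges, cut each chunk at the last line terminator, and carry the leftover bytes into the next range (the forgotten_bytes bookkeeping). The sketch below is a simplified, self-contained illustration of that idea, not the module's actual code: the helper name and get_range callable are hypothetical, and the real _find_terminator() is also quote-aware (it honours sep, quotechar and quoting) so it never cuts inside a quoted field.

    from typing import Callable, Iterator

    def iter_csv_chunks(get_range: Callable[[int, int], bytes],
                        total_size: int,
                        max_result_size: int,
                        lineterminator: bytes = b"\n") -> Iterator[bytes]:
        """Yield byte chunks that always end on a line terminator.

        get_range(start, end) must return the inclusive byte range [start, end]
        of the object, e.g. S3 GetObject with Range=f"bytes={start}-{end}".
        """
        forgotten = 0                 # bytes after the last terminator, re-read next round
        start = 0
        while start < total_size:
            ini = start - forgotten   # back up so the partial record is read again
            end = min(start + max_result_size, total_size) - 1  # Range is inclusive
            body = get_range(ini, end)
            if end == total_size - 1:                  # last chunk: keep everything
                last = len(body)
            else:                                      # cut at the last terminator
                last = body.rfind(lineterminator) + 1
            forgotten = len(body) - last
            yield body[:last]
            start = end + 1

Each yielded block is what _read_csv_iterator() hands to pandas.read_csv(), with names/header rewritten in pd_additional_kwargs after the first block so later blocks reuse the first block's columns.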