From 7f48ee1a1969dc28589cb589dbeac0314de6914e Mon Sep 17 00:00:00 2001 From: Igor Tavares Date: Sun, 3 Mar 2019 10:34:35 -0300 Subject: [PATCH 1/3] experimental s3 write --- awswrangler/common.py | 18 ++- awswrangler/glue/utils.py | 8 +- awswrangler/s3/__init__.py | 7 +- awswrangler/s3/csv.py | 44 -------- awswrangler/s3/parquet.py | 48 -------- awswrangler/s3/read/__init__.py | 0 awswrangler/s3/read/csv.py | 0 awswrangler/s3/read/manager.py | 0 awswrangler/s3/write.py | 174 ----------------------------- awswrangler/s3/write/__init__.py | 0 awswrangler/s3/write/csv.py | 4 + awswrangler/s3/write/manager.py | 184 +++++++++++++++++++++++++++++++ awswrangler/s3/write/metadata.py | 102 +++++++++++++++++ awswrangler/s3/write/parquet.py | 8 ++ awswrangler/s3/write/write.py | 103 +++++++++++++++++ tests/utils.py | 1 - 16 files changed, 429 insertions(+), 272 deletions(-) delete mode 100644 awswrangler/s3/csv.py delete mode 100644 awswrangler/s3/parquet.py create mode 100644 awswrangler/s3/read/__init__.py create mode 100644 awswrangler/s3/read/csv.py create mode 100644 awswrangler/s3/read/manager.py delete mode 100644 awswrangler/s3/write.py create mode 100644 awswrangler/s3/write/__init__.py create mode 100644 awswrangler/s3/write/csv.py create mode 100644 awswrangler/s3/write/manager.py create mode 100644 awswrangler/s3/write/metadata.py create mode 100644 awswrangler/s3/write/parquet.py create mode 100644 awswrangler/s3/write/write.py diff --git a/awswrangler/common.py b/awswrangler/common.py index f5b5ac004..76282d284 100644 --- a/awswrangler/common.py +++ b/awswrangler/common.py @@ -5,7 +5,7 @@ def get_session( session_primitives=None, key=None, secret=None, profile=None, region=None ): """ - Return a configured boto3 Session object + Return a configured Boto3 Session object """ if session_primitives: key = session_primitives.key if session_primitives.key else key @@ -28,3 +28,19 @@ def __init__(self, key=None, secret=None, profile=None, region=None): self.secret = secret self.profile = profile self.region = region + + +def calculate_bounders(num_items, num_groups): + num_groups = num_items if num_items < num_groups else num_groups + size = int(num_items / num_groups) + rest = num_items % num_groups + bounders = [] + end = -1 + for _ in range(num_groups): + start = end + 1 + end += size + if rest: + end += 1 + rest -= 1 + bounders.append((start, end)) + return bounders diff --git a/awswrangler/glue/utils.py b/awswrangler/glue/utils.py index 1fa6fa996..5db5363fd 100644 --- a/awswrangler/glue/utils.py +++ b/awswrangler/glue/utils.py @@ -57,7 +57,13 @@ def add_partitions( def create_table( - database, table, schema, partition_cols, path, file_format, session_primitives=None + database, + table, + schema, + path, + file_format, + partition_cols=None, + session_primitives=None, ): """ Create Glue table diff --git a/awswrangler/s3/__init__.py b/awswrangler/s3/__init__.py index 303909bd6..d8e1a2e03 100644 --- a/awswrangler/s3/__init__.py +++ b/awswrangler/s3/__init__.py @@ -1,10 +1,10 @@ -from . 
import write as _write +from .write.write import write as _write def write( df, - database, path, + database=None, table=None, partition_cols=[], preserve_index=False, @@ -14,5 +14,6 @@ def write( key=None, secret=None, profile=None, + num_procs=None, ): - return _write.write(**locals()) + return _write(**locals()) diff --git a/awswrangler/s3/csv.py b/awswrangler/s3/csv.py deleted file mode 100644 index cf9397c18..000000000 --- a/awswrangler/s3/csv.py +++ /dev/null @@ -1,44 +0,0 @@ -from pyarrow.compat import guid - -from .utils import mkdir_if_not_exists, delete_listed_objects, list_objects - - -def write(df, fs, path, preserve_index): - outfile = guid() + ".csv" - full_path = "/".join([path, outfile]) - csv_buffer = df.to_csv(None, header=False, index=preserve_index).encode() - with fs.open(full_path, "wb") as f: - f.write(csv_buffer) - - -def write_dataset( - df, fs, path, partition_cols, preserve_index, session_primitives, mode -): - partition_paths = [] - dead_keys = [] - for keys, subgroup in df.groupby(partition_cols): - subgroup = subgroup.drop(partition_cols, axis="columns") - if not isinstance(keys, tuple): - keys = (keys,) - subdir = "/".join( - [ - "{colname}={value}".format(colname=name, value=val) - for name, val in zip(partition_cols, keys) - ] - ) - prefix = "/".join([path, subdir]) - if mode == "overwrite_partitions": - dead_keys += list_objects(prefix, session_primitives=session_primitives) - mkdir_if_not_exists(fs, prefix) - outfile = guid() + ".csv" - full_path = "/".join([prefix, outfile]) - csv_buffer = subgroup.to_csv(None, header=False, index=preserve_index).encode() - with fs.open(full_path, "wb") as f: - f.write(csv_buffer) - partition_path = full_path.rpartition("/")[0] + "/" - keys_str = [str(x) for x in keys] - partition_paths.append((partition_path, keys_str)) - if mode == "overwrite_partitions" and dead_keys: - bucket = path.replace("s3://", "").split("/", 1)[0] - delete_listed_objects(bucket, dead_keys, session_primitives=session_primitives) - return partition_paths diff --git a/awswrangler/s3/parquet.py b/awswrangler/s3/parquet.py deleted file mode 100644 index 888ee5aa7..000000000 --- a/awswrangler/s3/parquet.py +++ /dev/null @@ -1,48 +0,0 @@ -import pyarrow as pa -from pyarrow.compat import guid -import pyarrow.parquet as pq - -from .utils import mkdir_if_not_exists, delete_listed_objects, list_objects - - -def write(df, fs, path, preserve_index): - outfile = guid() + ".parquet" - full_path = "/".join([path, outfile]) - table = pa.Table.from_pandas(df, preserve_index=preserve_index) - with fs.open(full_path, "wb") as f: - pq.write_table(table, f, coerce_timestamps="ms") - - -def write_dataset( - df, fs, path, partition_cols, preserve_index, session_primitives, mode -): - partition_paths = [] - dead_keys = [] - for keys, subgroup in df.groupby(partition_cols): - subgroup = subgroup.drop(partition_cols, axis="columns") - if not isinstance(keys, tuple): - keys = (keys,) - subdir = "/".join( - [ - "{colname}={value}".format(colname=name, value=val) - for name, val in zip(partition_cols, keys) - ] - ) - subtable = pa.Table.from_pandas( - subgroup, preserve_index=preserve_index, safe=False - ) - prefix = "/".join([path, subdir]) - if mode == "overwrite_partitions": - dead_keys += list_objects(prefix, session_primitives=session_primitives) - mkdir_if_not_exists(fs, prefix) - outfile = guid() + ".parquet" - full_path = "/".join([prefix, outfile]) - with fs.open(full_path, "wb") as f: - pq.write_table(subtable, f, coerce_timestamps="ms") - partition_path = 
full_path.rpartition("/")[0] + "/" - keys_str = [str(x) for x in keys] - partition_paths.append((partition_path, keys_str)) - if mode == "overwrite_partitions" and dead_keys: - bucket = path.replace("s3://", "").split("/", 1)[0] - delete_listed_objects(bucket, dead_keys, session_primitives=session_primitives) - return partition_paths diff --git a/awswrangler/s3/read/__init__.py b/awswrangler/s3/read/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/awswrangler/s3/read/csv.py b/awswrangler/s3/read/csv.py new file mode 100644 index 000000000..e69de29bb diff --git a/awswrangler/s3/read/manager.py b/awswrangler/s3/read/manager.py new file mode 100644 index 000000000..e69de29bb diff --git a/awswrangler/s3/write.py b/awswrangler/s3/write.py deleted file mode 100644 index 98561f42a..000000000 --- a/awswrangler/s3/write.py +++ /dev/null @@ -1,174 +0,0 @@ -import sys - -from pyarrow.filesystem import _ensure_filesystem - -from awswrangler.exceptions import ( - UnsupportedFileFormat, - UnsupportedType, - UnsupportedWriteMode, -) -from ..common import SessionPrimitives -from .utils import get_fs, delete_objects, mkdir_if_not_exists -from ..glue.utils import ( - delete_table_if_exists, - create_table, - add_partitions, - table_exists, -) -from .parquet import write_dataset as p_write_dataset, write as p_write -from .csv import write_dataset as c_write_dataset, write as c_write - - -if sys.version_info.major > 2: - string_types = str # noqa -else: - # noinspection PyUnresolvedReferences - string_types = basestring # noqa - - -def _type_pandas2athena(dtype): - dtype = dtype.lower() - if dtype == "int32": - return "int" - elif dtype == "int64": - return "bigint" - elif dtype == "float32": - return "float" - elif dtype == "float64": - return "double" - elif dtype == "bool": - return "boolean" - elif dtype == "object" and isinstance(dtype, string_types): - return "string" - elif dtype[:10] == "datetime64": - return "string" - else: - raise UnsupportedType("Unsupported Pandas type: " + dtype) - - -def _build_schema(df, partition_cols, preserve_index): - schema_built = [] - if preserve_index: - name = str(df.index.name) if df.index.name else "index" - df.index.name = "index" - dtype = str(df.index.dtype) - if name not in partition_cols: - athena_type = _type_pandas2athena(dtype) - schema_built.append((name, athena_type)) - for col in df.columns: - name = str(col) - dtype = str(df[name].dtype) - if name not in partition_cols: - athena_type = _type_pandas2athena(dtype) - schema_built.append((name, athena_type)) - return schema_built - - -def _write_data( - df, - session_primitives, - path, - partition_cols=[], - preserve_index=True, - file_format="parquet", - mode="append", -): - """ - Write the parquet files to s3 - """ - if path[-1] == "/": - path = path[:-1] - fs = get_fs(session_primitives=session_primitives) - fs = _ensure_filesystem(fs) - mkdir_if_not_exists(fs, path) - schema = _build_schema( - df=df, partition_cols=partition_cols, preserve_index=preserve_index - ) - partition_paths = None - file_format = file_format.lower() - if partition_cols is not None and len(partition_cols) > 0: - if file_format == "parquet": - partition_paths = p_write_dataset( - df, fs, path, partition_cols, preserve_index, session_primitives, mode - ) - elif file_format == "csv": - partition_paths = c_write_dataset( - df, fs, path, partition_cols, preserve_index, session_primitives, mode - ) - else: - raise UnsupportedFileFormat(file_format) - else: - if file_format == "parquet": - p_write(df, fs, path, 
preserve_index) - elif file_format == "csv": - c_write(df, fs, path, preserve_index) - else: - raise UnsupportedFileFormat(file_format) - return schema, partition_paths - - -def _get_table_name(path): - if path[-1] == "/": - path = path[:-1] - return path.rpartition("/")[2] - - -def write( - df, - path, - database=None, - table=None, - partition_cols=[], - preserve_index=True, - file_format="parquet", - mode="append", - region=None, - key=None, - secret=None, - profile=None, -): - """ - Convert a given Pandas Dataframe to a Glue Parquet table - """ - session_primitives = SessionPrimitives( - region=region, key=key, secret=secret, profile=profile - ) - if mode == "overwrite" or (mode == "overwrite_partitions" and not partition_cols): - delete_objects(path, session_primitives=session_primitives) - elif mode not in ["overwrite_partitions", "append"]: - raise UnsupportedWriteMode(mode) - schema, partition_paths = _write_data( - df=df, - session_primitives=session_primitives, - path=path, - partition_cols=partition_cols, - preserve_index=preserve_index, - file_format=file_format, - mode=mode, - ) - if database: - table = table if table else _get_table_name(path) - if mode == "overwrite": - delete_table_if_exists( - database=database, table=table, session_primitives=session_primitives - ) - exists = table_exists( - database=database, table=table, session_primitives=session_primitives - ) - if not exists: - create_table( - database=database, - table=table, - schema=schema, - partition_cols=partition_cols, - path=path, - file_format=file_format, - session_primitives=session_primitives, - ) - add_partitions( - database=database, - table=table, - partition_paths=partition_paths, - file_format=file_format, - session_primitives=session_primitives, - ) diff --git a/awswrangler/s3/write/__init__.py b/awswrangler/s3/write/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/awswrangler/s3/write/csv.py b/awswrangler/s3/write/csv.py new file mode 100644 index 000000000..bddb39fe9 --- /dev/null +++ b/awswrangler/s3/write/csv.py @@ -0,0 +1,4 @@ +def write_csv_dataframe(df, path, preserve_index, fs): + csv_buffer = df.to_csv(None, header=False, index=preserve_index).encode() + with fs.open(path, "wb") as f: + f.write(csv_buffer) diff --git a/awswrangler/s3/write/manager.py b/awswrangler/s3/write/manager.py new file mode 100644 index 000000000..d448aa762 --- /dev/null +++ b/awswrangler/s3/write/manager.py @@ -0,0 +1,184 @@ +import multiprocessing as mp + +from pyarrow.compat import guid +from pyarrow.filesystem import _ensure_filesystem + +from awswrangler.common import calculate_bounders +from awswrangler.s3.utils import ( + mkdir_if_not_exists, + delete_listed_objects, + list_objects, + get_fs, +) +from awswrangler.s3.write.parquet import write_parquet_dataframe +from awswrangler.s3.write.csv import write_csv_dataframe + + +def _get_bounders(df, num_procs): + num_rows = len(df.index) + return calculate_bounders(num_items=num_rows, num_groups=num_procs) + + +def write_file(df, path, preserve_index, session_primitives, file_format): + fs = get_fs(session_primitives=session_primitives) + fs = _ensure_filesystem(fs) + mkdir_if_not_exists(fs, path) + if file_format == "parquet": + outfile = guid() + ".parquet" + elif file_format == "csv": + outfile = guid() + ".csv" + full_path = "/".join([path, outfile]) + if file_format == "parquet": + write_parquet_dataframe( + df=df, path=full_path, preserve_index=preserve_index, fs=fs + ) + elif file_format == "csv": + write_csv_dataframe(df=df, 
path=full_path, preserve_index=preserve_index, fs=fs) + return full_path + + +def write_file_manager( + df, path, preserve_index, session_primitives, file_format, num_procs +): + bounders = _get_bounders(df=df, num_procs=num_procs) + partition_paths = [] + procs = [] + for bounder in bounders[1:]: + proc = mp.Process( + target=write_file, + args=( + df.iloc[bounder[0] : bounder[1], :], + path, + preserve_index, + session_primitives, + file_format, + ), + ) + proc.daemon = True + proc.start() + procs.append(proc) + partition_paths.append( + write_file( + df=df.iloc[bounders[0][0] : bounders[0][1], :], + path=path, + preserve_index=preserve_index, + session_primitives=session_primitives, + file_format=file_format, + ) + ) + for i in range(len(procs)): + procs[i].join() + + +def write_dataset( + df, path, partition_cols, preserve_index, session_primitives, file_format, mode +): + fs = get_fs(session_primitives=session_primitives) + fs = _ensure_filesystem(fs) + mkdir_if_not_exists(fs, path) + partition_paths = [] + dead_keys = [] + for keys, subgroup in df.groupby(partition_cols): + subgroup = subgroup.drop(partition_cols, axis="columns") + if not isinstance(keys, tuple): + keys = (keys,) + subdir = "/".join( + [ + "{colname}={value}".format(colname=name, value=val) + for name, val in zip(partition_cols, keys) + ] + ) + prefix = "/".join([path, subdir]) + if mode == "overwrite_partitions": + dead_keys += list_objects(prefix, session_primitives=session_primitives) + full_path = write_file( + df=subgroup, + path=prefix, + preserve_index=preserve_index, + session_primitives=session_primitives, + file_format=file_format, + ) + partition_path = full_path.rpartition("/")[0] + "/" + keys_str = [str(x) for x in keys] + partition_paths.append((partition_path, keys_str)) + if mode == "overwrite_partitions" and dead_keys: + bucket = path.replace("s3://", "").split("/", 1)[0] + delete_listed_objects(bucket, dead_keys, session_primitives=session_primitives) + return partition_paths + + +def write_dataset_remote( + send_pipe, + df, + path, + partition_cols, + preserve_index, + session_primitives, + file_format, + mode, +): + send_pipe.send( + write_dataset( + df=df, + path=path, + partition_cols=partition_cols, + preserve_index=preserve_index, + session_primitives=session_primitives, + file_format=file_format, + mode=mode, + ) + ) + send_pipe.close() + + +def write_dataset_manager( + df, + path, + partition_cols, + session_primitives, + preserve_index, + file_format, + mode, + num_procs, +): + bounders = _get_bounders(df=df, num_procs=num_procs) + print(f"bounders: {bounders}") + partition_paths = [] + procs = [] + receive_pipes = [] + for bounder in bounders[1:]: + receive_pipe, send_pipe = mp.Pipe(duplex=False) + proc = mp.Process( + target=write_dataset_remote, + args=( + send_pipe, + df.iloc[bounder[0] : bounder[1], :], + path, + partition_cols, + preserve_index, + session_primitives, + file_format, + mode, + ), + ) + proc.daemon = True + proc.start() + send_pipe.close() + procs.append(proc) + receive_pipes.append(receive_pipe) + partition_paths.append( + write_dataset( + df=df.iloc[bounders[0][0] : bounders[0][1], :], + path=path, + partition_cols=partition_cols, + preserve_index=preserve_index, + session_primitives=session_primitives, + file_format=file_format, + mode=mode, + ) + ) + for i in range(len(procs)): + procs[i].join() + partition_paths.append(receive_pipes[i].recv()) + receive_pipes[i].close() + return partition_paths diff --git a/awswrangler/s3/write/metadata.py 
b/awswrangler/s3/write/metadata.py new file mode 100644 index 000000000..ce37c21db --- /dev/null +++ b/awswrangler/s3/write/metadata.py @@ -0,0 +1,102 @@ +import sys + +from awswrangler.exceptions import UnsupportedType +from awswrangler.glue.utils import ( + delete_table_if_exists, + create_table, + add_partitions, + table_exists, +) + + +if sys.version_info.major > 2: + string_types = str # noqa +else: + # noinspection PyUnresolvedReferences + string_types = basestring # noqa + + +def _type_pandas2athena(dtype): + dtype = dtype.lower() + if dtype == "int32": + return "int" + elif dtype == "int64": + return "bigint" + elif dtype == "float32": + return "float" + elif dtype == "float64": + return "double" + elif dtype == "bool": + return "boolean" + elif dtype == "object" and isinstance(dtype, string_types): + return "string" + elif dtype[:10] == "datetime64": + return "string" + else: + raise UnsupportedType("Unsupported Pandas type: " + dtype) + + +def _build_schema(df, partition_cols, preserve_index): + schema_built = [] + if preserve_index: + name = str(df.index.name) if df.index.name else "index" + df.index.name = "index" + dtype = str(df.index.dtype) + if name not in partition_cols: + athena_type = _type_pandas2athena(dtype) + schema_built.append((name, athena_type)) + for col in df.columns: + name = str(col) + dtype = str(df[name].dtype) + if name not in partition_cols: + athena_type = _type_pandas2athena(dtype) + schema_built.append((name, athena_type)) + return schema_built + + +def _get_table_name(path): + if path[-1] == "/": + path = path[:-1] + return path.rpartition("/")[2] + + +def write_metadata( + df, + path, + session_primitives, + partition_paths, + database=None, + table=None, + partition_cols=None, + preserve_index=True, + file_format="parquet", + mode="append", +): + schema = _build_schema( + df=df, partition_cols=partition_cols, preserve_index=preserve_index + ) + table = table if table else _get_table_name(path) + if mode == "overwrite": + delete_table_if_exists( + database=database, table=table, session_primitives=session_primitives + ) + exists = table_exists( + database=database, table=table, session_primitives=session_primitives + ) + if not exists: + create_table( + database=database, + table=table, + schema=schema, + partition_cols=partition_cols, + path=path, + file_format=file_format, + session_primitives=session_primitives, + ) + add_partitions( + database=database, + table=table, + partition_paths=partition_paths, + file_format=file_format, + session_primitives=session_primitives, + ) diff --git a/awswrangler/s3/write/parquet.py b/awswrangler/s3/write/parquet.py new file mode 100644 index 000000000..4b02daddb --- /dev/null +++ b/awswrangler/s3/write/parquet.py @@ -0,0 +1,8 @@ +import pyarrow as pa +import pyarrow.parquet as pq + + +def write_parquet_dataframe(df, path, preserve_index, fs): + table = pa.Table.from_pandas(df, preserve_index=preserve_index, safe=False) + with fs.open(path, "wb") as f: + pq.write_table(table, f, coerce_timestamps="ms") diff --git a/awswrangler/s3/write/write.py b/awswrangler/s3/write/write.py new file mode 100644 index 000000000..c89881f1f --- /dev/null +++ b/awswrangler/s3/write/write.py @@ -0,0 +1,103 @@ +import multiprocessing as mp + +from awswrangler.exceptions import UnsupportedFileFormat, UnsupportedWriteMode +from awswrangler.common import SessionPrimitives +from awswrangler.s3.utils import delete_objects +from awswrangler.s3.write.manager import write_dataset_manager, write_file_manager +from 
awswrangler.s3.write.metadata import write_metadata + + +def _write_data( + df, + path, + session_primitives, + partition_cols=None, + preserve_index=True, + file_format="parquet", + mode="append", + num_procs=None, +): + """ + Write the parquet files to s3 + """ + if not num_procs: + num_procs = mp.cpu_count() + if path[-1] == "/": + path = path[:-1] + file_format = file_format.lower() + if file_format not in ["parquet", "csv"]: + raise UnsupportedFileFormat(file_format) + partition_paths = None + + if partition_cols is not None and len(partition_cols) > 0: + partition_paths = write_dataset_manager( + df=df, + path=path, + partition_cols=partition_cols, + session_primitives=session_primitives, + preserve_index=preserve_index, + file_format=file_format, + mode=mode, + num_procs=num_procs, + ) + else: + write_file_manager( + df=df, + path=path, + preserve_index=preserve_index, + session_primitives=session_primitives, + file_format=file_format, + num_procs=num_procs, + ) + + return partition_paths + + +def write( + df, + path, + database=None, + table=None, + partition_cols=None, + preserve_index=True, + file_format="parquet", + mode="append", + region=None, + key=None, + secret=None, + profile=None, + num_procs=None, +): + """ + Convert a given Pandas Dataframe to a Glue Parquet table + """ + session_primitives = SessionPrimitives( + region=region, key=key, secret=secret, profile=profile + ) + if mode == "overwrite" or (mode == "overwrite_partitions" and not partition_cols): + delete_objects(path, session_primitives=session_primitives) + elif mode not in ["overwrite_partitions", "append"]: + raise UnsupportedWriteMode(mode) + partition_paths = _write_data( + df=df, + path=path, + partition_cols=partition_cols, + preserve_index=preserve_index, + file_format=file_format, + mode=mode, + session_primitives=session_primitives, + num_procs=num_procs, + ) + if database: + write_metadata( + df=df, + path=path, + session_primitives=session_primitives, + partition_paths=partition_paths, + database=database, + table=table, + partition_cols=partition_cols, + preserve_index=preserve_index, + file_format=file_format, + mode=mode, + ) diff --git a/tests/utils.py b/tests/utils.py index 1b0270e8e..bc66a4bfb 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -12,7 +12,6 @@ def calc_bounders(num, cpus): size = int(num / cpus) rest = num % cpus bounders = [] - start = 0 end = -1 for _ in range(cpus): start = end + 1 From 850e0bfffaad48d4df84822449032eae4fa7eec1 Mon Sep 17 00:00:00 2001 From: Igor Tavares Date: Sun, 3 Mar 2019 22:36:18 -0300 Subject: [PATCH 2/3] add tests to parallel operations --- awswrangler/athena/read.py | 4 +- awswrangler/common.py | 4 +- awswrangler/s3/read/__init__.py | 0 awswrangler/s3/read/csv.py | 0 awswrangler/s3/read/manager.py | 0 awswrangler/s3/utils.py | 75 ++++++++------------ awswrangler/s3/write/manager.py | 122 ++++++++++++++++++-------------- data_samples/micro.csv | 11 +++ tests/test_awswrangler.py | 103 +++++++++++++++++++-------- tox.ini | 10 +-- 10 files changed, 191 insertions(+), 138 deletions(-) delete mode 100644 awswrangler/s3/read/__init__.py delete mode 100644 awswrangler/s3/read/csv.py delete mode 100644 awswrangler/s3/read/manager.py create mode 100644 data_samples/micro.csv diff --git a/awswrangler/athena/read.py b/awswrangler/athena/read.py index 8bfcdf1f6..31769f17a 100644 --- a/awswrangler/athena/read.py +++ b/awswrangler/athena/read.py @@ -22,10 +22,10 @@ def read( athena_client = session.client("athena") qe = run_query(athena_client, query, database, 
s3_output) validation = query_validation(athena_client, qe) - if validation["QueryExecution"]["Status"]["State"] == "FAILED": + if validation.get("QueryExecution").get("Status").get("State") == "FAILED": message_error = ( "Your query is not valid: " - + validation["QueryExecution"]["Status"]["StateChangeReason"] + + validation.get("QueryExecution").get("Status").get("StateChangeReason") ) raise Exception(message_error) else: diff --git a/awswrangler/common.py b/awswrangler/common.py index 76282d284..9fdadf7ff 100644 --- a/awswrangler/common.py +++ b/awswrangler/common.py @@ -35,9 +35,9 @@ def calculate_bounders(num_items, num_groups): size = int(num_items / num_groups) rest = num_items % num_groups bounders = [] - end = -1 + end = 0 for _ in range(num_groups): - start = end + 1 + start = end end += size if rest: end += 1 diff --git a/awswrangler/s3/read/__init__.py b/awswrangler/s3/read/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/awswrangler/s3/read/csv.py b/awswrangler/s3/read/csv.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/awswrangler/s3/read/manager.py b/awswrangler/s3/read/manager.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/awswrangler/s3/utils.py b/awswrangler/s3/utils.py index 5717c421f..359e1aaf9 100644 --- a/awswrangler/s3/utils.py +++ b/awswrangler/s3/utils.py @@ -2,11 +2,7 @@ import s3fs -from ..common import get_session - - -def del_objs_batch_wrapper(args): - return del_objs_batch(*args) +from ..common import get_session, calculate_bounders def del_objs_batch(bucket, batch, session_primitives=None): @@ -19,27 +15,27 @@ def delete_objects(path, session_primitives=None, batch_size=1000): if path[-1] != "/": path += "/" client = get_session(session_primitives=session_primitives).client("s3") - pool = mp.Pool(mp.cpu_count()) procs = [] args = {"Bucket": bucket, "MaxKeys": batch_size, "Prefix": path} - NextContinuationToken = True - while NextContinuationToken: + next_continuation_token = True + while next_continuation_token: res = client.list_objects_v2(**args) if not res.get("Contents"): break keys = [{"Key": x.get("Key")} for x in res.get("Contents")] - NextContinuationToken = res.get("NextContinuationToken") - if NextContinuationToken: - args["ContinuationToken"] = NextContinuationToken - procs.append( - pool.apply_async( - del_objs_batch_wrapper, ((bucket, keys, session_primitives),) - ) + next_continuation_token = res.get("NextContinuationToken") + if next_continuation_token: + args["ContinuationToken"] = next_continuation_token + proc = mp.Process( + target=del_objs_batch, args=(bucket, keys, session_primitives) ) + proc.daemon = True + proc.start() + procs.append(proc) else: del_objs_batch(bucket, keys, session_primitives) for proc in procs: - proc.get() + proc.join() def list_objects(path, session_primitives=None, batch_size=1000): @@ -48,45 +44,34 @@ def list_objects(path, session_primitives=None, batch_size=1000): path += "/" client = get_session(session_primitives=session_primitives).client("s3") args = {"Bucket": bucket, "MaxKeys": batch_size, "Prefix": path} - NextContinuationToken = True + next_continuation_token = True keys = [] - while NextContinuationToken: + while next_continuation_token: res = client.list_objects_v2(**args) if not res.get("Contents"): break keys += [{"Key": x.get("Key")} for x in res.get("Contents")] - NextContinuationToken = res.get("NextContinuationToken") - if NextContinuationToken: - args["ContinuationToken"] = NextContinuationToken + next_continuation_token = 
res.get("NextContinuationToken") + if next_continuation_token: + args["ContinuationToken"] = next_continuation_token return keys -def calc_bounders(num, cpus): - cpus = num if num < cpus else cpus - size = int(num / cpus) - rest = num % cpus - bounders = [] - start = 0 - end = -1 - for _ in range(cpus): - start = end + 1 - end += size - if rest: - end += 1 - rest -= 1 - bounders.append((start, end)) - return bounders - - def delete_listed_objects(bucket, batch, session_primitives=None, batch_size=1000): if len(batch) > batch_size: - cpus = mp.cpu_count() - bounders = calc_bounders(len(batch), cpus) - args = [] - for item in bounders: - args.append((bucket, batch[item[0] : item[1] + 1], session_primitives)) - pool = mp.Pool(cpus) - pool.map(del_objs_batch_wrapper, args) + num_procs = mp.cpu_count() + bounders = calculate_bounders(len(batch), num_procs) + procs = [] + for bounder in bounders: + proc = mp.Process( + target=del_objs_batch, + args=(bucket, batch[bounder[0] : bounder[1]], session_primitives), + ) + proc.daemon = True + proc.start() + procs.append(proc) + for proc in procs: + proc.join() else: del_objs_batch(bucket, batch, session_primitives) diff --git a/awswrangler/s3/write/manager.py b/awswrangler/s3/write/manager.py index d448aa762..e274c346f 100644 --- a/awswrangler/s3/write/manager.py +++ b/awswrangler/s3/write/manager.py @@ -1,3 +1,4 @@ +import sys import multiprocessing as mp from pyarrow.compat import guid @@ -13,6 +14,9 @@ from awswrangler.s3.write.parquet import write_parquet_dataframe from awswrangler.s3.write.csv import write_csv_dataframe +if sys.version_info.major > 2: + xrange = range + def _get_bounders(df, num_procs): num_rows = len(df.index) @@ -40,24 +44,23 @@ def write_file(df, path, preserve_index, session_primitives, file_format): def write_file_manager( df, path, preserve_index, session_primitives, file_format, num_procs ): - bounders = _get_bounders(df=df, num_procs=num_procs) - partition_paths = [] - procs = [] - for bounder in bounders[1:]: - proc = mp.Process( - target=write_file, - args=( - df.iloc[bounder[0] : bounder[1], :], - path, - preserve_index, - session_primitives, - file_format, - ), - ) - proc.daemon = True - proc.start() - procs.append(proc) - partition_paths.append( + if num_procs > 1: + bounders = _get_bounders(df=df, num_procs=num_procs) + procs = [] + for bounder in bounders[1:]: + proc = mp.Process( + target=write_file, + args=( + df.iloc[bounder[0] : bounder[1], :], + path, + preserve_index, + session_primitives, + file_format, + ), + ) + proc.daemon = True + proc.start() + procs.append(proc) write_file( df=df.iloc[bounders[0][0] : bounders[0][1], :], path=path, @@ -65,9 +68,16 @@ def write_file_manager( session_primitives=session_primitives, file_format=file_format, ) - ) - for i in range(len(procs)): - procs[i].join() + for i in range(len(procs)): + procs[i].join() + else: + write_file( + df=df, + path=path, + preserve_index=preserve_index, + session_primitives=session_primitives, + file_format=file_format, + ) def write_dataset( @@ -141,33 +151,32 @@ def write_dataset_manager( mode, num_procs, ): - bounders = _get_bounders(df=df, num_procs=num_procs) - print(f"bounders: {bounders}") partition_paths = [] - procs = [] - receive_pipes = [] - for bounder in bounders[1:]: - receive_pipe, send_pipe = mp.Pipe(duplex=False) - proc = mp.Process( - target=write_dataset_remote, - args=( - send_pipe, - df.iloc[bounder[0] : bounder[1], :], - path, - partition_cols, - preserve_index, - session_primitives, - file_format, - mode, - ), - ) - 
proc.daemon = True - proc.start() - send_pipe.close() - procs.append(proc) - receive_pipes.append(receive_pipe) - partition_paths.append( - write_dataset( + if num_procs > 1: + bounders = _get_bounders(df=df, num_procs=num_procs) + procs = [] + receive_pipes = [] + for bounder in bounders[1:]: + receive_pipe, send_pipe = mp.Pipe(duplex=False) + proc = mp.Process( + target=write_dataset_remote, + args=( + send_pipe, + df.iloc[bounder[0] : bounder[1], :], + path, + partition_cols, + preserve_index, + session_primitives, + file_format, + mode, + ), + ) + proc.daemon = True + proc.start() + send_pipe.close() + procs.append(proc) + receive_pipes.append(receive_pipe) + partition_paths += write_dataset( df=df.iloc[bounders[0][0] : bounders[0][1], :], path=path, partition_cols=partition_cols, @@ -176,9 +185,18 @@ def write_dataset_manager( file_format=file_format, mode=mode, ) - ) - for i in range(len(procs)): - procs[i].join() - partition_paths.append(receive_pipes[i].recv()) - receive_pipes[i].close() + for i in range(len(procs)): + procs[i].join() + partition_paths += receive_pipes[i].recv() + receive_pipes[i].close() + else: + partition_paths += write_dataset( + df=df, + path=path, + partition_cols=partition_cols, + preserve_index=preserve_index, + session_primitives=session_primitives, + file_format=file_format, + mode=mode, + ) return partition_paths diff --git a/data_samples/micro.csv b/data_samples/micro.csv new file mode 100644 index 000000000..0141aa38b --- /dev/null +++ b/data_samples/micro.csv @@ -0,0 +1,11 @@ +id,name,value,date +0,zero,0.00,2007-11-06 +1,one,1.00,2007-11-05 +2,two,2.00,2007-11-06 +3,three,3.00,2007-11-05 +4,four,4.00,2007-11-06 +5,five,5.00,2007-11-05 +6,six,6.00,2007-11-06 +7,seven,7.00,2007-11-05 +8,eight,8.00,2007-11-06 +9,nine,9.00,2007-11-05 \ No newline at end of file diff --git a/tests/test_awswrangler.py b/tests/test_awswrangler.py index ff300cc0a..1234a426d 100644 --- a/tests/test_awswrangler.py +++ b/tests/test_awswrangler.py @@ -25,72 +25,119 @@ def database(): yield database -@pytest.mark.parametrize("file_format", [("parquet"), ("csv")]) -def test_awswrangler(bucket, database, file_format): - df = pd.read_csv("data_samples/small.csv") - df = df[(df.name.isin(["Brazil", "Argentina"])) & (df.date == "2019")] +@pytest.mark.parametrize("file_format", ["csv", "parquet"]) +def test_s3_write(bucket, database, file_format): + df = pd.read_csv("data_samples/micro.csv") awswrangler.s3.write( df=df, database=database, - table="small", - path="s3://{}/small/".format(bucket), + path="s3://{}/test/".format(bucket), file_format=file_format, - partition_cols=["name", "date"], + preserve_index=True, mode="overwrite", ) + df2 = awswrangler.athena.read(database, "select * from test") + assert len(df.index) == len(df2.index) + + +@pytest.mark.parametrize("file_format", ["csv", "parquet"]) +def test_s3_write_single(bucket, database, file_format): + df = pd.read_csv("data_samples/micro.csv") awswrangler.s3.write( df=df, database=database, - table="small", - path="s3://{}/small/".format(bucket), + path="s3://{}/test/".format(bucket), file_format=file_format, - partition_cols=["name", "date"], - mode="overwrite_partitions", + preserve_index=False, + mode="overwrite", + num_procs=1, ) + df2 = awswrangler.athena.read(database, "select * from test") + assert len(df.index) == len(df2.index) + + +@pytest.mark.parametrize("file_format", ["csv", "parquet"]) +def test_s3_write_partitioned(bucket, database, file_format): + df = pd.read_csv("data_samples/micro.csv") awswrangler.s3.write( 
df=df, database=database, - table="small", - path="s3://{}/small/".format(bucket), + path="s3://{}/test/".format(bucket), file_format=file_format, - partition_cols=["name", "date"], - mode="append", + preserve_index=True, + partition_cols=["date"], + mode="overwrite", ) - df2 = awswrangler.athena.read( - database, "select * from small", "s3://{}/athena/".format(bucket) + df2 = awswrangler.athena.read(database, "select * from test") + assert len(df.index) == len(df2.index) + + +@pytest.mark.parametrize("file_format", ["csv", "parquet"]) +def test_s3_write_partitioned_single(bucket, database, file_format): + df = pd.read_csv("data_samples/micro.csv") + awswrangler.s3.write( + df=df, + database=database, + path="s3://{}/test/".format(bucket), + file_format=file_format, + preserve_index=False, + partition_cols=["date"], + mode="overwrite", + num_procs=1, ) - assert 2 * len(df.index) == len(df2.index) + df2 = awswrangler.athena.read(database, "select * from test") + assert len(df.index) == len(df2.index) -@pytest.mark.parametrize("file_format", [("parquet"), ("csv")]) -def test_awswrangler2(bucket, database, file_format): - df = pd.read_csv("data_samples/small.csv") - df = df[(df.name.isin(["Brazil", "Argentina"])) & (df.date == "2019")] +@pytest.mark.parametrize("file_format", ["csv", "parquet"]) +def test_s3_write_multi_partitioned(bucket, database, file_format): + df = pd.read_csv("data_samples/micro.csv") awswrangler.s3.write( df=df, database=database, - path="s3://{}/small2/".format(bucket), + path="s3://{}/test/".format(bucket), file_format=file_format, preserve_index=True, + partition_cols=["name", "date"], mode="overwrite", ) + df2 = awswrangler.athena.read(database, "select * from test") + assert len(df.index) == len(df2.index) + + +@pytest.mark.parametrize("file_format", ["parquet", "csv"]) +def test_s3_write_append(bucket, database, file_format): + df = pd.read_csv("data_samples/micro.csv") awswrangler.s3.write( df=df, database=database, - path="s3://{}/small2/".format(bucket), + table="test", + path="s3://{}/test/".format(bucket), file_format=file_format, - preserve_index=True, + partition_cols=["name", "date"], + mode="overwrite", + ) + awswrangler.s3.write( + df=df, + database=database, + table="test", + path="s3://{}/test/".format(bucket), + file_format=file_format, + partition_cols=["name", "date"], mode="overwrite_partitions", ) awswrangler.s3.write( df=df, database=database, - path="s3://{}/small2/".format(bucket), + table="test", + path="s3://{}/test/".format(bucket), file_format=file_format, - preserve_index=True, + partition_cols=["name", "date"], mode="append", ) - df2 = awswrangler.athena.read(database, "select * from small2") + df2 = awswrangler.athena.read( + database, "select * from test", "s3://{}/athena/".format(bucket) + ) assert 2 * len(df.index) == len(df2.index) diff --git a/tox.ini b/tox.ini index bca061786..dd170ae80 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = py{27,36,37}, flake8-py37 +envlist = py{27,36,37} [testenv] passenv = AWS_PROFILE AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY AWS_DEFAULT_REGION awswrangler_TEST_BUCKET awswrangler_TEST_DATABASE @@ -8,11 +8,3 @@ deps = boto3 commands= pytest - -[flake8-py37] -basepython = python3.7 -deps = - flake8 -commands= - flake8 --version - flake8 From 4d0e218e54c038fbc56447bb534b6b4189fa2336 Mon Sep 17 00:00:00 2001 From: Igor Tavares Date: Mon, 4 Mar 2019 09:27:34 -0300 Subject: [PATCH 3/3] remove some limitations from README --- README.md | 2 -- 1 file changed, 2 deletions(-) diff --git 
a/README.md b/README.md index af9e0b5e4..58c1896ff 100644 --- a/README.md +++ b/README.md @@ -87,10 +87,8 @@ AWS Data Wrangler project relies on others great initiatives: ## Known Limitations * By now only writes in Parquet and CSV file formats -* By now only reads through AWS Athena * By now there are not compression support * By now there are not nested type support -* By now AWS Lambda don't support write in overwrite mode ## Contributing
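
Usage sketch (illustrative only, not part of the patch series above): the calls below exercise the parallelised write path added in this series and mirror the tests in tests/test_awswrangler.py. The bucket name, Glue database, and sample CSV path are placeholders.

    import pandas as pd
    import awswrangler

    df = pd.read_csv("data_samples/micro.csv")

    # num_procs is the new knob introduced by this series: when it is None the
    # writer falls back to multiprocessing.cpu_count(); num_procs=1 skips the
    # child-process fan-out and writes in the current process.
    awswrangler.s3.write(
        df=df,
        database="my_database",        # placeholder Glue database
        path="s3://my-bucket/test/",   # placeholder S3 prefix
        file_format="parquet",
        partition_cols=["date"],
        mode="overwrite",
        num_procs=4,
    )

    # Read the table back through Athena to verify the write; the table name
    # "test" is derived from the last component of the S3 path when no table
    # argument is given.
    df2 = awswrangler.athena.read("my_database", "select * from test")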