File is overwritten with each sink operation #31

Open
pedro-muniz opened this issue Nov 23, 2023 · 7 comments
Comments


pedro-muniz commented Nov 23, 2023

When using target-s3 with more than 10k records, we've noticed that the only way to get the expected output is to set "append_date_to_prefix_grain": "microsecond". This is because each sink operation overwrites the generated file instead of appending to it, so if the tap sends more than 10k records to the target within a short time frame, some data may be lost when the file is rewritten. Could we consider changing the file mode from "w" to "a"? Is there a particular reason for using write mode? If so, it might be necessary to create a new file for each sink operation to prevent data loss.

The target-s3-parquet uses append mode to write data.
https://github.com/gupy-io/target-s3-parquet/blob/main/target_s3_parquet/sinks.py#L83C15-L83C25
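
For illustration, here is a rough sketch of why the microsecond grain works around the overwrite (the key layout and helper below are made up for this example, not taken from target-s3's code):

from datetime import datetime, timezone

def object_key(prefix: str, stream: str, grain_fmt: str) -> str:
    # the grain decides how much of the timestamp ends up in the key;
    # with a coarse grain, two sink flushes within the same second map to
    # the same key and the second write replaces the first object
    stamp = datetime.now(timezone.utc).strftime(grain_fmt)
    return f"{prefix}/{stream}/{stamp}.json"

# coarse grain: both flushes collide on one key, so data is lost
object_key("s3://my-bucket/raw", "users", "%Y%m%d%H%M%S")
# microsecond grain: each flush gets its own key, so nothing is overwritten
object_key("s3://my-bucket/raw", "users", "%Y%m%d%H%M%S%f")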

crowemi (Owner) commented Nov 23, 2023

@pedro-muniz -- this sounds great. Can you make this change (or make it configurable) and submit a PR?

pedro-muniz (Author) commented Nov 24, 2023

The smart_open library does not support append mode for S3. I'll try to work on this.
https://github.com/piskvorky/smart_open/blob/2894d20048bd8ee56c0a89060413eb8041603c30/smart_open/s3.py#L260C22-L260C30

https://github.com/crowemi/target-s3/blob/main/target_s3/formats/format_base.py#L10
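
For context, a minimal sketch of the limitation (the bucket and key are placeholders, and the exact error message may differ between smart_open versions):

import smart_open

# write mode works, but each open() replaces the existing S3 object
with smart_open.open("s3://my-bucket/prefix/data.json", "w") as f:
    f.write('{"id": 1}\n')

# append mode is not implemented for the S3 transport, since S3 objects
# are immutable; this raises NotImplementedError (see the line linked above)
smart_open.open("s3://my-bucket/prefix/data.json", "a")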

rstml (Contributor) commented Nov 25, 2023

Bumped into the very same behaviour with Parquet files, which have a different implementation.

The workaround could be to keep the writer open and keep appending to it, e.g.:

import pyarrow.parquet as pq

# this should be initialised somewhere outside, e.g. once per stream,
# using the schema of the first batch
pqwriter = pq.ParquetWriter('sample.parquet', table.schema)

def _write(self, contents: str = None) -> None:
    # append each batch (a pyarrow Table or RecordBatch) to the open
    # writer instead of recreating the file on every sink operation
    pqwriter.write(records)

# cleanup once the stream is finished
pqwriter.close()

However, each set of records may have a different schema, and I'm not sure how to overcome this.

rstml (Contributor) commented Nov 25, 2023

I managed to resolve my issue described above by increasing the batch size and age. Please see #32 for details.

In my case, I use hourly batching and a minute grain for the filename. This combination solves the overwriting problem for me.

pedro-muniz (Author) commented

> I managed to resolve my issue described above by increasing the batch size and age. Please see #32 for details.
>
> In my case, I use hourly batching and a minute grain for the filename. This combination solves the overwriting problem for me.

I think those are good parameters to have control over, but they don't resolve this issue. For example, setting the granularity to microseconds already works around it in most cases without those variables.

S3 objects don't support an append operation, so the best solution is to create a new file for each sink operation; IMHO this behavior shouldn't depend on a particular combination of parameters.

What do you think?
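
A sketch of that idea (all names below are illustrative, not target-s3's actual key-building code): give every sink flush its own object key, independent of the prefix grain, for example with a per-flush unique suffix.

import uuid
from datetime import datetime, timezone

def key_for_flush(prefix: str, stream: str) -> str:
    # the timestamp keeps keys roughly sortable; the uuid guarantees
    # uniqueness even if two flushes happen within the same microsecond
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
    return f"{prefix}/{stream}/{stamp}-{uuid.uuid4().hex}.json"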

ShahBinoy commented

Another thing to add: when dealing with a large number of files, S3 I/O shows very good performance at an object size of around 100 MB. That size offers a good blend of I/O latency and record count, so rolling over to a new file based on size would also be a good option to add.
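
A rough sketch of size-based rolling (the buffer and flush callback are assumptions for illustration, not part of target-s3 today):

ROLL_SIZE_BYTES = 100 * 1024 * 1024  # roll to a new object around ~100 MB

class RollingBuffer:
    def __init__(self) -> None:
        self.part = 0
        self.buffer = bytearray()

    def add(self, chunk: bytes, flush) -> None:
        # accumulate serialized records; once the threshold is crossed,
        # hand the buffer to the flush callback (e.g. an S3 upload) and
        # start a new part
        self.buffer.extend(chunk)
        if len(self.buffer) >= ROLL_SIZE_BYTES:
            flush(bytes(self.buffer), self.part)
            self.buffer.clear()
            self.part += 1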

rstml (Contributor) commented Nov 30, 2023

Indeed, for Parquet files AWS recommends roughly 250 MB per file. However, I didn't see any built-in mechanism in Meltano to flush based on byte size. Moreover, with compression enabled, estimating output size from a byte-size limit is just as hard as from a row-count limit.
