Fix BaseSQLToGCSOperator approx_max_file_size_bytes (#25469)
* Fix BaseSQLToGCSOperator approx_max_file_size_bytes

When the export format is parquet, `tmp_file_handle.tell()` always
returns 0, because the file pointer is back at the beginning of the file
after the data has been saved. It is therefore not a good indicator of
the file's current size.

Save the current file pointer position and move the file pointer to the
end of the file with `seek(0, os.SEEK_END)`. file_size is set to the new
position, and the file pointer then goes back to the saved position.

Currently, after a parquet write operation the pointer is set to 0,
and therefore, simply executing `tmp_file_handle.tell()` is not
sufficient to determine the current size. This sequence is added to
allow file splitting when the export format is set to parquet.
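
The save, seek, measure, restore sequence described above can be tried in isolation. The following is a minimal sketch, not the operator's code: `current_file_size` is a hypothetical helper name, and the rewind to offset 0 is done by hand to imitate the pointer state the commit message reports after a parquet write.

```python
import os
import tempfile


def current_file_size(handle):
    """Return the size in bytes of an open, seekable file.

    Hypothetical helper: the file pointer is left where it was found.
    """
    saved_pos = handle.tell()            # remember the current position
    handle.seek(0, os.SEEK_END)          # move to the end of the file
    size = handle.tell()                 # offset at the end equals the size
    handle.seek(saved_pos, os.SEEK_SET)  # restore the saved position
    return size


with tempfile.TemporaryFile() as tmp_file_handle:
    tmp_file_handle.write(b"x" * 1024)
    # Assumption for illustration: rewind manually to imitate a writer
    # that leaves the pointer at the start of the file.
    tmp_file_handle.seek(0)
    assert tmp_file_handle.tell() == 0  # tell() alone says nothing about size
    assert current_file_size(tmp_file_handle) == 1024
    assert tmp_file_handle.tell() == 0  # position is unchanged afterwards
```

Restoring the saved position matters because later writes through the same handle would otherwise land at a different offset than the writer expects.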
dclandau committed Aug 3, 2022
1 parent d004841 commit 803c0e2
Showing 1 changed file with 8 additions and 1 deletion.
9 changes: 8 additions & 1 deletion airflow/providers/google/cloud/transfers/sql_to_gcs.py
@@ -198,6 +198,8 @@ def _write_local_data_files(self, cursor):
  names in GCS, and values are file handles to local files that
  contain the data for the GCS objects.
  """
+ import os
+
  org_schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
  schema = [column for column in org_schema if column not in self.exclude_columns]

@@ -250,7 +252,12 @@ def _write_local_data_files(self, cursor):
  tmp_file_handle.write(b'\n')

  # Stop if the file exceeds the file size limit.
- if tmp_file_handle.tell() >= self.approx_max_file_size_bytes:
+ fppos = tmp_file_handle.tell()
+ tmp_file_handle.seek(0, os.SEEK_END)
+ file_size = tmp_file_handle.tell()
+ tmp_file_handle.seek(fppos, os.SEEK_SET)
+
+ if file_size >= self.approx_max_file_size_bytes:
      file_no += 1

      if self.export_format == 'parquet':
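
To show how a size check like this can drive file splitting, here is a rough sketch under stated assumptions. `split_rows` and `measure_size` are hypothetical names, rows are plain byte strings written one per line, and none of the operator's schema handling, parquet writing, or upload logic is reproduced.

```python
import os
import tempfile


def measure_size(handle):
    """Size of an open file, measured without moving its pointer."""
    pos = handle.tell()
    handle.seek(0, os.SEEK_END)
    size = handle.tell()
    handle.seek(pos, os.SEEK_SET)
    return size


def split_rows(rows, approx_max_file_size_bytes):
    """Write rows to temporary files, starting a new file once the
    current one has reached the approximate size limit.

    Hypothetical sketch: returns the list of open file handles.
    """
    files = []
    handle = None
    for row in rows:
        if handle is None:
            # Open the next file lazily so no empty trailing file is created.
            handle = tempfile.TemporaryFile()
            files.append(handle)
        handle.write(row + b"\n")
        # The check runs after the write, so a file can exceed the limit
        # by up to one row; the limit is approximate.
        if measure_size(handle) >= approx_max_file_size_bytes:
            handle = None
    return files


# Five 10-byte lines with a 20-byte limit: two full files and a remainder.
chunks = split_rows([b"a" * 9] * 5, 20)
sizes = [measure_size(f) for f in chunks]
for f in chunks:
    f.close()
```

Because the check happens only after a row has been written, the limit acts as a threshold for rolling over rather than a hard cap, which matches the "approx" in `approx_max_file_size_bytes`.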