Description
Apache Airflow version:
1.10
Environment:
- Cloud provider or hardware configuration:
On docker container (puckel/docker-airflow:1.10.9)
- OS (e.g. from /etc/os-release):
Debian GNU/Linux 10 (buster)
- Kernel (e.g. uname -a):
Linux 3bf5943b9fbd 5.4.0-1025-gcp #25-Ubuntu SMP Fri Sep 11 15:02:15 UTC 2020 x86_64 GNU/Linux
- Install tools:
Docker (puckel/docker-airflow:1.10.9)
What happened:
While using MySqlToGoogleCloudStorageOperator, some table dumps uploaded to GCS contained far fewer records than the source table. For example:
- source table: 890,206 records
- dump on GCS: 302,014 records
The operator is defined as follows:
from airflow.contrib.operators.mysql_to_gcs import MySqlToGoogleCloudStorageOperator

chunk_filesize = 256 * 1024 * 1024  # split dump files at roughly 256 MB

MySqlToGoogleCloudStorageOperator(
    task_id=task_id,
    mysql_conn_id=MYSQL_CONN,
    google_cloud_storage_conn_id=GCS_CONN,
    sql="SELECT * FROM some_table;",
    bucket=BUCKET_NAME,
    filename='file_name_in_bucket',
    schema_filename="schema_filename_in_bucket",
    approx_max_file_size_bytes=chunk_filesize,
    dag=DAG,
)
What you expected to happen:
I expected the complete set of records to be uploaded to GCS.
I suspect the problem occurs when the records of a table are dumped into multiple files: the operator then overwrites the same object while uploading them to GCS. The files are split according to approx_max_file_size_bytes (default: 1900000000 bytes).
The temporary files are split by approx_max_file_size_bytes at the lines below.
https://github.com/apache/airflow/blob/v1-10-stable/airflow/contrib/operators/sql_to_gcs.py#L183-L196
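The splitting step can be illustrated with a minimal sketch. It is not the operator's code: `split_rows_into_chunks` is a hypothetical helper that keeps chunks in memory rather than in temporary files, and it assumes a new chunk is started only once the current one has reached the size limit.

```python
import json


def split_rows_into_chunks(rows, approx_max_bytes):
    """Group rows into newline-delimited JSON chunks of roughly approx_max_bytes."""
    chunks = [[]]
    current_size = 0
    for row in rows:
        line = json.dumps(row, sort_keys=True) + "\n"
        chunks[-1].append(line)
        current_size += len(line.encode("utf-8"))
        # Rotate only after the limit is reached, so a chunk may slightly exceed it.
        if current_size >= approx_max_bytes:
            chunks.append([])
            current_size = 0
    # Drop a trailing empty chunk left behind by a rotation on the last row.
    if not chunks[-1]:
        chunks.pop()
    return chunks


rows = [{"id": i} for i in range(10)]
chunks = split_rows_into_chunks(rows, approx_max_bytes=30)
print([len(chunk) for chunk in chunks])
```

Splitting alone loses nothing: the chunk sizes add up to the number of input rows.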
Then the temporary files are passed to GcsHook.upload one at a time in a for loop.
https://github.com/apache/airflow/blob/v1-10-stable/airflow/contrib/operators/sql_to_gcs.py#L265-L277
However, every temporary file is uploaded under the same given filename (object name), so each iteration overwrites the object written by the previous one.
https://github.com/apache/airflow/blob/master/airflow/providers/google/cloud/hooks/gcs.py#L371-L374
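The effect of that loop can be reproduced without GCS. The sketch below uses a hypothetical in-memory `FakeBucket` (not part of Airflow or the GCS client) and assumes only that writing to an existing object name replaces its content.

```python
class FakeBucket:
    """Hypothetical in-memory stand-in for a bucket: one value per object name."""

    def __init__(self):
        self.objects = {}

    def upload(self, object_name, lines):
        # Writing to an existing object name replaces its previous content.
        self.objects[object_name] = list(lines)


bucket = FakeBucket()
chunks = [["row-1", "row-2", "row-3"], ["row-4"]]

# Every chunk is uploaded under the same object name, as in the report.
for chunk in chunks:
    bucket.upload("masters/yyyy.json", chunk)

surviving = bucket.objects["masters/yyyy.json"]
print(len(surviving), "of", sum(len(c) for c in chunks), "rows survive")
```

Only the rows of the last uploaded chunk remain in the object.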
I would expect the operator to upload and/or merge the multiple temporary files into one object that contains the complete records of the source table.
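A possible workaround is sketched below. It assumes that the operator builds each object name by calling `str.format` on `filename` with the chunk index, which is what the `{}` placeholder mentioned in the operator's `filename` parameter documentation suggests. `object_names` is a hypothetical helper written for illustration, not an Airflow function.

```python
def object_names(filename_template, chunk_count):
    # Assumption: the object name for chunk i is filename_template.format(i).
    return [filename_template.format(i) for i in range(chunk_count)]


# Without a placeholder, format() returns the template unchanged for every chunk.
without_placeholder = object_names("masters/yyyy.json", 2)

# With a placeholder, every chunk gets its own object name.
with_placeholder = object_names("masters/yyyy_{}.json", 2)

print(without_placeholder)
print(with_placeholder)
```

If that assumption holds, adding `{}` to `filename` would keep every chunk as a separate object instead of merging them into one.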
These are the temporary files created while the operator is dumping records:
airflow airflow 11M Oct 12 13:13 tmp7uuj1skr
airflow airflow 256M Oct 12 13:13 tmpz3x8qtil
airflow@xxxx:/tmp$ cat tmpz3x8qtil | wc -l
674510
airflow@xxx:/tmp$ cat tmp7uuj1skr | wc -l
108770
The Airflow log shows that the operator overwrites the same object with a different temporary file in each iteration.
[2020-10-12 13:14:49,990] {{gcs_hook.py:224}} INFO - File /tmp/tmpz3x8qtil uploaded to masters/yyyy.json in <Bucket: xxxx> bucket
[2020-10-12 13:14:52,647] {{gcs_hook.py:224}} INFO - File /tmp/tmp7uuj1skr uploaded to masters/yyyy.json in <Bucket: xxxx> bucket
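Overwrites like this can be spotted mechanically in a task log. The following sketch parses the two log lines above with a regular expression written for this message format only; `uploads_per_object` is a hypothetical helper, and other Airflow versions may word the message differently.

```python
import re
from collections import defaultdict

LOG_LINES = [
    "[2020-10-12 13:14:49,990] {{gcs_hook.py:224}} INFO - File /tmp/tmpz3x8qtil "
    "uploaded to masters/yyyy.json in <Bucket: xxxx> bucket",
    "[2020-10-12 13:14:52,647] {{gcs_hook.py:224}} INFO - File /tmp/tmp7uuj1skr "
    "uploaded to masters/yyyy.json in <Bucket: xxxx> bucket",
]

UPLOAD_RE = re.compile(r"File (\S+) uploaded to (\S+) in <Bucket: ([^>]+)> bucket")


def uploads_per_object(lines):
    """Map (bucket, object name) to the local files uploaded to it, in log order."""
    targets = defaultdict(list)
    for line in lines:
        match = UPLOAD_RE.search(line)
        if match:
            local_path, object_name, bucket = match.groups()
            targets[(bucket, object_name)].append(local_path)
    return dict(targets)


# Any object that received more than one upload was overwritten at least once.
overwritten = {
    target: paths
    for target, paths in uploads_per_object(LOG_LINES).items()
    if len(paths) > 1
}
print(overwritten)
```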