
[SUPPORT] Upsert for S3 Hudi dataset with large partitions takes a lot of time in writing #1371

Closed
abhaygupta3390 opened this issue Mar 4, 2020 · 7 comments

abhaygupta3390 commented Mar 4, 2020

Describe the problem you faced

I have a Spark application that processes batches of upserts from a stream and writes the result to an S3 location in Hudi format. The application runs on an EMR cluster.
The dataset has 3 partition columns, and the overall cardinality of the partitions is roughly 200 * 2 * 12.
After the commit and clean are done, the createRelation method is invoked, which takes roughly 9-10 minutes and keeps growing as the partition cardinality increases.

To Reproduce

Steps to reproduce the behavior:

  1. Update a few records in a Hudi dataset at an S3 location that has a large number of partitions

Expected behavior

Since I am writing the DataFrame to the path in append mode, I expect the write to be complete at the point when the commit happens.

Environment Description

  • Hudi version : 0.5.1-incubating

  • Spark version : 2.4.4

  • Hive version : Hive 2.3.6-amzn-1

  • Hadoop version : Amazon 2.8.5

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no

Additional context

EMR version: emr-5.29.0

Stacktrace
Info logs for one iteration of org.apache.hudi.hadoop.HoodieROTablePathFilter#accept in org.apache.spark.sql.execution.datasources.InMemoryFileIndex#listLeafFiles:

20/02/20 13:53:27 INFO HoodieTableMetaClient: Loading HoodieTableMetaClient from <s3Path>
20/02/20 13:53:27 INFO FSUtils: Hadoop Configuration: fs.defaultFS: [hdfs:/<emr_node>:8020], Config:[Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml, hdfs-default.xml, hdfs-site.xml, __spark_hadoop_conf__.xml, file:/mnt1/yarn/usercache/hadoop/appcache/<app_id>/<container_id>/hive-site.xml], FileSystem: [S3AFileSystem{uri=<s3_bucket>, workingDir=<s3_bucket>/user/hadoop, inputPolicy=normal, partSize=104857600, enableMultiObjectsDelete=true, maxKeys=5000, readAhead=65536, blockSize=33554432, multiPartThreshold=2147483647, boundedExecutor=BlockingThreadPoolExecutorService{SemaphoredDelegatingExecutor{permitCount=25, available=25, waiting=0}, activeCount=0}, unboundedExecutor=java.util.concurrent.ThreadPoolExecutor@7427ca41[Running, pool size = 9, active threads = 0, queued tasks = 0, completed tasks = 9], statistics {2524201 bytes read, 932558 bytes written, 1969 read ops, 0 large read ops, 224 write ops}, metrics {{Context=S3AFileSystem} {FileSystemId=b1dbe2e3-50de-4c75-a12e-4d4d5059b9a7-sense-datawarehouse} {fsURI=s3a://sense-datawarehouse} {files_created=8} {files_copied=0} {files_copied_bytes=0} {files_deleted=9} {fake_directories_deleted=24} {directories_created=1} {directories_deleted=0} {ignored_errors=0} {op_copy_from_local_file=0} {op_exists=316} {op_get_file_status=1371} {op_glob_status=2} {op_is_directory=222} {op_is_file=0} {op_list_files=166} {op_list_located_status=0} {op_list_status=431} {op_mkdirs=0} {op_rename=0} {object_copy_requests=0} {object_delete_requests=11} {object_list_requests=1686} {object_continue_list_requests=0} {object_metadata_requests=2460} {object_multipart_aborted=0} {object_put_bytes=932558} {object_put_requests=9} {object_put_requests_completed=9} {stream_write_failures=0} {stream_write_block_uploads=0} {stream_write_block_uploads_committed=0} {stream_write_block_uploads_aborted=0} {stream_write_total_time=0} {stream_write_total_data=0} {object_put_requests_active=0} {object_put_bytes_pending=0} {stream_write_block_uploads_active=0} {stream_write_block_uploads_pending=0} {stream_write_block_uploads_data_pending=0} {stream_read_fully_operations=0} {stream_opened=194} {stream_bytes_skipped_on_seek=0} {stream_closed=194} {stream_bytes_backwards_on_seek=0} {stream_bytes_read=2524201} {stream_read_operations_incomplete=476} {stream_bytes_discarded_in_abort=0} {stream_close_operations=194} {stream_read_operations=2766} {stream_aborted=0} {stream_forward_seek_operations=0} {stream_backward_seek_operations=0} {stream_seek_operations=0} {stream_bytes_read_in_close=0} {stream_read_exceptions=0} }}]
20/02/20 13:53:27 INFO HoodieTableConfig: Loading table properties from s3a://<path>/<table_name>/.hoodie/hoodie.properties
20/02/20 13:53:27 INFO HoodieTableMetaClient: Finished Loading Table of type COPY_ON_WRITE(version=1) from s3a://<path>/<table_name>
20/02/20 13:53:27 INFO HoodieActiveTimeline: Loaded instants [[20200220113826__clean__COMPLETED], [20200220113826__commit__COMPLETED], [20200220114307__clean__COMPLETED], [20200220114307__commit__COMPLETED], [20200220114742__clean__COMPLETED], [20200220114742__commit__COMPLETED], [20200220115229__clean__COMPLETED], [20200220115229__commit__COMPLETED], [20200220115716__clean__COMPLETED], [20200220115716__commit__COMPLETED], [20200220120158__clean__COMPLETED], [20200220120158__commit__COMPLETED], [20200220120630__clean__COMPLETED], [20200220120630__commit__COMPLETED], [20200220121120__clean__COMPLETED], [20200220121120__commit__COMPLETED], [20200220121605__clean__COMPLETED], [20200220121605__commit__COMPLETED], [20200220122055__clean__COMPLETED], [20200220122055__commit__COMPLETED], [20200220122552__clean__COMPLETED], [20200220122552__commit__COMPLETED], [20200220123052__clean__COMPLETED], [20200220123052__commit__COMPLETED], [20200220123556__clean__COMPLETED], [20200220123556__commit__COMPLETED], [20200220124053__clean__COMPLETED], [20200220124053__commit__COMPLETED], [20200220124553__clean__COMPLETED], [20200220124553__commit__COMPLETED], [20200220125055__clean__COMPLETED], [20200220125055__commit__COMPLETED], [20200220125600__clean__COMPLETED], [20200220125600__commit__COMPLETED], [20200220130650__clean__COMPLETED], [20200220130650__commit__COMPLETED], [20200220131306__clean__COMPLETED], [20200220131306__commit__COMPLETED], [20200220131919__clean__COMPLETED], [20200220131919__commit__COMPLETED], [20200220132515__clean__COMPLETED], [20200220132515__commit__COMPLETED], [20200220134152__clean__COMPLETED], [20200220134152__commit__COMPLETED], [20200220135148__clean__COMPLETED], [20200220135148__commit__COMPLETED]]
20/02/20 13:53:27 INFO HoodieTableFileSystemView: Adding file-groups for partition :2775011610391456677/2019/11, #FileGroups=1
20/02/20 13:53:27 INFO AbstractTableFileSystemView: addFilesToView: NumFiles=3, FileGroupsCreationTime=0, StoreTimeTaken=0
20/02/20 13:53:27 INFO HoodieROTablePathFilter: Based on hoodie metadata from base path: s3a:/<path>/<table_name>, caching 1 files under s3a://<path>/<table_name>/<part_col1>/<part_col2>/<part_col3>

Below is a screenshot of the Spark UI showing the time gap between the above step and the processing of the next batch of updates:

[Spark UI screenshot: Screen Shot 2020-03-04 at 5.11.54 PM]

@vinothchandar (Member) commented

cc @umehrot2 is this similar to what you were mentioning as well?

@abhaygupta3390 IIUC, you are facing this as part of a streaming write, using the structured streaming sink? If you can share a code snippet, it will help reproduce this.
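
For reference, a minimal sketch of what a structured streaming write to Hudi could look like (in contrast to the batch DataFrame write API used below), assuming the structured streaming sink is available in this Hudi version; the table name, paths, and trigger interval are placeholders:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

// Illustrative only: continuous upserts via the structured streaming sink.
// All option values and paths here are placeholders, not taken from this report.
def writeStreamToHudi(streamingDF: DataFrame): Unit = {
  streamingDF.writeStream
    .format("org.apache.hudi")
    .option("hoodie.table.name", "example_table")
    .option("hoodie.datasource.write.recordkey.field", "id")
    .option("hoodie.datasource.write.precombine.field", "ts_ms")
    .option("hoodie.datasource.write.partitionpath.field", "partCol1,partCol2,partCol3")
    .option("checkpointLocation", "s3://example-bucket/checkpoints/example_table")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime("5 minutes"))
    .start("s3://example-bucket/hudi/example_table")
    .awaitTermination()
}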

@umehrot2 (Contributor) commented Mar 5, 2020

@vinothchandar this is exactly what I was talking about. This easily becomes a bottleneck because the driver spends time filtering the files it gets from InMemoryFileIndex, and that filtering is not distributed. My suggestion is that at ingestion time we just return an empty relation once HoodieSparkSqlWriter has done its job, because right now we end up creating a relation even at write time using the parquet data source, which is really not necessary for our use case. I have been testing this internally for the past week.
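
To make the suggestion above concrete, here is a minimal sketch, not the actual Hudi patch, of a write-path createRelation that performs the commit and then returns a lightweight placeholder relation instead of rebuilding a parquet-backed read relation; the class name and the runWrite helper are hypothetical:

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}
import org.apache.spark.sql.types.StructType

class ExampleHudiWriteProvider extends CreatableRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      df: DataFrame): BaseRelation = {
    // 1. Perform the actual upsert and commit here (in Hudi this is where
    //    HoodieSparkSqlWriter.write is invoked). runWrite is a hypothetical stand-in.
    // runWrite(sqlContext, mode, parameters, df)

    // 2. Return a placeholder relation rather than re-listing the table, so the
    //    write path never triggers the driver-side InMemoryFileIndex /
    //    HoodieROTablePathFilter pass that grows with the partition count.
    val outerCtx = sqlContext
    new BaseRelation {
      override def sqlContext: SQLContext = outerCtx
      override def schema: StructType = df.schema
    }
  }
}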

@umehrot2 (Contributor) commented Mar 5, 2020

Created a Jira for this issue: https://issues.apache.org/jira/browse/HUDI-656

@abhaygupta3390 (Author) commented

@vinothchandar No, I am using the DataFrame write API. Sample code snippet:

writeDF.write.format("org.apache.hudi")
      .options(getQuickstartWriteConfigs)
      .option(PRECOMBINE_FIELD_OPT_KEY, "ts_ms")
      .option(RECORDKEY_FIELD_OPT_KEY, "id")
      .option(TABLE_NAME, <tableName>)
      .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[ComplexKeyGenerator].getName)
      .option(PARTITIONPATH_FIELD_OPT_KEY, "partCol1,partCol2,partCol3")
      .option(HIVE_SYNC_ENABLED_OPT_KEY, "true")
      .option(HIVE_DATABASE_OPT_KEY, <hiveDB>)
      .option(HIVE_TABLE_OPT_KEY, <hiveTableName>)
      .option(HIVE_URL_OPT_KEY, <hiveUrl>)
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName)
      .option(HIVE_PARTITION_FIELDS_OPT_KEY, "partCol1,partCol2,partCol3")
      .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
      .mode(Append)
      .save(<s3Location>)

@vinothchandar (Member) commented

@abhaygupta3390 we will get this fixed in the next release. @umehrot2 do you have a patch to share already?

@bvaradar (Contributor) commented Mar 6, 2020

@umehrot2: Assigning this GitHub issue to you. Corresponding Jira: https://jira.apache.org/jira/browse/HUDI-672. If there is already a tracking Jira, please feel free to close this one.

@bvaradar (Contributor) commented

#1394 merged. Resolving this ticket.
