
[SUPPORT] Slow S3 file listing degrades Hudi read performance #1829

Closed
zuyanton opened this issue Jul 14, 2020 · 15 comments

Comments

@zuyanton
Hudi MoR read performance gets slower on tables with many (1000+) partitions stored in S3. When running a simple spark.sql("select * from table_ro").count command, we observe in the Spark UI that no Spark jobs are scheduled for the first 2.5 minutes, and the load on the cluster during that period is almost nonexistent.
(Spark UI screenshot: select * on the _ro table)

Looking into the logs to figure out what is going on during that period, we see that for the first two and a half minutes Hudi is busy running HoodieParquetInputFormat.listStatus (code link). I placed timer log lines around various parts of that function and was able to narrow it down to this line:

FileStatus[] fileStatuses = super.listStatus(job);
Executing this line takes over 2/3 of the total time.
If I understand correctly, this line lists all files in a single partition.
This "overhead" appears to depend linearly on the number of partitions: increasing the number of partitions to 2000 almost doubles the overhead, and the cluster just runs HoodieParquetInputFormat.listStatus before starting to execute any Spark jobs.

To Reproduce
see the code snippet below

  • Hudi version : master branch

  • Spark version : 2.4.4

  • Hive version : 2.3.6

  • Hadoop version : 2.8.5

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no

Additional context

    import org.apache.spark.sql.functions._
    import org.apache.hudi.hive.MultiPartKeysValueExtractor
    import org.apache.hudi.QuickstartUtils._
    import scala.collection.JavaConversions._
    import org.apache.spark.sql.SaveMode
    import org.apache.hudi.DataSourceReadOptions._
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.hudi.keygen.ComplexKeyGenerator
    import org.apache.hadoop.hive.conf.HiveConf
    val hiveConf = new HiveConf()
    val hiveMetastoreURI = hiveConf.get("hive.metastore.uris").replaceAll("thrift://", "")
    val hiveServer2URI = hiveMetastoreURI.substring(0, hiveMetastoreURI.lastIndexOf(":"))
    var hudiOptions = Map[String,String](
      HoodieWriteConfig.TABLE_NAME → "testTable1",
      "hoodie.consistency.check.enabled"->"true",
      "hoodie.compact.inline.max.delta.commits"->"100",
      "hoodie.compact.inline"->"true",
      DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "MERGE_ON_READ",
      DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "pk",
      DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[ComplexKeyGenerator].getName,
      DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY ->"partition",
      DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "sort_key",
      DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY → "true",
      DataSourceWriteOptions.HIVE_TABLE_OPT_KEY → "testTable1",
      DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY → "partition",
      DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY → classOf[MultiPartKeysValueExtractor].getName,
      DataSourceWriteOptions.HIVE_URL_OPT_KEY ->s"jdbc:hive2://$hiveServer2URI:10000"
    )

    spark.sql("drop table if exists testTable1_ro")
    spark.sql("drop table if exists testTable1_rt")
    var seq = Seq((1, 2, 3))
    for (i<- 2 to 1000) {
      seq = seq :+ (i, i , 1)
    }
    var df = seq.toDF("pk", "partition", "sort_key")
    //create table
    df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testBucket/test/hudi/zuyanton/1/testTable1")
    //update table couple times
    df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testBucket/test/hudi/zuyanton/1/testTable1")
    df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://testBucket/test/hudi/zuyanton/1/testTable1")
    
    //read table
    spark.sql("select * from testTable1_ro").count
@vinothchandar
Member

vinothchandar commented Jul 15, 2020

@zuyanton this seems like a general issue with FileInputFormat

 int numThreads = job
        .getInt(
            org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS,
            org.apache.hadoop.mapreduce.lib.input.FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);

can you try adding spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads=8 or something to the SparkConf and see if it helps? (the default inside Hadoop is 1)
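
A minimal spark-shell sketch of that suggestion (the conf key is the one mentioned above; the table name is reused from the repro and is otherwise illustrative):

    // Bump FileInputFormat's listing parallelism via the Hadoop conf that Spark forwards.
    spark.sparkContext.hadoopConfiguration.setInt(
      "mapreduce.input.fileinputformat.list-status.num-threads", 8)

    // Equivalently, at launch time:
    //   --conf spark.hadoop.mapreduce.input.fileinputformat.list-status.num-threads=8
    spark.sql("select * from testTable1_ro").count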

cc @n3nash IIRC you mentioned a similar approach done at Uber?

@zuyanton
Author

@vinothchandar it didn't have any effect, and I believe it shouldn't, since from what I can tell that parameter only helps if you are listing the statuses of multiple dirs (https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java#L216), whereas in our case it is always one dir: the root location of a single partition.

@umehrot2
Contributor

I think the finding by @zuyanton is correct. Increasing the num-threads will not help because we just set the base path of the table as the input path of the JobConf. I believe we would get a good speedup if, instead of the basePath, we set all the partition paths as the input paths of the JobConf and then increased the num-threads.

Another thing we can potentially explore is using Spark to perform this listing in parallel on the cluster. But this seems like something we should target for the 0.6.0 release with Blocker priority.
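
An illustrative sketch of the first idea (this is not Hudi code; partitionPaths is a hypothetical list of the table's partition directories):

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

    // Hand FileInputFormat every partition directory instead of just the table base path,
    // so that list-status.num-threads can actually fan out across directories.
    def setPartitionInputPaths(jobConf: JobConf, partitionPaths: Seq[String]): Unit = {
      FileInputFormat.setInputPaths(jobConf, partitionPaths.map(new Path(_)): _*)
      jobConf.setInt("mapreduce.input.fileinputformat.list-status.num-threads", 8)
    }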

@bvaradar
Contributor

@zuyanton : This sounds like a general Spark/HMS query integration issue. Are we seeing similar behavior when running the same query over a non-Hudi table?

@zuyanton
Author

@bvaradar we don't see a similar issue with regular non-Hudi tables saved to S3 in Parquet format. For regular tables the "overhead" is the same and under one minute regardless of the number of partitions: regular tables with 20k partitions as well as 100 partitions take the same time to "load" before Spark starts running its jobs, whereas a Hudi table on S3 becomes slow with 5k+ partitions. That said, we use EMR 5.28, which comes with the EMRFS S3-optimized committer enabled in Spark by default, so I assumed whatever bottlenecks S3 has are addressed by the committer.

@bvaradar
Contributor

Thanks @zuyanton for the updates. IIUC, the S3-optimized committer was for optimizing writes by reducing the renames done. I might be wrong, but I am generally curious about EMR optimizations for Spark. @umehrot2 : we can look at the option you mentioned about setting the partition paths and then increasing the num-threads. Is this one of the optimizations done internally within EMR Spark?

@umehrot2
Contributor

umehrot2 commented Jul 18, 2020

@zuyanton In your test with regular Parquet tables you are probably not setting the following property in the Spark config: spark.sql.hive.convertMetastoreParquet=false. Only when you set this property to `false` will Spark use the Parquet InputFormat as well as its listing code. Otherwise, by default Spark uses its native listing (parallelized over the cluster) and Parquet readers, which are supposed to be faster.

However, the way Hudi works is that it uses an InputFormat implementation. Thus, for a fair comparison, when you test regular Parquet with Spark you should set spark.sql.hive.convertMetastoreParquet=false, and I think you will then observe behavior quite similar to what you are seeing. Would you mind trying that out once?
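
A minimal spark-shell sketch of that fair-comparison setup (the conf key is the one above; the table name is hypothetical):

    // Force Spark SQL to read the Hive-registered Parquet table through the Hive
    // InputFormat path instead of Spark's native Parquet reader and listing.
    spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
    spark.sql("select * from regular_parquet_table").count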

But @bvaradar, irrespective of that, I think for Hudi we should always compare our performance against standard Spark performance (native listing and reading) and not the performance of Spark when it is made to go through an InputFormat. So we need to get this fixed either way if we want to be comparable to Spark Parquet performance, which uses parallelized listing over the cluster.

@umehrot2
Contributor

@bvaradar @zuyanton The EMR S3-optimized committer only helps avoid renames. Again, that does not come into effect for Hudi because of the way the Hudi datasource is implemented: the Hudi datasource is not an extension of Spark's FileFormat datasource. It has its own commit mechanism and writing logic and does not use Spark's commit/write process. So the EMR-optimized committer unfortunately does not come into effect for Hudi workloads.

Irrespective of that, the committer would not have any effect on this listing performance.

@zuyanton
Author

@umehrot2 you are right: with convertMetastoreParquet set to false, when querying a regular Parquet table with 20k partitions I see similar behavior, with Spark not running any jobs for the first 4 minutes.

@rubenssoto

rubenssoto commented Jan 25, 2021

@umehrot2 @bvaradar

Do you know if this problem will be solved in 0.7.0? I'm querying some big datasets with more than 500 partitions and I had the same problem.

2 Minutes doing nothing.
(Spark UI screenshot)

Thank you

@vinothchandar
Member

@rubenssoto for some code paths, it will be. If you turn on hoodie.metadata.enable=true on the write side, you should see improvements. Hive queries should see an improvement; Spark SQL with --conf spark.sql.hive.convertMetastoreParquet=false and --conf "spark.hadoop.hoodie.metadata.enable=true" should see an improvement. The Spark datasource path will see modest gains for now, with full integration coming in 0.8.0. Will include it in the release highlights.
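
A minimal sketch of setting those two confs when building the session instead of passing --conf flags (the app name is illustrative; the conf keys and values are the ones quoted above):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("hudi-ro-query")                                   // illustrative name
      .config("spark.sql.hive.convertMetastoreParquet", "false")  // route Hive-synced tables through Hudi's InputFormat
      .config("spark.hadoop.hoodie.metadata.enable", "true")      // let readers use the metadata table for file listings
      .enableHiveSupport()
      .getOrCreate()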

@rubenssoto

rubenssoto commented Jan 25, 2021

@vinothchandar

Thank you so much for your answer.
When do you plan to release this version? I will try to make some workarounds until then.

Is this configuration right?

{ "conf": {
            "spark.jars.packages": "org.apache.spark:spark-avro_2.12:2.4.4",
            "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
            "spark.jars": "s3://dl/lib/hudi-spark-bundle_2.12-0.8.0-SNAPSHOT.jar",
            "spark.sql.hive.convertMetastoreParquet": "false",
            "spark.hadoop.hoodie.metadata.enable": "true"}
}

I made these 2 queries:

spark.read.format('hudi').load('s3://ze-data-lake/temp/order_test').count()

%%sql
select count('*') from raw_courier_api.order_test

On the PySpark query, Spark creates a job with 143 tasks; after 10 seconds of listing, the count was fast. But on the Spark SQL query, Spark creates a job with 2000 tasks and it was very slow. Is it a Hudi or a Spark issue?

SPARK SQL
(Spark UI screenshot)

PYSPARK
(Spark UI screenshot)

Another problem I ran into: my table has 36 million rows, but with that config it shows only 4 million.
Thank you so much!

@vinothchandar
Member

0.7.0 is being voted on right now. Hopefully today.

The spark.read.format('hudi') route (Spark datasource path) does not go through Hive, so those configs may not help at all. Between PySpark and the Spark datasource in Scala, there should be no difference, so not sure what's going on :/

@vinothchandar
Member

0.7.0 is out!

@n3nash
Contributor

n3nash commented Jun 16, 2021

With 0.7.0, one can set hoodie.metadata.enable to true to eliminate issues due to file listings. Closing this ticket now.
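
For reference, a minimal sketch of enabling it on the write side, reusing the options map and path from the repro above (a sketch under those assumptions, not the exact commands from the thread):

    // Enable the internal metadata table (Hudi 0.7.0+) so readers can obtain file
    // listings from it instead of listing S3 directly.
    df.write.format("org.apache.hudi")
      .options(hudiOptions)
      .option("hoodie.metadata.enable", "true")
      .mode(SaveMode.Append)
      .save("s3://testBucket/test/hudi/zuyanton/1/testTable1")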

@n3nash n3nash closed this as completed Jun 16, 2021
GI Tracker Board automation moved this from Blocked On User to Done Jun 16, 2021
@n3nash n3nash added the archive label Jun 16, 2021