
[WIP][HUDI-686] Implement BloomIndexV2 that does not depend on memory caching #1469

Closed
wants to merge 1 commit into from

Conversation

lamberken
Member

@lamberken lamberken commented Mar 31, 2020

What is the purpose of the pull request

The main goal here is to provide a much simpler index, without advanced optimizations like auto-tuned parallelism/skew handling, but with a better out-of-box experience for small workloads.

Brief change log

This is an initial version; I will update it step by step.

Verify this pull request

Tests will be improved as this PR evolves.

Committer checklist

  • Has a corresponding JIRA in PR title & commit

  • Commit message is descriptive of the change

  • CI is green

  • Necessary doc changes done or have another open PR

  • For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.

Contributor

@nsivabalan nsivabalan left a comment

@lamber-ken: is this in reviewable shape, or do you want me to wait?

@lamberken
Member Author

@lamber-ken: is this in reviewable shape, or do you want me to wait?

Thanks for reviewing this PR, I'll ping you when it's finished : )

@lamberken
Member Author

lamberken commented Apr 11, 2020

Hi @vinothchandar, based on your branch, here are the main updates:

  • Rebase branch
  • Add TestHoodieBloomIndexV2.java
  • Add DeltaTimer.java
  • Fix an implicit bug which causes input record duplication.

Bug fix
In the double-check stage (HoodieBloomIndexV2.LazyKeyChecker#computeNext),
when the target file does not contain the record key, we should return Option.empty().

Previous

Option<HoodieRecord<T>> ret = fileIdOpt.map(fileId -> {

  ...

  Option<HoodieRecordLocation> location = currHandle.containsKey(record.getRecordKey())
    ? Option.of(new HoodieRecordLocation(currHandle.getBaseInstantTime(), currHandle.getFileId()))
    : Option.empty();
  // Bug: a present Option is always returned here, even when the key is not in the file,
  // so the record gets emitted again in addition to its untagged emission elsewhere.
  return Option.of(getTaggedRecord(record, location));

}).orElse(Option.of(record));

Changes

Option<HoodieRecord<T>> recordOpt = fileIdOpt.map((Function<String, Option<HoodieRecord<T>>>) fileId -> {

  ...

  if (currHandle.containsKey(record.getRecordKey())) {
    HoodieRecordLocation recordLocation = new HoodieRecordLocation(currHandle.getBaseInstantTime(), currHandle.getFileId());
    return Option.of(getTaggedRecord(record, recordLocation));
  } else {
    // Fix: return empty when the bloom-filter candidate file does not actually contain the key.
    return Option.empty();
  }

}).orElse(Option.of(record));
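The difference between the two versions can be sketched with plain java.util.Optional (a hypothetical stand-in for Hudi's Option API; the previous/fixed method names are illustrative only). The old logic always produced a present value whenever a candidate file existed, even on a bloom-filter false positive, which led to the record being emitted twice:

```java
import java.util.Optional;

public class LazyKeyCheckDemo {

    // Old logic: always wraps the record, even when the candidate file
    // turned out not to contain the key (bloom-filter false positive).
    static Optional<String> previous(Optional<String> fileIdOpt, boolean fileContainsKey, String record) {
        return fileIdOpt.map(fileId -> Optional.of(fileContainsKey ? record + "@" + fileId : record))
                        .orElse(Optional.of(record));
    }

    // Fixed logic: return empty when the candidate file does not contain
    // the key, so the record is not emitted a second time.
    static Optional<String> fixed(Optional<String> fileIdOpt, boolean fileContainsKey, String record) {
        return fileIdOpt.map(fileId -> fileContainsKey
                        ? Optional.of(record + "@" + fileId)
                        : Optional.<String>empty())
                        .orElse(Optional.of(record));
    }

    public static void main(String[] args) {
        // Bloom filter suggested file "f1", but the file does not contain the key:
        System.out.println(previous(Optional.of("f1"), false, "r1")); // present -> duplicate emission
        System.out.println(fixed(Optional.of("f1"), false, "r1"));    // empty -> no duplicate
    }
}
```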

@lamberken
Member Author

lamberken commented Apr 11, 2020

Also, index performance has been greatly improved; your idea and design are amazing 👍 👍 👍 @vinothchandar

I tested upserting 5,000,000 records: bulk_insert first, then an upsert operation with the same dataset.

  1. Download the CSV data with 5M records:
https://drive.google.com/open?id=1uwJ68_RrKMUTbEtsGl56_P5b_mNX3k2S
  2. Run the demo commands:
export SPARK_HOME=/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7
${SPARK_HOME}/bin/spark-shell \
    --driver-memory 6G \
    --packages org.apache.hudi:hudi-spark-bundle_2.11:0.5.1-incubating,org.apache.spark:spark-avro_2.11:2.4.4 \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

export SPARK_HOME=/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7
${SPARK_HOME}/bin/spark-shell \
    --driver-memory 6G \
    --jars `ls packaging/hudi-spark-bundle/target/hudi-spark-bundle_*.*-*.*.*-SNAPSHOT.jar` \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

import org.apache.spark.sql.functions._
val tableName = "hudi_mor_table"
val basePath = "file:///tmp/hudi_mor_table"

var inputDF = spark.read.format("csv").option("header", "true").load("file:///work/hudi-debug/2.csv")
val hudiOptions = Map[String,String](
  "hoodie.insert.shuffle.parallelism" -> "10",
  "hoodie.upsert.shuffle.parallelism" -> "10",
  "hoodie.delete.shuffle.parallelism" -> "10",
  "hoodie.bulkinsert.shuffle.parallelism" -> "10",
  "hoodie.datasource.write.recordkey.field" -> "tds_cid",
  "hoodie.datasource.write.partitionpath.field" -> "hit_date",
  "hoodie.table.name" -> tableName,
  "hoodie.datasource.write.precombine.field" -> "hit_timestamp",
  "hoodie.datasource.write.operation" -> "upsert",
  "hoodie.memory.merge.max.size" -> "2004857600000",
  "hoodie.index.type" -> "BLOOM_V2",
  "hoodie.bloom.index.v2.buffer.max.size" -> "102000000000"
)

inputDF.write.format("org.apache.hudi").
  options(hudiOptions).
  mode("Append").
  save(basePath)

spark.read.format("org.apache.hudi").load(basePath + "/2020-03-19/*").count();

Performance comparison

HoodieBloomIndex: cost about 20min

[screenshot: Spark jobs UI]

HoodieBloomIndexV2: cost about 3min

[screenshot: Spark jobs UI]

@lamberken lamberken changed the title [WIP] [HUDI-686] Implement BloomIndexV2 that does not depend on memory caching [HUDI-686] Implement BloomIndexV2 that does not depend on memory caching Apr 11, 2020
@apache apache deleted a comment from codecov-io Apr 11, 2020
@apache apache deleted a comment from codecov-io Apr 11, 2020
@vinothchandar
Member

@lamber-ken thanks for the update.. from the UI, it seems like the difference was only in countByKey/WorkloadProfile which has nothing to do with indexing?

@lamberken
Member Author

lamberken commented Apr 12, 2020

@lamber-ken thanks for the update.. from the UI, it seems like the difference was only in countByKey/WorkloadProfile which has nothing to do with indexing?

The above shows the Spark jobs page; a job is a sequence of stages, triggered by an action such as count(), collect(), etc.

@vinothchandar
Member

understood :).. but I am saying the difference you see is outside the indexing piece.. Can you post the stages UI?

@nsivabalan
Contributor

Once taggedRecords are returned from index.tagLocation(), next step is to build the workload which actually triggers the execution of actual tagging. Hence the first job in workloadprofile is actually referring to index. Let me know if my understanding is wrong.

@lamberken
Member Author

understood :).. but I am saying the difference you see is outside the indexing piece.. Can you post the stages UI?

Got it, let me show you the Spark stages page 👍.
From the UI, we can see that the elapsed time, shuffle write, and shuffle read are all better.

BTW, env: 4 cores, 6GB driver, local mode; don't forget "hoodie.index.type" -> "BLOOM_V2"

HoodieBloomIndex: cost about 20min

[screenshot: Spark stages UI]

HoodieBloomIndexV2: cost about 3min

[screenshot: Spark stages UI]

@lamberken
Member Author

Once taggedRecords are returned from index.tagLocation(), next step is to build the workload which actually triggers the execution of actual tagging. Hence the first job in workloadprofile is actually referring to index. Let me know if my understanding is wrong.

You're right. 💯

Member

@vinothchandar vinothchandar left a comment

Okay ... my bad, misread. So it seems like the main thing is that we shuffle 9GB with the current bloom index, on a workload where it explodes the input RDD such that each of the 5M records is checked against all the files you have... Just curious, how many files did your bulk_insert create?

@nsivabalan can review this and land.. :) I will make a final pass once he approves..

@lamber-ken given that all this is fresh in your mind, can you help review the SimpleIndex, and once you approve that, I will do one pass..

(We can scale better this way)...

@lamberken
Member Author

lamberken commented Apr 12, 2020

how many files did your bulk_insert create.

Actually, I did the upsert operation twice directly; IMO bulk_insert would give the same result.

There are 41 parquet files, about 418MB total, in my env.

dcadmin-imac:hudi_mor_table dcadmin$ ll -lh /tmp/hudi_mor_table/2020-03-19 
total 844928
-rw-r--r--  1 dcadmin  wheel   9.7M Apr 12 10:58 05484246-8655-4045-b31b-45f6ff4b5765-0_37-47-674_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:58 06272cc1-337c-4ea1-9e5b-aa1b1d6b479f-0_17-47-654_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:58 0c9875b3-6ea3-4dfa-bc1d-ecc83de8440e-0_30-47-667_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:58 0fc36101-c1e9-41e2-b356-d680a17dc468-0_22-47-659_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.7M Apr 12 10:57 1329dfae-a324-438d-9606-6deb9d65fdb7-0_0-47-637_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:57 17417d4f-13d2-4d3f-94e6-c4f713575b58-0_5-47-642_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:58 1c5199eb-04b7-48c8-ba64-b66c41fe7f88-0_35-47-672_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:58 1f1f8a06-b6bb-4491-8f1e-c2203106aacd-0_23-47-660_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:58 230859c6-3ffe-4efc-813e-079bea14abe9-0_8-47-645_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:58 252c73ae-6912-481d-b0d2-356f9796b073-0_38-47-675_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.7M Apr 12 10:57 3173646f-47d2-4b70-bed5-c867045b6da7-0_3-47-640_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:58 341a530d-d7c9-4ab3-bbca-38cc276b4c17-0_32-47-669_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:58 352facd1-cdd1-4ea9-b38a-c36af74674c1-0_40-47-677_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:58 40cdb651-29b4-4b16-b8e7-26470f98c966-0_21-47-658_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.7M Apr 12 10:58 429d3ace-459a-4e55-903a-3a7cc6b35511-0_15-47-652_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:58 47e21491-f4cd-4f1d-82a5-ae8516846f47-0_19-47-656_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.7M Apr 12 10:57 4853078d-d120-49e5-abc1-622e6963324b-0_1-47-638_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.7M Apr 12 10:58 4c1d4a60-b2dd-4cdd-b98d-bbb216a838cb-0_31-47-668_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:58 4f49aaf4-6a74-425c-b5a8-29b3f311caae-0_13-47-650_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.7M Apr 12 10:58 5361ec26-6fce-4d95-96c4-e2811135d9d2-0_26-47-663_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.7M Apr 12 10:58 5815a649-0b09-4bce-aaff-82ac8428c1a7-0_14-47-651_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:58 60672e15-9477-42aa-aa79-d3a340feb9d6-0_20-47-657_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.7M Apr 12 10:58 740775dd-01d7-4147-a00a-805862599eed-0_24-47-661_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.7M Apr 12 10:58 78ff0cb1-6c8a-4c4b-bf47-01f1c56d5fd5-0_27-47-664_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:58 8710662b-66a7-4b0d-a818-e5cd6584a14f-0_34-47-671_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.7M Apr 12 10:58 8e21464e-cadb-4ab3-8ef6-198b138e2f0d-0_18-47-655_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:58 9b6accdc-e3f9-43b8-9d43-9e2306cafbab-0_12-47-649_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:58 a4bcc9e9-29c5-470b-8a8c-539e4ffa8f51-0_16-47-653_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:58 aa2dcc86-79f0-479f-92a4-886566e75219-0_10-47-647_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.7M Apr 12 10:58 b699bc5f-9786-44dd-90f9-57f9e68f85a2-0_25-47-662_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:57 b8941692-e240-4dc4-a47a-9adcedb86db6-0_4-47-641_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:58 b9a70bca-7fe4-4fa3-83e7-eb4a063ff56b-0_36-47-673_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.7M Apr 12 10:58 c2d799e5-edf0-4c3b-9e6d-1473489489e9-0_39-47-676_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:57 db1d8409-75d3-45ef-8f57-3b1040a473ad-0_6-47-643_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:58 db7cd3a8-cd27-436a-b2df-f2e4b6ae0e79-0_28-47-665_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.7M Apr 12 10:58 e3f46510-e062-4f33-8e6a-e206a7c5c80e-0_33-47-670_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.7M Apr 12 10:58 efa864fe-d4d8-4c66-89b2-a4d63cc2c6fc-0_9-47-646_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.7M Apr 12 10:57 f12955ac-c9e1-4c1b-ac39-4d42fe3aa269-0_2-47-639_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:58 f20c9f7c-ce4f-42bb-98d2-2495b613dbb1-0_29-47-666_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.7M Apr 12 10:57 f2e5d9d8-e858-437d-9694-09227287a8b0-0_7-47-644_20200412105332.parquet
-rw-r--r--  1 dcadmin  wheel   9.6M Apr 12 10:58 f300c12f-a0ee-4fb2-97b9-3da92922a4ff-0_11-47-648_20200412105332.parquet

can you help review the SimpleIndex

Sure, will review that pr, : )

@apache apache deleted a comment from codecov-io Apr 12, 2020
@apache apache deleted a comment from codecov-io Apr 12, 2020
@lamberken lamberken mentioned this pull request Apr 12, 2020
Member

@vinothchandar vinothchandar left a comment

Some comments on getting this over the finish line

@@ -69,4 +76,13 @@ public long endTimer() {
}
return timeInfoDeque.pop().stop();
}

public long deltaTime() {
Member

endTimer() already gives you a delta time right? why do we need this new method? We should consolidate this more.. right now, this feels like two timer classes within one..

Member Author

endTimer() only supports paired start/end calls; what we need here are delta semantics.
If we used endTimer(), we would have to restart the timer after every measurement; with delta, a single instance and one call per measurement are enough.

[screenshot]

Contributor

yeah, looks like HoodieTimer already gives you the delta. So, don't really understand why we need the changes.

T0: HoodieTimer timer = new HoodieTimer();
T1: timer.startTimer();
T4: timer.endTimer(); // T4 - T1
T4: timer.startTimer();
T6: timer.endTimer(); // T6 - T4
T8: timer.startTimer();
    T9: timer.startTimer();
    T12: timer.endTimer();  // T12 - T9
T15: timer.endTimer(); // T15 - t8
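The LIFO pairing in the timeline above can be sketched with a minimal stack-based timer (an illustrative reduction of the semantics described, not Hudi's actual HoodieTimer source):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Minimal sketch: each endTimer() pops and measures the most recently
// started (innermost) timer, so nested start/end calls pair up LIFO.
public class StackTimer {
    private final Deque<Long> starts = new ArrayDeque<>();

    public StackTimer startTimer() {
        starts.push(System.currentTimeMillis());
        return this;
    }

    public long endTimer() {
        return System.currentTimeMillis() - starts.pop();
    }
}
```

So startTimer(); startTimer(); endTimer(); endTimer() measures the inner interval first, then the outer one, matching the T9/T12 and T8/T15 pairs above.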

Member Author

hi @nsivabalan, as shown below, with HoodieTimer we need to call both startTimer() and endTimer() every time:

// HoodieTimer 
T0: HoodieTimer timer = new HoodieTimer(); timer.startTimer();
T1: timer.endTimer(); timer.startTimer();
T2: timer.endTimer(); timer.startTimer();
T3: timer.endTimer(); timer.startTimer();  
T4: timer.endTimer(); timer.startTimer();  

// Delta
T0: HoodieTimer timer = new HoodieTimer();
T1: timer.deltaTime();
T2: timer.deltaTime();
T3: timer.deltaTime();
T4: timer.deltaTime();
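A deltaTime() method with the semantics sketched above might look roughly like this (an assumption about the intent; the PR's actual DeltaTimer.java may differ):

```java
// Each deltaTime() call returns the elapsed time since the previous call
// (or since construction), so no explicit startTimer()/endTimer() pairing is needed.
public class DeltaTimerSketch {
    private long last = System.currentTimeMillis();

    public long deltaTime() {
        long now = System.currentTimeMillis();
        long delta = now - last;
        last = now;   // reset the reference point for the next call
        return delta;
    }
}
```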

Member

let's just reuse the timer class as is and move on? :) I'd actually encourage explicit start and end calls so it's clear what is being measured. In any case, the current way of overloading HoodieTimer does not seem maintainable IMO

Member Author

hi @vinothchandar @nsivabalan, you both think it's unnecessary, so I'm willing to take your suggestion. From my side, deltaTime() is more convenient, but I will address it : )

Member Author

Done.

@Override
public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
throw new UnsupportedOperationException("Not yet implemented");
Member

we need to implement this as well?

Member Author

Done.

*/
@Override
public boolean isGlobal() {
return false;
Member

Also a HoodieGlobalBloomIndexV2 ?

Member Author

Done

@Override
protected void end() {
totalTimeMs = hoodieTimer.endTimer();
String rangeCheckInfo = "LazyRangeBloomChecker: "
Member

I added these for debugging purposes.. We need not really have these checked in per se?

Contributor

yep, please remove any debugging statements and any other metrics that were added for perf tuning.

Member Author

Right, I think we need this. It will help us debug index performance issues if someone hits them in the future, and it prints only once per partition.

Member Author

Metrics are not easy to use here; each partition prints only one log line, so let's keep it as it is :)

import org.apache.hudi.index.bloom.BloomIndexFileInfo;
import org.apache.hudi.table.HoodieTable;

public class HoodieBloomRangeInfoHandle<T extends HoodieRecordPayload> extends HoodieReadHandle<T> {
Member

Let's add unit tests for these two handles?

Member Author

Done, TestHoodieBloomRangeInfoHandle


import static org.junit.Assert.fail;

@RunWith(Parameterized.class)
public class TestHoodieBloomIndexV2 extends HoodieClientTestHarness {
Contributor

once you have fixed fetchRecordLocation, do write tests for the same.

Member Author

Done.

@vinothchandar
Member

@lamber-ken we are hoping to get this into the next release as well. Any ETA on the final review? :)

@apache apache deleted a comment from codecov-io Apr 22, 2020
@vinothchandar
Member

@lamber-ken the fetchRecordLocation() API and global indexing are not implemented. Do you plan to work on them as well?

@lamberken
Member Author

lamberken commented Apr 28, 2020

@vinothchandar I think the work can be finished this week; I will ping you when it's done : )

the fetchRecordLocation() API and global indexing is not implemented..Do you plan to work on them as well?

Work status:
  • fetchRecordLocation() API: working
  • unit tests: working
  • global simple index: https://issues.apache.org/jira/browse/HUDI-787

@nsivabalan
Contributor

@vinothchandar I think the work can be finished this week, will ping you when finished : )

the fetchRecordLocation() API and global indexing is not implemented..Do you plan to work on them as well?

Work status:
  • fetchRecordLocation() API: working
  • unit tests: working
  • global simple index: https://issues.apache.org/jira/browse/HUDI-787

@lamber-ken: Just to confirm we are on the same page. We wanted a global index version of HoodieBloomIndexV2. The global simple index you quoted is the global version of SimpleIndex.


@lamberken lamberken closed this May 1, 2020
@lamberken lamberken reopened this May 1, 2020
@codecov-io

Codecov Report

Merging #1469 into master will decrease coverage by 0.18%.
The diff coverage is 62.41%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master    #1469      +/-   ##
============================================
- Coverage     71.77%   71.58%   -0.19%     
  Complexity     1087     1087              
============================================
  Files           385      389       +4     
  Lines         16575    16813     +238     
  Branches       1668     1685      +17     
============================================
+ Hits          11896    12035     +139     
- Misses         3951     4041      +90     
- Partials        728      737       +9     
Impacted Files Coverage Δ Complexity Δ
...java/org/apache/hudi/config/HoodieIndexConfig.java 61.53% <0.00%> (-1.96%) 3.00 <0.00> (ø)
...che/hudi/index/bloom/HoodieGlobalBloomIndexV2.java 0.00% <0.00%> (ø) 0.00 <0.00> (?)
...c/main/java/org/apache/hudi/index/HoodieIndex.java 78.94% <33.33%> (-9.29%) 3.00 <0.00> (ø)
...org/apache/hudi/io/HoodieBloomRangeInfoHandle.java 81.81% <81.81%> (ø) 0.00 <0.00> (?)
...rg/apache/hudi/index/bloom/HoodieBloomIndexV2.java 87.78% <87.78%> (ø) 0.00 <0.00> (?)
.../apache/hudi/io/HoodieKeyAndBloomLookupHandle.java 90.00% <90.00%> (ø) 0.00 <0.00> (?)
...java/org/apache/hudi/config/HoodieWriteConfig.java 84.97% <100.00%> (+0.13%) 47.00 <0.00> (ø)
...udi/index/bloom/HoodieBloomIndexCheckFunction.java 88.23% <100.00%> (ø) 1.00 <0.00> (ø)
...java/org/apache/hudi/io/HoodieKeyLookupHandle.java 100.00% <100.00%> (+10.00%) 0.00 <0.00> (ø)
... and 5 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update d54b4b8...139bc0c. Read the comment docs.

@lamberken
Member Author

lamberken commented May 9, 2020

Hi @vinothchandar @nsivabalan, thanks for your review; all review comments are addressed.

SYNC

Task status:
  • fix an implicit bug which causes input record duplication: done, JUnit covered
  • implement fetchRecordLocation: done, JUnit covered
  • JUnit tests for TestHoodieBloomIndexV2: done
  • JUnit test for TestHoodieBloomRangeInfoHandle: done
  • revert HoodieTimer: done
  • global HoodieBloomIndexV2: done, JUnit covered

Test


// BLOOM, GLOBAL_BLOOM, BLOOM_V2, GLOBAL_BLOOM_V2

export SPARK_HOME=/work/BigData/install/spark/spark-2.4.4-bin-hadoop2.7
${SPARK_HOME}/bin/spark-shell \
    --jars `ls packaging/hudi-spark-bundle/target/hudi-spark-bundle_*.*-*.*.*-SNAPSHOT.jar` \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

import org.apache.spark.sql.functions._

val tableName = "hudi_mor_table"
val basePath = "file:///tmp/hudi_mor_table"

val hudiOptions = Map[String,String](
  "hoodie.upsert.shuffle.parallelism" -> "10",
  "hoodie.datasource.write.recordkey.field" -> "name",
  "hoodie.datasource.write.partitionpath.field" -> "location",
  "hoodie.table.name" -> tableName,
  "hoodie.datasource.write.precombine.field" -> "ts",
  "hoodie.index.type" -> "BLOOM_V2"
)

var datas = List(
    """{ "name": "kenken1", "ts": 1574297893836, "age": 123, "location": "2019-03-01"}""",
    """{ "name": "kenken1", "ts": 1574297893836, "age": 123, "location": "2019-03-02"}"""
)
val inputDF = spark.read.json(spark.sparkContext.parallelize(datas, 2))

inputDF.write.format("org.apache.hudi").
  options(hudiOptions).
  mode("Overwrite").
  save(basePath)

spark.read.format("org.apache.hudi").load(basePath + "/*/*").show();


// update

var datas = List(
    """{ "name": "kenken1", "ts": 1574297893838, "age": 100, "location": "2019-03-01"}"""
)
val inputDF = spark.read.json(spark.sparkContext.parallelize(datas, 2))

inputDF.write.format("org.apache.hudi").
  options(hudiOptions).
  mode("Append").
  save(basePath)

spark.read.format("org.apache.hudi").load(basePath + "/*/*").show();

Contributor

@nsivabalan nsivabalan left a comment

Yet to review Global index. Will continue once my comments are clarified/addressed.

@@ -68,6 +68,13 @@
public static final String BLOOM_INDEX_KEYS_PER_BUCKET_PROP = "hoodie.bloom.index.keys.per.bucket";
public static final String DEFAULT_BLOOM_INDEX_KEYS_PER_BUCKET = "10000000";

public static final String BLOOM_INDEX_V2_PARALLELISM_PROP = "hoodie.bloom.index.v2.parallelism";
public static final String DEFAULT_BLOOM_V2_INDEX_PARALLELISM = "80";
Contributor

May I know why 80 rather than 100 or some round number?

@@ -441,6 +441,14 @@ public boolean getBloomIndexUpdatePartitionPath() {
return Boolean.parseBoolean(props.getProperty(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH));
}

public int getBloomIndexV2Parallelism() {
Contributor

guess we also need to add all these props as methods to HoodieIndexConfig (builder pattern)
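The builder-style accessors being suggested might look roughly like this (a sketch built from the config keys visible in this PR; the class and method names are hypothetical, not the actual HoodieIndexConfig API):

```java
import java.util.Properties;

// Hypothetical sketch of builder-pattern setters for the new V2 index configs.
public class IndexConfigBuilderSketch {
    private final Properties props = new Properties();

    public IndexConfigBuilderSketch withBloomIndexV2Parallelism(int parallelism) {
        props.setProperty("hoodie.bloom.index.v2.parallelism", String.valueOf(parallelism));
        return this;
    }

    public IndexConfigBuilderSketch withBloomIndexV2BufferMaxSize(long maxSize) {
        props.setProperty("hoodie.bloom.index.v2.buffer.max.size", String.valueOf(maxSize));
        return this;
    }

    public Properties build() {
        return props;
    }
}
```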

* optional is empty, then the key is not found.
*/
@Override
public JavaPairRDD<HoodieKey, Option<Pair<String, String>>> fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
Contributor

Not required in this patch, but I feel like we should have a class like PartitionPathAndFileIdPair. Having it as Pair<String, String> is sometimes confusing; I had to trace back to see whether the left entry is the partition path or the file id.
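Such a class might look like this (the name PartitionPathAndFileIdPair is the reviewer's suggestion; the shape below is only a sketch):

```java
// A dedicated pair type making it explicit which entry is the partition
// path and which is the file id, instead of an ambiguous Pair<String, String>.
public final class PartitionPathAndFileIdPair {
    private final String partitionPath;
    private final String fileId;

    public PartitionPathAndFileIdPair(String partitionPath, String fileId) {
        this.partitionPath = partitionPath;
        this.fileId = fileId;
    }

    public String getPartitionPath() {
        return partitionPath;
    }

    public String getFileId() {
        return fileId;
    }
}
```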


/**
* Checks if the given [Keys] exists in the hoodie table and returns [Key, Option[partitionPath, fileID]] If the
* optional is empty, then the key is not found.
Contributor

Minor: it's Option.empty, not Optional.empty.

super(config);
}

@Override
Contributor

Just curious why there are no javadocs. You have added javadocs for fetchRecordLocation though.

.sortBy((record) -> String.format("%s-%s", record.getPartitionPath(), record.getRecordKey()),
true, config.getBloomIndexV2Parallelism())
.mapPartitions((itr) -> new LazyRangeAndBloomChecker(itr, hoodieTable)).flatMap(List::iterator)
.sortBy(Pair::getRight, true, config.getBloomIndexV2Parallelism())
Contributor

same here.


// <Partition path, file name>
hoodieTimer.startTimer();
Set<Pair<String, String>> matchingFiles = indexFileFilter
Contributor

Correct me if I am wrong: in a global index, there are chances that an incoming record key is updated to a different partition path compared to the existing record. For example, the existing record in hoodie could be <rec_key1, partitionPath1>, and the incoming record could be <rec_key1, partitionPath2>. We have to find such locations as well in the global index.
If we call indexFileFilter only with the partition path and record key from the incoming record, then we might miss this case.

Contributor

Guess you missed config.getBloomIndexUpdatePartitionPath(). Check what HoodieGlobalBloomIndex does for this config. If need be, I can explain too.

Contributor

Bottom line, we need to consider all partitions, not just the ones the incoming records are tagged with.
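The tagging decision being discussed can be sketched as follows (a simplified, hypothetical model of how the update-partition-path config resolves a partition change; the real index works over files and bloom filters, not an in-memory map):

```java
import java.util.Map;

// Simplified model: a global index must look the record key up across ALL
// partitions; a partition-path change is resolved by the update-partition-path config.
public class GlobalTagDemo {
    public enum Action { INSERT, UPDATE_IN_PLACE, DELETE_OLD_AND_INSERT_NEW }

    public static Action tag(Map<String, String> keyToExistingPartition, // recordKey -> existing partitionPath
                             String recordKey, String incomingPartition,
                             boolean updatePartitionPath) {
        String existing = keyToExistingPartition.get(recordKey);
        if (existing == null) {
            return Action.INSERT; // key not present in any partition
        }
        if (existing.equals(incomingPartition)) {
            return Action.UPDATE_IN_PLACE; // normal update within the same partition
        }
        // Key exists under a different partition path: either move the record to
        // the new partition, or tag it back to its existing partition.
        return updatePartitionPath ? Action.DELETE_OLD_AND_INSERT_NEW : Action.UPDATE_IN_PLACE;
    }
}
```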

BloomFilter filter = fileIDToBloomFilter.get(partitionFileIdPair.getRight());
if (filter.mightContain(record.getRecordKey())) {
totalMatches++;
candidates.add(Pair.of(updatePartition(record, partitionFileIdPair.getLeft()), partitionFileIdPair.getRight()));
Contributor

sorry, this is confusing to me as to why we update the path. Once you fix my previous comment, I will take a look at this code block.

return new HoodieRecord<>(hoodieKey, record.getData());
}

private void populateFileIDs() {
Contributor

is there any difference between regular bloom IndexV2 and global version for this method?

}
}

private void populateRangeAndBloomFilters() {
Contributor

Is there any difference between the regular bloom index V2 and the global version for this method? If most of the code is similar, why can't we create a separate file for LazyRangeAndBloomChecker and introduce a global version (extending the same class) that overrides only what is required?
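The refactor being suggested could look like this in outline (class and method names are illustrative sketches, not the PR's code): a shared checker exposes a hook for partition scoping, and the global variant overrides only that hook.

```java
// Shared checker with a hook for partition scoping; the global variant
// overrides only the scoping and inherits everything else.
class LazyRangeAndBloomCheckerSketch {
    public String partitionScope() {
        return "partitions of incoming records";
    }
    // populateFileIDs() / populateRangeAndBloomFilters() would be written once
    // against partitionScope() and shared by both variants.
}

class GlobalLazyRangeAndBloomCheckerSketch extends LazyRangeAndBloomCheckerSketch {
    @Override
    public String partitionScope() {
        return "all partitions";
    }
}
```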

@lamberken
Member Author

Fixing conflicts, please wait.

@codecov-commenter

codecov-commenter commented May 27, 2020

Codecov Report

Merging #1469 into master will decrease coverage by 0.27%.
The diff coverage is 0.00%.

Impacted file tree graph

@@             Coverage Diff              @@
##             master    #1469      +/-   ##
============================================
- Coverage     18.21%   17.93%   -0.28%     
  Complexity      856      856              
============================================
  Files           348      352       +4     
  Lines         15332    15570     +238     
  Branches       1523     1540      +17     
============================================
  Hits           2792     2792              
- Misses        12183    12419     +236     
- Partials        357      359       +2     
| Impacted Files | Coverage Δ | Complexity Δ |
| --- | --- | --- |
| ...java/org/apache/hudi/config/HoodieIndexConfig.java | 37.50% <0.00%> (-0.97%) | 3.00 <0.00> (ø) |
| ...java/org/apache/hudi/config/HoodieWriteConfig.java | 39.76% <0.00%> (-0.32%) | 48.00 <0.00> (ø) |
| ...udi/index/bloom/HoodieBloomIndexCheckFunction.java | 11.76% <0.00%> (ø) | 1.00 <0.00> (ø) |
| ...rg/apache/hudi/index/bloom/HoodieBloomIndexV2.java | 0.00% <0.00%> (ø) | 0.00 <0.00> (?) |
| ...che/hudi/index/bloom/HoodieGlobalBloomIndexV2.java | 0.00% <0.00%> (ø) | 0.00 <0.00> (?) |
| ...org/apache/hudi/io/HoodieBloomRangeInfoHandle.java | 0.00% <0.00%> (ø) | 0.00 <0.00> (?) |
| .../apache/hudi/io/HoodieKeyAndBloomLookupHandle.java | 0.00% <0.00%> (ø) | 0.00 <0.00> (?) |
| ...java/org/apache/hudi/io/HoodieKeyLookupHandle.java | 0.00% <0.00%> (ø) | 0.00 <0.00> (ø) |
| ... and 2 more | | |

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update bde7a70...c14db4c. Read the comment docs.

@nsivabalan

@lamber-ken : LMK once the patch is ready to be reviewed again.

@lamberken

> @lamber-ken : LMK once the patch is ready to be reviewed again.

Thanks a lot for reviewing this PR. Sorry for the delay; I'm working on something else at the moment. I'll ping you when it's ready 👍

@nsivabalan

nsivabalan commented Jul 3, 2020

@lamber-ken @vinothchandar : I took a stab at the global BloomIndexV2. I don't have permissions on lamberken's repo and hence couldn't update his branch. Here is my branch and commit link. And here is the link to the GlobalBloomIndexV2. Please check it out. I have added and fixed tests for it.

Also, I have two questions/clarifications.
1. With the regular BloomIndexV2, why do we need to sort by both partition path and record key? Wouldn't sorting by partition path alone suffice?
2. Correct me if I am wrong, but there is a corner case that needs to be fixed in both BloomIndexV2 and the global version. The fix might incur an additional left outer join, so I wanted to confirm whether that is feasible.
Suppose an incoming record matches one or more files after the range and bloom lookup, but in the key checker none of those files actually contains the record key. In this scenario, the output of tagLocation will be missing that record entirely.

If this case can occur, the fix I can think of is: do not return empty candidates from LazyRangeAndBloomChecker, so that the result of LazyKeyChecker will not contain such records. With this fix, LazyKeyChecker will return only records that exist in storage. Once we have the result from LazyKeyChecker, we can do a left outer join with the incoming records to find the non-existent ones and add them to the final tagged record list.

The same fix needs to be applied to the global version as well.
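The left outer join described above can be sketched without Spark; everything below (types, names, the Map-based join) is a hypothetical stand-in for the RDD-level operation:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the proposed fix: the key checker returns only records that
// truly exist in storage, so incoming records that matched a bloom filter
// but failed the key lookup are re-added with an empty (null) location.
public class TagLocationSketch {

    // confirmed maps record key -> fileId for records the key checker verified.
    static Map<String, String> leftOuterJoinTag(List<String> incomingKeys,
                                                Map<String, String> confirmed) {
        Map<String, String> tagged = new LinkedHashMap<>();
        for (String key : incomingKeys) {
            // Left outer join: every incoming key survives; null means "insert".
            tagged.put(key, confirmed.getOrDefault(key, null));
        }
        return tagged;
    }

    public static void main(String[] args) {
        List<String> incoming = Arrays.asList("k1", "k2", "k3");
        // Only k1 survived the key check; k2 was a bloom false positive.
        Map<String, String> confirmed = Collections.singletonMap("k1", "file-001");
        System.out.println(leftOuterJoinTag(incoming, confirmed));
        // prints {k1=file-001, k2=null, k3=null}
    }
}
```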

.mapPartitions((itr) -> new LazyRangeAndBloomChecker(itr, hoodieTable)).flatMap(List::iterator)
.sortBy(Pair::getRight, true, config.getBloomIndexV2Parallelism())
.mapPartitions((itr) -> new LazyKeyChecker(itr, hoodieTable))
.filter(Option::isPresent)

@vinothchandar : I think there could be a bug here. If a few files matched a record in the range and bloom lookup, but in LazyKeyChecker none of those files contained the record, the current code will not include that record in the final JavaRDD<HoodieRecord> returned. But we have to return that record with an empty current location.

@vinothchandar vinothchandar changed the title [HUDI-686] Implement BloomIndexV2 that does not depend on memory caching [WIP][HUDI-686] Implement BloomIndexV2 that does not depend on memory caching Oct 4, 2020
@vinothchandar vinothchandar added the pr:wip Work in Progress/PRs label Oct 4, 2020
@vinothchandar

We are planning to reimplement indexing as a new "HoodieMetadataIndex", now that the metadata table is there. Closing this, but I expect we will use/share some ideas from this PR there.
