
ARROW-17412: [C++] AsofJoin multiple keys and types #13880

Merged (29 commits into apache:master on Sep 8, 2022)

Conversation

@rtpsw (Contributor) commented Aug 15, 2022


@rtpsw (Contributor Author) commented Aug 15, 2022

A macOS job failed on a timeout of the AsofJoin tester; it looks like the run took much longer than expected. @icexelloss, any idea how this should be handled?

@rok (Member) commented Aug 15, 2022

Thanks for doing this @rtpsw!
I've not read in depth yet, but was wondering if this should also cover date32/64 and perhaps time32/64?

@icexelloss (Contributor) commented

@rok In principle I think it makes sense to support date32/64 and perhaps time32/64 (or really, any type that supports ordering and distance). In practice, we found the predominant use case is either the Timestamp type or some other value that gets encoded into an int/long (for example, the number of trading days since a certain point).

I would prefer not to add support for date32/64 and time32/64 just yet, mostly because we are working towards an internal product release and would prefer to focus on issues related to that, if that is OK.

@icexelloss (Contributor) commented

A macOS job failed on a timeout of the AsofJoin tester; it looks like the run took much longer than expected. @icexelloss, any idea how this should be handled?

I am not sure and might seek help from the maintainers.

@icexelloss (Contributor) commented

@rtpsw At a high level, the changes make sense to me. My two main questions are:

  • I haven't seen much use of macros in this way in Acero code. I think they are probably fine, but I would like to hear opinions from maintainers, perhaps @westonpace or @rok.
  • I still don't fully grasp the need for the KeyHasher class; some more elaboration would help justify it. Is KeyHasher strictly better than the current hashing logic in the hash join node (if so, can we unify the two)? Or is it better because of the way the as-of join works?

@rtpsw (Contributor Author) commented Aug 18, 2022

I haven't seen much use of macros in this way in Acero code. I think they are probably fine, but I would like to hear opinions from maintainers, perhaps @westonpace or @rok.

Sure. If there is a cleaner way to code this, I'd be happy to do so.

I still don't fully grasp the need for the KeyHasher class; some more elaboration would help justify it. Is KeyHasher strictly better than the current hashing logic in the hash join node (if so, can we unify the two)? Or is it better because of the way the as-of join works?

I'll try to explain a bit more. One observation was that the Hashing32 logic in hash_join is not readily reusable in AsofJoinNode, and besides I wanted to use Hashing64. So I needed to write some hashing code in AsofJoinNode, and I placed it in this KeyHasher class. Another observation was that the hash_join code recomputes certain objects that do not change from one batch to another with the same schema; in particular, the column metadata is recomputed within Hashing32::HashBatch. The KeyHasher code simply caches what (hopefully everything that) can be reused for hashing across such batches.

Presumably, the hash_join code could leverage similar caching, and there is surely some good way to unify it with the proposed KeyHasher code. However, I think that would be well out of scope for this PR.
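
To make the caching idea concrete, here is a minimal, self-contained sketch; it is illustrative only and not the actual Arrow KeyHasher or Hashing64 code (the Batch layout and the FNV-style hash combine are stand-ins), showing how per-input key-column metadata and per-batch row hashes can be cached instead of recomputed:

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

struct Batch {
  // Each column is a vector of int64 values; all columns have the same length.
  std::vector<std::vector<int64_t>> columns;
  size_t num_rows() const { return columns.empty() ? 0 : columns[0].size(); }
};

class KeyHasher {
 public:
  // The key column indices are fixed per input; they stand in for the
  // per-schema metadata that can be cached once instead of per batch.
  explicit KeyHasher(std::vector<size_t> key_col_indices)
      : key_col_indices_(std::move(key_col_indices)) {}

  // Returns one 64-bit hash per row, recomputing only when the batch changes.
  const std::vector<uint64_t>& HashesFor(const Batch* batch) {
    if (batch == cached_batch_) return hashes_;  // reuse cached row hashes
    hashes_.assign(batch->num_rows(), 0xcbf29ce484222325ULL);  // FNV offset basis
    for (size_t col : key_col_indices_) {
      const auto& values = batch->columns[col];
      for (size_t row = 0; row < values.size(); ++row) {
        // FNV-1a-style combine; the real code uses Arrow's vectorized Hashing64.
        hashes_[row] = (hashes_[row] ^ static_cast<uint64_t>(values[row])) * 0x100000001b3ULL;
      }
    }
    cached_batch_ = batch;
    return hashes_;
  }

 private:
  std::vector<size_t> key_col_indices_;
  const Batch* cached_batch_ = nullptr;
  std::vector<uint64_t> hashes_;
};

int main() {
  Batch batch{{{1, 2, 1}, {10, 20, 10}}};
  KeyHasher hasher({0, 1});
  const auto& hashes = hasher.HashesFor(&batch);
  std::cout << (hashes[0] == hashes[2]) << std::endl;  // 1: equal key tuples hash alike
}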

@rtpsw (Contributor Author) commented Sep 1, 2022

Benchmark results comparing the baseline code of this PR with its latest commit

$ archery benchmark run --benchmark-filter=AsOfJoinOverhead --output=baseline.json bc1a16cd0eceeffe67893a7e8000d2dd28dcf3f1
$ archery benchmark run --benchmark-filter=AsOfJoinOverhead --output=ARROW-17412.json 8bee2346717c217d2f1e233a1273577d0df687eb
$ archery benchmark diff ARROW-17412.json baseline.json

are as follows:

INFO:numexpr.utils:NumExpr defaulting to 8 threads.
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Non-regressions: (1)
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
                                                                                                                                             benchmark        baseline       contender  change %                                                                                                                                                                                                                                                                                                                                                                           counters
AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:32000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time 142.364 MiB/sec 159.425 MiB/sec    11.984 {'family_index': 0, 'per_family_instance_index': 14, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:32000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 7, 'input_rows_per_second': 867904.5056496062, 'maximum_peak_memory': 686301504.0}

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Regressions: (10)
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
                                                                                                                                              benchmark        baseline       contender  change %                                                                                                                                                                                                                                                                                                                                                                            counters
 AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:50/right_freq:400/right_cols:20/right_ids:500/real_time 323.553 MiB/sec 287.966 MiB/sec   -10.999  {'family_index': 0, 'per_family_instance_index': 11, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:50/right_freq:400/right_cols:20/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 1, 'input_rows_per_second': 1972499.868090142, 'maximum_peak_memory': 686301504.0}
 AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:10/right_freq:400/right_cols:20/right_ids:500/real_time 344.387 MiB/sec 306.150 MiB/sec   -11.103 {'family_index': 0, 'per_family_instance_index': 10, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:10/right_freq:400/right_cols:20/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3, 'input_rows_per_second': 2099512.4178041373, 'maximum_peak_memory': 148407104.0}
  AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time 349.482 MiB/sec 303.890 MiB/sec   -13.046   {'family_index': 0, 'per_family_instance_index': 9, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 19, 'input_rows_per_second': 2111813.458550057, 'maximum_peak_memory': 131071104.0}
AsOfJoinOverhead/left_freq:1000/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:1000/right_cols:20/right_ids:500/real_time 348.038 MiB/sec 301.696 MiB/sec   -13.315 {'family_index': 0, 'per_family_instance_index': 2, 'run_name': 'AsOfJoinOverhead/left_freq:1000/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:1000/right_cols:20/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 44, 'input_rows_per_second': 2121769.8288677963, 'maximum_peak_memory': 54420864.0}
  AsOfJoinOverhead/left_freq:200/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:200/right_cols:20/right_ids:500/real_time 355.718 MiB/sec 306.236 MiB/sec   -13.911    {'family_index': 0, 'per_family_instance_index': 0, 'run_name': 'AsOfJoinOverhead/left_freq:200/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:200/right_cols:20/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 8, 'input_rows_per_second': 2168591.3894433207, 'maximum_peak_memory': 54420864.0}
  AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:1000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time 315.636 MiB/sec 270.699 MiB/sec   -14.237 {'family_index': 0, 'per_family_instance_index': 12, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:1000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 16, 'input_rows_per_second': 1924236.4216933611, 'maximum_peak_memory': 686301504.0}
  AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:100/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:100/real_time 339.172 MiB/sec 288.468 MiB/sec   -14.949    {'family_index': 0, 'per_family_instance_index': 6, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:100/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:100/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 84, 'input_rows_per_second': 2067719.28348546, 'maximum_peak_memory': 131071104.0}
  AsOfJoinOverhead/left_freq:400/left_cols:10/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:10/right_ids:500/real_time 305.968 MiB/sec 256.047 MiB/sec   -16.316    {'family_index': 0, 'per_family_instance_index': 3, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:10/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:10/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 29, 'input_rows_per_second': 3487291.181883503, 'maximum_peak_memory': 54420864.0}
AsOfJoinOverhead/left_freq:400/left_cols:100/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:100/right_ids:500/real_time 404.655 MiB/sec 337.929 MiB/sec   -16.490  {'family_index': 0, 'per_family_instance_index': 5, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:100/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:100/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4, 'input_rows_per_second': 522550.5184384509, 'maximum_peak_memory': 131071104.0}
AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:1000/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:1000/real_time 351.580 MiB/sec 289.530 MiB/sec   -17.649 {'family_index': 0, 'per_family_instance_index': 8, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:1000/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:1000/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 9, 'input_rows_per_second': 2143364.0434252876, 'maximum_peak_memory': 131071104.0}

@icexelloss (Contributor) commented Sep 1, 2022

Thanks @rtpsw for the benchmark numbers. Do you think the regressions are caused by key hashing?

@rtpsw (Contributor Author) commented Sep 1, 2022

Thanks @rtpsw for the benchmark numbers. Do you think the regressions are caused by key hashing?

No, because the benchmark was written prior to this feature and must invoke the fast path. I suspect it is overhead in hot-spots such as GetLatestKey. I'm looking into it.

@rtpsw (Contributor Author) commented Sep 2, 2022

The latest commit removes the nullable_by_key option, at the cost of a rehash, as previously discussed. The benchmark reports similar performance, i.e., a ~15% reduction compared to the baseline, so this commit is ready for review. I think a ~15% loss of performance is not too high a price to pay for the set of features this PR adds, and we could always revisit performance in a separate PR.

$ archery benchmark diff ARROW-17412.rehash.json baseline.json
INFO:numexpr.utils:NumExpr defaulting to 8 threads.
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Non-regressions: (1)
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
                                                                                                                                             benchmark        baseline       contender  change %                                                                                                                                                                                                                                                                                                                                                                           counters
AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:32000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time 142.364 MiB/sec 173.767 MiB/sec    22.058 {'family_index': 0, 'per_family_instance_index': 14, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:32000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 7, 'input_rows_per_second': 867904.5056496062, 'maximum_peak_memory': 686301504.0}

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Regressions: (10)
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
                                                                                                                                              benchmark        baseline       contender  change %                                                                                                                                                                                                                                                                                                                                                                            counters
 AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:10/right_freq:400/right_cols:20/right_ids:500/real_time 344.387 MiB/sec 303.362 MiB/sec   -11.913 {'family_index': 0, 'per_family_instance_index': 10, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:10/right_freq:400/right_cols:20/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3, 'input_rows_per_second': 2099512.4178041373, 'maximum_peak_memory': 148407104.0}
AsOfJoinOverhead/left_freq:400/left_cols:100/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:100/right_ids:500/real_time 404.655 MiB/sec 353.132 MiB/sec   -12.733  {'family_index': 0, 'per_family_instance_index': 5, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:100/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:100/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4, 'input_rows_per_second': 522550.5184384509, 'maximum_peak_memory': 131071104.0}
  AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:1000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time 315.636 MiB/sec 274.485 MiB/sec   -13.037 {'family_index': 0, 'per_family_instance_index': 12, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:1000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 16, 'input_rows_per_second': 1924236.4216933611, 'maximum_peak_memory': 686301504.0}
 AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:50/right_freq:400/right_cols:20/right_ids:500/real_time 323.553 MiB/sec 278.992 MiB/sec   -13.773  {'family_index': 0, 'per_family_instance_index': 11, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:50/right_freq:400/right_cols:20/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 1, 'input_rows_per_second': 1972499.868090142, 'maximum_peak_memory': 686301504.0}
  AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time 349.482 MiB/sec 301.325 MiB/sec   -13.779   {'family_index': 0, 'per_family_instance_index': 9, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 19, 'input_rows_per_second': 2111813.458550057, 'maximum_peak_memory': 131071104.0}
  AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:100/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:100/real_time 339.172 MiB/sec 291.276 MiB/sec   -14.121    {'family_index': 0, 'per_family_instance_index': 6, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:100/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:100/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 84, 'input_rows_per_second': 2067719.28348546, 'maximum_peak_memory': 131071104.0}
  AsOfJoinOverhead/left_freq:200/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:200/right_cols:20/right_ids:500/real_time 355.718 MiB/sec 299.681 MiB/sec   -15.753    {'family_index': 0, 'per_family_instance_index': 0, 'run_name': 'AsOfJoinOverhead/left_freq:200/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:200/right_cols:20/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 8, 'input_rows_per_second': 2168591.3894433207, 'maximum_peak_memory': 54420864.0}
AsOfJoinOverhead/left_freq:1000/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:1000/right_cols:20/right_ids:500/real_time 348.038 MiB/sec 289.326 MiB/sec   -16.870 {'family_index': 0, 'per_family_instance_index': 2, 'run_name': 'AsOfJoinOverhead/left_freq:1000/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:1000/right_cols:20/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 44, 'input_rows_per_second': 2121769.8288677963, 'maximum_peak_memory': 54420864.0}
AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:1000/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:1000/real_time 351.580 MiB/sec 292.049 MiB/sec   -16.933 {'family_index': 0, 'per_family_instance_index': 8, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:1000/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:1000/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 9, 'input_rows_per_second': 2143364.0434252876, 'maximum_peak_memory': 131071104.0}
  AsOfJoinOverhead/left_freq:400/left_cols:10/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:10/right_ids:500/real_time 305.968 MiB/sec 248.099 MiB/sec   -18.914    {'family_index': 0, 'per_family_instance_index': 3, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:10/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:10/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 29, 'input_rows_per_second': 3487291.181883503, 'maximum_peak_memory': 54420864.0}

@rtpsw (Contributor Author) commented Sep 2, 2022

For posterity, I also made an attempt to improve performance by refactoring AdvanceAndMemoize for the different latest-key paths, but this resulted in slightly worse performance, so I did not commit it.

$ archery benchmark diff ARROW-17412.template-memoize.json baseline.json
INFO:numexpr.utils:NumExpr defaulting to 8 threads.
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Non-regressions: (1)
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
                                                                                                                                             benchmark        baseline       contender  change %                                                                                                                                                                                                                                                                                                                                                                           counters
AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:32000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time 142.364 MiB/sec 166.963 MiB/sec    17.279 {'family_index': 0, 'per_family_instance_index': 14, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:32000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 7, 'input_rows_per_second': 867904.5056496062, 'maximum_peak_memory': 686301504.0}

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Regressions: (10)
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
                                                                                                                                              benchmark        baseline       contender  change %                                                                                                                                                                                                                                                                                                                                                                            counters
 AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:10/right_freq:400/right_cols:20/right_ids:500/real_time 344.387 MiB/sec 300.780 MiB/sec   -12.662 {'family_index': 0, 'per_family_instance_index': 10, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:10/right_freq:400/right_cols:20/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 3, 'input_rows_per_second': 2099512.4178041373, 'maximum_peak_memory': 148407104.0}
  AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:1000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time 315.636 MiB/sec 269.865 MiB/sec   -14.501 {'family_index': 0, 'per_family_instance_index': 12, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:1000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 16, 'input_rows_per_second': 1924236.4216933611, 'maximum_peak_memory': 686301504.0}
 AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:50/right_freq:400/right_cols:20/right_ids:500/real_time 323.553 MiB/sec 275.253 MiB/sec   -14.928  {'family_index': 0, 'per_family_instance_index': 11, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:50/right_freq:400/right_cols:20/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 1, 'input_rows_per_second': 1972499.868090142, 'maximum_peak_memory': 686301504.0}
AsOfJoinOverhead/left_freq:400/left_cols:100/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:100/right_ids:500/real_time 404.655 MiB/sec 336.061 MiB/sec   -16.951  {'family_index': 0, 'per_family_instance_index': 5, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:100/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:100/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 4, 'input_rows_per_second': 522550.5184384509, 'maximum_peak_memory': 131071104.0}
  AsOfJoinOverhead/left_freq:200/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:200/right_cols:20/right_ids:500/real_time 355.718 MiB/sec 293.828 MiB/sec   -17.399    {'family_index': 0, 'per_family_instance_index': 0, 'run_name': 'AsOfJoinOverhead/left_freq:200/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:200/right_cols:20/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 8, 'input_rows_per_second': 2168591.3894433207, 'maximum_peak_memory': 54420864.0}
  AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:100/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:100/real_time 339.172 MiB/sec 277.452 MiB/sec   -18.197    {'family_index': 0, 'per_family_instance_index': 6, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:100/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:100/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 84, 'input_rows_per_second': 2067719.28348546, 'maximum_peak_memory': 131071104.0}
AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:1000/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:1000/real_time 351.580 MiB/sec 285.147 MiB/sec   -18.896 {'family_index': 0, 'per_family_instance_index': 8, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:1000/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:1000/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 9, 'input_rows_per_second': 2143364.0434252876, 'maximum_peak_memory': 131071104.0}
  AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time 349.482 MiB/sec 282.849 MiB/sec   -19.066   {'family_index': 0, 'per_family_instance_index': 9, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:20/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 19, 'input_rows_per_second': 2111813.458550057, 'maximum_peak_memory': 131071104.0}
AsOfJoinOverhead/left_freq:1000/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:1000/right_cols:20/right_ids:500/real_time 348.038 MiB/sec 278.493 MiB/sec   -19.982 {'family_index': 0, 'per_family_instance_index': 2, 'run_name': 'AsOfJoinOverhead/left_freq:1000/left_cols:20/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:1000/right_cols:20/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 44, 'input_rows_per_second': 2121769.8288677963, 'maximum_peak_memory': 54420864.0}
  AsOfJoinOverhead/left_freq:400/left_cols:10/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:10/right_ids:500/real_time 305.968 MiB/sec 223.128 MiB/sec   -27.075    {'family_index': 0, 'per_family_instance_index': 3, 'run_name': 'AsOfJoinOverhead/left_freq:400/left_cols:10/left_ids:500/left_batch_size:4000/num_right_tables:1/right_freq:400/right_cols:10/right_ids:500/real_time', 'repetitions': 1, 'repetition_index': 0, 'threads': 1, 'iterations': 29, 'input_rows_per_second': 3487291.181883503, 'maximum_peak_memory': 54420864.0}

The code diff for this benchmark is:

diff --git a/cpp/src/arrow/compute/exec/asof_join_node.cc b/cpp/src/arrow/compute/exec/asof_join_node.cc
index d4e704ba6..eb13ba5a7 100644
--- a/cpp/src/arrow/compute/exec/asof_join_node.cc
+++ b/cpp/src/arrow/compute/exec/asof_join_node.cc
@@ -302,6 +302,10 @@ class InputState {
     if (key_col_index_.size() == 0) {
       return 0;
     }
+    return GetLatestKeyFastPath(batch, row);
+  }
+
+  inline ByType GetLatestKeyFastPath(const RecordBatch* batch, row_index_t row) const {
     auto data = batch->column_data(key_col_index_[0]);
     switch (key_type_id_[0]) {
       LATEST_VAL_CASE(INT8, key_value)
@@ -382,7 +386,8 @@ class InputState {
   // latest_time and latest_ref_row to the value that immediately pass the
   // specified timestamp.
   // Returns true if updates were made, false if not.
-  Result<bool> AdvanceAndMemoize(OnType ts) {
+  template <typename LatestKey>
+  Result<bool> AdvanceAndMemoize(OnType ts, LatestKey get_latest_key) {
     // Advance the right side row index until we reach the latest right row (for each key)
     // for the given left timestamp.
 
@@ -405,14 +410,33 @@ class InputState {
         must_hash_ = true;
         may_rehash_ = false;
         Rehash();
+       break;
       }
-      memo_.Store(rb, latest_ref_row_, latest_time, GetLatestKey());
+      memo_.Store(rb, latest_ref_row_, latest_time, get_latest_key());
       updated = true;
       ARROW_ASSIGN_OR_RAISE(advanced, Advance());
     } while (advanced);
     return updated;
   }
 
+  Result<bool> AdvanceAndMemoize(OnType ts) {
+    if (key_col_index_.size() == 0) {
+      return AdvanceAndMemoize(ts, []() { return 0; });
+    }
+    if (must_hash_) {
+      return AdvanceAndMemoize(ts, [this]() { return key_hasher_->HashesFor(queue_.UnsyncFront().get())[latest_ref_row_]; });
+    }
+    ARROW_ASSIGN_OR_RAISE(bool updated,
+                          AdvanceAndMemoize(ts, [this]() { return GetLatestKeyFastPath(queue_.UnsyncFront().get(), latest_ref_row_); }));
+    if (must_hash_) {
+      ARROW_ASSIGN_OR_RAISE(
+          bool updated_after_rehash,
+          AdvanceAndMemoize(ts, [this]() { return key_hasher_->HashesFor(queue_.UnsyncFront().get())[latest_ref_row_]; }));
+      return updated || updated_after_rehash;
+    }
+    return updated;
+  }
+
   void Rehash() {
     MemoStore new_memo;
     for (const auto& entry : memo_.entries_) {

@westonpace (Member) left a comment

I played around a bit and I'm not entirely sure about the StopProducing logic. StopProducing is typically a request from the user to cancel a plan. You should not need to call it when an error occurs.

Today, errors propagate to the sink, where the sink makes the single StopProducing call to abort the entire plan. The only other place that StopProducing should be called internally is something like a "LIMIT 10" (fetch) node.

In the future, @save-buffer has proposed some cleanup here (#13848) which will change how errors are handled so we won't have that propagation (the idea, I think, was that some node might want to "handle" an error, though I've not seen any real proposal for what this might be) and an error will just immediately stop the plan.

Also, you shouldn't be calling StopProducing when you've finished.

All of this is somewhat theoretical; we don't yet have extensive testing of abort scenarios. Anyway, I've proposed an alternate handling here: 2a88750

I think the queue clearing and pushing false is probably a bit redundant, since the destructor does this, so that part isn't really needed. However, the change that ensures StopProducing is never called by this node (and gets rid of the concern about marking finished twice) might be useful.

cpp/src/arrow/compute/exec/asof_join_node.cc (outdated review thread, resolved)
Comment on lines 1092 to 1097
void StopProducing() override {
// avoid finishing twice, to prevent "Plan was destroyed before finishing" error
if (finished_.state() == FutureState::PENDING) {
finished_.MarkFinished();
}
}
A reviewer (Member) commented:

This seems suspicious. What is happening here that we need this check?

@rtpsw (Contributor Author) replied:

The original code just called finished_.MarkFinished(), and I observed a failure (with the documented message, as well as the message "Future already marked finished" in a debugging session) that I haven't figured out. I needed some way to get runs to complete, which led me to the code change here that avoids finishing twice. I don't know enough about the stopping logic to say what a clean solution is; probably neither the original code nor the change here is as clean as we'd like.

->GetValues<int64_t>(1)[latest_ref_row_];
OnType GetLatestTime() const {
auto data = queue_.UnsyncFront()->column_data(time_col_index_);
switch (time_type_id_) {
A reviewer (Member) commented:

I played around with this briefly, trying to come up with anything more clever, and failed. I had missed the distinction between time_value and key_value and thought you were just coercing to uint64_t in both cases. I think the switch case is growing on me :)

cpp/src/arrow/compute/exec/asof_join_node_test.cc (outdated review thread, resolved)
@rtpsw (Contributor Author) commented Sep 2, 2022

I've proposed an alternate handling here: 2a88750

This proposed change passes my local tests, so I included it.

@rtpsw (Contributor Author) commented Sep 4, 2022

I recently read this key-map doc, and in particular this section in it, which describes a practical limit of ~16 million inserted keys. Thinking about similar implications here, the latest version of the PR also has a practical limit on the number of inserted keys due to the potential for collisions; specifically, these collisions are currently not arbitrated, i.e., if a collision occurs then the result is undefined. The probability of a collision after inserting 16 million (2^{24}) keys (using the slow path) can be estimated using the square approximation of the birthday problem to be 2^{2*24}/2/2^{64} = 2^{-17}, which some applications (or people) won't see as negligible.
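
For reference, the square approximation above can be checked numerically; a minimal sketch, assuming a 64-bit hash space and 2^{24} inserted keys:

#include <cmath>
#include <cstdio>

int main() {
  const double n = std::ldexp(1.0, 24);      // 2^24 inserted keys
  const double space = std::ldexp(1.0, 64);  // 64-bit hash space
  const double p = n * n / (2.0 * space);    // birthday square approximation
  std::printf("p ~= %g (= 2^%.1f)\n", p, std::log2(p));  // prints roughly 2^-17
}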

Given the above, I see several options we have:

  1. Leave the code as is, to be dealt with later, and just document the limitation for now.
  2. Add logic to arbitrate upon collision. This could involve reusing the Swiss table described in the doc. In any case, being an extra layer of logic at a hot-spot of the code, this is expected to have a noticeable performance cost.
  3. Switch to an internal key type of 128-bit, for which the collision probability 2^{2*24}/2/2^{128} = 2^{-81} is negligible for all but extreme applications that may not even be justified in data science. Incidentally, this would also allow supporting the decimal-128 type using the fast-path. The performance cost of this could likely be kept small using vectorization for computing a slightly-tweaked pair of 64-bit hashes.

@westonpace, WDYT?

@rtpsw (Contributor Author) commented Sep 7, 2022

ping @westonpace

@westonpace (Member) left a comment

I think this is good to go at this point. We can work out details and performance as we go; this is a substantial improvement to the capability, and those items can be done as follow-ons.

@westonpace (Member) commented

Given the above, I see several options we have:

Let's do 1 for now. I think this PR is probably ready to merge and it makes sense to chase those things down as follow-ups. Otherwise it might be too much to keep track of. Long term it sounds like 3 would be simpler if we could show it worked well.

That being said, don't we only need to worry about collisions within the on field's tolerance? Or do we have to worry about a collision anywhere in the dataset?

@westonpace (Member) commented

@icexelloss are you good with merging this? Or are there still changes you would like to see?

@rtpsw (Contributor Author) commented Sep 7, 2022

Let's do 1 for now.

Will do.

That being said, don't we only need to worry about collisions within the on field's tolerance? Or do we have to worry about a collision anywhere in the dataset?

It is normal for an on-key value of one row to be equal to that of the previous one, and the AsofJoinNode code handles that fine. The issue happens with the by-key, when two rows have different by-key tuples but equal hashes for them, because there is currently no AsofJoinNode code to arbitrate this case. In contrast, the hash-join code uses a Swiss table (as in my item 2, and at some performance cost) and arbitrates this case by checking equality of the by-key tuples.
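
For illustration, arbitration of the kind described above could look roughly like the following; this is a hypothetical sketch (the Entry and ArbitratedMemo names and the one-entry-per-hash layout are simplifications, not the Swiss-table implementation used by hash-join), shown only to make the by-key equality check concrete:

#include <cstdint>
#include <iostream>
#include <optional>
#include <unordered_map>
#include <vector>

using KeyTuple = std::vector<int64_t>;  // materialized by-key values of a row

struct Entry {
  KeyTuple key;   // kept so lookups can verify the hash really matches this key
  int64_t value;  // e.g. the latest memoized right-row index for this key
};

class ArbitratedMemo {
 public:
  // Returns false on a collision (same hash, different key tuple) instead of
  // silently merging the two keys, which is what an unarbitrated hash would do.
  bool Store(uint64_t hash, const KeyTuple& key, int64_t value) {
    auto it = entries_.find(hash);
    if (it == entries_.end()) {
      entries_.emplace(hash, Entry{key, value});
      return true;
    }
    if (it->second.key != key) return false;  // collision detected
    it->second.value = value;
    return true;
  }

  std::optional<int64_t> Lookup(uint64_t hash, const KeyTuple& key) const {
    auto it = entries_.find(hash);
    if (it == entries_.end() || it->second.key != key) return std::nullopt;
    return it->second.value;
  }

 private:
  std::unordered_map<uint64_t, Entry> entries_;
};

int main() {
  ArbitratedMemo memo;
  memo.Store(/*hash=*/42, KeyTuple{1, 7}, 100);
  std::cout << memo.Store(42, KeyTuple{2, 9}, 200) << std::endl;  // 0: collision caught
}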

@icexelloss (Contributor) commented

@rtpsw I left some questions since the code looks a bit different from the last time I looked. Otherwise looks good to me.

@icexelloss (Contributor) left a comment

LGTM

@rtpsw (Contributor Author) commented Sep 8, 2022

Thanks for approving. Give me some time to add the doc per item 1 before merging.

@rtpsw (Contributor Author) commented Sep 8, 2022

@westonpace merged commit 99b57e8 into apache:master Sep 8, 2022
@ursabot commented Sep 8, 2022

Benchmark runs are scheduled for baseline = 7475605 and contender = 99b57e8. 99b57e8 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Finished ⬇️0.85% ⬆️0.14%] test-mac-arm
[Failed ⬇️0.27% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️2.31% ⬆️0.11%] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] 99b57e84 ec2-t3-xlarge-us-east-2
[Finished] 99b57e84 test-mac-arm
[Failed] 99b57e84 ursa-i9-9960x
[Finished] 99b57e84 ursa-thinkcentre-m75q
[Finished] 74756051 ec2-t3-xlarge-us-east-2
[Finished] 74756051 test-mac-arm
[Failed] 74756051 ursa-i9-9960x
[Finished] 74756051 ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@rtpsw deleted the ARROW-17412 branch September 9, 2022 06:22
zagto pushed a commit to zagto/arrow that referenced this pull request Oct 7, 2022
See https://issues.apache.org/jira/browse/ARROW-17412

Lead-authored-by: Yaron Gvili <rtpsw@hotmail.com>
Co-authored-by: rtpsw <rtpsw@hotmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>
fatemehp pushed a commit to fatemehp/arrow that referenced this pull request Oct 17, 2022
See https://issues.apache.org/jira/browse/ARROW-17412

Lead-authored-by: Yaron Gvili <rtpsw@hotmail.com>
Co-authored-by: rtpsw <rtpsw@hotmail.com>
Signed-off-by: Weston Pace <weston.pace@gmail.com>