
VectorHasher's value ID caching logic makes certain queries unnecessarily slow #10057

Closed
zeodtr opened this issue Jun 5, 2024 · 15 comments
Labels: bug, performance

@zeodtr

zeodtr commented Jun 5, 2024

Description

Hi,

(I believe this is more of a performance bug report than an enhancement suggestion. However, since it is not a functional bug, I have classified it under the 'enhancement' category.)

I am building an OLAP DBMS that uses Velox as its execution engine. In the scenario described here, two executor processes exchange intermediate results. The query, slightly modified to hide the real table names, is as follows:

SELECT ip, name
FROM (
    SELECT name
    FROM t1
    WHERE tm_col >= timestamp '2022-07-11 03:00:00'
    AND tm_col < timestamp '2022-07-11 03:10:00'
) JOIN (
    SELECT ip, name AS name2
    FROM t2
    WHERE tm_col >= timestamp '2022-09-30 05:30:00'
    AND tm_col < timestamp '2022-09-30 06:00:00'
) ON name = name2
GROUP BY ip, name
;

The subqueries' resulting record counts are 2,763,772 and 5,600, respectively.

The query became very slow after applying #7404 to my local Velox repository.

I investigated it and found that the source code line causing the problem is in ExchangeQueue.cpp:

 if (pageBytes > 0 && pageBytes + queue_.front()->size() > maxBytes) {

After I changed the code to the following, the query became significantly faster (from 176 secs to 43 secs).

if (pageBytes > 0 && pageBytes + queue_.front()->size() > 1) {

(I will call this modification M1.)
The modified code effectively disables what #7404 tries to achieve, which is strange.
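For context, here is a simplified, self-contained sketch of the kind of accumulation loop this condition controls (my own illustration, not the actual Velox code): with the original condition many queued pages can be batched into one dequeue, while M1 effectively returns at most one page per call.

#include <cstdint>
#include <deque>
#include <memory>
#include <vector>

// Illustrative only: "Page" stands in for Velox's SerializedPage.
struct Page {
  uint64_t size;
};

std::vector<std::unique_ptr<Page>> dequeuePages(
    std::deque<std::unique_ptr<Page>>& queue,
    uint64_t maxBytes) {
  std::vector<std::unique_ptr<Page>> pages;
  uint64_t pageBytes = 0;
  while (!queue.empty()) {
    // Original condition: keep batching until the next page would push the
    // total over maxBytes. With M1 ("> 1"), the loop stops after the first page.
    if (pageBytes > 0 && pageBytes + queue.front()->size > maxBytes) {
      break;
    }
    pageBytes += queue.front()->size;
    pages.push_back(std::move(queue.front()));
    queue.pop_front();
  }
  return pages;
}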

I then ran valgrind with callgrind on the original code; the resulting performance graph for one of the two executor processes was as follows:

[callgrind graph of the unmodified code]

memset() was taking a large portion of the runtime. It is called by std::fill() from VectorHasher::makeValueIdsDecoded().

The code link is as follows:

std::fill(cachedHashes_.begin(), cachedHashes_.end(), 0);

M1's valgrind result graph is as follows:

[callgrind graph with M1 applied]

memset()'s portion is now negligible.

Upon further investigation, I found that the DecodedVector's base vector size becomes disproportionately large relative to the SelectivityVector's size in the unmodified code. For example, the DecodedVector's base vector size is 1,066,500, while the SelectivityVector's size is 1024. For M1, the DecodedVector's base vector size is 11,900, while the SelectivityVector's size is 1024.

As a result, the cost of clearing the value ID cache outweighs the benefits of caching.

When I removed the caching logic, the query performed as quickly as M1.

Since I am working with a modified version of Velox's source code and not the current official source, I cannot be completely certain that this issue is present in the current version. However, I believe there is a high probability that it exists.

It would be nice if VectorHasher could be made more intelligent to avoid this kind of issue, for example by disabling the caching logic when the DecodedVector's base size is too large relative to the SelectivityVector's size (a rough sketch of such a heuristic follows).
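
To illustrate, here is a minimal sketch of such a heuristic (the function and constant names are mine and hypothetical; the real fix may of course look different):

#include <cstdint>

// Hypothetical heuristic: only use the per-base-value ID cache when the
// dictionary base is not disproportionately larger than the rows being hashed,
// because clearing the cache (the std::fill/memset above) costs O(base size).
bool shouldUseValueIdCache(uint64_t baseVectorSize, uint64_t numSelectedRows) {
  constexpr uint64_t kMaxBaseToRowsRatio = 4;  // illustrative threshold
  return baseVectorSize <= numSelectedRows * kMaxBaseToRowsRatio;
}

With the numbers above, a base of 1,066,500 values for only 1,024 selected rows would skip the cache and hash the selected rows directly.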

Thank you.

zeodtr added the enhancement label Jun 5, 2024
@mbasmanova
Contributor

@zeodtr Thank you for reporting this issue with so much detail.

I found that the DecodedVector's base vector size becomes disproportionately large relative to the SelectivityVector's size in the unmodified code. For example, the DecodedVector's base vector size is 1,066,500, while the SelectivityVector's size is 1024.

This sounds similar to #9843

CC: @Yuhta @xiaoxmeng

@Yuhta
Contributor

Yuhta commented Jun 5, 2024

If VectorHasher is where we spend most of the time, this can be fixed with an approach similar to #7150. I can prepare something for you to try.

Yuhta added the performance and bug labels and removed the enhancement label Jun 5, 2024
@mbasmanova
Contributor

@Yuhta Jimmy, let's first find out which code produced such a dictionary vector. It might be better to change that code to avoid producing such vectors (similar to Unnest).

@Yuhta
Contributor

Yuhta commented Jun 6, 2024

@mbasmanova Agree that we should find out which code produces this dictionary and selectivity in this case (out of the exchange), because peeling can be inefficient on this data as well.

But in the general case I can see this happening legitimately, for example whenever we use a dictionary to filter rows (remaining filter / join filter). Since there is no downside in VectorHasher to handling this case separately, and it is much easier to implement than encoding peeling, we should fix it in VectorHasher in addition to investigating how the data is generated.

Yuhta added a commit to Yuhta/velox that referenced this issue Jun 6, 2024
…hen it is not beneficial

Summary:
Similar to facebookincubator#7150, when we only need to make IDs for far fewer rows than there are dictionary values, using the cache is slower and we should just compute the IDs directly.

Related issue: facebookincubator#10057

Differential Revision: D58215380
@Yuhta
Contributor

Yuhta commented Jun 6, 2024

@zeodtr #10084 is a fix for VectorHasher; you can try it. But as @mbasmanova pointed out, this is probably not the only place that is inefficient. Can you:

  • Try to diagnose where the dictionary vectors are generated?
  • Or try to create a unit test that reproduces it using PlanBuilder (example: rui-mo@e05691a); a rough sketch follows this list.
  • Or just print out the plan node tree in LocalPlanner and paste it here.
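
A rough sketch of what such a repro could look like (helper names and signatures written from memory, so treat this as a starting point and adjust to the current PlanBuilder API):

#include <memory>

#include "velox/exec/tests/utils/PlanBuilder.h"

using namespace facebook::velox;
using namespace facebook::velox::exec::test;

// Sketch: a large probe side with few distinct names joined against a small
// build side, followed by a group-by, mirroring the join and group-by in the
// query above.
core::PlanNodePtr makeRepro(
    const RowVectorPtr& probeData,   // many rows, few distinct "name" values
    const RowVectorPtr& buildData) { // few rows with "ip" and "name2"
  auto idGenerator = std::make_shared<core::PlanNodeIdGenerator>();
  auto build = PlanBuilder(idGenerator).values({buildData}).planNode();
  return PlanBuilder(idGenerator)
      .values({probeData})
      .hashJoin({"name"}, {"name2"}, build, /*filter=*/"", {"ip", "name"})
      .singleAggregation({"ip", "name"}, /*aggregates=*/{})
      .planNode();
}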

@zeodtr
Author

zeodtr commented Jun 6, 2024

@Yuhta Thank you very much for your fix. Since I am out of the office this week, I will try it next week.

For now, here is the plan printout (I have changed names and removed the column names in ROW<>). The two executor processes were given the same plans; each has its own portion of the table data, in a shared-nothing configuration.

velox plan[0] :
 -- PartitionedOutput[partitionFunction: HASH(0) with 2 partitions] -> n2_2:VARCHAR
  -- Project[expressions: (n2_2:VARCHAR, ROW["n0_1"])] -> n2_2:VARCHAR
    -- Filter[expression: and(and(not(is_null(ROW["n0_1"])),gte(ROW["n0_0"],"2022-07-11T03:00:00.000000000")),lt(ROW["n0_0"],"2022-07-11T03:10:00.000000000"))] -> n0_0:TIMESTAMP, n0_1:VARCHAR
      -- TableScan[table: hive_table, data columns: ROW<...>] -> n0_0:TIMESTAMP, n0_1:VARCHAR

velox plan[1] :
 -- PartitionedOutput[partitionFunction: HASH(1) with 2 partitions] -> n2_3:VARCHAR, n2_4:VARCHAR
  -- Project[expressions: (n2_3:VARCHAR, ROW["n0_0"]), (n2_4:VARCHAR, ROW["n0_2"])] -> n2_3:VARCHAR, n2_4:VARCHAR
    -- Filter[expression: and(and(not(is_null(ROW["n0_2"])),gte(ROW["n0_1"],"2022-09-30T05:30:00.000000000")),lt(ROW["n0_1"],"2022-09-30T06:00:00.000000000"))] -> n0_0:VARCHAR, n0_1:TIMESTAMP, n0_2:VARCHAR
      -- TableScan[table: hive_table, data columns: ROW<...>] -> n0_0:VARCHAR, n0_1:TIMESTAMP, n0_2:VARCHAR

velox plan[2] :
 -- PartitionedOutput[partitionFunction: HASH(0, 1) with 2 partitions] -> n3_3:VARCHAR, n3_4:VARCHAR
  -- Aggregation[SINGLE [n3_3, n3_4] ] -> n3_3:VARCHAR, n3_4:VARCHAR
    -- Project[expressions: (n3_3:VARCHAR, ROW["t2.1.ip#0"]), (n3_4:VARCHAR, ROW["t1.0.name#0"])] -> n3_3:VARCHAR, n3_4:VARCHAR
      -- HashJoin[INNER t1.0.name#0=t2.1.name2#1] -> "t1.0.name#0":VARCHAR, "t2.1.ip#0":VARCHAR, "t2.1.name2#1":VARCHAR
        -- Exchange[] -> "t1.0.name#0":VARCHAR
        -- Exchange[] -> "t2.1.ip#0":VARCHAR, "t2.1.name2#1":VARCHAR

velox plan[3] :
 -- Project[expressions: (ip:VARCHAR, "#anonymous#.2.ip#0"), (name:VARCHAR, "#anonymous#.2.name#1")] -> ip:VARCHAR, name:VARCHAR
  -- Aggregation[SINGLE [#anonymous#.2.ip#0, #anonymous#.2.name#1] ] -> "#anonymous#.2.ip#0":VARCHAR, "#anonymous#.2.name#1":VARCHAR
    -- Exchange[] -> "#anonymous#.2.ip#0":VARCHAR, "#anonymous#.2.name#1":VARCHAR

Also, I will try to diagnose the dictionary vector. However, given my current knowledge of Velox's internals, I may have difficulty locating the place.

Creating a unit test case might not be possible due to the large amount of data in the tables.

Thank you.

@Yuhta
Contributor

Yuhta commented Jun 6, 2024

@zeodtr Thanks for the details. Just a question: in which plan tree are you observing the slowness, plan[2] or plan[3]? Both take data from an exchange.

facebook-github-bot pushed a commit that referenced this issue Jun 6, 2024
…hen it is not beneficial (#10084)

Summary:
Pull Request resolved: #10084

Similar to #7150, when we only need to make IDs for far fewer rows than there are dictionary values, using the cache is slower and we should just compute the IDs directly.

Related issue: #10057

Reviewed By: mbasmanova

Differential Revision: D58215380

fbshipit-source-id: 50c904f06f4614525d289d36c792bfbf04ed8f6e
@zeodtr
Author

zeodtr commented Jun 6, 2024

@Yuhta I'm not entirely sure, but I guess it's plan[2].
The task for plan[2] ran for more than 2 minutes, logging the following debug messages many times. (I've added code that prints the message every second.)

{Driver: running Exchange(0)<xdb_cpu_executor_task_3:0.0 0x7f91a9896700> HashProbe(1)<xdb_cpu_executor_task_3:0.0 0x7f91a9845a00> FilterProject(2)<xdb_cpu_executor_task_3:0.0 0x7f91a9835c00> Aggregation(3)<xdb_cpu_executor_task_3:0.0 0x7f91a9834000> PartitionedOutput(4)<xdb_cpu_executor_task_3:0.0 0x7f91a9865900> CallbackSink(5)<xdb_cpu_executor_task_3:0.0 0x7f91a9896a80> {OpCallStatus: executing HashProbe::getOutput for 0ms}}

{Driver: running Exchange(0)<xdb_cpu_executor_task_3:0.0 0x7f91a9896700> HashProbe(1)<xdb_cpu_executor_task_3:0.0 0x7f91a9845a00> FilterProject(2)<xdb_cpu_executor_task_3:0.0 0x7f91a9835c00> Aggregation(3)<xdb_cpu_executor_task_3:0.0 0x7f91a9834000> PartitionedOutput(4)<xdb_cpu_executor_task_3:0.0 0x7f91a9865900> CallbackSink(5)<xdb_cpu_executor_task_3:0.0 0x7f91a9896a80> {OpCallStatus: executing Aggregation::addInput for 0ms}}

Each executor process spent most of its time on a different plan; (perhaps) one process was busy executing plan[2] while the other was waiting for its peer's results.

Thank you.

@zeodtr
Author

zeodtr commented Jun 7, 2024

@Yuhta For your information, the version of Velox for my executor process is taken from the upstream code as of January 2024, specifically from the commit titled "Rename getDataType and getDataChannels funcs in HiveDataSink (#8404)" on January 17, 2024.
Therefore, it's possible that the issue related to plan[2] has already been resolved in the latest upstream version.

@mbasmanova
Contributor

Plan 2 has an Aggregation and a Join. It seems likely that the "bad" dictionary was produced by the Join and is causing trouble during the Aggregation.

@Yuhta
Contributor

Yuhta commented Jun 7, 2024

I think it's likely due to HashProbe in plan[2] wrapping its input in a dictionary to filter out non-matching rows. The ratio 5600/2763772 is of the same order of magnitude as 1024/1066500. @mbasmanova I think the solution belongs to the same story as #7801.
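
To sketch what that wrapping looks like (a simplified illustration, not the actual HashProbe code; the helper calls are written from memory):

#include <vector>

#include "velox/vector/BaseVector.h"

using namespace facebook::velox;

// The probe output keeps the full input as the dictionary base and only the
// matched row numbers as indices, so ~1K selected rows can sit on top of a
// base vector with ~1M entries.
VectorPtr wrapMatchedRows(
    const VectorPtr& probeInput,                    // e.g. ~1,066,500 rows
    const std::vector<vector_size_t>& matchedRows,  // e.g. ~1,024 matches
    memory::MemoryPool* pool) {
  auto numMatched = static_cast<vector_size_t>(matchedRows.size());
  auto indices = allocateIndices(numMatched, pool);
  auto* rawIndices = indices->asMutable<vector_size_t>();
  for (vector_size_t i = 0; i < numMatched; ++i) {
    rawIndices[i] = matchedRows[i];
  }
  return BaseVector::wrapInDictionary(
      /*nulls=*/nullptr, indices, numMatched, probeInput);
}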

@zeodtr
Author

zeodtr commented Jun 10, 2024

@Yuhta I've tried #10084.
I've run the code with combinations of the modifications M1, M2, and M3.

The execution times for the query are as follows:

  • No modification: 180~182 secs
  • M1: 42~43 secs
  • M2: 53~54 secs
  • M1 + M2: 53~53 secs
  • M2 + M3: 51~52 secs
  • M1 + M2 + M3: 41~42 secs (this is effectively the same as M1)

Here are my thoughts:

  • The value ID cache logic can still be beneficial, to some extent, for decoded vectors larger than the SelectivityVector (at least in my case).
  • Small decoded vectors are still preferable, so that the value ID cache logic can be used (at least in my case).

Thank you.

@Yuhta
Contributor

Yuhta commented Jun 12, 2024

The exact condition for using the cache is tricky and depends on the data. The current solution makes sure there is no large regression when we are using a dictionary for filtering.

For further investigation, it would be nice if you could find out what the dictionary vector is wrapping. If it wraps probe-side rows, I would imagine it is used exclusively for filtering, so the cache should not improve performance there. Build-side rows are extracted from the row container, so they should not be in a dictionary. So it's a bit of a mystery why the cache is beneficial here. Maybe the join duplicates the probe-side rows in some cases?

@zeodtr
Author

zeodtr commented Jun 27, 2024

I've investigated further.

The sparse dictionary vectors are returned from HashProbe::getOutput().
I checked the shape of the data in the two tables. In short, the query was executed on poorly shaped data, resulting in many join mismatches and a great deal of duplication in the join results. Therefore, this query might not be a case worth optimizing Velox code for. I believe the fix in #10084 is sufficient.

The shape of the data for the two tables (t1, t2) is as follows.
The record counts below are after filtering by the query's WHERE clauses.

  1. t1's count(name): 2,763,772
  2. t1's count(distinct name): 457
  3. t2's count(name): 5,600
  4. t2's count(distinct name): 54
  5. count(name) after join: 533,305,844
  6. count(distinct name) after join: 52
  7. t1's count(name) that do not exist in t2: 837,094
  8. t1's count(distinct name) that do not exist in t2: 405
  9. t1's count(name) that exist in t2: 1,926,678
  10. t1's count(distinct name) that exist in t2: 52

The queries for each count result (slightly modified to hide the real table and column names) are as follows:

-- 1.
SELECT count(name)
FROM t1 
WHERE tm_col >= timestamp '2022-07-11 03:00:00' 
AND tm_col < timestamp '2022-07-11 03:10:00' 
;

-- 2.
SELECT count(DISTINCT name)
FROM t1 
WHERE tm_col >= timestamp '2022-07-11 03:00:00' 
AND tm_col < timestamp '2022-07-11 03:10:00' 
;

-- 3.
SELECT count(name)
FROM t2 
WHERE tm_col >= timestamp '2022-09-30 05:30:00' 
AND tm_col < timestamp '2022-09-30 06:00:00' 
;

-- 4.
SELECT count(DISTINCT name)
FROM t2 
WHERE tm_col >= timestamp '2022-09-30 05:30:00' 
AND tm_col < timestamp '2022-09-30 06:00:00' 
;

-- 5., 6.
SELECT count(name), count(DISTINCT name)
FROM ( 
    SELECT name AS name
    FROM t1 
    WHERE tm_col >= timestamp '2022-07-11 03:00:00' 
    AND tm_col < timestamp '2022-07-11 03:10:00' 
) JOIN ( 
    SELECT ip, name AS name2
    FROM t2 
    WHERE tm_col >= timestamp '2022-09-30 05:30:00' 
    AND tm_col < timestamp '2022-09-30 06:00:00' 
) ON name = name2 
;

-- 7., 8.
SELECT count(name1), count(DISTINCT name1)
FROM ( 
    SELECT name AS name1
    FROM t1 
    WHERE tm_col >= timestamp '2022-07-11 03:00:00' 
    AND tm_col < timestamp '2022-07-11 03:10:00' 
)
WHERE NOT EXISTS 
( 
    SELECT 1
    FROM t2 
    WHERE tm_col >= timestamp '2022-09-30 05:30:00' 
    AND tm_col < timestamp '2022-09-30 06:00:00'
    AND name = name1	
) 
;

-- 9., 10.
SELECT count(name1), count(DISTINCT name1)
FROM ( 
    SELECT name AS name1
    FROM t1 
    WHERE tm_col >= timestamp '2022-07-11 03:00:00' 
    AND tm_col < timestamp '2022-07-11 03:10:00' 
)
WHERE EXISTS 
( 
    SELECT 1
    FROM t2 
    WHERE tm_col >= timestamp '2022-09-30 05:30:00' 
    AND tm_col < timestamp '2022-09-30 06:00:00'
    AND name = name1	
) 
;

@Yuhta
Contributor

Yuhta commented Jun 27, 2024

I see, so the build side is both duplicating and filtering. Agreed that the current solution should be enough, unless there is a very important use case that requires us to optimize for this data shape.

Yuhta closed this as completed Jun 27, 2024