
[Test] test with fine grained pr reverted #2612

Closed
YutingWang98 wants to merge 10 commits into apache:branch-0.4 from YutingWang98:patch-1

Conversation

@YutingWang98
Contributor

@YutingWang98 YutingWang98 commented Jul 10, 2024

What changes were proposed in this pull request?

This is the version used for testing with the fine grained change reverted, so we would expect no data loss. However, some jobs still lost a few records during shuffle read.

reverted run 1
    Shuffle Write Size / Records: 20.8 TiB / 1965259729
    Shuffle Read Size / Records: 20.8 TiB / 1965259728
reverted run 2
    Shuffle Write Size / Records: 20.8 TiB / 1965259729
    Shuffle Read Size / Records: 20.8 TiB / 1965259729
reverted run 3
    Shuffle Write Size / Records: 20.8 TiB / 1965259729
    Shuffle Read Size / Records: 20.8 TiB / 1965259722

Suspected to be related to

Both of the above 2 PRs were included in the tests

Why are the changes needed?

Does this PR introduce any user-facing change?

How was this patch tested?

@YutingWang98 YutingWang98 changed the title [Revereted test] test with fine grained pr reverted [Test] test with fine grained pr reverted Jul 10, 2024
@cfmcgrady
Contributor

> there are some jobs still lost a few records during shuffle read

Could you please provide more details on how to reproduce this bug?

@waitinfuture
Contributor

waitinfuture commented Jul 12, 2024

I think the bug is related to #2134; please see the detailed description in #2621. @YutingWang98 @cfmcgrady

waitinfuture added a commit that referenced this pull request Jul 13, 2024
…eqs should decrement when batchIdSet contains the batchId to avoid duplicate caller of removeBatch"

### What changes were proposed in this pull request?
One of our users reported a data loss issue in #2612. I tried to reproduce the bug with the following setup:
1. Partition data is far larger than `spark.celeborn.client.shuffle.partitionSplit.threshold`, which means split happens very often
2. `spark.celeborn.client.shuffle.partitionSplit.threshold` is larger than `celeborn.worker.shuffle.partitionSplit.max`, which means when split happens, it is `HARD_SPLIT`
3. `celeborn.client.shuffle.batchHandleChangePartition.enabled` is true, which means when hard split happens, `LifecycleManager` will commit the splits before the stage finishes

Configs on the Spark side:
```
spark.celeborn.client.push.maxReqsInFlight.perWorker | 256
spark.celeborn.client.push.maxReqsInFlight.total | 2048
spark.celeborn.client.shuffle.batchHandleCommitPartition.enabled | true
spark.celeborn.client.shuffle.compression.codec | zstd
spark.celeborn.client.shuffle.partitionSplit.threshold | 48m
spark.celeborn.client.spark.fetch.throwsFetchFailure | true
spark.celeborn.client.spark.push.sort.memory.adaptiveThreshold | true
spark.celeborn.client.spark.push.sort.memory.threshold | 512m
spark.celeborn.client.spark.shuffle.writer | sort
spark.celeborn.master.endpoints | master-1-1:9097

```
Configs on the Celeborn side:
```
celeborn.metrics.enabled=false
celeborn.replicate.io.numConnectionsPerPeer=24
celeborn.application.heartbeat.timeout=120s
celeborn.worker.storage.dirs=/mnt/disk1,/mnt/disk2
celeborn.network.timeout=2000s
celeborn.ha.enabled=false
celeborn.worker.closeIdleConnections=true
celeborn.worker.monitor.disk.enabled=false
celeborn.worker.flusher.threads=16

celeborn.worker.graceful.shutdown.enabled=true
celeborn.worker.rpc.port=9100
celeborn.worker.push.port=9101
celeborn.worker.fetch.port=9102
celeborn.worker.replicate.port=9103

celeborn.worker.shuffle.partitionSplit.max=10m  # deliberately set small
```

My query on 10T TPCDS:
```
select
max(ss_sold_time_sk      ),
max(ss_item_sk           ),
max(ss_customer_sk       ),
max(ss_cdemo_sk          ),
max(ss_hdemo_sk          ),
max(ss_addr_sk           ),
max(ss_store_sk          ),
max(ss_promo_sk          ),
max(ss_ticket_number     ),
max(ss_quantity          ),
max(ss_wholesale_cost    ),
max(ss_list_price        ),
max(ss_sales_price       ),
max(ss_ext_discount_amt  ),
max(ss_ext_sales_price   ),
max(ss_ext_wholesale_cost),
max(ss_ext_list_price    ),
max(ss_ext_tax           ),
max(ss_coupon_amt        ),
max(ss_net_paid          ),
max(ss_net_paid_inc_tax  ),
max(ss_net_profit        ),
max(ss_sold_date_sk      )
from (
select * from store_sales where ss_sold_date_sk is not null distribute by ss_sold_date_sk
) a;
```

After digging into it, I found the bug was introduced by #2134. #2134 added checks in `InFlightRequestTracker#addBatch` and `InFlightRequestTracker#removeBatch` so that `totalInflightReqs` is only incremented/decremented when `batchIdSet` contains the current `batchId`. This conflicts with `ShuffleClientImpl#PushDataRpcResponseCallback#updateLatestPartition`, which calls `addBatch` first and then calls `removeBatch` with the same batchId. As a result, the call to `addBatch` fails to increment `totalInflightReqs`, but the call to `removeBatch` decrements it, so the retried push is not counted; later, `limitZeroInFlight` in `mapperEnd` will return even though the retried push fails.
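The accounting flaw described above can be illustrated with a deliberately simplified, single-tracker toy model (hypothetical Python sketch; the real `InFlightRequestTracker` is Java and tracks batches per worker address, so this only mirrors the net counting effect):

```python
class InFlightRequestTracker:
    """Toy model of in-flight request accounting, NOT the real Celeborn code."""

    def __init__(self):
        self.batch_id_set = set()
        self.total_inflight_reqs = 0

    def add_batch(self, batch_id):
        # Guarded increment: only count a batchId that is not tracked yet.
        if batch_id not in self.batch_id_set:
            self.batch_id_set.add(batch_id)
            self.total_inflight_reqs += 1

    def remove_batch(self, batch_id):
        # Guarded decrement: only uncount a tracked batchId, so a duplicate
        # remove_batch caller cannot decrement twice.
        if batch_id in self.batch_id_set:
            self.batch_id_set.remove(batch_id)
            self.total_inflight_reqs -= 1


tracker = InFlightRequestTracker()
tracker.add_batch(42)       # initial push: counter -> 1

# A retry re-registers the SAME batchId: add_batch is a no-op (already
# tracked), but the subsequent remove_batch decrements, so the retried
# push is no longer counted as in flight.
tracker.add_batch(42)       # no-op, counter stays 1
tracker.remove_batch(42)    # counter -> 0

print(tracker.total_inflight_reqs)  # 0, although the retry is still in flight
```

With the counter at zero, a `limitZeroInFlight`-style wait would return immediately, even though the retried push could still fail, which matches the data loss symptom.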

This PR fixes the bug by reverting #2134.

### Why are the changes needed?
ditto

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manual test.

Closes #2621 from waitinfuture/1506.

Authored-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
Signed-off-by: zky.zhoukeyong <zky.zhoukeyong@alibaba-inc.com>
waitinfuture added a commit that referenced this pull request Jul 13, 2024
…eqs should decrement when batchIdSet contains the batchId to avoid duplicate caller of removeBatch"
(cherry picked from commit 8d0b4cf)
waitinfuture added a commit that referenced this pull request Jul 13, 2024
…eqs should decrement when batchIdSet contains the batchId to avoid duplicate caller of removeBatch"
(cherry picked from commit 8d0b4cf)
wxplovecc pushed a commit to tongcheng-elong/incubator-celeborn that referenced this pull request Jul 15, 2024
…eqs should decrement when batchIdSet contains the batchId to avoid duplicate caller of removeBatch"
(cherry picked from commit 8d0b4cf)
@YutingWang98
Contributor Author

YutingWang98 commented Jul 15, 2024

Sorry about the late reply @waitinfuture @cfmcgrady. Yes, we are no longer seeing the data loss after applying #2621, so I am closing this PR. Thanks for making the fix!

@waitinfuture
Contributor

Good to know, thanks for verifying! @YutingWang98
