
[ISSUE-186][Feature] Use I/O cost time to select storage paths #192

Merged · 13 commits merged into apache:master on Sep 7, 2022

Conversation

@smallzhongfeng (Contributor) commented Aug 30, 2022

What changes were proposed in this pull request?

Solve #186

Why are the changes needed?

A better strategy for selecting the remote path.

Does this PR introduce any user-facing change?

Four configs were added:

  1. rss.coordinator.remote.storage.select.strategy: supports APP_BALANCE and IO_SAMPLE. APP_BALANCE selects a path based on the number of apps assigned to each path; IO_SAMPLE selects based on the time consumed reading and writing sample files.
  2. rss.coordinator.remote.storage.io.sample.schedule.time: when IO_SAMPLE is chosen, a sample file is read and written at this interval.
  3. rss.coordinator.remote.storage.io.sample.file.size: the size of each HDFS sample file that is read and written.
  4. rss.coordinator.remote.storage.io.sample.access.times: the number of times the HDFS sample files are read and written per round. (An example configuration is sketched below.)
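
For illustration, a coordinator.conf sketch using these keys might look like this (the values are examples only; the defaults visible in the review below are 60000 ms, 204800000 bytes, and 3):

```
rss.coordinator.remote.storage.select.strategy IO_SAMPLE
rss.coordinator.remote.storage.io.sample.schedule.time 60000
rss.coordinator.remote.storage.io.sample.file.size 204800000
rss.coordinator.remote.storage.io.sample.access.times 3
```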

How was this patch tested?

Added unit tests.

@codecov-commenter commented Aug 30, 2022

Codecov Report

Merging #192 (00f6892) into master (6b9f2d1) will decrease coverage by 0.89%.
The diff coverage is 77.31%.

```
@@             Coverage Diff              @@
##             master     #192      +/-   ##
============================================
- Coverage     58.48%   57.59%   -0.90%
+ Complexity     1277     1229      -48
============================================
  Files           158      151       -7
  Lines          8463     8117     -346
  Branches        785      770      -15
============================================
- Hits           4950     4675     -275
+ Misses         3259     3193      -66
+ Partials        254      249       -5
```
| Impacted Files | Coverage Δ |
| --- | --- |
| ...e/coordinator/AppBalanceSelectStorageStrategy.java | 72.34% <72.34%> (ø) |
| ...nator/LowestIOSampleCostSelectStorageStrategy.java | 73.14% <73.14%> (ø) |
| ...apache/uniffle/coordinator/ApplicationManager.java | 90.26% <89.47%> (+6.44%) ⬆️ |
| ...rg/apache/uniffle/coordinator/CoordinatorConf.java | 96.94% <100.00%> (+0.55%) ⬆️ |
| ...storage/handler/impl/DataSkippableReadHandler.java | 81.25% <0.00%> (-3.13%) ⬇️ |
| ...e/spark/shuffle/reader/RssShuffleDataIterator.java | |
| ...ava/org/apache/spark/shuffle/RssShuffleHandle.java | |
| ...org/apache/spark/shuffle/writer/AddBlockEvent.java | |
| ...org/apache/spark/shuffle/RssSparkShuffleUtils.java | |
| ...k/shuffle/writer/WrappedByteArrayOutputStream.java | |
| ... and 4 more | |


@jerqi (Contributor) commented Aug 30, 2022

Sampling the I/O cost time may not be accurate enough.

@zuston (Member) commented Aug 30, 2022

Before reviewing this PR, I have some questions:

  1. How can this strategy be disabled in the conf? If we only specify one path, such as a single s3a path, there is no need to do storage selection.
  2. I/O cost time may be meaningless if the rank is built from a single write/read time. But I think it's OK to use it to check whether a DFS namespace is healthy.
  3. Can you share some requirements from your production environment? From my side, DFS can serve as a big and stable enough store for shuffle data.

@smallzhongfeng (Contributor, Author):

Thank you for your questions. My answers are as follows:

  1. If the parameter rss.coordinator.remote.storage.select.strategy is set to APP, the original selection strategy is used.
  2. The reason this strategy is adopted is to guard against exceptions in HDFS paths under different namespaces: we can avoid selecting a problematic path.
  3. At present we want to support multiple clusters, but the pressure on each cluster differs. We want to write data to the cluster under the least pressure instead of writing evenly to every cluster. In our understanding, configuring multiple HDFS paths also provides a degree of disaster recovery.

@jerqi @zuston

@jerqi (Contributor) commented Sep 1, 2022

If you just use a single write/read time to judge whether HDFS is usable, that's OK. But using a single write/read time as a metric of system load seems wrong. Could we get some metrics from HDFS instead? That may be more accurate, though I don't know whether HDFS provides suitable metrics.

@smallzhongfeng (Contributor, Author):

The core of this PR is to give multiple HDFS paths a certain disaster-tolerance capability. IIRC, HDFS metrics can only be obtained through the REST API, and that may not be very stable over HTTP. WDYT?

@jerqi (Contributor) commented Sep 1, 2022

> The core of this PR is to give multiple HDFS paths a certain disaster-tolerance capability. IIRC, HDFS metrics can only be obtained through the REST API, and that may not be very stable over HTTP. WDYT?

Could we separate this PR into two pull requests? One refactors the interface; the other adds the new strategy.
If we want to avoid an HDFS that is broken, I think we need a RemoteStorage HealthChecker.
Why are the HTTP metrics unstable? If HDFS provides HTTP metrics, it only means we can't access them too frequently. If I'm wrong, please point it out.

@smallzhongfeng (Contributor, Author) commented Sep 1, 2022

I think what you said is reasonable, and I agree to split this into two PRs. However, at present the load of a cluster cannot be perfectly evaluated, while the read/write time lets us directly compare the I/O speed of each cluster. Moreover, HDFS metrics do not seem to include one that directly reflects the pressure on the cluster.

@jerqi (Contributor) commented Sep 1, 2022

> I think what you said is reasonable, and I agree to split this into two PRs. However, at present the load of a cluster cannot be perfectly evaluated, while the read/write time lets us directly compare the I/O speed of each cluster. Moreover, HDFS metrics do not seem to include one that directly reflects the pressure on the cluster.

Read/write time can't reflect the overall situation of the cluster.

@jerqi (Contributor) commented Sep 1, 2022

Maybe you can collect the write speed from the shuffle servers.

@jerqi (Contributor) commented Sep 1, 2022

I think there are many metrics that can directly reflect the pressure of the cluster.

@zuston (Member) commented Sep 1, 2022

Thanks for your reply @smallzhongfeng

> At present we want to support multiple clusters, but the pressure on each cluster differs. We want to write data to the cluster under the least pressure instead of writing evenly to every cluster. In our understanding, configuring multiple HDFS paths also provides a degree of disaster recovery.

Got your thought. In our company we have a router filesystem to provide fault tolerance across namespaces, so we don't have this requirement.

> Moreover, HDFS metrics do not seem to include one that directly reflects the pressure on the cluster.

Can we implement an HDFS checker that checks whether the w/r time exceeds a threshold?

@smallzhongfeng (Contributor, Author):

It seems difficult to determine a reasonable threshold, so it may be better to directly compare the time consumption of the different paths. @zuston

@smallzhongfeng (Contributor, Author) commented Sep 1, 2022

> Maybe you can collect the write speed from the shuffle servers.

If we only collect the write speed of the path the shuffle servers currently write to, the other paths may never be written, so we would have no data for them.

@smallzhongfeng (Contributor, Author):

> I think there are many metrics that can directly reflect the pressure of the cluster.

There are indeed some metrics that reflect the load of the cluster, but isn't the read/write time more relevant for these temporary shuffle files?

@zuston (Member) commented Sep 1, 2022

> It seems difficult to determine a reasonable threshold, so it may be better to directly compare the time consumption of the different paths. @zuston

Time consumption alone is actually not meaningful. For example, one namespace costs 10s and another costs 10.1s; from my side, there is no real difference between the two namespaces' performance.

@smallzhongfeng (Contributor, Author):

> Time consumption alone is actually not meaningful. For example, one namespace costs 10s and another costs 10.1s; from my side, there is no real difference between the two namespaces' performance.

Yes, that was because only an instantaneous value was used. I have replaced it with the mean value over a period of time.
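
As an illustration only (hypothetical names, not this PR's actual classes), keeping a mean over a period could look like a small bounded window of recent sample costs per path:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical rolling window of recent I/O sample costs for one remote path.
class CostWindow {
  private final Deque<Long> samples = new ArrayDeque<>();
  private final int capacity;

  CostWindow(int capacity) {
    this.capacity = capacity;
  }

  synchronized void add(long costMs) {
    if (samples.size() == capacity) {
      samples.pollFirst(); // drop the oldest sample
    }
    samples.addLast(costMs);
  }

  // Mean cost over the window; Double.MAX_VALUE when empty so unsampled paths rank last.
  synchronized double mean() {
    return samples.isEmpty()
        ? Double.MAX_VALUE
        : samples.stream().mapToLong(Long::longValue).average().getAsDouble();
  }
}
```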

@jerqi (Contributor) commented Sep 1, 2022

> I think there are many metrics that can directly reflect the pressure of the cluster.

> There are indeed some metrics that reflect the load of the cluster, but isn't the read/write time more relevant for these temporary shuffle files?

Em... I can't agree with that. Because the read/write measurement is just one sample and you can't write much data, it isn't accurate.

@jerqi (Contributor) commented Sep 1, 2022

Just my advice: you should verify the feature in your production environment.

@smallzhongfeng (Contributor, Author):

It doesn't matter; maybe you're right. After all, this is a process of discussion. For what it's worth, this solution has been deployed in our online cluster for nearly a month.


@jerqi (Contributor) commented Sep 1, 2022

Have you measured the performance of the two clusters? Is the data written to them proportional to the number of nodes?

@smallzhongfeng (Contributor, Author):

At present the data we read and write is 10 MB, so in theory it is fast, and it has nothing to do with the number of nodes. Reading and writing should depend on the performance of the disks, right?

@jerqi (Contributor) commented Sep 1, 2022

With more DataNodes, the cluster can provide more I/O capability, so we should allocate more data to that cluster.

@smallzhongfeng (Contributor, Author) commented Sep 1, 2022

I think this comparison may be a bit unreasonable, because the result may be related to the machine model, disk type, and other factors; we can't say that a cluster with 800 SSD disks is necessarily slower than a cluster with 2000 HDD disks. The most direct way may be to read and write files, which directly reflects the pressure on the DataNodes at that moment.

@jerqi (Contributor) commented Sep 1, 2022

How do you evaluate the throughput of the HDFS cluster? I use the throughput of a DataNode times the number of DataNodes. You shouldn't care too much about a single write and read time: you might happen to pick the one low-load DataNode, or the one high-load DataNode, and neither represents the overall situation.

@smallzhongfeng (Contributor, Author) commented Sep 2, 2022

If we use the HDFS metrics system, which metric do you think measures this better: the length of the call queue, or the ratio of DN load to the number of DNs?

@jerqi (Contributor) commented Sep 2, 2022

> If we use the HDFS metrics system, which metric do you think measures this better: the length of the call queue, or the ratio of DN load to the number of DNs?

Maybe you can ask your company's internal HDFS colleagues.

@smallzhongfeng (Contributor, Author):

Maybe we can use the number of threads reading and writing files on the DNs to judge the load of the cluster. The general idea is as follows:

```java
// Query storage reports for live DataNodes via the DFS client.
DistributedFileSystem dfs = (DistributedFileSystem) fs;
DatanodeStorageReport[] datanodeStorageReports =
    dfs.getClient().getDatanodeStorageReport(HdfsConstants.DatanodeReportType.LIVE);
```
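
For illustration (an assumed continuation, not code from this PR), the per-DN xceiver counts in those reports could be aggregated into a single load signal:

```java
// Average xceiver (data-transfer thread) count across live DataNodes, as a rough load proxy.
long totalXceivers = 0;
for (DatanodeStorageReport report : datanodeStorageReports) {
  totalXceivers += report.getDatanodeInfo().getXceiverCount();
}
double avgXceivers = datanodeStorageReports.length == 0
    ? 0.0
    : (double) totalXceivers / datanodeStorageReports.length;
```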

@jerqi (Contributor) commented Sep 2, 2022

Do you mean the xceiverCount? If a DN uses a fixed-size thread pool, the thread number won't represent the DN's load; it depends on the DN's implementation.

@jerqi (Contributor) commented Sep 6, 2022

> Is IOSampleSelectStorageStrategy a better name? HealthSelectStorageStrategy is a little general.

I still worry about the names... neither HealthSelectStorageStrategy nor IOSampleSelectStorageStrategy is good enough.

@jerqi (Contributor) commented Sep 6, 2022

The comparison of the I/O sample isn't meaningful. Should we remove it?

@smallzhongfeng (Contributor, Author):

> The comparison of the I/O sample isn't meaningful. Should we remove it?

I want to say that the read and write time has a certain reference value. For us, the write speed of large clusters is faster than that of small clusters.

@jerqi (Contributor) commented Sep 6, 2022

> > The comparison of the I/O sample isn't meaningful. Should we remove it?
>
> I want to say that the read and write time has a certain reference value. For us, the write speed of large clusters is faster than that of small clusters.

Em... with just one write/read, why would the large cluster be faster than the small one? Do they handle the same request pressure?

@smallzhongfeng (Contributor, Author) commented Sep 6, 2022

Because some of the parameter configurations of the two clusters differ to meet different requirements, the read and write speeds are not the same. The machine models in the two clusters also differ, which leads to these differences.

@jerqi (Contributor) commented Sep 6, 2022

In general, I can hardly accept using sampled I/O speed to judge the speed of HDFS. And it's a little weird to use this strategy to judge whether HDFS is OK; judging a storage's health should be a common capability. Maybe we should use the shuffle servers to judge whether HDFS is OK.

@smallzhongfeng (Contributor, Author):

Because all shuffle servers access the same HDFS path at the same time, that is actually unnecessary. As for the read/write process, it is not only about measuring the time: it also checks whether the path can be read and written normally and whether exceptions occur while doing so, so it is effectively an extra check on whether the cluster is abnormal.

@jerqi (Contributor) commented Sep 6, 2022

Could we implement the HDFS-checking logic in AppBalanceSelectStorageStrategy?

@smallzhongfeng (Contributor, Author):

Because for us a large and faster cluster should receive more data, instead of writing to the two clusters evenly; this PR only provides an additional strategy. If we want to modify the original policy, maybe we can add that logic in another PR?

@jerqi (Contributor) commented Sep 6, 2022

> Because for us a large and faster cluster should receive more data, instead of writing to the two clusters evenly; this PR only provides an additional strategy. If we want to modify the original policy, maybe we can add that logic in another PR?

The measure used by this strategy can't prove that the cluster you select is the larger and faster one.

@jerqi (Contributor) commented Sep 6, 2022

The interface is OK for me, but the strategy is hardly acceptable to me. Maybe you should only modify the interface in this PR and maintain the strategy in your internal implementation.

@jerqi (Contributor) commented Sep 6, 2022

If you want to use the I/O sample, please modify your sampling method. One time is not enough, and 10 MB may be too small: the value must be bigger than the single buffer-flush threshold.

@smallzhongfeng (Contributor, Author) commented Sep 6, 2022

Your suggestion is good for me, and I added a parameter rss.coordinator.remote.storage.access.times for the number of times to read and write files.

```java
// Time several rounds of sample reads and writes against the remote path.
long startWriteTime = System.currentTimeMillis();
try {
  fs = HadoopFilesystemProvider.getFilesystem(remotePath, conf);
  for (int j = 0; j < readAndWriteTimes; j++) {
```

Contributor:
If we write the same file each time, the file will be allocated to the same DNs, so our sample will only test one DN.


Contributor Author:
Although we are writing the same file, the stream for that file has been closed, and a new batch of DNs should be allocated the next time we write.

Contributor Author:
[screenshot attached]

Contributor Author:
[screenshot attached]

Contributor:
OK.
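
Putting this thread together, one sampling round might look roughly like the following sketch (helper and variable names are assumptions, not the PR's exact code; each round opens fresh streams so HDFS can allocate a new DataNode pipeline):

```java
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

// Illustrative sketch of one timed sampling round against a remote storage path.
static long sampleOnce(FileSystem fs, Path remotePath, int fileSizeBytes, int readAndWriteTimes)
    throws IOException {
  Path testPath = new Path(remotePath, "ioSampleTestFile"); // hypothetical sample-file name
  byte[] data = new byte[fileSizeBytes];                    // *.io.sample.file.size
  long start = System.currentTimeMillis();
  for (int j = 0; j < readAndWriteTimes; j++) {             // *.io.sample.access.times
    try (FSDataOutputStream out = fs.create(testPath, true)) {
      out.write(data);                                      // fresh write pipeline each round
    }
    try (FSDataInputStream in = fs.open(testPath)) {
      IOUtils.readFully(in, data, 0, data.length);          // read the file back
    }
  }
  return System.currentTimeMillis() - start;
}
```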

@jerqi (Contributor) commented Sep 7, 2022

Actually we don't need to write multiple times in one sample; we can collect multiple samples over time and calculate the I/O cost from them.
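
That idea could look roughly like this hypothetical fragment (reusing the sampleOnce and CostWindow sketches above; remotePaths, costWindows, fs, fileSizeBytes, and scheduleTimeMs are assumed names, with scheduleTimeMs coming from rss.coordinator.remote.storage.io.sample.schedule.time):

```java
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical scheduled sampler: one timed sample per path per tick, accumulated per path.
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
  for (Path path : remotePaths) {                            // configured remote storage paths
    try {
      long costMs = sampleOnce(fs, path, fileSizeBytes, 1);  // one write + read per tick
      costWindows.get(path).add(costMs);                     // per-path CostWindow
    } catch (IOException e) {
      costWindows.get(path).add(Long.MAX_VALUE);             // a failed sample ranks the path last
    }
  }
}, 0, scheduleTimeMs, TimeUnit.MILLISECONDS);
```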

@jerqi (Contributor) commented Sep 7, 2022

If we increase the sample times and you insist on the comparison of the I/O sample, could we change the class name from HealthSelectStorageStrategy to LowestIOSampleCostSelectStorageStrategy?

@smallzhongfeng (Contributor, Author):

I'm sorry for the trouble my insistence has caused you, but this does have certain application scenarios in our company. LowestIOSampleCostSelectStorageStrategy may be more suitable; I will change it.

```java
    .defaultValue(APP_BALANCE)
    .withDescription("Strategy for selecting the remote path");
public static final ConfigOption<Long> COORDINATOR_REMOTE_STORAGE_HEALTH_SCHEDULE_TIME = ConfigOptions
    .key("rss.coordinator.remote.storage.health.schedule.time")
```

Contributor:
io sample

Contributor Author:
done.

```java
    .defaultValue(60 * 1000L)
    .withDescription("The time of scheduling the read and write time of the paths to obtain different HDFS");
public static final ConfigOption<Integer> COORDINATOR_REMOTE_STORAGE_FILE_SIZE = ConfigOptions
    .key("rss.coordinator.remote.storage.file.size")
```

Contributor:
io sample

```java
    .defaultValue(204800 * 1000)
    .withDescription("The size of the file that the scheduled thread reads and writes");
public static final ConfigOption<Integer> COORDINATOR_REMOTE_STORAGE_ACCESS_TIMES = ConfigOptions
    .key("rss.coordinator.remote.storage.access.times")
```

Contributor:
io sample

.key("rss.coordinator.remote.storage.access.times")
.intType()
.defaultValue(3)
.withDescription("The number of times to read and write HDFS files");

Contributor:
Could we add the new configuration options to the documents?

Contributor Author:
OK.
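
For reference, the documentation entries being requested might look like this (a sketch assembled from the keys, defaults, and descriptions above; the final wording in the docs may differ):

| Property Name | Default | Description |
| --- | --- | --- |
| rss.coordinator.remote.storage.select.strategy | APP_BALANCE | Strategy for selecting the remote path (APP_BALANCE or IO_SAMPLE) |
| rss.coordinator.remote.storage.io.sample.schedule.time | 60000 | Interval (ms) at which sample files are read and written |
| rss.coordinator.remote.storage.io.sample.file.size | 204800000 | Size (bytes) of each sample file read and written |
| rss.coordinator.remote.storage.io.sample.access.times | 3 | Number of times to read and write HDFS sample files |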

@jerqi (Contributor) commented Sep 7, 2022

> What changes were proposed in this pull request?
>
> Solve #186
>
> Why are the changes needed?
>
> A better strategy to select remote path.
>
> Does this PR introduce any user-facing change?
>
> Three config added.
>
>   1. rss.coordinator.remote.storage.select.strategy support APP and IO, APP selection strategy based on the number of apps, IO selection strategy based on time consumption of reading and writing files.
>   2. rss.coordinator.remote.storage.schedule.time , if user choose IO, file will be read and written at regular intervals.
>   3. rss.coordinator.remote.storage.file.size , the size of each read / write file.
>
> How was this patch tested?
>
> Added ut.

Please update the description.

@smallzhongfeng (Contributor, Author) commented Sep 7, 2022

> Please update the description.

Updated.

@jerqi (Contributor) left a review:

LGTM, thanks @smallzhongfeng

@jerqi merged commit 78a0371 into apache:master on Sep 7, 2022
@smallzhongfeng (Contributor, Author):

Thanks all, and thank you for your hard work reviewing. @jerqi
