From b5cf1f2e8e0ee84dd8eae78678d7cc77a4127985 Mon Sep 17 00:00:00 2001 From: Fantasy-Jay <13631435453@163.com> Date: Sat, 19 Aug 2023 14:18:19 +0800 Subject: [PATCH 1/2] [FEATURE] Add labels with disk path for local storage total_localfile_write_data metrics --- .../uniffle/server/ShuffleServerMetrics.java | 5 +- .../server/storage/LocalStorageManager.java | 4 +- .../server/ShuffleFlushManagerTest.java | 82 ++++++++++++++++++- 3 files changed, 84 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java index ea7b445c33..98fddf5e0f 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java @@ -75,6 +75,7 @@ public class ShuffleServerMetrics { private static final String TOTAL_DROPPED_EVENT_NUM = "total_dropped_event_num"; private static final String TOTAL_HADOOP_WRITE_DATA = "total_hadoop_write_data"; private static final String TOTAL_LOCALFILE_WRITE_DATA = "total_localfile_write_data"; + private static final String LOCAL_DISK_PATH_LABEL = "local_disk_path"; private static final String TOTAL_REQUIRE_BUFFER_FAILED = "total_require_buffer_failed"; private static final String TOTAL_REQUIRE_BUFFER_FAILED_FOR_HUGE_PARTITION = "total_require_buffer_failed_for_huge_partition"; @@ -126,7 +127,6 @@ public class ShuffleServerMetrics { public static Counter.Child counterTotalReadTime; public static Counter.Child counterTotalFailedWrittenEventNum; public static Counter.Child counterTotalDroppedEventNum; - public static Counter.Child counterTotalLocalFileWriteDataSize; public static Counter.Child counterTotalRequireBufferFailed; public static Counter.Child counterTotalRequireBufferFailedForHugePartition; public static Counter.Child counterTotalRequireBufferFailedForRegularPartition; @@ -164,6 +164,7 @@ public class ShuffleServerMetrics { public static Counter counterRemoteStorageFailedWrite; public static Counter counterRemoteStorageSuccessWrite; public static Counter counterTotalHadoopWriteDataSize; + public static Counter counterTotalLocalFileWriteDataSize; private static String tags; public static Counter counterLocalFileEventFlush; @@ -268,7 +269,7 @@ private static void setUpMetrics() { metricsManager.addCounter( TOTAL_HADOOP_WRITE_DATA, Constants.METRICS_TAG_LABEL_NAME, STORAGE_HOST_LABEL); counterTotalLocalFileWriteDataSize = - metricsManager.addLabeledCounter(TOTAL_LOCALFILE_WRITE_DATA); + metricsManager.addCounter(TOTAL_LOCALFILE_WRITE_DATA, LOCAL_DISK_PATH_LABEL); counterTotalRequireBufferFailed = metricsManager.addLabeledCounter(TOTAL_REQUIRE_BUFFER_FAILED); counterTotalRequireBufferFailedForRegularPartition = metricsManager.addLabeledCounter(TOTAL_REQUIRE_BUFFER_FAILED_FOR_REGULAR_PARTITION); diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java index 635cc4c6db..680c9105ae 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java @@ -237,7 +237,9 @@ public Storage selectStorage(ShuffleDataReadEvent event) { @Override public void updateWriteMetrics(ShuffleDataFlushEvent event, long writeTime) { super.updateWriteMetrics(event, writeTime); - ShuffleServerMetrics.counterTotalLocalFileWriteDataSize.inc(event.getSize()); + ShuffleServerMetrics.counterTotalLocalFileWriteDataSize + .labels(event.getUnderStorage().getStoragePath()) + .inc(event.getSize()); } @Override diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java index 2df020333f..b91e669e2f 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java @@ -283,7 +283,7 @@ public void localMetricsTest(@TempDir File tempDir) throws Exception { // wait for write data waitForFlush(manager, appId, 1, 5); - validateLocalMetadata(storageManager, 160L); + validateLocalMetadata(storageManager, 0, 160L); ShuffleDataFlushEvent event12 = createShuffleDataFlushEvent(appId, 1, 1, 1, null); manager.addToFlushQueue(event12); @@ -291,7 +291,60 @@ public void localMetricsTest(@TempDir File tempDir) throws Exception { // wait for write data waitForFlush(manager, appId, 1, 10); - validateLocalMetadata(storageManager, 320L); + validateLocalMetadata(storageManager, 0, 320L); + } + + @Test + public void totalLocalFileWriteDataMetricTest() throws Exception { + List storagePaths = Arrays.asList("/tmp/rss-data1", "/tmp/rss-data2", "/tmp/rss-data3"); + + shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, storagePaths); + shuffleServerConf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L); + shuffleServerConf.setString( + ShuffleServerConf.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name()); + + String appId = "localMetricsTest_appId"; + StorageManager storageManager = + StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf); + ShuffleFlushManager manager = + new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, storageManager); + + ShuffleDataFlushEvent flushEvent = createShuffleDataFlushEvent(appId, 1, 1, 1, 10, 100, null); + manager.addToFlushQueue(flushEvent); + // wait for write data + waitForFlush(manager, appId, 1, 10); + int storageIndex = storagePaths.indexOf(flushEvent.getUnderStorage().getStoragePath()); + validateLocalMetadata(storageManager, storageIndex, 1000L); + + flushEvent = createShuffleDataFlushEvent(appId, 2, 1, 1, 10, 101, null); + manager.addToFlushQueue(flushEvent); + // wait for write data + waitForFlush(manager, appId, 2, 10); + int storageIndex1 = storagePaths.indexOf(flushEvent.getUnderStorage().getStoragePath()); + validateLocalMetadata(storageManager, storageIndex1, 1010L); + + flushEvent = createShuffleDataFlushEvent(appId, 3, 1, 1, 10, 102, null); + manager.addToFlushQueue(flushEvent); + // wait for write data + waitForFlush(manager, appId, 3, 10); + int storageIndex2 = storagePaths.indexOf(flushEvent.getUnderStorage().getStoragePath()); + validateLocalMetadata(storageManager, storageIndex2, 1020L); + + assertEquals( + 1000L, + ShuffleServerMetrics.counterTotalLocalFileWriteDataSize + .labels(storagePaths.get(storageIndex)) + .get()); + assertEquals( + 1010L, + ShuffleServerMetrics.counterTotalLocalFileWriteDataSize + .labels(storagePaths.get(storageIndex1)) + .get()); + assertEquals( + 1020L, + ShuffleServerMetrics.counterTotalLocalFileWriteDataSize + .labels(storagePaths.get(storageIndex2)) + .get()); } @Test @@ -515,6 +568,26 @@ public static ShuffleDataFlushEvent createShuffleDataFlushEvent( null); } + public static ShuffleDataFlushEvent createShuffleDataFlushEvent( + String appId, + int shuffleId, + int startPartition, + int endPartition, + int blockNum, + int blockSize, + Supplier isValid) { + return new ShuffleDataFlushEvent( + ATOMIC_LONG.getAndIncrement(), + appId, + shuffleId, + startPartition, + endPartition, + (long) blockNum * blockSize, + createBlock(blockNum, blockSize), + isValid, + null); + } + public static List createBlock(int num, int length) { List blocks = Lists.newArrayList(); for (int i = 0; i < num; i++) { @@ -677,9 +750,10 @@ public void defaultFlushEventHandlerTest(@TempDir File tempDir) throws Exception assertEquals(2, ShuffleServerMetrics.counterHadoopEventFlush.get()); } - private void validateLocalMetadata(StorageManager storageManager, Long size) { + private void validateLocalMetadata(StorageManager storageManager, int storageIndex, Long size) { assertInstanceOf(LocalStorageManager.class, storageManager); - LocalStorage localStorage = ((LocalStorageManager) storageManager).getStorages().get(0); + LocalStorage localStorage = + ((LocalStorageManager) storageManager).getStorages().get(storageIndex); assertEquals(size, localStorage.getMetaData().getDiskSize().longValue()); } } From 455455d767c52fd07387b429c7f4e296d1dd6b41 Mon Sep 17 00:00:00 2001 From: Fantasy-Jay <13631435453@163.com> Date: Mon, 21 Aug 2023 15:45:17 +0800 Subject: [PATCH 2/2] [FEATURE] Add labels with disk path for local storage total_localfile_write_data metrics --- .../org/apache/uniffle/server/ShuffleServerMetrics.java | 1 + .../apache/uniffle/server/storage/LocalStorageManager.java | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java index 98fddf5e0f..dbbe36a69d 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java @@ -76,6 +76,7 @@ public class ShuffleServerMetrics { private static final String TOTAL_HADOOP_WRITE_DATA = "total_hadoop_write_data"; private static final String TOTAL_LOCALFILE_WRITE_DATA = "total_localfile_write_data"; private static final String LOCAL_DISK_PATH_LABEL = "local_disk_path"; + public static final String LOCAL_DISK_PATH_LABEL_ALL = "ALL"; private static final String TOTAL_REQUIRE_BUFFER_FAILED = "total_require_buffer_failed"; private static final String TOTAL_REQUIRE_BUFFER_FAILED_FOR_HUGE_PARTITION = "total_require_buffer_failed_for_huge_partition"; diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java index 680c9105ae..0752abcc0b 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java @@ -238,8 +238,13 @@ public Storage selectStorage(ShuffleDataReadEvent event) { public void updateWriteMetrics(ShuffleDataFlushEvent event, long writeTime) { super.updateWriteMetrics(event, writeTime); ShuffleServerMetrics.counterTotalLocalFileWriteDataSize - .labels(event.getUnderStorage().getStoragePath()) + .labels(ShuffleServerMetrics.LOCAL_DISK_PATH_LABEL_ALL) .inc(event.getSize()); + if (event.getUnderStorage() != null) { + ShuffleServerMetrics.counterTotalLocalFileWriteDataSize + .labels(event.getUnderStorage().getStoragePath()) + .inc(event.getSize()); + } } @Override