[FOLLOWUP] Delete hdfs shuffle data files using proxy user (#170)
### What changes were proposed in this pull request?
Delete shuffle data files stored on secured HDFS using the proxy user.

### Why are the changes needed?
In the previous PR #53, we introduced a proxy user for the shuffle server when writing shuffle data to HDFS. However, I overlooked that the shuffle server should also delete these files as the proxy user.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manual tests on a Kerberized HDFS cluster, plus unit tests.
zuston committed Aug 22, 2022
1 parent abb9215 commit 270e2ad
Showing 11 changed files with 193 additions and 22 deletions.
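For background on the mechanism this followup depends on: on a Kerberized HDFS, acting on another user's behalf means wrapping the FileSystem call in a proxy-user doAs. The sketch below uses only the standard Hadoop security API; the class and method names are illustrative and not part of this PR.

```java
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class ProxyUserDeleteSketch {

  // Recursively deletes `path` on HDFS while impersonating `user`.
  // Assumes the server's login identity is allowed to impersonate application
  // users (hadoop.proxyuser.<server-user>.hosts/groups in core-site.xml).
  public static boolean deleteAsProxyUser(String user, Path path, Configuration conf)
      throws Exception {
    // The shuffle server's own (e.g. keytab-based) Kerberos identity.
    UserGroupInformation realUser = UserGroupInformation.getLoginUser();
    // A UGI that acts as `user`, authenticated through `realUser`.
    UserGroupInformation proxyUgi = UserGroupInformation.createProxyUser(user, realUser);
    // Everything inside doAs executes as the proxied user, so HDFS
    // permission checks see `user` rather than the server principal.
    return proxyUgi.doAs((PrivilegedExceptionAction<Boolean>) () -> {
      FileSystem fs = path.getFileSystem(conf);
      return fs.delete(path, true);
    });
  }
}
```

Writing has gone through this path since PR #53; this followup threads the user into the delete path as well, which is why removeResources and ShuffleDeleteHandler.delete below gain a user parameter.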
10 changes: 10 additions & 0 deletions server/pom.xml
@@ -107,6 +107,16 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minikdc</artifactId>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
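A brief note on the new test dependency: hadoop-minikdc embeds a standalone KDC so the Kerberized HDFS tests below can run without a real Kerberos deployment, and its transitive JUnit 4 is excluded because this test suite uses JUnit 5 (org.junit.jupiter).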
@@ -383,10 +383,10 @@ public void removeResources(String appId) {
partitionsToBlockIds.remove(appId);
shuffleBufferManager.removeBuffer(appId);
shuffleFlushManager.removeResources(appId);
appUserMap.remove(appId);
if (!shuffleToCachedBlockIds.isEmpty()) {
storageManager.removeResources(appId, shuffleToCachedBlockIds.keySet());
storageManager.removeResources(appId, shuffleToCachedBlockIds.keySet(), appUserMap.get(appId));
}
appUserMap.remove(appId);
shuffleTaskInfos.remove(appId);
LOG.info("Finish remove resource for appId[" + appId + "] cost " + (System.currentTimeMillis() - start) + " ms");
}
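Note the ordering fix in this hunk: appUserMap.remove(appId) now runs after storageManager.removeResources(...), so appUserMap.get(appId) can still resolve the owning user when the deletion happens.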
@@ -69,15 +69,15 @@ public Storage selectStorage(ShuffleDataReadEvent event) {
}

@Override
public void removeResources(String appId, Set<Integer> shuffleSet) {
public void removeResources(String appId, Set<Integer> shuffleSet, String user) {
HdfsStorage storage = getStorageByAppId(appId);
if (storage != null) {
storage.removeHandlers(appId);
appIdToStorages.remove(appId);
ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory.getInstance()
.createShuffleDeleteHandler(
new CreateShuffleDeleteHandlerRequest(StorageType.HDFS.name(), storage.getConf()));
deleteHandler.delete(new String[] {storage.getStoragePath()}, appId);
deleteHandler.delete(new String[] {storage.getStoragePath()}, appId, user);
}
}

@@ -171,7 +171,7 @@ public Checker getStorageChecker() {
}

@Override
public void removeResources(String appId, Set<Integer> shuffleSet) {
public void removeResources(String appId, Set<Integer> shuffleSet, String user) {
for (LocalStorage storage : localStorages) {
for (Integer shuffleId : shuffleSet) {
storage.removeHandlers(appId);
@@ -182,7 +182,7 @@ public void removeResources(String appId, Set<Integer> shuffleSet) {
ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory.getInstance()
.createShuffleDeleteHandler(
new CreateShuffleDeleteHandlerRequest(StorageType.LOCALFILE.name(), new Configuration()));
deleteHandler.delete(storageBasePaths.toArray(new String[storageBasePaths.size()]), appId);
deleteHandler.delete(storageBasePaths.toArray(new String[storageBasePaths.size()]), appId, user);
}

@Override
@@ -112,10 +112,10 @@ public Checker getStorageChecker() {
}

@Override
public void removeResources(String appId, Set<Integer> shuffleSet) {
public void removeResources(String appId, Set<Integer> shuffleSet, String user) {
LOG.info("Start to remove resource of appId: {}, shuffles: {}", appId, shuffleSet.toString());
warmStorageManager.removeResources(appId, shuffleSet);
coldStorageManager.removeResources(appId, shuffleSet);
warmStorageManager.removeResources(appId, shuffleSet, user);
coldStorageManager.removeResources(appId, shuffleSet, user);
}

public StorageManager getColdStorageManager() {
@@ -39,7 +39,7 @@ public interface StorageManager {

// todo: add an interface for updateReadMetrics

void removeResources(String appId, Set<Integer> shuffleSet);
void removeResources(String appId, Set<Integer> shuffleSet, String user);

void start();

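Every StorageManager implementation now receives the owning user: the HDFS manager forwards it to the delete handler for impersonation, while the local-file manager passes it along even though local deletion needs no impersonation. Callers obtain it from the app-to-user mapping seen above (appUserMap, mocked as getUserByAppId in the tests below).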
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.server;

import java.io.FileNotFoundException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import com.google.common.collect.Sets;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.KerberizedHdfsBase;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.storage.HdfsStorageManager;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.server.storage.StorageManagerFactory;
import org.apache.uniffle.storage.common.AbstractStorage;
import org.apache.uniffle.storage.util.StorageType;

import static org.apache.uniffle.server.ShuffleFlushManagerTest.createShuffleDataFlushEvent;
import static org.apache.uniffle.server.ShuffleFlushManagerTest.waitForFlush;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ShuffleFlushManagerOnKerberizedHdfsTest extends KerberizedHdfsBase {
private static final Logger LOGGER = LoggerFactory.getLogger(ShuffleFlushManagerOnKerberizedHdfsTest.class);

private ShuffleServerConf shuffleServerConf = new ShuffleServerConf();

private static RemoteStorageInfo remoteStorage;
private static ShuffleServer mockShuffleServer = mock(ShuffleServer.class);

@BeforeEach
public void prepare() throws Exception {
ShuffleServerMetrics.register();
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Collections.emptyList());
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
LogManager.getRootLogger().setLevel(Level.INFO);

initHadoopSecurityContext();
}

@AfterEach
public void afterEach() {
ShuffleServerMetrics.clear();
}

@BeforeAll
public static void beforeAll() throws Exception {
testRunner = ShuffleFlushManagerOnKerberizedHdfsTest.class;
KerberizedHdfsBase.init();

ShuffleTaskManager shuffleTaskManager = mock(ShuffleTaskManager.class);
ShuffleBufferManager shuffleBufferManager = mock(ShuffleBufferManager.class);

when(mockShuffleServer.getShuffleTaskManager()).thenReturn(shuffleTaskManager);
when(mockShuffleServer.getShuffleBufferManager()).thenReturn(shuffleBufferManager);

String storedPath = kerberizedHdfs.getSchemeAndAuthorityPrefix() + "/alex/rss-data/";
Map<String, String> confMap = new HashMap<>();
for (Map.Entry<String, String> entry : kerberizedHdfs.getConf()) {
confMap.put(entry.getKey(), entry.getValue());
}
remoteStorage = new RemoteStorageInfo(
storedPath, confMap
);
}

@Test
public void clearTest() throws Exception {
String appId1 = "complexWriteTest_appId1";
String appId2 = "complexWriteTest_appId2";

when(mockShuffleServer.getShuffleTaskManager().getUserByAppId(appId1)).thenReturn("alex");
when(mockShuffleServer.getShuffleTaskManager().getUserByAppId(appId2)).thenReturn("alex");

StorageManager storageManager =
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
storageManager.registerRemoteStorage(appId1, remoteStorage);
storageManager.registerRemoteStorage(appId2, remoteStorage);
ShuffleFlushManager manager =
new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", mockShuffleServer, storageManager);
ShuffleDataFlushEvent event1 =
createShuffleDataFlushEvent(appId1, 1, 0, 1, null);
manager.addToFlushQueue(event1);
ShuffleDataFlushEvent event2 =
createShuffleDataFlushEvent(appId2, 1, 0, 1, null);
manager.addToFlushQueue(event2);
waitForFlush(manager, appId1, 1, 5);
waitForFlush(manager, appId2, 1, 5);
AbstractStorage storage = (AbstractStorage) storageManager.selectStorage(event1);
assertEquals(5, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(storageManager.selectStorage(event1), storageManager.selectStorage(event2));
int size = storage.getHandlerSize();
assertEquals(2, size);

FileStatus[] fileStatus = kerberizedHdfs.getFileSystem()
.listStatus(new Path(remoteStorage.getPath() + "/" + appId1 + "/"));
for (FileStatus fileState : fileStatus) {
assertEquals("alex", fileState.getOwner());
}
assertTrue(fileStatus.length > 0);
manager.removeResources(appId1);

assertTrue(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId1));
storageManager.removeResources(appId1, Sets.newHashSet(1), "alex");
assertFalse(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId1));
try {
kerberizedHdfs.getFileSystem().listStatus(new Path(remoteStorage.getPath() + "/" + appId1 + "/"));
fail("Exception should be thrown");
} catch (FileNotFoundException fnfe) {
// expected exception
}

assertTrue(kerberizedHdfs.getFileSystem().exists(new Path(remoteStorage.getPath())));

assertEquals(0, manager.getCommittedBlockIds(appId1, 1).getLongCardinality());
assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
size = storage.getHandlerSize();
assertEquals(1, size);
manager.removeResources(appId2);
assertTrue(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId2));
storageManager.removeResources(appId2, Sets.newHashSet(1), "alex");
assertFalse(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId2));
assertEquals(0, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
size = storage.getHandlerSize();
assertEquals(0, size);
}
}
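In short, the new test flushes data for two apps as user alex on a Kerberized mini-cluster (backed by the hadoop-minikdc dependency added above), asserts the written files are owned by alex, and then verifies that removeResources(appId, shuffles, "alex") deletes the per-app directory (a subsequent listStatus throws FileNotFoundException) while leaving the shared base path intact.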
@@ -32,6 +32,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.prometheus.client.Gauge;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -225,7 +226,7 @@ public void clearTest() throws Exception {
manager.removeResources(appId1);

assertTrue(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId1));
storageManager.removeResources(appId1, Sets.newHashSet(1));
storageManager.removeResources(appId1, Sets.newHashSet(1), StringUtils.EMPTY);
assertFalse(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId1));
try {
fs.listStatus(new Path(remoteStorage.getPath() + "/" + appId1 + "/"));
@@ -240,7 +241,7 @@ public void clearTest() throws Exception {
assertEquals(1, size);
manager.removeResources(appId2);
assertTrue(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId2));
storageManager.removeResources(appId2, Sets.newHashSet(1));
storageManager.removeResources(appId2, Sets.newHashSet(1), StringUtils.EMPTY);
assertFalse(((HdfsStorageManager)storageManager).getAppIdToStorages().containsKey(appId2));
assertEquals(0, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
size = storage.getHandlerSize();
@@ -274,7 +275,7 @@ public void clearLocalTest(@TempDir File tempDir) throws Exception {
assertEquals(2, storage.getHandlerSize());
File file = new File(tempDir, appId1);
assertTrue(file.exists());
storageManager.removeResources(appId1, Sets.newHashSet(1));
storageManager.removeResources(appId1, Sets.newHashSet(1), StringUtils.EMPTY);
manager.removeResources(appId1);
assertFalse(file.exists());
ShuffleDataFlushEvent event3 =
@@ -285,7 +286,7 @@ public void clearLocalTest(@TempDir File tempDir) throws Exception {
assertEquals(5, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(1, storage.getHandlerSize());
manager.removeResources(appId2);
storageManager.removeResources(appId2, Sets.newHashSet(1));
storageManager.removeResources(appId2, Sets.newHashSet(1), StringUtils.EMPTY);
assertEquals(0, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
assertEquals(0, storage.getHandlerSize());
}
@@ -321,7 +322,7 @@ private void waitForQueueClear(ShuffleFlushManager manager) throws Exception {
} while (size > 0);
}

private void waitForFlush(ShuffleFlushManager manager,
public static void waitForFlush(ShuffleFlushManager manager,
String appId, int shuffleId, int expectedBlockNum) throws Exception {
int retry = 0;
int size = 0;
@@ -335,14 +336,14 @@ private void waitForFlush(ShuffleFlushManager manager,
} while (size < expectedBlockNum);
}

private ShuffleDataFlushEvent createShuffleDataFlushEvent(
public static ShuffleDataFlushEvent createShuffleDataFlushEvent(
String appId, int shuffleId, int startPartition, int endPartition, Supplier<Boolean> isValid) {
List<ShufflePartitionedBlock> spbs = createBlock(5, 32);
return new ShuffleDataFlushEvent(ATOMIC_LONG.getAndIncrement(),
appId, shuffleId, startPartition, endPartition, 1, spbs, isValid, null);
}

private List<ShufflePartitionedBlock> createBlock(int num, int length) {
public static List<ShufflePartitionedBlock> createBlock(int num, int length) {
List<ShufflePartitionedBlock> blocks = Lists.newArrayList();
for (int i = 0; i < num; i++) {
byte[] buf = new byte[length];
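The helpers above (waitForFlush, createShuffleDataFlushEvent, and createBlock) are widened from private to public static so that ShuffleFlushManagerOnKerberizedHdfsTest can reuse them via static imports.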
@@ -24,5 +24,5 @@ public interface ShuffleDeleteHandler {
*
* @param appId ApplicationId for delete
*/
void delete(String[] storageBasePaths, String appId);
void delete(String[] storageBasePaths, String appId, String user);
}
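Note that the visible javadoc fragment still documents only appId; the new user parameter carries the file owner on whose behalf the deletion is performed.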
@@ -38,16 +38,16 @@ public HdfsShuffleDeleteHandler(Configuration hadoopConf) {
}

@Override
public void delete(String[] storageBasePaths, String appId) {
public void delete(String[] storageBasePaths, String appId, String user) {
Path path = new Path(ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePaths[0], appId));
boolean isSuccess = false;
int times = 0;
int retryMax = 5;
long start = System.currentTimeMillis();
LOG.info("Try delete shuffle data in HDFS for appId[" + appId + "] with " + path);
LOG.info("Try delete shuffle data in HDFS for appId[{}] of user[{}] with {}",appId, user, path);
while (!isSuccess && times < retryMax) {
try {
FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(path, hadoopConf);
FileSystem fileSystem = HadoopFilesystemProvider.getFilesystem(user, path, hadoopConf);
fileSystem.delete(path, true);
isSuccess = true;
} catch (Exception e) {
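With the user threaded through to HadoopFilesystemProvider.getFilesystem(user, path, hadoopConf) (only the signature is shown in this diff; the provider's internals are not), the returned FileSystem can act as that user on secured clusters, e.g. via a proxy-user doAs as sketched earlier, while the surrounding retry loop is unchanged.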
@@ -31,7 +31,7 @@ public class LocalFileDeleteHandler implements ShuffleDeleteHandler {
private static final Logger LOG = LoggerFactory.getLogger(LocalFileDeleteHandler.class);

@Override
public void delete(String[] storageBasePaths, String appId) {
public void delete(String[] storageBasePaths, String appId, String user) {
for (String basePath : storageBasePaths) {
String shufflePath = ShuffleStorageUtils.getFullShuffleDataFolder(basePath, appId);
long start = System.currentTimeMillis();
