Skip to content
This repository has been archived by the owner on Apr 17, 2024. It is now read-only.

Commit

Permalink
Add shuffle block removing operation within one Spark context
Browse files Browse the repository at this point in the history
  • Loading branch information
Eugene-Mark committed Oct 21, 2020
1 parent d2e26ee commit b911390
Show file tree
Hide file tree
Showing 8 changed files with 374 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

public class PersistentMemoryPool {
static {
System.load("/usr/local/lib/libjnipmdk.so");
System.load("/usr/local/lib/libjnipmdk.so");
}
private static native long nativeOpenDevice(String path, long size);
private static native void nativeSetBlock(long deviceHandler, String key, ByteBuffer byteBuffer, int size, boolean clean);
Expand All @@ -13,6 +13,7 @@ public class PersistentMemoryPool {
private static native void nativeDeleteBlock(long deviceHandler, String key);
private static native long nativeGetRoot(long deviceHandler);
private static native int nativeCloseDevice(long deviceHandler);
private static native long nativeRemoveBlock(long deviceHandler, String key);

private static final long DEFAULT_PMPOOL_SIZE = 0L;

Expand Down Expand Up @@ -41,6 +42,10 @@ public void deletePartition(String key) {
nativeDeleteBlock(this.deviceHandler, key);
}

public long removeBlock(String key) {
return nativeRemoveBlock(this.deviceHandler, key);
}

public long getRootAddr() {
return nativeGetRoot(this.deviceHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,14 @@ private[spark] class PmemShuffleBlockResolver(
PersistentMemoryHandler.stop()
super.stop()
}

override def removeDataByMap(shuffleId: Int, mapId: Int): Unit ={
val partitionNumber = conf.get("spark.sql.shuffle.partitions")
val persistentMemoryHandler = PersistentMemoryHandler.getPersistentMemoryHandler

for (i <- 0 to partitionNumber.toInt){
val key = "shuffle_" + shuffleId + "_" + mapId + "_" + i
persistentMemoryHandler.removeBlock(key)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ private[spark] class PersistentMemoryHandler(
pmpool.deletePartition(blockId)
}

def removeBlock(blockId: String): Long = {
pmpool.removeBlock(blockId)
}

def getPartitionManagedBuffer(blockId: String): ManagedBuffer = {
new PmemManagedBuffer(this, blockId)
}
Expand Down
174 changes: 172 additions & 2 deletions native/src/010-TestCasePersistentMemoryPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ int test_multithread_put(uint64_t index, pmemkv* kv) {
return 0;
}

int test_multithread_remove(uint64_t index, pmemkv* kv) {
std::string key = std::to_string(index);
kv->remove(key);
return 0;
}

TEST_CASE( "PmemBuffer operations", "[PmemBuffer]" ) {
char data[LENGTH] = {};
memset(data, 'a', LENGTH);
Expand Down Expand Up @@ -52,7 +58,8 @@ TEST_CASE( "PmemBuffer operations", "[PmemBuffer]" ) {
for (char c = 'a'; c < 'f'; c++) {
memset(data + (c - 'a') * 3, c, 3);
}
/*data should be "aaabbbcccdddeee"*/

//data should be "aaabbbcccdddeee"
buf.write(data, 15);

char* firstTime = buf.getDataForFlush(buf.getRemaining());
Expand All @@ -66,7 +73,9 @@ TEST_CASE( "PmemBuffer operations", "[PmemBuffer]" ) {
}
}


TEST_CASE("pmemkv operations", "[pmemkv]") {

SECTION("test open and close") {
std::string key = "1";
pmemkv* kv = new pmemkv("/dev/dax0.0");
Expand Down Expand Up @@ -100,14 +109,175 @@ TEST_CASE("pmemkv operations", "[pmemkv]") {
uint64_t value_size = 0;
for (int i = 0; i < size; i++) {
value_size += mm->meta[i*2+1];
}
}
REQUIRE(value_size == 11);
std::free(mm->meta);
std::free(mm);
kv->free_all();
delete kv;
}

SECTION("test remove element from an empty list"){
std::string key = "remove-element-from-empty-list";
pmemkv* kv = new pmemkv("/dev/dax0.0");
int result = kv->remove(key);
REQUIRE(result == -1);
kv->free_all();
delete kv;
}


SECTION("test remove an non-existed element"){
std::string key = "key";
pmemkv* kv = new pmemkv("/dev/dax0.0");
int length = 5;
kv->put(key, "first", length);
string non_existed_key = "non-exist-key";
int result = kv->remove(non_existed_key);
REQUIRE(result == -1);
kv->free_all();
delete kv;
}

SECTION("test remove an element from a single node list"){
std::string key = "key-single";
pmemkv* kv = new pmemkv("/dev/dax0.0");
int length = 5;
kv->put(key, "first", length);
std::cout<<"Before remove a single node, dump all: "<<std::endl;
kv->dump_all();
int result = kv->remove(key);
std::cout<<"After remove a single node, dump all: "<<std::endl;
kv->dump_all();
REQUIRE(result == 0);
kv->free_all();
delete kv;
}

SECTION("test remove an element from a middle of a list"){
pmemkv* kv = new pmemkv("/dev/dax0.0");
std::string key1 = "first-key";
int length1 = 5 ;
kv->put(key1, "first", length1);

std::string key2 = "second-key";
int length2 = 6;
kv->put(key2, "second", length2);

std::string key3 = "third-key";
int length3 = 5;
kv->put(key3, "third", length3);

std::string key4 = "forth-key";
int length4 = 5;
kv->put(key4, "forth", length4);

kv->reverse_dump_all();
long r1 = kv->remove(key4);
std::cout<<"The forth is removed, dump:"<<std::endl;
kv->reverse_dump_all();

long r2 = kv->remove(key2);
std::cout<<"The second is removed, dump:"<<std::endl;
kv->reverse_dump_all();
long r3 = kv->remove(key1);
std::cout<<"The first is removed, dump:"<<std::endl;
kv->reverse_dump_all();
long r4 = kv->remove(key3);
std::cout<<"The third is removed, dump:"<<std::endl;
kv->reverse_dump_all();
REQUIRE((r1 + r2 + r3 + r4) == 0);

kv->free_all();
delete kv;
}

SECTION("test put and remove multiple elements with same key"){
pmemkv* kv = new pmemkv("/dev/dax0.0");
std::string key = "key-multiple-objects";
int size = 100;
int length = 5;
for(int i = 0; i < size; i++){
kv->put(key, "first", length);
}

int bytes_written = kv->getBytesWritten();
assert(bytes_written == size * length);
kv->dump_all();

kv->remove(key);
std::cout<<"The key with " << size << " objects is removed, dump:"<<std::endl;
kv->dump_all();

assert(kv->getBytesWritten() == 0);

kv->free_all();
delete kv;
}

SECTION("test multithreaded remove") {
std::vector<std::thread> threads;
pmemkv* kv = new pmemkv("/dev/dax0.0");
int size = 10;
for (uint64_t i = 0; i < size; i++) {
threads.emplace_back(test_multithread_put, i, kv);
}
for (uint64_t i = 0; i < size; i++) {
threads[i].join();
}

std::cout<<"mark1. kv->getBytesWritten()="<<kv->getBytesWritten()<<std::endl;
assert(kv->getBytesWritten() != 0);
kv->dump_all();

std::vector<std::thread> removeThreads;
for (uint64_t i = 0; i < size; i++) {
removeThreads.emplace_back(test_multithread_remove, i, kv);
}
for (uint64_t i = 0; i < size; i++) {
removeThreads[i].join();
}
std::cout<<"mark2. kv->getBytesWritten()="<<kv->getBytesWritten()<<std::endl;
assert(kv->getBytesWritten() == 0);
kv->dump_all();

kv->free_all();
delete kv;
threads.clear();
}

SECTION("test remove an element in specific sequence"){
pmemkv* kv = new pmemkv("/dev/dax0.0");
std::string key1 = "first-key";
int length1 = 5;
kv->put(key1, "first", length1);

std::string key2 = "second-key";
int length2 = 6;
kv->put(key2, "second", length2);

std::string key3 = "third-key";
int length3 = 5;
kv->put(key3, "third", length3);

long r1 = kv->remove(key2);
std::cout<<"The second is removed, dump:"<<std::endl;
kv->dump_all();

long r2 = kv->remove(key3);
std::cout<<"The third is removed, dump:"<<std::endl;
kv->dump_all();

long r3 = kv->remove(key1);
std::cout<<"The first is removed, dump:"<<std::endl;
kv->dump_all();

REQUIRE((r1 + r2 + r3) == 0);

kv->free_all();
delete kv;
}

SECTION("test multithreaded put and get") {
std::vector<std::thread> threads;
pmemkv* kv = new pmemkv("/dev/dax0.0");
Expand Down
11 changes: 10 additions & 1 deletion native/src/lib_jni_pmdk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,16 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_
uint64_t value_size;
pmkv->get_value_size(key_str, &value_size);
return value_size;
}
}

JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeRemoveBlock
(JNIEnv *env, jclass obj, jlong kv, jstring key){
const char *CStr = env->GetStringUTFChars(key, 0);
string key_str(CStr);
pmemkv *pmkv = static_cast<pmemkv*>((void*)kv);
long result = pmkv->remove(key_str);
return result;
}

JNIEXPORT jint JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeCloseDevice
(JNIEnv *env, jclass obj, jlong kv) {
Expand Down
8 changes: 8 additions & 0 deletions native/src/lib_jni_pmdk.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file added native/src/libjnipmdk.so
Binary file not shown.

0 comments on commit b911390

Please sign in to comment.