Skip to content

Commit

Permalink
refactor fast init with Guava ListenableFuture
Browse files Browse the repository at this point in the history
  • Loading branch information
LucasMLK committed Dec 8, 2022
1 parent 31f9f83 commit 5bd5e98
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 18 deletions.
23 changes: 23 additions & 0 deletions src/main/java/io/xdag/crypto/randomx/RandomXFlag.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,26 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2022-2030 The XdagJ Developers
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package io.xdag.crypto.randomx;

import lombok.ToString;
Expand Down
66 changes: 49 additions & 17 deletions src/main/java/io/xdag/crypto/randomx/RandomXWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,24 @@
*/
package io.xdag.crypto.randomx;

import java.util.ArrayList;
import java.util.Arrays;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.sun.jna.Memory;
import com.sun.jna.NativeLong;
import com.sun.jna.Pointer;
import com.sun.jna.ptr.PointerByReference;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import lombok.Builder;
import lombok.ToString;
import org.apache.commons.lang3.time.StopWatch;

@Builder
@ToString
Expand All @@ -44,13 +51,14 @@ public final class RandomXWrapper {

final ArrayList<RandomXVM> vms = Lists.newArrayList();

boolean fastInit;
private boolean fastInit;

private Pointer memory;
private int keySize;

private int flagsValue;
private final ArrayList<RandomXFlag> flags;
private boolean isDebug;

/**
* Initialize randomX cache or dataset for a specific key
Expand Down Expand Up @@ -124,29 +132,54 @@ private void setDataset(byte[] key) {
* If fastInit enabled use all cores to create the dataset
* by equally distributing work between them
*/
ArrayList<Thread> threads = Lists.newArrayList();

int threadCount = Runtime.getRuntime().availableProcessors();
long perThread = RandomXJNA.INSTANCE.randomx_dataset_item_count().longValue() / threadCount;
long remainder = RandomXJNA.INSTANCE.randomx_dataset_item_count().longValue() % threadCount;

ExecutorService pool = Executors.newFixedThreadPool(threadCount);
ListeningExecutorService service = MoreExecutors.listeningDecorator(pool);
List<ListenableFuture<Long>> taskList = Lists.newArrayList();

long startItem = 0;
for (int i = 0; i < threadCount; ++i) {
long count = perThread + (i == threadCount - 1 ? remainder : 0);
long start = startItem;
Thread thread = new Thread(() -> RandomXJNA.INSTANCE.randomx_init_dataset(newDataset, cache, new NativeLong(start), new NativeLong(count)));
thread.start();
threads.add(thread);
ListenableFuture<Long> future = service.submit(() -> {
StopWatch watch = StopWatch.createStarted();
RandomXJNA.INSTANCE.randomx_init_dataset(newDataset, cache, new NativeLong(start), new NativeLong(count));
watch.stop();
return watch.getTime();
});
taskList.add(future);
startItem += count;
}



//wait for every thread to terminate execution (ie: dataset is initialised)
for(Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
ListenableFuture<List<Long>> listFuture = Futures.successfulAsList(taskList);
if(isDebug) {
Futures.addCallback(listFuture, new FutureCallback<>() {
@Override
public void onSuccess(List<Long> result) {
for (Long cost: result) {
System.out.println("cost:" + cost);
}
}

@Override
public void onFailure(Throwable t) {
System.err.println(t.getMessage());
}
}, pool);

}
try {
listFuture.get();
} catch (Exception e) {
e.printStackTrace();
} finally {
pool.shutdown();
}

} else {
Expand Down Expand Up @@ -185,7 +218,6 @@ public void changeKey(byte[] key) {
}
}
}

}

/**
Expand Down
3 changes: 2 additions & 1 deletion src/test/java/io/xdag/crypto/randomx/RandomXWrapperTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ public void testInit() {
byte[] key2Bytes = key2.getBytes(StandardCharsets.UTF_8);

RandomXWrapper randomXWrapper = RandomXWrapper.builder()
.flags(Lists.newArrayList(RandomXFlag.JIT))
.flags(Lists.newArrayList(RandomXFlag.JIT, RandomXFlag.FULL_MEM))
.fastInit(true)
.isDebug(true)
.build();
randomXWrapper.init(key1Bytes);
RandomXVM randomxVm = randomXWrapper.createVM();
Expand Down

0 comments on commit 5bd5e98

Please sign in to comment.