This repository has been archived by the owner on Jun 18, 2021. It is now read-only.

Merge #214
214: Rewrite GpuFuture to avoid blocking and to use less space r=kvark a=lachlansneff

Since `GpuFuture` no longer blocks waiting for the mapping to resolve, we need to poll the device for it to actually work. ~~I haven't added that to the `hello-compute` example, so it doesn't work anymore.~~
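A minimal sketch of the new calling pattern, pieced together from the updated examples in this diff; `read_buffer` is a hypothetical helper and assumes a device and an already-filled, mappable buffer:

```rust
// Sketch only: mirrors the wgpu 0.5-era API touched by this commit.
async fn read_buffer(
    device: &wgpu::Device,
    buffer: &wgpu::Buffer,
    size: wgpu::BufferAddress,
) -> Option<Vec<u8>> {
    // Kick off the mapping; note that we don't `.await` the future yet.
    let buffer_future = buffer.map_read(0, size);

    // The future only resolves once the device is polled. In a real
    // application `device.poll(...)` would run in an event loop or on
    // another thread.
    device.poll(wgpu::Maintain::Wait);

    match buffer_future.await {
        Ok(mapping) => Some(mapping.as_slice().to_vec()),
        Err(_) => None,
    }
}
```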

Co-authored-by: Lachlan Sneff <lachlan.sneff@gmail.com>
bors[bot] and lachlansneff committed Mar 27, 2020
2 parents 5f24202 + ea6bcd0 commit 6fd4bf2
Showing 5 changed files with 138 additions and 87 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
@@ -45,6 +45,7 @@ rev = "08e8d406c175579da5ef18c1abf4d6c00e2a9726"
arrayvec = "0.5"
smallvec = "1"
raw-window-handle = "0.3"
parking_lot = "0.10"

[dev-dependencies]
cgmath = "0.17"
17 changes: 11 additions & 6 deletions examples/capture/main.rs
@@ -5,8 +5,6 @@ use std::fs::File;
use std::mem::size_of;

async fn run() {
env_logger::init();

let adapter = wgpu::Adapter::request(
&wgpu::RequestAdapterOptions {
power_preference: wgpu::PowerPreference::Default,
@@ -87,8 +85,16 @@ async fn run() {

queue.submit(&[command_buffer]);

// Note that we're not calling `.await` here.
let buffer_future = output_buffer.map_read(0, (size * size) as u64 * size_of::<u32>() as u64);

// Poll the device in a blocking manner so that our future resolves.
// In an actual application, `device.poll(...)` should
// be called in an event loop or on another thread.
device.poll(wgpu::Maintain::Wait);

// Write the buffer as a PNG
if let Ok(mapping) = output_buffer.map_read(0u64, (size * size) as u64 * size_of::<u32>() as u64).await {
if let Ok(mapping) = buffer_future.await {
let mut png_encoder = png::Encoder::new(File::create("red.png").unwrap(), size, size);
png_encoder.set_depth(png::BitDepth::Eight);
png_encoder.set_color(png::ColorType::RGBA);
@@ -98,11 +104,10 @@
.write_image_data(mapping.as_slice())
.unwrap();
}

// The device will be polled when it is dropped but we can also poll it explicitly
device.poll(true);
}

fn main() {
env_logger::init();

futures::executor::block_on(run());
}
13 changes: 12 additions & 1 deletion examples/hello-compute/main.rs
@@ -13,6 +13,7 @@ async fn run() {
.collect()
};

// To see the output, run `RUST_LOG=info cargo run --example hello-compute`.
log::info!("Times: {:?}", execute_gpu(numbers).await);
}

@@ -98,7 +99,16 @@ async fn execute_gpu(numbers: Vec<u32>) -> Vec<u32> {
encoder.copy_buffer_to_buffer(&storage_buffer, 0, &staging_buffer, 0, size);

queue.submit(&[encoder.finish()]);
if let Ok(mapping) = staging_buffer.map_read(0u64, size).await {

// Note that we're not calling `.await` here.
let buffer_future = staging_buffer.map_read(0, size);

// Poll the device in a blocking manner so that our future resolves.
// In an actual application, `device.poll(...)` should
// be called in an event loop or on another thread.
device.poll(wgpu::Maintain::Wait);

if let Ok(mapping) = buffer_future.await {
mapping
.as_slice()
.chunks_exact(4)
@@ -111,6 +121,7 @@

fn main() {
env_logger::init();

futures::executor::block_on(run());
}

94 changes: 58 additions & 36 deletions src/backend/native_gpu_future.rs
@@ -1,72 +1,94 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::task::{Context, Poll, Waker};
use parking_lot::Mutex;
use crate::BufferAddress;

struct GpuFutureInner<T> {
id: wgc::id::DeviceId,
result: Option<T>,
waker: Option<Waker>,
enum WakerOrResult<T> {
Waker(Waker),
Result(T),
}

/// A Future that can poll the wgpu::Device
pub struct GpuFuture<T> {
inner: Arc<Mutex<GpuFutureInner<T>>>,
data: Arc<Data<T>>,
}

pub enum OpaqueData {}

struct Data<T> {
buffer_id: wgc::id::BufferId,
size: BufferAddress,
waker_or_result: Mutex<Option<WakerOrResult<T>>>,
}

/// A completion handle to set the result on a GpuFuture
pub struct GpuFutureCompletion<T> {
inner: Arc<Mutex<GpuFutureInner<T>>>,
data: Arc<Data<T>>,
}

impl<T> Future for GpuFuture<T>
{
type Output = T;

fn poll(self: Pin<&mut Self>, context: &mut Context) -> Poll<Self::Output> {
// grab a clone of the Arc
let arc = Arc::clone(&self.get_mut().inner);
let mut waker_or_result = self.into_ref().get_ref().data.waker_or_result.lock();

// grab the device id and set the waker, but release the lock, so that the native callback can write to it
let device_id = {
let mut inner = arc.lock().unwrap();
inner.waker.replace(context.waker().clone());
inner.id
};

// polling the device should trigger the callback
wgn::wgpu_device_poll(device_id, true);

// now take the lock again, and check whether the future is complete
let mut inner = arc.lock().unwrap();
match inner.result.take() {
Some(value) => Poll::Ready(value),
_ => Poll::Pending,
match waker_or_result.take() {
Some(WakerOrResult::Result(res)) => Poll::Ready(res),
_ => {
*waker_or_result = Some(WakerOrResult::Waker(context.waker().clone()));
Poll::Pending
}
}
}
}

impl<T> GpuFutureCompletion<T> {
pub fn complete(self, value: T) {
let mut inner = self.inner.lock().unwrap();
inner.result.replace(value);
if let Some(waker) = &inner.waker {
waker.wake_by_ref();
let mut waker_or_result = self.data.waker_or_result.lock();

match waker_or_result.replace(WakerOrResult::Result(value)) {
Some(WakerOrResult::Waker(waker)) => waker.wake(),
None => {}
Some(WakerOrResult::Result(_)) => {
// Drop before panicking. Not sure if this is necessary, but it makes me feel better.
drop(waker_or_result);
unreachable!()
},
};
}

pub(crate) fn to_raw(self) -> *mut OpaqueData {
Arc::into_raw(self.data) as _
}

pub(crate) unsafe fn from_raw(this: *mut OpaqueData) -> Self {
Self {
data: Arc::from_raw(this as _)
}
}

pub(crate) fn get_buffer_info(&self) -> (wgc::id::BufferId, BufferAddress) {
(self.data.buffer_id, self.data.size)
}
}

pub(crate) fn new_gpu_future<T>(id: wgc::id::DeviceId) -> (GpuFuture<T>, GpuFutureCompletion<T>) {
let inner = Arc::new(Mutex::new(GpuFutureInner {
id,
result: None,
waker: None,
}));
pub(crate) fn new_gpu_future<T>(
buffer_id: wgc::id::BufferId,
size: BufferAddress,
) -> (GpuFuture<T>, GpuFutureCompletion<T>) {
let data = Arc::new(Data {
buffer_id,
size,
waker_or_result: Mutex::new(None),
});

(
GpuFuture {
inner: inner.clone(),
data: Arc::clone(&data),
},
GpuFutureCompletion { inner },
GpuFutureCompletion { data },
)
}
100 changes: 56 additions & 44 deletions src/lib.rs
@@ -12,6 +12,7 @@ use smallvec::SmallVec;
use std::{
ffi::CString,
ops::Range,
future::Future,
ptr,
slice,
thread,
@@ -55,6 +56,14 @@ pub struct Device {
temp: Temp,
}

/// This is passed to `Device::poll` to control whether
/// it should block or not.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Maintain {
Wait,
Poll,
}

/// A handle to a GPU-accessible buffer.
#[derive(Debug, PartialEq)]
pub struct Buffer {
@@ -536,8 +545,11 @@ impl Adapter {

impl Device {
/// Check for resource cleanups and mapping callbacks.
pub fn poll(&self, force_wait: bool) {
wgn::wgpu_device_poll(self.id, force_wait);
pub fn poll(&self, maintain: Maintain) {
wgn::wgpu_device_poll(self.id, match maintain {
Maintain::Poll => false,
Maintain::Wait => true,
});
}

/// Creates a shader module from SPIR-V source code.
@@ -889,99 +901,99 @@ impl Drop for BufferWriteMapping {
}
}

struct BufferMapReadFutureUserData
{
size: BufferAddress,
completion: native_gpu_future::GpuFutureCompletion<Result<BufferReadMapping, BufferAsyncErr>>,
buffer_id: wgc::id::BufferId,
}

struct BufferMapWriteFutureUserData
{
size: BufferAddress,
completion: native_gpu_future::GpuFutureCompletion<Result<BufferWriteMapping, BufferAsyncErr>>,
buffer_id: wgc::id::BufferId,
}

impl Buffer {
/// Map the buffer for reading. The result is returned in a future.
pub async fn map_read(&self, start: BufferAddress, size: BufferAddress) -> Result<BufferReadMapping, BufferAsyncErr>
///
/// For the future to complete, `device.poll(...)` must be called elsewhere in the runtime, possibly integrated
/// into an event loop, run on a separate thread, or continually polled in the same task runtime that this
/// future will be run on.
///
/// It's expected that wgpu will eventually supply its own event loop infrastructure that will be easy to integrate
/// into other event loops, like winit's.
pub fn map_read(&self, start: BufferAddress, size: BufferAddress) -> impl Future<Output = Result<BufferReadMapping, BufferAsyncErr>>
{
let (future, completion) = native_gpu_future::new_gpu_future(self.device_id);
let (future, completion) = native_gpu_future::new_gpu_future(
self.id,
size,
);

extern "C" fn buffer_map_read_future_wrapper(
status: wgc::resource::BufferMapAsyncStatus,
data: *const u8,
user_data: *mut u8,
)
{
let user_data =
unsafe { Box::from_raw(user_data as *mut BufferMapReadFutureUserData) };
let completion = unsafe {
native_gpu_future::GpuFutureCompletion::from_raw(user_data as _)
};
let (buffer_id, size) = completion.get_buffer_info();

if let wgc::resource::BufferMapAsyncStatus::Success = status {
user_data.completion.complete(Ok(BufferReadMapping {
completion.complete(Ok(BufferReadMapping {
data,
size: user_data.size as usize,
buffer_id: user_data.buffer_id,
size: size as usize,
buffer_id,
}));
} else {
user_data.completion.complete(Err(BufferAsyncErr));
completion.complete(Err(BufferAsyncErr));
}
}

let user_data = Box::new(BufferMapReadFutureUserData {
size,
completion,
buffer_id: self.id,
});
wgn::wgpu_buffer_map_read_async(
self.id,
start,
size,
buffer_map_read_future_wrapper,
Box::into_raw(user_data) as *mut u8,
completion.to_raw() as _,
);

future.await
future
}

/// Map the buffer for writing. The result is returned in a future.
pub async fn map_write(&self, start: BufferAddress, size: BufferAddress) -> Result<BufferWriteMapping, BufferAsyncErr>
///
/// See the documentation of [map_read](#method.map_read) for more information about
/// how to run this future.
pub fn map_write(&self, start: BufferAddress, size: BufferAddress) -> impl Future<Output = Result<BufferWriteMapping, BufferAsyncErr>>
{
let (future, completion) = native_gpu_future::new_gpu_future(self.device_id);
let (future, completion) = native_gpu_future::new_gpu_future(
self.id,
size,
);

extern "C" fn buffer_map_write_future_wrapper(
status: wgc::resource::BufferMapAsyncStatus,
data: *mut u8,
user_data: *mut u8,
)
{
let user_data =
unsafe { Box::from_raw(user_data as *mut BufferMapWriteFutureUserData) };
let completion = unsafe {
native_gpu_future::GpuFutureCompletion::from_raw(user_data as _)
};
let (buffer_id, size) = completion.get_buffer_info();

if let wgc::resource::BufferMapAsyncStatus::Success = status {
user_data.completion.complete(Ok(BufferWriteMapping {
completion.complete(Ok(BufferWriteMapping {
data,
size: user_data.size as usize,
buffer_id: user_data.buffer_id,
size: size as usize,
buffer_id,
}));
} else {
user_data.completion.complete(Err(BufferAsyncErr));
completion.complete(Err(BufferAsyncErr));
}
}

let user_data = Box::new(BufferMapWriteFutureUserData {
size,
completion,
buffer_id: self.id,
});
wgn::wgpu_buffer_map_write_async(
self.id,
start,
size,
buffer_map_write_future_wrapper,
Box::into_raw(user_data) as *mut u8,
completion.to_raw() as _,
);

future.await
future
}

/// Flushes any pending write operations and unmaps the buffer from host memory.
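For callers, the `Device::poll` change is a mechanical migration; a before/after sketch (the updated `capture` example above follows the same pattern):

```rust
// Before this commit: a bare bool chose between blocking and non-blocking maintenance.
device.poll(true);

// After this commit: the intent is named explicitly.
device.poll(wgpu::Maintain::Wait); // block so pending mapping callbacks can fire
device.poll(wgpu::Maintain::Poll); // check once and return immediately
```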
