Skip to content

Commit

Permalink
Adding CudaStream & LaunchAsync::par_launch_async (#82)
Browse files Browse the repository at this point in the history
* #1 Adding par_launch_async

* Changing to using CudaStream

* Adding additional details to docstring

* Adding unit tests

* Adding wait_event on new stream creation
  • Loading branch information
coreylowman committed Feb 27, 2023
1 parent 434cedd commit 69deb5d
Show file tree
Hide file tree
Showing 2 changed files with 205 additions and 11 deletions.
15 changes: 14 additions & 1 deletion src/cublas/safe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#![allow(clippy::too_many_arguments)]

use super::{result, result::CublasError, sys};
use crate::driver::{CudaDevice, DevicePtr, DevicePtrMut};
use crate::driver::{CudaDevice, CudaStream, DevicePtr, DevicePtrMut};
use core::ffi::{c_int, c_longlong};
use std::sync::Arc;

Expand Down Expand Up @@ -31,6 +31,19 @@ impl CudaBlas {
unsafe { result::set_stream(handle, blas.device.stream as *mut _) }?;
Ok(blas)
}

/// Points this handle at the given stream, or back at the device's default work stream
/// when `opt_stream` is `None`.
///
/// # Safety
/// This is unsafe because, once work is being issued on more than one stream, you can
/// end up scheduling multiple concurrent kernels that all write to the same memory
/// address.
pub unsafe fn set_stream(&self, opt_stream: Option<&CudaStream>) -> Result<(), CublasError> {
    // No explicit stream means "use the device's default work stream".
    let target = opt_stream.map_or(self.device.stream, |s| s.stream);
    result::set_stream(self.handle, target as *mut _)
}
}

impl Drop for CudaBlas {
Expand Down
201 changes: 191 additions & 10 deletions src/driver/safe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ impl CudaDevice {

/// Synchronously copies device memory into host memory
///
/// Use [`sync_copy_into_vec`] if you need [`Vec<T>`] and can't provide
/// Use [`CudaDevice::sync_copy_into_vec`] if you need [`Vec<T>`] and can't provide
/// a correctly sized slice.
///
/// # Panics
Expand All @@ -670,7 +670,7 @@ impl CudaDevice {
}

/// Synchronously copies device memory into host memory.
/// Unlike [`sync_copy_from`] this returns a [`Vec<T>`].
/// Unlike [`CudaDevice::sync_copy_from`] this returns a [`Vec<T>`].
///
/// # Safety
/// 1. Since this function doesn't own `dst` (after returning) it is executed synchronously.
Expand Down Expand Up @@ -763,6 +763,77 @@ impl CudaDevice {
}
}

/// A wrapper around [sys::CUstream] that ensures the device's default work stream is
/// synchronized with the completion of this stream's work.
///
/// Create with [CudaDevice::auto_joining_stream].
///
/// The synchronization happens in **code order**. E.g.
/// ```ignore
/// let stream = dev.auto_joining_stream()?; // 0
/// dev.launch_async(...)?; // 1
/// dev.par_launch_async(&stream, ...)?; // 2
/// dev.launch_async(...)?; // 3
/// drop(stream); // 4
/// dev.launch_async(...) // 5
/// ```
///
/// - 0 will place a streamWaitEvent(default work stream) on the new stream
/// - 1 will launch on the default work stream
/// - 2 will launch concurrently to 1 on `&stream`,
/// - 3 will launch after 1 on the default work stream, but potentially concurrently to 2.
/// - 4 will place a streamWaitEvent(`&stream`) on default work stream
/// - 5 will happen on the default stream **after the default stream waits for 2**
#[derive(Debug)]
pub struct CudaStream {
/// The raw stream handle that work is launched on.
pub stream: sys::CUstream,
/// The device this stream was created from. Its event and default work stream are used
/// when this stream is dropped.
device: Arc<CudaDevice>,
}

impl CudaDevice {
    /// Allocates a new stream that can execute kernels concurrently to the default stream.
    ///
    /// This stream synchronizes in the following way:
    /// 1. On creation it adds a wait for any existing work on the default work stream to complete
    /// 2. On drop it adds a wait for any existing work on Self to complete *to the default stream*.
    ///
    /// # Errors
    /// Returns the [DriverError] produced by creating the stream, recording the device's
    /// event, or enqueueing the wait on the new stream. If either of the latter two fails,
    /// the freshly created stream is destroyed (best effort) before the error is returned,
    /// so the raw handle is not leaked.
    pub fn auto_joining_stream(self: &Arc<Self>) -> Result<CudaStream, DriverError> {
        let stream = result::stream::create(result::stream::StreamKind::NonBlocking)?;

        // Make the new stream wait for everything already queued on the default work stream.
        let wait_for_default = || -> Result<(), DriverError> {
            unsafe {
                result::event::record(self.event, self.stream)?;
                result::stream::wait_event(
                    stream,
                    self.event,
                    sys::CUevent_wait_flags::CU_EVENT_WAIT_DEFAULT,
                )?;
            }
            Ok(())
        };

        if let Err(err) = wait_for_default() {
            // Nothing owns the raw handle yet, so it has to be released by hand here.
            // The destroy result is deliberately ignored: the original error is the one
            // worth reporting to the caller.
            let _ = unsafe { result::stream::destroy(stream) };
            return Err(err);
        }

        Ok(CudaStream {
            stream,
            device: Arc::clone(self),
        })
    }

    /// Forces [CudaStream] to drop, causing the default work stream to block on `stream`'s
    /// completion.
    ///
    /// The wait is enqueued by [CudaStream]'s `Drop` implementation, which uses the device
    /// stored inside `stream` and panics if any of its driver calls fail.
    pub fn join_async(self: &Arc<Self>, stream: CudaStream) -> Result<(), DriverError> {
        // Dropping is the whole point of this method; doing it explicitly documents the
        // intent and avoids an unused-variable lint suppression.
        drop(stream);
        Ok(())
    }
}

impl Drop for CudaStream {
    /// Joins this stream back into the owning device's default work stream and then
    /// destroys the stream.
    ///
    /// # Panics
    /// Panics if recording the event, enqueueing the wait, or destroying the stream fails.
    fn drop(&mut self) {
        let event = self.device.event;
        let default_stream = self.device.stream;
        let wait_flags = sys::CUevent_wait_flags::CU_EVENT_WAIT_DEFAULT;

        // Record the device's event on this stream ...
        unsafe { result::event::record(event, self.stream) }.unwrap();
        // ... and have the default work stream wait on that event.
        unsafe { result::stream::wait_event(default_stream, event, wait_flags) }.unwrap();
        unsafe { result::stream::destroy(self.stream) }.unwrap();
    }
}

/// Wrapper around [sys::CUmodule] that also contains
/// the loaded [CudaFunction] associated with this module.
///
Expand Down Expand Up @@ -801,6 +872,41 @@ pub struct CudaFunction {
unsafe impl Send for CudaFunction {}
unsafe impl Sync for CudaFunction {}

impl CudaFunction {
#[inline]
unsafe fn launch_async_impl(
self,
cfg: LaunchConfig,
params: &mut [*mut std::ffi::c_void],
) -> Result<(), DriverError> {
result::launch_kernel(
self.cu_function,
cfg.grid_dim,
cfg.block_dim,
cfg.shared_mem_bytes,
self.device.stream,
params,
)
}

#[inline]
unsafe fn par_launch_async_impl(
self,
stream: &CudaStream,
cfg: LaunchConfig,
params: &mut [*mut std::ffi::c_void],
) -> Result<(), DriverError> {
result::launch_kernel(
self.cu_function,
cfg.grid_dim,
cfg.block_dim,
cfg.shared_mem_bytes,
stream.stream,
params,
)
}
}

/// Configuration for [result::launch_kernel]
///
/// See [cuda docs](https://docs.nvidia.com/cuda/cuda-driver-api/group__CUDA__EXEC.html#group__CUDA__EXEC_1gb8f3dc3031b40da29d5f9a7139e52e15)
Expand Down Expand Up @@ -958,6 +1064,22 @@ pub unsafe trait LaunchAsync<Params> {
/// **If you launch a kernel or drop a value on a different stream
/// this may not hold**
unsafe fn launch_async(self, cfg: LaunchConfig, params: Params) -> Result<(), DriverError>;

/// Launches the function on `stream`, which runs concurrently to the device's default
/// work stream.
///
/// # Safety
/// This method is even more unsafe than [LaunchAsync::launch_async]: all the same rules
/// apply, except that now things are executing in parallel to each other.
///
/// That means that if any of the kernels modify the same memory location, you'll get race
/// conditions or potentially undefined behavior.
unsafe fn par_launch_async(
self,
stream: &CudaStream,
cfg: LaunchConfig,
params: Params,
) -> Result<(), DriverError>;
}

macro_rules! impl_launch {
Expand All @@ -969,14 +1091,17 @@ unsafe impl<$($Vars: AsKernelParam),*> LaunchAsync<($($Vars, )*)> for CudaFuncti
args: ($($Vars, )*)
) -> Result<(), DriverError> {
let params = &mut [$(args.$Idx.as_kernel_param(), )*];
result::launch_kernel(
self.cu_function,
cfg.grid_dim,
cfg.block_dim,
cfg.shared_mem_bytes,
self.device.stream,
params,
)
self.launch_async_impl(cfg, params)
}

unsafe fn par_launch_async(
self,
stream: &CudaStream,
cfg: LaunchConfig,
args: ($($Vars, )*)
) -> Result<(), DriverError> {
let params = &mut [$(args.$Idx.as_kernel_param(), )*];
self.par_launch_async_impl(stream, cfg, params)
}
}
};
Expand Down Expand Up @@ -1284,6 +1409,8 @@ impl_tuples!(A, B, C, D, E, F, G, H, I, J, K, L);

#[cfg(test)]
mod tests {
use std::time::Instant;

use crate::nvrtc::compile_ptx_with_opts;

use super::*;
Expand Down Expand Up @@ -1689,4 +1816,58 @@ extern \"C\" __global__ void halfs(__half h) {
.unwrap();
dev.synchronize().unwrap();
}

// CUDA source for a deliberately long-running kernel used by the stream tests.
// `slow_worker` accumulates `data[i % len]` over 1,000,000 iterations and writes the
// sum to `*out`.
const SLOW_KERNELS: &str = "
extern \"C\" __global__ void slow_worker(const float *data, const size_t len, float *out) {
float tmp = 0.0;
for(size_t i = 0; i < 1000000; i++) {
tmp += data[i % len];
}
*out = tmp;
}
";

/// Checks that launching on a separate stream overlaps with the default work stream.
///
/// Two `slow_worker` kernels are timed back to back on the default work stream, then
/// timed again with the second one launched on a stream from `auto_joining_stream`.
/// The serial run is expected to take about twice as long as the parallel run.
#[test]
fn test_par_launch() -> Result<(), DriverError> {
    let ptx = compile_ptx_with_opts(SLOW_KERNELS, Default::default()).unwrap();
    let dev = CudaDeviceBuilder::new(0)
        .with_ptx(ptx, "tests", &["slow_worker"])
        .build()
        .unwrap();
    let slice = dev.alloc_zeros_async::<f32>(1000)?;
    let mut a = dev.alloc_zeros_async::<f32>(1)?;
    let mut b = dev.alloc_zeros_async::<f32>(1)?;
    let cfg = LaunchConfig::for_num_elems(1);

    let start = Instant::now();
    {
        // launch two kernels on the default stream
        let f = dev.get_func("tests", "slow_worker").unwrap();
        unsafe { f.launch_async(cfg, (&slice, slice.len(), &mut a))? };
        let f = dev.get_func("tests", "slow_worker").unwrap();
        unsafe { f.launch_async(cfg, (&slice, slice.len(), &mut b))? };
        dev.synchronize()?;
    }
    let double_launch_s = start.elapsed().as_secs_f64();

    let start = Instant::now();
    {
        // create a new stream & launch them concurrently
        let stream = dev.auto_joining_stream()?;
        let f = dev.get_func("tests", "slow_worker").unwrap();
        unsafe { f.launch_async(cfg, (&slice, slice.len(), &mut a))? };
        let f = dev.get_func("tests", "slow_worker").unwrap();
        unsafe { f.par_launch_async(&stream, cfg, (&slice, slice.len(), &mut b))? };
        // Join the side stream into the default work stream *before* synchronizing, so
        // the measured time covers the kernel launched on `stream` regardless of whether
        // `synchronize` waits on the whole device or only on the default work stream.
        dev.join_async(stream)?;
        dev.synchronize()?;
    }
    let par_launch_s = start.elapsed().as_secs_f64();

    assert!(
        (double_launch_s - 2.0 * par_launch_s).abs() < 20.0 / 1000.0,
        "par={:?} dbl={:?}",
        par_launch_s,
        double_launch_s
    );
    Ok(())
}
}

0 comments on commit 69deb5d

Please sign in to comment.