Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,27 @@ jobs:
- name: Test native + approx
run: cargo nextest run -p ndarray --features native,approx

hpc-stream-parallel:
# D-CSV-17 (sprint-13 W-I4): rayon par_* variants for hpc::stream.
# This job co-ships with the par_* implementation so the rayon feature
# gate is always exercised alongside the code it guards — prevents
# silent-dead-code drift (spec §0 + worker-template-v2 §5 CI ownership).
runs-on: ubuntu-latest
name: hpc-stream-parallel/rayon
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@1.95.0
- uses: Swatinem/rust-cache@v2
- uses: taiki-e/install-action@nextest
- name: cargo check (no rayon — scalar path unchanged)
run: cargo check -p ndarray
- name: cargo check --features rayon
run: cargo check -p ndarray --features rayon
- name: par_* stream tests (--features rayon, hpc::stream filter)
run: cargo nextest run -p ndarray --features rayon -E 'test(hpc::stream)'
- name: clippy --features rayon
run: cargo clippy -p ndarray --features rayon --lib -- -D warnings

blas-msrv:
runs-on: ubuntu-latest
name: blas-msrv
Expand Down Expand Up @@ -247,6 +268,7 @@ jobs:
- nostd
- tests
- native-backend
- hpc-stream-parallel
- miri
- cross_test
- cargo-careful
Expand Down
110 changes: 109 additions & 1 deletion src/hpc/stream/inference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
//! over the inference-mantissa lane of the EdgeColumn SoA. Used by the
//! integer-SIMD MUL evaluation hot path (D-CSV-8 sprint-12 SIMD vec).
//!
//! Pure iterator scaffold; `par_inference_stream` rayon variant is sprint-13+.
//! Pure iterator scaffold; `par_inference_stream` rayon variant wired in
//! sprint-13 (D-CSV-17) behind `#[cfg(feature = "rayon")]`.

// Local mirror of CausalEdge64 shape (bit-compatible with causal_edge::CausalEdge64).
// No cross-crate import: ndarray is the producer; causal-edge is the consumer.
Expand Down Expand Up @@ -113,6 +114,35 @@ impl<'a> ExactSizeIterator for InferenceStream<'a> {
}
}

// ─── Rayon-parallel variant (D-CSV-17, sprint-13) ─────────────────────────────

#[cfg(feature = "rayon")]
use rayon::prelude::*;

/// Rayon-parallel forward-iterator over `&[InferenceRow]`.
///
/// Mirrors `par_qualia_stream` semantics. Returns `impl IndexedParallelIterator`
/// so callers retain `enumerate()`, `zip()`, and order-preserving `collect()`.
///
/// For `InferenceRow` (8 B/row, `repr(C, align(8))`), 8 rows fill one 64-byte
/// cache line; callers folding into ordered structures should call
/// `.with_min_len(8)` to align chunks (OQ-CSV-8 fixed chunk for InferenceRow).
///
/// Particularly useful for the integer-SIMD MUL evaluation hot path (D-CSV-8):
/// folding the inference mantissa lane across millions of EdgeColumn rows
/// benefits from work-stealing on multi-core hosts.
///
/// # Determinism
///
/// See §6 of pr-sprint-13-rayon-streams.md. Pattern A (order-insensitive folds
/// like `.sum()`, `.count()`) and Pattern B (`collect()`) are safe.
/// Pattern C (non-commutative accumulators) requires per-callsite analysis.
#[cfg(feature = "rayon")]
#[inline]
pub fn par_inference_stream(rows: &[InferenceRow]) -> impl IndexedParallelIterator<Item = (usize, &InferenceRow)> {
rows.par_iter().enumerate()
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -217,3 +247,81 @@ mod tests {
assert_eq!(first.1 .0, 10);
}
}

// ─── Rayon par_* tests (D-CSV-17) ─────────────────────────────────────────────

#[cfg(all(test, feature = "rayon"))]
mod par_tests {
use super::{par_inference_stream, InferenceRow, InferenceStream};
use rayon::prelude::*;
use std::sync::atomic::{AtomicUsize, Ordering};

/// T-P-I-1: par_inference_stream yields all N items.
#[test]
fn test_par_inference_yields_all() {
let rows: Vec<InferenceRow> = (0u64..1024).map(InferenceRow).collect();
let count = par_inference_stream(&rows).count();
assert_eq!(count, 1024);
}

/// T-P-I-2: par_inference_stream on empty slice yields zero items.
#[test]
fn test_par_inference_empty() {
let rows: Vec<InferenceRow> = vec![];
let count = par_inference_stream(&rows).count();
assert_eq!(count, 0);
}

/// T-P-I-3: par_iter result equals serial iter result (as sorted sets).
#[test]
fn test_par_inference_matches_serial() {
let rows: Vec<InferenceRow> = (0u64..256).map(InferenceRow).collect();
let mut par: Vec<u64> = par_inference_stream(&rows)
.map(|(i, r)| (i as u64) ^ r.0)
.collect();
let mut ser: Vec<u64> = InferenceStream::new(&rows)
.map(|(i, r)| (i as u64) ^ r.0)
.collect();
par.sort_unstable();
ser.sort_unstable();
assert_eq!(par, ser);
}

/// T-P-I-4: filter on inference_mantissa() — sign-extension behaves
/// identically under parallel access.
///
/// 256 rows with mantissa varying 0..16 cyclically via bits 46-49:
/// - raw values 0..7 → mantissa 0..7 (non-negative): 128 rows
/// - raw values 8..15 → mantissa -8..-1 (negative): 128 rows
#[test]
fn test_par_inference_filter_mantissa() {
let rows: Vec<InferenceRow> = (0u64..256).map(|i| InferenceRow((i & 0xF) << 46)).collect();
let neg = par_inference_stream(&rows)
.filter(|(_, r)| r.inference_mantissa() < 0)
.count();
assert_eq!(neg, 128);
}

/// T-P-I-5: with_min_len(8) knob compiles and yields all items.
/// 8 rows × 8 B = 64 B = one cache line (OQ-CSV-8 fixed chunk for InferenceRow).
#[test]
fn test_par_inference_min_len() {
let rows: Vec<InferenceRow> = (0u64..1024).map(InferenceRow).collect();
let count = par_inference_stream(&rows).with_min_len(8).count();
assert_eq!(count, 1024);
}

/// T-P-I-6: thread-safety — InferenceRow is Send + Sync; verified by
/// mutating an AtomicUsize from the parallel for_each closure.
#[test]
fn test_par_inference_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<InferenceRow>();
let rows: Vec<InferenceRow> = (0u64..1024).map(InferenceRow).collect();
let counter = AtomicUsize::new(0);
par_inference_stream(&rows).for_each(|_| {
counter.fetch_add(1, Ordering::Relaxed);
});
assert_eq!(counter.load(Ordering::Relaxed), 1024);
}
}
12 changes: 9 additions & 3 deletions src/hpc/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
//! Per cognitive-substrate-convergence-v1.md §5 L-20.
//!
//! Sprint-12 scope (W-F4/5/6): `QualiaStream` + `InferenceStream` +
//! `SplatFieldStream` forward-iterator scaffolds. Sprint-13+:
//! `par_*` rayon variants once rayon is wired into the ndarray
//! feature gate.
//! `SplatFieldStream` forward-iterator scaffolds. Sprint-13 (D-CSV-17):
//! `par_*` rayon variants wired behind `#[cfg(feature = "rayon")]`.

pub mod inference;
pub mod qualia;
Expand All @@ -13,3 +12,10 @@ pub mod splat_field;
pub use inference::{InferenceRow, InferenceStream};
pub use qualia::{QualiaI4Row, QualiaStream};
pub use splat_field::{SplatField, SplatFieldStream};

#[cfg(feature = "rayon")]
pub use inference::par_inference_stream;
#[cfg(feature = "rayon")]
pub use qualia::par_qualia_stream;
#[cfg(feature = "rayon")]
pub use splat_field::par_splat_field_stream;
124 changes: 122 additions & 2 deletions src/hpc/stream/qualia.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
//! QualiaColumn SoA layout introduced by D-CSV-5b.
//!
//! Yields `(row_index, &QualiaI4Row)` tuples. Pure iterator scaffold; the
//! `par_qualia_stream` rayon-parallel variant is sprint-13+ once rayon is
//! wired into the ndarray feature gate.
//! `par_qualia_stream` rayon-parallel variant is wired in sprint-13 (D-CSV-17)
//! behind `#[cfg(feature = "rayon")]`.

// NOTE: do NOT import lance-graph-contract here (would create circular dep
// since contract is *consumer* of ndarray). Define a minimal local mirror
Expand Down Expand Up @@ -108,6 +108,50 @@ impl<'a> ExactSizeIterator for QualiaStream<'a> {
}
}

// ─── Rayon-parallel variant (D-CSV-17, sprint-13) ─────────────────────────────

#[cfg(feature = "rayon")]
use rayon::prelude::*;

/// Rayon-parallel forward-iterator over a borrowed `&[QualiaI4Row]` slice.
///
/// Yields `(row_index, &QualiaI4Row)` tuples. Unlike the scalar
/// `QualiaStream`, iteration order is **not** guaranteed to be ascending
/// by index; rayon's work-stealing scheduler may process chunks
/// out-of-order. See §6 of pr-sprint-13-rayon-streams.md for the
/// determinism contract callers must respect.
///
/// Returns `impl IndexedParallelIterator` (not bare `ParallelIterator`) so
/// callers can use `enumerate()`, `zip()`, and `collect()` with guaranteed
/// index-order preservation (rayon's `IndexedParallelIterator::collect` is
/// contract-guaranteed to preserve original order).
///
/// # Chunk-size note
///
/// `QualiaI4Row` is 8 bytes (`repr(C, align(8))`), so 8 rows fit one 64-byte
/// cache line. For folds into ordered structures, call `.with_min_len(8)` to
/// align chunks to cache-line boundaries (see OQ-CSV-8).
///
/// # Example
///
/// ```
/// # #[cfg(feature = "rayon")] {
/// use ndarray::hpc::stream::qualia::{QualiaI4Row, par_qualia_stream};
/// use rayon::prelude::*;
///
/// let rows: Vec<QualiaI4Row> = (0u64..1024).map(QualiaI4Row).collect();
/// let total_nonzero: usize = par_qualia_stream(&rows)
/// .filter(|(_, r)| r.0 != 0)
/// .count();
/// assert_eq!(total_nonzero, 1023); // QualiaI4Row(0) is the lone zero
/// # }
/// ```
#[cfg(feature = "rayon")]
#[inline]
pub fn par_qualia_stream(rows: &[QualiaI4Row]) -> impl IndexedParallelIterator<Item = (usize, &QualiaI4Row)> {
rows.par_iter().enumerate()
}

// ─── Tests ────────────────────────────────────────────────────────────────────

#[cfg(test)]
Expand Down Expand Up @@ -199,3 +243,79 @@ mod tests {
assert_eq!(ExactSizeIterator::len(&stream), 0);
}
}

// ─── Rayon par_* tests (D-CSV-17) ─────────────────────────────────────────────

#[cfg(all(test, feature = "rayon"))]
mod par_tests {
use super::{par_qualia_stream, QualiaI4Row, QualiaStream};
use rayon::prelude::*;
use std::sync::atomic::{AtomicUsize, Ordering};

/// T-P-Q-1: par_qualia_stream yields all N items.
#[test]
fn test_par_qualia_yields_all() {
let rows: Vec<QualiaI4Row> = (0u64..1024).map(QualiaI4Row).collect();
let count = par_qualia_stream(&rows).count();
assert_eq!(count, 1024);
}

/// T-P-Q-2: par_qualia_stream on empty slice yields zero items.
#[test]
fn test_par_qualia_empty() {
let rows: Vec<QualiaI4Row> = vec![];
let count = par_qualia_stream(&rows).count();
assert_eq!(count, 0);
}

/// T-P-Q-3: par_iter result equals serial iter result (as sorted sets).
/// Both iterators produce the same (index XOR value) pairs; sorting
/// makes the comparison order-independent.
#[test]
fn test_par_qualia_matches_serial() {
let rows: Vec<QualiaI4Row> = (0u64..256).map(QualiaI4Row).collect();
let mut par: Vec<u64> = par_qualia_stream(&rows)
.map(|(i, r)| (i as u64) ^ r.0)
.collect();
let mut ser: Vec<u64> = QualiaStream::new(&rows)
.map(|(i, r)| (i as u64) ^ r.0)
.collect();
par.sort_unstable();
ser.sort_unstable();
assert_eq!(par, ser);
}

/// T-P-Q-4: par_iter with filter is correct (count of even-valued rows).
#[test]
fn test_par_qualia_with_filter() {
let rows: Vec<QualiaI4Row> = (0u64..512).map(QualiaI4Row).collect();
let count_even = par_qualia_stream(&rows)
.filter(|(_, r)| r.0 % 2 == 0)
.count();
// Values 0, 2, 4, ..., 510 → 256 even values.
assert_eq!(count_even, 256);
}

/// T-P-Q-5: with_min_len(8) knob compiles and yields all items.
/// 8 rows × 8 B = 64 B = one cache line (OQ-CSV-8 fixed chunk for QualiaI4).
#[test]
fn test_par_qualia_min_len() {
let rows: Vec<QualiaI4Row> = (0u64..1024).map(QualiaI4Row).collect();
let count = par_qualia_stream(&rows).with_min_len(8).count();
assert_eq!(count, 1024);
}

/// T-P-Q-6: thread-safety — QualiaI4Row is Send + Sync; verified by
/// mutating an AtomicUsize from the parallel for_each closure.
#[test]
fn test_par_qualia_send_sync() {
fn assert_send_sync<T: Send + Sync>() {}
assert_send_sync::<QualiaI4Row>();
let rows: Vec<QualiaI4Row> = (0u64..1024).map(QualiaI4Row).collect();
let counter = AtomicUsize::new(0);
par_qualia_stream(&rows).for_each(|_| {
counter.fetch_add(1, Ordering::Relaxed);
});
assert_eq!(counter.load(Ordering::Relaxed), 1024);
}
}
Loading
Loading