# CPU Financial Analytics

In [1]:
# Universal environment detection and setup
import json
import os
import numpy as np
import unittest
import time
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

def detect_environment():
    """Detect if we're running in Colab, local Jupyter, or other environment."""
    try:
        import google.colab
        return 'colab'
    except ImportError:
        pass

    try:
        from IPython import get_ipython
        if get_ipython() is not None:
            if 'ipykernel' in str(get_ipython()):
                return 'jupyter'
        return 'local'
    except:
        return 'local'

CURRENT_ENV = detect_environment()
print(f"CPU NOTEBOOK WITH SEGMENTED REDUCE LOADED")
print(f"Environment detected: {CURRENT_ENV}")
print(f"Current directory: {os.getcwd()}")
print(f"Ready to implement and test SCAN + REDUCE operations!")

CPU NOTEBOOK WITH SEGMENTED REDUCE LOADED
Environment detected: jupyter
Current directory: /home/quydx/advancedhpc2025/project
Ready to implement and test SCAN + REDUCE operations!


## CPU Implementations - SCAN + REDUCE Primitives

In [None]:
class CPUFinancialPrimitives:
    """ CPU primitive implementations - SCAN + REDUCE."""

    @staticmethod
    def validate_input(data, expected_dtype=None, min_length=0):
        if not isinstance(data, np.ndarray):
            raise TypeError(f"Expected numpy array, got {type(data)}")
        if len(data) < min_length:
            raise ValueError(f"Array too short: {len(data)} < {min_length}")
        if expected_dtype and len(data) > 0 and data.dtype != expected_dtype:
            raise ValueError(f"Expected dtype {expected_dtype}, got {data.dtype}")

    # =============================================
    # EXCLUSIVE SCAN
    # =============================================
    @staticmethod
    def exclusive_scan(flags):
        """ Convert segment boundary flags to segment IDs."""
        CPUFinancialPrimitives.validate_input(flags, np.int32)
        if len(flags) == 0:
            return np.array([], dtype=np.int32)

        result = np.zeros_like(flags, dtype=np.int32)
        for i in range(len(flags)):
            if i == 0:
                result[i] = 0
            else:
                result[i] = result[i-1] + flags[i-1]
        return result
2
    # =============================================
    # SEGMENTED SCAN OPERATIONS
    # =============================================
    @staticmethod
    def segmented_scan_sum(values, seg_ids):
        """ Segmented scan sum - cumulative sum within segments."""
        CPUFinancialPrimitives.validate_input(values, np.float32)
        CPUFinancialPrimitives.validate_input(seg_ids, np.int32)

        if len(values) != len(seg_ids):
            raise ValueError(f"Length mismatch: values {len(values)} vs seg_ids {len(seg_ids)}")
        if len(values) == 0:
            return np.array([], dtype=np.float32)

        result = np.zeros_like(values, dtype=np.float32)
        current_sum = values[0]
        result[0] = current_sum
        current_seg = seg_ids[0]

        for i in range(1, len(values)):
            if seg_ids[i] != current_seg:
                current_sum = values[i]  # Reset for new segment
                current_seg = seg_ids[i]
            else:
                current_sum += values[i]  # Accumulate in same segment
            result[i] = current_sum
        return result

    @staticmethod
    def segmented_scan_max(values, seg_ids):
        """ Segmented scan max - cumulative max within segments."""
        CPUFinancialPrimitives.validate_input(values, np.float32)
        CPUFinancialPrimitives.validate_input(seg_ids, np.int32)

        if len(values) != len(seg_ids):
            raise ValueError(f"Length mismatch: values {len(values)} vs seg_ids {len(seg_ids)}")
        if len(values) == 0:
            return np.array([], dtype=np.float32)

        result = np.zeros_like(values, dtype=np.float32)
        current_max = values[0]
        result[0] = current_max
        current_seg = seg_ids[0]

        for i in range(1, len(values)):
            if seg_ids[i] != current_seg:
                current_max = values[i]  # Reset for new segment
                current_seg = seg_ids[i]
            else:
                current_max = max(current_max, values[i])
            result[i] = current_max
        return result

    # =============================================
    # SEGMENTED REDUCE OPERATIONS
    # =============================================
    @staticmethod
    def segmented_reduce_sum(values, seg_ids):
        """ Segmented reduce sum - final sum per segment.

        Returns one result per unique segment (not cumulative).

        Example:
            values = [5, 3, 2, 7, 1, 4, 6, 8]
            seg_ids = [0, 0, 0, 1, 1, 2, 2, 2]
            result = [10, 8, 18]  # [5+3+2, 7+1, 4+6+8]
        """
        CPUFinancialPrimitives.validate_input(values, np.float32)
        CPUFinancialPrimitives.validate_input(seg_ids, np.int32)

        if len(values) != len(seg_ids):
            raise ValueError(f"Length mismatch: values {len(values)} vs seg_ids {len(seg_ids)}")
        if len(values) == 0:
            return np.array([], dtype=np.float32)

        segment_sums = {}
        segment_order = []

        for i in range(len(values)):
            seg_id = seg_ids[i]
            if seg_id not in segment_sums:
                segment_order.append(seg_id)
                segment_sums[seg_id] = 0.0
            segment_sums[seg_id] += values[i]

        # Convert to array in order
        result = np.array([segment_sums[seg_id] for seg_id in segment_order], dtype=np.float32)
        return result

    @staticmethod
    def segmented_reduce_max(values, seg_ids):
        """ Segmented reduce max - maximum value per segment.

        Returns one result per unique segment.

        Example:
            values = [5, 3, 2, 7, 1, 4, 6, 8]
            seg_ids = [0, 0, 0, 1, 1, 2, 2, 2]
            result = [5, 7, 8]  # [max(5,3,2), max(7,1), max(4,6,8)]
        """
        CPUFinancialPrimitives.validate_input(values, np.float32)
        CPUFinancialPrimitives.validate_input(seg_ids, np.int32)

        if len(values) != len(seg_ids):
            raise ValueError(f"Length mismatch: values {len(values)} vs seg_ids {len(seg_ids)}")
        if len(values) == 0:
            return np.array([], dtype=np.float32)

        segment_max = {}
        segment_order = []

        for i in range(len(values)):
            seg_id = seg_ids[i]
            if seg_id not in segment_max:
                segment_order.append(seg_id)
                segment_max[seg_id] = values[i]
            else:
                segment_max[seg_id] = max(segment_max[seg_id], values[i])

        # Convert to array in order
        result = np.array([segment_max[seg_id] for seg_id in segment_order], dtype=np.float32)
        return result

    @staticmethod
    def segmented_reduce_min(values, seg_ids):
        """ Segmented reduce min - minimum value per segment.

        Returns one result per unique segment.

        Example:
            values = [5, 3, 2, 7, 1, 4, 6, 8]
            seg_ids = [0, 0, 0, 1, 1, 2, 2, 2]
            result = [2, 1, 4]  # [min(5,3,2), min(7,1), min(4,6,8)]
        """
        CPUFinancialPrimitives.validate_input(values, np.float32)
        CPUFinancialPrimitives.validate_input(seg_ids, np.int32)

        if len(values) != len(seg_ids):
            raise ValueError(f"Length mismatch: values {len(values)} vs seg_ids {len(seg_ids)}")
        if len(values) == 0:
            return np.array([], dtype=np.float32)

        segment_min = {}
        segment_order = []

        for i in range(len(values)):
            seg_id = seg_ids[i]
            if seg_id not in segment_min:
                segment_order.append(seg_id)
                segment_min[seg_id] = values[i]
            else:
                segment_min[seg_id] = min(segment_min[seg_id], values[i])

        # Convert to array in order
        result = np.array([segment_min[seg_id] for seg_id in segment_order], dtype=np.float32)
        return result

print("Functions available:")
print("   • SCAN: exclusive_scan, segmented_scan_sum, segmented_scan_max")
print("   • REDUCE: segmented_reduce_sum, segmented_reduce_max, segmented_reduce_min")

Functions available:
   • SCAN: exclusive_scan, segmented_scan_sum, segmented_scan_max
   • REDUCE: segmented_reduce_sum, segmented_reduce_max, segmented_reduce_min


## Financial Metrics (Using Primitives)

In [3]:
class CPUFinancialMetrics:
    """ CPU financial metric implementations using scan/reduce primitives."""

    @staticmethod
    def cumulative_returns(prices, seg_ids):
        """ Cumulative returns calculation using segmented_scan_sum."""
        if len(prices) <= 1:
            return np.zeros_like(prices, dtype=np.float32)

        CPUFinancialPrimitives.validate_input(prices, np.float32)
        CPUFinancialPrimitives.validate_input(seg_ids, np.int32)

        # Calculate daily returns
        returns = np.zeros_like(prices, dtype=np.float32)
        for i in range(1, len(prices)):
            if seg_ids[i] != seg_ids[i-1]:
                returns[i] = 0.0  # Reset at segment boundary
            elif prices[i-1] > 0:
                returns[i] = prices[i] / prices[i-1] - 1.0
            else:
                returns[i] = 0.0

        # Convert to log returns and apply segmented scan
        log_returns = np.log(1.0 + np.clip(returns, -0.999, 10.0))
        cum_log_returns = CPUFinancialPrimitives.segmented_scan_sum(log_returns, seg_ids)
        return np.exp(cum_log_returns) - 1.0

    @staticmethod
    def simple_moving_average(prices, seg_ids, window=5):
        """Simple moving average - OPTIMIZED version."""
        CPUFinancialPrimitives.validate_input(prices, np.float32)
        CPUFinancialPrimitives.validate_input(seg_ids, np.int32)

        if window <= 0:
            raise ValueError(f"Window must be positive, got {window}")
        if len(prices) == 0:
            return np.array([], dtype=np.float32)

        result = np.zeros_like(prices, dtype=np.float32)

        # Precompute segment boundaries - single pass O(n)
        segment_starts = np.zeros(len(prices), dtype=np.int32)
        current_seg = seg_ids[0]
        current_start = 0

        for i in range(len(prices)):
            if seg_ids[i] != current_seg:
                current_seg = seg_ids[i]
                current_start = i
            segment_starts[i] = current_start

        # Calculate moving averages - O(n×w) where w=window
        for i in range(len(prices)):
            seg_start = segment_starts[i]
            window_start = max(seg_start, i - window + 1)

            # Calculate sum of valid (non-zero) prices in window
            count = 0
            total = 0.0
            for j in range(window_start, i + 1):
                if prices[j] > 0:
                    total += prices[j]
                    count += 1

            if count > 0:
                result[i] = total / count
            elif prices[i] > 0:
                result[i] = prices[i]
            else:
                result[i] = 0.0

        return result


    @staticmethod
    def rolling_std(prices, seg_ids, window=20):
        """Rolling standard deviation - OPTIMIZED version."""
        CPUFinancialPrimitives.validate_input(prices, np.float32)
        CPUFinancialPrimitives.validate_input(seg_ids, np.int32)

        if window <= 1:
            raise ValueError(f"Window must be > 1 for std calculation, got {window}")
        if len(prices) == 0:
            return np.array([], dtype=np.float32)

        result = np.zeros_like(prices, dtype=np.float32)

        # Precompute segment boundaries - single pass O(n)
        segment_starts = np.zeros(len(prices), dtype=np.int32)
        current_seg = seg_ids[0]
        current_start = 0

        for i in range(len(prices)):
            if seg_ids[i] != current_seg:
                current_seg = seg_ids[i]
                current_start = i
            segment_starts[i] = current_start

        # Calculate rolling std - O(n×w)
        for i in range(len(prices)):
            seg_start = segment_starts[i]
            window_start = max(seg_start, i - window + 1)

            # Collect valid prices in window
            valid_prices = []
            for j in range(window_start, i + 1):
                if prices[j] > 0:
                    valid_prices.append(prices[j])

            if len(valid_prices) > 1:
                # Calculate std using two-pass algorithm for numerical stability
                mean = sum(valid_prices) / len(valid_prices)
                variance = sum((x - mean) ** 2 for x in valid_prices) / (len(valid_prices) - 1)
                result[i] = np.sqrt(variance)
            else:
                result[i] = 0.0

        return result


    @staticmethod
    def max_drawdown(prices, seg_ids):
        """ Maximum drawdown using segmented_scan_max."""
        running_max = CPUFinancialPrimitives.segmented_scan_max(prices, seg_ids)
        return prices - running_max

    @staticmethod
    def portfolio_value(holdings, prices):
        """ Portfolio value calculation."""
        if holdings.shape != prices.shape:
            raise ValueError(f"Shape mismatch: holdings {holdings.shape} vs prices {prices.shape}")
        return np.sum(holdings * prices, axis=0).astype(np.float32)

    @staticmethod
    def high_water_mark(portfolio_values, seg_ids):
        """ High-water mark using segmented_scan_max."""
        return CPUFinancialPrimitives.segmented_scan_max(portfolio_values, seg_ids)

## Comprehensive Unit Tests - SCAN + REDUCE

In [4]:
class TestCPUPrimitivesComplete(unittest.TestCase):
    """Comprehensive tests for ALL CPU primitives - SCAN + REDUCE."""

    def setUp(self):
        self.test_flags = np.array([0, 0, 1, 0, 1, 0, 0, 1], dtype=np.int32)  # Corrected: flags at END
        self.test_values = np.array([5.0, 3.0, 2.0, 7.0, 1.0, 4.0, 6.0, 8.0], dtype=np.float32)
        self.expected_seg_ids = np.array([0, 0, 0, 1, 1, 2, 2, 2], dtype=np.int32)
        self.tolerance = 1e-6

    # ===== SCAN TESTS =====
    def test_exclusive_scan_correctness(self):
        """ Test exclusive scan produces correct segment IDs."""
        result = CPUFinancialPrimitives.exclusive_scan(self.test_flags)
        np.testing.assert_array_equal(result, self.expected_seg_ids)

    def test_segmented_scan_sum_correctness(self):
        """ Test segmented scan sum produces correct cumulative results."""
        expected = np.array([5.0, 8.0, 10.0, 7.0, 8.0, 4.0, 10.0, 18.0], dtype=np.float32)
        result = CPUFinancialPrimitives.segmented_scan_sum(self.test_values, self.expected_seg_ids)
        np.testing.assert_allclose(result, expected, rtol=self.tolerance)

    def test_segmented_scan_max_correctness(self):
        """ Test segmented scan max produces correct cumulative max."""
        expected = np.array([5.0, 5.0, 5.0, 7.0, 7.0, 4.0, 6.0, 8.0], dtype=np.float32)
        result = CPUFinancialPrimitives.segmented_scan_max(self.test_values, self.expected_seg_ids)
        np.testing.assert_allclose(result, expected, rtol=self.tolerance)

    # ===== REDUCE TESTS =====
    def test_segmented_reduce_sum_correctness(self):
        """ Test segmented reduce sum produces correct final sums."""
        # Expected: [5+3+2, 7+1, 4+6+8] = [10, 8, 18]
        expected = np.array([10.0, 8.0, 18.0], dtype=np.float32)
        result = CPUFinancialPrimitives.segmented_reduce_sum(self.test_values, self.expected_seg_ids)

        self.assertEqual(len(result), 3, "Should have 3 segments")
        np.testing.assert_allclose(result, expected, rtol=self.tolerance,
            err_msg="Reduce sum should compute final sum per segment")

    def test_segmented_reduce_max_correctness(self):
        """ Test segmented reduce max produces correct max per segment."""
        # Expected: [max(5,3,2), max(7,1), max(4,6,8)] = [5, 7, 8]
        expected = np.array([5.0, 7.0, 8.0], dtype=np.float32)
        result = CPUFinancialPrimitives.segmented_reduce_max(self.test_values, self.expected_seg_ids)

        self.assertEqual(len(result), 3, "Should have 3 segments")
        np.testing.assert_allclose(result, expected, rtol=self.tolerance,
            err_msg="Reduce max should compute max value per segment")

    def test_segmented_reduce_min_correctness(self):
        """ Test segmented reduce min produces correct min per segment."""
        # Expected: [min(5,3,2), min(7,1), min(4,6,8)] = [2, 1, 4]
        expected = np.array([2.0, 1.0, 4.0], dtype=np.float32)
        result = CPUFinancialPrimitives.segmented_reduce_min(self.test_values, self.expected_seg_ids)

        self.assertEqual(len(result), 3, "Should have 3 segments")
        np.testing.assert_allclose(result, expected, rtol=self.tolerance,
            err_msg="Reduce min should compute min value per segment")

    def test_reduce_edge_cases(self):
        """ Test reduce operations with edge cases."""
        # Empty arrays
        empty_values = np.array([], dtype=np.float32)
        empty_seg_ids = np.array([], dtype=np.int32)

        sum_result = CPUFinancialPrimitives.segmented_reduce_sum(empty_values, empty_seg_ids)
        max_result = CPUFinancialPrimitives.segmented_reduce_max(empty_values, empty_seg_ids)
        min_result = CPUFinancialPrimitives.segmented_reduce_min(empty_values, empty_seg_ids)

        self.assertEqual(len(sum_result), 0)
        self.assertEqual(len(max_result), 0)
        self.assertEqual(len(min_result), 0)

        # Single element
        single_values = np.array([42.0], dtype=np.float32)
        single_seg_ids = np.array([0], dtype=np.int32)

        sum_single = CPUFinancialPrimitives.segmented_reduce_sum(single_values, single_seg_ids)
        max_single = CPUFinancialPrimitives.segmented_reduce_max(single_values, single_seg_ids)
        min_single = CPUFinancialPrimitives.segmented_reduce_min(single_values, single_seg_ids)

        np.testing.assert_allclose(sum_single, [42.0])
        np.testing.assert_allclose(max_single, [42.0])
        np.testing.assert_allclose(min_single, [42.0])

    def test_scan_vs_reduce_consistency(self):
        """ Test that scan and reduce operations are consistent."""
        # The last value in each segment's scan should equal the reduce result
        scan_sum = CPUFinancialPrimitives.segmented_scan_sum(self.test_values, self.expected_seg_ids)
        reduce_sum = CPUFinancialPrimitives.segmented_reduce_sum(self.test_values, self.expected_seg_ids)

        scan_max = CPUFinancialPrimitives.segmented_scan_max(self.test_values, self.expected_seg_ids)
        reduce_max = CPUFinancialPrimitives.segmented_reduce_max(self.test_values, self.expected_seg_ids)

        # Extract last value of each segment from scan
        segment_ends = [2, 4, 7]  # Indices where segments end
        scan_sum_finals = scan_sum[segment_ends]
        scan_max_finals = scan_max[segment_ends]

        np.testing.assert_allclose(scan_sum_finals, reduce_sum, rtol=self.tolerance,
            err_msg="Final scan values should match reduce results for sum")
        np.testing.assert_allclose(scan_max_finals, reduce_max, rtol=self.tolerance,
            err_msg="Final scan values should match reduce results for max")


In [5]:
class TestCPUFinancialMetrics(unittest.TestCase):
    """ Comprehensive tests for financial metrics (unchanged)."""

    def setUp(self):
        self.tolerance = 1e-5

        # Test data
        self.prices = np.array([100.0, 102.0, 101.0, 105.0, 103.0,
                               110.0, 108.0, 112.0, 115.0, 113.0], dtype=np.float32)
        self.seg_ids = np.array([0, 0, 0, 0, 0, 1, 1, 1, 1, 1], dtype=np.int32)

        # Portfolio data
        self.n_securities = 2
        self.n_days = len(self.prices)
        self.prices_multi = np.array([[100.0, 102.0, 101.0, 105.0, 103.0, 110.0, 108.0, 112.0, 115.0, 113.0],
                                     [50.0, 51.0, 50.5, 52.5, 51.5, 55.0, 54.0, 56.0, 57.5, 56.5]], dtype=np.float32)
        self.holdings = np.array([[100.0, 100.0, 100.0, 100.0, 100.0, 100.0, 100.0, 100.0, 100.0, 100.0],
                                 [200.0, 200.0, 200.0, 200.0, 200.0, 200.0, 200.0, 200.0, 200.0, 200.0]], dtype=np.float32)

    def test_cumulative_returns_mathematical_correctness(self):
        """Test cumulative returns calculation."""
        result = CPUFinancialMetrics.cumulative_returns(self.prices, self.seg_ids)

        self.assertEqual(len(result), len(self.prices))
        self.assertAlmostEqual(result[0], 0.0, places=6)
        self.assertAlmostEqual(result[5], 0.0, places=6)  # Segment boundary reset
        self.assertFalse(np.any(np.isnan(result)))

    def test_simple_moving_average_mathematical_correctness(self):
        """Test SMA calculation."""
        window = 3
        result = CPUFinancialMetrics.simple_moving_average(self.prices, self.seg_ids, window)

        self.assertEqual(len(result), len(self.prices))
        self.assertAlmostEqual(result[0], 100.0, places=3)
        self.assertAlmostEqual(result[1], 101.0, places=3)
        self.assertTrue(np.all(result > 0))

    def test_rolling_std_mathematical_correctness(self):
        """Test rolling standard deviation."""
        window = 3
        result = CPUFinancialMetrics.rolling_std(self.prices, self.seg_ids, window)

        self.assertTrue(np.all(result >= 0))
        self.assertGreaterEqual(result[0], 0)

    def test_max_drawdown_mathematical_correctness(self):
        """Test max drawdown calculation."""
        result = CPUFinancialMetrics.max_drawdown(self.prices, self.seg_ids)

        self.assertTrue(np.all(result <= 0.001))  # Should be <= 0
        self.assertEqual(len(result), len(self.prices))
        self.assertAlmostEqual(result[2], -1.0, places=3)  # 101 - 102 = -1

    def test_portfolio_value_mathematical_correctness(self):
        """Test portfolio value calculation."""
        result = CPUFinancialMetrics.portfolio_value(self.holdings, self.prices_multi)

        expected_day_0 = 100*100 + 200*50  # 20000
        self.assertAlmostEqual(result[0], expected_day_0, places=1)

        self.assertEqual(len(result), self.n_days)
        self.assertTrue(np.all(result > 0))

    def test_high_water_mark_mathematical_correctness(self):
        """Test high-water mark calculation."""
        portfolio_values = CPUFinancialMetrics.portfolio_value(self.holdings, self.prices_multi)
        result = CPUFinancialMetrics.high_water_mark(portfolio_values, self.seg_ids)

        # HWM should be non-decreasing within segments
        for seg_id in np.unique(self.seg_ids):
            seg_mask = self.seg_ids == seg_id
            seg_hwm = result[seg_mask]

            for i in range(1, len(seg_hwm)):
                self.assertGreaterEqual(seg_hwm[i], seg_hwm[i-1] - 1e-6)

        self.assertEqual(len(result), len(portfolio_values))
        self.assertTrue(np.all(result > 0))


## Run Test Suite

In [6]:
def run_comprehensive_test_suite_with_reduce():
    """Run ALL tests - primitives (scan+reduce) AND financial metrics."""
    print(f" RUNNING COMPREHENSIVE TEST SUITE")
    print("=" * 70)

    # Test classes to run
    test_classes = [
        (TestCPUPrimitivesComplete, "CPU Primitives (Scan + Reduce)"),
        (TestCPUFinancialMetrics, "CPU Financial Metrics")
    ]

    all_results = []
    total_tests = 0
    total_failures = 0
    total_errors = 0

    for test_class, description in test_classes:
        print(f"\n{'='*20} {description} {'='*20}")

        # Create test suite
        suite = unittest.TestLoader().loadTestsFromTestCase(test_class)

        # Count methods
        test_methods = [method for method in dir(test_class) if method.startswith('test_')]
        print(f"Running {len(test_methods)} test methods...")

        # Run with detailed output
        runner = unittest.TextTestRunner(verbosity=2)
        result = runner.run(suite)

        all_results.append(result)
        total_tests += result.testsRun
        total_failures += len(result.failures)
        total_errors += len(result.errors)

        # Show results
        success_rate = ((result.testsRun - len(result.failures) - len(result.errors)) / result.testsRun * 100) if result.testsRun > 0 else 0
        print(f"\n{description} Results: {success_rate:.1f}% pass rate ({result.testsRun - len(result.failures) - len(result.errors)}/{result.testsRun} tests)")

        if result.failures:
            print(f"Failures: {len(result.failures)}")
        if result.errors:
            print(f"Errors: {len(result.errors)}")

    # Overall summary
    print("\n" + "=" * 70)
    print("COMPREHENSIVE TEST SUMMARY - ALL PRIMITIVES + FINANCIAL METRICS")
    print("=" * 70)

    success_rate = ((total_tests - total_failures - total_errors) / total_tests * 100) if total_tests > 0 else 0

    print(f"Test Statistics:")
    print(f"   Total tests run: {total_tests}")
    print(f"   Passed: {total_tests - total_failures - total_errors}")
    print(f"   Failed: {total_failures}")
    print(f"   Errors: {total_errors}")
    print(f"   Overall success rate: {success_rate:.1f}%")

    print(f"\nFunctions Tested:")
    print(f"   exclusive_scan - segment ID generation")
    print(f"   segmented_scan_sum - cumulative sum within segments")
    print(f"   segmented_scan_max - cumulative max within segments")
    print(f"   segmented_reduce_sum - final sum per segment")
    print(f"   segmented_reduce_max - max value per segment")
    print(f"   segmented_reduce_min - min value per segment")
    print(f"   cumulative_returns - portfolio return calculation")
    print(f"   simple_moving_average - windowed averaging")
    print(f"   rolling_std - rolling standard deviation")
    print(f"   max_drawdown - maximum drawdown calculation")
    print(f"   portfolio_value - multi-security valuation")
    print(f"   high_water_mark - high-water mark tracking")

    if total_failures == 0 and total_errors == 0:
        print(f"\nALL TESTS PASSED! VALIDATION ACHIEVED!")
        print(f"All 12/12 functions (6 primitives + 6 financial) are working correctly")
        print(f"Segmented REDUCE operations successfully added!")
        print(f"Ready to generate test fixtures")
        return True
    else:
        print(f"\nSome tests failed - fix implementations before generating fixtures")
        return False

# Run the comprehensive test suite
all_tests_passed = run_comprehensive_test_suite_with_reduce()

if all_tests_passed:
    print(f"\nSUCCESS: All CPU implementations are validated!")
else:
    print(f"\nFix any failing tests before proceeding to fixture generation.")

test_exclusive_scan_correctness (__main__.TestCPUPrimitivesComplete.test_exclusive_scan_correctness)
Test exclusive scan produces correct segment IDs. ... ok
test_reduce_edge_cases (__main__.TestCPUPrimitivesComplete.test_reduce_edge_cases)
Test reduce operations with edge cases. ... ok
test_scan_vs_reduce_consistency (__main__.TestCPUPrimitivesComplete.test_scan_vs_reduce_consistency)
Test that scan and reduce operations are consistent. ... ok
test_segmented_reduce_max_correctness (__main__.TestCPUPrimitivesComplete.test_segmented_reduce_max_correctness)
Test segmented reduce max produces correct max per segment. ... ok
test_segmented_reduce_min_correctness (__main__.TestCPUPrimitivesComplete.test_segmented_reduce_min_correctness)
Test segmented reduce min produces correct min per segment. ... ok
test_segmented_reduce_sum_correctness (__main__.TestCPUPrimitivesComplete.test_segmented_reduce_sum_correctness)
Test segmented reduce sum produces correct final sums. ... ok
test_segmented_s

 RUNNING COMPREHENSIVE TEST SUITE

Running 8 test methods...

CPU Primitives (Scan + Reduce) Results: 100.0% pass rate (8/8 tests)

Running 6 test methods...

CPU Financial Metrics Results: 100.0% pass rate (6/6 tests)

COMPREHENSIVE TEST SUMMARY - ALL PRIMITIVES + FINANCIAL METRICS
Test Statistics:
   Total tests run: 14
   Passed: 14
   Failed: 0
   Errors: 0
   Overall success rate: 100.0%

Functions Tested:
   exclusive_scan - segment ID generation
   segmented_scan_sum - cumulative sum within segments
   segmented_scan_max - cumulative max within segments
   segmented_reduce_sum - final sum per segment
   segmented_reduce_max - max value per segment
   segmented_reduce_min - min value per segment
   cumulative_returns - portfolio return calculation
   simple_moving_average - windowed averaging
   rolling_std - rolling standard deviation
   max_drawdown - maximum drawdown calculation
   portfolio_value - multi-security valuation
   high_water_mark - high-water mark tracking

ALL TE

## Generate Test Fixtures with REDUCE

In [7]:
def save_fixtures_universal(test_suite, filename='cpu_test_fixtures.json'):
    """Universal fixture saving."""
    env = CURRENT_ENV
    print(f"SAVING FIXTURES {filename}")
    print("=" * 60)

    if env == 'colab':
        try:
            from google.colab import drive, files
            try:
                drive.mount('/content/drive')
                class_folder = "/content/drive/MyDrive/HPC_Project_2025/Test_Fixtures"
                os.makedirs(class_folder, exist_ok=True)

                drive_path = f"{class_folder}/{filename}"
                with open(drive_path, 'w') as f:
                    json.dump(test_suite, f, indent=2)

                print(f"Saved to Google Drive: {drive_path}")
                return drive_path
            except Exception:
                with open(filename, 'w') as f:
                    json.dump(test_suite, f, indent=2)
                files.download(filename)
                return filename
        except Exception:
            pass

    # Local save
    with open(filename, 'w') as f:
        json.dump(test_suite, f, indent=2)

    full_path = f"{os.getcwd()}/{filename}"
    print(f"Saved locally: {full_path}")
    return full_path

def generate_complete_fixtures():
    """Generate test fixtures."""

    if not all_tests_passed:
        print("Cannot generate fixtures - all tests must pass first")
        return None

    print(f"GENERATING TEST FIXTURES")
    print("=" * 60)

    # Simple test data
    simple_flags = np.array([0, 0, 1, 0, 1, 0, 0, 1], dtype=np.int32)  # Corrected
    simple_values = np.array([5.0, 3.0, 2.0, 7.0, 1.0, 4.0, 6.0, 8.0], dtype=np.float32)
    simple_seg_ids = CPUFinancialPrimitives.exclusive_scan(simple_flags)

    print(f"Computing ALL primitive operations...")

    simple_test = {
        'name': 'simple_test_with_reduce',
        'description': 'Basic test with ALL primitives - SCAN + REDUCE',
        'flags': simple_flags.tolist(),
        'values': simple_values.tolist(),
        'reference_results': {
            # Scan operations (cumulative)
            'exclusive_scan': simple_seg_ids.tolist(),
            'segmented_scan_sum': CPUFinancialPrimitives.segmented_scan_sum(simple_values, simple_seg_ids).tolist(),
            'segmented_scan_max': CPUFinancialPrimitives.segmented_scan_max(simple_values, simple_seg_ids).tolist(),

            # Reduce operations (final per segment)
            'segmented_reduce_sum': CPUFinancialPrimitives.segmented_reduce_sum(simple_values, simple_seg_ids).tolist(),
            'segmented_reduce_max': CPUFinancialPrimitives.segmented_reduce_max(simple_values, simple_seg_ids).tolist(),
            'segmented_reduce_min': CPUFinancialPrimitives.segmented_reduce_min(simple_values, simple_seg_ids).tolist(),
        }
    }

    # Financial test data
    np.random.seed(42)
    n_days = 770
    base_price = 100.0
    prices = [base_price]

    for i in range(1, n_days):
        daily_return = np.random.normal(0.001, 0.02)
        new_price = prices[-1] * (1 + daily_return)
        prices.append(max(new_price, 1.0))

    prices = np.array(prices, dtype=np.float32)

    # Create segments
    flags = np.zeros(n_days, dtype=np.int32)
    flags[::20] = 1  # Segment every 20 days
    flags[-1] = 1   # End last segment
    seg_ids = CPUFinancialPrimitives.exclusive_scan(flags)

    # Multi-security portfolio
    n_securities = 3
    prices_multi = np.tile(prices, (n_securities, 1))
    for i in range(1, n_securities):
        prices_multi[i] *= (0.5 + i * 0.5)
    holdings = np.ones_like(prices_multi) * 100

    # Compute ALL operations
    print(f"Computing all scan operations...")
    scan_sum = CPUFinancialPrimitives.segmented_scan_sum(prices, seg_ids)
    scan_max = CPUFinancialPrimitives.segmented_scan_max(prices, seg_ids)

    print(f"Computing all reduce operations...")
    reduce_sum = CPUFinancialPrimitives.segmented_reduce_sum(prices, seg_ids)
    reduce_max = CPUFinancialPrimitives.segmented_reduce_max(prices, seg_ids)
    reduce_min = CPUFinancialPrimitives.segmented_reduce_min(prices, seg_ids)

    print(f"Computing all financial metrics...")
    cum_returns = CPUFinancialMetrics.cumulative_returns(prices, seg_ids)
    sma_5 = CPUFinancialMetrics.simple_moving_average(prices, seg_ids, 5)
    sma_20 = CPUFinancialMetrics.simple_moving_average(prices, seg_ids, 20)
    rolling_std_10 = CPUFinancialMetrics.rolling_std(prices, seg_ids, 10)
    rolling_std_20 = CPUFinancialMetrics.rolling_std(prices, seg_ids, 20)
    max_dd = CPUFinancialMetrics.max_drawdown(prices, seg_ids)
    portfolio_val = CPUFinancialMetrics.portfolio_value(holdings, prices_multi)
    hwm = CPUFinancialMetrics.high_water_mark(portfolio_val, seg_ids)

    financial_test = {
        'name': 'financial_with_reduce',
        'description': f'Financial dataset with SCAN + REDUCE operations',
        'prices': prices.tolist(),
        'prices_multi': prices_multi.tolist(),
        'holdings': holdings.tolist(),
        'flags': flags.tolist(),
        'seg_ids': seg_ids.tolist(),
        'n_days': n_days,
        'n_securities': n_securities,
        'n_segments': len(reduce_sum),  # Number of segments from reduce
        'reference_results': {
            # Scan operations (cumulative)
            'segmented_scan_sum': scan_sum.tolist(),
            'segmented_scan_max': scan_max.tolist(),

            # Reduce operations (final per segment)
            'segmented_reduce_sum': reduce_sum.tolist(),
            'segmented_reduce_max': reduce_max.tolist(),
            'segmented_reduce_min': reduce_min.tolist(),

            # Financial metrics
            'cumulative_returns': cum_returns.tolist(),
            'simple_moving_average_w5': sma_5.tolist(),
            'simple_moving_average_w20': sma_20.tolist(),
            'rolling_std_w10': rolling_std_10.tolist(),
            'rolling_std_w20': rolling_std_20.tolist(),
            'max_drawdown': max_dd.tolist(),
            'portfolio_value': portfolio_val.tolist(),
            'high_water_mark': hwm.tolist(),
        }
    }

    # Create test suite
    test_suite = {
        'version': '7.0.0-complete',
        'created': datetime.now().isoformat(),
        'description': 'GPU Test Suite - ALL primitives + financial metrics',
        'environment': {
            'generated_in': CURRENT_ENV,
            'all_tests_passed': all_tests_passed,
            'validation_status': 'COMPREHENSIVE - All functions verified'
        },
        'tolerance': 1e-5,
        'primitives_summary': {
            'total_primitives': 6,
            'scan_operations': ['exclusive_scan', 'segmented_scan_sum', 'segmented_scan_max'],
            'reduce_operations': ['segmented_reduce_sum', 'segmented_reduce_max', 'segmented_reduce_min']
        },
        'validation_summary': {
            'total_functions': 12,
            'primitive_functions': 6,  # 3 scan + 3 reduce
            'financial_functions': 6,
            'test_coverage': 'COMPLETE',
            'numerical_accuracy': 'VERIFIED',
            'edge_cases': 'TESTED',
            'scan_reduce_consistency': 'VERIFIED'
        },
        'student_targets': {
            'functions_to_implement': 12,
            'minimum_speedup': '5x faster than CPU',
            'excellent_speedup': '10x faster than CPU'
        },
        'tests': {
            'simple': simple_test,
            'financial': financial_test
        }
    }

    # Save with comprehensive naming
    timestamp = datetime.now().strftime("%Y%m%d_%H%M")
    main_filename = f"cpu_test_fixtures_{timestamp}.json"

    saved_path = save_fixtures_universal(test_suite, main_filename)
    generic_path = save_fixtures_universal(test_suite, "cpu_test_fixtures.json")

    print(f"\nTEST FIXTURES GENERATED!")
    print(f"=" * 60)
    print(f"Timestamped: {main_filename}")
    print(f"Generic: gpu_test_fixtures.json")
    print(f"\nVALIDATION SUMMARY:")
    print(f"   Scan primitives (3/3): exclusive_scan, segmented_scan_sum, segmented_scan_max")
    print(f"   Reduce primitives (3/3): segmented_reduce_sum, segmented_reduce_max, segmented_reduce_min")
    print(f"   Financial metrics (6/6): ALL VALIDATED")
    print(f"   Total functions: 12/12")
    print(f"   Consistency checks: scan vs reduce verified")

    return test_suite

# Generate complete test fixtures
complete_test_suite = generate_complete_fixtures()

if complete_test_suite:
    print(f"\nSUCCESS! Complete test fixture!")
    print(f"Students now have the full primitive toolkit for GPU implementation!")
else:
    print(f"\nFix any test failures before generating complete fixtures.")

GENERATING TEST FIXTURES
Computing ALL primitive operations...
Computing all scan operations...
Computing all reduce operations...
Computing all financial metrics...
SAVING FIXTURES cpu_test_fixtures_20251025_0904.json
Saved locally: /home/quydx/advancedhpc2025/project/cpu_test_fixtures_20251025_0904.json
SAVING FIXTURES cpu_test_fixtures.json
Saved locally: /home/quydx/advancedhpc2025/project/cpu_test_fixtures.json

TEST FIXTURES GENERATED!
Timestamped: cpu_test_fixtures_20251025_0904.json
Generic: gpu_test_fixtures.json

VALIDATION SUMMARY:
   Scan primitives (3/3): exclusive_scan, segmented_scan_sum, segmented_scan_max
   Reduce primitives (3/3): segmented_reduce_sum, segmented_reduce_max, segmented_reduce_min
   Financial metrics (6/6): ALL VALIDATED
   Total functions: 12/12
   Consistency checks: scan vs reduce verified

SUCCESS! Complete test fixture!
Students now have the full primitive toolkit for GPU implementation!


## Benchmarking

In [8]:
## Performance Benchmarking

class CPUBenchmark:
    """Clean benchmark framework for CPU functions."""

    def __init__(self, n_elements=50000):
        self.n_elements = n_elements
        self.n_runs = 5
        self.results = {}
        self.burnin_iterations = 4

    def generate_test_data(self):
        """Generate test data for benchmarking."""
        np.random.seed(42)

        # Generate segment flags and IDs
        flags = np.zeros(self.n_elements, dtype=np.int32)
        flags[::100] = 1  # Segment every 100 elements
        flags[-1] = 1
        seg_ids = CPUFinancialPrimitives.exclusive_scan(flags)

        # Generate values and prices
        values = np.random.randn(self.n_elements).astype(np.float32)
        prices = np.abs(values) + 100

        # Generate multi-security portfolio data
        n_securities = 3
        n_days = min(self.n_elements, 10000)
        prices_multi = np.tile(prices[:n_days], (n_securities, 1))
        holdings = np.ones_like(prices_multi) * 100

        return {
            'flags': flags,
            'seg_ids': seg_ids,
            'values': values,
            'prices': prices,
            'holdings': holdings,
            'prices_multi': prices_multi,
            'n_days': n_days
        }

    def get_functions_to_benchmark(self, data):
        """Define all functions to benchmark."""
        return [
            # Primitives
            ('exclusive_scan',
             lambda: CPUFinancialPrimitives.exclusive_scan(data['flags'])),
            ('segmented_scan_sum',
             lambda: CPUFinancialPrimitives.segmented_scan_sum(data['values'], data['seg_ids'])),
            ('segmented_scan_max',
             lambda: CPUFinancialPrimitives.segmented_scan_max(data['values'], data['seg_ids'])),
            ('segmented_reduce_sum',
             lambda: CPUFinancialPrimitives.segmented_reduce_sum(data['values'], data['seg_ids'])),
            ('segmented_reduce_max',
             lambda: CPUFinancialPrimitives.segmented_reduce_max(data['values'], data['seg_ids'])),
            ('segmented_reduce_min',
             lambda: CPUFinancialPrimitives.segmented_reduce_min(data['values'], data['seg_ids'])),

            # Financial metrics
            ('cumulative_returns',
             lambda: CPUFinancialMetrics.cumulative_returns(data['prices'], data['seg_ids'])),
            ('simple_moving_average',
             lambda: CPUFinancialMetrics.simple_moving_average(data['prices'], data['seg_ids'], 5)),
            ('rolling_std',
             lambda: CPUFinancialMetrics.rolling_std(data['prices'][:10000], data['seg_ids'][:10000], 10)),
            ('max_drawdown',
             lambda: CPUFinancialMetrics.max_drawdown(data['prices'], data['seg_ids'])),
            ('portfolio_value',
             lambda: CPUFinancialMetrics.portfolio_value(data['holdings'], data['prices_multi'])),
            ('high_water_mark',
             lambda: CPUFinancialMetrics.high_water_mark(
                 CPUFinancialMetrics.portfolio_value(data['holdings'], data['prices_multi']),
                 data['seg_ids'][:data['n_days']]))
        ]

    def benchmark_function(self, func_name, func_call):
        """Benchmark a single function."""
        try:
            # Warmup
            for i in range(self.burnin_iterations):
                _ = func_call()

            # Benchmark runs
            times = []
            for _ in range(self.n_runs):
                start = time.perf_counter()
                _ = func_call()
                end = time.perf_counter()
                times.append(end - start)

            # Calculate statistics
            avg_time_ms = np.mean(times) * 1000
            std_time_ms = np.std(times) * 1000
            throughput = self.n_elements / np.mean(times) / 1e6

            return {
                'status': 'success',
                'avg_time_ms': avg_time_ms,
                'std_time_ms': std_time_ms,
                'throughput_M_per_sec': throughput
            }

        except Exception as e:
            return {'status': 'error', 'message': str(e)}

    def print_function_result(self, func_name, result):
        """Print benchmark result for a single function."""
        if result['status'] == 'success':
            print(f"{func_name:30s} {result['avg_time_ms']:8.3f} ± {result['std_time_ms']:6.3f} ms"
                  f"  ({result['throughput_M_per_sec']:6.2f} M/s)")
        else:
            print(f"{func_name:30s} Error: {result.get('message', 'Unknown error')}")

    def calculate_summary(self):
        """Calculate overall benchmark summary."""
        successful = {k: v for k, v in self.results.items()
                     if v.get('status') == 'success'}

        if not successful:
            return {'successful': 0, 'total': len(self.results)}

        total_time = sum(v['avg_time_ms'] for v in successful.values())
        avg_throughput = np.mean([v['throughput_M_per_sec'] for v in successful.values()])

        return {
            'successful': len(successful),
            'total': len(self.results),
            'total_time_ms': total_time,
            'avg_throughput': avg_throughput
        }

    def print_summary(self, summary):
        """Print benchmark summary."""
        print("\n" + "=" * 70)
        print("BENCHMARK SUMMARY")
        print("=" * 70)

        if summary['successful'] == 0:
            print("No successful benchmarks")
            return

        print(f"Functions benchmarked: {summary['successful']}/{summary['total']}")
        print(f"Total CPU time: {summary['total_time_ms']:.3f} ms")
        print(f"Average throughput: {summary['avg_throughput']:.2f} M elements/sec")
        print(f"\nThese timings will be used as GPU baseline comparisons")

    def run(self):
        """Run complete benchmark suite."""
        print("=" * 70)
        print("CPU PERFORMANCE BENCHMARK")
        print("=" * 70)

        # Generate test data
        data = self.generate_test_data()
        functions = self.get_functions_to_benchmark(data)

        print(f"\nBenchmarking {len(functions)} functions with {self.n_runs} runs each...")
        print(f"Data size: {self.n_elements:,} elements\n")

        # Benchmark each function
        for func_name, func_call in functions:
            result = self.benchmark_function(func_name, func_call)
            self.results[func_name] = result
            self.print_function_result(func_name, result)

        # Print summary
        summary = self.calculate_summary()
        self.print_summary(summary)

        return self.results


# Run CPU benchmark
cpu_benchmark = CPUBenchmark(n_elements=1000000)
cpu_benchmark_results = cpu_benchmark.run()

# Export CPU benchmark results for GPU comparison
benchmark_export = {
    'n_elements': 1000000,
    'timestamp': datetime.now().isoformat(),
    'cpu_benchmarks': {
        name: {
            'avg_time_ms': result['avg_time_ms'],
            'std_time_ms': result['std_time_ms'],
            'throughput_M_per_sec': result['throughput_M_per_sec']
        }
        for name, result in cpu_benchmark_results.items()
        if result.get('status') == 'success'
    }
}

save_fixtures_universal(benchmark_export, 'cpu_benchmarks_1M.json')

print("CPU benchmark results saved to cpu_benchmarks_1M.json")


CPU PERFORMANCE BENCHMARK

Benchmarking 12 functions with 5 runs each...
Data size: 1,000,000 elements

exclusive_scan                  263.254 ±  8.381 ms  (  3.80 M/s)
segmented_scan_sum              234.870 ± 10.490 ms  (  4.26 M/s)
segmented_scan_max              296.029 ±  2.284 ms  (  3.38 M/s)
segmented_reduce_sum            279.620 ±  5.525 ms  (  3.58 M/s)
segmented_reduce_max            337.428 ±  8.427 ms  (  2.96 M/s)
segmented_reduce_min            334.728 ±  2.710 ms  (  2.99 M/s)
cumulative_returns              793.172 ± 10.635 ms  (  1.26 M/s)
simple_moving_average          1765.685 ± 24.234 ms  (  0.57 M/s)
rolling_std                      56.679 ±  0.652 ms  ( 17.64 M/s)
max_drawdown                    305.851 ± 10.329 ms  (  3.27 M/s)
portfolio_value                   0.023 ±  0.006 ms  (44272.48 M/s)
high_water_mark                   3.159 ±  0.091 ms  (316.60 M/s)

BENCHMARK SUMMARY
Functions benchmarked: 12/12
Total CPU time: 4670.496 ms
Average throughput: 3719.4

## That's all, folks!