In [6]:
import cupy as cp
import time

# CUDA source for the batched outer-product kernel.
#
# One thread produces one element of the flattened output. For batch item i
# with factor vectors a_0 .. a_{dims-1}, the output element at row-major
# multi-index (k_0, .., k_{dims-1}) is
#     lambdas[i] * a_0[k_0] * a_1[k_1] * ... * a_{dims-1}[k_{dims-1}].
#
# All integer buffers are read as 32-bit `int` and all floating-point buffers
# as 32-bit `float`; callers must pass int32 / float32 device arrays.
kernel_code = """
extern "C" __global__
void broadcasted_multiplies_kernel(
    const float* lambdas,         // [N]        scale factor of each batch item
    const int* shapes,            // [N * dims] length of every factor vector
    const float* data,            //            all factor vectors, concatenated
    const int* offsets,           // [N * dims] start of every factor vector in data
    const int* offsets_result,    // [N]        start of every batch item in result (non-decreasing)
    float* result,                // [total_size] flattened outputs of all batch items
    int N,                        // number of batch items
    int dims,                     // factor vectors per batch item
    int total_size                // total number of output elements
) {
    int idx = blockIdx.x * blockDim.x + threadIdx.x;
    if (idx >= total_size) return;

    // Binary search for the last batch item whose output range starts at or
    // before idx (a linear scan here costs O(N) global loads per thread).
    int lo = 0;
    int hi = N - 1;
    while (lo < hi) {
        int mid = lo + (hi - lo + 1) / 2;
        if (offsets_result[mid] <= idx) {
            lo = mid;
        } else {
            hi = mid - 1;
        }
    }
    int i = lo;
    int local_idx = idx - offsets_result[i];

    const int* item_shapes = shapes + i * dims;
    const int* item_offsets = offsets + i * dims;

    // Row-major stride bookkeeping: after dividing by item_shapes[j], stride
    // is the product of the lengths of axes j+1 .. dims-1. Decoding indices
    // this way needs no fixed-size scratch array, so dims is not capped.
    int stride = 1;
    for (int j = 0; j < dims; j++) {
        stride *= item_shapes[j];
    }

    float val = lambdas[i];
    for (int j = 0; j < dims; j++) {
        int s = item_shapes[j];
        stride /= s;
        int index = (local_idx / stride) % s;
        val *= data[item_offsets[j] + index];
    }
    result[idx] = val;
}
"""

# Build the RawKernel object once at module level so every call reuses it.
broadcasted_multiplies_kernel = cp.RawKernel(kernel_code, 'broadcasted_multiplies_kernel')

def broadcasted_multiplies_cuda(lambdas, shapes, data, offsets, offsets_result):
    """
    Run a single broadcasted multiplication job.

    Inputs are converted (without copying when already correct) to the exact
    dtypes the kernel reads: float32 for `lambdas`/`data`, int32 for the
    index arrays. The launch happens on the current CuPy stream.

    Parameters:
      - lambdas (cp.ndarray, shape (N,)): lambda coefficients
      - shapes (cp.ndarray, shape (N*dims,)): shape sizes for each batch item
      - data (cp.ndarray): concatenated data arrays
      - offsets (cp.ndarray, shape (N*dims,)): starting offsets of each array in data
      - offsets_result (cp.ndarray, shape (N,)): starting offsets for each result batch

    Returns:
      - result_cp (cp.ndarray): flattened float32 result on the GPU
      - offsets_result (cp.ndarray): the `offsets_result` argument, unchanged

    Raises:
      - ValueError: if the array lengths are inconsistent with each other, or
        the total output size does not fit in a 32-bit signed int.
    """
    # The kernel reinterprets raw device memory as `float*` / `int*`, so a
    # float64 or int64 buffer would be silently misread. Normalise here.
    lambdas_f32 = cp.ascontiguousarray(lambdas, dtype=cp.float32)
    shapes_i32 = cp.ascontiguousarray(shapes, dtype=cp.int32)
    data_f32 = cp.ascontiguousarray(data, dtype=cp.float32)
    offsets_i32 = cp.ascontiguousarray(offsets, dtype=cp.int32)
    offsets_result_i32 = cp.ascontiguousarray(offsets_result, dtype=cp.int32)

    N = lambdas_f32.size
    if N == 0:
        # Nothing to compute; avoids a division by zero below.
        return cp.zeros(0, dtype=cp.float32), offsets_result

    if shapes_i32.size % N != 0:
        raise ValueError("shapes.size must be a multiple of lambdas.size.")
    if offsets_i32.size != shapes_i32.size:
        raise ValueError("offsets and shapes must have the same number of elements.")
    if offsets_result_i32.size != N:
        raise ValueError("offsets_result must have one entry per lambda.")
    dims = shapes_i32.size // N

    # Total output size = start of the last batch item + its element count.
    # int() copies the scalar to the host, which waits for pending work on
    # the current stream.
    total_size = int(offsets_result_i32[-1] + cp.prod(shapes_i32[-dims:]))
    if total_size > 2**31 - 1:
        raise ValueError("Total output size exceeds the kernel's 32-bit index range.")

    result_cp = cp.zeros(total_size, dtype=cp.float32)
    if total_size == 0:
        # A zero-sized grid is not a valid launch configuration.
        return result_cp, offsets_result

    block_size = 256
    grid_size = (total_size + block_size - 1) // block_size

    # Scalars are wrapped in int32 to match the kernel's `int` parameters;
    # plain Python ints are passed by CuPy as 64-bit values.
    broadcasted_multiplies_kernel(
        (grid_size,), (block_size,),
        (
            lambdas_f32, shapes_i32, data_f32, offsets_i32, offsets_result_i32,
            result_cp, cp.int32(N), cp.int32(dims), cp.int32(total_size),
        ),
    )

    return result_cp, offsets_result

def prepare_data_from_arrayss(arrayss):
    """
    Prepare a single "job" for broadcasted multiplication.

    arrayss: list of lists of cp.ndarray for one job.
             Each inner list represents a batch item with a fixed number (dims) arrays.

    Returns:
      A tuple (shapes, data, offsets, offsets_result):
        - shapes (int32, shape (k*dims,)): element count of every array, batch item by batch item
        - data (float32): all arrays concatenated in the same order
        - offsets (int32, shape (k*dims,)): start of every array inside `data`
        - offsets_result (int32, shape (k,)): start of every batch item's
          output, where a batch item's output size is the product of its
          array lengths
      The lambdas array is NOT part of the return value; the caller supplies it.

    Raises:
      ValueError: if `arrayss` is empty or the batch items do not all contain
                  the same number of arrays.

    Note: offsets are stored as int32 because that is what the kernel reads;
    totals beyond the int32 range are not detected here.
    """
    if not arrayss:
        raise ValueError("arrayss must contain at least one batch item.")

    k = len(arrayss)         # number of batch items for this job
    dims = len(arrayss[0])   # number of arrays per batch item
    if any(len(arrays) != dims for arrays in arrayss):
        raise ValueError("Every batch item must contain the same number of arrays.")

    flat_arrays = [arr for arrays in arrayss for arr in arrays]

    # Length of every array, in concatenation order.
    shapes = cp.array([arr.size for arr in flat_arrays], dtype=cp.int32)

    # The kernel reads `data` as float32; copy=False avoids a copy when the
    # concatenation is already float32.
    data = cp.concatenate(flat_arrays).astype(cp.float32, copy=False)

    # Exclusive prefix sum (inclusive cumsum minus the element itself).
    # dtype is pinned: cumsum/prod on int32 otherwise promote to the platform
    # integer (int64 on Linux), which the kernel would misread as int32 pairs.
    offsets = cp.cumsum(shapes, dtype=cp.int32) - shapes

    # Output element count of each batch item, then its exclusive prefix sum.
    sizes = cp.prod(shapes.reshape(k, dims), axis=1, dtype=cp.int32)
    offsets_result = cp.cumsum(sizes, dtype=cp.int32) - sizes
    return shapes, data, offsets, offsets_result

def run_batch_broadcasted_multiplies_cuda_optimized(lambdas_list, arrayss_list):
    """
    Execute multiple broadcasted multiplication jobs, one CUDA stream per job.

    Each job's preparation and kernel launch are issued on its own
    non-blocking stream. Splitting the outputs (which needs a device-to-host
    copy of the offsets) is deferred until every job has been launched, so the
    host does not wait on one job's kernel before preparing the next.

    Parameters:
      - lambdas_list: List of array-likes, one per job. Each has shape (N,)
                      and is converted to a float32 cp.ndarray.
      - arrayss_list: List where each element is a list (length N) of lists (length dims)
                      of cp.ndarray representing the data arrays for that job.

    Returns:
      - results: List of lists containing the split output result for each job.

    Raises:
      - ValueError: if the two lists differ in length, or a job's lambdas
                    count differs from its number of batch items.
    """
    if len(lambdas_list) != len(arrayss_list):
        raise ValueError("Mismatched number of lambdas and arrayss jobs.")

    n_jobs = len(lambdas_list)
    if n_jobs == 0:
        return []

    # Non-blocking streams do not implicitly wait for the caller's stream, so
    # mark the point where the inputs are ready and make every job wait on it.
    inputs_ready = cp.cuda.get_current_stream().record()
    streams = [cp.cuda.Stream(non_blocking=True) for _ in range(n_jobs)]
    pending = []

    # Launch each job asynchronously in its own stream.
    for stream, lambdas_job, arrayss in zip(streams, lambdas_list, arrayss_list):
        stream.wait_event(inputs_ready)
        with stream:
            # Convert inside the stream context so the conversion is ordered
            # before this job's kernel on the same stream.
            lambdas_job = cp.asarray(lambdas_job, dtype=cp.float32)

            # Cheap consistency check first, before building the job buffers.
            if lambdas_job.size != len(arrayss):
                raise ValueError("The size of lambdas does not match the number of batch items in arrayss.")

            shapes, data, offsets, offsets_result = prepare_data_from_arrayss(arrayss)
            result_cp, offsets_result = broadcasted_multiplies_cuda(
                lambdas_job, shapes, data, offsets, offsets_result
            )
        pending.append((result_cp, offsets_result))

    # Wait for each job, then split its flattened result into per-batch-item
    # arrays using the precomputed offsets.
    results = []
    for stream, (result_cp, offsets_result) in zip(streams, pending):
        stream.synchronize()
        results.append(cp.split(result_cp, offsets_result[1:].tolist()))

    return results

##########################################
# Example usage of the optimized version #
##########################################

# Problem size: 10 independent jobs, each with 50000 batch items.
n_jobs = 10
n_items = 50000

# One lambda coefficient (all ones) per batch item, for every job.
lambdas_list = [cp.ones(n_items, dtype=cp.float32) for _ in range(n_jobs)]

# Every batch item multiplies the same four factor vectors (3 * 1 * 2 * 2 = 12
# outputs per item). The vectors are identical and never mutated, so they are
# uploaded to the GPU once and shared instead of being re-created per item.
factor_arrays = [
    cp.array([2, 1, 1], dtype=cp.float32),
    cp.array([1.2], dtype=cp.float32),
    cp.array([1, 2], dtype=cp.float32),
    cp.array([3, 4], dtype=cp.float32),
]

# arrayss_list[job][item] is the list of 4 factor vectors for that batch item.
arrayss_list = [
    [list(factor_arrays) for _ in range(n_items)]
    for _ in range(n_jobs)
]

# Time the overall execution (perf_counter is the monotonic clock intended
# for measuring intervals).
start = time.perf_counter()
all_results = run_batch_broadcasted_multiplies_cuda_optimized(lambdas_list, arrayss_list)
cp.cuda.Stream.null.synchronize()  # Ensure that all asynchronous work is complete.
elapsed = time.perf_counter() - start
print(f"Total execution time for {n_jobs} jobs: {elapsed:.4f} seconds")

# all_results[i] now holds the list of per-batch-item result arrays for job i;
# iterate over it (e.g. `for res in all_results: print(res)`) to inspect them.

Total execution time for 10 jobs: 1.9461 seconds
