In [None]:

"""
A producer P over a buffer list B is a set of workers P(W), a single job prod:(source, target) and holds a function 
ready signal s_p : B -> {0,1} indicating the buffers that need to be produced at the current time. 

it has states P.free, P.waiting, P.producing. 

A consumer C over that same buffer list B is also a set of workers and single job, and holds a function filled signal s_c : B->{0,1}
indicating the buffers that are filled and ready to consumed 

it has states C.free, C.waiting, C.consuming. 

a buffer b itself, is again a set/sequence of memory objects, (maybe some contigous chunk of memory)

and it has states b.empty, b.getting_produced, b.full, b.getting_consumed.  


okay, now let P be a producer, and C a consumer and B a list of buffers 

lets write some basic rules 

1. C.state(t+1) == C.waiting if  for all b in B, b.state(t) != b.full. 

2. P.state(t+1) == P.waiting if  for all b in B, b.state(t) != b.empty. 

3. The other is of course mutual exclisivity of states, a thing cannot be in two states at once 

4. P.state(t+1) = P.producing if for some b in B, b.state(t) = b.empty, in which case that b goes to b.state(t+1) = b.getting_produced
    and p.state(t) != p.producing. 
    
5  p.state(t+1) = P.free if P.state(t) = P.producing, in which case, the one b for which b.state(t) = b.getting_produced 
  now bas b.state(t+1) = b.full. 
  
6 C.state(t+1) = C.consuming if for some b in B, b.state(t) = b.full, in which case that b goes to b.state(t+1) = b.getting_consumed
   and C.state(t) != c.consuming 
7 C.state(t+1) = C.free if C.state(t) = C.consuming in which case the one b for which b.state(t) = b.getting_consumed 
now has state b.state(t_1) = b.empty. 



The init state is (P.free, C.free, bi = bi.empty for all bi in B)




"""

#### Correctness spec: 

we have a producer $P$ a consumer $C$ and a buffer set $B$. 
We have states 

$S(P) = \{p.f,s.w,p.pr\}$, $S(C) = \{c.f, c.w,c.cs\}$, $s(b) = \{e, gp, fl, gc\}$

###### System invariants: 

1. Mutual exclusivity, (implied by the set theory) any object can be in exactly one state at a given time. 
2. Producer lock: at any $t$, $S(P)_t = p.pr \iff !\exists b \in B: s(b)_t = gp$ 
3. Consumer lock: at any $t$  $S(C)_t = c.cs \iff !\exists b \in B: s(b)_t = gc$


##### Inital state: 

$(S(P)_0 = p.f, \ S(C)_0 = p.f, \  s(b) = e \  \forall b \in B$ 

##### tansitions 




In [None]:
import numpy as np
class producer_consumer: 
  def __init__ (self, n_resources:int, n_producers:int, n_buffers:int, prod_job_latency:int, cons_job_latency:int, n_jobs): 
    """Here, we have n paralell resources of the same "size" in some sense (same number of threads say)
    we partition those into two parts, one part of producers, the other consumers. 
    we have a number of buffers, from which we can produce into and consume from, and we have some latency per resource 
    of both production and consumption, and it is obvoious that one resrouce handles the production/consumption of one job
    the goal is to compute consume(produce(job)) for n_jobs. 

    Args:
        n_resources (int): _description_
        n_producers (int): _description_
        n_buffers (int): _description_
        prod_job_latency (int): _description_
        cons_job_latency (int): _description_
        n_jobs (_type_): _description_
    """
    
    #we just creating as many states as there are atmost clock cycles which would just be (pl + cl + 1)*n_jobs
    self.pl = prod_job_latency 
    self.cl - cons_job_latency
    self.max_clocks = (self.pl + self.cl + 2)*n_jobs
    self.n_jobs = n_jobs 
    self.n_w = n_resources
    self.n_prod = n_producers 
    self.n_cons = self.n_w - self.n_prod
    self.n_buff = n_buffers
    
    self.producers_states = np.zeros((self.n_prod, self.max_clocks)).astype(int)
    self.consumers_states = np.zeros((self.n_cons,self.max_clocks)).astype(int)
    self.buffers_states = np.zeros((self.n_buff,n_jobs)).astype(int)
    
    self.producers_latency_points = np.zeros((self.n_prod)).astype(int)
    self.consumers_latency_points = np.zeros((self.n_cons)).astype(int)
    
    
    
    for i in range(1, self.max_clocks): 
      prev_empty_buffers = self.get_empty_buffers(self.buffers_states[i-1])
      prev_full_buffers = self.get_filled_buffers(self.buffers_states[i-1])
      prev_getting_produced_buffers = self.get_getting_produced_buffers(self.buffers_states[i-1])
      prev_getting_consumed_buffers = self.get_getting_consumed_buffers(self.buffers_states[i-1])
      prev_free_producers = self.get_free_producers(self.producers_states[i-1])
      prev_free_consumers = self.get_free_consumers(self.consumers_states[i-1])
      prev_waiting_producers = self.get_waiting_producers(self.producers_states[i-1])
      prev_waiting_consumers = self.get_waiting_consumers(self.consumers_states[i-1])
      prev_producing_producers = self.get_producing_producers(self.producers_states[i-1])
      prev_consuming_consumers = self.get_consuming_consumers(self.consumers_states[i-1])
      
  # --- Buffer State Getters ---
  # 0: empty
  # 1: getting_produced
  # 2: full
  # 3: getting_consumed

  def get_empty_buffers(self, buffers_state): 
    """Finds all buffers in the 'empty' (0) state."""
    return np.where(buffers_state == 0)

  def get_filled_buffers(self, buffers_state): 
    """Finds all buffers in the 'full' (1) state."""
    return np.where(buffers_state == 2)

  def get_getting_produced_buffers(self, buffers_state):
    """Finds all buffers in the 'getting_produced' (2) state."""
    return np.where(buffers_state == 1)

  def get_getting_consumed_buffers(self, buffers_state):
    """Finds all buffers in the 'getting_consumed' (3) state."""
    return np.where(buffers_state == 3)

  # --- Producer State Getters ---
  # 0: free
  # 1: waiting
  # 2: producing

  def get_free_producers(self, producers_state):
    """Finds all producers in the 'free' (0) state."""
    return np.where(producers_state == 0)

  def get_waiting_producers(self, producers_state):
    """Finds all producers in the 'waiting' (1) state."""
    return np.where(producers_state == 1)
    
  def get_producing_producers(self, producers_state): 
    """Finds all producers in the 'producing' (2) state."""
    return np.where(producers_state == 2)

  # --- Consumer State Getters ---
  # 0: free
  # 1: waiting
  # 2: consuming

  def get_free_consumers(self, consumers_state):
    """Finds all consumers in the 'free' (0) state."""
    return np.where(consumers_state == 0)

  def get_waiting_consumers(self, consumers_state):
    """Finds all consumers in the 'waiting' (1) state."""
    return np.where(consumers_state == 1)
    
  def get_consuming_consumers(self, consumers_state): 
    """Finds all consumers in the 'consuming' (2) state."""
    return np.where(consumers_state == 2)

In [None]:
""" 
okay I guess we dont really need to maintain everything, what we will do 
is maintain a ready queue and wait queue as well as a "processing" counter for each producer and consumer 
and just maintain the number of empty, full, filling and consumed on buffers 


"""




In [None]:
from collections import deque # <-- 1. Import deque

class Producer_Consumer:
  def __init__ (self, n_resources:int, n_producers:int, n_buffers:int, prod_job_latency:int, cons_job_latency:int, n_jobs):
    """Here, we have n paralell resources of the same "size" in some sense (same number of threads say)
    we partition those into two parts, one part of producers, the other consumers.
    we have a number of buffers, from which we can produce into and consume from, and we have some latency per resource
    of both production and consumption, and it is obvoious that one resrouce handles the production/consumption of one job
    the goal is to compute consume(produce(job)) for n_jobs.

    Args:
        n_resources (int): _description_
        n_producers (int): _description_
        n_buffers (int): _description_
        prod_job_latency (int): _description_
        cons_job_latency (int): _description_
        n_jobs (_type_): _description_
    """

    #we just creating as many states as there are atmost clock cycles which would just be (pl + cl + 1)*n_jobs
    self.pl = prod_job_latency
    self.cl = cons_job_latency # <-- 2. Fixed bug (was 'self.cl -')
    self.max_clocks = (self.pl + self.cl + 2)*n_jobs
    self.n_jobs = n_jobs
    self.n_w = n_resources
    self.n_prod = n_producers
    self.n_cons = self.n_w - self.n_prod
    self.n_buff = n_buffers

    # --- 3. Converted lists to deques ---
    # These are pools or queues, so deque is ideal.
    self.free_producers = deque([i for i in range(self.n_prod)])
    self.free_consumers = deque([i for i in range(self.n_cons)])
    self.waiting_consumers = deque()
    self.waiting_producers = deque()
    
    # These will likely store (id, finish_clock) tuples
    self.producing_producers = deque() 
    self.consuming_consumers = deque()
    # --- End of deque conversion ---

    self.n_empty_buffers = self.n_buff
    self.n_full_buffers = 0
    self.n_getting_filled_buffers = 0
    self.n_getting_consumed_buffers = 0

    # --- 4. Kept these as lists ---
    # These look like they are used as arrays (accessed by index),
    # so a standard list is the correct choice here.
    self.produces_latency_points = [0 for _ in range (self.n_prod)]
    self.consumer_latency_points = [0 for _ in range (self.n_cons)]

    timer = 0
    for i in range(self.max_clocks):
    #phase one, arrivals: 
      while (self.produces_latency_points[self.producing_producers[-1]] == self.pl -1) and self.producing_producers: 
        x = self.producing_producers.pop()
        self.produces_latency_points[x] = 0 
        self.n_full_buffers +=1 
        self.n_getting_filled_buffers -= 1
        self.free_producers.appendleft(x) #arrives 
        
      while (self.consumer_latency_points[self.consuming_consumers[-1]] == self.cl -1) and self.consuming_consumers: 
        x = self.consuming_consumers.pop()
        self.consumer_latency_points[x] = 0 
        self.n_empty_buffers +=1 
        self.n_getting_consumed_buffers -= 1
        self.free_consumers.appendleft(x) #arrives 
        self.n_jobs -= 1
        
      #phase 1.1 increments 
      for x in self.producing_producers: 
        self.produces_latency_points[x] += 1
      
      for x in self.consuming_consumers: 
        self.consumer_latency_points[x] += 1
        
      #phase 2 schedule new jobs 
      while(self.waiting_consumers) and (self.n_full_buffers > 0): 
        x = self.waiting_consumers.pop() 
        self.n_full_buffers -= 1 
        self.n_getting_consumed_buffers += 1
        self.consuming_consumers.appendleft(x) 
        
      while(self.waiting_producers) and (self.n_empty_buffers > 0): 
        x = self.waiting_producers.pop()
        self.n_empty_buffers -= 1
        self.n_getting_filled_buffers += 1 
        self.producing_producers.appendleft(x) 
        
      #phase3 free ones wait for new jobs 
      
      while(self.free_consumers): 
        x  = self.free_consumers.pop()
        self.waiting_consumers.appendleft(x)
        
      while (self.free_producers): 
        x = self.free_producers.pop()
        self.waiting_producers.appendleft(x)
        
        


In [None]:
from collections import deque
import textwrap

class Producer_Consumer:
    def __init__(self, n_resources: int, n_producers: int, n_buffers: int, prod_job_latency: int, cons_job_latency: int, n_jobs: int):
        
        # --- Parameters ---
        self.pl = prod_job_latency
        self.cl = cons_job_latency
        self.n_jobs_total = n_jobs
        self.n_w = n_resources
        self.n_prod = n_producers
        self.n_cons = self.n_w - self.n_prod
        self.n_buff = n_buffers
        
        # Max clock safeguard to prevent infinite loops
        self.max_clocks = (self.pl + self.cl + 2) * self.n_jobs_total + 1000 # Added a buffer

        # --- Job Counters ---
        self.n_jobs_started = 0
        self.n_jobs_completed = 0

        # --- Worker Pools (using append/pop as stacks) ---
        self.free_producers = deque(range(self.n_prod))
        self.free_consumers = deque(range(self.n_cons))
        self.waiting_consumers = deque()
        self.waiting_producers = deque()
        
        # --- Active Worker Lists (using deque as a set/list) ---
        self.producing_producers = deque() 
        self.consuming_consumers = deque()
        
        # --- Buffer State Counters ---
        self.n_empty_buffers = self.n_buff
        self.n_full_buffers = 0
        self.n_getting_filled_buffers = 0
        self.n_getting_consumed_buffers = 0

        # --- Latency Tracking ---
        self.produces_latency_points = [0] * self.n_prod
        self.consumer_latency_points = [0] * self.n_cons
        
        # This will hold the history of our simulation
        self.state_history = []

    def _get_state_repr(self, timer: int) -> str:
        """Helper method to create a snapshot of the current state."""
        # Using textwrap to make the output clean
        state = f"""
        =====================================================
        TIMER: {timer}
        =====================================================
        JOBS:    {self.n_jobs_completed} / {self.n_jobs_total} Completed. ({self.n_jobs_started} Started)
        BUFFERS: Empty({self.n_empty_buffers}), Full({self.n_full_buffers}), Filling({self.n_getting_filled_buffers}), Consuming({self.n_getting_consumed_buffers})
        ---
        PRODUCERS:
          Free:     {list(self.free_producers)}
          Waiting:  {list(self.waiting_producers)}
          Producing: {list(self.producing_producers)}
        CONSUMERS:
          Free:     {list(self.free_consumers)}
          Waiting:  {list(self.waiting_consumers)}
          Consuming: {list(self.consuming_consumers)}
        """
        return textwrap.dedent(state)

    def run_simulation(self):
        """
        Runs the full simulation clock by clock.
        
        The order of operations in each clock cycle is critical:
        1. Check for finished work (Reap)
        2. Schedule new work (Sow)
        3. Assign new jobs to free workers
        4. Move any remaining free workers to 'waiting'
        5. Increment latency counters for active workers
        """
        timer = 0

        # Run until all jobs are *completed*, with a safeguard
        while self.n_jobs_completed < self.n_jobs_total:
            
            # Log the state *before* any changes in this cycle
            self.state_history.append(self._get_state_repr(timer))
            
            if timer > self.max_clocks:
                self.state_history.append("!!! SIMULATION TIMED OUT !!!")
                break
            
            # --- PHASE 1: CHECK FOR FINISHED WORK (REAP) ---
            # We iterate over a static list() copy to safely remove from the deque
            
            for producer_id in list(self.producing_producers):
                # Check if the job is done (latency counter is at max)
                if self.produces_latency_points[producer_id] == self.pl:
                    self.producing_producers.remove(producer_id)
                    self.produces_latency_points[producer_id] = 0 # Reset counter
                    
                    self.n_full_buffers += 1
                    self.n_getting_filled_buffers -= 1
                    self.free_producers.append(producer_id) # Add to FREE pool

            for consumer_id in list(self.consuming_consumers):
                if self.consumer_latency_points[consumer_id] == self.cl:
                    self.consuming_consumers.remove(consumer_id)
                    self.consumer_latency_points[consumer_id] = 0 # Reset counter
                    
                    self.n_empty_buffers += 1
                    self.n_getting_consumed_buffers -= 1
                    self.free_consumers.append(consumer_id) # Add to FREE pool
                    
                    self.n_jobs_completed += 1 # A job is fully done!

            # --- PHASE 2: SCHEDULE NEW WORK (SOW) ---
            # Match waiting workers to available buffers
            
            while self.waiting_producers and self.n_empty_buffers > 0:
                producer_id = self.waiting_producers.pop()
                
                self.n_empty_buffers -= 1
                self.n_getting_filled_buffers += 1
                self.producing_producers.append(producer_id) # Move to PRODUCING
                
            while self.waiting_consumers and self.n_full_buffers > 0:
                consumer_id = self.waiting_consumers.pop()
                
                self.n_full_buffers -= 1
                self.n_getting_consumed_buffers += 1
                self.consuming_consumers.append(consumer_id) # Move to CONSUMING

            # --- PHASE 3: ASSIGN NEW JOBS & MOVE FREE TO WAITING ---
            # This is your explicit model of the barrier `arrive` phase
            
            # First, assign new job "tickets" to free producers
            while self.free_producers and self.n_jobs_started < self.n_jobs_total:
                producer_id = self.free_producers.pop()
                self.waiting_producers.append(producer_id) # Move to WAITING
                self.n_jobs_started += 1
            
            # Second, move *all remaining* free workers to the waiting state
            # This models them "arriving" and now "waiting" for the next cycle
            while self.free_producers:
                self.waiting_producers.append(self.free_producers.pop())
                
            while self.free_consumers:
                self.waiting_consumers.append(self.free_consumers.pop())

            # --- PHASE 4: INCREMENT LATENCY COUNTERS ---
            # This must happen *last*, after all assignments are done.
            # We increment the timer for any worker that is *actively* working.
            
            for producer_id in self.producing_producers:
                self.produces_latency_points[producer_id] += 1
            
            for consumer_id in self.consuming_consumers:
                self.consumer_latency_points[consumer_id] += 1
                
            # --- END OF CYCLE ---
            timer += 1
        
        # Log the final state
        self.state_history.append(self._get_state_repr(timer))
        self.state_history.append(f"--- SIMULATION FINISHED AT CLOCK {timer} ---")
        
        return self.state_history

# --- Example of how to run it ---

# Create the simulation
sim = Producer_Consumer(
    n_resources=10, 
    n_producers=4,    # 4 producers
    n_buffers=6,      # 6 buffers
    prod_job_latency=5, # 5 clock cycles to produce
    cons_job_latency=3, # 3 clock cycles to consume
    n_jobs=20         # 20 total jobs
)

# Run it
history = sim.run_simulation()

# Print the first 10 and last 10 states
for state in history[:10]:
    print(state)

print("\n... [simulation running] ...\n")

for state in history[-10:]:
    print(state)


TIMER: 0
JOBS:    0 / 20 Completed. (0 Started)
BUFFERS: Empty(6), Full(0), Filling(0), Consuming(0)
---
PRODUCERS:
  Free:     [0, 1, 2, 3]
  Waiting:  []
  Producing: []
CONSUMERS:
  Free:     [0, 1, 2, 3, 4, 5]
  Waiting:  []
  Consuming: []


TIMER: 1
JOBS:    0 / 20 Completed. (4 Started)
BUFFERS: Empty(6), Full(0), Filling(0), Consuming(0)
---
PRODUCERS:
  Free:     []
  Waiting:  [3, 2, 1, 0]
  Producing: []
CONSUMERS:
  Free:     []
  Waiting:  [5, 4, 3, 2, 1, 0]
  Consuming: []


TIMER: 2
JOBS:    0 / 20 Completed. (4 Started)
BUFFERS: Empty(2), Full(0), Filling(4), Consuming(0)
---
PRODUCERS:
  Free:     []
  Waiting:  []
  Producing: [0, 1, 2, 3]
CONSUMERS:
  Free:     []
  Waiting:  [5, 4, 3, 2, 1, 0]
  Consuming: []


TIMER: 3
JOBS:    0 / 20 Completed. (4 Started)
BUFFERS: Empty(2), Full(0), Filling(4), Consuming(0)
---
PRODUCERS:
  Free:     []
  Waiting:  []
  Producing: [0, 1, 2, 3]
CONSUMERS:
  Free:     []
  Waiting:  [5, 4, 3, 2, 1, 0]
  Consuming: []


TIMER: 4
JO

In [2]:
%load_ext nvcc4jupyter

from nvcc4jupyter import set_defaults
set_defaults(compiler_args='-arch=sm_100a -Xptxas=-v')

Source files will be saved in "/tmp/tmpkxacqu0t".


In [4]:
%%cuda
#include<stdio.h>
#include<stdlib.h>
#include<cuda.h> 
#include<cuda_runtime.h> 
#include <cuda/barrier>
#include <cooperative_groups.h>

namespace cg = cooperative_groups;
using barrier = cuda::barrier<cuda::thread_scope_block>;


constexpr int N = 4096; 
constexpr int BN = 32; 
constexpr int N_blocks = 1;
constexpr int n_warps = 16;
constexpr int n_producer_warps = 8;
constexpr int n_consumer_warps = n_warps - n_producer_warps;

constexpr int block_size = n_warps*32;
constexpr int N_jobs = N//BN; 


__device__ void producer(barrier ready[], barrier filled[], float* As, float*Bs, float*A, float*B)
{
  for (int k = 0; k < N; k+= BN)
  {
    //#at any iteration, producer threads need to arrive at wait until that iterations's buffer is ready. 
    int buffer_idx = k % 2; 
    ready[buffer_idx].arrive_and_wait(); 
    //#okay now, n_producer_warps has n_producer_Warps*32 threads, and they need to load stuff of As and Bs. 
    //#we have 8 producer warps, so we treat out thread block like (32,8) and do float 4 loads. 
    int t = threadIdx.x; 
    int row = t // 8; 
    int col = t % 8;
    int A_row = 0 + row; 
    int A_col = k + 4*col; 
    int As_row = row; 
    int As_col = 4*col; 
    int B_row = k + row; 
    int B_col = 0 + 4*col; 
    int Bs_row = row; 
    int Bs_col = 4*col;
    
    *reinterpret_cast<float4*>(&As[buffer_idx][As_row*BN + As_col]) = *reinterpret_cast<float4*>(&A[A_row*N + A_col])
    *reinterpret_cast<float4*>(&Bs[buffer_idx][Bs_row*BN + Bs_col]) = *reinterpret_cast<float4*>(&B[B_row*BN + B_col])
    
    barrier::arrival_token token = filled[buffer_idx].arrive();
  }
}
__device__ void consumer(barrier ready[], barrier filled[], float* As, float*Bs, float*C)
{
  barrier::arrival_token token1 = ready[0].arrive(); /* buffer_0 is ready for initial fill */
  barrier::arrival_token token2 = ready[1].arrive(); /* buffer_1 is ready for initial fill */
  for (int k = 0; k < N; k+= BN)
  {
    //#at any iteration, consumer threads need to arrive at wait until that iterations's buffer is full. 
    int buffer_idx = k % 2; 
    filled[buffer_idx].arrive_and_wait(); 
    int t = threadIdx.x - (8*32); 
    int As_row = t // 32; 
    int As_col = t % 32; 
    int Bs_row = As_col; 
    int Bs_col = As_row; 
    
    //#again we have to do a 32x32 matmul, but we have (32x8) threads, which means, we need to do  A(8x32) B(32x8) matmuls
    for (int i = 0; i < 4; i++)
    {
      for (int j = 0; j < 4; j++)
        {
          int As_true_row = As_row + 4*i 
          int Bs_true_col = Bs_col + 4*j 
          C[As_true_row*BN + Bs_true_col] += As[buffer_idx][As_true_row*BN + As_col] * Bs[buffer_idx][Bs_row*BN + Bs_true_col];
        }
    }
  
    barrier::arrival_token token = ready[buffer_idx].arrive(); 
  }
    
}

__global__ void matmul(float*A, float*B, float*C)
{
  __shared__ float As[2][BN*BN];
  __shared__ float Bs[2][BN*BN];
  __shared__ barrier bar[4]; 
  int t = threadIdx.x; 
  if (t<4): 
    init(bar + t, block_size);
  __syncthreads(); 
  
  if (t < n_producer_warps*32)
    producer(bar, bar + 2, As, Bs, A, B)
  else 
    consumer(bar, bar + 2, As, Bs, C)
  
  __syncthreads();
  
  
}

int main()
{
  float *A_h, *A_d, *B_h, *B_d, *C_h, *C_d; 
  size_t size = N*BN*sizeof(float); 
  size_t size_C = BN*BN*sizeof(float);
  cudaHostAlloc(&A_h, size, cudaHostAllocDefault);
  cudaHostAlloc(&B_h, size, cudaHostAllocDefault);
  cudaHostAlloc(&C_h, size_C, cudaHostAllocDefault);
  cudaMalloc(&A_d, size);
  cudaMalloc(&B_d, size); 
  cudaMalloc(&C_d, size_C);
  for (int i = 0; i < BN*N; i++)
  {
    A_h[i] = (float) i / 10000.0; 
    B_h[i] = (float) i / 10000.0;
  }
  cudaMemcpy(A_d, A_h, size, cudaMemcpyHostToDevice);
  cudaMemcpy(B_d, B_h, size, cudaMemcpyHostToDevice);
  
  matmul<<<N_blocks, block_size>>>(A_d, B_d, C_d);
  
  cudaDeviceSynchronize();
  cudaMemcpy(C_h, C_d, size_C, cudaMemcpyDeviceToHost);
  for (int i = 0; i < 100; i++)
  {
    printf("%f, \n", C_h[i]);
  }
  return 0;
}


/tmp/tmpkxacqu0t/67cbe858-af9f-4bca-ac68-29bab99fc28d/single_file.cu(23): error: expected a ";"
  __attribute__((device)) void producer(barrier ready[], barrier filled[], float* As, float*Bs, float*A, float*B)
  ^

      barrier::arrival_token token = filled[buffer_idx].arrive();
                                                                ^


/tmp/tmpkxacqu0t/67cbe858-af9f-4bca-ac68-29bab99fc28d/single_file.cu(48): error: expected a declaration
    }
    ^

            C[As_true_row*BN + Bs_true_col] += As[buffer_idx][As_true_row*BN + As_col] * Bs[buffer_idx][Bs_row*BN + Bs_true_col];
                                                                                                                                ^

/tmp/tmpkxacqu0t/67cbe858-af9f-4bca-ac68-29bab99fc28d/single_file.cu(73): error: expected a declaration
          }
          ^

      barrier::arrival_token token = ready[buffer_idx].arrive();
                                                               ^

/tmp/tmpkxacq

In [6]:
%%cuda
#include<stdio.h>
#include<stdlib.h>
// #include<cuda.h> // Not strictly needed when using cuda_runtime.h
#include<cuda_runtime.h> 
#include <cuda/barrier>
#include <cooperative_groups.h> // Not used in this fix, but fine to include

// namespace cg = cooperative_groups; // Not used
using barrier = cuda::barrier<cuda::thread_scope_block>;


constexpr int N = 4096; 
constexpr int BN = 32; 
constexpr int N_blocks = 1;
constexpr int n_warps = 16;
constexpr int n_producer_warps = 8;
constexpr int n_consumer_warps = n_warps - n_producer_warps;

constexpr int block_size = n_warps*32; // 512 threads
// constexpr int N_jobs = N / BN; // Fixed syntax, but this constant wasn't used


// Note the change in As and Bs types:
// They are now pointers to arrays of size [BN*BN], which is what __shared__ As[2][BN*BN] decays to.
__device__ void producer(
    barrier ready[], 
    barrier filled[], 
    float (*As)[BN*BN], // Correct type for __shared__ array
    float (*Bs)[BN*BN], // Correct type for __shared__ array
    float* A, 
    float* B)
{
  for (int k = 0; k < N; k += BN)
  {
    // BUG FIX: The buffer index must be based on the *iteration*, not the value of k.
    // k=0 -> idx=0. k=BN -> idx=1. k=2*BN -> idx=0.
    int buffer_idx = (k / BN) % 2; 
    
    ready[buffer_idx].arrive_and_wait(); 
    
    int t = threadIdx.x; // 0..255 for producers
    int row = t / 8; // BUG FIX: C++ integer division is /
    int col = t % 8;
    
    int A_row = row; 
    int A_col = k + 4*col; 
    int As_row = row; 
    int As_col = 4*col; 
    
    int B_row = k + row; 
    int B_col = 4*col; 
    int Bs_row = row; 
    int Bs_col = 4*col;
    
    // Perform the 128-bit (float4) loads and stores
    
    // Load from A (BN x N) matrix: A[A_row, A_col] -> A[row, k + 4*col]
    *reinterpret_cast<float4*>(&As[buffer_idx][As_row*BN + As_col]) = 
        *reinterpret_cast<float4*>(&A[A_row*N + A_col]); // BUG FIX: Added semicolon
    
    // Load from B (N x BN) matrix: B[B_row, B_col] -> B[k + row, 4*col]
    // BUG FIX: B's index is B[row, col] = row * BN + col. Your code used N.
    *reinterpret_cast<float4*>(&Bs[buffer_idx][Bs_row*BN + Bs_col]) = 
        *reinterpret_cast<float4*>(&B[B_row*BN + B_col]); // BUG FIX: Added semicolon
    
    // Signal that this buffer is full
    filled[buffer_idx].arrive();
  }
}

__device__ void consumer(
    barrier ready[], 
    barrier filled[], 
    float (*As)[BN*BN], 
    float (*Bs)[BN*BN], 
    float* C)
{
  // BUG FIX (Major Logic): Accumulation must happen in registers.
  // We have 256 consumer threads. Map to a 8x32 grid.
  // Each thread will compute 4 elements of the final C matrix.
  
  int t = threadIdx.x - (n_producer_warps * 32); // 0..255 for consumers
  int thr_row = t / 32; // Thread's base row (0..7)
  int thr_col = t % 32; // Thread's column (0..31)
  
  // These registers will accumulate C[thr_row + (0, 8, 16, 24), thr_col]
  float C_reg[4] = {0.0f, 0.0f, 0.0f, 0.0f};

  // Signal that both buffers are ready to be filled for the first time
  ready[0].arrive();
  ready[1].arrive();

  for (int k = 0; k < N; k += BN)
  {
    // BUG FIX: Correct buffer index logic
    int buffer_idx = (k / BN) % 2; 
    
    // Wait for the producer to fill this buffer
    filled[buffer_idx].arrive_and_wait(); 
    
    // Pointers to the correct shared memory tile
    float* A_tile = As[buffer_idx];
    float* B_tile = Bs[buffer_idx];

    // --- Perform tile-level matrix multiplication ---
    // C_tile = A_tile * B_tile
    // Each thread computes 4 dot products
    for (int dot_k = 0; dot_k < BN; dot_k++)
    {
      C_reg[0] += A_tile[(thr_row +  0)*BN + dot_k] * B_tile[dot_k*BN + thr_col];
      C_reg[1] += A_tile[(thr_row +  8)*BN + dot_k] * B_tile[dot_k*BN + thr_col];
      C_reg[2] += A_tile[(thr_row + 16)*BN + dot_k] * B_tile[dot_k*BN + thr_col];
      C_reg[3] += A_tile[(thr_row + 24)*BN + dot_k] * B_tile[dot_k*BN + thr_col];
    }
  
    // Signal that this buffer is now ready to be refilled
    ready[buffer_idx].arrive(); 
  }
  
  // After all k-loops, write accumulated registers to global memory
  C[(thr_row +  0)*BN + thr_col] = C_reg[0];
  C[(thr_row +  8)*BN + thr_col] = C_reg[1];
  C[(thr_row + 16)*BN + thr_col] = C_reg[2];
  C[(thr_row + 24)*BN + thr_col] = C_reg[3];
}

__global__ void matmul(float*A, float*B, float*C)
{
  __shared__ float As[2][BN*BN];
  __shared__ float Bs[2][BN*BN];
  
  // We need 4 barriers: 2 for 'ready' (ping/pong) and 2 for 'filled' (ping/pong)
  __shared__ barrier bar[4]; 
  
  int t = threadIdx.x; 
  
  // BUG FIX: C++ syntax for if
  // BUG FIX: init() is a member function
  // Initialize all 4 barriers. All 512 threads are expected to participate
  // in the producer/consumer pattern, so the count is block_size.
  if (t < 4) 
    init(bar + t, block_size); 
  
  __syncthreads(); 
  
  if (t < n_producer_warps*32)
    // bar[0], bar[1] are 'ready' barriers
    // bar[2], bar[3] are 'filled' barriers
    producer(bar, bar + 2, As, Bs, A, B);
  else 
    consumer(bar, bar + 2, As, Bs, C);
  
}

int main()
{
  float *A_h, *A_d, *B_h, *B_d, *C_h, *C_d; 
  
  // A is (BN x N) -> (32 x 4096)
  // B is (N x BN) -> (4096 x 32)
  // C is (BN x BN) -> (32 x 32)
  size_t size_A = BN*N*sizeof(float); 
  size_t size_B = N*BN*sizeof(float); 
  size_t size_C = BN*BN*sizeof(float);
  
  cudaHostAlloc(&A_h, size_A, cudaHostAllocDefault);
  cudaHostAlloc(&B_h, size_B, cudaHostAllocDefault);
  cudaHostAlloc(&C_h, size_C, cudaHostAllocDefault);
  
  cudaMalloc(&A_d, size_A);
  cudaMalloc(&B_d, size_B); 
  cudaMalloc(&C_d, size_C);
  
  // Initialize A and B
  for (int i = 0; i < BN*N; i++)
  {
    A_h[i] = (float) (i % 100) + 1.0f; // Simpler test data
    B_h[i] = (float) (i % 100) + 1.0f;
  }
  
  // BUG FIX: C must be zeroed before the kernel, as the kernel performs an
  // accumulation (C += A*B), which in our case is handled by C_reg[4] = {0.0f...}
  // But if C were used for reduction, this would be critical.
  // It's still critical here because our kernel *writes* C, it doesn't add to it.
  // If the consumer threads didn't write to *all* of C, we would need this.
  // Better safe than sorry.
  cudaMemset(C_d, 0, size_C); 

  cudaMemcpy(A_d, A_h, size_A, cudaMemcpyHostToDevice);
  cudaMemcpy(B_d, B_h, size_B, cudaMemcpyHostToDevice);
  
  matmul<<<N_blocks, block_size>>>(A_d, B_d, C_d);
  
  cudaError_t err = cudaGetLastError();
  if (err != cudaSuccess) {
    printf("CUDA Error: %s\n", cudaGetErrorString(err));
  }
  
  cudaDeviceSynchronize();
  
  cudaMemcpy(C_h, C_d, size_C, cudaMemcpyDeviceToHost);
  
  printf("First 100 elements of C:\n");
  for (int i = 0; i < 100; i++)
  {
    printf("%f, \n", C_h[i]);
  }
  
  cudaFree(A_d);
  cudaFree(B_d);
  cudaFree(C_d);
  cudaFreeHost(A_h);
  cudaFreeHost(B_h);
  cudaFreeHost(C_h);
  
  return 0;
}

First 100 elements of C:
10061716.000000, 
10268372.000000, 
10475028.000000, 
10681684.000000, 
10207740.000000, 
10414396.000000, 
10621052.000000, 
10827708.000000, 
10304564.000000, 
10511220.000000, 
10717876.000000, 
10924532.000000, 
10352188.000000, 
10558844.000000, 
10765500.000000, 
10972156.000000, 
10350612.000000, 
10557268.000000, 
10763924.000000, 
10970580.000000, 
10299836.000000, 
10506492.000000, 
10713148.000000, 
10919804.000000, 
10199860.000000, 
10406516.000000, 
10613172.000000, 
10819828.000000, 
10060384.000000, 
10267040.000000, 
10473696.000000, 
10680352.000000, 
10062452.000000, 
10269124.000000, 
10475796.000000, 
10682468.000000, 
9864140.000000, 
10070812.000000, 
10277484.000000, 
10484156.000000, 
10026628.000000, 
10233300.000000, 
10439972.000000, 
10646644.000000, 
10139916.000000, 
10346588.000000, 
10553260.000000, 
10759932.000000, 
10204004.000000, 
10410676.000000, 
10617348.000000, 
10824020.000000, 
10218892.000000, 
10425564.000000, 
1063

In [17]:
%%cuda
#include<stdio.h>
#include<stdlib.h>
#include<cuda_runtime.h> 
#include <cuda/barrier>

using barrier = cuda::barrier<cuda::thread_scope_block>;

constexpr int N = 4096; 
constexpr int BN = 32; 
constexpr int N_blocks = 1;
constexpr int n_warps = 16;
constexpr int n_producer_warps = 8;
constexpr int n_consumer_warps = n_warps - n_producer_warps;
constexpr int block_size = n_warps*32;

__device__ void producer(
    barrier ready[], 
    barrier filled[], 
    float (*As)[BN*BN], 
    float (*Bs)[BN*BN], 
    float* A, 
    float* B)
{
  for (int k = 0; k < N; k += BN)
  {
    int buffer_idx = (k / BN) % 2; 
    
    ready[buffer_idx].arrive_and_wait(); 
    
    int t = threadIdx.x;
    int row = t / 8;
    int col = t % 8;
    
    int A_row = row; 
    int A_col = k + 4*col; 
    int As_row = row; 
    int As_col = 4*col; 
    
    int B_row = k + row; 
    int B_col = 4*col; 
    int Bs_row = row; 
    int Bs_col = 4*col;
    
    *reinterpret_cast<float4*>(&As[buffer_idx][As_row*BN + As_col]) = 
        *reinterpret_cast<float4*>(&A[A_row*N + A_col]);
    
    *reinterpret_cast<float4*>(&Bs[buffer_idx][Bs_row*BN + Bs_col]) = 
        *reinterpret_cast<float4*>(&B[B_row*BN + B_col]);
    
    filled[buffer_idx].arrive();
  }
}

__device__ void consumer(
    barrier ready[], 
    barrier filled[], 
    float (*As)[BN*BN], 
    float (*Bs)[BN*BN], 
    float* C)
{
  int t = threadIdx.x - (n_producer_warps * 32);
  int thr_row = t / 32;
  int thr_col = t % 32;
  
  float C_reg[4] = {0.0f, 0.0f, 0.0f, 0.0f};

  ready[0].arrive();
  ready[1].arrive();

  for (int k = 0; k < N; k += BN)
  {
    int buffer_idx = (k / BN) % 2; 
    
    filled[buffer_idx].arrive_and_wait(); 
    
    float* A_tile = As[buffer_idx];
    float* B_tile = Bs[buffer_idx];

    for (int dot_k = 0; dot_k < BN; dot_k++)
    {
      C_reg[0] += A_tile[(thr_row +  0)*BN + dot_k] * B_tile[dot_k*BN + thr_col];
      C_reg[1] += A_tile[(thr_row +  8)*BN + dot_k] * B_tile[dot_k*BN + thr_col];
      C_reg[2] += A_tile[(thr_row + 16)*BN + dot_k] * B_tile[dot_k*BN + thr_col];
      C_reg[3] += A_tile[(thr_row + 24)*BN + dot_k] * B_tile[dot_k*BN + thr_col];
    }
  
    ready[buffer_idx].arrive(); 
  }
  
  C[(thr_row +  0)*BN + thr_col] = C_reg[0];
  C[(thr_row +  8)*BN + thr_col] = C_reg[1];
  C[(thr_row + 16)*BN + thr_col] = C_reg[2];
  C[(thr_row + 24)*BN + thr_col] = C_reg[3];
}

__global__ void matmul(float*A, float*B, float*C)
{
  __shared__ float As[2][BN*BN];
  __shared__ float Bs[2][BN*BN];
  
  __shared__ barrier bar[4]; 
  
  int t = threadIdx.x; 
  
  if (t < 4) 
   init(bar + t, block_size);
  
  __syncthreads(); 
  
  if (t < n_producer_warps*32)
    producer(bar, bar + 2, As, Bs, A, B);
  else 
    consumer(bar, bar + 2, As, Bs, C);
  
}

int main()
{
  float *A_h, *A_d, *B_h, *B_d, *C_h, *C_d; 
  
  size_t size_A = BN*N*sizeof(float); 
  size_t size_B = N*BN*sizeof(float); 
  size_t size_C = BN*BN*sizeof(float);
  
  cudaHostAlloc(&A_h, size_A, cudaHostAllocDefault);
  cudaHostAlloc(&B_h, size_B, cudaHostAllocDefault);
  cudaHostAlloc(&C_h, size_C, cudaHostAllocDefault);
  
  cudaMalloc(&A_d, size_A);
  cudaMalloc(&B_d, size_B); 
  cudaMalloc(&C_d, size_C);
  
  for (int i = 0; i < BN*N; i++)
  {
    A_h[i] = (float) (i % 100) + 1.0f;
    B_h[i] = (float) (i % 100) + 1.0f;
  }
  
  cudaMemset(C_d, 0, size_C); 

  cudaMemcpy(A_d, A_h, size_A, cudaMemcpyHostToDevice);
  cudaMemcpy(B_d, B_h, size_B, cudaMemcpyHostToDevice);
  
  cudaEvent_t start, stop;
  float elapsedTime_ms;
  cudaEventCreate(&start);
  cudaEventCreate(&stop);

  cudaEventRecord(start);
  
  matmul<<<N_blocks, block_size>>>(A_d, B_d, C_d);
  
  cudaEventRecord(stop);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&elapsedTime_ms, start, stop);
  
  cudaError_t err = cudaGetLastError();
  if (err != cudaSuccess) {
    printf("CUDA Error: %s\n", cudaGetErrorString(err));
  }
  
  cudaMemcpy(C_h, C_d, size_C, cudaMemcpyDeviceToHost);
  
  printf("First 100 elements of C:\n");
  for (int i = 0; i < 100; i++)
  {
    printf("%f, \n", C_h[i]);
  }
  
  double total_flops = (double)BN * (double)BN * (2.0 * (double)N);
  double gflops_per_sec = (total_flops / 1e9) / (elapsedTime_ms / 1000.0);

  printf("\n--- Performance ---\n");
  printf("Kernel Time: %f ms\n", elapsedTime_ms);
  printf("Total FLOPs: %e\n", total_flops);
  printf("GFLOPS/s:    %f\n", gflops_per_sec);
  
  cudaEventDestroy(start);
  cudaEventDestroy(stop);
  
  cudaFree(A_d);
  cudaFree(B_d);
  cudaFree(C_d);
  cudaFreeHost(A_h);
  cudaFreeHost(B_h);
  cudaFreeHost(C_h);
  
  return 0;
}

First 100 elements of C:
10061716.000000, 
10268372.000000, 
10475028.000000, 
10681684.000000, 
10207740.000000, 
10414396.000000, 
10621052.000000, 
10827708.000000, 
10304564.000000, 
10511220.000000, 
10717876.000000, 
10924532.000000, 
10352188.000000, 
10558844.000000, 
10765500.000000, 
10972156.000000, 
10350612.000000, 
10557268.000000, 
10763924.000000, 
10970580.000000, 
10299836.000000, 
10506492.000000, 
10713148.000000, 
10919804.000000, 
10199860.000000, 
10406516.000000, 
10613172.000000, 
10819828.000000, 
10060384.000000, 
10267040.000000, 
10473696.000000, 
10680352.000000, 
10062452.000000, 
10269124.000000, 
10475796.000000, 
10682468.000000, 
9864140.000000, 
10070812.000000, 
10277484.000000, 
10484156.000000, 
10026628.000000, 
10233300.000000, 
10439972.000000, 
10646644.000000, 
10139916.000000, 
10346588.000000, 
10553260.000000, 
10759932.000000, 
10204004.000000, 
10410676.000000, 
10617348.000000, 
10824020.000000, 
10218892.000000, 
10425564.000000, 
1063

In [16]:
%%cuda
#include<stdio.h>
#include<stdlib.h>
#include<cuda_runtime.h> 

// No barrier needed for this simpler version

constexpr int N = 4096; 
constexpr int BN = 32; 
constexpr int N_blocks = 1;
constexpr int n_warps = 16;
// These are not needed for this kernel
// constexpr int n_producer_warps = 8;
// constexpr int n_consumer_warps = n_warps - n_producer_warps;
constexpr int block_size = n_warps*32; // 512 threads

__global__ void matmul_tiled(float*A, float*B, float*C)
{
  // Single buffer for shared memory
  __shared__ float As[BN*BN];
  __shared__ float Bs[BN*BN];
  
  int t = threadIdx.x; // 0..511
  
  // Map 512 threads to a 16x32 grid
  // Each thread will compute 2 elements of the C matrix
  int thr_row = t / 32; // 0..15
  int thr_col = t % 32; // 0..31
  
  // Registers to hold C results
  float C_reg[2] = {0.0f, 0.0f};

  // Each thread loads 2 floats (one float2) for As and 2 for Bs
  // 512 threads * 2 floats/thread = 1024 floats (32*32)
  int s_idx = t * 2;
  int s_row = s_idx / BN;
  int s_col = s_idx % BN;

  for (int k = 0; k < N; k += BN)
  {
    // --- Load Tile from Global to Shared Memory ---
    
    // Load for As
    // A is (BN x N), load A[s_row, k + s_col] & A[s_row, k + s_col + 1]
    float* gA_ptr = &A[s_row * N + (k + s_col)];
    float* sA_ptr = &As[s_row * BN + s_col];
    *reinterpret_cast<float2*>(sA_ptr) = *reinterpret_cast<float2*>(gA_ptr);
    
    // Load for Bs
    // B is (N x BN), load B[k + s_row, s_col] & B[k + s_row, s_col + 1]
    float* gB_ptr = &B[(k + s_row) * BN + s_col];
    float* sB_ptr = &Bs[s_row * BN + s_col];
    *reinterpret_cast<float2*>(sB_ptr) = *reinterpret_cast<float2*>(gB_ptr);
    
    __syncthreads();
    
    // --- Compute Tile-level Matmul ---
    // Each thread computes 2 elements
    for (int dot_k = 0; dot_k < BN; dot_k++)
    {
      C_reg[0] += As[(thr_row +  0)*BN + dot_k] * Bs[dot_k*BN + thr_col];
      C_reg[1] += As[(thr_row + 16)*BN + dot_k] * Bs[dot_k*BN + thr_col];
    }
    
    __syncthreads();
  }
  
  // --- Write results from Registers to Global ---
  C[(thr_row +  0)*BN + thr_col] = C_reg[0];
  C[(thr_row + 16)*BN + thr_col] = C_reg[1];
}

int main()
{
  float *A_h, *A_d, *B_h, *B_d, *C_h, *C_d; 
  
  size_t size_A = BN*N*sizeof(float); 
  size_t size_B = N*BN*sizeof(float); 
  size_t size_C = BN*BN*sizeof(float);
  
  cudaHostAlloc(&A_h, size_A, cudaHostAllocDefault);
  cudaHostAlloc(&B_h, size_B, cudaHostAllocDefault);
  cudaHostAlloc(&C_h, size_C, cudaHostAllocDefault);
  
  cudaMalloc(&A_d, size_A);
  cudaMalloc(&B_d, size_B); 
  cudaMalloc(&C_d, size_C);
  
  for (int i = 0; i < BN*N; i++)
  {
    A_h[i] = (float) (i % 100) + 1.0f;
    B_h[i] = (float) (i % 100) + 1.0f;
  }
  
  cudaMemset(C_d, 0, size_C); 

  cudaMemcpy(A_d, A_h, size_A, cudaMemcpyHostToDevice);
  cudaMemcpy(B_d, B_h, size_B, cudaMemcpyHostToDevice);
  
  cudaEvent_t start, stop;
  float elapsedTime_ms;
  cudaEventCreate(&start);
  cudaEventCreate(&stop);

  cudaEventRecord(start);
  
  // Call the new kernel
  matmul_tiled<<<N_blocks, block_size>>>(A_d, B_d, C_d);
  
  cudaEventRecord(stop);
  cudaEventSynchronize(stop);
  cudaEventElapsedTime(&elapsedTime_ms, start, stop);
  
  cudaError_t err = cudaGetLastError();
  if (err != cudaSuccess) {
    printf("CUDA Error: %s\n", cudaGetErrorString(err));
  }
  
  cudaMemcpy(C_h, C_d, size_C, cudaMemcpyDeviceToHost);
  
  printf("First 100 elements of C:\n");
  for (int i = 0; i < 100; i++)
  {
    printf("%f, \n", C_h[i]);
  }
  
  double total_flops = (double)BN * (double)BN * (2.0 * (double)N);
  double gflops_per_sec = (total_flops / 1e9) / (elapsedTime_ms / 1000.0);

  printf("\n--- Performance ---\n");
  printf("Kernel Time: %f ms\n", elapsedTime_ms);
  printf("Total FLOPs: %e\n", total_flops);
  printf("GFLOPS/s:    %f\n", gflops_per_sec);
  
  cudaEventDestroy(start);
  cudaEventDestroy(stop);
  
  cudaFree(A_d);
  cudaFree(B_d);
  cudaFree(C_d);
  cudaFreeHost(A_h);
  cudaFreeHost(B_h);
  cudaFreeHost(C_h);
  
  return 0;
}

First 100 elements of C:
10061716.000000, 
10268372.000000, 
10475028.000000, 
10681684.000000, 
10207740.000000, 
10414396.000000, 
10621052.000000, 
10827708.000000, 
10304564.000000, 
10511220.000000, 
10717876.000000, 
10924532.000000, 
10352188.000000, 
10558844.000000, 
10765500.000000, 
10972156.000000, 
10350612.000000, 
10557268.000000, 
10763924.000000, 
10970580.000000, 
10299836.000000, 
10506492.000000, 
10713148.000000, 
10919804.000000, 
10199860.000000, 
10406516.000000, 
10613172.000000, 
10819828.000000, 
10060384.000000, 
10267040.000000, 
10473696.000000, 
10680352.000000, 
10062452.000000, 
10269124.000000, 
10475796.000000, 
10682468.000000, 
9864140.000000, 
10070812.000000, 
10277484.000000, 
10484156.000000, 
10026628.000000, 
10233300.000000, 
10439972.000000, 
10646644.000000, 
10139916.000000, 
10346588.000000, 
10553260.000000, 
10759932.000000, 
10204004.000000, 
10410676.000000, 
10617348.000000, 
10824020.000000, 
10218892.000000, 
10425564.000000, 
1063

So basically, the producer consumer overhead is completely fucked in our case. Time to think differently. 