<a href="https://colab.research.google.com/github/KyPython/systolic-array-simulator/blob/main/Systolic_Array_Architecture.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [59]:
class ProcessingElement:
    """
    A single cell in a systolic array.
    Performs one multiply-accumulate operation per cycle.
    """
    def __init__(self, row, col):
        self.row = row
        self.col = col
        self.accumulator = 0
        self.a_reg = 0  # value from left (stored from previous cycle)
        self.b_reg = 0  # value from top (stored from previous cycle)

    def process(self, a_in_for_current_cycle, b_in_for_current_cycle):
        """
        One clock cycle of operation:
        1. Use OLD a_reg, b_reg to update accumulator.
        2. Store current inputs into a_reg, b_reg for NEXT cycle.
        3. Output current inputs to neighbors (right, down).
        """
        # 1) MAC with previous-cycle values
        self.accumulator += self.a_reg * self.b_reg

        # 2) Load current inputs into registers for next cycle
        self.a_reg = a_in_for_current_cycle
        self.b_reg = b_in_for_current_cycle

        # 3) Pass current inputs to neighbors
        a_out_to_neighbor = a_in_for_current_cycle
        b_out_to_neighbor = b_in_for_current_cycle

        return a_out_to_neighbor, b_out_to_neighbor


class SystolicArray:
    """
    Systolic Array for Matrix Multiplication.
    Computes C = A x B where:
    - A flows left to right
    - B flows top to bottom
    - Results accumulate in place
    """
    def __init__(self, size=3):
        self.size = size
        self.pe = [
            [ProcessingElement(i, j) for j in range(size)]
            for i in range(size)
        ]

        # Pipeline registers between neighbors (outputs from this cycle,
        # become neighbor inputs next cycle)
        self.a_pipe_out_regs = [[0] * size for _ in range(size)]
        self.b_pipe_out_regs = [[0] * size for _ in range(size)]

    def multiply(self, A, B, verbose=True):
        """
        Perform matrix multiplication using systolic array.
        A and B are size x size matrices (lists of lists).
        """
        cycles = 3 * self.size - 1  # enough for all products to flow through

        if verbose:
            print("\nSYSTOLIC ARRAY MATRIX MULTIPLICATION")
            print("=" * 60)
            print(f"Array Size: {self.size}x{self.size}")
            print(f"Total Cycles: {cycles}\n")

        for cycle_num in range(cycles):
            if verbose:
                print(f"Cycle {cycle_num + 1}:")

            # Inputs to PEs for THIS cycle
            inputs_to_pes_this_cycle_a = [[0] * self.size for _ in range(self.size)]
            inputs_to_pes_this_cycle_b = [[0] * self.size for _ in range(self.size)]

            # Outputs from PEs produced in THIS cycle
            outputs_from_pes_this_cycle_a = [[0] * self.size for _ in range(self.size)]
            outputs_from_pes_this_cycle_b = [[0] * self.size for _ in range(self.size)]

            # 1) Decide inputs to each PE this cycle
            for i in range(self.size):
                for j in range(self.size):
                    # A input: from left neighbor or external A
                    if j == 0:
                        # A[i][k] enters PE[i][0] at cycle_num = i + k  -> k = cycle_num - i
                        if cycle_num >= i and (cycle_num - i) < self.size:
                            inputs_to_pes_this_cycle_a[i][j] = A[i][cycle_num - i]
                        else:
                            inputs_to_pes_this_cycle_a[i][j] = 0
                    else:
                        inputs_to_pes_this_cycle_a[i][j] = self.a_pipe_out_regs[i][j - 1]

                    # B input: from top neighbor or external B
                    if i == 0:
                        # B[k][j] enters PE[0][j] at cycle_num = j + k  -> k = cycle_num - j
                        if cycle_num >= j and (cycle_num - j) < self.size:
                            inputs_to_pes_this_cycle_b[i][j] = B[cycle_num - j][j]
                        else:
                            inputs_to_pes_this_cycle_b[i][j] = 0
                    else:
                        inputs_to_pes_this_cycle_b[i][j] = self.b_pipe_out_regs[i - 1][j]

            # 2) Process all PEs in parallel for this cycle
            for i in range(self.size):
                for j in range(self.size):
                    a_out, b_out = self.pe[i][j].process(
                        inputs_to_pes_this_cycle_a[i][j],
                        inputs_to_pes_this_cycle_b[i][j]
                    )
                    outputs_from_pes_this_cycle_a[i][j] = a_out
                    outputs_from_pes_this_cycle_b[i][j] = b_out

            # 3) Update neighbor pipeline registers for NEXT cycle
            self.a_pipe_out_regs = outputs_from_pes_this_cycle_a
            self.b_pipe_out_regs = outputs_from_pes_this_cycle_b

            if verbose:
                self.print_state()

        # Extract result matrix from accumulators
        result = [
            [self.pe[i][j].accumulator for j in range(self.size)]
            for i in range(self.size)
        ]
        return result

    def print_state(self):
        print("  Accumulators:")
        for i in range(self.size):
            row = []
            for j in range(self.size):
                row.append(f"{self.pe[i][j].accumulator:3}")
            print("    [" + ", ".join(row) + "]")
        print()


def standard_matmul(A, B):
    """Reference CPU-style matrix multiplication."""
    n = len(A)
    C = [[0] * n for _ in range(n)]
    for i in range(n):
        for j in range(n):
            s = 0
            for k in range(n):
                s += A[i][k] * B[k][j]
            C[i][j] = s
    return C


# ---- Simple validation harness ----

if __name__ == "__main__":
    # 3x3 test
    A_3x3 = [[1, 2, 3],
             [4, 5, 6],
             [7, 8, 9]]

    B_3x3 = [[9, 8, 7],
             [6, 5, 4],
             [3, 2, 1]]

    expected_3x3 = [[sum(A_3x3[i][k] * B_3x3[k][j] for k in range(len(A_3x3[0])))
                     for j in range(len(B_3x3[0]))]
                    for i in range(len(A_3x3))]

    array_3x3 = SystolicArray(size=3)
    result_3x3 = array_3x3.multiply(A_3x3, B_3x3, verbose=False)

    print("3x3 expected:", expected_3x3)
    print("3x3 systolic:", result_3x3)

    # 2x2 test
    A_2x2 = [[1, 2],
             [3, 4]]

    B_2x2 = [[5, 6],
             [7, 8]]

    expected_2x2 = [[sum(A_2x2[i][k] * B_2x2[k][j] for k in range(len(A_2x2[0])))
                     for j in range(len(B_2x2[0]))]
                    for i in range(len(A_2x2))]

    array_2x2 = SystolicArray(size=2)
    result_2x2 = array_2x2.multiply(A_2x2, B_2x2, verbose=False)

    print("2x2 expected:", expected_2x2)
    print("2x2 systolic:", result_2x2)


3x3 expected: [[30, 24, 18], [84, 69, 54], [138, 114, 90]]
3x3 systolic: [[30, 24, 18], [84, 69, 54], [138, 114, 90]]
2x2 expected: [[19, 22], [43, 50]]
2x2 systolic: [[19, 22], [43, 50]]
