Readers interested in a more detailed introduction to the backward pass of FlashAttention are warmly invited to check out the following two tutorials:

1. [Learning the Backward Pass of FlashAttention Part I Derivations](https://totalvariation.github.io/blog/2025/intro-flashattention-backward-part1/)
2. [Learning the Backward Pass of FlashAttention Part II Implementation in Triton](https://totalvariation.github.io/blog/2025/intro-flashattention-backward-part2/)

In [1]:
import pytest
import torch
import os

import triton
import triton.language as tl
from triton.runtime import driver

import numpy as np

In [2]:
# Default value; a later cell re-assigns this after trying to import TensorDescriptor.
HAS_TENSOR_DESC = False
# Torch device reported as active by Triton's runtime driver (test tensors are created on it).
DEVICE = triton.runtime.driver.active.get_active_torch_device()

def is_hip():
    """Return True when Triton's current compilation target reports the "hip" backend."""
    target = triton.runtime.driver.active.get_current_target()
    return target.backend == "hip"


def is_cdna():
    """Return True on a HIP target whose arch is one of the gfx ids listed below."""
    if not is_hip():
        return False
    cdna_archs = ('gfx940', 'gfx941', 'gfx942', 'gfx90a', 'gfx908')
    current_arch = triton.runtime.driver.active.get_current_target().arch
    return current_arch in cdna_archs

In [3]:
"""
FlashAttention Triton Implementation
===============

This is a Triton implementation of the Flash Attention v2 algorithm from Tri Dao (https://tridao.me/publications/flash2/flash2.pdf)

Credits: OpenAI kernel team & Triton tutorials: 06-fused-attention.py 
(https://github.com/triton-lang/triton/blob/8bd8035559d4d2fa387149eecdf10d545d8a3d03/python/tutorials/06-fused-attention.py)

Modified by X. Cai to account for both self-attention and cross-attention 
with arbitrary sequence lengths by properly handling boundary conditions.

This IPython notebook is for pedagogical purposes. 
For practical usage, I recommend using the official FlashAttention Repo (https://github.com/Dao-AILab/flash-attention) 
written in CUDA or refer to its Triton implementation.
"""

import pytest
import torch

import triton
import triton.language as tl

# Feature probe: record whether this Triton build ships the tensor-descriptor helper.
try:
    from triton.tools.tensor_descriptor import TensorDescriptor
except ModuleNotFoundError:
    HAS_TENSOR_DESC = False
else:
    HAS_TENSOR_DESC = True

# Torch device reported as active by Triton's runtime driver (test tensors are created on it).
DEVICE = triton.runtime.driver.active.get_active_torch_device()

def is_hip():
    """Return True when Triton's current compilation target reports the "hip" backend."""
    backend = triton.runtime.driver.active.get_current_target().backend
    return backend == "hip"


def is_cuda():
    """Return True when Triton's current compilation target reports the "cuda" backend."""
    backend = triton.runtime.driver.active.get_current_target().backend
    return backend == "cuda"


def supports_tma():
    """Return True when TensorDescriptor is importable, the target is CUDA, and
    the device's compute-capability major version is at least 9."""
    if not HAS_TENSOR_DESC:
        return False
    if not is_cuda():
        return False
    major = torch.cuda.get_device_capability()[0]
    return major >= 9


# Inner loop of the forward kernel: walks key/value blocks for one block of
# query rows and folds every block into the running (acc, l_i, m_i) state.
#   acc   - running, not yet normalised, output accumulator
#   l_i   - running row-wise sum of exponentials
#   m_i   - running row-wise maximum of the scaled scores
#   q     - the query block, already loaded by the caller
#   kT_ptrs / v_ptrs - pointers to the first K^T / V block (advanced in here)
#   STAGE - selects the key range:
#             1 -> [0, start_m * BLOCK_M)                 (no causal mask applied)
#             2 -> [start_m * BLOCK_M, (start_m + 1) * BLOCK_M)  (causal mask applied)
#             anything else -> [0, KV_LEN)
# Returns the updated (acc, l_i, m_i).
# NOTE(review): for STAGE 1 and 2 the range is bounded in units of BLOCK_M but
# stepped in units of BLOCK_N, so this looks like it needs
# BLOCK_M % BLOCK_N == 0 to stay on the right side of the diagonal - confirm
# against the autotune configurations.
@triton.jit
def _attn_fwd_inner(acc, l_i, m_i, q,  #
                    kT_ptrs, v_ptrs,  #
                    start_m, qk_scale,  #
                    stride_kn, stride_vn, #
                    BLOCK_M: tl.constexpr, BLOCK_N: tl.constexpr, STAGE: tl.constexpr,  #
                    offs_m: tl.constexpr, offs_n: tl.constexpr,  #
                    KV_LEN: tl.constexpr):
    # range of values handled by this stage
    if STAGE == 1:
        lo, hi = 0, start_m * BLOCK_M
    elif STAGE == 2: # diagonal block
        lo, hi = start_m * BLOCK_M, (start_m + 1) * BLOCK_M
        lo = tl.multiple_of(lo, BLOCK_M)
    # causal = False
    else:
        lo, hi = 0, KV_LEN
    # move both pointers to the first key/value row of this stage
    kT_ptrs += lo * stride_kn
    v_ptrs += lo * stride_vn
    # loop over k, v and update accumulator
    for start_n in range(lo, hi, BLOCK_N):
        start_n = tl.multiple_of(start_n, BLOCK_N)
        # -- compute qk ----
        # key columns at or beyond KV_LEN are loaded as 0 ...
        mask = (start_n + offs_n[None, :]) < KV_LEN
        kT = tl.load(kT_ptrs, mask=mask, other=0.0)
        s = tl.dot(q, kT)
        # ... and pushed to a large negative score so their exp2 underflows to 0
        s += tl.where(mask, 0, -1.0e6)
        if STAGE == 2:
            # causal mask: a query row may only see key columns with index <= its own
            mask = offs_m[:, None] >= (start_n + offs_n[None, :])
            s = s * qk_scale + tl.where(mask, 0, -1.0e6)
            m_ij = tl.maximum(m_i, tl.max(s, 1))
            s -= m_ij[:, None]
        else: 
            m_ij = tl.maximum(m_i, tl.max(s, 1) * qk_scale)
            s = s * qk_scale - m_ij[:, None]
        # base-2 exponential; the caller folds 1/ln(2) into qk_scale
        p = tl.math.exp2(s)
        l_ij = tl.sum(p, 1)
        # -- update m_i and l_i
        # alpha rescales what was accumulated under the previous running maximum
        alpha = tl.math.exp2(m_i - m_ij)
        l_i = l_i * alpha + l_ij
        # -- update output accumulator --
        acc = acc * alpha[:, None]
        # update acc
        v = tl.load(v_ptrs, mask=(start_n + offs_n[:, None]) < KV_LEN, other=0.0)
        # probabilities are cast to float16 before the second matmul
        p = p.to(tl.float16)
        acc = tl.dot(p, v, acc)
        # update m_i and l_i
        m_i = m_ij
        # advance to the next key/value block
        kT_ptrs += BLOCK_N * stride_kn
        v_ptrs += BLOCK_N * stride_vn
    return acc, l_i, m_i


# Candidate autotune configurations: every combination of tile shape,
# pipeline depth (num_stages) and warp count listed below. HIP targets only
# try a single pipeline stage. The list is filtered by `keep` before use.
configs = [
    triton.Config({'BLOCK_M': block_m, 'BLOCK_N': block_n}, num_stages=stages, num_warps=warps)
    for block_m in [32, 64]
    for block_n in [32, 64]
    for stages in ([1] if is_hip() else [3, 4, 7])
    for warps in [4, 8]
]


def keep(conf):
    """Return True if an autotune candidate may be used for ``_attn_fwd``.

    A candidate is rejected when either of the following holds:

    * ``BLOCK_M`` is not a multiple of ``BLOCK_N``. The causal path of the
      forward kernel bounds its key ranges in units of ``BLOCK_M`` but walks
      them in steps of ``BLOCK_N``; with e.g. BLOCK_M=32, BLOCK_N=64 the
      unmasked off-diagonal pass reads keys beyond the diagonal and the
      diagonal pass then counts them a second time, giving wrong results.
    * it asks for 8 warps on a tile smaller than 128 x 128.

    Args:
        conf: object exposing ``kwargs`` (with "BLOCK_M" and "BLOCK_N") and
            ``num_warps``, e.g. a ``triton.Config``.
    """
    BLOCK_M = conf.kwargs["BLOCK_M"]
    BLOCK_N = conf.kwargs["BLOCK_N"]
    # Tile-shape constraint required for correct causal masking.
    if BLOCK_M % BLOCK_N != 0:
        return False
    if BLOCK_M * BLOCK_N < 128 * 128 and conf.num_warps == 8:
        return False
    return True


# Forward attention kernel.
# Grid axis 0 selects a block of BLOCK_M query rows, grid axis 1 selects one
# (batch, head) pair. Each program loads its query block once, streams over
# K/V through _attn_fwd_inner, then writes the normalised output rows to Out
# and one float per row (running max + log2 of the running sum) to M.
# STAGE is 3 for causal attention and 1 for non-causal attention.
# NOTE: the autotune key is (Q_LEN, HEAD_DIM_QK) only, so one tuned
# configuration is reused for causal and non-causal calls sharing that key.
@triton.autotune(list(filter(keep, configs)), key=["Q_LEN", "HEAD_DIM_QK"])
@triton.jit
def _attn_fwd(Q, K, V, sm_scale, M, Out,  #
              stride_qz, stride_qh, stride_qm, stride_qk,  #
              stride_kz, stride_kh, stride_kn, stride_kk,  #
              stride_vz, stride_vh, stride_vn, stride_vk,  #
              stride_oz, stride_oh, stride_om, stride_ok,  #
              Z, H, Q_LEN, KV_LEN,  #
              HEAD_DIM_QK: tl.constexpr,  #
              HEAD_DIM_V: tl.constexpr,  #
              BLOCK_M: tl.constexpr,  #
              BLOCK_N: tl.constexpr,  #
              STAGE: tl.constexpr  #
              ):
    tl.static_assert(BLOCK_N <= HEAD_DIM_QK)
    tl.static_assert(BLOCK_N <= HEAD_DIM_V)
    start_m = tl.program_id(0)
    off_hz = tl.program_id(1)
    # split the flattened (batch, head) index
    off_z = off_hz // H
    off_h = off_hz % H
    # element offsets of this (batch, head) slice, computed in int64
    q_offset = off_z.to(tl.int64) * stride_qz + off_h.to(tl.int64) * stride_qh
    k_offset = off_z.to(tl.int64) * stride_kz + off_h.to(tl.int64) * stride_kh
    v_offset = off_z.to(tl.int64) * stride_vz + off_h.to(tl.int64) * stride_vh
    o_offset = off_z.to(tl.int64) * stride_oz + off_h.to(tl.int64) * stride_oh

    # move the base pointers to this (batch, head) slice
    Q += q_offset
    K += k_offset
    V += v_offset
    Out += o_offset
    
    # initialize offsets
    offs_m = start_m * BLOCK_M + tl.arange(0, BLOCK_M)
    offs_n = tl.arange(0, BLOCK_N)
    offs_k_qk = tl.arange(0, HEAD_DIM_QK)
    offs_k_v = tl.arange(0, HEAD_DIM_V)
    # initialize the running row maximum m_i (-inf), the running row sum l_i (1.0)
    # and the output accumulator (0)
    m_i = tl.zeros([BLOCK_M], dtype=tl.float32) - float("inf")
    l_i = tl.zeros([BLOCK_M], dtype=tl.float32) + 1.0
    acc = tl.zeros([BLOCK_M, HEAD_DIM_V], dtype=tl.float32)
    # load scales
    # fold 1/ln(2) into the scale so the inner loop can use exp2
    qk_scale = sm_scale
    qk_scale *= 1.44269504  # 1/ln(2)
    # load q: it will stay in SRAM throughout
    q_ptrs = Q + offs_m[:, None] * stride_qm + offs_k_qk[None, :] * stride_qk 
    # query rows at or beyond Q_LEN are loaded as zeros
    q = tl.load(q_ptrs, mask=offs_m[:, None] < Q_LEN, other=0.0)

    # K is addressed transposed (HEAD_DIM_QK x BLOCK_N), V as BLOCK_N x HEAD_DIM_V
    kT_ptrs = K + offs_n[None, :] * stride_kn + offs_k_qk[:, None] * stride_kk
    v_ptrs =  V + offs_n[:, None] * stride_vn + offs_k_v[None, :] * stride_vk

    # stage 1: off-band
    # For causal = True, STAGE = 3 and _attn_fwd_inner gets 1 as its STAGE
    # For causal = False, STAGE = 1, and _attn_fwd_inner gets 3 as its STAGE
    if STAGE & 1:
        acc, l_i, m_i = _attn_fwd_inner(acc, l_i, m_i, q, kT_ptrs, v_ptrs,  #
                                        start_m, qk_scale,  #
                                        stride_kn, stride_vn,  #
                                        BLOCK_M, BLOCK_N,  #
                                        4 - STAGE, offs_m, offs_n,  #
                                        KV_LEN  #
                                        )
    # stage 2: on-band
    if STAGE & 2:
        # only reached for causal = True (STAGE = 3): process the diagonal
        # block with the causal mask applied
        acc, l_i, m_i = _attn_fwd_inner(acc, l_i, m_i, q, kT_ptrs, v_ptrs,  #
                                        start_m, qk_scale,  #
                                        stride_kn, stride_vn,  #
                                        BLOCK_M, BLOCK_N,  #
                                        2, offs_m, offs_n,  #
                                        KV_LEN  #
                                        )
    # epilogue
    # m_i becomes (running max + log2(running sum)), i.e. a base-2 log-sum-exp per row
    m_i += tl.math.log2(l_i)
    acc = acc / l_i[:, None]
    # M is indexed as (batch * head, Q_LEN) with unit stride along the rows
    m_ptrs = M + off_hz * Q_LEN + offs_m
    tl.store(m_ptrs, m_i, mask=offs_m < Q_LEN)
    o_ptrs = Out + offs_m[:, None] * stride_om + offs_k_v[None, :] * stride_ok
    tl.store(o_ptrs, acc.to(Out.type.element_ty), mask=offs_m[:, None] < Q_LEN)


# Backward pre-pass: for every output row compute delta = sum(O * dO) over the
# head dimension and store it in Delta.
# Grid axis 0 selects a block of BLOCK_M rows, grid axis 1 one (batch, head) pair.
# The address arithmetic below uses HEAD_DIM as the row stride and
# HEAD_DIM * O_LEN as the per-head stride, i.e. it expects O and DO to be
# laid out contiguously. Z and H are accepted but not used.
@triton.jit
def _attn_bwd_preprocess(O, DO,  #
                         Delta,  #
                         Z, H, O_LEN,  #
                         BLOCK_M: tl.constexpr, HEAD_DIM: tl.constexpr  #
                         ):
    off_m = tl.program_id(0) * BLOCK_M + tl.arange(0, BLOCK_M)
    off_hz = tl.program_id(1)
    off_n = tl.arange(0, HEAD_DIM)
    # load
    # rows at or beyond O_LEN are read as zeros; both operands are promoted to float32
    o = tl.load(O + off_hz * HEAD_DIM * O_LEN + off_m[:, None] * HEAD_DIM + off_n[None, :], 
                mask=off_m[:, None] < O_LEN, other=0.0).to(tl.float32)
    do = tl.load(DO + off_hz * HEAD_DIM * O_LEN + off_m[:, None] * HEAD_DIM + off_n[None, :], 
                 mask=off_m[:, None] < O_LEN, other=0.0).to(tl.float32)
    # row-wise dot product of O and dO
    delta = tl.sum(o * do, axis=1)
    # write-back
    tl.store(Delta + off_hz * O_LEN + off_m, delta, mask=off_m < O_LEN)


# The main inner-loop logic for computing dK and dV.
# For one resident block of keys/values (k, v) this walks `num_steps` blocks
# of BLOCK_M1 query rows starting at row `start_m` and accumulates into
# dk and dv, which are returned.
#   Q, DO  - base pointers of the query / output-gradient slice
#   M      - per-row value written by the forward kernel (base-2 log-sum-exp)
#   D      - per-row delta written by _attn_bwd_preprocess
#   MASK   - when True, apply the causal mask (query index >= key index)
# sm_scale and H are accepted but not used in this function.
@triton.jit
def _attn_bwd_dkdv(dk, dv,  #
                   Q, k, v, sm_scale,  #
                   DO,  #
                   M, D,  #
                   stride_qm, stride_qk,  #
                   stride_om, stride_ok,  #
                   H, Q_LEN,  # 
                   BLOCK_M1: tl.constexpr,  #
                   BLOCK_N1: tl.constexpr,  #
                   HEAD_DIM_Q: tl.constexpr,  #
                   HEAD_DIM_O: tl.constexpr,  #
                   # Filled in by the wrapper.
                   start_n, start_m, num_steps,  #
                   MASK: tl.constexpr):
    offs_m = start_m + tl.arange(0, BLOCK_M1)
    offs_n = start_n + tl.arange(0, BLOCK_N1)
    offs_k_q = tl.arange(0, HEAD_DIM_Q)
    offs_k_o = tl.arange(0, HEAD_DIM_O)
    # Q is addressed transposed (HEAD_DIM_Q x BLOCK_M1)
    qT_ptrs = Q + offs_m[None, :] * stride_qm + offs_k_q[:, None] * stride_qk
    do_ptrs = DO + offs_m[:, None] * stride_om + offs_k_o[None, :] * stride_ok
    # BLOCK_N1 must be a multiple of BLOCK_M1, otherwise the code wouldn't work.
    # tl.static_assert(BLOCK_N1 % BLOCK_M1 == 0)
    curr_m = start_m
    step_m = BLOCK_M1
    for blk_idx in range(num_steps):
        # query rows at or beyond Q_LEN are loaded as zeros
        offs_m = curr_m + tl.arange(0, BLOCK_M1)
        mask = offs_m[None, :] < Q_LEN
        qT = tl.load(qT_ptrs, mask=mask, other=0.0)
        m = tl.load(M + offs_m, mask=offs_m < Q_LEN, other=0.0)
        sT = tl.dot(k, qT)
        # out-of-range query columns get a large negative score so exp2 gives 0
        sT += tl.where(mask, 0, -1.0e6)
        # transposed probabilities, recomputed from the stored per-row statistic
        pT = tl.math.exp2(sT - m[None, :])
        # Autoregressive masking.
        if MASK:
            mask = (offs_m[None, :] >= offs_n[:, None])  # triu
            pT = tl.where(mask, pT, 0.0)
        do = tl.load(do_ptrs, mask=offs_m[:, None] < Q_LEN, other=0.0)
        # Compute dV.
        ppT = pT
        ppT = ppT.to(tl.float16)
        dv += tl.dot(ppT, do)
        # D holds the per-row delta produced by _attn_bwd_preprocess.
        Di = tl.load(D + offs_m, mask=offs_m < Q_LEN, other=0.0)
        # Compute dP and dS.
        dpT = tl.dot(v, tl.trans(do)).to(tl.float32)
        dsT = pT * (dpT - Di[None, :])
        dsT = dsT.to(tl.float16)
        dk += tl.dot(dsT, tl.trans(qT))
        # Increment pointers.
        curr_m += step_m
        qT_ptrs += step_m * stride_qm
        do_ptrs += step_m * stride_om
    return dk, dv


# the main inner-loop logic for computing dQ
# For one resident block of query rows (q, do, m, Di) this walks `num_steps`
# blocks of BLOCK_N2 keys/values starting at key index `start_n` and
# accumulates into dq, which is returned.
#   K, V - base pointers of the key / value slice
#   m    - per-row value written by the forward kernel, shaped as a column
#   Di   - per-row delta from _attn_bwd_preprocess, shaped as a column
#   MASK - when True, apply the causal mask (query index >= key index)
# H is accepted but not used in this function.
@triton.jit
def _attn_bwd_dq(dq, q, K, V,  #
                 do, m, Di,
                 stride_kn, stride_kk,  #
                 stride_vn, stride_vk,  #
                 H, KV_LEN,  #
                 BLOCK_M2: tl.constexpr,  #
                 BLOCK_N2: tl.constexpr,  #
                 HEAD_DIM_K: tl.constexpr,  #
                 HEAD_DIM_V: tl.constexpr,  #
                 # Filled in by the wrapper.
                 start_m, start_n, num_steps,  #
                 MASK: tl.constexpr):
    offs_m = start_m + tl.arange(0, BLOCK_M2)
    offs_n = start_n + tl.arange(0, BLOCK_N2)
    offs_k_k = tl.arange(0, HEAD_DIM_K)
    offs_k_v = tl.arange(0, HEAD_DIM_V)
    # both K and V are addressed transposed (head_dim x BLOCK_N2)
    kT_ptrs = K + offs_n[None, :] * stride_kn + offs_k_k[:, None] * stride_kk
    vT_ptrs = V + offs_n[None, :] * stride_vn + offs_k_v[:, None] * stride_vk
    # BLOCK_M2 must be a multiple of BLOCK_N2, otherwise the code wouldn't work.
    # tl.static_assert(BLOCK_M2 % BLOCK_N2 == 0)
    curr_n = start_n
    step_n = BLOCK_N2
    for blk_idx in range(num_steps):
        # key/value columns at or beyond KV_LEN are loaded as zeros
        offs_n = curr_n + tl.arange(0, BLOCK_N2)
        mask = offs_n[None, :] < KV_LEN
        kT = tl.load(kT_ptrs, mask=mask, other=0.0)
        vT = tl.load(vT_ptrs, mask=mask, other=0.0)
        s = tl.dot(q, kT)
        # out-of-range key columns get a large negative score so exp2 gives 0
        s += tl.where(mask, 0, -1.0e6)
        # probabilities, recomputed from the stored per-row statistic
        p = tl.math.exp2(s - m)
        # Autoregressive masking.
        if MASK:
            #offs_n = curr_n + tl.arange(0, BLOCK_N2)
            mask = (offs_m[:, None] >= offs_n[None, :]) # tril
            p = tl.where(mask, p, 0.0)
        # Compute dP and dS.
        dp = tl.dot(do, vT).to(tl.float32)
        ds = p * (dp - Di)
        ds = ds.to(tl.float16)
        # Compute dQ.
        # NOTE: We need to de-scale dq in the end, because kT was pre-scaled.
        dq += tl.dot(ds, tl.trans(kT))
        # Increment pointers.
        curr_n += step_n
        kT_ptrs += step_n * stride_kn
        vT_ptrs += step_n * stride_vn
    return dq


# Backward attention kernel.
# Grid axis 0 (pid) is used twice: as the index of a BLOCK_N1-row key/value
# block whose dK/dV this program produces, and as the index of a BLOCK_M2-row
# query block whose dQ it produces. Grid axis 2 selects one (batch, head) pair.
# Each half is guarded separately because Q_LEN and KV_LEN may differ.
#   K     - the host wrapper passes K already multiplied by sm_scale / ln(2)
#   M     - per-row value written by the forward kernel
#   D     - per-row delta written by _attn_bwd_preprocess
#   STAGE - 3 for causal attention, 1 for non-causal attention
@triton.jit
def _attn_bwd(Q, K, V, sm_scale,  #
              DO,  #
              DQ, DK, DV,  #
              M, D,  #
              stride_qz, stride_qh, stride_qm, stride_qk,  #
              stride_kz, stride_kh, stride_kn, stride_kk,  #
              stride_vz, stride_vh, stride_vn, stride_vk,  #
              stride_oz, stride_oh, stride_om, stride_ok,  #
              H, Q_LEN, KV_LEN,  #
              BLOCK_M1: tl.constexpr,  #
              BLOCK_N1: tl.constexpr,  #
              BLOCK_M2: tl.constexpr,  #
              BLOCK_N2: tl.constexpr,  #
              BLK_SLICE_FACTOR: tl.constexpr,  #
              HEAD_DIM_QK: tl.constexpr, HEAD_DIM_V: tl.constexpr,  #
              STAGE: tl.constexpr):
    LN2: tl.constexpr = 0.6931471824645996  # = ln(2)

    pid = tl.program_id(0)
    bhid = tl.program_id(2)
    # split the flattened (batch, head) index
    off_z = bhid // H
    off_h = bhid % H
    # M and D are indexed as (batch * head, Q_LEN)
    off_chz = (bhid * Q_LEN).to(tl.int64)
    q_offset = off_z.to(tl.int64) * stride_qz + off_h.to(tl.int64) * stride_qh
    k_offset = off_z.to(tl.int64) * stride_kz + off_h.to(tl.int64) * stride_kh
    v_offset = off_z.to(tl.int64) * stride_vz + off_h.to(tl.int64) * stride_vh
    o_offset = off_z.to(tl.int64) * stride_oz + off_h.to(tl.int64) * stride_oh
    
    # offset pointers for batch/head
    # (the gradient tensors reuse the offsets/strides of the tensors they mirror)
    Q += q_offset
    K += k_offset
    V += v_offset
    DO += o_offset
    DQ += q_offset
    DK += k_offset
    DV += v_offset
    M += off_chz
    D += off_chz

    offs_k_qk = tl.arange(0, HEAD_DIM_QK)
    offs_k_v = tl.arange(0, HEAD_DIM_V)

    if pid * BLOCK_N1 < KV_LEN:  # handle imbalance between different lengths of Q and KV
        # THIS BLOCK DOES DK AND DV:
        tl.static_assert(BLOCK_N1 % BLOCK_M1 == 0)
        # load scales
        
        start_n = pid * BLOCK_N1
        offs_n = start_n + tl.arange(0, BLOCK_N1)

        dk = tl.zeros([BLOCK_N1, HEAD_DIM_QK], dtype=tl.float32)
        dv = tl.zeros([BLOCK_N1, HEAD_DIM_V], dtype=tl.float32)

        # load K and V: they stay in SRAM throughout the inner loop.
        k = tl.load(K + offs_n[:, None] * stride_kn + offs_k_qk[None, :] * stride_kk, 
                    mask=offs_n[:, None] < KV_LEN, other=0.0)
        v = tl.load(V + offs_n[:, None] * stride_vn + offs_k_v[None, :] * stride_vk,
                    mask=offs_n[:, None] < KV_LEN, other=0.0)
        
        # For causal = True, STAGE = 3 
        # For causal = False, STAGE = 1
        # Compute dK and dV for non-masked blocks.
        # causal: only query rows after this key block; non-causal: all query rows
        start_m = start_n + BLOCK_N1 if STAGE == 3 else 0
        num_steps = tl.cdiv((Q_LEN - start_m), BLOCK_M1)

        dk, dv = _attn_bwd_dkdv(  #
                                dk, dv,  #
                                Q, k, v, sm_scale,  #
                                DO,  #
                                M, D,  #
                                stride_qm, stride_qk,  #
                                stride_om, stride_ok,  #
                                H, Q_LEN,  #
                                BLOCK_M1, BLOCK_N1,  #
                                HEAD_DIM_QK, HEAD_DIM_V,  #
                                start_n, start_m, num_steps,  #
                                MASK=False  #
                                )

        if STAGE & 2: # diagonal block for causal masking
            # query rows [start_n, start_n + BLOCK_N1) are walked in smaller
            # slices of BLOCK_M1 // BLK_SLICE_FACTOR rows with the mask enabled
            start_m = start_n
            MASK_BLOCK_M1: tl.constexpr = BLOCK_M1 // BLK_SLICE_FACTOR
            num_steps = BLOCK_N1 // MASK_BLOCK_M1
            #num_steps = tl.cdiv(BLOCK_N1, MASK_BLOCK_M1)

            dk, dv = _attn_bwd_dkdv(dk, dv,  #
                                    Q, k, v, sm_scale,  #
                                    DO,  #
                                    M, D,  #
                                    stride_qm, stride_qk,  #
                                    stride_om, stride_ok,  #
                                    H, Q_LEN,  #
                                    MASK_BLOCK_M1, BLOCK_N1,  #
                                    HEAD_DIM_QK, HEAD_DIM_V,  #
                                    start_n, start_m, num_steps,  #
                                    MASK=True  #
                                    )

        # Write back dV.
        dv_ptrs = DV + offs_n[:, None] * stride_vn + offs_k_v[None, :] * stride_vk
        tl.store(dv_ptrs, dv, mask=offs_n[:, None] < KV_LEN)

        # Write back dK.
        # q was loaded unscaled inside _attn_bwd_dkdv, so the softmax scale is applied here
        dk *= sm_scale
        dk_ptrs = DK + offs_n[:, None] * stride_kn + offs_k_qk[None, :] * stride_kk
        tl.store(dk_ptrs, dk, mask=offs_n[:, None] < KV_LEN)

    if pid * BLOCK_M2 < Q_LEN:
        # THIS BLOCK DOES DQ:
        # BLOCK_M2 must be a multiple of BLOCK_N2, otherwise the code wouldn't work.
        tl.static_assert(BLOCK_M2 % BLOCK_N2 == 0)
        
        start_m = pid * BLOCK_M2
        offs_m = start_m + tl.arange(0, BLOCK_M2)
        
        q = tl.load(Q + offs_m[:, None] * stride_qm + offs_k_qk[None, :] * stride_qk, 
                    mask=offs_m[:, None] < Q_LEN, other=0.0)
        dq = tl.zeros([BLOCK_M2, HEAD_DIM_QK], dtype=tl.float32)
        do = tl.load(DO + offs_m[:, None] * stride_om + offs_k_v[None, :] * stride_ok, 
                     mask=offs_m[:, None] < Q_LEN, other=0.0)
        
        # per-row statistics, reshaped to columns so they broadcast over keys
        m = tl.load(M + offs_m, mask=offs_m < Q_LEN, other=0.0)
        m = m[:, None]

        Di = tl.load(D + offs_m, mask=offs_m < Q_LEN, other=0.0)
        Di = Di[:, None]
        
        # first pass: keys [0, min(start_m, KV_LEN)) without causal masking
        end_n = start_m if KV_LEN >= start_m else KV_LEN
        num_steps = tl.cdiv(end_n, BLOCK_N2)
        dq = _attn_bwd_dq(dq, q, K, V,  #
                          do, m, Di,  #
                          stride_kn, stride_kk,  #
                          stride_vn, stride_vk,  #
                          H, KV_LEN,  #
                          BLOCK_M2, BLOCK_N2,  #
                          HEAD_DIM_QK, HEAD_DIM_V,  #
                          start_m, 0, num_steps,  #
                          MASK=False  #
                          )
        
        if STAGE & 2:
            # Compute dQ for masked (diagonal) blocks when using causal masking
            MASK_BLOCK_N2: tl.constexpr = BLOCK_N2 // BLK_SLICE_FACTOR
            num_steps = BLOCK_M2 // MASK_BLOCK_N2
            # num_steps = tl.cdiv(BLOCK_M2, MASK_BLOCK_N2)
            
            dq = _attn_bwd_dq(dq, q, K, V,  #
                              do, m, Di,  #
                              stride_kn, stride_kk,  #
                              stride_vn, stride_vk,  #
                              H, KV_LEN,  #
                              BLOCK_M2, MASK_BLOCK_N2,  #
                              HEAD_DIM_QK, HEAD_DIM_V,  #
                              start_m, start_m, num_steps,  #
                              MASK=True  #
                              )
        else:
            # non-causal: second pass over the remaining keys [start_m, KV_LEN)
            end_n = KV_LEN - start_m if KV_LEN >= start_m else 0
            num_steps = tl.cdiv(end_n, BLOCK_N2)
            dq = _attn_bwd_dq(dq, q, K, V,  #
                              do, m, Di,  #
                              stride_kn, stride_kk,  #
                              stride_vn, stride_vk,  #
                              H, KV_LEN,  #
                              BLOCK_M2, BLOCK_N2,  #
                              HEAD_DIM_QK, HEAD_DIM_V,  #
                              start_m, start_m, num_steps,  #
                              MASK=False  #
                              )
        
        
        # Write back dQ.
        dq_ptrs = DQ + offs_m[:, None] * stride_qm + offs_k_qk[None, :] * stride_qk
        # K carried a factor sm_scale / ln(2); multiplying by ln(2) leaves sm_scale
        dq *= LN2
        tl.store(dq_ptrs, dq, mask=offs_m[:, None] < Q_LEN)


class _attention(torch.autograd.Function):
    """Autograd wrapper that launches the Triton forward and backward kernels."""

    @staticmethod
    def forward(ctx, q, k, v, causal, sm_scale):
        """Launch ``_attn_fwd`` and stash what ``backward`` needs.

        Args:
            ctx: autograd context.
            q, k, v: 4-D tensors indexed as (batch, head, sequence, head_dim);
                q and k must share the same head_dim.
            causal: when truthy, the kernels are launched with STAGE=3
                (causal masking), otherwise with STAGE=1.
            sm_scale: scale applied to the q @ k^T scores.

        Returns:
            Tensor shaped (q.shape[0], q.shape[1], q.shape[2], v.shape[-1])
            with q's dtype and device.
        """
        # shape constraints
        HEAD_DIM_Q, HEAD_DIM_K = q.shape[-1], k.shape[-1]
        HEAD_DIM_V = v.shape[-1]
        assert HEAD_DIM_Q == HEAD_DIM_K
        assert HEAD_DIM_K in {16, 32, 64, 128, 256}
        o = torch.empty((q.shape[0], q.shape[1], q.shape[2], v.shape[-1]), device=q.device, dtype=q.dtype)
        stage = 3 if causal else 1
        extra_kern_args = {}
        # Tuning for AMD target
        if is_hip():
            waves_per_eu = 3 if HEAD_DIM_K <= 64 else 2
            extra_kern_args = {"waves_per_eu": waves_per_eu, "allow_flush_denorm": True}

        # One float32 per query row; the forward kernel fills it and the
        # backward kernels read it back.
        M = torch.empty((q.shape[0], q.shape[1], q.shape[2]), device=q.device, dtype=torch.float32)

        # axis 0: blocks of query rows, axis 1: (batch, head) pairs
        grid = lambda args: (triton.cdiv(q.shape[2], args["BLOCK_M"]), q.shape[0] * q.shape[1], 1)
        ctx.grid = grid
        _attn_fwd[grid](
            q, k, v, sm_scale, M, o,  #
            q.stride(0), q.stride(1), q.stride(2), q.stride(3),  #
            k.stride(0), k.stride(1), k.stride(2), k.stride(3),  #
            v.stride(0), v.stride(1), v.stride(2), v.stride(3),  #
            o.stride(0), o.stride(1), o.stride(2), o.stride(3),  #
            q.shape[0], q.shape[1],  #
            Q_LEN=q.shape[2], KV_LEN=k.shape[2],  #
            HEAD_DIM_QK=HEAD_DIM_Q, HEAD_DIM_V=HEAD_DIM_V,  #
            STAGE=stage,  #
            **extra_kern_args)

        ctx.save_for_backward(q, k, v, o, M)
        ctx.sm_scale = sm_scale
        ctx.HEAD_DIM_QK = HEAD_DIM_Q
        ctx.HEAD_DIM_V = HEAD_DIM_V
        ctx.causal = causal
        return o

    @staticmethod
    def backward(ctx, do):
        """Launch the backward kernels for the incoming output gradient ``do``.

        ``do`` must be contiguous and have the same strides as the saved
        output; the last dimension of every tensor must have stride 1.

        Returns:
            ``(dq, dk, dv, None, None)`` - one entry per ``forward`` input;
            ``causal`` and ``sm_scale`` receive no gradient.
        """
        q, k, v, o, M = ctx.saved_tensors
        assert do.is_contiguous()
        assert o.stride() == do.stride()
        dq = torch.empty_like(q)
        dk = torch.empty_like(k)
        dv = torch.empty_like(v)
        BATCH, N_HEAD, Q_LEN = q.shape[:3]
        assert Q_LEN == o.shape[-2]
        assert o.shape[-1] == v.shape[-1]
        assert q.stride(-1) == k.stride(-1) == v.stride(-1) == o.stride(-1) == 1
        assert dq.stride(-1) == dk.stride(-1) == dv.stride(-1) == do.stride(-1) == 1
        KV_LEN = k.shape[-2]
        PRE_BLOCK = 128
        BLOCK_M1, BLOCK_N1, BLOCK_M2, BLOCK_N2 = 32, 128, 128, 32
        BLK_SLICE_FACTOR = 2
        RCP_LN2 = 1.4426950408889634  # = 1.0 / ln(2)
        # Pre-scale K by sm_scale / ln(2) so the kernels can recompute the
        # probabilities with exp2; dq is multiplied by ln(2) again in-kernel.
        arg_k = k * (ctx.sm_scale * RCP_LN2)
        # Pre-pass: one delta value per query row.
        pre_grid = (triton.cdiv(Q_LEN, PRE_BLOCK), BATCH * N_HEAD)
        delta = torch.empty_like(M)
        _attn_bwd_preprocess[pre_grid](
            o, do,  #
            delta,  #
            BATCH, N_HEAD, Q_LEN,  #
            BLOCK_M=PRE_BLOCK, HEAD_DIM=ctx.HEAD_DIM_V  #
        )
        # Axis 0 must cover both the key/value blocks and the query blocks,
        # hence the max of the two sequence lengths.
        grid = (triton.cdiv(max(Q_LEN, KV_LEN), BLOCK_N1), 1, BATCH * N_HEAD)
        _attn_bwd[grid](
            q, arg_k, v, ctx.sm_scale, do, dq, dk, dv,  #
            M, delta,  #
            q.stride(0), q.stride(1), q.stride(2), q.stride(3),  #
            k.stride(0), k.stride(1), k.stride(2), k.stride(3),  #
            v.stride(0), v.stride(1), v.stride(2), v.stride(3),  #
            o.stride(0), o.stride(1), o.stride(2), o.stride(3),  #
            N_HEAD, Q_LEN, KV_LEN,  #
            BLOCK_M1=BLOCK_M1, BLOCK_N1=BLOCK_N1,  #
            BLOCK_M2=BLOCK_M2, BLOCK_N2=BLOCK_N2,  #
            BLK_SLICE_FACTOR=BLK_SLICE_FACTOR,  #
            HEAD_DIM_QK=ctx.HEAD_DIM_QK,  #
            HEAD_DIM_V=ctx.HEAD_DIM_V,  #
            STAGE=3 if ctx.causal else 1  #
        )

        # forward() takes (q, k, v, causal, sm_scale): five inputs, five gradients.
        return dq, dk, dv, None, None


# Public entry point: attention(q, k, v, causal, sm_scale) -> output tensor.
attention = _attention.apply

In [4]:
def test_op(Z, H, Q_LEN=1024, KV_LEN=1024, HEAD_DIM=64, causal=True, dtype=torch.float16):
    """Check the Triton attention against a plain PyTorch reference.

    Random q/k/v of shape (Z, H, Q_LEN or KV_LEN, HEAD_DIM) are pushed through
    both implementations; the outputs and the gradients w.r.t. v, k and q
    must agree within an absolute tolerance of 1e-2. Causal mode requires
    Q_LEN == KV_LEN.
    """
    torch.manual_seed(20)

    def _rand_input(seq_len):
        # leaf tensor drawn from N(0, 0.5^2) that tracks gradients
        blank = torch.empty((Z, H, seq_len, HEAD_DIM), dtype=dtype, device=DEVICE)
        return blank.normal_(mean=0.0, std=0.5).requires_grad_()

    def _take_grads():
        # copy the accumulated gradients (v, k, q order) and reset them
        grads = []
        for leaf in (v, k, q):
            grads.append(leaf.grad.clone())
            leaf.grad = None
        return grads

    q = _rand_input(Q_LEN)
    k = _rand_input(KV_LEN)
    v = _rand_input(KV_LEN)
    sm_scale = 0.5
    dout = torch.randn_like(q)

    # reference implementation
    scores = torch.matmul(q, k.transpose(2, 3)) * sm_scale
    if causal:
        assert Q_LEN == KV_LEN
        lower_tri = torch.tril(torch.ones((Q_LEN, Q_LEN), device=DEVICE))
        scores[:, :, lower_tri == 0] = float("-inf")
    probs = torch.softmax(scores.float(), dim=-1).half()
    ref_out = torch.matmul(probs, v)
    ref_out.backward(dout)
    ref_dv, ref_dk, ref_dq = _take_grads()

    # triton implementation
    tri_out = attention(q, k, v, causal, sm_scale).half()
    tri_out.backward(dout)
    tri_dv, tri_dk, tri_dq = _take_grads()

    # compare
    torch.testing.assert_close(ref_out, tri_out, atol=1e-2, rtol=0)
    # Relative tolerance workaround for known hardware limitation of CDNA2 GPU.
    # For details see https://pytorch.org/docs/stable/notes/numerical_accuracy.html#reduced-precision-fp16-and-bf16-gemms-and-convolutions-on-amd-instinct-mi200-devices
    on_gfx90a = (torch.version.hip is not None
                 and triton.runtime.driver.active.get_current_target().arch == "gfx90a")
    rtol = 1e-2 if on_gfx90a else 0.0
    for expected, actual in ((ref_dv, tri_dv), (ref_dk, tri_dk), (ref_dq, tri_dq)):
        torch.testing.assert_close(expected, actual, atol=1e-2, rtol=rtol)

In [5]:
# causal self-attention
# Q_LEN == KV_LEN == 1234 is not a multiple of any tile size used by the
# kernels, so the partially filled boundary tiles are exercised.
test_op(1, 2, 1234, 1234, 64, True)

  return Variable._execution_engine.run_backward(  # Calls into the C++ engine to run the backward pass


In [6]:
# non-causal cross-attention
# Q_LEN (1234) and KV_LEN (4321) differ and neither is a multiple of any tile
# size used by the kernels.
test_op(1, 2, 1234, 4321, 64, False)