In [7]:
import os
import alpaca_trade_api as tradeapi


# Connectivity smoke test against the Alpaca REST API.
#
# Credentials are read from the environment ONLY. Never embed key/secret
# literals (not even as getenv fallbacks) in a notebook: they end up in version
# control and in every shared copy. Any key that was previously committed here
# should be treated as compromised and rotated in the Alpaca dashboard.
BASE_URL = os.getenv("ALPACA_BASE_URL", "https://paper-api.alpaca.markets")
API_KEY = os.getenv("ALPACA_API_KEY")
API_SECRET = os.getenv("ALPACA_API_SECRET")
if not API_KEY or not API_SECRET:
    # Fail fast with an actionable message instead of an opaque auth error later.
    raise RuntimeError(
        "Set the ALPACA_API_KEY and ALPACA_API_SECRET environment variables "
        "before running this cell."
    )

api = tradeapi.REST(API_KEY, API_SECRET, BASE_URL)
print(api.get_account().status)      # expect 'ACTIVE'
print(api.get_clock().is_open)       # True/False
print(api.get_bars('AAPL', '1Min', limit=3).df.head())


ACTIVE
False
                            close    high     low  trade_count    open  \
timestamp                                                                
2025-11-24 09:00:00+00:00  272.19  272.50  272.19           92  272.50   
2025-11-24 09:01:00+00:00  272.26  272.26  272.26           57  272.26   
2025-11-24 09:02:00+00:00  272.48  272.48  272.28           52  272.28   

                           volume        vwap  
timestamp                                      
2025-11-24 09:00:00+00:00    1357  272.396247  
2025-11-24 09:01:00+00:00     573  272.260000  
2025-11-24 09:02:00+00:00    1371  272.382745  


In [8]:
import numpy as np

def egamm(A, B, br=32, bc=32, bk=32, eps=1e-3, return_bounds=True):
    """
    Envelope-Guarded Approximate MatMul (EGAMM).

    The product ``A @ B`` is accumulated tile by tile. For an output tile
    (rows r0:r1, cols c0:c1) and an inner slab t0:t1, the contribution
    ``A[r0:r1, t0:t1] @ B[t0:t1, c0:c1]`` is bounded entry-wise by

        U = sum_t max_i |A[i, t]| * max_j |B[t, j]|

    (column-wise max of the A block dotted with row-wise max of the B block).
    When ``U < eps`` the tile product is skipped and ``U`` is added to the
    error bound of every entry of that output tile.

    Args:
      A: array-like of shape (m, k).
      B: array-like of shape (k, n).
      br, bc, bk: tile sizes along rows, columns and the inner dimension;
        each must be >= 1.
      eps: skip threshold; a tile product is skipped when its envelope bound
        is strictly below this value (``eps <= 0`` therefore never skips).
      return_bounds: when False the bound array is not allocated.

    Returns:
      (C_hat, bound) where
        C_hat: approximate ``A @ B`` with dtype ``np.result_type(A, B)``;
        bound: float64 (m, n) array with the accumulated per-entry absolute
          error bound of the skipped tiles, or None if ``return_bounds`` is
          False.

    Raises:
      ValueError: if an operand is not 2-D or a tile size is < 1.
      AssertionError: if the inner dimensions of A and B differ.
    """
    A = np.asarray(A)
    B = np.asarray(B)
    if A.ndim != 2 or B.ndim != 2:
        raise ValueError(
            f"egamm expects 2-D operands, got A.ndim={A.ndim}, B.ndim={B.ndim}"
        )
    # A negative step makes range() empty, which would silently return an
    # all-zero "product"; reject bad tile sizes up front.
    if br < 1 or bc < 1 or bk < 1:
        raise ValueError(
            f"tile sizes must be >= 1, got br={br}, bc={bc}, bk={bk}"
        )

    m, k = A.shape
    k2, n = B.shape
    assert k == k2, "Inner dimensions must match"

    C_hat = np.zeros((m, n), dtype=np.result_type(A, B))
    bound = np.zeros((m, n), dtype=np.float64) if return_bounds else None

    k_tiles = [(t0, min(t0 + bk, k)) for t0 in range(0, k, bk)]
    col_tiles = [(c0, min(c0 + bc, n)) for c0 in range(0, n, bc)]

    # The B-side envelope depends only on (inner slab, column tile), so compute
    # it once instead of once per row tile.
    beta_env = {
        (t0, c0): np.max(np.abs(B[t0:t1, c0:c1]), axis=1)
        for t0, t1 in k_tiles
        for c0, c1 in col_tiles
    }

    for r0 in range(0, m, br):
        r1 = min(r0 + br, m)

        # The A-side envelope depends only on (row tile, inner slab), so compute
        # it once per row tile instead of once per column tile.
        alpha_env = [
            np.max(np.abs(A[r0:r1, t0:t1]), axis=0) for t0, t1 in k_tiles
        ]

        for c0, c1 in col_tiles:
            # Sum over inner slabs (same accumulation order as a plain
            # row -> col -> inner triple loop).
            for (t0, t1), alpha in zip(k_tiles, alpha_env):
                # Bound on every entry of this tile contribution.
                U = float(alpha @ beta_env[t0, c0])

                if U < eps:
                    if return_bounds:
                        bound[r0:r1, c0:c1] += U
                    continue

                # Otherwise compute this block contribution exactly.
                C_hat[r0:r1, c0:c1] += A[r0:r1, t0:t1] @ B[t0:t1, c0:c1]

    # bound is already None when return_bounds is False.
    return C_hat, bound


if __name__ == "__main__":
    # Demo 1: dense Gaussian data. Every tile envelope is far above eps, so
    # nothing is skipped here; this only checks that the tiled accumulation
    # reproduces the dense product up to floating-point rounding.
    rng = np.random.default_rng(0)
    m, k, n = 96, 128, 80
    A = rng.normal(size=(m, k))
    B = rng.normal(size=(k, n))

    C_exact = A @ B
    C_hat, bound = egamm(A, B, br=24, bc=20, bk=16, eps=0.05)

    err = np.abs(C_exact - C_hat)

    print("max abs error:", err.max())
    print("max provable bound:", bound.max())
    print("bound respected everywhere?:", np.all(err <= bound + 1e-12))

    # How much work was skipped?
    skipped_mass = np.sum(bound)  # crude proxy
    print("sum of bounds (proxy for skipped contrib):", skipped_mass)

    # Demo 2: shrink two inner slabs of A (columns 32..63 with bk=16) so their
    # envelopes fall below eps. This actually exercises the skip path, which
    # is the case the error bound is meant to cover.
    A_small = A.copy()
    A_small[:, 32:64] *= 1e-6

    C_exact_small = A_small @ B
    C_hat_small, bound_small = egamm(A_small, B, br=24, bc=20, bk=16, eps=0.05)
    err_small = np.abs(C_exact_small - C_hat_small)

    print("[near-zero slabs] max abs error:", err_small.max())
    print("[near-zero slabs] max provable bound:", bound_small.max())
    print("[near-zero slabs] bound respected everywhere?:",
          np.all(err_small <= bound_small + 1e-12))

    # Fail loudly if the skip path was not hit or the bound is violated.
    assert bound_small.max() > 0.0, "expected at least one skipped tile"
    assert np.all(err_small <= bound_small + 1e-12), "error bound violated"


max abs error: 2.842170943040401e-14
max provable bound: 0.0
bound respected everywhere?: True
sum of bounds (proxy for skipped contrib): 0.0


In [None]:
import time
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

class EGAMMLinear(nn.Module):
    """
    Envelope-Guarded Approximate MatMul linear layer.
    CPU proof-of-concept. Safe skipping via max-abs envelopes.

    Computes ``x @ weight.T + bias`` tile by tile and skips any tile product
    whose max-abs envelope bound is strictly below ``eps``.

    Args:
      in_features: size of the last input dimension.
      out_features: size of the last output dimension.
      bias: whether to add a learnable (zero-initialised) bias.
      br, bc, bk: tile sizes along batch rows, output columns and the inner
        (in_features) dimension; each must be >= 1.
      eps: skip threshold for the tile envelope bound.

    Stats refreshed on every ``forward`` call:
      last_skip_ratio: fraction of tile products skipped.
      last_max_bound: largest accumulated per-entry error bound of the
        skipped tiles (0.0 when nothing was skipped).
      last_max_envelope: largest tile envelope seen, whether or not that tile
        was skipped.

    Raises:
      ValueError: if a tile size is < 1.
    """
    def __init__(self, in_features, out_features, bias=True,
                 br=64, bc=64, bk=64, eps=1e-3):
        super().__init__()
        # A negative step makes range() empty, which would silently yield an
        # all-zero output; reject bad tile sizes up front.
        if br < 1 or bc < 1 or bk < 1:
            raise ValueError(
                f"tile sizes must be >= 1, got br={br}, bc={bc}, bk={bk}"
            )
        self.in_features = in_features
        self.out_features = out_features
        self.br, self.bc, self.bk = br, bc, bk
        self.eps = eps

        self.weight = nn.Parameter(torch.empty(out_features, in_features))
        nn.init.kaiming_uniform_(self.weight, a=math.sqrt(5))

        if bias:
            self.bias = nn.Parameter(torch.zeros(out_features))
        else:
            self.bias = None

        # stats
        self.last_skip_ratio = 0.0
        self.last_max_bound = 0.0
        self.last_max_envelope = 0.0

    @torch.no_grad()
    def _egamm_matmul(self, A, B):
        """
        Tiled approximate product of two 2-D tensors (runs without autograd).

        A: (m, k)  activations
        B: (k, n)  weight^T
        returns (C_hat, bound, skip_ratio, maxU):
          C_hat (m, n): approximate A @ B
          bound (m, n): float32 per-entry error bound from skipped tiles
          skip_ratio: skipped tile products / total tile products
          maxU: largest tile envelope encountered (skipped or not)
        """
        m, k = A.shape
        k2, n = B.shape
        assert k == k2

        br, bc, bk, eps = self.br, self.bc, self.bk, self.eps

        C_hat = A.new_zeros((m, n))
        bound = A.new_zeros((m, n), dtype=torch.float32)

        k_tiles = [(t0, min(t0 + bk, k)) for t0 in range(0, k, bk)]
        col_tiles = [(c0, min(c0 + bc, n)) for c0 in range(0, n, bc)]

        # The B-side envelope depends only on (inner slab, column tile):
        # compute it once instead of once per row tile.
        beta_env = {
            (t0, c0): B[t0:t1, c0:c1].abs().max(dim=1).values
            for t0, t1 in k_tiles
            for c0, c1 in col_tiles
        }

        total_blocks = 0
        skipped_blocks = 0
        maxU = 0.0

        for r0 in range(0, m, br):
            r1 = min(r0 + br, m)
            A_r = A[r0:r1, :]

            # The A-side envelope depends only on (row tile, inner slab):
            # compute it once per row tile instead of once per column tile.
            alpha_env = [
                A_r[:, t0:t1].abs().max(dim=0).values for t0, t1 in k_tiles
            ]

            for c0, c1 in col_tiles:
                for (t0, t1), alpha in zip(k_tiles, alpha_env):
                    # Entry-wise bound on this tile's contribution.
                    U = float(alpha @ beta_env[t0, c0])
                    total_blocks += 1
                    maxU = max(maxU, U)

                    if U < eps:
                        skipped_blocks += 1
                        bound[r0:r1, c0:c1] += U
                        continue

                    C_hat[r0:r1, c0:c1] += A_r[:, t0:t1] @ B[t0:t1, c0:c1]

        skip_ratio = skipped_blocks / max(total_blocks, 1)
        return C_hat, bound, skip_ratio, maxU

    def forward(self, x):
        """
        x: (..., in_features) -- any number of leading dimensions
        output: (..., out_features)

        The forward value is the approximate product; the backward pass uses
        the gradient of the exact dense product (straight-through estimator).
        """
        if x.dim() < 1:
            raise ValueError("EGAMMLinear expects an input with at least 1 dimension")

        # Flatten leading dims so the 2-D tiled kernel can be reused; 2-D
        # inputs are passed through untouched.
        lead_shape = tuple(x.shape[:-1])
        if x.dim() == 2:
            x2d = x
        else:
            x2d = x.reshape(math.prod(lead_shape), x.shape[-1])

        # The pruning + tiled matmul runs without autograd (the method is
        # decorated with torch.no_grad()).
        C_hat, bound, skip_ratio, maxU = self._egamm_matmul(
            x2d, self.weight.t()
        )

        needs_grad = torch.is_grad_enabled() and (
            x2d.requires_grad or self.weight.requires_grad
        )
        if needs_grad:
            # Straight-through: treat C_hat as if exact for backward
            y = x2d @ self.weight.t()
            y = y + (C_hat - y).detach()
        else:
            # No gradient will be requested: skip the dense product, which
            # would otherwise undo any saving from the skipped tiles.
            y = C_hat

        if self.bias is not None:
            y = y + self.bias

        # log stats
        self.last_skip_ratio = skip_ratio
        # Largest accumulated error bound, not the largest tile envelope.
        self.last_max_bound = float(bound.max()) if bound.numel() > 0 else 0.0
        self.last_max_envelope = maxU

        if x.dim() != 2:
            y = y.reshape(*lead_shape, y.shape[-1])
        return y
