In [1]:
from google.colab import drive
import os

# 1️⃣ Mount Google Drive at /content/drive so that paths under /content/drive/MyDrive are reachable from this runtime
drive.mount('/content/drive')



Mounted at /content/drive


In [None]:
import math
import inspect
from dataclasses import dataclass
import numpy as np
from tqdm import tqdm

import torch
import torch.nn as nn
from torch.nn import functional as F

In [None]:
# Token inventory: the ten digits, then '=' and '+', then the two control symbols
# '&' (end of answer) and '*' (padding).
vocab = list('0123456789') + ['=', '+', '&', '*']

# Run on the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Ids of the control symbols, looked up from `vocab` so they always agree with it.
padding_token_index = vocab.index('*')
end_token_index = vocab.index('&')

In [None]:
# Character <-> integer id lookup tables built from `vocab`.
stoi = dict(zip(vocab, range(len(vocab))))
itos = dict(enumerate(vocab))


def encode(s):
    """Encoder: turn a string into the list of its token ids."""
    return [stoi[ch] for ch in s]


def decode(l):
    """Decoder: turn a list of token ids back into a string."""
    return ''.join(itos[idx] for idx in l)


# Round-trip sanity check.
sample_ids = encode("1+2=3&")
print(sample_ids)
print(decode(sample_ids))

[1, 11, 2, 10, 3, 12]
1+2=3&


In [None]:
def get_batch(num_digits, batch_size=32, block_size=256):
    """Sample a fresh batch of `num_digits`-digit addition problems.

    Each problem "a+b=c&" is split into a prompt "a+b=" (stored in x) and an
    answer "c&" (stored in y). Both are right-padded with the '*' token to
    `block_size`.

    Args:
        num_digits: number of digits of each operand (>= 1). For 1 digit the
            operands are drawn from 0-9; for more digits the leading digit is
            non-zero so every prompt has exactly 2*num_digits + 2 tokens.
        batch_size: number of problems in the batch.
        block_size: padded length of every row of x and y.

    Returns:
        (x, y): two int64 tensors of shape (batch_size, block_size) on `device`.

    Raises:
        ValueError: if num_digits < 1 or a full problem does not fit in block_size.
    """
    if num_digits < 1:
        raise ValueError(f"num_digits must be >= 1, got {num_digits}")

    pad_id = encode('*')[0]
    prompt_len = 2 * num_digits + 2   # "a+b="
    max_answer_len = num_digits + 2   # the sum has at most num_digits+1 digits, plus '&'
    if prompt_len + max_answer_len > block_size:
        raise ValueError(
            f"block_size={block_size} is too small for {num_digits}-digit problems "
            f"(need at least {prompt_len + max_answer_len})"
        )

    # 1-digit operands include 0; wider operands keep a non-zero leading digit so
    # that their printed width is exactly num_digits.
    low = 0 if num_digits == 1 else 10 ** (num_digits - 1)
    high = 10 ** num_digits
    a = np.random.randint(low, high, batch_size)
    b = np.random.randint(low, high, batch_size)

    # Start from all-padding tensors and write each prompt / answer at the front of its row.
    x_tensor = torch.full((batch_size, block_size), pad_id, dtype=torch.int64)
    y_tensor = torch.full((batch_size, block_size), pad_id, dtype=torch.int64)
    for row, (i, j) in enumerate(zip(a, b)):
        prompt = encode(f'{i}+{j}=')
        answer = encode(f'{i + j}&')
        x_tensor[row, :len(prompt)] = torch.tensor(prompt, dtype=torch.int64)
        y_tensor[row, :len(answer)] = torch.tensor(answer, dtype=torch.int64)

    # NOTE(review): y[:, t] is the t-th answer token, not x shifted by one position,
    # so the loss compares the model output at position t of the prompt with answer
    # token t. Confirm this alignment is intended for a causal model.
    return x_tensor.to(device), y_tensor.to(device)

In [None]:
# Smoke-test the batch sampler with 1-digit operands; the cell displays the returned (x, y) tensor pair.
get_batch(1)

(tensor([[ 5, 11,  6,  ..., 13, 13, 13],
         [ 8, 11,  3,  ..., 13, 13, 13],
         [ 5, 11,  8,  ..., 13, 13, 13],
         ...,
         [ 1, 11,  5,  ..., 13, 13, 13],
         [ 3, 11,  5,  ..., 13, 13, 13],
         [ 3, 11,  2,  ..., 13, 13, 13]]),
 tensor([[ 1,  1, 12,  ..., 13, 13, 13],
         [ 1,  1, 12,  ..., 13, 13, 13],
         [ 1,  3, 12,  ..., 13, 13, 13],
         ...,
         [ 6, 12, 13,  ..., 13, 13, 13],
         [ 8, 12, 13,  ..., 13, 13, 13],
         [ 5, 12, 13,  ..., 13, 13, 13]]))

In [None]:
import math
import inspect
from dataclasses import dataclass

import torch
import torch.nn as nn
from torch.nn import functional as F

class LayerNorm(nn.Module):
    """Layer normalisation over the last dimension whose bias term can be switched off."""

    def __init__(self, ndim, bias):
        super().__init__()
        # Scale starts at one so the layer is initially a plain normalisation.
        self.weight = nn.Parameter(torch.ones(ndim))
        if bias:
            self.bias = nn.Parameter(torch.zeros(ndim))
        else:
            self.bias = None

    def forward(self, input):
        return F.layer_norm(
            input,
            normalized_shape=self.weight.shape,
            weight=self.weight,
            bias=self.bias,
            eps=1e-5,
        )

class CausalSelfAttention(nn.Module):
    """Multi-head self-attention in which a position only attends to itself and earlier positions.

    An optional key-padding mask additionally hides padded positions from every
    query. Both the fused (PyTorch >= 2.0) path and the manual fallback apply
    the same causal + padding masking.
    """

    def __init__(self, n_embd, n_head, dropout, block_size, bias=True):
        super().__init__()
        # n_embd must split evenly across heads so every head has the same width.
        assert n_embd % n_head == 0
        # One linear layer produces Q, K and V together: (B, T, n_embd) -> (B, T, 3 * n_embd).
        self.c_attn = nn.Linear(n_embd, 3 * n_embd, bias=bias)
        # output projection
        self.c_proj = nn.Linear(n_embd, n_embd, bias=bias)
        # regularization
        self.attn_dropout = nn.Dropout(dropout)
        self.resid_dropout = nn.Dropout(dropout)
        self.n_head = n_head
        self.n_embd = n_embd
        self.dropout = dropout
        # The fused scaled_dot_product_attention kernel only exists in PyTorch >= 2.0.
        self.flash = hasattr(torch.nn.functional, 'scaled_dot_product_attention')
        if not self.flash:
            print("WARNING: using slow attention. Flash Attention requires PyTorch >= 2.0")
            # Lower-triangular causal mask used by the manual attention path.
            self.register_buffer("bias", torch.tril(torch.ones(block_size, block_size))
                                        .view(1, 1, block_size, block_size))

    def forward(self, x, padding_mask=None):
        """Apply causal self-attention.

        Args:
            x: input of shape (B, T, n_embd).
            padding_mask: optional (B, >=T) tensor; entries equal to 0 mark key
                positions that must not be attended to.

        Returns:
            Tensor of shape (B, T, n_embd).

        Note:
            A query row whose visible keys are all masked (e.g. left-padded
            input) yields NaN, as with any fully masked softmax.
        """
        B, T, C = x.size() # batch size, sequence length, embedding dimensionality (n_embd)
        head_dim = C // self.n_head

        # Split the fused projection into Q, K, V, then reshape each from
        # (B, T, n_embd) to (B, n_head, T, head_dim) so heads act as a batch dim.
        q, k, v = self.c_attn(x).split(self.n_embd, dim=2)
        k = k.view(B, T, self.n_head, head_dim).transpose(1, 2) # (B, nh, T, hs)
        q = q.view(B, T, self.n_head, head_dim).transpose(1, 2) # (B, nh, T, hs)
        v = v.view(B, T, self.n_head, head_dim).transpose(1, 2) # (B, nh, T, hs)

        # (B, 1, 1, T) boolean mask, True where the key may be attended to.
        key_keep = None
        if padding_mask is not None:
            key_keep = padding_mask[:, None, None, :T] != 0

        # causal self-attention; Self-attend: (B, nh, T, hs) x (B, nh, hs, T) -> (B, nh, T, T)
        dropout_p = self.dropout if self.training else 0.0
        if self.flash:
            if key_keep is None:
                y = torch.nn.functional.scaled_dot_product_attention(
                    q, k, v, attn_mask=None, dropout_p=dropout_p, is_causal=True)
            else:
                # is_causal cannot be combined with an explicit mask, so fold the
                # causal constraint into the boolean mask (True = attend).
                causal = torch.ones(T, T, dtype=torch.bool, device=x.device).tril()
                attn_mask = causal & key_keep # (B, 1, T, T), broadcast over heads
                y = torch.nn.functional.scaled_dot_product_attention(
                    q, k, v, attn_mask=attn_mask, dropout_p=dropout_p, is_causal=False)
        else:
            # manual implementation of attention
            att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
            # hide future positions
            att = att.masked_fill(self.bias[:, :, :T, :T] == 0, float('-inf'))
            # hide padded keys
            if key_keep is not None:
                att = att.masked_fill(~key_keep, float('-inf'))

            att = F.softmax(att, dim=-1)
            att = self.attn_dropout(att)
            y = att @ v # (B, nh, T, T) x (B, nh, T, hs) -> (B, nh, T, hs)

        # (B, nh, T, hs) -> (B, T, nh, hs) -> (B, T, n_embd); contiguous() is needed before view().
        y = y.transpose(1, 2).contiguous().view(B, T, C) # re-assemble all head outputs side by side

        # output projection
        y = self.resid_dropout(self.c_proj(y))
        return y

class MLP(nn.Module):
    """Position-wise feed-forward network: expand to 4x width, GELU, project back, dropout.

    `n_head` and `block_size` are accepted so the constructor signature matches
    the other sub-modules; they are not used here.
    """

    def __init__(self, n_embd, n_head, dropout, block_size, bias=True):
        super().__init__()
        hidden_dim = 4 * n_embd
        # Expansion into a wider feature space.
        self.c_fc = nn.Linear(n_embd, hidden_dim, bias=bias)
        # GELU (Gaussian Error Linear Unit) non-linearity.
        self.gelu = nn.GELU()
        # Projection back to the embedding width.
        self.c_proj  = nn.Linear(hidden_dim, n_embd, bias=bias)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        hidden = self.gelu(self.c_fc(x))             # (B, T, n_embd) -> (B, T, 4 * n_embd)
        return self.dropout(self.c_proj(hidden))     # (B, T, 4 * n_embd) -> (B, T, n_embd)

class Block(nn.Module):
    """One pre-norm transformer block: LayerNorm -> attention and LayerNorm -> MLP, each with a residual connection."""

    def __init__(self, n_embd, n_head, dropout, block_size, bias=True):
        super().__init__()
        # `bias` is forwarded to every sub-module so that bias=False really
        # removes the bias terms of the attention and MLP linear layers too.
        self.ln_1 = LayerNorm(n_embd, bias=bias)
        self.attn = CausalSelfAttention(n_embd, n_head, dropout, block_size, bias=bias)
        self.ln_2 = LayerNorm(n_embd, bias=bias)
        self.mlp = MLP(n_embd, n_head, dropout, block_size, bias=bias)

    def forward(self, x, padding_mask=None):
        """Run the block on x; `padding_mask` is passed through to the attention layer."""
        x = x + self.attn(self.ln_1(x), padding_mask=padding_mask)
        x = x + self.mlp(self.ln_2(x))
        return x


class GPT(nn.Module):
    """Small GPT-style decoder: token + position embeddings, a stack of Blocks, final LayerNorm and an LM head."""

    def __init__(self, vocab_size, block_size, n_embd, n_layer, n_head, dropout, bias=True, pad_token_id=13):
        """
        Args:
            vocab_size: number of distinct token ids.
            block_size: maximum sequence length the model can process.
            n_embd: embedding width.
            n_layer: number of transformer blocks.
            n_head: number of attention heads per block.
            dropout: dropout probability used throughout.
            bias: whether linear layers and LayerNorms carry a bias term.
            pad_token_id: target id excluded from the loss (defaults to 13, the
                value that was previously hard-coded in forward()).
        """
        super().__init__()
        assert vocab_size is not None
        assert block_size is not None
        self.vocab_size = vocab_size
        self.block_size = block_size
        self.n_embd = n_embd
        self.n_layer = n_layer
        self.n_head = n_head
        self.dropout = dropout
        self.bias = bias
        self.pad_token_id = pad_token_id

        self.transformer = nn.ModuleDict(dict(
            wte = nn.Embedding(vocab_size, n_embd), # token embeddings
            wpe = nn.Embedding(block_size, n_embd), # positional embeddings
            drop = nn.Dropout(dropout),
            h = nn.ModuleList([Block(n_embd, n_head, dropout, block_size, bias=bias) for _ in range(n_layer)]), # a stack of n_layer blocks
            ln_f = LayerNorm(n_embd, bias=bias), # final layer norm
        ))
        self.lm_head = nn.Linear(n_embd, vocab_size, bias=False) # projects the final transformer output to the vocab size

        # init all weights
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Initialise Linear and Embedding weights from N(0, 0.02) and zero any Linear bias."""
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None, padding_mask=None):
        """Compute logits for every position and, when targets are given, the loss.

        Args:
            idx: LongTensor of token ids, shape (b, t) with t <= block_size.
            targets: optional LongTensor of shape (b, t); positions equal to
                pad_token_id are ignored by the loss.
            padding_mask: optional mask forwarded to every block's attention.

        Returns:
            (logits, loss): logits of shape (b, t, vocab_size); loss is None
            when targets is None.
        """
        device = idx.device
        b, t = idx.size()
        assert t <= self.block_size, f"Cannot forward sequence of length {t}, block size is only {self.block_size}"
        pos = torch.arange(0, t, dtype=torch.long, device=device) # shape (t)

        # forward the GPT model itself
        tok_emb = self.transformer.wte(idx) # token embeddings of shape (b, t, n_embd)
        pos_emb = self.transformer.wpe(pos) # position embeddings of shape (t, n_embd)
        x = self.transformer.drop(tok_emb + pos_emb)
        for block in self.transformer.h:
            x = block(x, padding_mask=padding_mask)
        x = self.transformer.ln_f(x)

        # Logits are always computed for all positions (generate() picks the last one itself).
        logits = self.lm_head(x)
        loss = None

        if targets is not None:
            loss = F.cross_entropy(
                logits.view(-1, logits.size(-1)),
                targets.view(-1),
                ignore_index=self.pad_token_id,
            )

        return logits, loss

    @torch.no_grad()
    def generate(self, idx, max_new_tokens, temperature=1.0, top_k=None):
        """
        Take a conditioning sequence of indices idx (LongTensor of shape (b,t)) and complete
        the sequence max_new_tokens times, feeding the predictions back into the model each time.
        Most likely you'll want to make sure to be in model.eval() mode of operation for this.
        """
        for _ in range(max_new_tokens):
            # if the sequence context is growing too long we must crop it at block_size
            idx_cond = idx if idx.size(1) <= self.block_size else idx[:, -self.block_size:]
            # forward the model to get the logits for the index in the sequence
            logits, _ = self(idx_cond)
            # pluck the logits at the final step and scale by desired temperature
            logits = logits[:, -1, :] / temperature
            # optionally crop the logits to only the top k options
            if top_k is not None:
                v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
                logits[logits < v[:, [-1]]] = -float('Inf')
            # apply softmax to convert logits to (normalized) probabilities
            probs = F.softmax(logits, dim=-1)
            # sample from the distribution
            idx_next = torch.multinomial(probs, num_samples=1)
            # append sampled index to the running sequence and continue
            idx = torch.cat((idx, idx_next), dim=1)

        return idx

In [None]:
eval_iters = 200

@torch.no_grad()
def estimate_loss(num_digits, padding_mask=None):
    """Estimate the mean loss over `eval_iters` freshly sampled batches.

    Args:
        num_digits: operand width passed to get_batch().
        padding_mask: accepted for backward compatibility only. The mask is
            rebuilt from each sampled batch, because a mask computed for a
            different batch does not describe the X evaluated here.

    Returns:
        dict with keys 'train' and 'val', each a 0-dim tensor holding the mean
        loss. Both entries are computed from independently sampled synthetic
        batches; there is no separate held-out set.
    """
    out = {}
    was_training = model.training
    model.eval()
    try:
        for split in ['train', 'val']:
            losses = torch.zeros(eval_iters)
            for k in range(eval_iters):
                X, Y = get_batch(num_digits)
                batch_mask = (X != padding_token_index).long()
                logits, loss = model(X, Y, batch_mask)
                losses[k] = loss.item()
            out[split] = losses.mean()
    finally:
        # Restore the mode the model was in, even if evaluation fails part-way.
        model.train(was_training)
    return out

In [None]:
# ---- batching ----
batch_size = 32    # number of independent sequences processed in parallel
block_size = 256   # maximum context length for predictions

# ---- training schedule ----
max_iters = 1000
num_epochs = 1
eval_interval = 100
eval_iters = 20    # replaces the eval_iters = 200 set in the estimate_loss cell

# ---- runtime ----
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Seeding is currently disabled; uncomment for reproducible runs:
# torch.manual_seed(1337)
# if torch.cuda.is_available():
#     torch.cuda.manual_seed_all(1337)

# ---- model ----
n_embd, n_head, n_layer = 256, 8, 6
dropout = 0.1
bias = True        # whether the Linear layers carry a bias term
vocab_size = len(vocab)

In [None]:
# Build the network from the hyperparameters above, then move its weights to the selected device.
model = GPT(
    vocab_size=vocab_size,
    block_size=block_size,
    n_embd=n_embd,
    n_layer=n_layer,
    n_head=n_head,
    dropout=dropout,
    bias=bias,
)
m = model.to(device)  # Module.to() returns the module itself, so `m` and `model` are the same object

RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [None]:
# AdamW over all model parameters, with a step-wise learning-rate decay
# (multiply the rate by 0.9 every 10 calls to scheduler.step()).
# NOTE(review): lr=1e-6 is very small for training from scratch — confirm it is intended.
optimizer = torch.optim.AdamW(params=model.parameters(), lr=1e-6)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer=optimizer, step_size=10, gamma=0.9)

In [None]:
print(sum(p.numel() for p in m.parameters())/1e6, 'M parameters')

for epoch in range(num_epochs):
    # Epoch e trains on (e + 1)-digit addition problems.
    num_digits = epoch + 1

    # `step` instead of `iter`, so the builtin iter() is not shadowed notebook-wide.
    for step in tqdm(range(max_iters), desc="Processing"):
        # sample a batch of data, sized by the configured hyperparameters
        xb, yb = get_batch(num_digits, batch_size=batch_size, block_size=block_size)
        padding_mask_x = (xb != padding_token_index).long()

        # periodic evaluation, driven by the configured eval_interval
        if step % eval_interval == 0:
            losses = estimate_loss(num_digits, padding_mask_x)
            print(f"step {step}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

        # forward pass, then one optimisation step
        logits, loss = model(xb, yb, padding_mask_x)
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()

    # Learning-rate schedule advances once per epoch.
    # NOTE(review): with num_epochs = 1 and StepLR(step_size=10) the rate never decays —
    # confirm whether this was meant to be called once per iteration instead.
    scheduler.step()


4.811776 M parameters


Processing:   0%|          | 0/1000 [00:00<?, ?it/s]


RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


In [None]:
# TODO: add early stopping
# TODO: add an accuracy function

In [None]:
# Prompt the trained model with an addition problem and print its completion.
input_str = "12+34="

# Sampling must run in eval mode, otherwise dropout stays active and adds noise
# to every forward pass (see the note in GPT.generate's docstring).
model.eval()

# Step 1: Encode string into token indices
input_tokens = torch.tensor(encode(input_str), dtype=torch.long, device=device).unsqueeze(0)  # Shape (1, t)

# Step 2: Generate new tokens
output_tokens = model.generate(input_tokens, max_new_tokens=500)

# Step 3: Decode token indices back into a string
output_str = decode(output_tokens[0].tolist())

# Step 4: Keep only the answer, i.e. everything up to and including the first
# end token after the prompt; if no end token was produced, keep the full text.
end_char = decode([end_token_index])
end_pos = output_str.find(end_char, len(input_str))
if end_pos != -1:
    output_str = output_str[:end_pos + 1]

# Step 5: Print the generated result
print(output_str)


12+34=6
6




4



6


344



5


4
43
2
6





2

6
*60
3
2
55+6
+
+


=
3

3
9+
503+
=147
00
5
6
7
438




=46

0
+=79
=

3
*
71
729
2
3
*
+

2+06
9
+248
758

4
46
6
6
5372


=5
276
23

6756
5=
86
7
2



8
1
58
*53
68469


=2*62+484*+
2++

5


94

468
=
4427
136
=952

=
*85
66*88=
6=183
7385=
2

598
6
096
7

29*4=


67
905
0
3


58
9398709*779
=789738
6
0926


=4338
8+6

*+6
27

98326
9*3+2
1
3
=92*8
92*239+464
83*9=
5
1
682
1656*
7
98
9269+3

446619*3

86
6448+++
=71=
794

416
*
1=6+28+
59
1345876



In [None]:
import shutil
import subprocess


def _redact(text, secret):
    """Return text with every occurrence of secret replaced, so it is safe to print."""
    return text.replace(secret, "***") if secret else text


# 1️⃣ Git identity used for the automatic commits (argument lists, no shell involved)
subprocess.run(["git", "config", "--global", "user.email", "zifeibai@umich.edu"], check=True)
subprocess.run(["git", "config", "--global", "user.name", "ZifeiBai"], check=True)

# 2️⃣ Read the GitHub token from a file stored on Google Drive
GITHUB_TOKEN_PATH = "/content/drive/MyDrive/URPS/github_token.txt"
if not os.path.exists(GITHUB_TOKEN_PATH):
    # Raise instead of exit(1): in a notebook, exit() shuts the kernel down.
    raise FileNotFoundError(f"❌ GitHub token file not found: {GITHUB_TOKEN_PATH}")
with open(GITHUB_TOKEN_PATH, "r") as f:
    os.environ["GITHUB_TOKEN"] = f.read().strip()

# 3️⃣ Local checkout location and remote URL
# SECURITY NOTE(review): the token is embedded in the remote URL, so after a clone
# it is stored in plain text in <GIT_PATH>/.git/config on Drive. Consider a
# credential helper instead. The URL is never printed by this cell.
GIT_PATH = "/content/drive/MyDrive/URPS/Git"
REPO_URL = f"https://{os.environ['GITHUB_TOKEN']}@github.com/ZifeiBai/URPS.git"

# 4️⃣ If there is no .git entry, this is not a Git checkout yet: (re)clone it
if not os.path.exists(os.path.join(GIT_PATH, ".git")):
    print("❌ Git repository not found. Cloning...")
    # WARNING: this discards whatever is currently in GIT_PATH, as the original `rm -rf` did.
    shutil.rmtree(GIT_PATH, ignore_errors=True)
    os.makedirs(os.path.dirname(GIT_PATH), exist_ok=True)
    clone_output = subprocess.run(["git", "clone", REPO_URL, GIT_PATH], capture_output=True, text=True)
    if clone_output.returncode != 0:
        # Report the failure without leaking the token that is part of REPO_URL.
        raise RuntimeError("git clone failed: " + _redact(clone_output.stderr, os.environ["GITHUB_TOKEN"]))

# 5️⃣ Work inside the checkout
os.chdir(GIT_PATH)
print("📂 Changed working directory to:", os.getcwd())

# # 6️⃣ Copy the notebook into the checkout (currently disabled)
# SOURCE_FILE = "/content/drive/MyDrive/URPS/Colabs/transformer.ipynb"
# if os.path.exists(SOURCE_FILE):
#     shutil.copy(SOURCE_FILE, GIT_PATH)
#     print(f"✅ Copied {SOURCE_FILE} to {GIT_PATH}")
# else:
#     raise FileNotFoundError("❌ transformer.ipynb not found!")

# 7️⃣ Show the Git status
status_output = subprocess.run(["git", "status"], capture_output=True, text=True)
print(status_output.stdout)

# 8️⃣ Stage, commit and push
print("🚀 Adding files to Git...")
subprocess.run(["git", "add", "."], check=True)

print("📝 Committing changes...")
# Not checked: `git commit` exits non-zero when there is nothing to commit, which is fine here.
commit_output = subprocess.run(
    ["git", "commit", "-m", "Auto update from Google Colab"],
    capture_output=True,
    text=True,
)
print(commit_output.stdout)

print("📤 Pushing to GitHub...")
push_output = subprocess.run(["git", "push", "origin", "main"], capture_output=True, text=True)
# Decide by exit status rather than by searching stderr for keywords; git also
# writes ordinary progress and warnings to stderr.
if push_output.returncode != 0:
    print("❌ Real Git Push Error:", _redact(push_output.stderr, os.environ["GITHUB_TOKEN"]))
else:
    print("✅ Git Push Success! (Warnings ignored)")

📂 Changed working directory to: /content/drive/MyDrive/URPS/Git
On branch main
Your branch is up to date with 'origin/main'.

Changes not staged for commit:
  (use "git add <file>..." to update what will be committed)
  (use "git restore <file>..." to discard changes in working directory)
	modified:   transformer.ipynb

no changes added to commit (use "git add" and/or "git commit -a")

🚀 Adding files to Git...
📝 Committing changes...
[main 045b55e] Auto update from Google Colab
 1 file changed, 1 insertion(+), 1 deletion(-)
 rewrite transformer.ipynb (96%)

📤 Pushing to GitHub...
