In [None]:
import math
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from tqdm.auto import tqdm
import matplotlib.pyplot as plt

#############################
# 1) Hyperparameters & Data
#############################
BATCH_SIZE       = 64
EPOCHS           = 15          # limited to 15
LR               = 2e-4        # keep a stable LR
NR_RESNET        = 5           # enough for 7×7
NR_FILTERS       = 80          # enough filters
NR_LOGISTIC_MIX  = 5
DEVICE = torch.device('cuda' if torch.cuda.is_available() 
                      else 'mps' if torch.backends.mps.is_available()
                      else 'cpu')
PRINT_EVERY      = 100
IMG_SIZE         = 7  # 7×7 downsampled MNIST

def lighter_dequantize(x):
    """
    A milder random dequant: 
      scale from [0,1] to [0..255], then add noise in [-0.5, 0.5], 
      then clamp to [0..255], then /256 => [0..1].
    """
    # x in [0,1], shape (1,H,W)
    x255 = x * 255.
    noise = torch.rand_like(x255) - 0.5  # in [-0.5,0.5]
    x255_noisy = (x255 + noise).clamp_(0., 255.)
    return x255_noisy / 256.

transform = transforms.Compose([
    transforms.Resize((7,7)),
    transforms.ToTensor(),          
    transforms.Lambda(lighter_dequantize),
    transforms.Lambda(lambda x: x*2.0 - 1.0)  # map [0,1] -> [-1,1]
])

train_dataset = datasets.MNIST(
    root='mnist_data', train=True, download=True, transform=transform)
test_dataset  = datasets.MNIST(
    root='mnist_data', train=False, download=True, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, drop_last=True)
test_loader  = DataLoader(test_dataset,  batch_size=BATCH_SIZE, shuffle=False, drop_last=True)

#############################
# 2) Shifted Convs
#############################
class DownShiftedConv2d(nn.Module):
    def __init__(self, inC, outC, kernel_size=3):
        super().__init__()
        self.conv = nn.Conv2d(inC, outC, kernel_size, stride=1, padding=1)
    def forward(self, x):
        # pad top, then remove bottom row
        out = F.pad(x, (0,0,1,0))
        out = self.conv(out)
        return out[:,:,:-1,:]

class DownRightShiftedConv2d(nn.Module):
    def __init__(self, inC, outC, kernel_size=3):
        super().__init__()
        self.conv = nn.Conv2d(inC, outC, kernel_size, stride=1, padding=1)
    def forward(self, x):
        # pad left+top, then remove bottom row & right column
        out = F.pad(x, (1,0,1,0))
        out = self.conv(out)
        return out[:,:,:-1,:-1]

#############################
# 3) Network-in-Network & GatedResnet
#############################
class Nin(nn.Module):
    def __init__(self, inC, outC):
        super().__init__()
        self.lin = nn.Linear(inC, outC)
    def forward(self, x):
        B,C,H,W = x.shape
        x = x.permute(0,2,3,1).contiguous()  # => (B,H,W,C)
        x = x.view(B*H*W, C)
        x = self.lin(x)
        x = x.view(B,H,W,-1)
        x = x.permute(0,3,1,2).contiguous()
        return x

def concat_elu(t):
    return F.elu(torch.cat([t, -t], dim=1))

class GatedResnet(nn.Module):
    """
    PixelCNN++-style gated residual block, skip_conn=1 => use extra nin skip.
    """
    def __init__(self, n_filters, conv_op, skip_conn=1):
        super().__init__()
        self.skip_conn = skip_conn
        self.conv_in = conv_op(2*n_filters, n_filters)
        if skip_conn:
            self.nin_skip = Nin(n_filters, n_filters)
        self.conv_out = conv_op(2*n_filters, 2*n_filters)

    def forward(self, x, a=None):
        c = self.conv_in(concat_elu(x))
        if self.skip_conn and a is not None:
            c = c + self.nin_skip(a)
        c = concat_elu(c)
        c = self.conv_out(c)
        a_, b_ = torch.chunk(c, 2, dim=1)
        return x + a_ * torch.sigmoid(b_)

#############################
# 4) Single-Level PixelCNN++
#############################
class PixelCNNpp(nn.Module):
    """
    Single-level PixelCNN++ for 7x7.
    """
    def __init__(self, nr_resnet=5, nr_filters=80, nr_logistic_mix=5, in_channels=1):
        super().__init__()
        self.nr_filters = nr_filters
        self.nr_logistic_mix = nr_logistic_mix

        # initial "down" and "down-right" conv
        self.u_init  = DownShiftedConv2d(in_channels+1, nr_filters)
        self.ul_init = DownRightShiftedConv2d(in_channels+1, nr_filters)

        self.resnet_v = nn.ModuleList([
            GatedResnet(nr_filters, DownShiftedConv2d, skip_conn=0) 
            for _ in range(nr_resnet)
        ])
        self.resnet_h = nn.ModuleList([
            GatedResnet(nr_filters, DownRightShiftedConv2d, skip_conn=1) 
            for _ in range(nr_resnet)
        ])
        self.nin_out = Nin(nr_filters, 3*nr_logistic_mix)

    def forward(self, x):
        B,C,H,W = x.size()
        # channel of ones
        ones = torch.ones(B,1,H,W, device=x.device)
        x_aug = torch.cat([x, ones], dim=1)

        v = self.u_init(x_aug)
        h = self.ul_init(x_aug)
        # pass through the stack of gated resnets
        for rv, rh in zip(self.resnet_v, self.resnet_h):
            v = rv(v)
            h = rh(h, a=v)

        out = self.nin_out(F.elu(h))
        return out

#############################
# 5) Mixture of Logistics Loss
#############################
def log_sum_exp(x, axis=-1):
    m,_ = x.max(dim=axis, keepdim=True)
    return m + (x - m).exp().sum(dim=axis, keepdim=True).log()

def log_prob_from_logits(x):
    m,_ = x.max(dim=-1, keepdim=True)
    x0  = x - m
    return x0 - x0.exp().sum(dim=-1, keepdim=True).log()

def discretized_mix_logistic_loss_1d(x, l):
    """
    x: (B,1,H,W), l: (B,3*nr_mix,H,W).
    We'll reorder to (B,H,W,C).
    """
    x = x.permute(0,2,3,1)  # => (B,H,W,1)
    l = l.permute(0,2,3,1)  # => (B,H,W,3*nr_mix)
    nr_mix = l.shape[-1] // 3

    logit_probs = l[...,:nr_mix]
    means       = l[...,nr_mix:2*nr_mix]
    log_scales  = l[...,2*nr_mix:3*nr_mix].clamp(min=-7.)

    log_probs = log_prob_from_logits(logit_probs)  # shape (B,H,W,nr_mix)

    x = x.unsqueeze(-1)                # => (B,H,W,1,1)
    means = means.unsqueeze(3)         # => (B,H,W,nr_mix,1)
    log_scales = log_scales.unsqueeze(3)
    scales = log_scales.exp()

    centered_x = x - means
    inv_stdv   = 1./scales
    plus_in    = inv_stdv*(centered_x + 1./255.)
    cdf_plus   = torch.sigmoid(plus_in)
    min_in     = inv_stdv*(centered_x - 1./255.)
    cdf_min    = torch.sigmoid(min_in)
    cdf_delta  = cdf_plus - cdf_min

    log_cdf_plus  = plus_in - F.softplus(plus_in)
    log_one_minus_cdf_min = -F.softplus(min_in)
    mid_in        = inv_stdv*centered_x
    log_pdf_mid   = mid_in - log_scales - 2.*F.softplus(mid_in)

    mask_left   = (x < -0.999).float()
    mask_right  = (x >  0.999).float()
    mask_center = 1. - (mask_left + mask_right)

    left_out  = log_cdf_plus[...,0]
    right_out = log_one_minus_cdf_min[...,0]
    center_out= torch.log(torch.clamp(cdf_delta[...,0], min=1e-12))
    cond_cdelta = (cdf_delta[...,0] > 1e-5).float()
    center_out  = cond_cdelta*center_out + (1.-cond_cdelta)*(log_pdf_mid[...,0] - math.log(127.5))

    out = mask_left*left_out + mask_right*right_out + mask_center*center_out
    out = out + log_probs[...,0,:]  # add mixing weights
    # log-sum-exp across mixtures
    out = torch.logsumexp(out, dim=-1)  # => (B,H,W)
    return -out.mean()

#############################
# 6) Train/Eval
#############################
def train_one_epoch(model, loader, optimizer, epoch):
    model.train()
    total_loss=0
    for i,(imgs,_) in enumerate(tqdm(loader, desc=f"Epoch {epoch}")):
        imgs = imgs.to(DEVICE)
        optimizer.zero_grad()
        out  = model(imgs)
        loss = discretized_mix_logistic_loss_1d(imgs, out)
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * imgs.size(0)
        if (i+1) % PRINT_EVERY == 0:
            print(f"   step {i+1}/{len(loader)} - batch loss={loss.item():.4f}")

    return total_loss / len(loader.dataset)

@torch.no_grad()
def evaluate(model, loader):
    model.eval()
    total_loss=0
    for imgs,_ in loader:
        imgs = imgs.to(DEVICE)
        out  = model(imgs)
        loss = discretized_mix_logistic_loss_1d(imgs, out)
        total_loss += loss.item() * imgs.size(0)
    return total_loss / len(loader.dataset)

#############################
# 7) Sampling with Temperature
#############################
@torch.no_grad()
def sample_from_discretized_mix_logistic_1d(l, nr_mix, temperature=0.85):
    """
    We incorporate a 'temperature' factor to reduce randomness, 
    hopefully producing more structured samples.
    """
    B, C = l.shape
    logit_probs = l[:, :nr_mix] / temperature
    means       = l[:, nr_mix:2*nr_mix]
    log_scales  = (l[:, 2*nr_mix:3*nr_mix] / temperature).clamp(min=-7.)

    probs       = F.softmax(logit_probs, dim=1)
    # pick mixture component
    cumprobs    = torch.cumsum(probs, dim=1)
    rand_cat    = torch.rand(B, device=l.device)
    cat_idx     = torch.zeros(B, dtype=torch.long, device=l.device)
    for i in range(nr_mix):
        cat_idx = torch.where(rand_cat<=cumprobs[:,i],
                              torch.full_like(cat_idx, i), 
                              cat_idx)
    sel_means   = means.gather(1, cat_idx.unsqueeze(1)).squeeze(1)
    sel_scales  = log_scales.gather(1, cat_idx.unsqueeze(1)).squeeze(1)

    # sample logistic noise
    u = torch.rand(B, device=l.device)
    x = sel_means + torch.exp(sel_scales)*(torch.log(u) - torch.log(1.-u))
    x = x.clamp(-1.,1.)
    return x

@torch.no_grad()
def sample_model(model, batch_size=16, image_size=7, nr_mix=5, device='cpu'):
    """Autoregressive sampling: fill row-by-row, col-by-col in [-1,1]."""
    model.eval()
    x = torch.zeros(batch_size,1,image_size,image_size,device=device)
    for row in range(image_size):
        for col in range(image_size):
            out = model(x)
            l   = out[:,:,row,col]  # => (B,3*nr_mix)
            px  = sample_from_discretized_mix_logistic_1d(l, nr_mix, temperature=0.85)
            x[:,0,row,col] = px
    return x

def sample_and_show(model, batch_size=16, nr_mix=5):
    samples = sample_model(model, batch_size=batch_size, image_size=7, nr_mix=nr_mix, device=DEVICE)
    # map [-1,1]->[0,1]
    samples = (samples + 1) / 2
    samples = samples.clamp(0,1)

    import math
    nrow = int(math.sqrt(batch_size))
    ncol = int(math.ceil(batch_size / nrow))
    fig,axes = plt.subplots(nrow,ncol,figsize=(ncol*2,nrow*2))
    axes = axes.flatten() if batch_size>1 else [axes]
    for i in range(batch_size):
        if i >= len(axes): 
            break
        img = samples[i,0].cpu().numpy()
        axes[i].imshow(img, cmap='gray', vmin=0, vmax=1)
        axes[i].axis('off')
    plt.tight_layout()
    plt.show()

#############################
# 8) Main
#############################
def main():
    model = PixelCNNpp(
        nr_resnet=NR_RESNET, 
        nr_filters=NR_FILTERS, 
        nr_logistic_mix=NR_LOGISTIC_MIX,
        in_channels=1
    ).to(DEVICE)

    optimizer = torch.optim.Adam(model.parameters(), lr=LR)

    print(f"Training PixelCNN++ on 7×7 MNIST for {EPOCHS} epochs. Device={DEVICE}")
    for epoch in range(1, EPOCHS+1):
        train_nll = train_one_epoch(model, train_loader, optimizer, epoch)
        test_nll  = evaluate(model, test_loader)
        print(f"Epoch {epoch}/{EPOCHS} => train NLL={train_nll:.4f}, test NLL={test_nll:.4f}")

    print("Done training! Generating samples...")
    sample_and_show(model, batch_size=16, nr_mix=NR_LOGISTIC_MIX)

if __name__=="__main__":
    main()


In [3]:
#########################
# 1) Imports and device #
#########################

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from tqdm import tqdm
import matplotlib.pyplot as plt

device = torch.device(
    'cuda' if torch.cuda.is_available()
    else 'mps' if torch.backends.mps.is_available()
    else 'cpu'
)
print("Using device:", device)


##############################
# 2) Define WaveNet classes #
##############################

class CausalConv1d(nn.Conv1d):
    """
    1D causal convolution by left-padding, then cutting off
    to ensure position t sees no future positions t+1, etc.
    """
    def __init__(self, in_channels, out_channels, kernel_size=2, dilation=1):
        self._padding = (kernel_size - 1) * dilation
        super().__init__(
            in_channels, out_channels,
            kernel_size=kernel_size,
            dilation=dilation,
            padding=self._padding
        )
    def forward(self, x):
        # x: (B, C, T)
        out = super().forward(x)
        # slice off right padding
        if self._padding > 0:
            out = out[:, :, :-self._padding]
        return out


class ResidualBlock(nn.Module):
    """
    One WaveNet residual block with gating:
      filter: tanh
      gate: sigmoid
      skip + residual outputs
    """
    def __init__(self, residual_channels, dilation_channels, skip_channels,
                 kernel_size=2, dilation=1):
        super().__init__()
        self.conv_filter = CausalConv1d(
            residual_channels, dilation_channels,
            kernel_size=kernel_size, dilation=dilation
        )
        self.conv_gate = CausalConv1d(
            residual_channels, dilation_channels,
            kernel_size=kernel_size, dilation=dilation
        )
        self.conv_res  = nn.Conv1d(dilation_channels, residual_channels, 1)
        self.conv_skip = nn.Conv1d(dilation_channels, skip_channels, 1)

    def forward(self, x):
        # x: (B, residual_channels, T)
        f = torch.tanh(self.conv_filter(x))
        g = torch.sigmoid(self.conv_gate(x))
        out = f * g  # (B, dilation_channels, T)

        skip = self.conv_skip(out)  # (B, skip_channels, T)
        res  = self.conv_res(out)   # (B, residual_channels, T)
        return skip, x + res


class WaveNet(nn.Module):
    """
    WaveNet-like model for discrete 1D sequences of length 49 (7x7).
    Produces logits over 256 classes at each position.
    """
    def __init__(self,
                 in_channels=256,
                 residual_channels=32,
                 dilation_channels=32,
                 skip_channels=32,
                 dilations=[1,2,4,8],
                 kernel_size=2,
                 out_channels=256):
        super().__init__()
        # 1) Causal front-end (from one-hot depth -> residual_channels)
        self.causal = CausalConv1d(
            in_channels, residual_channels,
            kernel_size=kernel_size, dilation=1
        )

        # 2) Dilated residual blocks
        self.res_blocks = nn.ModuleList()
        for d in dilations:
            self.res_blocks.append(
                ResidualBlock(
                    residual_channels,
                    dilation_channels,
                    skip_channels,
                    kernel_size=kernel_size,
                    dilation=d
                )
            )

        # 3) Post-processing to final 256 logits
        self.postprocess1 = nn.Conv1d(skip_channels, skip_channels, 1)
        self.postprocess2 = nn.Conv1d(skip_channels, out_channels, 1)

    def forward(self, x):
        """
        x: (B, T), pixel values in [0..255]
        Return: logits shape (B, T, 256)
        """
        B, T = x.shape

        # One-hot => (B,T,256) => permute => (B,256,T)
        x_ohe = F.one_hot(x, num_classes=256).float()
        x_ohe = x_ohe.permute(0,2,1)

        # Causal conv => (B, residual_channels, T)
        out = self.causal(x_ohe)

        # Accumulate skip outputs
        skip_sum = 0
        for block in self.res_blocks:
            skip, out = block(out)
            skip_sum = skip_sum + skip  # broadcast sum

        # Post-process => (B,256,T)
        out = F.relu(skip_sum)
        out = self.postprocess1(out)
        out = F.relu(out)
        out = self.postprocess2(out)

        # => (B,T,256)
        out = out.permute(0,2,1).contiguous()
        return out

    def generate(self, seq_len=49):
        """
        Autoregressive generation. We'll start with a single 0 pixel (seed),
        feed it, sample next pixel, append, repeat. Return (seq_len,).
        """
        current_seq = torch.zeros((1,1), dtype=torch.long, device=device)  # shape (1,1)
        for _ in range(seq_len):
            # logits => (1,current_length,256)
            logits = self.forward(current_seq)
            last_logits = logits[:, -1, :]  # (1,256)
            probs = F.softmax(last_logits, dim=-1)
            next_pixel = torch.multinomial(probs, 1)  # (1,1)
            current_seq = torch.cat([current_seq, next_pixel], dim=1)

        # skip the seed => shape (seq_len,)
        return current_seq[0, 1:]


#############################
# 3) Load and prepare MNIST #
#############################

# We'll do standard MNIST, resized to 7x7 => flatten => 49 pixels
transform_7x7 = transforms.Compose([
    transforms.Resize((7, 7)),
    transforms.ToTensor()  # => (1,7,7) in [0,1]
])

train_dataset = datasets.MNIST(root='.', train=True, download=True, transform=transform_7x7)
test_dataset  = datasets.MNIST(root='.', train=False, download=True, transform=transform_7x7)

batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
test_loader  = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, drop_last=True)


####################################
# 4) Instantiate and train WaveNet #
####################################

model = WaveNet(
    in_channels=256,
    residual_channels=32,
    dilation_channels=32,
    skip_channels=32,
    dilations=[1,2,4,8],
    kernel_size=2,
    out_channels=256
).to(device)

optimizer = optim.Adam(model.parameters(), lr=1e-3)
epochs = 5

for epoch in range(epochs):
    model.train()
    total_loss = 0.0
    pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}")

    for images, _ in pbar:
        # images => shape (B,1,7,7)
        B_ = images.size(0)

        # Flatten => (B,49), then move to device
        images = images.view(B_, -1).to(device)  # => (B,49)

        # Scale to [0..255], cast to long
        images = (images * 255).long()  # => (B,49)

        # Forward => (B,49,256)
        logits = model(images)

        # ---- SHIFT-BY-1: "predict the next pixel"  ----
        # We do it in a simpler flattened approach:
        # Flatten all time => shape (B*49, 256)
        # logits_flat = logits.contiguous().view(-1, 256)  # => (B*49,256)

        # # Flatten images => (B,49) => (B*49)
        # targets_flat = images.contiguous().view(-1)      # => (B*49,)

        # # For the 'next pixel' approach, skip the last logit, skip the first target
        # # so logits_for_loss[i] tries to predict targets_for_loss[i].
        # logits_for_loss = logits_flat[:-1]  # => (B*49 - 1, 256)
        # targets_for_loss = targets_flat[1:] # => (B*49 - 1,)
        
        
        #------
        B = images.size(0)
        # Make tensors contiguous before reshaping
        logits = logits.contiguous()
        logits_flat = logits.view(B * 49, 256)  # => (B*49,256)
        targets_flat = images.view(-1)          # => (B*49,)
        
        # Remove batch_size-1 elements from both tensors
        logits_for_loss = logits_flat[:-B]        # => (B*48, 256)
        targets_for_loss = targets_flat[B:]        # => (B*48,)
        
        #------

        loss = F.cross_entropy(logits_for_loss, targets_for_loss)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        pbar.set_postfix({'loss': f"{loss.item():.4f}"})

    avg_loss = total_loss / len(train_loader)
    print(f"Epoch [{epoch+1}/{epochs}] - avg_loss: {avg_loss:.4f}")


############################################
# 5) Sample and visualize synthetic images #
############################################

model.eval()
num_samples = 8
generated_list = []

with torch.no_grad():
    for _ in range(num_samples):
        seq = model.generate(seq_len=49)  # shape (49,)
        generated_list.append(seq.cpu().numpy())

fig, axes = plt.subplots(1, num_samples, figsize=(num_samples*2,2))
for i, gen_seq in enumerate(generated_list):
    # Reshape => 7x7
    img = gen_seq.reshape(7,7)
    axes[i].imshow(img, cmap='gray', vmin=0, vmax=255)
    axes[i].axis('off')
plt.show()


Using device: mps


Epoch 1/5:   0%|          | 0/937 [00:00<?, ?it/s]


RuntimeError: view size is not compatible with input tensor's size and stride (at least one dimension spans across two contiguous subspaces). Use .reshape(...) instead.