In [11]:
#export

from exp.nb_01 import *
from torch import nn, optim, tensor, Tensor
from torch.utils.data import TensorDataset, Dataset, DataLoader

In [8]:
#export
# NOTE: machine-specific absolute path; adjust it to wherever the datasets live.
data_path = Path(r'd:\git\dl\data')
mnist_path = data_path / 'mnist.pkl.gz'
# Convenience method: `some_dir.ls()` wraps the directory entries in an `L`.
Path.ls = lambda self: L(self.iterdir())

In [10]:
# List the entries of the data directory through the `Path.ls` helper.
data_path.ls()

(#3) [d:\git\dl\data\imagenette2-160,d:\git\dl\data\imagewoof2-160,d:\git\dl\data\mnist.pkl.gz]

In [16]:
# CAUTION: `pickle.load` can execute arbitrary code contained in the file;
# only load a copy of `mnist.pkl.gz` that comes from a trusted source.
with gzip.open(mnist_path, 'rb') as f:
    train_arrays, valid_arrays, _ = pickle.load(f, encoding='latin-1')

# Convert every loaded array into a tensor.
x_train, y_train = map(tensor, train_arrays)
x_valid, y_valid = map(tensor, valid_arrays)

In [17]:
# Shape of the first training example.
x_train[0].shape

torch.Size([784])

In [21]:
# Index of the first non-zero entry of the first training example.
idx=x_train[0].nonzero()[0]
# Show the 8 values from 3 positions before that index up to 4 positions after it.
x_train[0][idx-3: idx+5]

tensor([0.0000, 0.0000, 0.0000, 0.0117, 0.0703, 0.0703, 0.0703, 0.4922])

In [34]:
# Training hyperparameters: mini-batch size, number of epochs, learning rate.
bs, epochs, lr = 128, 2, 0.1

In [23]:
# Pair each input tensor with its labels so a dataset item is an `(x, y)` tuple.
train_ds, valid_ds = TensorDataset(x_train, y_train), TensorDataset(x_valid, y_valid)

In [24]:
#export
# Type aliases used by the annotations in this module.
Rank0Tensor = NewType('OneEltTensor', Tensor)
LossFunction = Callable[[Tensor, Tensor], Rank0Tensor]
Model = nn.Module

def is_listy(x:Any)->bool:
    "Whether `x` is a `tuple` or a `list`"
    return isinstance(x, (tuple,list))

def loss_batch(model:Model, xb:Tensor, yb:Tensor, 
               loss_fn:LossFunction, opt:optim.Optimizer=None):
    """Calculate loss for the batch `xb,yb` and, when `opt` is given, backprop and step with it.

    `xb` and `yb` may each be a single tensor or a tuple/list of tensors; they are
    unpacked into `model(*xb)` and `loss_fn(prediction, *yb)`.

    Returns `(loss, n)`: the loss as a Python float and the number of samples in
    the batch (length of the first target), suitable as a weight when averaging
    losses over batches of different sizes.
    """
    if not is_listy(xb): xb = [xb]
    if not is_listy(yb): yb = [yb]
    loss = loss_fn(model(*xb), *yb)

    if opt is not None:
        loss.backward()
        opt.step()
        # Clear gradients so they do not accumulate into the next batch.
        opt.zero_grad()

    # `yb` is a list at this point, so `len(yb)` would be the number of target
    # tensors (1 for a plain tensor target), not the batch size.
    return loss.item(), len(yb[0])

In [25]:
#export
def fit(epochs:int, model:Model, loss_fn:LossFunction, 
        opt:optim.Optimizer, train_dl:DataLoader, valid_dl:DataLoader):
    "Run `epochs` passes: train `model` on `train_dl` using `opt`, then print its loss on `valid_dl`"
    for epoch in range(epochs):
        # Training pass: each batch is back-propagated and stepped through `opt`.
        model.train()
        for xb,yb in train_dl:
            loss_batch(model, xb, yb, loss_fn, opt)

        # Validation pass: no optimizer is passed and gradient tracking is off.
        model.eval()
        with torch.no_grad():
            results = [loss_batch(model, xb, yb, loss_fn) for xb,yb in valid_dl]
        losses,nums = zip(*results)
        # Average of the batch losses, weighted by the count returned with each one.
        val_loss = np.sum(np.multiply(losses,nums)) / np.sum(nums)

        print(epoch, val_loss)

In [26]:
#export
LambdaFunc = Callable[[Tensor], Tensor]

class Lambda(nn.Module):
    "Wrap a plain function `func` so it can be used as a pytorch layer"
    def __init__(self, func:LambdaFunc):
        "Keep a reference to `func`; `forward` calls it on the layer input"
        super(Lambda, self).__init__()
        self.func = func

    def forward(self, x):
        "Apply the wrapped `func` to `x` and return its result"
        result = self.func(x)
        return result

In [27]:
#export
def noop(x):
    "Identity function: hand `x` back unchanged"
    return x

def ResizeBatch(*size:int) -> nn.Module: 
    "Layer that views `x` as shape `(-1, *size)`, good for connecting mismatched layers"
    # The factory returns a layer (a `Lambda` module), not a tensor, hence `nn.Module`.
    target_shape = (-1,) + size
    return Lambda(lambda x: x.view(target_shape))
def Flatten()->nn.Module: 
    "Layer that flattens `x` to `(x.size(0), -1)`, keeping the first dimension; often used at the end of a model"
    # The factory returns a layer (a `Lambda` module), not a tensor, hence `nn.Module`.
    return Lambda(lambda x: x.view((x.size(0), -1)))
def PoolFlatten()->nn.Sequential:
    "`nn.AdaptiveAvgPool2d` with output size 1, followed by `Flatten`"
    pool = nn.AdaptiveAvgPool2d(1)
    return nn.Sequential(pool, Flatten())

def conv2d(ni:int, nf:int, ks:int=3, stride:int=1, padding:int=None, bias=False) -> nn.Conv2d:
    "Build an `nn.Conv2d` with `ni` inputs, `nf` outputs and kernel size `ks`; `padding` falls back to `ks//2` when not given"
    pad = ks//2 if padding is None else padding
    return nn.Conv2d(ni, nf, kernel_size=ks, stride=stride, padding=pad, bias=bias)

def conv2d_relu(ni:int, nf:int, ks:int=3, stride:int=1, 
                padding:int=None, bn:bool=False) -> nn.Sequential:
    "`conv2d` followed by `nn.ReLU`, plus a trailing `nn.BatchNorm2d(nf)` when `bn` is set"
    conv = conv2d(ni, nf, ks=ks, stride=stride, padding=padding)
    if bn: return nn.Sequential(conv, nn.ReLU(), nn.BatchNorm2d(nf))
    return nn.Sequential(conv, nn.ReLU())

def conv2d_trans(ni:int, nf:int, ks:int=2, stride:int=2, padding:int=0) -> nn.ConvTranspose2d:
    "Build an `nn.ConvTranspose2d` with `ni` inputs, `nf` outputs and kernel size `ks`; `padding` defaults to 0"
    conv_kwargs = dict(kernel_size=ks, stride=stride, padding=padding)
    return nn.ConvTranspose2d(ni, nf, **conv_kwargs)

In [28]:
# Small CNN: view each flat input as a 1x28x28 image, apply three 3x3
# conv+ReLU stages (1 -> 16 -> 16 -> 10 channels), then average-pool each
# channel down to a single value and flatten.
model = nn.Sequential(
    ResizeBatch(1, 28, 28),
    *(conv2d_relu(ni, nf) for ni, nf in [(1, 16), (16, 16), (16, 10)]),
    PoolFlatten(),
)

In [29]:
def get_data(train_ds, valid_ds, bs):
    "Return a shuffled `DataLoader` over `train_ds` (batch size `bs`) and an unshuffled one over `valid_ds` (batch size `bs*2`)"
    train_loader = DataLoader(train_ds, batch_size=bs, shuffle=True)
    valid_loader = DataLoader(valid_ds, batch_size=bs*2)
    return train_loader, valid_loader

# Build the training and validation loaders from the datasets with batch size `bs`.
train_dl,valid_dl = get_data(train_ds, valid_ds, bs)

In [30]:
# Cross-entropy loss. NOTE(review): `F` is not imported in the cells shown here;
# presumably it is `torch.nn.functional` coming from `exp.nb_01` — confirm.
loss_fn = F.cross_entropy

In [31]:
# SGD over all of the model's parameters with learning rate `lr`.
opt = optim.SGD(model.parameters(), lr=lr)

In [32]:
# Loss of the model on the first `bs` validation examples.
loss_fn(model(x_valid[0:bs]), y_valid[0:bs])

tensor(2.3012, grad_fn=<NllLossBackward>)

In [None]:
# Train for `epochs` epochs; `fit` prints the epoch index and validation loss after each one.
fit(epochs, model, loss_fn, opt, train_dl, valid_dl)

0 2.293850153684616
