In [None]:
import torch
import torch.nn.functional as F
import torch.utils.data

from s2cnn import SO3Convolution
from s2cnn import S2Convolution
from s2cnn import so3_integrate
from s2cnn import so3_near_identity_grid
from s2cnn import s2_near_identity_grid

import time
import random

random.seed()

In [None]:
def input0():
    return torch.zeros(1,60,60)

def inputrand():
    return torch.rand(1,60,60)

def visualize_tensor_layer(t): # expecting 2D tensor of any size
    assert len(t.shape) == 2
    for i in range(t.shape[0]):
        for j in range(t.shape[1]):
            print('{}'.format('1' if t[i][j].item() > 0. else '0'), end='')
        print('')

def add_light_fuzz(t, fuzz_factor=0.02): # expecting N * 60 * 60, add 2% fuzz
    assert fuzz_factor >= 0.
    assert fuzz_factor < 1.
    out = t.clone()
    for i in range(60):
        for j in range(60):
            if random.random() < fuzz_factor: # 5%ish
                out[:,i,j] = 1.
    return out
        
# could pass in all 0s, or add this feature to existing tensor
# applies to all layers
def make_training_feature1(t): # expecting N * 60 * 60
    for i in range(5, 35):
        for j in range(5, 35):
            x = i - 20
            y = j - 20
            r2 = x * x + y * y 
            if r2 > 64 and r2 < 144:
                t[:,i,j] = 1.
    return t

def make_training_feature2(t): # expecting N * 60 * 60
    for i in range(30, 60):
        for j in range(30, 60):
            x = i - 30
            y = 60 - j # 0,0 @ ctr btm
            if abs(x - y) <= 2:
                t[:,i,j] = 1.
    return t

def make_training_feature3(t): # expecting N * 60 * 60
    for i in range(30, 60):
        for j in range(0, 30):
            if abs(45 - i) <= 2 or abs(15 - j) <= 2:
                t[:,i,j] = 1.
    return t

def make_training_feature4(t):
    for i in range(0, 30):
        for j in range(30, 60):
            x = i
            y = 0 - j
            if abs(x - y) <= 2 or ((30 - x) - y) <= 2:
                t[:,i,j] = 1.
    return t

In [None]:
def reflect_vert(t): # expects 2d tensor
    out = t.clone()
    assert len(out.shape) == 2
    
    w = out.shape[0]
    h = out.shape[1]
    buffer = torch.zeros(h)
    for i in range(w // 2):
        buffer = out[i, :].clone() # i think i have to clone else broadcasting turns weird?
        out[i, :] = out[w - i - 1, :].clone()
        out[w - i - 1, :] = buffer
    return out

def reflect_horiz(t):
    return reflect_vert(t.transpose(0,1)).transpose(0,1)

In [None]:
# vis each of N kernels separately
# each out kernel is a 20^3 tensor in default config
# changed to 5 * 10^3 for this run
def visualize_s2conv_out(t):
    assert len(t.shape) == 4
    for kernel in range(t.shape[0]):
        print('KERNEL {}'.format(kernel))
        outblock = []
        for i in range(t.shape[1]): # row * column * depth = grid * depth = block
            outgrid = []
            for j in range(t.shape[2]): # row * column = grid
                outrow = ''
                for k in range(t.shape[3]): # row
                    outrow += '1' if t[kernel,i,j,k] > 0. else '0'
                outgrid.append(outrow)
            outblock.append(outgrid)
            
        visblock = []
        for i in range(len(outblock[0])):
            visblock.append([])
        for i in range(len(outblock)): # pull each grid from block
            grid = outblock[i]
            for j in range(len(grid)): # pull each row from grid
                visblock[j].append(grid[j])
        for i in range(len(visblock)):
            print(' '.join(visblock[i]))
                    
def show_conv_layer_for(t, model): # expects 1 * 60 * 60 tensor
    torch.unsqueeze(t, 0) # batch
    _, _, t_conv = model(t)
    visualize_s2conv_out(torch.squeeze(t_conv, 0))

In [None]:
class DummyDataset(torch.utils.data.Dataset):
    def __init__(self, n_inst=1000, real_only=False, fuzz_factor=None): # make 1000 instances of my training instances plus some bllshit
        tf1 = make_training_feature1(input0()) # depends on global fns oops
        tf2 = make_training_feature2(input0())
        tf3 = make_training_feature3(input0())
        tf4 = make_training_feature4(input0())
        REAL_FEATURES = 4
        MAX_FEATURES = 5
        self.instances = []
        for _ in range(n_inst):
            r = random.randrange(REAL_FEATURES if real_only else MAX_FEATURES)
            new_instance = None
            if r == 0:
                new_instance = tf1 # circle in upper left
            elif r == 1:
                new_instance = tf2 # SW-NE diag line in lower right
            elif r == 2:
                new_instance = tf3 # cross upper right
            elif r == 3:
                new_instance = tf4 # diag cross lower right
            else:
                new_instance = inputrand() # bullshit
            
            assert not (new_instance is None)
            if (not (fuzz_factor is None)) and r < REAL_FEATURES:
                new_instance = add_light_fuzz(new_instance, fuzz_factor=fuzz_factor)
            self.instances.append((new_instance, r))
    
    def __len__(self):
        return len(self.instances)
    
    def __getitem__(self, idx):
        return self.instances[idx] # tensor, class idx

training_dataset = DummyDataset()
training_loader = torch.utils.data.DataLoader(training_dataset, batch_size=4, shuffle=True)
testing_dataset = DummyDataset(n_inst = 100, real_only=True, fuzz_factor=0.01)
testing_loader = torch.utils.data.DataLoader(testing_dataset, batch_size=1, shuffle=False)

In [None]:
class S2TestModel(torch.nn.Module):
    def __init__(self):
        super(S2TestModel, self).__init__()
        self.conv1 = S2Convolution(nfeature_in=1, nfeature_out=10, b_in=30, b_out=5, grid=s2_near_identity_grid())
        self.fc1 = torch.nn.Linear(10, 20) # i am just fucking around with this architecture - will it learn?
        self.fc2 = torch.nn.Linear(20, 5)
    
    def forward(self, x):
        # print(x.shape)
        x = self.conv1(x)
        conv_out = x
        # print(x.shape)
        x = F.relu(x)
        x = so3_integrate(x)
        int_out = x
        # print(x.shape)
        # activation?
        x = self.fc1(x)
        # print(x.shape)
        x = F.relu(x)
        x = self.fc2(x)
        # print(x.shape)
        return x, int_out, conv_out

In [None]:
def tlog(s):
    print('{}: {}'.format(time.asctime(), s))

In [None]:
N_EPOCHS = 100
BATCH_SIZE = 4
FUZZ_FACTOR = 0.01

In [None]:
training_dataset = DummyDataset(fuzz_factor=FUZZ_FACTOR)
training_loader = torch.utils.data.DataLoader(training_dataset, batch_size=BATCH_SIZE, shuffle=True)
testing_dataset = DummyDataset(n_inst=100, real_only=True, fuzz_factor=FUZZ_FACTOR)
testing_loader = torch.utils.data.DataLoader(testing_dataset, batch_size=1, shuffle=False)

model = S2TestModel()

loss_fn = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

In [None]:
for epoch in range(N_EPOCHS):
    tlog('EPOCH {} of {}'.format(epoch + 1, N_EPOCHS))
    
    # train
    running_loss = 0.
    for i, (images, labels) in enumerate(training_loader):
        model.train()
        
        optimizer.zero_grad()
        guesses, _, _ = model(images) # discard intermediate outputs
        loss = loss_fn(guesses, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        if (i + 1) % 50 == 0:
            tlog('iter {}/{}  loss {}  running loss {}'.format(i + 1, len(training_dataset) // BATCH_SIZE, loss.item(), running_loss))
    
    # validate
    total = 0
    correct = 0
    for i, (images, labels) in enumerate(testing_loader):
        model.eval()
        with torch.no_grad():
            guesses, _, _ = model(images)
            _, predictions = torch.max(guesses, 1)
            total += labels.size(0) # add batch size to total
            correct += (predictions == labels).long().sum().item()
    tlog('Test accuracy for epoch {}: {}'.format(epoch, float(correct) / float(total)))

In [None]:
tf1 = make_training_feature1(input0()) # depends on global fns oops
tf2 = make_training_feature2(input0())
tf3 = make_training_feature3(input0())
tf4 = make_training_feature4(input0())
tfs = [tf1, tf2, tf3, tf4]

In [None]:
for i, tf_raw in enumerate(tfs):
    tf = torch.squeeze(tf_raw)
    tfv = reflect_vert(tf)
    tfh = reflect_horiz(tf)
    tfvh = reflect_horiz(tfv)
    for t_in in [tf, tfv, tfh, tfvh]:
        t_in = torch.squeeze(add_light_fuzz(torch.unsqueeze(t_in, 0), fuzz_factor=FUZZ_FACTOR))
        guess, _, _ = model(torch.unsqueeze(torch.unsqueeze(t_in, 0), 0))
        _, pred = torch.max(guess, 1)
        print('got {} for {}'.format(pred.item(), i))

In [None]:
t_out, t_int, t_conv = model(torch.unsqueeze(input0(), 0))
print('output shape {}'.format(t_out.shape))
print('post-integration shape {}'.format(t_int.shape))
print('post-conv shape {}'.format(t_conv.shape))
print(torch.squeeze(t_out, 0))
print(torch.squeeze(t_int, 0))
visualize_s2conv_out(torch.squeeze(t_conv, 0))

In [None]:
show_conv_layer_for(tf1, model)