In [4]:
import os
import math
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset
from torch.utils.data import DataLoader

In [5]:
csv_path = "/home/richard/labrotation/processed_sample_data/human_origins_labels.csv"
array_path = "//home/richard/labrotation/processed_sample_data/arrays"
index = 0

In [6]:
annotations = pd.read_csv(csv_path)
array_dir = array_path

In [66]:
class sampleDataset(Dataset):
    def __init__(self, csv_file, array_dir, transform=None):
        self.annotations = pd.read_csv(csv_file)
        self.array_dir = array_dir
        self.transform = transform
    
    def __len__(self):
        return len(self.annotations)
    
    def __getitem__(self, index):
        array_path = os.path.join(self.array_dir, self.annotations.iloc[index, 0]+".npy")
        array = np.load(array_path)
        array = array.astype('float')
        y_label = annotations.iloc[index,1]

        if self.transform:
            array = self.transform(array)
        
        return array, y_label

In [67]:
dataset = sampleDataset(csv_file=csv_path, array_dir=array_dir)
data_loader = DataLoader(dataset=dataset, batch_size=5, shuffle=True)

In [68]:
test_batch = next(iter(data_loader))
test_array = test_batch[0]
test_array.shape

torch.Size([5, 4, 1000])

In [69]:
test_array.dtype

torch.float64

In [70]:
'''
expected input: Tensor of dimensions N*C*in_features
output: Tensor of dimensions N*out_sets*⌈(in_features/m)⌉
N: batch size
C: number of channels (e.g. 4 for one-hot-encoding of SNPs)
in_features: number of input features (e.g. SNP positions)
out_sets: number of output sets (new channels)
m: how many in_features to group together
kernel_size: kernel of flat tensor: m*C
padding: should we padd at the end of the dataframe if in_features%m !=0? 
'''
class LocallyConnectedLayer(torch.nn.Module):
    def __init__(self, in_features, m, C=4, padding=True, bias=False, out_sets=4):
        super().__init__()
        self.in_features = in_features
        self.C = C
        self.m = m
        self.kernel_size = m*C
        self.padding = (m-(in_features%m))%m*C if padding else 0
        self.weight = nn.Parameter(torch.randn(1,self.kernel_size, out_sets))
        self.bias = nn.Parameter(torch.randn(1,out_sets)) if bias else None # first try without bias but with batchnorm
    
    def forward(self, x):
        x = x.float()
        x = x.transpose(-1,-2) # we need to transpose first to ensure that the channel values of one in_feature are next to each other after flattening
        x = x.flatten(1) # dim(N,in_features*C)
        x = F.pad(x,(0,self.padding))
        x = x.unfold(-1, size=self.kernel_size, step=self.kernel_size)
        x = torch.matmul(x.float(),self.weight)
        if self.bias is not None:
            x = x+self.bias
        x = x = x.transpose(-1,-2) # transpose back to have the more convenient dimension order
        return x


In [71]:
class LCBlock(nn.Module):
    def __init__(self, in_features, m, out_sets=4, p=0.0):
        super().__init__()
        self.bn = nn.BatchNorm1d(out_sets)
        self.silu = nn.SiLU()
        self.drop = nn.Dropout(p)
        self.LCL1 = LocallyConnectedLayer(in_features, m=m, padding=True, out_sets=out_sets)
        self.LCL2 = LocallyConnectedLayer(in_features=math.ceil(in_features/m),m=m, padding=True, out_sets=out_sets)
        self.identity_downsample = nn.Linear(in_features, out_features=math.ceil(in_features/m**2)) if m!=1 else None

    def forward(self, x):
        identity = x

        x = self.bn(x)
        x = self.silu(x)
        x = self.LCL1(x)
        x = self.bn(x)
        x = self.silu(x)
        x = self.drop(x)
        x = self.LCL2(x)

        if self.identity_downsample is not None:
            identity = self.identity_downsample(identity)

        x = x+identity
        return x

In [72]:
'''
expected input: flat tensor of shape N*in_features
expected output: flat tensor of shape N*out_features
N: batch size
'''
class FCBlock(nn.Module):
    def __init__(self, in_features, out_features, p=0.5):
        super().__init__()
        self.bn1 = nn.BatchNorm1d(in_features)
        self.bn2 = nn.BatchNorm1d(out_features)
        self.silu = nn.SiLU()
        self.drop = nn.Dropout(p)
        self.FCL1 = nn.Linear(in_features=in_features, out_features=out_features)
        self.FCL2 = nn.Linear(in_features=out_features, out_features=out_features)
        self.identity_downsample = nn.Linear(in_features, out_features=out_features) if in_features != out_features else None

    def forward(self, x):
        identity = x

        x = self.bn1(x)
        x = self.silu(x)
        x = self.FCL1(x)
        x = self.bn2(x)
        x = self.silu(x)
        x = self.drop(x)
        x = self.FCL2(x)

        if self.identity_downsample is not None:
            identity = self.identity_downsample(identity)

        x = x+identity
        return x

In [None]:
class GLN(nn.Module):
    def __init__(self, block, num_residual_blocks, in_features):
        self.LCL0 = LocallyConnectedLayer(in_features, m=2)
        self.block1

    def forward(self,x):
        x = self.LCL0(in_features, m=2)



In [64]:
model = FCBlock(in_features=4000, out_features=4000)

In [74]:
test_array.dtype
model.forward(test_array.flatten(1)).shape
bla.forward(test_array).shape

RuntimeError: expected scalar type Double but found Float

In [None]:
seed = 42
torch.manual_seed(seed)
fc0 = LocallyConnectedLayer(in_features=1000,m=2,bias=False)
a = torch.from_numpy(array).unsqueeze(0).float()
out = fc0.forward(a)

In [None]:
b = LocallyConnectedLayer(in_features=500, m=8).forward(a)
c = LocallyConnectedLayer(in_features=125,m=8).forward(b)

In [None]:
bla = c.flatten()
bla = bla.unsqueeze(0)
bla2 = nn.BatchNorm1d(64)
bla2(bla)