Data


In [7]:
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from torch.utils.data import DataLoader
import numpy as np


In [3]:
# Preprocessing shared by both splits: resize every image to 32x32 pixels,
# then convert it to a torch tensor.
transform = transforms.Compose(
    [transforms.Resize((32, 32)), transforms.ToTensor()]
)

In [4]:

# Both splits are read through ImageFolder with the same preprocessing
# pipeline; only the root directory differs.
train_dataset = ImageFolder(
    root="Data/asl_alphabet_train/asl_alphabet_train",
    transform=transform,
)
test_dataset = ImageFolder(
    root="Data/asl_alphabet_test/asl_alphabet_test",
    transform=transform,
)

In [5]:
# Training batches of 100 are drawn in shuffled order; the test split is
# served in dataset order in batches of 29.
train_loader = DataLoader(
    train_dataset,
    batch_size=100,
    shuffle=True,
)
test_loader = DataLoader(
    test_dataset,
    batch_size=29,
    shuffle=False,
)

In [6]:
# Sanity check: report how many samples each split contains.
print(
    f"Training samples: {len(train_dataset)}, Test samples: {len(test_dataset)}"
)

Training samples: 87000, Test samples: 29


In [18]:
import numpy as np
import torch

def extract_images(dataloader):
    """Gather the image part of every batch into a single NumPy array.

    Parameters
    ----------
    dataloader : iterable
        Yields 2-item batches ``(images, labels)``; ``images`` must expose a
        ``.numpy()`` method. The labels are discarded.

    Returns
    -------
    numpy.ndarray
        All image batches joined along axis 0, in iteration order.
    """
    # Each batch is unpacked as exactly two items; only the first is kept.
    image_batches = [x_batch.numpy() for x_batch, _ in dataloader]
    return np.concatenate(image_batches, axis=0)
# Extract training and test images as NumPy arrays of shape (N, 3, 32, 32).
# NOTE(review): train_loader is built with shuffle=True and extract_images
# drops the labels, so the row order of x_train does not follow the dataset
# order -- confirm how labels are paired with these rows downstream.
x_train = extract_images(train_loader)
x_test = extract_images(test_loader)

# Convert to float32 for compatibility with NumPy operations.
# The pixels come out of transforms.ToTensor() as floats in [0, 1]; the
# previous cast to np.int32 truncated almost every pixel to 0, so a
# floating-point dtype is required here.
x_train = x_train.astype(np.float32)
x_test = x_test.astype(np.float32)

print(f"x_train shape: {x_train.shape}, x_test shape: {x_test.shape}")


x_train shape: (87000, 3, 32, 32), x_test shape: (29, 3, 32, 32)


In [23]:
# Demonstration: collapse a random (B, I, H, W) batch into (B, I*H*W) rows.
B, I, H, W = 10, 3, 32, 32
x = np.random.rand(B, I, H, W)
# -1 lets NumPy infer the flattened length, which is I * H * W.
x_flattened = x.reshape(B, -1)

print(f"Original shape: {x.shape}")
print(f"Flattened shape: {x_flattened.shape}")

Original shape: (10, 3, 32, 32)
Flattened shape: (10, 3072)


In [None]:
class Reservoir(object):
    """Leaky recurrent reservoir with fixed random weights.

    Parameters
    ----------
    i_size : int
        Number of input features per time step.
    r_size : int
        Number of reservoir nodes.
    i_coef : float
        Scale applied to the uniform(-1, 1) input weights.
    r_coef : float
        The recurrent matrix is rescaled so that its largest eigenvalue
        magnitude equals this value.
    sparse : float
        Accepted for interface compatibility but currently NOT used: the
        recurrent matrix is always dense.
    leak : float
        Mixing coefficient of the state update (see ``__call__``).

    Attributes
    ----------
    w_i : float32 array (r_size, i_size), input weights
    w_r : float32 array (r_size, r_size), recurrent weights
    x   : float32 array (batch, r_size), current state (created by ``reset``)
    """

    def __init__(self, i_size, r_size, i_coef=1.0, r_coef=0.999, sparse=0.5, leak=0.5):
        self.w_i = i_coef * np.random.uniform(-1, 1, (r_size, i_size)).astype(np.float32)

        w_r = np.random.uniform(-1, 1, (r_size * r_size,)).astype(np.float32)
        # The shuffle is kept so the draws from the global NumPy RNG happen in
        # the same order as before; without a sparsity mask it only permutes
        # i.i.d. samples.
        np.random.shuffle(w_r)
        w_r = w_r.reshape(r_size, r_size)

        # Normalise by the spectral radius, then scale to r_coef.
        spectral_radius = np.max(np.abs(np.linalg.eigvals(w_r)))
        self.w_r = np.array(w_r / spectral_radius * r_coef)

        self.leak = leak

    def reset(self, batch):
        """Set the state to zeros for ``batch`` parallel sequences."""
        self.x = np.zeros((batch, self.w_r.shape[0]), dtype=np.float32)

    def __call__(self, u):
        """Advance the state by one time step and return it.

        ``u`` is a (batch, i_size) array. ``reset`` must have been called
        first so that ``self.x`` exists. The update is
        ``x <- tanh((1 - leak) * x + leak * (u @ w_i.T + x @ w_r.T))``.
        """
        if not u.shape[0] == self.x.shape[0]:
            # Diagnostic only: the update below is still attempted.
            print("different batchsize")
            print("required:", self.x.shape[0])
            print("actual", u.shape[0])
        self.x = np.tanh(
            (1 - self.leak) * self.x
            + self.leak * (u.dot(self.w_i.T) + self.x.dot(self.w_r.T)),
            dtype=np.float32,
        )
        return self.x

    def reset_and_call(self, us):
        """Reset, run a whole sequence, and return a copy of the final state.

        ``us`` is a (T, batch, i_size) array iterated along its first axis.
        With T == 0 the freshly reset (all-zero) state is returned.
        """
        self.reset(batch=us.shape[1])

        for u in us:
            self(u)
        # ndarray.copy() replaces copy.deepcopy: the `copy` module was never
        # imported, and reading self.x also covers an empty sequence.
        return self.x.copy()


        

In [None]:
class ReservoirBasedConvolution2D(object):
    """Convolution-like layer whose "filters" are a bank of reservoirs.

    Every k_size x k_size window of the (zero-padded) input is read twice as
    a short sequence: once row by row (horizontal scan) and once column by
    column (vertical scan). Each reservoir consumes the sequence and its
    final state becomes the feature vector of that window.

    Parameters
    ----------
    k_size, stride, padding : int
        Window size, step between windows, and zero padding on each side.
    in_ch : int
        Number of input channels; each time step carries in_ch * k_size values.
    num_reservoir : int
        Number of reservoirs; their leak rates are spaced evenly in [0.1, 0.9].
    num_node : int
        Nodes per reservoir.
    sparse, i_coef, r_coef
        Forwarded unchanged to every ``Reservoir``.

    Calling the layer on a (B, in_ch, H, W) array returns a float32 array of
    shape (B, 2 * num_reservoir * num_node, rows, cols): the horizontal-scan
    channels followed by the vertical-scan channels.
    """

    def __init__(self, k_size, stride, padding, in_ch, num_reservoir, num_node, sparse, i_coef=1.0, r_coef=0.999):
        leaks = np.linspace(0.1, 0.9, num_reservoir)
        self.res = [
            Reservoir(in_ch * k_size, num_node, sparse=sparse, leak=l, i_coef=i_coef, r_coef=r_coef)
            for l in leaks
        ]
        self.k_size = k_size
        self.stride = stride
        self.padding = padding

    def __call__(self, x):
        aois_h, num_row_h, num_col_h = self._image2aoisH(x, self.k_size, self.stride, self.padding)
        aois_v, num_row_v, num_col_v = self._image2aoisV(x, self.k_size, self.stride, self.padding)

        images_h = self._run_reservoirs(aois_h, num_row_h, num_col_h)
        images_v = self._run_reservoirs(aois_v, num_row_v, num_col_v)

        # (B, C, H, W) + (B, C, H, W) -> (B, 2C, H, W)
        return np.concatenate([images_h, images_v], axis=1)

    def _run_reservoirs(self, aois, num_row, num_col):
        """Feed (T, N*B, F) window sequences to every reservoir -> (B, C, rows, cols)."""
        # Final states of all reservoirs side by side: (N*B, C).
        states = np.concatenate([res.reset_and_call(aois) for res in self.res], axis=1)

        # Windows were concatenated window-major, so axis 0 splits into (N, B):
        # (N*B, C) -> (N, B, C) with N = rows * cols.
        states = states.reshape(num_row * num_col, -1, states.shape[1])

        # (N, B, C) -> (B, C, N) -> (B, C, rows, cols)
        images = np.rollaxis(states, 0, 3)
        return images.reshape(images.shape[0], images.shape[1], num_row, num_col)

    def _image2aois(self, x, k_size, stride, padding, time_axis):
        """Cut x (B, C, H, W) into windows and lay each out as a sequence.

        ``time_axis`` selects which window axis becomes the time axis:
        2 walks the rows, 3 walks the columns. Returns the sequences as a
        (T, B*N, C*k_size) float32 array plus the window grid size
        (num_row, num_col).
        """
        # Zero padding. The dtype was previously spelled `xp.float32`, but no
        # name `xp` exists in this notebook, so every call raised NameError.
        x_pad = np.zeros(
            (x.shape[0], x.shape[1], x.shape[2] + 2 * padding, x.shape[3] + 2 * padding),
            dtype=np.float32,
        )
        x_pad[:, :, padding:padding + x.shape[2], padding:padding + x.shape[3]] = x

        num_row = (x_pad.shape[2] - k_size) // stride + 1
        num_col = (x_pad.shape[3] - k_size) // stride + 1

        # Areas of interest, visited row-major over the window grid.
        aois = []
        for h in range(0, x_pad.shape[2] - k_size + 1, stride):
            for w in range(0, x_pad.shape[3] - k_size + 1, stride):
                aoi = x_pad[:, :, h:h + k_size, w:w + k_size]

                # Bring the scanned axis to the front, then merge the channel
                # axis with the remaining spatial axis: (T, B, C*k_size).
                aoi = np.rollaxis(aoi, time_axis, 0)
                aois.append(aoi.reshape(aoi.shape[0], aoi.shape[1], -1))

        # (T, B*N, C*k_size)
        return np.concatenate(aois, axis=1), num_row, num_col

    def _image2aoisH(self, x, k_size, stride, padding):
        """Horizontal scan: each time step is one window row, (T, B*N, C*W)."""
        return self._image2aois(x, k_size, stride, padding, time_axis=2)

    def _image2aoisV(self, x, k_size, stride, padding):
        """Vertical scan: each time step is one window column, (T, B*N, C*H)."""
        return self._image2aois(x, k_size, stride, padding, time_axis=3)

In [None]:
def max_pool2d(x, k_size, stride, padding=0):
    """Max-pool a (B, C, H, W) array over k_size x k_size windows.

    Parameters
    ----------
    x : array of shape (B, C, H, W)
    k_size : int
        Window height and width.
    stride : int
        Step between consecutive windows.
    padding : int
        Border width added on every side. The border is filled with -inf so
        it can never win the maximum, even when the input is negative.

    Returns
    -------
    float32 array of shape (B, C, num_row, num_col), where
    ``num_row = (H + 2*padding - k_size) // stride + 1`` (likewise for
    columns). A window lying entirely inside the border yields -inf.
    """
    batch, channels, height, width = x.shape

    # Padding. The dtype was previously `xp.float32` with `xp` undefined
    # (NameError), and the fill was 0, which beats any negative activation.
    x_pad = np.full(
        (batch, channels, height + 2 * padding, width + 2 * padding),
        -np.inf,
        dtype=np.float32,
    )
    x_pad[:, :, padding:padding + height, padding:padding + width] = x

    num_row = (x_pad.shape[2] - k_size) // stride + 1
    num_col = (x_pad.shape[3] - k_size) // stride + 1

    # One flattened (B, C, k_size*k_size) window per grid position, row-major.
    windows = [
        x_pad[:, :, h:h + k_size, w:w + k_size].reshape(batch, channels, -1)
        for h in range(0, x_pad.shape[2] - k_size + 1, stride)
        for w in range(0, x_pad.shape[3] - k_size + 1, stride)
    ]

    # (N, B, C, k*k) -> max over each window -> (N, B, C)
    pools = np.max(np.array(windows), axis=3)

    # (N, B, C) -> (B, C, N) -> (B, C, num_row, num_col)
    pools = np.rollaxis(pools, 0, 3)
    return pools.reshape(pools.shape[0], pools.shape[1], num_row, num_col)

In [None]:
def ridge_regression(x, t, norm=1.0):
    """Closed-form ridge regression weights.

    Solves ``(x.T @ x + norm * I) @ W = x.T @ t`` and returns ``W.T``.

    Parameters
    ----------
    x : array of shape (n_samples, n_features)
    t : array of shape (n_samples, n_outputs)
    norm : float
        L2 regularisation strength added to the diagonal.

    Returns
    -------
    array of shape (n_outputs, n_features)

    Raises
    ------
    numpy.linalg.LinAlgError
        If the regularised Gram matrix is singular.
    """
    gram = x.T.dot(x) + norm * np.eye(x.shape[1])
    # Solving the linear system directly is more accurate and cheaper than
    # forming the explicit inverse and multiplying by it.
    weights = np.linalg.solve(gram, x.T.dot(t))
    return weights.T

In [None]:
# Two stacked reservoir convolutions. conv2's in_ch must equal conv1's output
# channel count: 2 scan directions * 5 reservoirs * 12 nodes = 120.
conv1 = ReservoirBasedConvolution2D(k_size=3, stride=1, padding=1, in_ch=3, num_reservoir=5, num_node=12, sparse=0.5)
conv2 = ReservoirBasedConvolution2D(k_size=3, stride=1, padding=1, in_ch=120, num_reservoir=5, num_node=30, sparse=1.0)

# Readout weights with 29 output rows (presumably one per class -- confirm).
# For 32x32 inputs, forward() keeps the spatial size through each convolution
# (k_size=3, stride=1, padding=1) and halves it in each 2x2/stride-2 pool:
# 32 -> 16 -> 8. conv2 emits 2 * 5 * 30 = 300 channels, so the flattened
# feature length is 300 * 8 * 8 = 19200, plus one bias column. The previous
# width of 7501 did not match and made h_pad.dot(w_o.T) fail in forward().
n_features = 2 * 5 * 30 * 8 * 8 + 1
w_o = np.zeros((29, n_features), dtype=np.float32)

In [None]:
def forward(x):
    """Run the two-stage network and the linear readout.

    The input goes through conv1, a 2x2/stride-2 max pool, conv2 and a second
    2x2/stride-2 max pool. The result is flattened per sample and a constant
    1 is appended as the last column before multiplying by the global
    readout matrix ``w_o``.

    Returns
    -------
    (y, h_pad)
        ``y`` is ``h_pad @ w_o.T``; ``h_pad`` is the float32 feature matrix
        including the trailing column of ones.
    """
    pooled1 = max_pool2d(conv1(x), k_size=2, stride=2)
    pooled2 = max_pool2d(conv2(pooled1), k_size=2, stride=2)

    flat = pooled2.reshape(pooled2.shape[0], -1)

    # Append the bias column: features first, then a column of ones.
    bias = np.ones((flat.shape[0], 1), dtype=np.float32)
    h_pad = np.concatenate([flat.astype(np.float32), bias], axis=1)

    return h_pad.dot(w_o.T), h_pad