In [1]:
student_id =  401210989
student_name = "Yashar Moradi"

print("your student id:", student_id)
print("your name:", student_name)

your student id: 401210989
your name: Yashar Moradi


Import 

In [2]:
import torch
import torch.nn as nn
from tqdm import tqdm
from torch import tensor, Tensor
from torch.optim import Adam,SGD
from torchvision import datasets
from torchvision.transforms import Compose, ToTensor, Lambda
from torch.utils.data import DataLoader
import numpy as np
from scipy.signal import convolve2d
import torchvision

Select device

In [19]:
print("Torch version:",torch.__version__)

print("Is CUDA enabled?",torch.cuda.is_available())

if torch.cuda.is_available(): 
 dev = "cuda:0" 
else: 
 dev = "cpu" 
device = torch.device(dev) 

Torch version: 2.1.0+cu118
Is CUDA enabled? True


FF Loss function

عبارتی که به عنوان تابع خطا آمده پیاده سازی شده است

In [4]:
def FFLoss(pos, neg, threshold):
    return torch.log(1 + torch.exp(torch.cat([threshold - pos, neg - threshold]))).mean()

Generate positive and  negative data

در تولید دیتای مثبت ، 10 پیکس اول با مقدار صحیح تگ ها و در تولید دیتای منفی ، تگ های اشتباه در 10 پیکسل اول قرار داده می شود

In [5]:
def gen_pos_data(x, y):
    out = x.clone()
    out[:, :10] = y
    return out

def gen_neg_data(x, y):
    out = x.clone()
    rnd = torch.randperm(x.shape[0])
    out[:, :10] = y[rnd]
    return out

FF Layer model

لایه شبکه با گرفتن ابعاد ورودی و خروجی تشکیل شده و در قسمت فوروارد یک ترکیب خطی از تابع رلو عبور داده می شود و در تابع خطا ، مقدار خطا با گودنس محاسبه شده و در همان لایه ضرایب آپدیت می شوند 

In [6]:
class FFLayer(nn.Linear):

    def __init__(self, in_features, out_features, threshold,
                 bias=True, device=None, dtype=None):
        super().__init__(in_features, out_features, bias, device, dtype)
        self.opt = Adam(self.parameters(), lr=0.03)
        self.threshold = threshold
        self.relu = nn.ReLU()

    def forward(self, x):
      x_direction = x / (x.norm(2, 1, keepdim=True) + 1e-4)
      return self.relu(nn.functional.linear(x_direction, self.weight, self.bias))

    def loss(self, x_pos, x_neg):
        pos_out = self.forward(x_pos)
        neg_out = self.forward(x_neg)
        g_pos = pos_out.pow(2).mean(1)
        g_neg = neg_out.pow(2).mean(1)
        loss = FFLoss(g_pos, g_neg, self.threshold)
        self.opt.zero_grad()
        loss.backward()
        self.opt.step()
        return self.forward(x_pos).detach(), self.forward(x_neg).detach()

FF Network model

 شبکه با گرفتن آرایه ای ابعد لایه های مخفی تشکیل شده و در قسمت آموزش لایه به لایه ورودی داده شده و ضرایب هر لایه آپدیت می شود و در قسمت 
پیش بینی ، تابع گودنس برای 10 حالت ممکن از اعداد محاسبه شده و مقدار ماکسیمم به عنوان تگ خروجی برگردانده می شود

In [7]:
class FFNetwork(torch.nn.Module):

    def __init__(self, dims, threshold, device = None):
        super().__init__()
        self.layers = []
        for d in range(len(dims) - 1):
            self.layers += [FFLayer(dims[d], dims[d + 1], threshold, device)]

    def predict(self, x):
        goodness_list = []
        for i in range(10):
            vec = torch.zeros([10], dtype=torch.float)
            vec[i] = 1
            h = gen_pos_data(x, vec)
            goodness = []
            for layer in self.layers:
                h = layer(h)
                goodness += [h.pow(2).mean(1)]
            goodness_list += [sum(goodness).unsqueeze(1)]
        goodness_list = torch.cat(goodness_list, 1)
        pred = goodness_list.argmax(1)
        return pred

    def train(self, x_pos, x_neg):
        out_pos, out_neg = x_pos, x_neg
        for i, layer in enumerate(self.layers):
            out_pos, out_neg = layer.loss(out_pos, out_neg)

Load dataset

In [15]:
transform = Compose([
    ToTensor(),
    Lambda(lambda x: torch.flatten(x))])

train_data_one_hot = datasets.MNIST(
    root='./',
    train=True,
    download=True,
    transform=transform,
    target_transform=Lambda(lambda y: torch.zeros(10, dtype=torch.float).scatter_(0, torch.tensor(y), value=1))
)

train_data = datasets.MNIST(
    root='./',
    train=True,
    download=True,
    transform=transform,
)

test_data = datasets.MNIST(
    root='./',
    train=False,
    download=True,
    transform=transform,
)

train_one_hot_dataloader = DataLoader(train_data_one_hot, batch_size=500, shuffle=True)
train_dataloader = DataLoader(train_data, batch_size=50000, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=10000, shuffle=True)

Train supervised learning

تعداد ایپاک و آستانه مشخص شده و با خواندن بچ ها از داده ، در هر بچ دیتای منفی تشکیل شده و برای آموزش به شبکه داده میشود

In [117]:
threshold = 1.5
n_epochs = 5

FFmodel = FFNetwork([784, 100, 100], threshold, device)

for epoch in tqdm(range(n_epochs)):
    for x_batch, y_batch in train_one_hot_dataloader:
        x_train_pos = gen_pos_data(x_batch, y_batch)
        x_train_neg = gen_neg_data(x_batch, y_batch)
        FFmodel.train(x_train_pos, x_train_neg)

x_train, y_train = next(iter(train_dataloader))
y_train_pred = FFmodel.predict(x_train)
correct_train = (y_train_pred == y_train).float().sum()
print("Train Accuracy: {:.3f}".format(correct_train/x_train.shape[0]))

x_test, y_test = next(iter(test_dataloader))
y_test_pred = FFmodel.predict(x_test)
correct_test = (y_test_pred == y_test).float().sum()
print("Test Accuracy: {:.3f}".format(correct_test/x_test.shape[0]))

100%|██████████| 5/5 [01:14<00:00, 14.98s/it]


Train Accuracy: 0.886
Test Accuracy: 0.890


Creat mask

مسک طبق مقاله گفته شده نشکیل شده و به تعداد 10 بار فیلتر بلور روی تصویر ورودی انجام می شود. برای این قسمت از منبع زیر کمک گرفته شده است

https://github.com/ghadialhajj/FF_unsupervised/tree/master

In [8]:
def create_mask(shape, iterations: int = 10):

    blur_filter_1 = np.array(((0, 0, 0), (1/4, 1/2, 1/4), (0, 0, 0)))
    blur_filter_2 = blur_filter_1.T

    image = np.random.randint(0, 2, size=shape)

    for i in range(iterations):
        image = np.abs(convolve2d(image, blur_filter_1, mode='same') / blur_filter_1.sum())
        image = np.abs(convolve2d(image, blur_filter_2, mode='same') / blur_filter_2.sum())

    mask = np.round(image).astype(np.uint8)

    return tensor(mask)

Creat negative data

تولید دیتای هیبرید

In [9]:
def create_negative_image(image_1: Tensor, image_2: Tensor):

    mask = create_mask((image_1.shape[0], image_1.shape[1]))

    image_1 = torch.mul(image_1, mask)
    image_2 = torch.mul(image_2, 1 - mask)

    return torch.add(image_1, image_2)


def create_negative_batch(images: Tensor):
    neg_imgs = []
    batch_size = images.shape[0]
    for _ in range(batch_size):
        idx1, idx2 = np.random.randint(batch_size, size=2)
        neg_imgs.append(create_negative_image(images[idx1].squeeze(), images[idx2].squeeze()))
    return torch.unsqueeze(torch.stack(neg_imgs), dim=1)

Load dataset for unsupervised FF model

دیتای ورودی برای این قسمت بصورت تصویر 28*28 خوانده شده

In [11]:
transform = Compose([
    ToTensor()])

train_data = datasets.MNIST(
    root='./',
    train=True,
    download=True,
    transform=transform,
)

test_data = datasets.MNIST(
    root='./',
    train=False,
    download=True,
    transform=transform,
)

train_dataloader = DataLoader(train_data, batch_size=50000, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=10000, shuffle=True)
pos_dataloader = DataLoader(train_data, batch_size=500, shuffle=True)


FF Unsupervised network

شبکه بدون نظارت با دریافت بردار لایه های مخفی و خروجی تشکیل شده است. برای آموزش ورودی به لایه داده شده و ضرایب در همان قسمت آپدیت می شوند. برای پیش بینی تابع گودنس لایه خروجی محاسبه شده و مقدار ماکسیمم به عنوان تگ خروجی انتخاب می شود

In [13]:
class FFNetworkUnsupervised(torch.nn.Module):

    def __init__(self, dims, threshold, device = None):
        super().__init__()
        self.layers = []
        for d in range(len(dims) - 1):
            self.layers += [FFLayer(dims[d], dims[d + 1], threshold, device)]

    def predict(self, x):
        h = torch.reshape(x, (x.shape[0], -1))
        goodness_list = []
        goodness = []
        for layer in self.layers:
            h = layer(h)
            goodness += [h.pow(2).mean(1)]
        goodness_list += [sum(goodness).unsqueeze(1)]
        goodness_list = torch.cat(goodness_list, 1)
        pred = goodness_list.argmax(1)
        return pred

    def train(self, x_pos, x_neg):
        out_pos, out_neg = x_pos, x_neg
        for i ,layer in enumerate(self.layers):
            out_pos, out_neg = layer.loss(out_pos, out_neg)

Train unsupervised FF network

آموزش شبکه بدون نظارت با تعداد ایپاک و مقدار استانه مشخص شده

In [16]:
threshold = 1.5
n_epochs = 5

FFmodelUnsupervised = FFNetworkUnsupervised([784, 500, 500, 10], threshold, device)

for epoch in tqdm(range(n_epochs)):
    for x_pos_batch, y_pos_batch in pos_dataloader:
        x_neg_batch = create_negative_batch(x_pos_batch)
        x_pos = torch.reshape(x_pos_batch, (x_pos_batch.shape[0], -1))
        x_neg = torch.reshape(x_neg_batch, (x_neg_batch.shape[0], -1))
        FFmodelUnsupervised.train(x_pos, x_neg)

  0%|          | 0/5 [00:00<?, ?it/s]

100%|██████████| 5/5 [06:41<00:00, 80.26s/it]


Accuracy of unsupervised FF network

In [18]:
x_train, y_train = next(iter(train_dataloader))
y_train_pred = FFmodelUnsupervised.predict(x_train)
correct_train = (y_train_pred == y_train).float().sum()
print("Train Accuracy: {:.3f}".format(correct_train/x_train.shape[0]))

x_test, y_test = next(iter(test_dataloader))
y_test_pred = FFmodelUnsupervised.predict(x_test)
correct_test = (y_test_pred == y_test).float().sum()
print("Test Accuracy: {:.3f}".format(correct_test/x_test.shape[0]))

Train Accuracy: 0.099
Test Accuracy: 0.098
