# 大作业要求

1.大作业以2-3人为一组完成，提交材料包括PPT（最后一次课展示成果）+ 可运行的jupyter notebook，标注相应的注释并给出运行结果 + 最终的大作业报告（正文不超过5页word/pdf，附录可选不限，需组内各成员单独提交，内容为本人在课程大作业中的贡献以及对大作业问题的思考) + 提交包含分工情况及组内各成员工作量占比的表格。分工表格需组内所有成员签字确认；

2.禁止抄袭，发现雷同，所有雷同提交分数除以2；

3.写清楚大作业中的贡献和创新点，若使用开源代码和论文中的方法，在报告中必须注明（不可作为本人创新点），发现不标注引用，分数除以2。

最后一次课展示说明： 1.样例 PPT例子：https://www.sohu.com/a/166633625_642762 2.展示时间限制：展示时间为6分钟讲+2分钟同学助教老师自由提问

大作业报告：强调个人对问题的理解，以及贡献，建议增加在提问反馈之后的改进结果。

最终评分为:30%展示评分+70%大作业报告

# 问题描述

深度神经网络通常采用独立同分布（Independent and Identically Distributed, i.i.d.）的假设进行训练，即假设测试数据分布与训练数据分布相似。然而，当用于实际任务时，这一假设往往并不成立，导致其性能显著下降。虽然这种性能下降对于产品推荐等宽容性大的应用是可以接受的，但在医学等宽容性小的领域使用此类系统是危险的，因为它们可能导致严重事故。理想的人工智能系统应尽可能在分布外（Out-of-Distribution）的情况下有较强的分布外泛化能力。而提高分布外泛化的关键点，就是如何让模型学习到数据中的causal feature。

一个简单的例子：以猫狗二分类为例，如果训练集中所有狗都在草地上，所有的猫都在沙发上，而测试集中所有的狗在沙发上，所有的猫在草地上，那么模型在没有测试集信息的情况下，很有可能根据训练集的信息把草地和狗联系在了一起，沙发和猫联系在了一起，当模型在测试集上测试时将会把在沙发上的狗误认为是猫。

# 数据集(Colored MNIST)

Colored MNIST是MNIST手写数字分类数据集的变体，包含有三个不同的域，每个域包含一组不相交的红色或绿色数字并分别保存为train1.pt, train2.pt, test.pt。该数据集总共包含60000个样本。 

在该数据集中，训练集和测试集之间存在Out-of-Distribution情况，color feature和数字标签之间产生了spurious correlation，即虚假相关（并非真实的因果关系）。从直观上来说，数字的形状为causal feature，数字的颜色为non-causal feature。

![image.png](attachment:image.png)

# Colored MNIST数据集

In [42]:
import os

import numpy as np
from PIL import Image

import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import grad
from torchvision import transforms
from torchvision import datasets
import torchvision.datasets.utils as dataset_utils


def color_grayscale_arr(arr, red=True):
    """Return an (H, W, 3) copy of a 2-D grayscale array tinted red or green.

    The input intensities are written into channel 0 when ``red`` is truthy
    and into channel 1 otherwise; the remaining two channels are zero. The
    result keeps the dtype of ``arr``.
    """
    assert arr.ndim == 2
    colored = np.zeros(arr.shape + (3,), dtype=arr.dtype)
    channel = 0 if red else 1
    colored[:, :, channel] = arr
    return colored


class coloredMNIST(datasets.VisionDataset):
    """Colored MNIST dataset for testing IRM.

    Prepared using the procedure from https://arxiv.org/pdf/1907.02893.pdf.
    Every sample is a tuple ``(image, target, color)`` where ``image`` is a
    PIL image, ``target`` is the (possibly flipped) binary label and
    ``color`` is ``True`` for a red digit and ``False`` for a green one.

    Args:
        root (string): Root directory of dataset where ``ColoredMNIST/*.pt`` will exist.
        env (string): Which environment to load. Must be 1 of 'train1', 'train2',
            'test', or 'all_train'.
        transform (callable, optional): A function/transform that takes in a PIL image
            and returns a transformed version. E.g, ``transforms.RandomCrop``
        target_transform (callable, optional): A function/transform that takes in the
            target and transforms it.

    Raises:
        RuntimeError: If ``env`` is not one of the supported environment names.
    """

    def __init__(self, root='./data', env='test', transform=None, target_transform=None):
        super().__init__(root, transform=transform, target_transform=target_transform)

        # Generate the three environment files on first use.
        self.prepare_colored_mnist()

        data_dir = os.path.join(self.root, 'ColoredMNIST')
        # NOTE(review): torch.load unpickles the stored tuples (PIL images).
        # Only load files produced locally; newer PyTorch releases may need
        # ``weights_only=False`` for these files - confirm for the installed version.
        if env in ['train1', 'train2', 'test']:
            self.data_label_tuples = torch.load(os.path.join(data_dir, env) + '.pt')
        elif env == 'all_train':
            self.data_label_tuples = torch.load(os.path.join(data_dir, 'train1.pt')) + \
                                     torch.load(os.path.join(data_dir, 'train2.pt'))
        else:
            raise RuntimeError(f'{env} env unknown. Valid envs are train1, train2, test, and all_train')

    def __getitem__(self, index):
        """Return the sample stored at ``index``.

        Args:
            index (int): Index

        Returns:
            tuple: (image, target, color) where target is the binary class
            label and color is True for red, False for green.
        """
        img, target, color = self.data_label_tuples[index]

        if self.transform is not None:
            img = self.transform(img)

        if self.target_transform is not None:
            target = self.target_transform(target)

        return img, target, color

    def __len__(self):
        """Return the number of samples in the loaded environment(s)."""
        return len(self.data_label_tuples)

    def prepare_colored_mnist(self):
        """Create ``train1.pt``, ``train2.pt`` and ``test.pt`` if they do not all exist.

        The MNIST training split is downloaded into ``self.root`` and walked in
        order: images with index below 20000 go to train1, below 40000 to
        train2, and the rest to test. Each image gets a binary label (digit < 5
        is 0, otherwise 1) that is flipped with probability 0.25, is colored
        according to that label, and then has its color flipped with an
        environment-dependent probability (0.2 / 0.1 / 0.9).
        """
        colored_mnist_dir = os.path.join(self.root, 'ColoredMNIST')
        env_files = ('train1.pt', 'train2.pt', 'test.pt')
        if all(os.path.exists(os.path.join(colored_mnist_dir, name)) for name in env_files):
            print('Colored MNIST dataset already exists')
            return

        print('Preparing Colored MNIST')
        train_mnist = datasets.mnist.MNIST(self.root, train=True, download=True)

        train1_set = []
        train2_set = []
        test_set = []
        for idx, (im, label) in enumerate(train_mnist):
            if idx % 10000 == 0:
                print(f'Converting image {idx}/{len(train_mnist)}')
            im_array = np.array(im)

            # Assign a binary label y to the image based on the digit
            binary_label = 0 if label < 5 else 1

            # Flip label with 25% probability
            if np.random.uniform() < 0.25:
                binary_label = binary_label ^ 1

            # Color the image either red or green according to its possibly flipped label
            color_red = binary_label == 0

            # Pick the destination environment and its color-flip probability e
            if idx < 20000:
                # 20% in the first training environment
                env_set, flip_prob = train1_set, 0.2
            elif idx < 40000:
                # 10% in the second training environment
                env_set, flip_prob = train2_set, 0.1
            else:
                # 90% in the test environment
                env_set, flip_prob = test_set, 0.9

            # Exactly one random draw per image, as in the reference procedure
            if np.random.uniform() < flip_prob:
                color_red = not color_red

            colored_arr = color_grayscale_arr(im_array, red=color_red)

            # Store the sample color alongside the image and label
            env_set.append((Image.fromarray(colored_arr), binary_label, color_red))

        # torch.save does not create missing parent directories, so make sure
        # the output folder exists before writing the environment files.
        os.makedirs(colored_mnist_dir, exist_ok=True)
        torch.save(train1_set, os.path.join(colored_mnist_dir, 'train1.pt'))
        torch.save(train2_set, os.path.join(colored_mnist_dir, 'train2.pt'))
        torch.save(test_set, os.path.join(colored_mnist_dir, 'test.pt'))

In [43]:
# On the first run the data/ColoredMNIST folder has to be created manually
# Load the test environment and inspect one sample.
D = coloredMNIST(env='test')
# Each item is (PIL image, binary label, color flag); the flag is True for red.
img, target, color = D[5]
img.show()
print(target, color)

Colored MNIST dataset already exists
0 False


# ▶︎▶︎▶︎基础部分

### 1. 设计ColoredMNIST数据二分类的因果图，合理即可。并基于后门准则，推导𝑃(𝑦|𝑑𝑜(𝑥)) 【提示：因果图可以为E->X, E->Y, X->Y, E为环境，比如颜色】

$P(y|do(x)) = \sum_{e} P(y|x,e)P(e)$

### 2. 在ColoredMNIST数据上实现基于后门准则的因果推理算法，训练神经网络，提升模型预测准确度。

# 1. 读取数据

In [44]:
########################################▶︎###############################
###以下是一个简单的读取Colored MNIST例子，请进一步完善。可以进行数据预处理等操作。###
#######################################################################

class ColoredMNIST(datasets.VisionDataset):
    """Label-balanced tensor dataset built from one saved Colored MNIST environment.

    The file at ``path`` is expected to hold a list of
    ``(PIL image, target, color)`` tuples. Samples are split by color, and
    within each color group the rarer binary label is oversampled (by
    repeating its samples in order) until both labels are equally frequent.
    This removes the color/label correlation inside the environment.

    Attributes set by the constructor:
        data_label: the raw list loaded from ``path``.
        red_pics / green_pics: balanced ``(image, target)`` lists per color.
        data: ``(tensor, target)`` pairs, red samples first, then green.

    Args:
        path (string): Path of the ``.pt`` file to load.
    """

    def __init__(self, path):
        # Initialise the VisionDataset attributes (root, transforms) so that
        # inherited helpers such as repr() work on this dataset.
        super().__init__(os.path.dirname(path))

        # NOTE(review): torch.load unpickles the stored objects; only load
        # files that were generated locally.
        self.data_label = torch.load(path)

        # Split by color flag (truthy = red), dropping the flag itself.
        self.red_pics = [(img, target) for img, target, color in self.data_label if color]
        self.green_pics = [(img, target) for img, target, color in self.data_label if not color]

        # Equalise the number of label-0 and label-1 samples within each color.
        self._balance_labels(self.red_pics)
        self._balance_labels(self.green_pics)

        # Convert the PIL images to tensors so a DataLoader can batch them.
        # ToTensor scales pixel values to [0, 1]; the factor 255 undoes that scaling.
        to_tensor = transforms.ToTensor()
        self.data = [(to_tensor(img) * 255, target)
                     for img, target in self.red_pics + self.green_pics]

    @staticmethod
    def _balance_labels(samples):
        """Oversample the rarer binary label of ``samples`` in place.

        ``samples`` is a list of ``(image, target)`` pairs with targets 0/1.
        Samples of the minority label are appended again, cycling through them
        in their original order, until both labels occur equally often. The
        list is left unchanged when it is already balanced or when the
        minority label has no sample to duplicate.
        """
        counts = [0, 0]
        for _, target in samples:
            counts[target] += 1
        if counts[0] == counts[1]:
            return

        minority = 0 if counts[0] < counts[1] else 1
        # Snapshot the minority samples first so the list is never mutated
        # while it is being iterated.
        pool = [sample for sample in samples if sample[1] == minority]
        if not pool:
            return

        deficit = counts[1 - minority] - counts[minority]
        samples.extend(pool[i % len(pool)] for i in range(deficit))

    def __getitem__(self, index):
        """Return the ``(image tensor, target)`` pair at ``index``."""
        img, target = self.data[index]
        return img, target

    def __len__(self):
        """Return the number of samples after balancing."""
        return len(self.data)

In [45]:
# Build the label-balanced datasets for the first training environment and
# for the test environment, each wrapped in a shuffled mini-batch loader.
train_data = ColoredMNIST('data/ColoredMNIST/train1.pt')
train_loader = torch.utils.data.DataLoader(train_data, batch_size=32, shuffle=True)
test_data = ColoredMNIST('data/ColoredMNIST/test.pt')
test_loader = torch.utils.data.DataLoader(test_data, batch_size=32, shuffle=True)
# len() of a DataLoader is its number of batches, not its number of samples.
print(len(train_loader))
print(len(test_loader))

1001
1123


# 2. 定义模型

In [46]:
class MyModel(nn.Module):
    """Small CNN producing two class logits from a 3-channel image.

    Two 5x5 convolutions, each followed by ReLU and 2x2 max pooling, feed a
    64-unit hidden layer and a 2-way output layer. The flatten size 64*4*4
    presumably corresponds to 28x28 (MNIST-sized) inputs - confirm against the
    data fed in.
    """

    def __init__(self):
        super(MyModel, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=5)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=5)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        self.fc1 = nn.Linear(in_features=64 * 4 * 4, out_features=64)
        self.fc2 = nn.Linear(in_features=64, out_features=2)

    def forward(self, x):
        """Return the raw (un-normalised) class scores for the batch ``x``."""
        features = self.pool(F.relu(self.conv1(x)))
        features = self.pool(F.relu(self.conv2(features)))
        # Collapse channels and spatial positions into one vector per sample.
        flat = features.view(-1, 64 * 4 * 4)
        hidden = F.relu(self.fc1(flat))
        return self.fc2(hidden)

# 3. 训练模型并输出测试结果

In [47]:
# Module-level network, loss and optimiser; train_test_MyModel reads these globals.
net = MyModel()
# Cross-entropy over the two output logits.
loss_fn = nn.CrossEntropyLoss()
# Plain SGD over all network parameters.
optimizer = optim.SGD(net.parameters(), lr=0.001)

def train_test_MyModel(epochs=10):
    """Train the global ``net`` and print its test accuracy after every epoch.

    Uses the module-level ``net``, ``optimizer``, ``loss_fn``, ``train_loader``
    and ``test_loader``. The accuracy is printed as a tensor holding the
    fraction of correctly classified test samples.

    Args:
        epochs (int): Number of passes over ``train_loader``. Defaults to 10.
    """
    for _ in range(epochs):
        # One optimisation pass over the training environment.
        net.train()
        for img, target in train_loader:
            optimizer.zero_grad()
            loss = loss_fn(net(img), target)
            loss.backward()
            optimizer.step()

        # Evaluation needs no gradients; disabling autograd avoids building
        # a graph for every test batch.
        net.eval()
        correct = total = 0
        with torch.no_grad():
            for img, target in test_loader:
                pred = torch.argmax(net(img), dim=1)
                correct += torch.sum(pred == target)
                total += img.shape[0]
        print(correct / total)


train_test_MyModel()



tensor(0.6709)
tensor(0.6915)
tensor(0.6851)
tensor(0.6881)
tensor(0.6646)
tensor(0.6767)
tensor(0.6042)
tensor(0.6428)
tensor(0.6152)
tensor(0.5951)


# ▶︎▶︎▶︎提高部分

### 2. 将上述过程中用到的神经网络替换为Memristor，可使用MemTorch库，并研究方法提高其泛化性能；

In [48]:
%pip install memtorch

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [49]:
# MemTorch: memristor device models (VTEAM) and the model patching utilities used below.
import memtorch
# Stand-alone VTEAM device instance; in the visible code it is only
# referenced by the disabled characterisation plots that follow.
reference_memristor = memtorch.bh.memristor.VTEAM
reference_memristor_params = {'time_series_resolution': 1e-10}
memristor = reference_memristor(**reference_memristor_params)
# Optional device plots (disabled):
#memristor.plot_hysteresis_loop()
#memristor.plot_bipolar_switching_behaviour()

import copy
from memtorch.mn.Module import patch_model
from memtorch.map.Input import naive_scale
from memtorch.map.Parameter import naive_map
from memtorch.bh.nonideality.NonIdeality import apply_nonidealities

# Device resistance bounds in ohms handed to ANN_to_MNN:
# r_on is the on (minimum) resistance, r_off the off (maximum) resistance.
r_on = 1.4e4
r_off = 5e7


def ANN_to_MNN(model, r_on, r_off, tile_shape, ADC_resolution, failure_percentage):
    '''
    Convert a pretrained network into a memristive network with MemTorch.

    A deep copy of ``model`` is patched, so the input model is left untouched.
    Linear and Conv2d layers are replaced by memristive equivalents built from
    VTEAM devices, device faults are applied, and the patched layers are tuned.

    model : pretrained model.
    r_on : float
        On (minimum) resistance of the device (ohms).
    r_off : float
        Off (maximum) resistance of the device (ohms).
    tile_shape : int, int
        Tile shape to use to store weights.
    ADC_resolution : int or None
        ADC resolution (bit width). If None, quantization noise is not accounted for.
    failure_percentage : float
        Proportion of devices which become stuck at a low resistance state
        (forwarded to MemTorch as ``lrs_proportion``).

    Returns the patched and tuned model.
    '''
    reference_memristor = memtorch.bh.memristor.VTEAM
    reference_memristor_params = {'time_series_resolution': 1e-10, 'r_off': r_off, 'r_on': r_on}

    # Keep None as None: int(None) would raise although None is a documented
    # value meaning "no quantization".
    adc_bits = None if ADC_resolution is None else int(ADC_resolution)

    # Replace every Linear / Conv2d layer of a private copy by its memristive equivalent
    patched_model = patch_model(copy.deepcopy(model),
                                memristor_model=reference_memristor,
                                memristor_model_params=reference_memristor_params,
                                module_parameters_to_patch=[torch.nn.Linear, torch.nn.Conv2d],
                                mapping_routine=naive_map,
                                transistor=True,
                                programming_routine=None,
                                scheme=memtorch.bh.Scheme.DoubleColumn,
                                tile_shape=tile_shape,
                                max_input_voltage=0.3,
                                ADC_resolution=adc_bits,
                                ADC_overflow_rate=0.,
                                quant_method='linear')

    # Apply device non-idealities: only stuck-at-low-resistance faults are enabled
    patched_model = apply_nonidealities(patched_model,
                                        non_idealities=[memtorch.bh.nonideality.NonIdeality.DeviceFaults],
                                        lrs_proportion=failure_percentage,
                                        hrs_proportion=0.,
                                        electroform_proportion=0.)

    patched_model.tune_()

    return patched_model


MNN = ANN_to_MNN(net, r_on=r_on, r_off=r_off, tile_shape=(128,128), ADC_resolution=8, failure_percentage=0)

Patched Conv2d(3, 32, kernel_size=(5, 5), stride=(1, 1)) -> bh.Conv2d(in_channels=3, out_channels=32, kernel_size=(5, 5), stride=(1, 1), padding=(0, 0))
Patched Conv2d(32, 64, kernel_size=(5, 5), stride=(1, 1)) -> bh.Conv2d(in_channels=32, out_channels=64, kernel_size=(5, 5), stride=(1, 1), padding=(0, 0))
Patched Linear(in_features=1024, out_features=64, bias=True) -> bh.Linear(in_features=1024, out_features=64, bias=True)
Patched Linear(in_features=64, out_features=2, bias=True) -> bh.Linear(in_features=64, out_features=2, bias=True)
Tuned bh.Conv2d(in_channels=3, out_channels=32, kernel_size=(5, 5), stride=(1, 1), padding=(0, 0)). Coefficient of determination: 0.993118 [6651.863281, -0.001044]
Tuned bh.Conv2d(in_channels=32, out_channels=64, kernel_size=(5, 5), stride=(1, 1), padding=(0, 0)). Coefficient of determination: 0.997598 [2412.404785, -0.001807]
Tuned bh.Linear(in_features=1024, out_features=64, bias=True). Coefficient of determination: 0.999190 [2591.459961, -0.001109]
Tu

In [50]:
# Evaluate the memristive network (MNN) on the test set
def test_MNN():
    """Print the accuracy of the global ``MNN`` on the global ``test_loader``.

    The accuracy is printed as a tensor holding the fraction of correctly
    classified test samples. No parameters are updated.
    """
    correct = total = 0
    for img, target in test_loader:
        output = MNN(img)
        pred = torch.argmax(output, dim=1)
        # Predictions are moved to the CPU before being compared with the
        # loader's targets (presumably because MNN may run on another device).
        correct += torch.sum(pred.cpu() == target)
        total += img.shape[0]
    print(correct / total)


test_MNN()

tensor(0.5093)
