In [None]:
import torch
from utils import *
from matplotlib import pyplot as plt
set_seed(123)

device=torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print("Device:",device)

In [None]:
def normal_distribution(mean, std, amount=100):
    if len(mean)!=len(std):
        raise("Different Dim!")
    dim=len(mean)
    data=torch.empty((amount, dim), device=device)
    for i in range(dim):
        data[:,i]=data[:,i].normal_(mean=mean[i], std=std[i])
    return data

In [None]:
INPUT_DIM=3
CLASS_NUM=30
SAMPLE_NUM=10
SAMPLE_NUM_RANGE=0.2
MEAN_ARRANGE=10
STD_ARRANGE=1

if INPUT_DIM==2:
    # 2D
    mean_list=[]
    std_list=[]
    for i in range(CLASS_NUM):
        mean_list.append([random.random()*MEAN_ARRANGE, random.random()*MEAN_ARRANGE])
        std_list.append([0.5+random.random()*STD_ARRANGE, 0.5+random.random()*STD_ARRANGE])
    
    data={}
    for i in range(len(mean_list)):
        temp=normal_distribution(mean_list[i], std_list[i], int( (1 + random.sample([1, -1], 1)[0] * random.random() * SAMPLE_NUM_RANGE) * SAMPLE_NUM))
        data[str(i)]=temp

    if CLASS_NUM<=25:
        for tag in data:
            plt.scatter(data[tag][:, 0].cpu(), data[tag][:, 1].cpu(), label=tag)
        plt.legend()
elif INPUT_DIM==3:
    # 3D
    mean_list=[]
    std_list=[]
    for i in range(CLASS_NUM):
        mean_list.append([random.random()*MEAN_ARRANGE, random.random()*MEAN_ARRANGE, random.random()*MEAN_ARRANGE])
        std_list.append([0.5+random.random()*STD_ARRANGE, 0.5+random.random()*STD_ARRANGE, 0.5+random.random()*STD_ARRANGE])

    data={}
    for i in range(len(mean_list)):
        temp=normal_distribution(mean_list[i], std_list[i], int( (1 + random.sample([1, -1], 1)[0] * random.random() * SAMPLE_NUM_RANGE) * SAMPLE_NUM))
        data[str(i)]=temp

    if CLASS_NUM<=25:
        fig = plt.figure()
        ax = fig.add_subplot(projection='3d')
        for tag in data:
            ax.scatter(data[tag][:, 0].cpu(), data[tag][:, 1].cpu(), data[tag][:, 2].cpu(), label=tag)
        ax.set_xlabel('X')
        ax.set_ylabel('Y')
        ax.set_zlabel('Z')
        ax.legend()

train_pipe=[]
train_dict={}
test_dict={}
for tag in data:
    train, test = train_test_split([i for i in data[tag]], test_size=0.2)
    train_pipe.extend([(_,tag) for _ in train])
    train_dict[tag]=train
    test_dict[tag]=test

In [None]:
def batch_train(batch_size, shuffle=False, evaluate=True):
    if shuffle:
        pipe = random.sample(train_pipe, len(train_pipe))
    else:
        pipe = train_pipe.copy()
    
    for i in range(0, len(pipe)+batch_size, batch_size):
        if pipe[i:i+batch_size]:
            Model.batch_train(pipe[i:i+batch_size])
            if evaluate:
                if Model.iteration%int(SAMPLE_NUM*(1-SAMPLE_NUM_RANGE))==0:
                    predict_trainset(silent=True)
                    predict_testset(silent=True)

def continual_forward_baseline(shuffle=False, evaluate=True):
    if shuffle:
        pipe = random.sample(train_pipe, len(train_pipe))
    else:
        pipe = train_pipe.copy()
    for i in tqdm(pipe):
        Model.continual_forward_baseline(i[0],i[1])
        if evaluate:
            if Model.iteration%int(SAMPLE_NUM*(1-SAMPLE_NUM_RANGE))==0:
                predict_trainset(silent=True)
                predict_testset(silent=True)

def continual_train(shuffle=False, evaluate=True):
    if shuffle:
        pipe = random.sample(train_pipe, len(train_pipe))
    else:
        pipe = train_pipe.copy()
    for i in tqdm(pipe):
        Model.continual_forward(i[0],i[1])
        if evaluate:
            if Model.iteration%int(SAMPLE_NUM*(1-SAMPLE_NUM_RANGE))==0:
                predict_trainset(silent=True)
                predict_testset(silent=True)

def predict_trainset(silent=False):
    acc_list=[]
    for key, value in train_dict.items():
        if Model.tag_dict.get(key):
            correct=0
            pred_tags_list=Model.predict(value)
            for pred_tags in pred_tags_list:
                if key in pred_tags:
                    correct+=1
            acc=correct/len(value)
            acc_dict[key+"_train"].append([Model.iteration, acc])
            acc_list.append(acc)
        else:
            acc_list.append(0)
    
    average_acc=sum(acc_list)/len(acc_list)
    acc_dict["average_train"].append([Model.iteration, average_acc])
    if not silent:
        print(average_acc)

def predict_testset(silent=False):
    acc_list=[]
    for key, value in test_dict.items():
        if Model.tag_dict.get(key):
            correct=0
            pred_tags_list=Model.predict(value)
            for pred_tags in pred_tags_list:
                if key in pred_tags:
                    correct+=1
            acc=correct/len(value)
            acc_dict[key+"_test"].append([Model.iteration, acc])
            acc_list.append(acc)
        else:
            acc_list.append(0)
    
    average_acc=sum(acc_list)/len(acc_list)
    acc_dict["average_test"].append([Model.iteration, average_acc])
    if not silent:
        print(average_acc)

def predict_train_and_paint():
    predict_result={}
    for key, value in tqdm(train_dict.items()):
        pred_tags_list=Model.predict(value)
        for pred_tags, sample in zip(pred_tags_list, value):
            pred_tag=pred_tags[0]
            if not predict_result.get(pred_tag):
                predict_result[pred_tag]=[sample]
            else:
                predict_result[pred_tag].append(sample)
    predict_result=dict(sorted(predict_result.items(), key=lambda x:x[0]))
    if INPUT_DIM==2:
        for tag in predict_result:
            t=torch.tensor([i.tolist() for i in predict_result[tag]])
            plt.scatter(t[:, 0], t[:, 1], label=tag)
        # plt.legend()
        plt.title("TRAIN")
    elif INPUT_DIM==3:
        fig = plt.figure()
        ax = fig.add_subplot(projection='3d')
        for tag in predict_result:
            t=torch.tensor([i.tolist() for i in predict_result[tag]])
            ax.scatter(t[:, 0].cpu(), t[:, 1].cpu(), t[:, 2].cpu(), label=tag)
        ax.set_xlabel('X')
        ax.set_ylabel('Y')
        ax.set_zlabel('Z')
        # ax.legend()
        plt.title("TRAIN")

def predict_test_and_paint():
    predict_result={}
    for key, value in tqdm(test_dict.items()):
        pred_tags_list=Model.predict(value)
        for pred_tags, sample in zip(pred_tags_list, value):
            pred_tag=pred_tags[0]
            if not predict_result.get(pred_tag):
                predict_result[pred_tag]=[sample]
            else:
                predict_result[pred_tag].append(sample)
    predict_result=dict(sorted(predict_result.items(), key=lambda x:x[0]))
    if INPUT_DIM==2:
        for tag in predict_result:
            t=torch.tensor([i.tolist() for i in predict_result[tag]])
            plt.scatter(t[:, 0], t[:, 1], label=tag)
        # plt.legend()
        plt.title("TEST")
    elif INPUT_DIM==3:
        fig = plt.figure()
        ax = fig.add_subplot(projection='3d')
        for tag in predict_result:
            t=torch.tensor([i.tolist() for i in predict_result[tag]])
            ax.scatter(t[:, 0].cpu(), t[:, 1].cpu(), t[:, 2].cpu(), label=tag)
        ax.set_xlabel('X')
        ax.set_ylabel('Y')
        ax.set_zlabel('Z')
        # ax.legend()
        plt.title("TEST")

def plot_loss():
    plt.figure()
    keys=[]
    for key in Model.loss_dict.keys():
        if Model.loss_dict[key]:
            t=torch.tensor(Model.loss_dict[key])
            plt.plot(t[:,0], t[:,1])
            keys.append(key)
    plt.legend(keys)
    plt.show()

def plot_acc():
    plt.figure(figsize=(16,8))
    ax=plt.subplot(1,2,1)
    for key in [i for i in acc_dict.keys() if "average" not in i and "train" in i]:
        t=torch.tensor(acc_dict[key])
        ax.plot(t[:,0], t[:,1], label=key[:-6])
    ax.set_ylim(top=1.1, bottom=-0.1)
    ax.set_title("Train ACC")
    ax.legend()

    ax=plt.subplot(1,2,2)
    for key in [i for i in acc_dict.keys() if "average" not in i and "test" in i]:
        t=torch.tensor(acc_dict[key])
        ax.plot(t[:,0], t[:,1], label=key[:-5])
    ax.set_ylim(top=1.1, bottom=-0.1)
    ax.set_title("Test ACC")
    ax.legend()
    
    plt.show()

def initialize():
    global acc_dict
    acc_dict={}
    for tag in train_dict:
        if acc_dict.get(tag)==None:
            acc_dict[tag+"_test"]=[]
            acc_dict[tag+"_train"]=[]
    acc_dict["average_train"]=[]
    acc_dict["average_test"]=[]

def reserve_acc(suffix):
    for key in acc_dict.keys():
        if "average" in key:
            acc_reservation[key+"_"+suffix]=acc_dict[key].copy()


In [None]:
INNER_DIM=100

class Classifier(torch.nn.Module):
    def __init__(self):
        super(Classifier, self).__init__()
        self.fc = torch.nn.Linear(INPUT_DIM, INNER_DIM)
        init_network(self)

    def forward(self, feature_vecs):
        out = self.fc(feature_vecs)
        return out

class Generator(torch.nn.Module):
    def __init__(self):
        super(Generator, self).__init__()
        self.fc = torch.nn.Linear(INNER_DIM, INPUT_DIM)
        init_network(self)

    def forward(self, tag_vecs):
        out = self.fc(tag_vecs)
        return out

class EmptyModel(torch.nn.Module):
    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
    
    def forward(self, t):
        t=t.reshape(1,-1)
        return t

In [None]:
SHUFFLE=False
acc_reservation={}
SEED=random.randint(1, 999999)
SEED

In [None]:
from ContinualLearning import BaseModel
set_seed(SEED)
Model=BaseModel(
    EmptyModel,
    Classifier,
    0.001
)

initialize()
for i in tqdm(range(len(train_pipe))):
    batch_train(len(train_pipe), shuffle=SHUFFLE)
plot_loss()
plot_acc()
predict_trainset()
predict_testset()
reserve_acc("batch")

In [None]:
from ContinualLearning import BaseModel
set_seed(SEED)
Model=BaseModel(
    EmptyModel,
    Classifier,
    0.001
)

initialize()
continual_forward_baseline(shuffle=SHUFFLE)
predict_trainset()
predict_testset()
plot_loss()
plot_acc()
reserve_acc("baseline")

In [None]:
from ContinualLearning import ContinualLearningModel_Generate
set_seed(SEED)
Model=ContinualLearningModel_Generate(
    EmptyModel,
    Classifier,
    0.001, 99, Generator, 0.01
)

initialize()
continual_train(shuffle=SHUFFLE)
predict_trainset()
predict_testset()
plot_loss()
plot_acc()
reserve_acc("generate")

In [None]:
from ContinualLearning import ContinualLearningModel_Store
set_seed(SEED)
Model=ContinualLearningModel_Store(
    EmptyModel,
    Classifier,
    0.001, 5
)

initialize()
continual_train(shuffle=SHUFFLE)
predict_trainset()
predict_testset()
plot_loss()
plot_acc()
reserve_acc("store")

In [None]:
plt.figure(figsize=(16,8))
ax=plt.subplot(1,2,1)
markers=["H", "s", "p","*"]
colors=["#00A2FF", "#252500", "#FF0000", "#00FF11"]
top=0
bottom=1
for key in [i for i in acc_reservation.keys() if "train" in i]:
    t=torch.tensor(acc_reservation[key])
    bottom=min(torch.min(t[:, 1]).tolist(), bottom)
    top=max(torch.max(t[:, 1]).tolist(), top)
    ax.plot(t[:,0], t[:,1], label=key, marker=markers[0], markersize=10, color=colors[0])
    markers.pop(0)
    colors.pop(0)
ax.set_ylim(top=top+0.1, bottom=bottom-0.1)
ax.set_title("Total Train ACC")
ax.legend()

ax=plt.subplot(1,2,2)
markers=["H", "s", "p","*"]
colors=["#00A2FF", "#252500", "#FF0000", "#00FF11"]
top=0
bottom=1
for key in [i for i in acc_reservation.keys() if "test" in i]:
    t=torch.tensor(acc_reservation[key])
    bottom=min(torch.min(t[:, 1]).tolist(), bottom)
    top=max(torch.max(t[:, 1]).tolist(), top)
    ax.plot(t[:,0], t[:,1], label=key, marker=markers[0], markersize=10, color=colors[0])
    markers.pop(0)
    colors.pop(0)
ax.set_ylim(top=top+0.1, bottom=bottom-0.1)
ax.set_title("Total Test ACC")
ax.legend()

plt.show()

In [None]:
predict_train_and_paint()

In [None]:
predict_test_and_paint()