-
Notifications
You must be signed in to change notification settings - Fork 0
/
Train_imbalanced_mnist_multi_class.py
150 lines (118 loc) · 4.99 KB
/
Train_imbalanced_mnist_multi_class.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
# @Time : 4/12/23 3:35 PM
# @Author : Zhou-Lin-yong
# @File : Train_imbalanced_mnist_multi_class.py
# @SoftWare: PyCharm
import time, Model, math
import numpy as np
import torch
import torch.nn as nn
from torch.autograd import Variable
from sklearn.metrics import confusion_matrix
import torch.nn.functional as F
from sklearn import metrics
# from sklearn.metrics import precision_score, f1_score, recall_score
# from sklearn.metrics import roc_curve, precision_recall_curve
from collections import Counter
import Loss_Function
import os
# Mini-batch size used when slicing the training arrays, and the number of
# classes passed to the model constructor.
batch_size = 100
num_class = 10

# Imbalance scenario. It selects which digit labels are treated as the
# majority group; every other label is treated as minority.
minority_class = 'former_5'  # former_5 or even_class
_majority_by_scenario = {
    'former_5': [5, 6, 7, 8, 9],
    'even_class': [1, 3, 5, 7, 9],
}
if minority_class not in _majority_by_scenario:
    # Fail with a clear message here instead of printing a warning and then
    # hitting a NameError on the undefined `majority_class` below.
    raise ValueError(
        'error minority_class! expected one of '
        f'{sorted(_majority_by_scenario)}, got {minority_class!r}'
    )
majority_class = _majority_by_scenario[minority_class]
print('minority_class:', minority_class)
print('majority_class:', majority_class)
# Tag that appears in the training folder name ('50_<data_num>').
data_num = '3000'

# Root folder of the selected imbalance scenario and its training sub-folder.
abs_path = f'./create_imbalaced_mnist/{minority_class}'
abs_path_train = f'{abs_path}/50_{data_num}'

# The training arrays are read from the training sub-folder ...
train_data_path = f'{abs_path_train}/imbalanced_x.npy'
train_label_path = f'{abs_path_train}/imbalanced_y.npy'

# ... while the evaluation arrays sit directly under the scenario root.
test_data_path = f'{abs_path}/eval_x.npy'
test_label_path = f'{abs_path}/eval_y.npy'
# Loss used for training. Supported values: 'CE', 'FL', 'ASL', 'GPPE'.
Loss_fun = 'GPPE'
# Run index; it becomes the last component of the result directory below.
train_times = 1

# Build only the selected criterion and move it to the GPU.
if Loss_fun == 'CE':
    Loss = nn.CrossEntropyLoss().cuda()
elif Loss_fun == 'FL':
    Loss = Loss_Function.Focal_Loss().cuda()
elif Loss_fun == 'ASL':
    Loss = Loss_Function.ASLSingleLabel().cuda()
elif Loss_fun == 'GPPE':
    Loss = Loss_Function.GPPE_Multi_Class().cuda()
else:
    # A misspelled name must not silently fall back to GPPE: the training
    # loop compares Loss_fun against 'GPPE' to decide how to call the loss.
    raise ValueError(
        f"unknown Loss_fun {Loss_fun!r}; expected 'CE', 'FL', 'ASL' or 'GPPE'"
    )

# Results go to <scenario>/50_<data_num>/result/result_<loss>/<run>/.
save_confu_path = (
    abs_path + '/50_' + data_num
    + '/result/result_' + Loss_fun + '/' + str(train_times) + '/'
)
print('Loss:', Loss)
print('save_confu_path:', save_confu_path)
# ---- Load both splits from disk and print their label histograms ----
train_data, train_label = np.load(train_data_path), np.load(train_label_path)
print(Counter(train_label))
test_data, test_label = np.load(test_data_path), np.load(test_label_path)
print(Counter(test_label))

# ---- Shuffle each split with a single fixed-seed generator ----
# The training permutation is drawn before the test permutation, so the
# random stream is always consumed in the same order.
ssl_data_seed = 1
rng_data = np.random.RandomState(ssl_data_seed)
train_inds = rng_data.permutation(len(train_data))
train_data, train_label = train_data[train_inds], train_label[train_inds]
test_inds = rng_data.permutation(len(test_data))
test_data, test_label = test_data[test_inds], test_label[test_inds]

# ---- Reshape the samples to 4-D arrays of shape (N, 1, 28, 28) ----
train_data = train_data.reshape((len(train_data), 1, 28, 28))
test_data = test_data.reshape((len(test_data), 1, 28, 28))
print(train_data.shape)

# ---- Mini-batch counts; ceil() also counts a partial final batch ----
num_batch_train = math.ceil(len(train_data) / batch_size)
test_num_bathces = math.ceil(len(test_data) / batch_size)
print(test_num_bathces)
# The network is created on the CUDA device and optimised with Adam.
device = torch.device('cuda')
model = Model.Model_Mnist(num_class=num_class).to(device=device)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001, betas=(0.5, 0.999))

# Per-sample group flag aligned with train_label: 0 when the label is listed
# in majority_class, 1 otherwise. Computed in one vectorised membership test
# instead of a Python-level loop over every training sample.
_in_majority = np.isin(train_label, majority_class)
maj_min_express = torch.from_numpy(np.where(_in_majority, 0, 1)).long().cuda()
# Column vector of shape (N, 1), so a batch slice keeps one flag per row.
maj_min_express = maj_min_express.view(-1, 1)
if __name__ == "__main__":
    # Train for a fixed number of epochs; after every epoch the model is
    # scored on the complete evaluation set in one forward pass.
    # NOTE(review): the indentation of this block was lost in the copy under
    # review. The nesting below (evaluation once per epoch, confusion matrix
    # printed every epoch) is a reconstruction -- confirm against the original.
    epochs = 100

    # np.save does not create missing directories, so make sure the result
    # folder exists before any time is spent on training.
    os.makedirs(save_confu_path, exist_ok=True)

    for epoch in range(epochs):
        print('epoch:', epoch)
        t1 = time.time()

        # ---- training pass over consecutive mini-batches ----
        # Switch back to training mode because the evaluation below puts the
        # model in eval mode (only matters for mode-dependent layers such as
        # dropout or batch normalisation).
        model.train()
        for iteration in range(num_batch_train):
            from_l_c = iteration * batch_size
            to_l_c = from_l_c + batch_size
            # Slicing clips at the end of the array, so the final batch may
            # hold fewer than batch_size samples.
            data = torch.from_numpy(train_data[from_l_c:to_l_c]).float().cuda()
            label = torch.from_numpy(train_label[from_l_c:to_l_c]).long().cuda()
            maj_min_express_ = maj_min_express[from_l_c:to_l_c]

            optimizer.zero_grad()
            output_label = model(data)
            if Loss_fun == 'GPPE':
                # GPPE additionally receives the per-sample
                # majority/minority flag.
                loss = Loss(output_label, label, maj_min_express_)
            else:
                loss = Loss(output_label, label)
            loss.backward()
            optimizer.step()
        print('Train_time:', time.time() - t1)

        # ---- evaluation on the whole test set ----
        te_label = test_label
        model.eval()
        # No gradients are needed for scoring; disabling autograd avoids
        # keeping the graph of a full-test-set forward pass in GPU memory.
        with torch.no_grad():
            te_data = torch.from_numpy(test_data).float().cuda()
            output = model(te_data)
            predict_probility = F.softmax(output, dim=1)
            predict_label = torch.max(output, 1)[1]
        pi_1_cpu = predict_probility.cpu().numpy()
        predict_label_cpu = np.int32(predict_label.cpu().numpy())

        Acc = metrics.accuracy_score(te_label, predict_label_cpu)
        # Fix the label order so the matrix always has num_class rows and
        # columns, even if some class is never predicted.
        Confu_matir = confusion_matrix(
            te_label, predict_label_cpu, labels=list(range(num_class))
        )
        print('ACC_all:', Acc)

        # Persist the outputs of the last epochs only (epoch index > 90).
        if epoch > 90:
            np.save(save_confu_path + 'Confu_matir_' + str(epoch) + '.npy', Confu_matir)
            np.save(save_confu_path + 'predicted_probility_' + str(epoch) + '.npy', pi_1_cpu)
            np.save(save_confu_path + 'target_' + str(epoch) + '.npy', te_label)
        print(Confu_matir)