In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix
import torch
import torch.nn as nn
from itertools import combinations

In [None]:
class Classification_Process:
    
    def __init__(self, angle=90, neuro=10, early=True, feat_train=1, epoch=1500, max_zap=20):
        self.angle = angle
        self.per_train = 10 # процент обучающей выборки 
        self.count_image_for_day = 2 # кол-во рассматриваемых изображений для каждого дня (такое же, как в прошлом ноутбуке)
        self.neuro = neuro # кол-во нейронов на скрытом слое
        self.num_img_day = 10 # кол-во частей, разиения изображений в предыдущем ноутбуке (параметр n_split)
        self.early = early # При True рассматриваются дни 1-12, при False рассматриваются 1-25
        self.lim_day = 12 if early else 25 # крайний день эксперимента
        self.days = [1, 6, 8, 12, 19, 25] # номера используемых дней
        self.feat_train = feat_train # кол-во используемых в обучении признаков
        self.name_indeces = ['TIR', 'NDVI', 'GNDVI', 'BLUE', 'GCL', 'SIPI']
        self.name_features = ['Max', 'Min', 'Max - Min', 'Mean', 'Std']
        self.epoch = epoch # кол-во эпох
        self.max_zap = max_zap # кол-во запусков сети
        
        self.lst_feat_name = [indx + '_' + f for indx in self.name_indeces 
                              for f in self.name_features] # Имена вида "индекс_признак"
        
        # Сочетания пар признаков внутри TIR(ВИ)
        self.combination_indeces = [] # индексы столбцов, соотетствующие парам признаков
        self.combination_names = [] # имена пар признаков
        for i in range(len(self.name_indeces)):
            self.combination_indeces += list(combinations(range(len(self.name_indeces) * 
                                                                len(self.name_features))[i * len(self.name_features) : i * 
                                                                                         len(self.name_features) 
                                                                                         + len(self.name_features)], 2)) 
            self.combination_names += list(combinations(self.lst_feat_name[i * len(self.name_features) : i * 
                                                                           len(self.name_features) 
                                                                           + len(self.name_features)], 2))
            
        self.fold_1 = f'Обучение по {self.feat_train} признак{"у" if self.feat_train == 1 else "ам"}'
        
    def Day_to_Classes(self, arr):
        # Перевод дней в номера классов
        
        res = arr.copy()
        for i, day in enumerate(self.days):
            res[res == day] = i
        return res
    
    def MMNorm(self, arr):
        # Нормировка некоторых столбцов двумерного массива 
        
        res = arr.copy()
        for col in range(arr.shape[1]):
            cur = res[:, col]
            if len(cur[cur > 1]) > 0:
                cur -= cur.min()
                cur /= (cur.max() - cur.min())
        
        return res
        
        
        
    def Load(self):
        # Загрузка стат. признаков, разделение на тренироочную и тестовую выборки
        
        table = [pd.read_excel(f'Table/{self.angle}/{indx}_Stat.xlsx')
         .reindex(columns=['Day'] + self.name_features if indx == 'TIR' else self.name_features) for indx in self.name_indeces]
        table = pd.concat(table, axis = 1).to_numpy()
        table[:, 1:] = self.MMNorm(table[:, 1:]) # нормировка
        train = table[range(0, table.shape[0], self.num_img_day), :]
        test = table[list(set(range(table.shape[0])) - set(range(0, table.shape[0], self.num_img_day))), :]
        
        self.x_train, self.x_test = train[:, 1:], test[:, 1:]
        self.y_train, self.y_test = train[:, 0].astype('int32'), test[:, 0].astype('int32')
        
        if self.early:
            # Если рассматриваются только ранние дни, то нужны объекты не позднее 12 дня
            self.x_train = self.x_train[self.y_train <= self.lim_day, :]
            self.x_test = self.x_test[self.y_test <= self.lim_day, :]
            
            self.y_train = self.y_train[self.y_train <= self.lim_day]
            self.y_test = self.y_test[self.y_test <= self.lim_day]
            
            self.days = [d for d in self.days if d <= self.lim_day]
            
        # Перевод дней в номера  классов
        self.y_train, self.y_test = self.Day_to_Classes(self.y_train), self.Day_to_Classes(self.y_test)
        
        # Кол-во классов (4 или 6)
        self.n_classes = len(np.unique(self.y_train))
        
        print(f'x_train: {self.x_train.shape}, y_train: {self.y_train.shape}, x_test: {self.x_test.shape}, y_test: {self.y_test.shape} \n')
    
    def SLP(self, num_in):
        # Создание SLP модели
        # num_in - размер входного слоя
        
        one = torch.nn.Linear(num_in, self.neuro)
        nn.init.kaiming_normal_(one.weight, mode='fan_in', nonlinearity='relu')
        
        two = torch.nn.Linear(self.neuro, self.n_classes)
        nn.init.xavier_normal_(two.weight)
        
        model = torch.nn.Sequential(one, torch.nn.BatchNorm1d(self.neuro), torch.nn.ReLU(), two)
        criterion = nn.CrossEntropyLoss(weight=torch.from_numpy(np.zeros(self.n_classes) + 0.5).float())
        optimizer = torch.optim.Adam(model.parameters(), lr=5e-3)
        
        return model, criterion, optimizer
    
    def CM(self, cm, cm_name, save_cm=False):
        # Визуализация матрицы ошибок
        # cm - матрица ошибок 
        # save_cm - сохранять ли матрицы ошибок
        
        days_str = [f'D-{d if d >= 10 else f"0{d}"}' for d in self.days]
        
        
        fig, ax = plt.subplots(figsize = (4, 3) if self.early else (5, 4))
        cm_plot = sns.heatmap(cm, annot=True, vmin=0, vmax=1, 
                                  xticklabels=days_str, yticklabels=days_str, 
                                  cmap=sns.color_palette("light:gray", as_cmap=True))
        ax.set_xlabel('Predicted label', fontsize = 13)
        ax.set_ylabel('True label', fontsize = 13)
        
        num_name, name = cm_name.split('.')
        ax.set_title(f'{name}', fontsize = 15)
        plt.show()
        
        if save_cm:
            # Сохранение матрицы ошибок

            folders = [self.fold_1,
                       'Матрицы ошибок', f'{self.angle} градусов', f'1-{self.lim_day} дни', ['Визуально', 'Таблицы']]
            path_img = '/'.join(folders[:4] + [folders[4][0]])
            path_table = '/'.join(folders[:4] + [folders[4][1]])
            Create_Folder(folders[0])
            Create_Folder('/'.join(folders[:2]))
            Create_Folder('/'.join(folders[:3]))
            Create_Folder('/'.join(folders[:4]))
            Create_Folder(path_img)
            Create_Folder(path_table)
            
            # Сохранение в виде изображений
            fig = cm_plot.get_figure()
            fig.savefig(f'{path_img}/{cm_name}.png', bbox_inches='tight')
            
            # Сохранение в виде таблиц
            pd.DataFrame(cm, index=self.days, columns=self.days).to_excel(f'{path_table}/{cm_name}.xlsx')
        
        
    
    def Main_Process(self, save_flag=False, save_cm=False):
        # Процесс обучения сети
        # save_flag - сохранять ли таблицу точности
        # save_cm - сохранять ли матрицы ошибок
        
        if self.feat_train == 1:
            table = np.zeros((self.x_train.shape[1], 1)) # таблица точности 30 на 1
            len_i = self.x_train.shape[1]
            
        elif self.feat_train == 2:
            table = np.zeros((self.x_train.shape[1], self.x_train.shape[1])) # таблица точности 30 на 30
            len_i = len(self.combination_indeces)
            
        elif self.feat_train == 5:
            table = np.zeros((len(self.name_indeces), 1)) # таблица точности 6 на 1
            len_i = len(self.name_indeces)
            
        elif self.feat_train in (25, 30):
            len_i = 1

        multiplier = self.count_image_for_day * self.num_img_day 
        obj_n = multiplier - multiplier / self.per_train # 18
            
        # Определение тензоров
        x_train_torch = torch.from_numpy(self.x_train).float() 
        x_test_torch = torch.from_numpy(self.x_test).float()
            
        y_train_torch = torch.from_numpy(self.y_train).long()
        y_test_torch = torch.from_numpy(self.y_test).long()
            
            
        # Цикл по столбцам (признакам)
        for i in range(len_i):
            if self.feat_train == 1:
                x_train_torch_i, x_test_torch_i  = x_train_torch[:, i:i+1], x_test_torch[:, i:i+1] # выбор одного признака
            elif self.feat_train == 2:
                cur_indx = self.combination_indeces[i] # текущая пара индексов
                x_train_torch_i, x_test_torch_i = x_train_torch[:, cur_indx], x_test_torch[:, cur_indx] # выбор пары признаков
            elif self.feat_train == 5:
                x_train_torch_i = x_train_torch[:, len(self.name_features) * i : len(self.name_features) * i + 
                                                len(self.name_features)]
                x_test_torch_i = x_test_torch[:, len(self.name_features) * i : len(self.name_features) * i + 
                                              len(self.name_features)]
            elif self.feat_train == 25:
                x_train_torch_i, x_test_torch_i  = x_train_torch[:, 5:], x_test_torch[:, 5:]
            elif self.feat_train == 30:
                x_train_torch_i, x_test_torch_i  = x_train_torch[:, :], x_test_torch[:, :]
                
            ACC = [] # Тестовая точность для каждого запуска сети
            cm_cur = np.zeros((self.n_classes, self.n_classes)) # матрица ошибок 
            for zap in range(self.max_zap):
                res_arr = {'epoch' : 0, 'acc_train' : 0, 'acc_test' : 0, 'res' : 0}
                model, criterion, optimizer = self.SLP(num_in=x_train_torch_i.shape[1]) # Определение модели
                    
                # Обучение модели
                for epoch in range(self.epoch):
                    output = model(x_train_torch_i)
                    loss = criterion(output, y_train_torch)
                        
                    model.eval()
                    test_prob = model(x_test_torch_i) 
                    cur_pred = np.argmax(test_prob.detach().numpy(), axis=-1) # предсказание тестовых классов
                    acc_test = np.sum(cur_pred == self.y_test) / len(self.y_test) # тестовая точность
            
                    acc_train = np.sum(np.argmax(model(x_train_torch_i)
                                                     .detach().numpy(), axis=-1) == self.y_train) / len(self.y_train)
                
                    # если точность высока, то сохраняем 
                    if res_arr['acc_test'] < acc_test or (res_arr['acc_test'] <= 
                                                            acc_test and res_arr['acc_train'] < acc_train): 
                        res_arr['acc_test'] = acc_test
                        res_arr['acc_train'] = acc_train
                        res_arr['epoch'] = epoch
                        res_arr['res'] = cur_pred 
                            
                    if res_arr['acc_test'] == 1.0:
                        break
                            
                    model.train()
                        
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
                        
                ACC.append(res_arr['acc_test']) # Тестовая точность для каждого запуска сети
                cm_cur += confusion_matrix(self.y_test, res_arr['res']) # Матрица ошибок
                    
                print(f'Запуск {zap + 1} / {self.max_zap}, Лучшая точность {res_arr["acc_test"]} достигнута на эпохе {res_arr["epoch"]}')
                
            if self.feat_train == 1:
                print('-' * 120 + f'\n {self.lst_feat_name[i]}')
                cm_name = f'{i + 1}.' + " ".join(self.lst_feat_name[i].split("_")) # имя для матрицы ошибок
                table[i, 0] = np.around(np.mean(ACC), 4) # Добавление информации в таблицу точности
                
            elif self.feat_train == 2:
                print('-' * 120 + f'\n {self.combination_names[i]}')
                cm_name = f'{i + 1}.' + ', '.join(list(self.combination_names[i])) # имя для матрицы ошибок
                table[cur_indx[0], cur_indx[1]] = np.around(np.mean(ACC), 4) # Добавление информации в таблицу точности
                
            elif self.feat_train == 5:
                print('-' * 120 + f'\n {self.name_indeces[i]}')
                cm_name = f'{i + 1}.{self.name_indeces[i]} по {self.feat_train} признакам' # имя для матрицы ошибок
                table[i, 0] = np.around(np.mean(ACC), 4) # Добавление информации в таблицу точности
            
            elif self.feat_train == 25:
                print('-' * 120 + f'\n Все признаки всех ВИ')
                cm_name = f'{i + 1}.Все признаки всех ВИ' # имя для матрицы ошибок
                
            elif self.feat_train == 30:
                print('-' * 120 + f'\n Все признаки всех ВИ + TIR')
                cm_name = f'{i + 1}.Все признаки всех ВИ + TIR' # имя для матрицы ошибок
                
            print(f'\n 1-{self.lim_day} days, Accuracy: {np.mean(ACC)} \n' + '-' * 120 + '\n')
                
            # Визуализация (и сохранение) матрицы ошибок 
            self.CM(cm=(cm_cur / (self.max_zap * obj_n)), cm_name=cm_name, save_cm=save_cm)
               
        if save_flag:
            # Сохранение таблицы точности

            folders = [self.fold_1, 
                       'Таблицы точности', f'{self.angle} градусов']
            out_path = '/'.join(folders)
            Create_Folder(folders[0])
            Create_Folder('/'.join(folders[:2]))
            Create_Folder(out_path)
            
            
                
            if self.feat_train == 1:
                index_pandas = self.lst_feat_name
                columns_pandas = [f'Accuracy 1-{self.lim_day}']
                
            elif self.feat_train == 2: 
                table[table == 0] = None
                index_pandas = self.lst_feat_name
                columns_pandas = self.lst_feat_name
                
            elif self.feat_train == 5: 
                index_pandas = self.name_indeces
                columns_pandas = [f'Accuracy 1-{self.lim_day} для 5 признаков']
                
            if self.feat_train in (1, 2, 5):
                pd.DataFrame(table, index=index_pandas, columns=columns_pandas)\
                .to_excel(f'{out_path}/Таблица точности для 1-{self.lim_day} дней.xlsx')