In [1]:
%matplotlib inline
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import sys, os
from pickle import dump, load

from tensorflow.keras.models import load_model, Sequential, Model
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.layers import Dense, Conv2D, Flatten, AveragePooling2D, Input, Reshape
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.applications import *
from tensorflow.keras.optimizers import *

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, MinMaxScaler, StandardScaler
from sklearn.metrics import precision_score, recall_score, f1_score

import time
import matplotlib.pyplot as plt

import warnings
warnings.filterwarnings('ignore')

num_rows, num_columns, num_channels = 40, 1292, 1
CLASS_COUNT = 23
PATIENCE = 100
scaler = MinMaxScaler()

In [None]:
model = VGG16()
model.summary()

In [None]:
li = ['2.5톤', '5톤', '9.5톤', '10톤', '27톤',
      'K-1', 'K1a1', 'k10탄약운반차', 'K56', 'K77',
      'K288a1', 'K200', 'K800', 'km9ace', '교량전차',
      '다목적굴착기', '대형버스', '부식차', '살수차', '승용차',
      '장애물개척자', '통신가설차량', '화생방정찰차'] # 중분류, 소분류

def Class_ins(t):
      if t in (5, 6):
            return f'중분류 : 전차 소분류 : {li[t]}'
      elif t in (7, 8, 9, 10, 11, 12, 13, 14, 20, 22):
            return f'중분류 : 궤도/장갑차 소분류 : {li[t]}'
      elif t in (0, 1, 2, 3, 4, 15, 16, 17, 18, 19, 21):
            return f'중분류 : 차륜 전투차량 소분류 : {li[t]}'

In [None]:
# 로스, 정확도 그래프 출력
def plot_loss_accuracy(history):
        acc = history.history['accuracy']
        val_acc = history.history['val_accuracy']
        loss = history.history['loss']
        val_loss = history.history['val_loss']
        epochs = range(len(acc))

        plt.figure(1)
        plt.plot(epochs, acc, 'b', label='Training accuracy')
        plt.plot(epochs, val_acc, 'r', label='Validation accuracy')
        plt.title('Training and validation accuracy')
        plt.legend()
        plt.figure(2)
        plt.plot(epochs, loss, 'b', label='Training Loss')
        plt.plot(epochs, val_loss, 'r', label='Validation Loss')
        plt.title('Training and validation loss')
        plt.legend()
        plt.show()


# 모델 셋팅
def setting(model, x_train, y_train, x_val, y_val, BATCH_SIZE, EPOCHS):
        checkpoint=ModelCheckpoint(filepath='../Saved_Model/' + sys._getframe(1).f_code.co_name + '_Weight_best.hdf5', monitor="val_loss",
                                verbose=1, save_best_only=True)

        ealrystopping=EarlyStopping(monitor="val_loss", patience=PATIENCE)

        model.summary()

        history = model.fit(x_train, y_train, batch_size=BATCH_SIZE, epochs=EPOCHS, validation_data=(x_val, y_val),
                        callbacks=[checkpoint, ealrystopping], verbose=1)
        
        plot_loss_accuracy(history)
        
        
def result(x_test, y_test, model_name):
    model = load_model('../Saved_Model/' + model_name + '_Weight_best.hdf5')
    
    if model_name in ('SVM', 'Logistic'):
        x_test = x_test.reshape(x_test.shape[0], num_rows * num_columns)
    else:
        x_test = x_test.reshape(x_test.shape[0], num_rows, num_columns, num_channels)
    
    data = {'accuracy':[], 'f1':[], 'test_time':[]}
    
    start = time.time()
    data['accuracy'].append(round(model.evaluate(x_test, y_test, verbose=0)[1] * 100, 2))
    test_time = time.time() - start
    
    result = model.predict(x_test)
    
    y_pred = np.array([np.argmax(result[i]) for i in range(result.shape[0])])
    y_test = np.argmax(y_test, axis=1)
    
    data['f1'].append(round(f1_score(y_test, y_pred, average='weighted') * 100, 2))
    data['test_time'].append(round(test_time, 2))
    
    df = pd.DataFrame(data)
    df.to_csv('../L_Result/' + model_name + '_result.csv', index=False)  # csv 파일 생성

    print(df)


In [None]:
# 모델 구조 정의
def ResNet(x_train, y_train, x_val, y_val, BATCH_SIZE, EPOCHS, lr):
        EPOCHS = EPOCHS
        BATCH_SIZE = BATCH_SIZE
        
        x_train = x_train.reshape(x_train.shape[0], num_rows, num_columns, num_channels)
        x_val = x_val.reshape(x_val.shape[0], num_rows, num_columns, num_channels)
        
        model = ResNet50(weights=None, input_shape=(num_rows, num_columns, num_channels), include_top=False)
        output = model.output
        
        flat = Flatten()(output)
        fc1 = Dense(1024, activation='relu')(flat)
        fc2 = Dense(512, activation='relu')(fc1)
        output = Dense(CLASS_COUNT, activation='softmax')(fc2)
                
        model = Model(inputs=model.input, outputs=output)
        
        model.compile(loss="categorical_crossentropy",
                metrics=["accuracy"], optimizer=Adam(learning_rate=lr))

        return setting(model, x_train, y_train, x_val, y_val, BATCH_SIZE, EPOCHS)


def VGG(x_train, y_train, x_val, y_val, BATCH_SIZE, EPOCHS, lr):
        EPOCHS = EPOCHS
        BATCH_SIZE = BATCH_SIZE
        
        x_train = x_train.reshape(x_train.shape[0], num_rows, num_columns, num_channels)
        x_val = x_val.reshape(x_val.shape[0], num_rows, num_columns, num_channels)
        
        model = VGG16(weights=None, input_shape=(num_rows, num_columns, num_channels), include_top=False)
        output = model.output
        
        flat = Flatten()(output)
        fc1 = Dense(4096, activation='relu')(flat)
        fc2 = Dense(2048, activation='relu')(fc1)
        fc3 = Dense(1024, activation='relu')(fc2)
        output = Dense(CLASS_COUNT, activation='softmax')(fc3)
                
        model = Model(inputs=model.input, outputs=output)
        
        model.compile(loss="categorical_crossentropy",
                metrics=["accuracy"], optimizer=Adam(learning_rate=lr)) # 0.0001

        return setting(model, x_train, y_train, x_val, y_val, BATCH_SIZE, EPOCHS)


def LeNet(x_train, y_train, x_val, y_val, BATCH_SIZE, EPOCHS, lr):
        EPOCHS = EPOCHS
        BATCH_SIZE = BATCH_SIZE
        
        x_train = x_train.reshape(x_train.shape[0], num_rows, num_columns, num_channels)
        x_val = x_val.reshape(x_val.shape[0], num_rows, num_columns, num_channels)
        
        inputs = Input(shape=(num_rows, num_columns, num_channels))
        conv1 = Conv2D(6, kernel_size=(5,5), strides=(1,1), padding="same", activation='relu')(inputs)
        avgpoll1 = AveragePooling2D(pool_size=(2,2), strides=(2,2), padding="valid")(conv1)
        conv2 = Conv2D(16, kernel_size=(5,5), strides=(1,1), padding="same", activation='relu')(avgpoll1)
        avgpoll2 = AveragePooling2D(pool_size=(2,2), strides=(2,2), padding="valid")(conv2)
        flat = Flatten()(avgpoll2)
        fc1 = Dense(120, activation='relu')(flat)
        fc2 = Dense(84, activation='relu')(fc1)
        outputs = Dense(CLASS_COUNT, activation='softmax')(fc2)

        model = Model(inputs, outputs)
        
        model.compile(loss="categorical_crossentropy",
                metrics=["accuracy"], optimizer=Adam(learning_rate=lr))

        return setting(model, x_train, y_train, x_val, y_val, BATCH_SIZE, EPOCHS)


def SVM(x_train, y_train, x_val, y_val, BATCH_SIZE, EPOCHS, lr):
        EPOCHS = EPOCHS
        BATCH_SIZE = BATCH_SIZE
        
        x_train = x_train.reshape(x_train.shape[0], num_rows * num_columns)
        x_val = x_val.reshape(x_val.shape[0], num_rows * num_columns)
        
        inputs = Input(shape=(num_rows * num_columns,))
        outputs = Dense(CLASS_COUNT, activation=None)(inputs)

        model = Model(inputs, outputs)
        
        model.compile(loss="CategoricalHinge",
                metrics=["accuracy"], optimizer=Adam(learning_rate=lr)) # 5e-4

        return setting(model, x_train, y_train, x_val, y_val, BATCH_SIZE, EPOCHS)


def Logistic(x_train, y_train, x_val, y_val, BATCH_SIZE, EPOCHS, lr):
        EPOCHS = EPOCHS
        BATCH_SIZE = BATCH_SIZE
        
        x_train = x_train.reshape(x_train.shape[0], num_rows * num_columns)
        x_val = x_val.reshape(x_val.shape[0], num_rows * num_columns)
        
        inputs = Input(shape=(num_rows * num_columns,))
        outputs = Dense(CLASS_COUNT, activation='softmax')(inputs)

        model = Model(inputs, outputs)
        
        model.compile(loss="categorical_crossentropy",
                metrics=["accuracy"], optimizer=Adam(learning_rate=lr)) # 1e-3

        return setting(model, x_train, y_train, x_val, y_val, BATCH_SIZE, EPOCHS)

In [None]:
tag, scale = 'mfcc', 'test'
features_df = pd.read_json('../Processed_Data/data_' + tag + '_' + scale + '.json')

x = np.array(features_df.feature.tolist())
y = np.array(features_df.class_label.tolist())

le = LabelEncoder()
y = to_categorical(le.fit_transform(y))

x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=1)
x_train, x_val, y_train, y_val = train_test_split(x_train, y_train, test_size=0.25, random_state=1)

scaler.fit(x_train.reshape(x_train.shape[0], -1))
x_train = scaler.transform(x_train.reshape(x_train.shape[0], -1))
dump(scaler, open('../Saved_Scale/minmax_scaler.pkl', 'wb'))
x_val = scaler.transform(x_val.reshape(x_val.shape[0], -1))
x_test = scaler.transform(x_test.reshape(x_test.shape[0], -1))

x_train = x_train.reshape(x_train.shape[0], num_rows, num_columns)
x_val = x_val.reshape(x_val.shape[0], num_rows, num_columns)
x_test = x_test.reshape(x_test.shape[0], num_rows, num_columns)

print(x_train.shape, y_train.shape)
print(x_val.shape, y_val.shape)
print(x_test.shape, y_test.shape)

In [None]:
# scaler = load(open('../Saved_Scale/minmax_scaler.pkl', 'rb'))
# Logistic(x_train, y_train, x_val, y_val, BATCH_SIZE=512, EPOCHS=300, lr=1e-4)
# result(x_test, y_test, 'Logistic')

SVM(x_train, y_train, x_val, y_val, BATCH_SIZE=512, EPOCHS=100, lr=5e-4)
result(x_test, y_test, 'SVM')

# LeNet(x_train, y_train, x_val, y_val, BATCH_SIZE=512, EPOCHS=300, lr=1e-3)
# result(x_test, y_test, 'LeNet')