In [None]:
# Before running: check the model weights path (`net_name`)
# Before running: check the test example path (`json_file`)

## AI Hub에서 제안된 모델 사용 (EfficientNet)

In [None]:
import tensorflow as tf
from keras.utils import Sequence
import math
import numpy as np
import json
import cv2 as cv
import random
from datetime import datetime
import keras.applications as ka

In [None]:
# Target size, in pixels, that normalize_resize_concat resizes each channel to.
IM_HEIGHT = 224
IM_WIDTH = 224


def preprocess_image(np_image, net_type='efficientnet'):
    """Convert an image array to a float32 tensor prepared for the chosen backbone.

    Args:
        np_image: Image data accepted by ``tf.convert_to_tensor``.
        net_type: One of 'mobilenet', 'resnet50' or 'efficientnet'.

    Returns:
        A ``tf.float32`` tensor. For 'efficientnet' the values are returned
        unchanged; for the other two backbones the matching Keras
        ``preprocess_input`` is applied.

    Raises:
        AssertionError: If ``net_type`` is not one of the supported names.
    """
    assert net_type in ['mobilenet', 'resnet50', 'efficientnet'], 'network not in the list'

    m_image = tf.convert_to_tensor(np_image, dtype=tf.float32)

    if net_type == 'efficientnet':
        # Keras EfficientNet models rescale their input internally, so the raw
        # 0-255 values are passed through untouched.
        return m_image

    # The remaining accepted backbones previously fell off the end of the
    # function and silently returned None; give them their own preprocessing.
    if net_type == 'mobilenet':
        return tf.keras.applications.mobilenet.preprocess_input(m_image)
    return tf.keras.applications.resnet50.preprocess_input(m_image)

def read_data_1_input(json_file):
    """Turn one annotation file into a single 3-channel image.

    Each of the three series returned by ``get_data`` is reshaped by
    ``div_func``, passed through ``normalize_resize_concat`` with resizing
    enabled, and the three results are merged, in the order ``get_data``
    returns them, with ``cv.merge``.

    Args:
        json_file: Path of the annotation JSON file.

    Returns:
        The merged 3-channel array.
    """
    channels = tuple(
        normalize_resize_concat(div_func(series), resize=True)
        for series in get_data(json_file)
    )
    return cv.merge(channels)


def get_data(annot_file):
    """Read the three environment series from an annotation JSON file.

    The file is decoded as UTF-8 with an optional byte-order mark.

    Args:
        annot_file: Path of the JSON file; it must contain an "environment"
            object with the keys 'in_temperature', 'in_humidity' and
            'in_carbon_monoxide'.

    Returns:
        A tuple of three float NumPy arrays, in the key order listed above.
    """
    with open(annot_file, "r", encoding='UTF-8-SIG') as handle:
        environment = json.load(handle)["environment"]

    def as_float_array(key):
        return np.array(environment[key], dtype='float')

    return (
        as_float_array('in_temperature'),
        as_float_array('in_humidity'),
        as_float_array('in_carbon_monoxide'),
    )


def div_func(x, side=189):
    """Fold a 1-D series into a square ``side`` x ``side`` grid, row by row.

    Size-1 axes are squeezed away first; the first ``side * side`` samples are
    then laid out so that row ``n`` holds samples ``n*side .. (n+1)*side - 1``.
    Any samples beyond ``side * side`` are ignored.

    Args:
        x: Array-like series that is 1-D after squeezing.
        side: Edge length of the square grid (defaults to 189).

    Returns:
        A new ``(side, side)`` array with the dtype of ``x``.

    Raises:
        ValueError: If the squeezed input is not 1-D or holds fewer than
            ``side * side`` samples.
    """
    flat = np.asarray(x).squeeze()
    needed = side * side
    if flat.ndim != 1 or flat.shape[0] < needed:
        raise ValueError(
            f'expected a 1-D series with at least {needed} samples, got shape {flat.shape}'
        )
    # One reshape replaces growing the grid with a np.vstack call per row,
    # which re-copied the accumulated rows on every iteration. The copy keeps
    # the result independent of the caller's array, as np.vstack did.
    return flat[:needed].reshape(side, side).copy()

def normalize_resize_concat(x, hist=False, resize=False, merge=False):
    """Min-max scale an array to 0-255 uint8, with optional post-processing.

    Args:
        x: Input array passed to ``cv.normalize``.
        hist: When true, apply ``cv.equalizeHist`` to the result.
        resize: When true, resize to ``IM_WIDTH`` x ``IM_HEIGHT`` pixels.
        merge: When true, replicate the single channel into three channels.

    Returns:
        The processed ``uint8`` array. The steps run in the order
        normalize -> resize -> equalize -> merge.
    """
    x = cv.normalize(x, None, alpha=0, beta=255, norm_type=cv.NORM_MINMAX, dtype=cv.CV_32F)
    # Casting truncates the fractional part of the normalized floats.
    x = np.array(x, np.uint8)

    if resize:
        # cv.resize expects dsize as (width, height), and its third positional
        # parameter is `dst`, not the interpolation flag. The former positional
        # cv.INTER_AREA therefore never took effect and the default bilinear
        # interpolation ran; it is spelled out here so the output stays the
        # same as what the pretrained weights were used with.
        x = cv.resize(x, (IM_WIDTH, IM_HEIGHT), interpolation=cv.INTER_LINEAR)

    if hist:
        x = cv.equalizeHist(x)

    if merge:
        x = cv.merge((x, x, x))

    return x

In [None]:
def make_model_efficientnet_1input():
    """Build the single-input EfficientNetB0 classifier.

    The network is an ImageNet-initialised EfficientNetB0 backbone without its
    top, taking 224x224x3 input, followed by global average pooling and a
    7-way softmax layer.

    Returns:
        The assembled, uncompiled Keras model.
    """
    layers = tf.keras.layers
    backbone = tf.keras.applications.EfficientNetB0(
        include_top=False,
        weights='imagenet',
        input_shape=(224, 224, 3),
    )
    pooled = layers.GlobalAveragePooling2D()(backbone.output)
    class_probs = layers.Dense(7, activation='softmax')(pooled)
    return tf.keras.models.Model(inputs=backbone.inputs, outputs=class_probs)

In [None]:
def get_prediction(model_file, raw_data):
    """Predict the class index for one annotation file.

    Args:
        model_file: Object exposing ``predict``; despite the name it is used
            as a loaded model, not as a path.
        raw_data: Path of the annotation JSON file to classify.

    Returns:
        The ``np.argmax`` of the model output for the single-item batch.
    """
    image = preprocess_image(
        np_image=read_data_1_input(raw_data),
        net_type='efficientnet',
    )
    # The model is fed a batch, so add a leading batch axis of size one.
    batch = np.expand_dims(image, axis=0)
    scores = model_file.predict(batch, batch_size=1)
    return np.argmax(scores)


# Use the pretrained model: build the architecture, then load trained weights into it.
model = make_model_efficientnet_1input()
# net_name = '/content/network.hdf5'
# Placeholder: replace with the path of the weights file before running this cell.
net_name = 'input model path'
model.load_weights(net_name)

In [None]:
# Example of data
# json_file = '/content/drive/MyDrive/제4회 AI교육 해커톤/Sample/time_series_data/성충/성충_응애/007/B_001_001_20230822083841_001_002_001_001.json'
# Placeholder: replace with the path of the annotation JSON file to classify.
json_file = 'input json file path'

m_prediction = get_prediction(model, json_file)

# Human-readable labels; the predicted index is used to look up its name here.
class_names = ["유충_정상", "유충_응애", "유충_석고병", "유충_부저병", "성충_정상", "성충_응애", "성충_날개불구바이러스감염증"]
print(f'Prediction is class {m_prediction}, => {class_names[m_prediction]}')

Prediction is class 5, => 성충_응애


In [None]:
# NOTE(review): this upgrades torch to whatever release is newest, with no
# version pin, so the environment may differ from one run to the next.
!pip install torch --upgrade



## 시계열 특성 이용하여 LSTM training (from scratch)

In [None]:
import os
import json
import numpy as np
import torch

import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import MinMaxScaler

# LSTM model definition
class LSTMModel(nn.Module):
    """LSTM sequence classifier: stacked LSTM followed by a linear layer.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        output_size: Number of output logits.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super().__init__()
        # Kept on the instance so forward() never depends on notebook globals.
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return logits of shape (batch, output_size) for a batch-first input.

        ``x`` is indexed as (batch, time, feature): the LSTM is built with
        ``batch_first=True`` and only the last time step feeds the classifier.
        """
        # Zero initial states, created on the same device and dtype as the
        # input rather than on a module-level `device` variable.
        h0 = torch.zeros(
            self.num_layers, x.size(0), self.hidden_size,
            device=x.device, dtype=x.dtype,
        )
        c0 = torch.zeros_like(h0)
        out, _ = self.lstm(x, (h0, c0))
        # Classify from the hidden output of the final time step only.
        return self.fc(out[:, -1, :])

# Dataset class definition
class BeeDataset(Dataset):
    """Sliding-window dataset built from per-class folders of JSON files.

    The expected layout is ``data_dir/<class name>/<folder>/*.json``. From each
    file the most recent 100 samples of 'in_temperature', 'in_humidity' and
    'in_carbon_monoxide' are stacked into a (time, 3) array, min-max scaled,
    and cut into overlapping windows of ``seq_length`` steps. Every window is
    labelled with the position of its class in ``target_classes``.

    Args:
        data_dir: Root folder containing one sub-folder per class.
        target_classes: Class folder names; the list order defines the labels.
        seq_length: Number of time steps per window.
    """

    # Number of most recent samples kept from every series.
    _RECENT_SAMPLES = 100
    _FEATURE_KEYS = ('in_temperature', 'in_humidity', 'in_carbon_monoxide')

    def __init__(self, data_dir, target_classes, seq_length):
        self.data = []
        self.labels = []
        self.seq_length = seq_length
        # NOTE: the scaler is refit on every file, so each file is normalized
        # on its own and this attribute ends up holding the last file's fit.
        self.scaler = MinMaxScaler()

        for label, target_class in enumerate(target_classes):
            class_dir = os.path.join(data_dir, target_class)

            # Sorted listings make the sample order reproducible across runs.
            for folder in sorted(os.listdir(class_dir)):
                folder_path = os.path.join(class_dir, folder)
                if not os.path.isdir(folder_path):
                    continue
                # Look for .json files inside the folder.
                for file in sorted(os.listdir(folder_path)):
                    if not file.endswith(".json"):
                        continue
                    features = self._load_features(os.path.join(folder_path, file))
                    features = self.scaler.fit_transform(features)

                    windows = self._make_windows(features, seq_length)
                    self.data.extend(windows)
                    self.labels.extend([label] * len(windows))

    @staticmethod
    def _load_features(json_file):
        """Read one file and return its recent samples as a (time, 3) float array."""
        # 'utf-8-sig' accepts files with or without a byte-order mark, matching
        # how get_data opens the same kind of file.
        with open(json_file, 'r', encoding='utf-8-sig') as f:
            environment = json.load(f)['environment']
        series = [
            environment[key][-BeeDataset._RECENT_SAMPLES:]
            for key in BeeDataset._FEATURE_KEYS
        ]
        return np.array(series, dtype=float).T

    @staticmethod
    def _make_windows(features, seq_length):
        """Return every full-length window of ``seq_length`` consecutive rows."""
        # +1 so that the final window, ending on the last row, is included.
        n_windows = len(features) - seq_length + 1
        return [features[i:i + seq_length] for i in range(max(n_windows, 0))]

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return torch.tensor(self.data[idx], dtype=torch.float32), torch.tensor(self.labels[idx], dtype=torch.long)

# Training hyperparameters
input_size = 3  # in_temperature, in_humidity, in_carbon_monoxide
hidden_size = 64
num_layers = 2
output_size = 7  # number of classes
seq_length = 10
batch_size = 32
learning_rate = 0.001
num_epochs = 100

# Load the data
data_dir = '/content/drive/MyDrive/제4회 AI교육 해커톤/Sample/time_series_data/data'  # dataset root folder
target_classes = ['유충_정상', '유충_응애', '유충_석고병', '유충_부저병', '성충_정상', '성충_응애', '성충_날개불구바이러스감염증']

dataset = BeeDataset(data_dir, target_classes, seq_length)
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)


In [None]:
from torch.utils.data import DataLoader, random_split

def train_val_split(dataset, train_ratio=0.5, seed=None):
    """Randomly split a dataset into training and validation subsets.

    Args:
        dataset: Any object with ``__len__`` that ``random_split`` accepts.
        train_ratio: Fraction of samples placed in the training subset; the
            training size is ``int(train_ratio * len(dataset))`` and the
            validation subset receives the remainder.
        seed: Optional integer. When given, the split is drawn from a
            dedicated generator seeded with it, so it is reproducible; when
            ``None`` the global torch RNG is used.

    Returns:
        A list ``[train_subset, val_subset]`` as produced by ``random_split``.

    Raises:
        ValueError: If ``train_ratio`` is outside ``[0, 1]``.
    """
    # Without this check a ratio above 1 yields a negative validation size
    # whose sum with the training size still equals len(dataset).
    if not 0.0 <= train_ratio <= 1.0:
        raise ValueError(f'train_ratio must be within [0, 1], got {train_ratio}')

    train_size = int(train_ratio * len(dataset))
    val_size = len(dataset) - train_size
    if seed is None:
        return random_split(dataset, [train_size, val_size])
    generator = torch.Generator().manual_seed(seed)
    return random_split(dataset, [train_size, val_size], generator=generator)

# Train/Validation Split
# NOTE(review): the dataset holds overlapping sliding windows, and the split is
# drawn per window, so windows cut from the same file can land in both subsets.
train_dataset, val_dataset = train_val_split(dataset, train_ratio=0.5)

# DataLoader setup: shuffle only the training batches.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

In [None]:
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one pass over ``loader``; return (mean batch loss, accuracy in percent).

    With an optimizer the model is put in training mode and updated after
    every batch; without one it is put in eval mode and gradients are off.
    """
    training = optimizer is not None
    model.train(training)

    loss_sum = 0.0
    correct = 0
    seen = 0
    batches = 0
    with torch.set_grad_enabled(training):
        for features, labels in loader:
            features = features.to(device)  # move the inputs to the target device
            labels = labels.to(device)      # move the labels to the target device

            if training:
                optimizer.zero_grad()
            outputs = model(features)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()

            loss_sum += loss.item()
            _, predicted = torch.max(outputs, 1)
            seen += labels.size(0)
            correct += (predicted == labels).sum().item()
            batches += 1

    # Guard the averages so an empty loader reports zeros instead of raising.
    mean_loss = loss_sum / batches if batches else 0.0
    accuracy = 100 * correct / seen if seen else 0.0
    return mean_loss, accuracy


def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=20, device='cuda'):
    """Train ``model`` and print loss/accuracy for both loaders after every epoch.

    Args:
        model: The ``nn.Module`` to train; it is moved to ``device``.
        train_loader: Iterable of ``(features, labels)`` training batches.
        val_loader: Iterable of ``(features, labels)`` validation batches.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per training batch.
        num_epochs: Number of passes over ``train_loader``.
        device: Target device. A CUDA device is replaced by the CPU when CUDA
            is not available, so the default also works on CPU-only hosts.

    Returns:
        None. Progress is reported through ``print`` only.
    """
    device = torch.device(device)
    if device.type == 'cuda' and not torch.cuda.is_available():
        device = torch.device('cpu')

    model = model.to(device)  # move the model to the target device
    for epoch in range(num_epochs):
        train_loss, train_accuracy = _run_epoch(model, train_loader, criterion, device, optimizer)
        # Validation
        val_loss, val_accuracy = _run_epoch(model, val_loader, criterion, device)

        print(f'Epoch [{epoch+1}/{num_epochs}], '
              f'Train Loss: {train_loss:.4f}, '
              f'Train Accuracy: {train_accuracy:.2f}%, '
              f'Val Loss: {val_loss:.4f}, '
              f'Val Accuracy: {val_accuracy:.2f}%')


In [None]:
# Training hyperparameters. These repeat the values assigned in the cell that
# builds the dataset; keep the two in sync when changing either.
input_size = 3  # in_temperature, in_humidity, in_carbon_monoxide
hidden_size = 64
num_layers = 2
output_size = 7  # number of classes
seq_length = 10
batch_size = 32
learning_rate = 0.001
num_epochs = 100

In [None]:
import torch.optim as optim
# Prefer the GPU when one is visible, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = LSTMModel(input_size, hidden_size, num_layers, output_size).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# Hand the selected device to the training loop explicitly so the model and
# the batches end up on the device chosen above.
train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=num_epochs, device=device)

1
Epoch [1/100], Train Loss: 1.6712, Train Accuracy: 31.02%, Val Loss: 1.5438, Val Accuracy: 37.35%
1
Epoch [2/100], Train Loss: 1.4503, Train Accuracy: 40.46%, Val Loss: 1.4146, Val Accuracy: 43.29%
1
Epoch [3/100], Train Loss: 1.3779, Train Accuracy: 43.65%, Val Loss: 1.3664, Val Accuracy: 43.84%
1
Epoch [4/100], Train Loss: 1.3346, Train Accuracy: 45.47%, Val Loss: 1.3119, Val Accuracy: 47.39%
1
Epoch [5/100], Train Loss: 1.2999, Train Accuracy: 47.50%, Val Loss: 1.3276, Val Accuracy: 46.71%
1
Epoch [6/100], Train Loss: 1.2738, Train Accuracy: 48.65%, Val Loss: 1.2456, Val Accuracy: 50.03%
1
Epoch [7/100], Train Loss: 1.2406, Train Accuracy: 50.09%, Val Loss: 1.2844, Val Accuracy: 47.67%
1
Epoch [8/100], Train Loss: 1.2179, Train Accuracy: 50.76%, Val Loss: 1.2809, Val Accuracy: 46.17%
1
Epoch [9/100], Train Loss: 1.1850, Train Accuracy: 52.24%, Val Loss: 1.1948, Val Accuracy: 51.39%
1
Epoch [10/100], Train Loss: 1.1587, Train Accuracy: 53.33%, Val Loss: 1.1609, Val Accuracy: 53.18%

KeyboardInterrupt: 

In [None]:
# Save the model: only the state_dict (parameters) is written, not the model object.
torch.save(model.state_dict(), 'bee_lstm_model.pth')