라이브러리 다운로드

In [None]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

각 비디오 내에서 입력 시퀀스 만들기
- window_size = 몇 프레임을 한 시퀀스로 할 건지 => 정할 것
- threshold : 한 시퀀스에서 낙상인 프레임이 몇 개 이상 있으면 '낙상' 시퀀스 인지 => 정할 것
- feature : 사용할 feature, 우리는 R값, HSSC속도 값
- step_size : 몇 프레임 씩 옮겨가면서 시퀀스 만들건지

In [None]:
def create_sequences_by_video(df, window_size=20 , step_size=2, threshold=10):
    sequences = []
    sequence_labels = []

    for video_id in df['VideoID'].unique():
        video_df = df[df['VideoID'] == video_id]
        features = video_df[['aspect_ratio','hssc']].values
        labels = video_df['Label'].values

        for start in range(0, len(features) - window_size + 1, step_size):
            end = start + window_size
            seq_features = features[start:end]
            seq_labels = labels[start:end]

            # 시퀀스 라벨 설정: 1 라벨이 threshold 이상이면 1, 아니면 0
            seq_label = 1 if np.sum(seq_labels) >= threshold else 0

            sequences.append(seq_features)
            sequence_labels.append(seq_label)

    return np.array(sequences), np.array(sequence_labels)

데이터 가져와서 시퀀스 만들기

In [None]:
# 데이터프레임 로드
csv_path = '/content/drive/MyDrive/final_dataframe.csv'
df = pd.read_csv(csv_path)

# 시퀀스 생성
X, y = create_sequences_by_video(df)

print(f"Created {len(X)} sequences")
print("Sequence shape:", X.shape)
print("Labels shape:", y.shape)


# 모델 입력을 위해 텐서로 변환
X = torch.tensor(X, dtype=torch.float32)
y = torch.tensor(y, dtype=torch.float32).view(-1, 1)

Created 3079 sequences
Sequence shape: (3079, 20, 2)
Labels shape: (3079,)


In [None]:
print(df)

                               VideoID  frame_idx       x       y      w  \
0     FD_In_H11H21H31_0001_20210112_09          0  1409.0  1332.0  617.0   
1     FD_In_H11H21H31_0001_20210112_09          1  1422.0  1326.0  629.0   
2     FD_In_H11H21H31_0001_20210112_09          2  1433.0  1324.0  629.0   
3     FD_In_H11H21H31_0001_20210112_09          3  1437.0  1316.0  639.0   
4     FD_In_H11H21H31_0001_20210112_09          4  1443.0  1313.0  640.0   
...                                ...        ...     ...     ...    ...   
8096  FD_In_H12H22H33_0008_20201016_20         93  1349.0   778.0  723.0   
8097  FD_In_H12H22H33_0008_20201016_20         94  1350.0   775.0  722.0   
8098  FD_In_H12H22H33_0008_20201016_20         95  1351.0   774.0  721.0   
8099  FD_In_H12H22H33_0008_20201016_20         96  1353.0   770.0  719.0   
8100  FD_In_H12H22H33_0008_20201016_20         97  1353.0   768.0  721.0   

          h  aspect_ratio   point1x   point1y  point2x  ...  point14x  \
0     518.0   

tensor([[0.],
        [0.],
        [0.],
        ...,
        [0.],
        [0.],
        [0.]])


모델 구조 정의
- GRU 층 : 일단은 두개?
- 마지막은 FULLY CONNECTED로 출력값은 1개,

In [None]:
# GRU 모델 정의
class GRUModel(nn.Module):
    def __init__(self, input_size, hidden_size1, hidden_size2, output_size):
        super(GRUModel, self).__init__()
        self.hidden_size1 = hidden_size1
        self.hidden_size2 = hidden_size2

        # GRU 레이어
        self.gru1 = nn.GRU(input_size, hidden_size1, batch_first=True)
        self.gru2 = nn.GRU(hidden_size1, hidden_size2, batch_first=True)

        # Dropout 레이어
        self.dropout = nn.Dropout(p=0.3)

        # Fully connected 레이어
        self.fc = nn.Linear(hidden_size2, output_size)

    def forward(self, x):
        h0_1 = torch.zeros(1, x.size(0), self.hidden_size1).to(x.device)
        out1, _ = self.gru1(x, h0_1)

        h0_2 = torch.zeros(1, out1.size(0), self.hidden_size2).to(out1.device)
        out2, _ = self.gru2(out1, h0_2)

        out = self.dropout(out2)
        out = self.fc(out[:, -1, :])
        return out

데이터 준비&모델 파라미터 정의

In [None]:
# 데이터 준비
from sklearn.model_selection import train_test_split

X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

# 모델 정의 및 학습
input_size = X.shape[2]  # 특성 수
hidden_size1 = 32
hidden_size2 = 64
output_size = 1  # 낙상 예측 (0 또는 1)
num_epochs = 100
learning_rate = 0.001

model = GRUModel(input_size, hidden_size1, hidden_size2, output_size)
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)


모델 학습&평가

In [None]:
# 모델 학습 함수
def train_model(X_train, y_train, X_val, y_val, model, criterion, optimizer, num_epochs):
    for epoch in range(num_epochs):
        model.train()
        outputs = model(X_train)
        loss = criterion(outputs, y_train)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (epoch+1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

    # 모델 평가
    model.eval()
    with torch.no_grad():
        val_outputs = model(X_val)
        val_loss = criterion(val_outputs, y_val)
        print(f'Validation Loss: {val_loss.item():.4f}')

    return model

trained_model = train_model(X_train, y_train, X_val, y_val, model, criterion, optimizer, num_epochs)

Epoch [10/100], Loss: 0.5228
Epoch [20/100], Loss: 0.4664
Epoch [30/100], Loss: 0.4571
Epoch [40/100], Loss: 0.4556
Epoch [50/100], Loss: 0.4543
Epoch [60/100], Loss: 0.4495
Epoch [70/100], Loss: 0.4462
Epoch [80/100], Loss: 0.4471
Epoch [90/100], Loss: 0.4432
Epoch [100/100], Loss: 0.4379
Validation Loss: 0.4455


모델 저장

In [None]:
# 모델 저장
model_path = 'gru_fall_detection_model.pth'
torch.save(trained_model.state_dict(), model_path)
print(f'Model saved to {model_path}')