전체 코드 흐름:
1. 라이브러리 및 데이터셋 로드
2. 데이터 전처리: 저역통과 필터, 슬라이딩 윈도우 생성, 정규화
3. 데이터 증강 및 학습/검증 데이터 분리
4. 데이터셋 정의 및 DataLoader 설정
5. 모델 정의 (LSTM, GRU, CNN-LSTM, Transformer)
6. 모델 학습 및 검증
7. 라벨 예측 및 자이로 데이터에 라벨 추가
8. 결과 시각화

# 1. 라이브러리 및 데이터셋 로드
***
> 필요한 라이브러리를 로드하고 GPU 사용 여부를 확인

In [1]:
# 1. 라이브러리 및 데이터셋 로드
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, f1_score
from sklearn.cluster import KMeans
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from imblearn.over_sampling import RandomOverSampler
from scipy.signal import butter, filtfilt, resample

# GPU 사용 여부 설정
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# GPU 사용 여부 확인
if torch.cuda.is_available():
    print(f"GPU is available: {torch.cuda.get_device_name(0)}")
else:
    print("GPU is not available.")

GPU is available: NVIDIA GeForce GTX 1650


# 2. 데이터 전처리: 저역통과 필터, 다운샘플링, 슬라이딩 윈도우 적용
> 자이로 데이터를 다운샘플링하고 노이즈 제거를 위한 저역통과 필터를 적용
> 슬라이딩 윈도우를 통해 데이터를 모델이 학습하기 적합한 형태로 변환

In [2]:
# 저역통과 필터 적용
def butter_lowpass(cutoff, fs, order=4):
    nyq = 0.5 * fs
    normal_cutoff = cutoff / nyq
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    return b, a

# 필터 적용 함수
def apply_lowpass_filter(data, cutoff, fs):
    b, a = butter_lowpass(cutoff, fs)
    return filtfilt(b, a, data,axis=0)

# 데이터 정규화 및 차원 맞춤 함수
def normalize_and_reshape_segments(segments):
    scaler = MinMaxScaler()
    normalized_segments = np.zeros_like(segments)
    for axis in range(segments.shape[2]):
        normalized_segments[:, :, axis] = scaler.fit_transform(segments[:, :, axis])
    return normalized_segments

# 슬라이딩 윈도우 생성 함수
def create_sliding_windows(data, window_size, stride):  # 2초 윈도우, 0.5초 stride 
    num_windows = (len(data) - window_size) // stride + 1
    windows = np.array([data[i : i + window_size] for i in range(0, len(data) - window_size + 1, stride)])
    return windows  # (num_windows, window_size, 3) 형태로 반환

# 3. UCI-HAR 데이터 및 자이로 데이터 전처리
***
> UCI-HAR 데이터를 로드하고 자이로 데이터를 동일하게 전처리
> 샘플링 주파수를 맞추고 저역통과 필터를 적용하여 노이즈를 제거

In [3]:
# UCI-HAR 데이터 로드
def load_ucihar_data():
    ucihar_path = './원본 데이터/UCI HAR Dataset/'
    gyro_x_train = pd.read_csv(ucihar_path + 'train/Inertial Signals/body_gyro_x_train.txt', sep='\s+', header=None).values
    gyro_y_train = pd.read_csv(ucihar_path + 'train/Inertial Signals/body_gyro_y_train.txt', sep='\s+', header=None).values
    gyro_z_train = pd.read_csv(ucihar_path + 'train/Inertial Signals/body_gyro_z_train.txt', sep='\s+', header=None).values
    labels_train = pd.read_csv(ucihar_path + 'train/y_train.txt', sep='\s+', header=None).values - 1

    X_train = np.stack([gyro_x_train, gyro_y_train, gyro_z_train], axis=-1)
    X_train_normalized = normalize_and_reshape_segments(X_train)

    return X_train_normalized, labels_train

X_train, y_train = load_ucihar_data()

# 자이로 데이터 전처리 함수
def preprocess_gyro_data(gyro_path, current_freq=100, desired_freq=50, window_size=128, stride=64):
    gyro_data = pd.read_csv(gyro_path)

    n_samples = int(len(gyro_data) * desired_freq / current_freq)

    gyro_data_resampled = resample(gyro_data[['X', 'Y', 'Z']].values, n_samples)
    gyro_data_filtered = apply_lowpass_filter(gyro_data_resampled, cutoff=20, fs=desired_freq)
    gyro_data_windows = create_sliding_windows(gyro_data_filtered, window_size, stride)
    gyro_data_normalized = normalize_and_reshape_segments(gyro_data_windows)
    
    return torch.tensor(gyro_data_normalized, dtype=torch.float32)

gyro_data_tensor = preprocess_gyro_data('./원본 데이터/자이로 데이터.csv')

# 데이터를 Tensor로 변환
X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
gyro_data_tensor = gyro_data_tensor.to(device)

# 4. 데이터 증강
***
> 학습 데이터의 양을 늘리기 위해 증강 기법으로 가우시안 노이즈, Time Warping, Scaling을 사용

In [4]:
# 데이터 증강 함수: 가우시안 노이즈 추가
def add_gaussian_noise(data, noise_factor=0.05):
    noise = np.random.randn(*data.shape)
    augmented_data = data + noise_factor * noise
    augmented_data = np.clip(augmented_data, -1.0, 1.0)
    return augmented_data

# Time Warping 함수
def time_warp(data, sigma=0.2, knot=4):
    orig_steps = np.arange(data.shape[1])
    random_warps = np.random.normal(loc=1.0, scale=sigma, size=(data.shape[0], knot + 2, data.shape[2]))
    warp_steps = ( np.ones((data.shape[2], 1)) * (np.linspace(0, data.shape[1] - 1., num=knot + 2)) ).T
    ret = np.zeros_like(data)

    for i, pat in enumerate(data):
        warper = np.interp(orig_steps, warp_steps[:, 0], random_warps[i, :, 0])
        for dim in range(data.shape[2]):
            ret[i, :, dim] = np.interp(orig_steps, np.cumsum(warper), pat[:, dim])
            
    return ret

# Scaling 함수
def scale(data, sigma=0.1):
    scaling_factor = np.random.normal(loc=1.0, scale=sigma, size=(data.shape[0], 1, data.shape[2]))
    return data * scaling_factor

# 5. 증강된 데이터 생성 및 학습/검증 데이터 분리

In [5]:
# UCI-HAR 데이터셋 및 증강 데이터 생성
def create_augmented_data(X_train, y_train):
    # 데이터 증강
    X_train_augmented = np.concatenate([
        X_train,
        add_gaussian_noise(X_train),
        time_warp(X_train),
        scale(X_train)
    ], axis=0)
    
    y_train_augmented = np.concatenate([y_train] * 4, axis=0)
    
    return X_train_augmented, y_train_augmented

# 학습/검증 데이터 분리
def split_data(X_augmented, y_augmented):
    X_train_tensor, X_val_tensor, y_train_tensor, y_val_tensor = train_test_split(
        torch.tensor(X_augmented, dtype=torch.float32),
        torch.tensor(y_augmented, dtype=torch.long),
        test_size = 0.2, 
        random_state = 42
    )

    return X_train_tensor, X_val_tensor, y_train_tensor, y_val_tensor

# 증강된 데이터를 학습 및 검증 데이터로 분리
X_train_augmented, y_train_augmented = create_augmented_data(X_train, y_train)
X_train_tensor, X_val_tensor, y_train_tensor, y_val_tensor = split_data(X_train_augmented, y_train_augmented)

# 6. 데이터셋 정의
***
> 증강된 데이터를 학습에 사용할 수 있도록 `Dataset`으로 정의하고
> 모델을 학습시키기 위한 다양한 시계열 모델(LSTM, GRU, CNN-LSTM, Transformer 등)을 구성

In [6]:
# 데이터셋 클래스
class UCIHARData(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx].view(-1)

# 학습 및 검증 데이터 로더 정의
train_dataset = UCIHARData(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

val_dataset = UCIHARData(X_val_tensor, y_val_tensor)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# 확인 출력
print(f'Training data shape: {X_train_tensor.shape}, Labels shape: {y_train_tensor.shape}')
print(f'Validation data shape: {X_val_tensor.shape}, Labels shape: {y_val_tensor.shape}')

Training data shape: torch.Size([23526, 128, 3]), Labels shape: torch.Size([23526, 1])
Validation data shape: torch.Size([5882, 128, 3]), Labels shape: torch.Size([5882, 1])


# 7.모델 정의 (LSTM, GRU, CNN-LSTM, Transformer)
***
> 시계열 데이터를 처리할 수 있는 다양한 모델을 정의
> 각 모델은 LSTM, GRU, CNN-LSTM, Transformer의 구조를 가지고 있으며, 동일한 데이터셋에서 성능을 비교 가능

In [7]:
# LSTM 모델 정의
class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout_prob):
        super(LSTMModel, self).__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, dropout=dropout_prob)
        self.batch_norm = nn.BatchNorm1d(hidden_size)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        out, _ = self.lstm(x)
        out = self.batch_norm(out[:, -1, :])
        out = self.fc(out)
        return out


# GRU 모델 정의
class GRUModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout_prob):
        super(GRUModel, self).__init__()
        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True, dropout=dropout_prob)
        self.batch_norm = nn.BatchNorm1d(hidden_size)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        out, _ = self.gru(x)
        out = self.batch_norm(out[:, -1, :])
        out = self.fc(out)
        return out

# CNN-LSTM 모델 정의
class CNNLSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes, dropout_prob):
        super(CNNLSTMModel, self).__init__()
        self.conv1 = nn.Conv1d(input_size, 64, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool1d(kernel_size=2, stride=2)
        self.lstm = nn.LSTM(64, hidden_size, num_layers, batch_first=True, dropout=dropout_prob)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        x = x.permute(0, 2, 1)
        x = self.pool(F.relu(self.conv1(x)))
        x = x.permute(0, 2, 1)
        out, _ = self.lstm(x)
        out = self.fc(out[:, -1, :])
        return out

# Transformer 모델 정의
class TransformerModel(nn.Module):
    def __init__(self, input_size, num_heads, num_layers, num_classes, dropout_prob):
        super(TransformerModel, self).__init__()
        self.input_embedding = nn.Linear(input_size, 128)
        self.positional_encoding = nn.Parameter(torch.randn(1, 200, 128))
        encoder_layer = nn.TransformerEncoderLayer(d_model=128, nhead=num_heads, dropout=dropout_prob)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers)
        self.fc = nn.Linear(128, num_classes)

    def forward(self, x):
        x = self.input_embedding(x)
        x = x + self.positional_encoding[:, :x.size(1), :]
        x = self.transformer(x.permute(1, 0, 2))
        out = self.fc(x[-1, :, :])
        return out

# 8. 모델 학습 및 검증
***
> 각 모델을 학습하고 검증 데이터로 평가한 후, 최적의 모델을 선택

In [8]:
# 손실 함수 및 옵티마이저 설정
criterion = nn.CrossEntropyLoss()

def get_optimizer(model, learning_rate = 0.001):
    return torch.optim.Adam(model.parameters(), lr=learning_rate)

def get_scheduler(optimizer, step_size=10, gamma=0.1):
    return torch.optim.lr_scheduler.StepLR(optimizer, step_size=step_size, gamma=gamma)

# 모델 학습 함수
def train_model(model, train_loader, optimizer, criterion, scheduler, num_epochs=20):
    model.train()

    for epoch in range(num_epochs):
        running_loss = 0.0
        correct = 0
        total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device).squeeze()
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

        scheduler.step()
        epoch_loss = running_loss / len(train_loader)
        epoch_accuracy = 100 * correct / total
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.2f}%')

    return epoch_accuracy

# 모델 평가 함수
def evaluate_model(model, val_loader, criterion):
    model.eval()

    correct = 0
    total = 0
    running_loss = 0.0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device).squeeze()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    val_loss = running_loss / len(val_loader)
    val_accuracy = 100 * correct / total
    print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%')

    return val_accuracy

# 9. 최적 모델 선택

In [9]:
# 여러 모델 학습 및 평가 후 가장 좋은 모델을 선택
def train_and_select_best_model(models, train_loader, val_loader, num_epochs=20):
    best_accuracy = 0.0
    best_model = None

    for model in models:
        optimizer = get_optimizer(model)
        scheduler = get_scheduler(optimizer)
        print(f"Training {model.__class__.__name__}...")

        # 모델 학습
        train_model(model, train_loader, optimizer, criterion, scheduler, num_epochs)

        # 검증 데이터로 평가
        val_accuracy = evaluate_model(model, val_loader, criterion)

        # 가장 좋은 성능을 보인 모델을 선택
        if val_accuracy > best_accuracy:
            best_accuracy = val_accuracy
            best_model = model

    print(f"Best model: {best_model.__class__.__name__}, Accuracy: {best_accuracy:.2f}%")
    return best_model

# 모델 리스트
models = [
    LSTMModel(input_size=3, hidden_size=128, num_layers=2, num_classes=6, dropout_prob=0.5).to(device),
    GRUModel(input_size=3, hidden_size=128, num_layers=2, num_classes=6, dropout_prob=0.5).to(device),
    CNNLSTMModel(input_size=3, hidden_size=128, num_layers=2, num_classes=6, dropout_prob=0.5).to(device),
    TransformerModel(input_size=3, num_heads=4, num_layers=2, num_classes=6, dropout_prob=0.5).to(device)
]

# 최적의 모델 선택
best_model = train_and_select_best_model(models, train_loader, val_loader)


NameError: name 'hiddensize' is not defined

# 10. 라벨 예측 및 자이로 데이터로 라벨 추가
***
> 최적의 모델을 선택하여 자이로 데이터에 라벨을 예측하고, 원본 데이터에 추가

In [None]:
# 예측된 라벨을 자이로 데이터에 매핑하는 함수
def map_predictions_to_original(original_data_length, predictions, window_size=128, stride=64):
    mapped_predictions = np.zeros((original_data_length, 6))  # 6은 활동 클래스의 수
    counts = np.zeros(original_data_length)

    for i, pred in enumerate(predictions):
        start = i * stride
        end = min(start + window_size, original_data_length)
        mapped_predictions[start:end, pred] += 1
        counts[start:end] += 1

    final_predictions = np.argmax(mapped_predictions, axis=1)

    return final_predictions

# 최종 예측 수행
def predict_labels(model, gyro_data_tensor):
    model.eval()

    all_predictions = []
    batch_size = 100
    with torch.no_grad():
        for i in range(0, len(gyro_data_tensor), batch_size):
            batch = gyro_data_tensor[i:i + batch_size].to(device)
            outputs = model(batch)
            _, predicted = torch.max(outputs, 1)
            all_predictions.append(predicted.cpu().numpy())

    return np.concatenate(all_predictions)

# 자이로 데이터에 예측된 라벨 추가
original_gyro_data = pd.read_csv('./원본 데이터/자이로 데이터.csv')
predictions = predict_labels(best_model, gyro_data_tensor)
original_predictions = map_predictions_to_original(len(original_gyro_data), predictions)

# 활동 라벨 매핑
activity_labels = {0: "walking", 1: "walking_upstairs", 2: "walking_downstairs", 3: "sitting", 4: "standing", 5: "laying"}
predicted_activities = [activity_labels[label] for label in original_predictions]

# 자이로 데이터에 예측 결과 추가
original_gyro_data['Predicted_Activity'] = predicted_activities

# 결과를 CSV 파일로 저장
original_gyro_data.to_csv('./원본 데이터/자이로 데이터_예측_v2.0.csv', index=False)

# 예측 결과 확인 (상위 5개의 샘플만 출력)
print(original_gyro_data.head())

# 11. 결과 시각화
***
> 예측된 활동 라벨의 분포 및 시간에 따른 변화를 확인

In [None]:
# 예측된 행동 패턴 분포 시각화
plt.figure(figsize=(12, 6))
activity_counts = pd.Series(predicted_activities).value_counts()
sns.barplot(x=activity_counts.index, y=activity_counts.values)
plt.title('Predicted Activity Distribution')
plt.xlabel('Activity')
plt.ylabel('Count')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# 시간에 따른 활동 변화 시각화
plt.figure(figsize=(20, 6))
activity_series = pd.Series(predicted_activities)
activity_numeric = pd.factorize(activity_series)[0]
plt.plot(activity_numeric)
plt.title('Activity Changes Over Time')
plt.xlabel('Time')
plt.ylabel('Activity')
plt.yticks(range(len(activity_labels)), list(activity_labels.values()))
plt.tight_layout()
plt.show()