<a href="https://colab.research.google.com/github/Sookyung87/MobileProgramming/blob/lsk/TSN_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [26]:
!pip install torch torchvision torchaudio
!pip install pytorchvideo

Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)
Collecting nvidia-cuda-runtime-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_runtime_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (823 kB)
Collecting nvidia-cuda-cupti-cu12==12.1.105 (from torch)
  Using cached nvidia_cuda_cupti_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (14.1 MB)
Collecting nvidia-cudnn-cu12==8.9.2.26 (from torch)
  Using cached nvidia_cudnn_cu12-8.9.2.26-py3-none-manylinux1_x86_64.whl (731.7 MB)
Collecting nvidia-cublas-cu12==12.1.3.1 (from torch)
  Using cached nvidia_cublas_cu12-12.1.3.1-py3-none-manylinux1_x86_64.whl (410.6 MB)
Collecting nvidia-cufft-cu12==11.0.2.54 (from torch)
  Using cached nvidia_cufft_cu12-11.0.2.54-py3-none-manylinux1_x86_64.whl (121.6 MB)
Collecting nvidia-curand-cu12==10.3.2.106 (from torch)
  Using cached nvidia_curand_cu12-10.3.2.106-py3-none-manylinux1_x86_64.whl (56.5 MB)
Collectin

In [None]:
import os
import json
import cv2
import numpy as np
from keras.preprocessing.image import img_to_array
from sklearn.preprocessing import LabelBinarizer
from sklearn.model_selection import train_test_split

# JSON 파일 로드
annotations_file_path = '/content/drive/MyDrive/annotations.json'
with open(annotations_file_path, 'r', encoding='utf-8') as f:
    annotations = json.load(f)

# 비디오 폴더 경로
video_folder_path = '/content/drive/MyDrive/dataset/영상'

# TSN 모델에 사용할 프레임 수
num_segments = 5

# 프레임을 전처리하는 함수
def preprocess_frame(frame):
    img_array = img_to_array(frame)
    resized_img = cv2.resize(img_array, (224, 224))  # 크기 조정
    return resized_img

# 모든 프레임을 전처리하는 함수
def preprocess_video_frames(video_path, num_segments):
    cap = cv2.VideoCapture(video_path)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    segment_length = frame_count // num_segments

    frames = []
    for i in range(num_segments):
        cap.set(cv2.CAP_PROP_POS_FRAMES, i * segment_length)
        ret, frame = cap.read()
        if ret:
            frames.append(preprocess_frame(frame))
    cap.release()
    return np.array(frames)

# 비디오 파일 이름 추출하는 방법
def get_video_filename(img_name):
    base_name = img_name.split('IMG')[0]  # 예시: 'SGA2100044S0312'
    return base_name + '.mp4'  # 비디오 파일 이름 형식 맞추기

# 모든 비디오에 대해 전처리 수행
def process_videos_in_chunks(annotations, chunk_size=100):
    total_videos = len(annotations)
    used_videos = 0
    for start_idx in range(0, total_videos, chunk_size):
        end_idx = min(start_idx + chunk_size, total_videos)
        chunk_annotations = annotations[start_idx:end_idx]

        videos = []
        labels = []

        for annotation in chunk_annotations:
            video_name = get_video_filename(annotation['img_name'])
            video_path = os.path.join(video_folder_path, video_name)  # 비디오 폴더 경로 사용

            if not os.path.exists(video_path):
                print(f'Video file not found: {video_path}')
                continue

            video_frames = preprocess_video_frames(video_path, num_segments)
            if len(video_frames) == num_segments:  # 모든 프레임이 정상적으로 처리된 경우에만 추가
                videos.append(video_frames)
                labels.append(annotation['category_id'])
                used_videos += 1  # 사용된 비디오 파일 개수 증가

        # 예외처리: 유효한 비디오가 있는 경우에만 처리
        if videos:
            videos = np.array(videos)
            labels = np.array(labels)

            # 레이블을 One-Hot 인코딩
            lb = LabelBinarizer()
            labels = lb.fit_transform(labels)

            # 데이터셋을 학습용과 테스트용으로 분할
            X_train, X_test, y_train, y_test = train_test_split(videos, labels, test_size=0.2, random_state=42)

            # 전처리된 데이터 및 레이블 저장 경로
            output_folder_path = '/content/drive/MyDrive/dataset/preprocessed_videos'
            if not os.path.exists(output_folder_path):
                os.makedirs(output_folder_path)

            np.save(os.path.join(output_folder_path, f'X_train_{start_idx}.npy'), X_train)
            np.save(os.path.join(output_folder_path, f'X_test_{start_idx}.npy'), X_test)
            np.save(os.path.join(output_folder_path, f'y_train_{start_idx}.npy'), y_train)
            np.save(os.path.join(output_folder_path, f'y_test_{start_idx}.npy'), y_test)

            print(f'Processed chunk {start_idx} to {end_idx}')
        else:
            print(f'No valid videos were processed in chunk {start_idx} to {end_idx}')

    print(f'Total used videos: {used_videos}')  # 사용된 비디오 파일 개수 출력

# 청크 단위로 비디오 전처리 수행
process_videos_in_chunks(annotations, chunk_size=100)


Processed chunk 0 to 100
Processed chunk 100 to 200
Processed chunk 200 to 300
Processed chunk 300 to 400
Processed chunk 400 to 500
Processed chunk 500 to 600
Processed chunk 600 to 700
Processed chunk 700 to 800
Processed chunk 800 to 900
Processed chunk 900 to 1000
Processed chunk 1000 to 1100
Processed chunk 1100 to 1200
Processed chunk 1200 to 1300
Processed chunk 1300 to 1400
Processed chunk 1400 to 1500
Processed chunk 1500 to 1600
Processed chunk 1600 to 1700
Processed chunk 1700 to 1800
Processed chunk 1800 to 1900
Processed chunk 1900 to 2000
Processed chunk 2000 to 2100
Processed chunk 2100 to 2200
Processed chunk 2200 to 2300
Processed chunk 2300 to 2400
Processed chunk 2400 to 2500
Processed chunk 2500 to 2600
Processed chunk 2600 to 2700
Processed chunk 2700 to 2800
Processed chunk 2800 to 2900
Processed chunk 2900 to 3000
Processed chunk 3000 to 3100
Processed chunk 3100 to 3200
Processed chunk 3200 to 3300
Processed chunk 3300 to 3400
Processed chunk 3400 to 3500
Proces

In [None]:
import os
import numpy as np
from keras.models import Model
from keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout, TimeDistributed, LSTM
from keras.optimizers import Adam

# 데이터 경로 설정
data_folder_path = '/content/drive/MyDrive/dataset/preprocessed_videos'

# 데이터 로드 함수
def load_data_chunks(folder_path, prefix):
    data_list = []
    i = 0
    while True:
        file_path = os.path.join(folder_path, f'{prefix}_{i}.npy')
        if os.path.exists(file_path):
            data = np.load(file_path)
            data_list.append(data)
            i += 1
        else:
            break
    return np.concatenate(data_list, axis=0)

# 모든 청크 데이터 로드
X_train = load_data_chunks(data_folder_path, 'X_train')
X_test = load_data_chunks(data_folder_path, 'X_test')
y_train = load_data_chunks(data_folder_path, 'y_train')
y_test = load_data_chunks(data_folder_path, 'y_test')

# 데이터 로드 및 크기 확인
print(f'X_train shape: {X_train.shape}')
print(f'X_test shape: {X_test.shape}')
print(f'y_train shape: {y_train.shape}')
print(f'y_test shape: {y_test.shape}')

# 레이블을 One-Hot 인코딩
lb = LabelBinarizer()
y_train = lb.fit_transform(y_train)
y_test = lb.transform(y_test)

# TSN 모델 설계
def create_tsn_model(input_shape, num_classes):
    input_layer = Input(shape=input_shape)

    # 각 프레임에 대해 동일한 CNN 적용
    cnn = TimeDistributed(Conv2D(32, (3, 3), activation='relu'))(input_layer)
    cnn = TimeDistributed(MaxPooling2D((2, 2)))(cnn)
    cnn = TimeDistributed(Conv2D(64, (3, 3), activation='relu'))(cnn)
    cnn = TimeDistributed(MaxPooling2D((2, 2)))(cnn)
    cnn = TimeDistributed(Flatten())(cnn)
    cnn = TimeDistributed(Dense(128, activation='relu'))(cnn)
    cnn = TimeDistributed(Dropout(0.5))(cnn)

    # 시간적인 순서를 고려하기 위해 LSTM 적용
    lstm = LSTM(128)(cnn)
    output_layer = Dense(num_classes, activation='softmax')(lstm)

    model = Model(inputs=input_layer, outputs=output_layer)
    return model

# 입력 형태 및 클래스 수 설정
input_shape = (X_train.shape[1], 224, 224, 3)
num_classes = y_train.shape[1]

# 모델 생성
model = create_tsn_model(input_shape, num_classes)
model.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy'])

# 모델 학습
model.fit(X_train, y_train, epochs=10, batch_size=8, validation_data=(X_test, y_test))

# 모델 저장
model.save('/content/drive/MyDrive/dataset/tsn_model.h5')
print('TSN model training complete and model saved.')


X_train shape: (80, 5, 224, 224, 3)
X_test shape: (20, 5, 224, 224, 3)
y_train shape: (80, 1)
y_test shape: (20, 1)
Epoch 1/10


  return dispatch_target(*args, **kwargs)




  return dispatch_target(*args, **kwargs)


Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


  saving_api.save_model(


TSN model training complete and model saved.


In [None]:
# 레이블을 One-Hot 인코딩
lb = LabelBinarizer()
y_train = lb.fit_transform(y_train)
y_test = lb.transform(y_test)

# 데이터셋 확인
print(f"y_train shape: {y_train.shape}")
print(f"y_test shape: {y_test.shape}")


y_train shape: (80, 1)
y_test shape: (20, 1)


In [None]:
from keras.models import Model
from keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout, TimeDistributed, LSTM
from keras.optimizers import Adam

# TSN 모델 설계
def create_tsn_model(input_shape, num_classes):
    input_layer = Input(shape=input_shape)

    # 각 프레임에 대해 동일한 CNN 적용
    cnn = TimeDistributed(Conv2D(32, (3, 3), activation='relu'))(input_layer)
    cnn = TimeDistributed(MaxPooling2D((2, 2)))(cnn)
    cnn = TimeDistributed(Conv2D(64, (3, 3), activation='relu'))(cnn)
    cnn = TimeDistributed(MaxPooling2D((2, 2)))(cnn)
    cnn = TimeDistributed(Flatten())(cnn)
    cnn = TimeDistributed(Dense(128, activation='relu'))(cnn)
    cnn = TimeDistributed(Dropout(0.5))(cnn)

    # 시간적인 순서를 고려하기 위해 LSTM 적용
    lstm = LSTM(128)(cnn)
    output_layer = Dense(num_classes, activation='softmax')(lstm)

    model = Model(inputs=input_layer, outputs=output_layer)
    return model

# 입력 형태 및 클래스 수 설정
input_shape = (X_train.shape[1], 224, 224, 3)
num_classes = y_train.shape[1]

# 모델 생성
model = create_tsn_model(input_shape, num_classes)
model.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy'])

# 모델 학습
model.fit(X_train, y_train, epochs=100, batch_size=8, validation_data=(X_test, y_test))

# 모델 저장
model.save('/content/drive/MyDrive/dataset/tsn_model.h5')
print('TSN model training complete and model saved.')


Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 13/100
Epoch 14/100
Epoch 15/100
Epoch 16/100
Epoch 17/100
Epoch 18/100
Epoch 19/100
Epoch 20/100
Epoch 21/100
Epoch 22/100
Epoch 23/100
Epoch 24/100
Epoch 25/100
Epoch 26/100
Epoch 27/100
Epoch 28/100
Epoch 29/100
Epoch 30/100
Epoch 31/100
Epoch 32/100
Epoch 33/100
Epoch 34/100
Epoch 35/100
Epoch 36/100
Epoch 37/100
Epoch 38/100
Epoch 39/100
Epoch 40/100
Epoch 41/100
Epoch 42/100
Epoch 43/100
Epoch 44/100
Epoch 45/100
Epoch 46/100
Epoch 47/100
Epoch 48/100
Epoch 49/100
Epoch 50/100
Epoch 51/100
Epoch 52/100
Epoch 53/100
Epoch 54/100
Epoch 55/100
Epoch 56/100
Epoch 57/100
Epoch 58/100
Epoch 59/100
Epoch 60/100
Epoch 61/100
Epoch 62/100
Epoch 63/100
Epoch 64/100
Epoch 65/100
Epoch 66/100
Epoch 67/100
Epoch 68/100
Epoch 69/100
Epoch 70/100
Epoch 71/100
Epoch 72/100
Epoch 73/100
Epoch 74/100
Epoch 75/100
Epoch 76/100
Epoch 77/100
Epoch 78

  saving_api.save_model(


TSN model training complete and model saved.


In [None]:
import numpy as np

# y_train과 y_test의 클래스 분포 확인
train_class_distribution = np.sum(y_train, axis=0)
test_class_distribution = np.sum(y_test, axis=0)

print(f"Train class distribution: {train_class_distribution}")
print(f"Test class distribution: {test_class_distribution}")


Train class distribution: [77.  3.]
Test class distribution: [18.  2.]


In [None]:
from keras.preprocessing.image import ImageDataGenerator

# 데이터 증강 설정
datagen = ImageDataGenerator(
    rotation_range=10,
    width_shift_range=0.1,
    height_shift_range=0.1,
    shear_range=0.1,
    zoom_range=0.1,
    horizontal_flip=True,
    fill_mode='nearest'
)

# 데이터 증강 적용 함수
def augment_frame(frame, datagen):
    frame = frame.reshape((1,) + frame.shape)  # (1, 224, 224, 3)
    augmented_frames = [datagen.flow(frame, batch_size=1)[0].reshape(frame.shape[1:]) for _ in range(1)]
    return np.array(augmented_frames)

# 모든 프레임에 대해 증강 수행
def augment_video_frames(video, datagen):
    augmented_video = []
    for frame in video:
        augmented_frame = augment_frame(frame, datagen)
        augmented_video.append(augmented_frame[0])
    return np.array(augmented_video)

# 모든 비디오에 대해 증강 수행
def augment_data(X, y, datagen, batch_size=8):
    augmented_X, augmented_y = [], []
    for i in range(X.shape[0]):
        augmented_video = augment_video_frames(X[i], datagen)
        for _ in range(batch_size):
            augmented_X.append(augmented_video)
            augmented_y.append(y[i])
    return np.array(augmented_X), np.array(augmented_y)

# 데이터 증강 적용
X_train_augmented, y_train_augmented = augment_data(X_train, y_train, datagen)

# 데이터셋 확인
print(f"X_train_augmented shape: {X_train_augmented.shape}")
print(f"y_train_augmented shape: {y_train_augmented.shape}")


X_train_augmented shape: (640, 5, 224, 224, 3)
y_train_augmented shape: (640, 2)


In [None]:
from keras.models import Model
from keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, Dropout, TimeDistributed, LSTM
from keras.optimizers import Adam

def create_tsn_model(input_shape, num_classes):
    input_layer = Input(shape=input_shape)

    # 각 프레임에 대해 동일한 CNN 적용
    cnn = TimeDistributed(Conv2D(32, (3, 3), activation='relu'))(input_layer)
    cnn = TimeDistributed(MaxPooling2D((2, 2)))(cnn)
    cnn = TimeDistributed(Conv2D(64, (3, 3), activation='relu'))(cnn)
    cnn = TimeDistributed(MaxPooling2D((2, 2)))(cnn)
    cnn = TimeDistributed(Conv2D(128, (3, 3), activation='relu'))(cnn)
    cnn = TimeDistributed(MaxPooling2D((2, 2)))(cnn)
    cnn = TimeDistributed(Flatten())(cnn)
    cnn = TimeDistributed(Dense(256, activation='relu'))(cnn)
    cnn = TimeDistributed(Dropout(0.5))(cnn)

    # 시간적인 순서를 고려하기 위해 LSTM 적용
    lstm = LSTM(128)(cnn)
    output_layer = Dense(num_classes, activation='softmax')(lstm)

    model = Model(inputs=input_layer, outputs=output_layer)
    return model

# 입력 형태 및 클래스 수 설정
input_shape = (X_train.shape[1], 224, 224, 3)
num_classes = y_train.shape[1]

# 모델 생성
model = create_tsn_model(input_shape, num_classes)
model.compile(optimizer=Adam(), loss='categorical_crossentropy', metrics=['accuracy'])

# 모델 학습
model.fit(X_train_augmented, y_train_augmented, epochs=50, batch_size=8, validation_data=(X_test, y_test))

# 모델 저장
model.save('/content/drive/MyDrive/dataset/tsn_model.h5')
print('TSN model training complete and model saved.')


Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
TSN model training complete and model saved.


In [None]:
import numpy as np

# 각 클래스 레이블을 원-핫 인코딩에서 다시 복원합니다.
y_train_classes = np.argmax(y_train, axis=1)
y_test_classes = np.argmax(y_test, axis=1)

# 각 클래스별 데이터 수를 계산합니다.
train_class_counts = np.unique(y_train_classes, return_counts=True)
test_class_counts = np.unique(y_test_classes, return_counts=True)

# 클래스 인덱스와 행동 이름을 매핑하는 딕셔너리
class_labels = {
    0: '졸음운전',
    1: '음주운전',
    2: '물건찾기',
    3: '통화',
    4: '휴대폰조작',
    5: '차량제어',
    6: '운전자폭행'
}

# 학습 데이터에서 각 클래스별 데이터 수 출력
print("Train class distribution:")
for class_idx, count in zip(train_class_counts[0], train_class_counts[1]):
    print(f"{class_labels[class_idx]}: {count}")

# 테스트 데이터에서 각 클래스별 데이터 수 출력
print("\nTest class distribution:")
for class_idx, count in zip(test_class_counts[0], test_class_counts[1]):
    print(f"{class_labels[class_idx]}: {count}")


Train class distribution:
졸음운전: 77
음주운전: 3

Test class distribution:
졸음운전: 18
음주운전: 2


In [None]:
# 데이터셋 확인
print(f'X_train shape: {X_train.shape}')
print(f'X_test shape: {X_test.shape}')
print(f'y_train shape: {y_train.shape}')
print(f'y_test shape: {y_test.shape}')

# 레이블 데이터 확인
print(f'y_train sample: {y_train[:5]}')
print(f'y_test sample: {y_test[:5]}')


X_train shape: (80, 5, 224, 224, 3)
X_test shape: (20, 5, 224, 224, 3)
y_train shape: (80, 2)
y_test shape: (20, 2)
y_train sample: [[1. 0.]
 [1. 0.]
 [1. 0.]
 [1. 0.]
 [1. 0.]]
y_test sample: [[1. 0.]
 [1. 0.]
 [0. 1.]
 [1. 0.]
 [1. 0.]]


In [None]:
import os
import cv2
import numpy as np
from keras.models import load_model
from keras.preprocessing.image import img_to_array

# TSN 모델 로드
model_path = '/content/drive/MyDrive/dataset/tsn_model.h5'
model = load_model(model_path)

# 비디오 파일 경로
video_path = '/content/drive/MyDrive/SGA2100030S0219.mp4'

# TSN 모델에 사용할 프레임 수
num_segments = 5

# 프레임을 전처리하는 함수
def preprocess_frame(frame):
    img_array = img_to_array(frame)
    resized_img = cv2.resize(img_array, (224, 224))  # 크기 조정
    return resized_img

# 비디오 파일을 전처리하여 프레임을 추출하는 함수
def preprocess_video_frames(video_path, num_segments):
    cap = cv2.VideoCapture(video_path)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    segment_length = frame_count // num_segments

    frames = []
    for i in range(num_segments):
        cap.set(cv2.CAP_PROP_POS_FRAMES, i * segment_length)
        ret, frame = cap.read()
        if ret:
            frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(preprocess_frame(frame))
    cap.release()
    return np.array(frames)

# 비디오 파일을 전처리하여 프레임을 추출
video_frames = preprocess_video_frames(video_path, num_segments)
video_frames = np.expand_dims(video_frames, axis=0)  # 모델 입력 형태에 맞게 차원 추가

# 모델을 사용하여 예측
predictions = model.predict(video_frames)
predicted_class = np.argmax(predictions, axis=1)

# 예측 결과 출력
print(f"Predicted class: {predicted_class}")


Predicted class: [0]


In [None]:
print(f'X_train shape: {X_train.shape}')
print(f'X_test shape: {X_test.shape}')
print(f'y_train shape: {y_train.shape}')
print(f'y_test shape: {y_test.shape}')

# 레이블 데이터 확인
print(f'y_train sample: {y_train[:5]}')
print(f'y_test sample: {y_test[:5]}')


X_train shape: (80, 5, 224, 224, 3)
X_test shape: (20, 5, 224, 224, 3)
y_train shape: (80, 1)
y_test shape: (20, 1)
y_train sample: [[0]
 [0]
 [0]
 [0]
 [0]]
y_test sample: [[0]
 [0]
 [1]
 [0]
 [0]]


In [None]:
from sklearn.preprocessing import OneHotEncoder

# 원-핫 인코딩
encoder = OneHotEncoder(sparse=False)
y_train = encoder.fit_transform(y_train)
y_test = encoder.transform(y_test)

# 데이터셋 확인
print(f"y_train shape: {y_train.shape}")
print(f"y_test shape: {y_test.shape}")
print(f"y_train sample: {y_train[:5]}")
print(f"y_test sample: {y_test[:5]}")


y_train shape: (80, 2)
y_test shape: (20, 2)
y_train sample: [[1. 0.]
 [1. 0.]
 [1. 0.]
 [1. 0.]
 [1. 0.]]
y_test sample: [[1. 0.]
 [1. 0.]
 [0. 1.]
 [1. 0.]
 [1. 0.]]




In [None]:
import numpy as np

# 파일 경로
file_path = '/content/drive/MyDrive/dataset/preprocessed_videos/X_test_100.npy'

# 파일 로드
data = np.load(file_path)

# 데이터의 모양 출력
print("Shape of the data:", data.shape)

# 데이터 일부 출력
print("Data sample:", data[0])


Shape of the data: (19, 5, 224, 224, 3)
Data sample: [[[[0.00000000e+00 0.00000000e+00 0.00000000e+00]
   [0.00000000e+00 0.00000000e+00 0.00000000e+00]
   [0.00000000e+00 0.00000000e+00 0.00000000e+00]
   ...
   [1.44785721e+02 1.44428574e+02 1.38214279e+02]
   [1.43892853e+02 1.45892853e+02 1.38892853e+02]
   [1.37681122e+02 1.39681122e+02 1.32681122e+02]]

  [[0.00000000e+00 0.00000000e+00 0.00000000e+00]
   [0.00000000e+00 0.00000000e+00 0.00000000e+00]
   [0.00000000e+00 0.00000000e+00 0.00000000e+00]
   ...
   [1.45571426e+02 1.45214279e+02 1.39000000e+02]
   [1.43619904e+02 1.45619904e+02 1.38619904e+02]
   [1.31951538e+02 1.33951538e+02 1.26951538e+02]]

  [[0.00000000e+00 0.00000000e+00 0.00000000e+00]
   [0.00000000e+00 0.00000000e+00 0.00000000e+00]
   [0.00000000e+00 0.00000000e+00 0.00000000e+00]
   ...
   [1.46357147e+02 1.46000000e+02 1.39785721e+02]
   [1.34127548e+02 1.36127548e+02 1.29127548e+02]
   [6.56734695e+01 6.76734695e+01 6.06734695e+01]]

  ...

  [[0.0000000

In [None]:
import json
import pprint

# JSON 파일 로드
def load_json(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        return json.load(f)

# JSON 파일 경로
file_path = '/content/drive/MyDrive/annotations.json'

# JSON 데이터 로드
json_data = load_json(file_path)

# JSON 데이터의 구조를 보기 좋게 출력
pp = pprint.PrettyPrinter(indent=4)
pp.pprint(json_data)


[1;30;43m스트리밍 출력 내용이 길어서 마지막 5000줄이 삭제되었습니다.[0m
        'category_id': 'A002',
        'emotion': '중립',
        'face_b_box': [   662.4000000000001,
                          52.800000000000004,
                          303.99999999999994,
                          309.125],
        'img_name': 'SGA2100030S0199IMG0002.jpg'},
    {   'abnormal_behavior': '음주운전',
        'action': '무언가를보다',
        'body_b_box': [472, 49.925000000000004, 753.6, 670.075],
        'category_id': 'A002',
        'emotion': '중립',
        'face_b_box': [697.6, 54.400000000000006, 307.20000000000005, 321.925],
        'img_name': 'SGA2100030S0199IMG0003.jpg'},
    {   'abnormal_behavior': '음주운전',
        'action': '무언가를보다',
        'body_b_box': [472, 48.325, 752, 671.6750000000001],
        'category_id': 'A002',
        'emotion': '중립',
        'face_b_box': [697.6, 54.400000000000006, 310.40000000000003, 325.125],
        'img_name': 'SGA2100030S0199IMG0004.jpg'},
    {   'abnormal_behavior': '음주운전',
   

KeyboardInterrupt: 

In [None]:
from sklearn.preprocessing import OneHotEncoder

# 원-핫 인코딩
encoder = OneHotEncoder(sparse=False)
y_train = encoder.fit_transform(y_train)
y_test = encoder.transform(y_test)

# 데이터셋 확인
print(f"y_train shape: {y_train.shape}")
print(f"y_test shape: {y_test.shape}")
print(f"y_train sample: {y_train[:5]}")
print(f"y_test sample: {y_test[:5]}")


y_train shape: (80, 4)
y_test shape: (20, 4)
y_train sample: [[0. 1. 1. 0.]
 [0. 1. 1. 0.]
 [0. 1. 1. 0.]
 [0. 1. 1. 0.]
 [0. 1. 1. 0.]]
y_test sample: [[0. 1. 1. 0.]
 [0. 1. 1. 0.]
 [1. 0. 0. 1.]
 [0. 1. 1. 0.]
 [0. 1. 1. 0.]]




In [None]:
import json

# JSON 파일 로드
annotations_file_path = '/content/drive/MyDrive/annotations.json'
with open(annotations_file_path, 'r', encoding='utf-8') as f:
    annotations = json.load(f)

# 클래스별 데이터 수를 계산
class_counts = {}
for annotation in annotations:
    category_id = annotation['category_id']
    if category_id in class_counts:
        class_counts[category_id] += 1
    else:
        class_counts[category_id] = 1

# 클래스 인덱스와 행동 이름을 매핑하는 딕셔너리
class_labels = {
    'A001': '졸음운전',
    'A002': '음주운전',
    'A003': '물건찾기',
    'A004': '통화',
    'A005': '휴대폰조작',
    'A006': '차량제어',
    'A007': '운전자폭행'
}

# 클래스별 데이터 수 출력
for class_id, count in class_counts.items():
    print(f"{class_labels[class_id]}: {count}")


졸음운전: 3865
음주운전: 3745
차량제어: 1200
물건찾기: 2165
휴대폰조작: 905
통화: 745


In [None]:
import os
import json
import cv2
import numpy as np
from keras.preprocessing.image import img_to_array
from sklearn.preprocessing import LabelBinarizer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder

# JSON 파일 로드
annotations_file_path = '/content/drive/MyDrive/annotations.json'
with open(annotations_file_path, 'r', encoding='utf-8') as f:
    annotations = json.load(f)

# 비디오 폴더 경로
video_folder_path = '/content/drive/MyDrive/dataset/영상'

# TSN 모델에 사용할 프레임 수
num_segments = 5

# 프레임을 전처리하는 함수
def preprocess_frame(frame):
    img_array = img_to_array(frame)
    resized_img = cv2.resize(img_array, (224, 224))  # 크기 조정
    return resized_img

# 모든 프레임을 전처리하는 함수
def preprocess_video_frames(video_path, num_segments):
    cap = cv2.VideoCapture(video_path)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    segment_length = frame_count // num_segments

    frames = []
    for i in range(num_segments):
        cap.set(cv2.CAP_PROP_POS_FRAMES, i * segment_length)
        ret, frame = cap.read()
        if ret:
            frames.append(preprocess_frame(frame))
    cap.release()
    return np.array(frames)

# 비디오 파일 이름 추출하는 방법
def get_video_filename(img_name):
    base_name = img_name.split('IMG')[0]  # 예시: 'SGA2100044S0312'
    return base_name + '.mp4'  # 비디오 파일 이름 형식 맞추기

# 모든 비디오에 대해 전처리 수행
def process_videos_in_chunks(annotations, chunk_size=100):
    total_videos = len(annotations)
    used_videos = 0
    for start_idx in range(0, total_videos, chunk_size):
        end_idx = min(start_idx + chunk_size, total_videos)
        chunk_annotations = annotations[start_idx:end_idx]

        videos = []
        labels = []

        for annotation in chunk_annotations:
            video_name = get_video_filename(annotation['img_name'])
            video_path = os.path.join(video_folder_path, video_name)  # 비디오 폴더 경로 사용

            if not os.path.exists(video_path):
                print(f'Video file not found: {video_path}')
                continue

            video_frames = preprocess_video_frames(video_path, num_segments)
            if len(video_frames) == num_segments:  # 모든 프레임이 정상적으로 처리된 경우에만 추가
                videos.append(video_frames)
                labels.append(annotation['category_id'])
                used_videos += 1  # 사용된 비디오 파일 개수 증가

        # 예외처리: 유효한 비디오가 있는 경우에만 처리
        if videos:
            videos = np.array(videos)
            labels = np.array(labels)

            # 레이블을 One-Hot 인코딩
            encoder = OneHotEncoder(sparse_output=False)
            labels = labels.reshape(-1, 1)
            labels = encoder.fit_transform(labels)

            # 데이터셋을 학습용과 테스트용으로 분할
            X_train, X_test, y_train, y_test = train_test_split(videos, labels, test_size=0.2, random_state=42, stratify=labels)

            # 전처리된 데이터 및 레이블 저장 경로
            output_folder_path = '/content/drive/MyDrive/dataset/preprocessed_videos'
            if not os.path.exists(output_folder_path):
                os.makedirs(output_folder_path)

            np.save(os.path.join(output_folder_path, f'X_train_{start_idx}.npy'), X_train)
            np.save(os.path.join(output_folder_path, f'X_test_{start_idx}.npy'), X_test)
            np.save(os.path.join(output_folder_path, f'y_train_{start_idx}.npy'), y_train)
            np.save(os.path.join(output_folder_path, f'y_test_{start_idx}.npy'), y_test)

            print(f'Processed chunk {start_idx} to {end_idx}')
        else:
            print(f'No valid videos were processed in chunk {start_idx} to {end_idx}')

    print(f'Total used videos: {used_videos}')  # 사용된 비디오 파일 개수 출력

# 청크 단위로 비디오 전처리 수행
process_videos_in_chunks(annotations, chunk_size=100)


Processed chunk 0 to 100
Processed chunk 100 to 200
Processed chunk 200 to 300
Processed chunk 300 to 400
Processed chunk 400 to 500
Processed chunk 500 to 600
Processed chunk 600 to 700
Processed chunk 700 to 800
Processed chunk 800 to 900
Processed chunk 900 to 1000
Processed chunk 1000 to 1100
Processed chunk 1100 to 1200
Processed chunk 1200 to 1300
Processed chunk 1300 to 1400
Processed chunk 1400 to 1500
Processed chunk 1500 to 1600
Processed chunk 1600 to 1700
Processed chunk 1700 to 1800
Processed chunk 1800 to 1900
Processed chunk 1900 to 2000
Processed chunk 2000 to 2100
Processed chunk 2100 to 2200
Processed chunk 2200 to 2300
Processed chunk 2300 to 2400
Processed chunk 2400 to 2500
Processed chunk 2500 to 2600
Processed chunk 2600 to 2700
Processed chunk 2700 to 2800
Processed chunk 2800 to 2900
Processed chunk 2900 to 3000
Processed chunk 3000 to 3100
Processed chunk 3100 to 3200
Processed chunk 3200 to 3300
Processed chunk 3300 to 3400
Processed chunk 3400 to 3500
Proces

In [None]:
import os
import numpy as np

# 전처리된 데이터가 저장된 폴더 경로
output_folder_path = '/content/drive/MyDrive/dataset/preprocessed_videos'

# 데이터 로드 함수
def load_data_chunks(folder_path, prefix):
    data_list = []
    i = 0
    while True:
        file_path = os.path.join(folder_path, f'{prefix}_{i}.npy')
        if os.path.exists(file_path):
            data = np.load(file_path)
            data_list.append(data)
            i += 1
        else:
            break
    if len(data_list) > 0:
        return np.concatenate(data_list, axis=0)
    else:
        return np.array([])  # 비어 있는 경우 빈 배열 반환

# 모든 청크 데이터 로드
X_train = load_data_chunks(output_folder_path, 'X_train')
X_test = load_data_chunks(output_folder_path, 'X_test')
y_train = load_data_chunks(output_folder_path, 'y_train')
y_test = load_data_chunks(output_folder_path, 'y_test')

# 데이터셋이 올바르게 로드되었는지 확인
if X_train.size > 0 and X_test.size > 0 and y_train.size > 0 and y_test.size > 0:
    print(f"X_train shape: {X_train.shape}")
    print(f"X_test shape: {X_test.shape}")
    print(f"y_train shape: {y_train.shape}")
    print(f"y_test shape: {y_test.shape}")

    # 원-핫 인코딩을 다시 클래스로 변환
    y_train_classes = np.argmax(y_train, axis=1)
    y_test_classes = np.argmax(y_test, axis=1)

    # 각 클래스별 데이터 수를 계산
    train_class_counts = np.unique(y_train_classes, return_counts=True)
    test_class_counts = np.unique(y_test_classes, return_counts=True)

    # 클래스 인덱스와 행동 이름을 매핑하는 딕셔너리
    class_labels = {
        0: '졸음운전',
        1: '음주운전',
        2: '물건찾기',
        3: '통화',
        4: '휴대폰조작',
        5: '차량제어'
    }

    # 학습 데이터에서 각 클래스별 데이터 수 출력
    print("Train class distribution:")
    for class_idx, count in zip(train_class_counts[0], train_class_counts[1]):
        if class_idx == 3:  # 클래스 인덱스 3 (통화)만 출력
            print(f"{class_labels[class_idx]}: {count}")

    # 테스트 데이터에서 각 클래스별 데이터 수 출력
    print("\nTest class distribution:")
    for class_idx, count in zip(test_class_counts[0], test_class_counts[1]):
        if class_idx == 3:  # 클래스 인덱스 3 (통화)만 출력
            print(f"{class_labels[class_idx]}: {count}")
else:
    print("No valid data chunks found.")


X_train shape: (80, 5, 224, 224, 3)
X_test shape: (20, 5, 224, 224, 3)
y_train shape: (80, 2)
y_test shape: (20, 2)
Train class distribution:

Test class distribution:


In [None]:
import os
import json
from concurrent.futures import ThreadPoolExecutor

# JSON 파일이 저장된 최상위 경로 설정
json_folder_path = '/content/drive/MyDrive/dataset/DataLabeling(json)'

# 모든 JSON 파일 경로 재귀적으로 가져오기
json_files = []
for root, dirs, files in os.walk(json_folder_path):
    for file in files:
        if file.endswith('.json'):
            json_files.append(os.path.join(root, file))

print(f'Total JSON files found: {len(json_files)}')

# JSON 파일 로드 함수
def load_json(file):
    with open(file, 'r', encoding='utf-8') as f:
        return json.load(f)

# 이상행동 분류 함수
def classify_abnormal_behavior(category_id):
    mapping = {
        'A001': '졸음운전',
        'A002': '음주운전',
        'A003': '물건찾기',
        'A004': '통화',
        'A005': '휴대폰조작',
        'A006': '차량제어',
        'A007': '운전자폭행'
    }
    return mapping.get(category_id, 'unknown')

# 병렬로 JSON 파일 로드
annotations = []
with ThreadPoolExecutor() as executor:
    results = executor.map(load_json, json_files)
    for data in results:
        for scene in data.get('scene', {}).get('data', []):
            for occupant in scene.get('occupant', []):
                annotation = {
                    'img_name': scene.get('img_name', ''),
                    'body_b_box': occupant.get('body_b_box', []),
                    'face_b_box': occupant.get('face_b_box', []),
                    'action': occupant.get('action', ''),
                    'emotion': occupant.get('emotion', '')
                }
                # 'scene_info'에 이상행동 관련 정보가 있다고 가정
                if 'scene_info' in data:
                    category_id = data['scene_info'].get('category_id', '')
                    annotation['abnormal_behavior'] = classify_abnormal_behavior(category_id)
                    annotation['category_id'] = category_id  # category_id 추가
                annotations.append(annotation)

print(f'Total annotations loaded: {len(annotations)}')

# JSON 파일로 저장
output_file_path = '/content/drive/MyDrive/Annotations.json'
with open(output_file_path, 'w', encoding='utf-8') as f:
    json.dump(annotations, f, ensure_ascii=False, indent=4)

print(f'Annotations saved to {output_file_path}')


Total JSON files found: 8175
Total annotations loaded: 40875
Annotations saved to /content/drive/MyDrive/Annotations.json


In [None]:
import json

# JSON 파일 로드
annotations_file_path = '/content/drive/MyDrive/Annotations.json'
with open(annotations_file_path, 'r', encoding='utf-8') as f:
    annotations = json.load(f)

# 클래스별 데이터 수를 계산
class_counts = {}
for annotation in annotations:
    category_id = annotation['category_id']
    if category_id in class_counts:
        class_counts[category_id] += 1
    else:
        class_counts[category_id] = 1

# 클래스 인덱스와 행동 이름을 매핑하는 딕셔너리
class_labels = {
    'A001': '졸음운전',
    'A002': '음주운전',
    'A003': '물건찾기',
    'A004': '통화',
    'A005': '휴대폰조작',
    'A006': '차량제어',
    'A007': '운전자폭행'
}

# 클래스별 데이터 수 출력
for class_id, count in class_counts.items():
    print(f"{class_labels[class_id]}: {count}")


음주운전: 12450
졸음운전: 11810
물건찾기: 6795
차량제어: 3505
휴대폰조작: 3290
통화: 3025


In [None]:
import json

# JSON 파일 로드
annotations_file_path = '/content/drive/MyDrive/annotations.json'
with open(annotations_file_path, 'r', encoding='utf-8') as f:
    annotations = json.load(f)

# 클래스별 데이터 수를 계산
class_counts = {}
for annotation in annotations:
    category_id = annotation['category_id']
    if category_id in class_counts:
        class_counts[category_id] += 1
    else:
        class_counts[category_id] = 1

# 클래스 인덱스와 행동 이름을 매핑하는 딕셔너리
class_labels = {
    'A001': '졸음운전',
    'A002': '음주운전',
    'A003': '물건찾기',
    'A004': '통화',
    'A005': '휴대폰조작',
    'A006': '차량제어',
    'A007': '운전자폭행'
}

# 클래스별 데이터 수 출력
for class_id, count in class_counts.items():
    print(f"{class_labels[class_id]}: {count}")


졸음운전: 3865
음주운전: 3745
차량제어: 1200
물건찾기: 2165
휴대폰조작: 905
통화: 745


In [None]:
#청크 단위로 비디오를 전처리하고 임시 파일로 저장
import os
import json
import cv2
import numpy as np
from keras.preprocessing.image import img_to_array
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split

# JSON 파일 로드
annotations_file_path = '/content/drive/MyDrive/Annotations.json'
with open(annotations_file_path, 'r', encoding='utf-8') as f:
    annotations = json.load(f)

# 비디오 폴더 경로
video_folder_path = '/content/drive/MyDrive/dataset/영상'

# TSN 모델에 사용할 프레임 수
num_segments = 5

# 프레임을 전처리하는 함수
def preprocess_frame(frame):
    img_array = img_to_array(frame)
    resized_img = cv2.resize(img_array, (224, 224))  # 크기 조정
    return resized_img

# 모든 프레임을 전처리하는 함수
def preprocess_video_frames(video_path, num_segments):
    cap = cv2.VideoCapture(video_path)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    segment_length = frame_count // num_segments

    frames = []
    for i in range(num_segments):
        cap.set(cv2.CAP_PROP_POS_FRAMES, i * segment_length)
        ret, frame = cap.read()
        if ret:
            frames.append(preprocess_frame(frame))
    cap.release()
    return np.array(frames)

# 비디오 파일 이름 추출하는 방법
def get_video_filename(img_name):
    base_name = img_name.split('IMG')[0]
    return base_name + '.mp4'  # 비디오 파일 이름 형식 맞추기

# 청크 단위로 비디오를 전처리하고 임시 파일로 저장
def process_videos_in_chunks(annotations, chunk_size=100):
    total_videos = len(annotations)
    used_videos = 0
    for start_idx in range(0, total_videos, chunk_size):
        end_idx = min(start_idx + chunk_size, total_videos)
        chunk_annotations = annotations[start_idx:end_idx]

        videos = []
        labels = []

        for annotation in chunk_annotations:
            video_name = get_video_filename(annotation['img_name'])
            video_path = os.path.join(video_folder_path, video_name)

            if not os.path.exists(video_path):
                print(f'Video file not found: {video_path}')
                continue

            video_frames = preprocess_video_frames(video_path, num_segments)
            if len(video_frames) == num_segments:  # 모든 프레임이 정상적으로 처리된 경우에만 추가
                videos.append(video_frames)
                labels.append(annotation['category_id'])
                used_videos += 1

        if videos:
            videos = np.array(videos)
            labels = np.array(labels).reshape(-1, 1)

            # 원-핫 인코딩
            encoder = OneHotEncoder(sparse_output=False)
            labels = encoder.fit_transform(labels)

            # 전처리된 데이터 및 레이블 저장 경로
            temp_folder_path = '/content/drive/MyDrive/dataset/Temp_preprocessed_videos'
            if not os.path.exists(temp_folder_path):
                os.makedirs(temp_folder_path)

            chunk_idx = start_idx // chunk_size
            np.save(os.path.join(temp_folder_path, f'X_{chunk_idx}.npy'), videos)
            np.save(os.path.join(temp_folder_path, f'y_{chunk_idx}.npy'), labels)

            print(f'Processed chunk {chunk_idx}: {start_idx} to {end_idx}')
        else:
            print(f'No valid videos processed in chunk {start_idx} to {end_idx}')

    print(f'Total used videos: {used_videos}')

# 청크 단위로 비디오 전처리 수행
process_videos_in_chunks(annotations, chunk_size=100)

# 청크 파일들을 합쳐서 전체 데이터셋으로 통합
def load_data_chunks(folder_path, prefix):
    data_list = []
    i = 0
    while True:
        file_path = os.path.join(folder_path, f'{prefix}_{i}.npy')
        if os.path.exists(file_path):
            data = np.load(file_path)
            data_list.append(data)
            i += 1
        else:
            break
    return np.concatenate(data_list, axis=0) if data_list else np.array([])

# 모든 청크 데이터 로드
X = load_data_chunks('/content/drive/MyDrive/dataset/Temp_preprocessed_videos', 'X')
y = load_data_chunks('/content/drive/MyDrive/dataset/Temp_preprocessed_videos', 'y')

# 데이터셋을 학습용과 테스트용으로 분할
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

# 전처리된 데이터 및 레이블 저장 경로
output_folder_path = '/content/drive/MyDrive/dataset/Preprocessed_videos'
if not os.path.exists(output_folder_path):
    os.makedirs(output_folder_path)

np.save(os.path.join(output_folder_path, 'X_train.npy'), X_train)
np.save(os.path.join(output_folder_path, 'X_test.npy'), X_test)
np.save(os.path.join(output_folder_path, 'y_train.npy'), y_train)
np.save(os.path.join(output_folder_path, 'y_test.npy'), y_test)

print('Data preprocessing complete and data saved.')


[1;30;43m스트리밍 출력 내용이 길어서 마지막 5000줄이 삭제되었습니다.[0m
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100046S0072.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100046S0072.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100046S0072.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100046S0072.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100046S0072.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100046S0073.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100046S0073.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100046S0073.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100046S0073.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100046S0073.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100046S0062.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100046S0062.mp4
Video file not found: /content/drive/M

In [None]:
import os
import numpy as np
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split

# 데이터 로드 함수 (청크 단위로 처리)
def load_data_chunks_in_batches(folder_path, prefix, batch_size):
    i = 0
    data_list = []
    while True:
        file_path = os.path.join(folder_path, f'{prefix}_{i}.npy')
        if os.path.exists(file_path):
            data = np.load(file_path)
            print(f"Loaded file: {file_path} with shape {data.shape}")  # 디버깅 정보 출력
            if data.size > 0:  # 데이터가 비어 있지 않은 경우에만 추가
                data_list.append(data)
                if sum(d.shape[0] for d in data_list) >= batch_size:
                    yield np.concatenate(data_list, axis=0)
                    data_list = []
            i += 1
        else:
            break
    if data_list:
        yield np.concatenate(data_list, axis=0)

# 모든 청크 데이터 로드
temp_folder_path = '/content/drive/MyDrive/dataset/Temp_preprocessed_videos'
batch_size = 3000  # 청크의 크기를 적절히 설정

X_list = []
y_list = []



# y 데이터 로드
for y_batch in load_data_chunks_in_batches(temp_folder_path, 'y', batch_size):
    y_list.append(y_batch)

# X 데이터 로드
for X_batch in load_data_chunks_in_batches(temp_folder_path, 'X', batch_size):
    X_list.append(X_batch)

# 모든 y 데이터를 하나로 합치기
y_combined = np.concatenate(y_list, axis=0)

# 모든 y 데이터를 (n, 1) 형태로 맞추기
if y_combined.ndim == 1:
    y_combined = y_combined.reshape(-1, 1)
elif y_combined.shape[1] != 1:
    y_combined = y_combined[:, :1]

# 실제로 존재하는 6개의 클래스에 대해 원-핫 인코딩 수행
encoder = OneHotEncoder(sparse_output=False, categories=[['A001', 'A002', 'A003', 'A004', 'A005', 'A006']])
y_encoded = encoder.fit_transform(y_combined)

# 데이터셋 분할
X = np.concatenate(X_list, axis=0)
y = y_encoded

print(f"Total X shape: {X.shape}")  # 디버깅 정보 출력
print(f"Total y shape: {y.shape}")  # 디버깅 정보 출력

if X.size == 0 or y.size == 0:
    print("No valid data found. Please check the data chunks.")
else:
    # 데이터셋을 학습용과 테스트용으로 분할
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

    # 전처리된 데이터 및 레이블 저장 경로
    output_folder_path = '/content/drive/MyDrive/dataset/Preprocessed_videos'
    if not os.path.exists(output_folder_path):
        os.makedirs(output_folder_path)

    # 데이터 저장
    np.save(os.path.join(output_folder_path, 'X_train.npy'), X_train)
    np.save(os.path.join(output_folder_path, 'X_test.npy'), X_test)
    np.save(os.path.join(output_folder_path, 'y_train.npy'), y_train)
    np.save(os.path.join(output_folder_path, 'y_test.npy'), y_test)

    print('Data preprocessing complete and data saved.')

    # 각 클래스별 데이터 수를 확인하여 출력
    def print_class_distribution(y_data, class_labels):
        y_classes = np.argmax(y_data, axis=1)
        class_counts = np.unique(y_classes, return_counts=True)

        for class_idx, count in zip(class_counts[0], class_counts[1]):
            print(f"{class_labels[class_idx]}: {count}")

    # 클래스 인덱스와 행동 이름을 매핑하는 딕셔너리
    class_labels = {
        0: '졸음운전',
        1: '음주운전',
        2: '물건찾기',
        3: '통화',
        4: '휴대폰조작',
        5: '차량제어'
    }

    print("\nTrain class distribution:")
    print_class_distribution(y_train, class_labels)

    print("\nTest class distribution:")
    print_class_distribution(y_test, class_labels)


Loaded file: /content/drive/MyDrive/dataset/Temp_preprocessed_videos/y_0.npy with shape (50, 1)
Loaded file: /content/drive/MyDrive/dataset/Temp_preprocessed_videos/y_1.npy with shape (60, 1)
Loaded file: /content/drive/MyDrive/dataset/Temp_preprocessed_videos/y_2.npy with shape (60, 2)
Loaded file: /content/drive/MyDrive/dataset/Temp_preprocessed_videos/y_3.npy with shape (45, 1)
Loaded file: /content/drive/MyDrive/dataset/Temp_preprocessed_videos/y_4.npy with shape (45, 2)
Loaded file: /content/drive/MyDrive/dataset/Temp_preprocessed_videos/y_5.npy with shape (55, 1)
Loaded file: /content/drive/MyDrive/dataset/Temp_preprocessed_videos/y_6.npy with shape (80, 1)
Loaded file: /content/drive/MyDrive/dataset/Temp_preprocessed_videos/y_7.npy with shape (55, 1)
Loaded file: /content/drive/MyDrive/dataset/Temp_preprocessed_videos/y_8.npy with shape (50, 1)
Loaded file: /content/drive/MyDrive/dataset/Temp_preprocessed_videos/y_9.npy with shape (55, 2)
Loaded file: /content/drive/MyDrive/data

ValueError: all the input array dimensions except for the concatenation axis must match exactly, but along dimension 1, the array at index 0 has size 1 and the array at index 2 has size 2

In [None]:
import os
import json
import cv2
import numpy as np
from keras.preprocessing.image import img_to_array
from sklearn.preprocessing import LabelBinarizer
from sklearn.model_selection import train_test_split

# JSON 파일 로드
annotations_file_path = '/content/drive/MyDrive/Annotations.json'
with open(annotations_file_path, 'r', encoding='utf-8') as f:
    annotations = json.load(f)

# 비디오 폴더 경로
video_folder_path = '/content/drive/MyDrive/dataset/영상'

# TSN 모델에 사용할 프레임 수
num_segments = 5

# 프레임을 전처리하는 함수
def preprocess_frame(frame):
    img_array = img_to_array(frame)
    resized_img = cv2.resize(img_array, (224, 224))  # 크기 조정
    return resized_img

# 모든 프레임을 전처리하는 함수
def preprocess_video_frames(video_path, num_segments):
    cap = cv2.VideoCapture(video_path)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    segment_length = frame_count // num_segments

    frames = []
    for i in range(num_segments):
        cap.set(cv2.CAP_PROP_POS_FRAMES, i * segment_length)
        ret, frame = cap.read()
        if ret:
            frames.append(preprocess_frame(frame))
    cap.release()
    return np.array(frames)

# 비디오 파일 이름 추출하는 방법
def get_video_filename(img_name):
    base_name = img_name.split('IMG')[0]  # 예시: 'SGA2100044S0312'
    return base_name + '.mp4'  # 비디오 파일 이름 형식 맞추기

# 모든 비디오에 대해 전처리 수행
def process_videos_in_chunks(annotations, chunk_size=100):
    total_videos = len(annotations)
    used_videos = 0
    for start_idx in range(0, total_videos, chunk_size):
        end_idx = min(start_idx + chunk_size, total_videos)
        chunk_annotations = annotations[start_idx:end_idx]

        videos = []
        labels = []

        for annotation in chunk_annotations:
            video_name = get_video_filename(annotation['img_name'])
            video_path = os.path.join(video_folder_path, video_name)  # 비디오 폴더 경로 사용

            if not os.path.exists(video_path):
                print(f'Video file not found: {video_path}')
                continue

            video_frames = preprocess_video_frames(video_path, num_segments)
            if len(video_frames) == num_segments:  # 모든 프레임이 정상적으로 처리된 경우에만 추가
                videos.append(video_frames)
                labels.append(annotation['category_id'])
                used_videos += 1  # 사용된 비디오 파일 개수 증가

        # 예외처리: 유효한 비디오가 있는 경우에만 처리
        if videos:
            videos = np.array(videos)
            labels = np.array(labels)

            # 레이블을 One-Hot 인코딩
            lb = LabelBinarizer()
            labels = lb.fit_transform(labels)

            # 데이터셋을 학습용과 테스트용으로 분할
            X_train, X_test, y_train, y_test = train_test_split(videos, labels, test_size=0.2, random_state=42)

            # 전처리된 데이터 및 레이블 저장 경로
            output_folder_path = '/content/drive/MyDrive/dataset/Processed_data'
            if not os.path.exists(output_folder_path):
                os.makedirs(output_folder_path)

            np.save(os.path.join(output_folder_path, f'X_train_{start_idx}.npy'), X_train)
            np.save(os.path.join(output_folder_path, f'X_test_{start_idx}.npy'), X_test)
            np.save(os.path.join(output_folder_path, f'y_train_{start_idx}.npy'), y_train)
            np.save(os.path.join(output_folder_path, f'y_test_{start_idx}.npy'), y_test)

            print(f'Processed chunk {start_idx} to {end_idx}')
        else:
            print(f'No valid videos were processed in chunk {start_idx} to {end_idx}')

    print(f'Total used videos: {used_videos}')  # 사용된 비디오 파일 개수 출력

# 청크 단위로 비디오 전처리 수행
process_videos_in_chunks(annotations, chunk_size=100)


In [None]:
import json

# JSON 파일 로드
annotations_file_path = '/content/drive/MyDrive/Annotations.json'
with open(annotations_file_path, 'r', encoding='utf-8') as f:
    annotations = json.load(f)

# 클래스별 데이터 수를 계산
class_counts = {}
for annotation in annotations:
    category_id = annotation['category_id']
    if category_id in class_counts:
        class_counts[category_id] += 1
    else:
        class_counts[category_id] = 1

# 클래스 인덱스와 행동 이름을 매핑하는 딕셔너리
class_labels = {
    'A001': '졸음운전',
    'A002': '음주운전',
    'A003': '물건찾기',
    'A004': '통화',
    'A005': '휴대폰조작',
    'A006': '차량제어',
    'A007': '운전자폭행'
}

# 클래스별 데이터 수 출력
for class_id, count in class_counts.items():
    print(f"{class_labels[class_id]}: {count}")

# 존재하지 않는 클래스 목록
missing_classes = [key for key in class_labels.keys() if key not in class_counts.keys()]
print(f"Missing classes: {missing_classes}")


음주운전: 12450
졸음운전: 11810
물건찾기: 6795
차량제어: 3505
휴대폰조작: 3290
통화: 3025
Missing classes: ['A007']


In [23]:
import os
import json
import cv2
import numpy as np
from keras.preprocessing.image import img_to_array
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split

# JSON 파일 로드
annotations_file_path = '/content/drive/MyDrive/Annotations.json'
with open(annotations_file_path, 'r', encoding='utf-8') as f:
    annotations = json.load(f)

# 비디오 폴더 경로
video_folder_path = '/content/drive/MyDrive/dataset/영상'

# TSN 모델에 사용할 프레임 수
num_segments = 5

# 프레임을 전처리하는 함수
def preprocess_frame(frame):
    img_array = img_to_array(frame)
    resized_img = cv2.resize(img_array, (224, 224))  # 크기 조정
    return resized_img

# 모든 프레임을 전처리하는 함수
def preprocess_video_frames(video_path, num_segments):
    cap = cv2.VideoCapture(video_path)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    segment_length = frame_count // num_segments

    frames = []
    for i in range(num_segments):
        cap.set(cv2.CAP_PROP_POS_FRAMES, i * segment_length)
        ret, frame = cap.read()
        if ret:
            frames.append(preprocess_frame(frame))
    cap.release()
    return np.array(frames)

# 비디오 파일 이름 추출하는 방법
def get_video_filename(img_name):
    base_name = img_name.split('IMG')[0]
    return base_name + '.mp4'  # 비디오 파일 이름 형식 맞추기

# 클래스별 데이터 제한 수
max_samples_per_class = 500  # 500개로 설정

# 클래스별로 데이터 개수를 세기 위한 딕셔너리 초기화
class_counts = {
    'A001': 0,
    'A002': 0,
    'A003': 0,
    'A004': 0,
    'A005': 0,
    'A006': 0,
    'A007': 0
}

# 모든 비디오에 대해 전처리 수행
def process_videos_in_chunks(annotations, chunk_size=100):
    total_videos = len(annotations)
    used_videos = 0
    all_labels = []

    for start_idx in range(0, total_videos, chunk_size):
        end_idx = min(start_idx + chunk_size, total_videos)
        chunk_annotations = annotations[start_idx:end_idx]

        videos = []
        labels = []

        for annotation in chunk_annotations:
            category_id = annotation['category_id']
            if class_counts[category_id] >= max_samples_per_class:
                continue

            video_name = get_video_filename(annotation['img_name'])
            video_path = os.path.join(video_folder_path, video_name)

            if not os.path.exists(video_path):
                print(f'Video file not found: {video_path}')
                continue

            video_frames = preprocess_video_frames(video_path, num_segments)
            if len(video_frames) == num_segments:  # 모든 프레임이 정상적으로 처리된 경우에만 추가
                videos.append(video_frames)
                labels.append(category_id)
                all_labels.append(category_id)
                used_videos += 1
                class_counts[category_id] += 1

        if videos:
            videos = np.array(videos)
            labels = np.array(labels)

            # 데이터셋을 학습용과 테스트용으로 분할
            X_train, X_test, y_train, y_test = train_test_split(videos, labels, test_size=0.2, random_state=42)

            # 전처리된 데이터 및 레이블 저장 경로
            output_folder_path = '/content/drive/MyDrive/dataset/Processed_data'
            if not os.path.exists(output_folder_path):
                os.makedirs(output_folder_path)

            np.save(os.path.join(output_folder_path, f'X_train_{start_idx}.npy'), X_train)
            np.save(os.path.join(output_folder_path, f'X_test_{start_idx}.npy'), X_test)
            np.save(os.path.join(output_folder_path, f'y_train_{start_idx}.npy'), y_train)
            np.save(os.path.join(output_folder_path, f'y_test_{start_idx}.npy'), y_test)

            print(f'Processed chunk {start_idx} to {end_idx}')
        else:
            print(f'No valid videos were processed in chunk {start_idx} to {end_idx}')

    if len(all_labels) > 0:
        # 모든 레이블을 모아서 원-핫 인코딩 수행
        encoder = OneHotEncoder(sparse_output=False, categories=[['A001', 'A002', 'A003', 'A004', 'A005', 'A006', 'A007']])
        all_labels = np.array(all_labels).reshape(-1, 1)
        y_encoded = encoder.fit_transform(all_labels)

        # 저장된 레이블 파일을 다시 로드하여 업데이트
        for start_idx in range(0, total_videos, chunk_size):
            y_train_file_path = os.path.join(output_folder_path, f'y_train_{start_idx}.npy')
            y_test_file_path = os.path.join(output_folder_path, f'y_test_{start_idx}.npy')

            if os.path.exists(y_train_file_path):
                y_train = np.load(y_train_file_path).reshape(-1, 1)
                y_test = np.load(y_test_file_path).reshape(-1, 1)

                y_train_encoded = encoder.transform(y_train)
                y_test_encoded = encoder.transform(y_test)

                np.save(y_train_file_path, y_train_encoded)
                np.save(y_test_file_path, y_test_encoded)

    print(f'Total used videos: {used_videos}')  # 사용된 비디오 파일 개수 출력

    # 각 클래스별 데이터 개수 출력
    for class_id, count in class_counts.items():
        print(f"Class {class_id} count: {count}")

    # y_train과 y_test 샘플 출력
    y_train_samples = np.load(os.path.join(output_folder_path, 'y_train_0.npy'))
    y_test_samples = np.load(os.path.join(output_folder_path, 'y_test_0.npy'))

    print(f"\ny_train shape: {y_train_samples.shape}")
    print(f"y_test shape: {y_test_samples.shape}")
    print(f"y_train sample:\n{y_train_samples[:5]}")
    print(f"y_test sample:\n{y_test_samples[:5]}")

# 청크 단위로 비디오 전처리 수행
process_videos_in_chunks(annotations, chunk_size=100)


Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100002S0032.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100002S0032.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100002S0032.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100002S0032.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100002S0032.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100002S0031.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100002S0031.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100002S0031.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100002S0031.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100002S0031.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100002S0028.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100002S0028.mp4
Video file not found: /content/drive/MyDrive/dataset/영상/SGA2100002S0028.mp4
Video file n

In [25]:
import os
import numpy as np

# 폴더 경로
folder_path = '/content/drive/MyDrive/dataset/Processed_data'

# y_train 파일들 로드 및 출력
y_train_files = [f for f in os.listdir(folder_path) if f.startswith('y_train_') and f.endswith('.npy')]
for y_train_file in y_train_files:
    y_train_file_path = os.path.join(folder_path, y_train_file)
    y_train = np.load(y_train_file_path)
    print(f"Loaded file: {y_train_file_path} with shape {y_train.shape}")
    print("y_train sample:")
    print(y_train[:5])  # y_train 샘플 출력
    print()  # 줄 바꿈

# y_test 파일들 로드 및 출력
y_test_files = [f for f in os.listdir(folder_path) if f.startswith('y_test_') and f.endswith('.npy')]
for y_test_file in y_test_files:
    y_test_file_path = os.path.join(folder_path, y_test_file)
    y_test = np.load(y_test_file_path)
    print(f"Loaded file: {y_test_file_path} with shape {y_test.shape}")
    print("y_test sample:")
    print(y_test[:5])  # y_test 샘플 출력
    print()  # 줄 바꿈


Loaded file: /content/drive/MyDrive/dataset/Processed_data/y_train_0.npy with shape (40, 7)
y_train sample:
[[0. 1. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0.]]

Loaded file: /content/drive/MyDrive/dataset/Processed_data/y_train_100.npy with shape (48, 7)
y_train sample:
[[0. 1. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0.]]

Loaded file: /content/drive/MyDrive/dataset/Processed_data/y_train_200.npy with shape (48, 7)
y_train sample:
[[0. 1. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0.]
 [0. 1. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0.]]

Loaded file: /content/drive/MyDrive/dataset/Processed_data/y_train_300.npy with shape (36, 7)
y_train sample:
[[1. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0.]
 [1. 0. 0. 0. 0. 0. 0.]]

Loaded file: /content/drive/MyDrive/dataset/Processed_data/y_train_400.npy

In [27]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout, TimeDistributed, LSTM
from tensorflow.keras.optimizers import Adam

# 경로 설정
output_folder_path = '/content/drive/MyDrive/dataset/Processed_data'

# 전처리된 데이터 로드
X_train = np.load(os.path.join(output_folder_path, 'X_train_0.npy'))
X_test = np.load(os.path.join(output_folder_path, 'X_test_0.npy'))
y_train = np.load(os.path.join(output_folder_path, 'y_train_0.npy'))
y_test = np.load(os.path.join(output_folder_path, 'y_test_0.npy'))

print(f"X_train shape: {X_train.shape}")
print(f"X_test shape: {X_test.shape}")
print(f"y_train shape: {y_train.shape}")
print(f"y_test shape: {y_test.shape}")

# TSN 모델 설계
def build_tsn_model(input_shape, num_classes):
    model = Sequential()

    # CNN 기반 모델
    model.add(TimeDistributed(Conv2D(32, (3, 3), activation='relu'), input_shape=input_shape))
    model.add(TimeDistributed(MaxPooling2D((2, 2))))
    model.add(TimeDistributed(Conv2D(64, (3, 3), activation='relu')))
    model.add(TimeDistributed(MaxPooling2D((2, 2))))
    model.add(TimeDistributed(Conv2D(128, (3, 3), activation='relu')))
    model.add(TimeDistributed(MaxPooling2D((2, 2))))
    model.add(TimeDistributed(Flatten()))

    # LSTM 레이어
    model.add(LSTM(128, return_sequences=False))
    model.add(Dropout(0.5))

    # 완전 연결층
    model.add(Dense(128, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(num_classes, activation='softmax'))

    return model

# 모델 인풋 형태 설정
input_shape = (X_train.shape[1], X_train.shape[2], X_train.shape[3], X_train.shape[4])
num_classes = y_train.shape[1]

# 모델 빌드
model = build_tsn_model(input_shape, num_classes)

# 모델 컴파일
model.compile(optimizer=Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])

# 모델 학습
model.fit(X_train, y_train, epochs=50, batch_size=32, validation_data=(X_test, y_test))

# 모델 저장
model.save(os.path.join(output_folder_path, 'tsn_model.h5'))

print("Model training complete and model saved.")


X_train shape: (40, 5, 224, 224, 3)
X_test shape: (10, 5, 224, 224, 3)
y_train shape: (40, 7)
y_test shape: (10, 7)
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


  saving_api.save_model(


Model training complete and model saved.


In [42]:
import os
import cv2
import numpy as np
from keras.models import load_model
from keras.preprocessing.image import img_to_array

# 모델 로드
model_path = '/content/drive/MyDrive/dataset/tsn_model.h5'
model = load_model(model_path)

# TSN 모델에 사용할 프레임 수
num_segments = 5

# 프레임을 전처리하는 함수
def preprocess_frame(frame):
    img_array = img_to_array(frame)
    resized_img = cv2.resize(img_array, (224, 224))  # 크기 조정
    return resized_img

# 모든 프레임을 전처리하는 함수
def preprocess_video_frames(video_path, num_segments):
    cap = cv2.VideoCapture(video_path)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    segment_length = frame_count // num_segments

    frames = []
    for i in range(num_segments):
        cap.set(cv2.CAP_PROP_POS_FRAMES, i * segment_length)
        ret, frame = cap.read()
        if ret:
            frames.append(preprocess_frame(frame))
    cap.release()
    return np.array(frames)

# 비디오 파일 경로
video_file_path = '/content/drive/MyDrive/SGA2100234S0054.mp4'  # 여기에 테스트할 비디오 파일 경로를 입력하세요

# 비디오 프레임 전처리
video_frames = preprocess_video_frames(video_file_path, num_segments)
if len(video_frames) == num_segments:  # 모든 프레임이 정상적으로 처리된 경우에만 예측 수행
    video_frames = np.expand_dims(video_frames, axis=0)  # 모델 입력에 맞게 차원 추가

    # 예측 수행
    predictions = model.predict(video_frames)
    predicted_class = np.argmax(predictions, axis=1)

    # 클래스 인덱스와 행동 이름을 매핑하는 딕셔너리
    class_labels = {
        0: '졸음운전',
        1: '음주운전',
        2: '물건찾기',
        3: '통화',
        4: '휴대폰조작',
        5: '차량제어'
    }

    print(f"Predicted class: {class_labels[predicted_class[0]]}")
else:
    print("Error: The video does not have the required number of segments.")


Predicted class: 졸음운전


In [38]:
import tensorflow as tf
from tensorflow.keras.models import load_model
import os

# 모델 경로 설정
model_path_1 = '/content/drive/MyDrive/dataset/tsn_model.h5'
model_path_2 = '/content/drive/MyDrive/dataset/Processed_data/tsn_model.h5'
final_model_path = '/content/drive/MyDrive/dataset/Final_tsn_model.h5'

# 모델 로드
model_1 = load_model(model_path_1)
model_2 = load_model(model_path_2)

# 모델 구조 및 가중치 확인
model_1.summary()
model_2.summary()

# 여기서는 model_2를 선택한다고 가정& 필요에 따라 model_1을 선택할 수도 있다.
selected_model = model_2

# 모델 저장
selected_model.save(final_model_path)

print(f"Selected model has been saved to {final_model_path}")


Model: "model_6"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_7 (InputLayer)        [(None, 5, 224, 224, 3)   0         
                             ]                                   
                                                                 
 time_distributed_44 (TimeD  (None, 5, 222, 222, 32)   896       
 istributed)                                                     
                                                                 
 time_distributed_45 (TimeD  (None, 5, 111, 111, 32)   0         
 istributed)                                                     
                                                                 
 time_distributed_46 (TimeD  (None, 5, 109, 109, 64)   18496     
 istributed)                                                     
                                                                 
 time_distributed_47 (TimeD  (None, 5, 54, 54, 64)     0   

In [31]:
import os
import json

def find_first_file_by_category(root_folder):
    # 클래스별 파일을 저장할 딕셔너리
    category_files = {
        'A001': None,
        'A002': None,
        'A003': None,
        'A004': None,
        'A005': None,
        'A006': None,
        'A007': None
    }

    # 경로 순회하며 JSON 파일 찾기
    for foldername, subfolders, filenames in os.walk(root_folder):
        for filename in filenames:
            if filename.endswith('.json'):
                file_path = os.path.join(foldername, filename)
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                    category_id = data['scene_info']['category_id']
                    if category_id in category_files and category_files[category_id] is None:
                        category_files[category_id] = file_path

    return category_files

# 루트 폴더 경로 설정
root_folder = '/content/drive/MyDrive/DataSet/라벨링데이터/SGA2100092'

# 파일 찾기 실행
category_files = find_first_file_by_category(root_folder)

# 결과 출력
for category_id, file in category_files.items():
    if file is not None:
        print(f"\nCategory {category_id} first file: {file}")
    else:
        print(f"\nCategory {category_id} has no files")



Category A001 first file: /content/drive/MyDrive/DataSet/라벨링데이터/SGA2100092/SGA2100092S0010/label/SGA2100092S0010.json

Category A002 first file: /content/drive/MyDrive/DataSet/라벨링데이터/SGA2100092/SGA2100092S0019/label/SGA2100092S0019.json

Category A003 has no files

Category A004 has no files

Category A005 has no files

Category A006 has no files

Category A007 has no files


In [45]:
import tensorflow as tf

# 모델 경로
model_path = '/content/drive/MyDrive/dataset/Processed_data/tsn_model.h5'
tflite_model_path = '/content/drive/MyDrive/dataset/Processed_data/tsn_model.tflite'

# Keras 모델 로드
model = tf.keras.models.load_model(model_path)

# TensorFlow Lite Converter 사용
converter = tf.lite.TFLiteConverter.from_keras_model(model)
converter.target_spec.supported_ops = [
    tf.lite.OpsSet.TFLITE_BUILTINS,  # 기본 TFLite 연산
    tf.lite.OpsSet.SELECT_TF_OPS  # TensorFlow 연산 사용 허용
]
converter._experimental_lower_tensor_list_ops = False

tflite_model = converter.convert()

# TensorFlow Lite 모델 저장
with open(tflite_model_path, 'wb') as f:
    f.write(tflite_model)

print(f"TensorFlow Lite model saved to {tflite_model_path}")


TensorFlow Lite model saved to /content/drive/MyDrive/dataset/Processed_data/tsn_model.tflite
