In [None]:
# Plotly Dash로 AWS S3에 데이터 적재하기 - 로컬에서 실행

# S3 데이터 적재

import dash
from dash import dcc, html
from dash.dependencies import Input, Output, State
import boto3
from botocore.exceptions import NoCredentialsError
from io import BytesIO
import base64

# AWS S3 관련 설정 - 변경 필요
AWS_ACCESS_KEY = 'your_access_key'
AWS_SECRET_KEY = 'your_secret_key'
S3_BUCKET = 'your_s3_bucket_name'
S3_REGION = 'your_s3_region'


# Dash 애플리케이션 초기화
app = dash.Dash(__name__)


# 파일 업로드 및 S3 업로드 콜백 함수
@app.callback(
    Output('video-player', 'src'),
    Output('video-player', 'controls'),
    Output('upload-output', 'children'),  # 업로드 결과를 출력할 Div 추가
    Input('upload-video', 'contents'),
    Input('upload-video', 'filename')
)


def update_video_and_upload(contents, filename):
    if contents is not None and filename is not None:
        content_type, content_string = contents.split(',')
        decoded = base64.b64decode(content_string)
        video_src = f'data:video/mp4;base64,{base64.b64encode(decoded).decode()}'
        controls = True

        try:
            # AWS S3에 파일 업로드
            s3 = boto3.client('s3', aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY, region_name=S3_REGION)
            
            # BytesIO를 사용하여 contents를 읽어옴
            file_content = base64.b64decode(content_string)
            file_stream = BytesIO(file_content)

            # 파일의 원래 이름 가져오기
            original_filename = filename  # 업로드된 파일의 이름으로 설정

            # S3에 업로드
            s3.upload_fileobj(file_stream, S3_BUCKET, original_filename)

            upload_result = html.Div([
                f'파일 "{original_filename}"이 성공적으로 업로드되었습니다.'
            ])
        except NoCredentialsError:
            upload_result = 'AWS 자격 증명이 올바르지 않습니다.'
        except Exception as e:
            upload_result = f'에러 발생: {str(e)}'

        return video_src, controls, upload_result
    else:
        return dash.no_update, dash.no_update, dash.no_update


# 레이아웃 설정
app.layout = html.Div([
    dcc.Upload(
        id='upload-video',
        children=html.Div([
            'Drag and Drop or ',
            html.A('Select a Video')
        ]),
        multiple=False,
        style={
            'width': '50%',
            'height': '30px',
            'lineHeight': '30px',
            'borderWidth': '1px',
            'borderStyle': 'dashed',
            'borderRadius': '5px',
            'textAlign': 'center',
            'margin': '10px',
            'margin-left': 'auto',
            'margin-right': 'auto'
        }
    ),
    html.Video(
        id='video-player',
        controls=False,
        style={'width': '50%'}
    ),
    html.Div(id='upload-output')  # 업로드 결과를 출력할 Div 추가
])


# 애플리케이션 실행
if __name__ == '__main__':
    app.run_server(debug=True)

In [None]:
# S3에서 모델 및 저장되어 있는 동영상 전체를 가져온 후 감정분석 진행
# 각 영상마다 json 파일을 생성하고 데이터는 RDS에 저장 - Google Colab에서 실행


# 감정인식 모델 - CNN 모델 사용
# S3에 저장되어 있는 모든 영상 데이터 및 CNN 모델을 가져와서 감정분석 진행
# 결과값은 각 영상마다 json 파일로 저장
# josn 파일에 저장되어 있는 데이터 RDS에 저장하기

import cv2
import tensorflow as tf
from tensorflow.keras.models import load_model
import json
import os
from collections import OrderedDict
import boto3
from botocore.exceptions import NoCredentialsError
import ntpath
import mysql.connector

# AWS S3 및 MySQL 연결 정보
AWS_ACCESS_KEY = 'your_access_key'
AWS_SECRET_KEY = 'your_secret_key'
BUCKET_NAME = 'your_s3_bucket_name'

DB_CONFIG = {
	  'host': 'your_rds_host',
    'user': 'your_username',
    'password': 'your_password',
    'database': 'your_database',
}

# S3 설정
s3 = boto3.client('s3', aws_access_key_id=AWS_ACCESS_KEY, aws_secret_access_key=AWS_SECRET_KEY)

# 동영상 파일이 있는 로컬 경로
local_folder_path = 'C:/Users/user/Downloads/test'

# 결과를 저장할 리스트 초기화
emotion_data_list = []

# MySQL 연결
connection = mysql.connector.connect(**DB_CONFIG)
cursor = connection.cursor()

# 테이블 생성 함수
def create_table(cursor):
    create_table_query = """
    CREATE TABLE IF NOT EXISTS emotion_data (
        id INT AUTO_INCREMENT PRIMARY KEY,
        video_name VARCHAR(255),
        angry INT,
        disgust INT,
        fear INT,
        happy INT,
        sad INT,
        surprise INT,
        neutral INT
    )
    """
    cursor.execute(create_table_query)

# 테이블 생성
create_table(cursor)

# AWS S3에서 모든 객체 목록 가져오기
def download_video_from_s3(file_name, local_path):
    try:
        print(f"{file_name}를 {local_path}로 다운로드 중입니다.")
        s3.download_file(BUCKET_NAME, file_name, local_path)
    except NoCredentialsError:
        print("자격 증명을 찾을 수 없습니다.")
    except Exception as e:
        print(f"{file_name} 다운로드 중 오류 발생: {e}")

def download_all_videos_from_s3(local_folder_path):
    objects = s3.list_objects(Bucket=BUCKET_NAME)['Contents']
    video_objects = [obj['Key'] for obj in objects if obj['Key'].endswith('.mp4')]
    
    for video_object in video_objects:
        download_video_from_s3(video_object, os.path.join(local_folder_path, video_object))

# 얼굴 감지기 초기화 (Haar Cascade 사용)
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

# 동영상 및 감정 분석 모델 다운로드
download_all_videos_from_s3(local_folder_path)
download_video_from_s3('emotion_model.h5', 'emotion_model.h5')

# 감정 레이블
emotion_labels = {0: 'Angry', 1: 'Disgust', 2: 'Fear', 3: 'Happy', 4: 'Sad', 5: 'Surprise', 6: 'Neutral'}

# 감정 분석 모델 로드
emotion_model = load_model('emotion_model.h5')

# MySQL에 데이터 삽입 함수
def insert_data(cursor, video_name, emotion_counts):
    insert_query = """
    INSERT INTO emotion_data (video_name, angry, disgust, fear, happy, sad, surprise, neutral)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
    """
    
    data_tuple = (video_name, emotion_counts['Angry'], emotion_counts['Disgust'],
                  emotion_counts['Fear'], emotion_counts['Happy'], emotion_counts['Sad'],
                  emotion_counts['Surprise'], emotion_counts['Neutral'])
    
    cursor.execute(insert_query, data_tuple)

# 각 동영상에 대한 처리
for file_name in os.listdir(local_folder_path):
    if file_name.endswith('.mp4'):
        video_path = os.path.join(local_folder_path, file_name)
        video_name = os.path.splitext(ntpath.basename(file_name))[0]

        # 동영상 파일 열기
        cap = cv2.VideoCapture(video_path)
        emotion_counts = {emotion: 0 for emotion in emotion_labels.values()}
        frame_count = 0
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
            faces = face_cascade.detectMultiScale(gray, scaleFactor=1.3, minNeighbors=8, minSize=(30, 30))
            if len(faces) > 0:
                x, y, w, h = faces[0]
                face = gray[y:y + h, x:x + w]
                face = cv2.resize(face, (48, 48))
                face = face / 255.0
                
                if frame_count % 5 == 0:
                    face = tf.expand_dims(face, axis=-1)
                    face = tf.expand_dims(face, axis=0)
                    emotion_prediction = emotion_model.predict(face)
                    emotion_label = emotion_labels[tf.argmax(emotion_prediction, axis=1).numpy()[0]]
                    emotion_counts[emotion_label] += 1
            frame_count += 1
        cap.release()
        
        # 결과를 리스트에 추가
        result_for_video = {'video_name': video_name, 'emotion_counts': emotion_counts}
        emotion_data_list.append(result_for_video)

        # MySQL에 데이터 삽입
        insert_data(cursor, video_name, emotion_counts)

        # JSON 파일로 결과 저장
        output_json_path = os.path.join(local_folder_path, f'{video_name}_EA.json')
        with open(output_json_path, 'w', encoding='utf-8') as f:
            json.dump(result_for_video, f, ensure_ascii=False, indent=4)

# 변경사항 커밋 및 연결 종료
connection.commit()
connection.close()

In [None]:
# RDS에 데이터가 잘 적재되었는지 확인 - Google Colab에서 실행

import mysql.connector

# RDS 연결 정보
DB_CONFIG = {
	  'host': 'your_rds_host',
    'user': 'your_username',
    'password': 'your_password',
    'database': 'your_database',
}

# MySQL 연결
connection = mysql.connector.connect(**DB_CONFIG)
cursor = connection.cursor()

# 예시 쿼리 실행
cursor.execute("SELECT * FROM emotion_data")
result = cursor.fetchall()
for row in result:
    print(row)

# 연결 종료
connection.close()

In [None]:
# RDS에 적재되어 있는 영상 감정분석 데이터를 가져온 후 Dash에서 그래프 만들기 - Google Colab에서 실행

# RDS에서 데이터를 가져온 후 영상별로 그래프로 나타내기

import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output
import mysql.connector
import pandas as pd
import plotly.express as px

# RDS 연결 정보
DB_CONFIG = {
	  'host': 'your_rds_host',
    'user': 'your_username',
    'password': 'your_password',
    'database': 'your_database',
}

# MySQL 연결
connection = mysql.connector.connect(**DB_CONFIG)

# 데이터베이스에서 데이터 가져오기
query = "SELECT video_name, angry, disgust, fear, happy, sad, surprise, neutral FROM emotion_data"
df = pd.read_sql(query, connection)

# 연결 종료
connection.close()

# 각 영상에 대한 데이터프레임 생성
video_names = df['video_name'].unique()

# Dash 앱 초기화
app = dash.Dash(__name__)

# 앱 레이아웃 정의
app.layout = html.Div(children=[
    html.H1(children='Emotion Analysis Dashboard'),
    
    # Dropdown 메뉴
    dcc.Dropdown(
        id='video-dropdown',
        options=[{'label': video_name, 'value': video_name} for video_name in video_names],
        value=video_names[0],  # 초기 선택값
        style={'width': '50%'}
    ),

    # 그래프 영역
    dcc.Graph(
        id='emotion-graph'
    )
])

# 콜백 함수 정의
@app.callback(
    Output('emotion-graph', 'figure'),
    [Input('video-dropdown', 'value')]
)
def update_graph(selected_video):
    # 선택된 영상에 대한 데이터프레임 생성
    selected_video_df = df[df['video_name'] == selected_video]

    # 그래프 데이터 생성
    data = []
    for emotion in ['angry', 'disgust', 'fear', 'happy', 'sad', 'surprise', 'neutral']:
        trace = {
            'x': [emotion],
            'y': [selected_video_df[emotion].iloc[0]],
            'type': 'bar',
            'name': emotion
        }
        data.append(trace)

    # 그래프 레이아웃 설정
    layout = {
        'title': f'Emotion Analysis for {selected_video}',
        'xaxis': {'title': 'Emotion'},
        'yaxis': {'title': 'Emotion Counts'},
        'barmode': 'group'
    }

    return {'data': data, 'layout': layout}

# 앱 실행
if __name__ == '__main__':
    app.run_server(debug=True)