In [16]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import cv2
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten
from tensorflow.keras.layers import Conv2D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.layers import MaxPooling2D
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import pandas as pd
import os


In [17]:
#LOSS AND ACCURACY PLOT

def plot_model_history(model_history):
    fig, axs = plt.subplots(1,2,figsize=(15,5))
    
    # summarize history for accuracy
    axs[0].plot(range(1,len(model_history.history['acc'])+1),model_history.history['acc'])
    axs[0].plot(range(1,len(model_history.history['val_acc'])+1),model_history.history['val_acc'])
    axs[0].set_title('Model Accuracy')
    axs[0].set_ylabel('Accuracy')
    axs[0].set_xlabel('Epoch')
    axs[0].set_xticks(np.arange(1,len(model_history.history['acc'])+1),len(model_history.history['acc'])/10)
    axs[0].legend(['train', 'val'], loc='best')
    
    # summarize history for loss
    axs[1].plot(range(1,len(model_history.history['loss'])+1),model_history.history['loss'])
    axs[1].plot(range(1,len(model_history.history['val_loss'])+1),model_history.history['val_loss'])
    axs[1].set_title('Model Loss')
    axs[1].set_ylabel('Loss')
    axs[1].set_xlabel('Epoch')
    axs[1].set_xticks(np.arange(1,len(model_history.history['loss'])+1),len(model_history.history['loss'])/10)
    axs[1].legend(['train', 'val'], loc='best')
    fig.savefig('plot.png')
    plt.show()

In [18]:
#Data gfenerators


train_dir = 'C:/Users/athen/OneDrive/Documents/Mini project 2/archive (1)/images/images/train'
val_dir = 'C:/Users/athen/OneDrive/Documents/Mini project 2/archive (1)/images/images/test'

num_train = 28709
num_val = 7178
batch_size = 64
num_epoch = 2

datagen = ImageDataGenerator(rescale=1./255)
print(datagen)
train_generator = datagen.flow_from_directory(
        train_dir,
        target_size=(48,48),
        batch_size=batch_size,
        color_mode="grayscale",
        class_mode='categorical')
validation_generator = datagen.flow_from_directory(
        val_dir,
        target_size=(48,48),
        batch_size=batch_size,
        color_mode="grayscale",
        class_mode='categorical')

<keras.src.legacy.preprocessing.image.ImageDataGenerator object at 0x00000212050BEF30>
Found 28821 images belonging to 7 classes.
Found 7066 images belonging to 7 classes.


In [21]:

import cv2
import numpy as np
import firebase_admin
from firebase_admin import credentials, storage

service_account_key = 'C:/Users/athen/OneDrive/Documents/Mini project 2/mini-project-d9780-firebase-adminsdk-excc6-1f7073b6d8.json'

# Check if Firebase app is already initialized
try:
    firebase_admin.get_app()
except ValueError:
    # Initialize Firebase app with credentials and storage bucket
    cred = credentials.Certificate(service_account_key)
    firebase_admin.initialize_app(cred, {'storageBucket': 'mini-project-d9780.appspot.com'})
bucket = storage.bucket()

def stream_video_from_firebase(file_path):
    blob = bucket.blob(file_path)
    stream = blob.download_as_bytes()
    return stream

class EmotionDetector:
    def __init__(self, model):
        self.model = model

    
    def predict(self, frame):
        
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) 
        frame = cv2.resize(frame, (48, 48))   
        frame = frame / 255.0 
        frame = frame.reshape(1, 48, 48, 1) 

        # Make predictions
        predictions = self.model.predict(frame)
        return predictions

    def detect_emotions(self,frame):
        predictions = self.predict(frame)
        
        # Convert predictions to emotion labels
        # This is a dummy implementation, replace with your actual logic
        emotions = ["angry", "disgust", "fear", "happy", "neutral", "sad", "surprise"]
        detected_emotions = [emotions[pred.argmax()] for pred in predictions]

        return detected_emotions 

from tensorflow.keras.models import load_model
if __name__ == '__main__':
    # Load your pre-trained Keras model
    model_path = 'emotion_detection_model.h5'
    model = load_model(model_path)
    
    # Define the path to the video in Firebase Storage
    video_path = 'videos/video.mp4'  # Replace with your Firebase Storage path
    
    # Stream video from Firebase Storage
    video_stream = stream_video_from_firebase(video_path)
    
    # Analyze video frames
    result = analyze_video(video_stream, model)
    
    # Close the video stream when done
    video_stream.close()

class Video:
    def __init__(self, video_blob):
        self.path =video_blob
        self.capture = cv2.VideoCapture(video_blob)
        if not self.capture.isOpened():
            print("Error: Could not open video.")

    
    def analyze(self, detector, display=False, save_path="frames"):
        os.makedirs(save_path, exist_ok=True)
        frame_count = 0
        results = []
        while True:
            ret, frame = self.capture.read()
            if not ret:
                break
    
            # Perform emotion detection on the frame
            result = detector.detect_emotions(frame) 
            results.append((frame_count, result))
            print(f"Frame {frame_count}: {result}")# Fixing the method call
    
            if display:
                for emotion in result:
                    cv2.putText(frame, str(emotion), (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                frame_filename = os.path.join(save_path, f"frame_{frame_count:04d}.jpg")
                cv2.imwrite(frame_filename, frame)
                frame_count += 1

        self.capture.release()
        print("Analysis complete.") 
        return results





model = Sequential()

model.add(Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(48,48,1)))
model.add(Conv2D(64, kernel_size=(3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Conv2D(128, kernel_size=(3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Conv2D(128, kernel_size=(3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Flatten())
model.add(Dense(1024, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(7, activation='softmax'))

model.save('emotion_detection_model.h5')
model = keras.models.load_model("emotion_detection_model.h5")
model.compile(optimizer='adam',
              loss=tf.keras.losses.CategoricalCrossentropy(from_logits=True),
              metrics=['accuracy'])

history = model.fit(
    train_generator,
    steps_per_epoch=num_train // batch_size,
    epochs=num_epoch,
    validation_data=validation_generator,
    validation_steps=num_val // batch_size)

video = Video(path_to_video)
emotion_detector = EmotionDetector(model)
results = video.analyze(emotion_detector, display=True)




NotFound: 404 GET https://storage.googleapis.com/download/storage/v1/b/mini-project-d9780.appspot.com/o/videos%2Fvideo.mp4?alt=media: No such object: mini-project-d9780.appspot.com/videos/video.mp4: ('Request failed with status code', 404, 'Expected one of', <HTTPStatus.OK: 200>, <HTTPStatus.PARTIAL_CONTENT: 206>)

In [11]:
if results is None:
    print("Error: analyze method returned None")
else:
    # Convert results to pandas DataFrame
   data = {"frame_count": [], "happy": [], "surprise": [], 
        "angry": [], "disgust": [], "fear": [], "sad": []}

# Populate the DataFrame with results
for frame_count, emotions in results:
    data["frame_count"].append(frame_count)
    data["happy"].append(emotions.count("happy"))
    data["surprise"].append(emotions.count("surprise"))
    data["angry"].append(emotions.count("angry"))
    data["disgust"].append(emotions.count("disgust"))
    data["fear"].append(emotions.count("fear"))
    data["sad"].append(emotions.count("sad"))
    emotions_df = pd.DataFrame(data)

    # Display the first few rows of the DataFrame
    print(emotions_df.head())

NameError: name 'results' is not defined

In [36]:
# Predict whether a person show interest in a topic or not

positive_emotions = sum(emotions_df.happy) + sum(emotions_df.surprise)
negative_emotions = sum(emotions_df.angry) + sum(emotions_df.disgust) + sum(emotions_df.fear) + sum(emotions_df.sad)

if positive_emotions > negative_emotions:
    print("Person is interested")
    update_result("Person is satisfied")
elif positive_emotions < negative_emotions:
    print("Person is not interested")
    update_result("Person is not satisfied")
else:
    print("Person is neutral")

Person is not interested


In [2]:
#connecting to neon
from sqlalchemy import create_engine

In [6]:
db_params = {
    'host' : 'ep-wild-scene-a1bnv5tq.ap-southeast-1.aws.neon.tech',
    'database' : 'miniproject',
    'user' : 'athena',
    'password' : '0hStGLnb5fDa',
    'port': '5432'
}
engine = create_engine(f'postgresql://{db_params['user']}:{db_params['password']}@{db_params['port']}/{db_params['database']}')

In [None]:
conn = psycopg2.connect(
    host=db_params['ep-wild-scene-a1bnv5tq.ap-southeast-1.aws.neon.tech'],
    database=db_params['miniproject'],
    user=db_params['athena'],
    password=db_params['0hStGLnb5fDa'],
    port=db_params['5432']

cursor = conn.cursor()
def update_result(result):
    cursor.execute('UPDATE uploaded_videos VALUES ()', (result,))
    conn.commit()

In [None]:
camera=cv2.VideoCapture(0)
while True:
    success,frame=camera.read()
    if success:
        img=cv2.cvtColor(frame,cv2.COLOR_BGR2GRAY)

        
        cv2.imshow("output",frame)
        k=cv2.waitKey(1)
        if k==ord("a"):
            camera.release()
            cv2.destroyAllWindows()
            break
            