In [None]:
import cv2
import numpy as np
import mediapipe as mp
import matplotlib.pyplot as plt
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.image import img_to_array
import face_recognition
import pandas as pd
import subprocess
import urllib.request
from pytube import YouTube
import os
from youtube_transcript_api import YouTubeTranscriptApi
import math
import time

In [None]:
# Horizontal offset of the speaker from the centre of the frame
def calculate_speaker_position(frame, shoulder_midpoint_x):
    """Return the signed horizontal distance between the speaker and the frame centre.

    Args:
        frame: Image-like object whose ``shape[1]`` is the frame width.
        shoulder_midpoint_x: Horizontal coordinate of the shoulder midpoint.

    Returns:
        ``shoulder_midpoint_x`` minus half the frame width; negative values lie
        left of centre, positive values right of centre.

    NOTE(review): the subtraction only makes sense when ``shoulder_midpoint_x``
    is in the same units as the frame width (pixels). MediaPipe landmark
    coordinates are presumably normalised to [0, 1] -- confirm the caller
    rescales them before calling this function.
    """
    half_width = frame.shape[1] / 2
    return shoulder_midpoint_x - half_width

In [None]:
def download_transcript_with_timestamps(video_id):
    """Fetch a YouTube transcript and render it as ``[start] text`` lines.

    Args:
        video_id: Video identifier passed to ``YouTubeTranscriptApi.get_transcript``.

    Returns:
        One line per transcript entry in the form ``"[<start, 2 decimals>] <text>\\n"``.
        If anything fails while fetching or formatting, the exception is not
        raised; a string starting with ``"An error occurred: "`` is returned
        instead, so callers must inspect the result to detect failures.
    """
    try:
        entries = YouTubeTranscriptApi.get_transcript(video_id)
        # Each entry supplies its start time and its caption text.
        return "".join(
            f"[{entry['start']:.2f}] {entry['text']}\n" for entry in entries
        )
    except Exception as e:
        return f"An error occurred: {str(e)}"

In [None]:
from transformers import BertModel, BertTokenizer
import torch
    
# Load the pre-trained BERT encoder and its tokenizer once for the notebook.
model_name = 'bert-base-uncased'
bert_tokenizer = BertTokenizer.from_pretrained(model_name)
bert_model = BertModel.from_pretrained(model_name)

# Backward-compatible aliases for the original names. Later cells rebind
# `model` to Keras models, so get_text_embedding must not depend on these.
tokenizer = bert_tokenizer
model = bert_model


def get_text_embedding(sentence):
    """Return the pooled BERT embedding of ``sentence`` as a flat list of floats.

    Args:
        sentence: Text (or batch of texts) accepted by the BERT tokenizer.

    Returns:
        ``outputs.pooler_output`` flattened to one dimension and converted to
        a Python list. This is the pooled sequence-level vector, not one
        vector per token. For a single sentence its length is the encoder's
        hidden size (presumably 768 for ``bert-base-uncased``).
    """
    inputs = bert_tokenizer(sentence, return_tensors="pt", padding=True, truncation=True)
    # Inference only: skip autograd bookkeeping to save memory and time.
    with torch.no_grad():
        outputs = bert_model(**inputs)
    # Row-major flatten of the (batch, hidden) pooled output.
    return outputs.pooler_output.reshape(-1).tolist()

In [None]:
import math

# Magnitude suffix (lower-case) -> multiplier.
_MAGNITUDE_SUFFIXES = {
    'k': 1e3,
    'm': 1e6,
    'b': 1e9,
    't': 1e12,
}


def convert_to_float(s):
    """Parse a human-readable count such as ``'13M'`` or ``'7.7K'`` into a number.

    Args:
        s: An ``int``/``float`` (returned unchanged) or a string holding a
            number optionally followed by exactly one magnitude suffix
            (``K``, ``M``, ``B`` or ``T``, case-insensitive). Surrounding
            whitespace is ignored.

    Returns:
        The numeric value as a ``float`` (or ``s`` itself when it is already a
        number), or ``None`` when the input cannot be interpreted as a number.
        This includes non-string inputs such as ``None`` and strings with more
        than one suffix (``'5kk'``), which used to be multiplied twice.
    """
    if isinstance(s, (int, float)):
        return s
    if not isinstance(s, str):
        # Previously crashed on .strip(); treat like any other invalid input.
        return None

    text = s.strip().lower()
    multiplier = 1.0
    if text and text[-1] in _MAGNITUDE_SUFFIXES:
        multiplier = _MAGNITUDE_SUFFIXES[text[-1]]
        text = text[:-1]

    try:
        return float(text) * multiplier
    except ValueError:
        # Not a valid number (empty, stray characters, repeated suffix, ...).
        return None

# # Examples
# numbers = ['13M', '7.7K', '3.14k','900m','09.0']

# for num_str in numbers:
#     converted_num = convert_to_float(num_str)
#     print(f"{num_str}: {converted_num}")


In [None]:
import re

# Length every per-second embedding is truncated or zero-padded to.
maxLengthOfEmbedding = 768


def calculate_avg_words_between_timestamps(file_path, video_length_seconds, embed_fn=None):
    """Spread each caption's words over the seconds it covers and embed them.

    The transcript file is expected to contain lines of the form
    ``[<seconds with decimals>] <text>``; lines without a timestamp are
    treated as a continuation of the previous caption.

    For every caption except the last, the words are divided as evenly as
    possible over the whole seconds up to the next caption's (rounded)
    timestamp. Each chunk of words is embedded and stored at its second.
    The final caption has no following timestamp and is not distributed.

    Args:
        file_path: Path to the timestamped transcript file.
        video_length_seconds: Video duration; floored to get the number of
            per-second slots.
        embed_fn: Optional callable mapping a string to a sequence of floats.
            Defaults to the notebook-level ``get_text_embedding``.

    Returns:
        ``(words_in_each_second, words_embedding)``: two lists with one entry
        per second. The first holds word counts. The second holds embeddings
        of length ``maxLengthOfEmbedding`` (zeros where no words were
        assigned).
    """
    if embed_fn is None:
        # Resolved at call time so the default picks up the notebook helper.
        embed_fn = get_text_embedding

    with open(file_path, 'r') as file:
        lines = file.readlines()

    timestamp_pattern = re.compile(r'\[(\d+\.\d+)\]\s(.+)')

    timestamps = []
    texts = []
    i = 0
    while i < len(lines):
        match = timestamp_pattern.match(lines[i])
        if match:
            text = match.group(2)
            # Fold following lines without a timestamp into this caption.
            while i + 1 < len(lines) and not timestamp_pattern.match(lines[i + 1]):
                i += 1
                text += " " + lines[i].strip()
            timestamps.append(round(float(match.group(1))))
            texts.append(text)
        i += 1

    total_seconds = math.floor(video_length_seconds)
    words_in_each_second = [0] * total_seconds
    # Independent zero vectors; `[zeros] * n` would alias a single array.
    words_embedding = [np.zeros(maxLengthOfEmbedding) for _ in range(total_seconds)]

    for i in range(len(timestamps) - 1):
        start_time = timestamps[i]
        count = timestamps[i + 1] - start_time
        words = texts[i].split()
        words_between = len(words)

        avg_words = math.ceil(words_between / count) if count > 0 else 0
        words_taken = 0
        for k in range(count):
            second = start_time + k
            if words_taken >= words_between:
                break
            if second >= total_seconds:
                # Caption runs past the stated video length; nothing to fill.
                break
            if words_taken + avg_words > words_between:
                avg_words = words_between - words_taken

            words_in_each_second[second] = avg_words
            # Slice the word list; slicing the raw string would cut characters.
            sentence = " ".join(words[words_taken:words_taken + avg_words])
            embedding = embed_fn(sentence)
            if len(embedding) >= maxLengthOfEmbedding:
                padded_embedding = embedding[:maxLengthOfEmbedding]
            else:
                padded_embedding = np.pad(
                    embedding, (0, maxLengthOfEmbedding - len(embedding)), mode='constant'
                )

            words_taken += avg_words
            words_embedding[second] = padded_embedding

    print("Words in each second")
    print(len(words_in_each_second))
    print("Words embedding")
    print(len(words_embedding))
    return words_in_each_second, words_embedding

In [None]:
# Shared state and models for facial-emotion recognition.

# Running log of detections; get_emotion appends one entry per processed frame.
emotions = []

# Label for each class index; get_emotion indexes this list with the argmax
# of the model prediction, so the order must match the model's output order.
emotion_labels = [
    'Anger',
    'Disgust',
    'Fear',
    'Happy',
    'Sad',
    'Surprised',
    'Neutral',
]

# Keras classifier loaded from the working directory.
emotion_model = load_model('emotion_model.h5')

# Frontal-face Haar cascade bundled with OpenCV.
face_haar_cascade = cv2.CascadeClassifier(
    cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
)

def one_hot_encode(number, num_classes=7):
    """Build a one-hot list of length ``num_classes`` for a 1-based class number.

    Position ``number - 1`` is set to 1 and every other position to 0.
    ``number`` is expected to be in ``1..num_classes``.
    """
    hot_index = number - 1
    vector = [0 for _ in range(num_classes)]
    vector[hot_index] = 1
    return vector

def get_emotion(frame):
    """Classify the emotion of the first detected face in a BGR frame.

    Side effects:
        * Draws a red rectangle around the detected face on ``frame`` itself.
        * Appends exactly one entry to the module-level ``emotions`` list per
          call: the detected label, or ``None`` when no usable face was found.

    Args:
        frame: BGR image array.

    Returns:
        A one-hot list with one position per entry of ``emotion_labels``, or
        an all-zero list of the same length when no face could be classified.
    """
    no_emotion = [0] * len(emotion_labels)

    frame_grey = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    faces = face_haar_cascade.detectMultiScale(frame_grey)
    if len(faces) == 0:
        emotions.append(None)
        return no_emotion

    (x, y, w, h) = faces[0]
    cv2.rectangle(frame, pt1=(x, y), pt2=(x + w, y + h), color=(0, 0, 255), thickness=2)

    # 5-pixel margin around the face. Clamp the start indices at 0: a negative
    # start would wrap around and give an empty or wrong crop for faces near
    # the top or left edge. End indices past the frame are truncated by slicing.
    top = max(y - 5, 0)
    left = max(x - 5, 0)
    roi_gray = frame_grey[top:y + h + 5, left:x + w + 5]
    if roi_gray.size == 0:
        # Keep the emotions log aligned with the number of calls.
        emotions.append(None)
        return no_emotion

    # Model input: one 48x48 single-channel image scaled to [0, 1].
    roi_gray = cv2.resize(roi_gray, (48, 48))
    image_pixels = img_to_array(roi_gray)
    image_pixels = np.expand_dims(image_pixels, axis=0)
    image_pixels /= 255

    predictions = emotion_model.predict(image_pixels)
    max_index = np.argmax(predictions[0])
    emotions.append(emotion_labels[max_index])
    # one_hot_encode takes 1-based class numbers.
    return one_hot_encode(max_index + 1)

In [None]:
# MediaPipe solution modules used by the pose helpers.
mp_pose = mp.solutions.pose
mp_holistic = mp.solutions.holistic

# Both estimators share the same detection / tracking confidence thresholds.
_CONFIDENCE_THRESHOLDS = {
    'min_detection_confidence': 0.5,
    'min_tracking_confidence': 0.5,
}

# Pose estimator configured for video input (static_image_mode=False).
pose = mp_pose.Pose(static_image_mode=False, **_CONFIDENCE_THRESHOLDS)
# Holistic estimator with the same thresholds.
holistic_model = mp_holistic.Holistic(**_CONFIDENCE_THRESHOLDS)

def _angle_between_degrees(u, v):
    """Return the angle in degrees between 2-D vectors, or 0.0 if either has zero length."""
    denominator = np.linalg.norm(u) * np.linalg.norm(v)
    if denominator == 0:
        return 0.0
    # Rounding can push the cosine just outside [-1, 1], which makes arccos NaN.
    cosine = np.clip(np.dot(u, v) / denominator, -1.0, 1.0)
    return np.arccos(cosine) * (180 / np.pi)


def get_head_turn_angle(results):
    """Estimate head turn from the inner-eye and nose pose landmarks.

    Two angles (in degrees) are computed against the line running from the
    left inner eye to the right inner eye:
        1. the angle to the vector from the left inner eye to the nose;
        2. the angle to the vector from the nose to the right inner eye.

    Args:
        results: Pose-estimation result exposing ``pose_landmarks``.

    Returns:
        ``[left_angle, right_angle]``. ``[0, 0]`` is returned when no pose
        landmarks are available (``pose_landmarks`` is ``None``) or a required
        landmark is missing. A degenerate, zero-length vector yields 0.0 for
        the affected angle rather than NaN.
    """
    if results.pose_landmarks is None:
        # No pose in this frame; previously this raised AttributeError.
        return [0, 0]

    landmarks = results.pose_landmarks.landmark
    left_eye = landmarks[mp_pose.PoseLandmark.LEFT_EYE_INNER]
    right_eye = landmarks[mp_pose.PoseLandmark.RIGHT_EYE_INNER]
    nose = landmarks[mp_pose.PoseLandmark.NOSE]
    if not (left_eye and right_eye and nose):
        return [0, 0]

    eye_line_vector = np.array([right_eye.x - left_eye.x, right_eye.y - left_eye.y])
    eye_left_nose_vector = np.array([nose.x - left_eye.x, nose.y - left_eye.y])
    eye_right_nose_vector = np.array([right_eye.x - nose.x, right_eye.y - nose.y])

    return [
        _angle_between_degrees(eye_line_vector, eye_left_nose_vector),
        _angle_between_degrees(eye_line_vector, eye_right_nose_vector),
    ]

def get_shoulder_midpoint(results):
    """Return the horizontal midpoint between the two shoulder landmarks.

    Args:
        results: Pose-estimation result exposing ``pose_landmarks``.

    Returns:
        A one-element list holding the mean of the left and right shoulder
        ``x`` values, in whatever units the landmarks use (presumably
        MediaPipe's normalised image coordinates -- confirm before mixing
        with pixel values). ``[0]`` is returned when no pose landmarks are
        available (``pose_landmarks`` is ``None``) or a shoulder is missing.
    """
    if results.pose_landmarks is None:
        # No pose in this frame; previously this raised AttributeError.
        return [0]

    landmarks = results.pose_landmarks.landmark
    left_shoulder = landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER]
    right_shoulder = landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER]
    if not (left_shoulder and right_shoulder):
        return [0]

    return [(left_shoulder.x + right_shoulder.x) / 2]





In [None]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.callbacks import TensorBoard

In [None]:


# Sequence regressor: an LSTM over variable-length sequences of 1376 features
# per step, followed by a small dense head that outputs one value.
# NOTE: this rebinds the notebook-wide name `model`, which an earlier cell
# bound to the BERT encoder; a later cell rebinds it again.
model = Sequential([
    LSTM(32, activation='relu', input_shape=(None, 1376)),
    Dropout(0.2),
    Dense(16, activation='relu'),
    Dropout(0.2),
    Dense(1, activation='linear'),
])

model.compile(optimizer='adam', loss='mean_squared_error')

In [None]:

from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.optimizers import Adam
import numpy as np
import os
from sklearn.preprocessing import LabelEncoder
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Define the model: a two-layer dense regressor. No input shape is declared,
# so the model is built on the first batch it sees.
# NOTE(review): Dense layers act on the last axis only. If the padded input is
# 3-D (videos, timesteps, features), the output has one value per timestep --
# confirm the label shape matches what is intended.
model = Sequential([
    Dense(64, activation='relu'),
    Dense(1)
])
model.compile(optimizer='adam', loss='mse')

# Feature matrices and their ratings live in parallel directories.
data_dir = 'DataSetFeatures'
label_dir = 'Rating'

# Only consider .npy files: os.listdir also returns hidden files and
# checkpoint folders, which would break np.load or shift the pairing.
# Sorting both lists is what pairs a feature file with its label file.
data_files = sorted(name for name in os.listdir(data_dir) if name.endswith('.npy'))
label_files = sorted(name for name in os.listdir(label_dir) if name.endswith('.npy'))

# zip() would silently drop the surplus files and hide a mis-paired dataset.
if len(data_files) != len(label_files):
    raise ValueError(
        f"Found {len(data_files)} feature files in {data_dir!r} but "
        f"{len(label_files)} label files in {label_dir!r}; they must match one-to-one."
    )

# Load every (features, label) pair.
matrixOfVideos = []
popularity_labels = []

for data_file, label_file in zip(data_files, label_files):
    features = np.load(os.path.join(data_dir, data_file))
    label = np.load(os.path.join(label_dir, label_file))

    matrixOfVideos.append(features)
    popularity_labels.append(label)

# Zero-pad at the end so every sequence has the length of the longest one.
matrixOfVideos_padded = pad_sequences(matrixOfVideos, dtype='float32', padding='post', truncating='post')

popularity_labels = np.array(popularity_labels)

# Train on the full dataset (no validation split); progress is logged for TensorBoard.
model.fit(matrixOfVideos_padded, popularity_labels, epochs=1000, batch_size=30, validation_split=0.0, callbacks=[TensorBoard()])

# Save the model
model.save('model.h5')

# Print model summary
model.summary()


In [None]:
# Sanity check: show how many label arrays the training cell loaded.
# Alternative checks, currently disabled:
# len(matrixOfVideos_padded)
# len(matrixOfVideos)
len(popularity_labels)

In [None]:
from tensorflow.keras.models import load_model
import tensorflow.keras.backend as K
import os
import numpy as np
from tensorflow.keras.preprocessing.sequence import pad_sequences
import re

# Custom loss/metric registered under the name 'mse' when loading the model
def mse(y_true, y_pred):
    """Mean of the squared difference between prediction and target over the last axis."""
    squared_error = K.square(y_pred - y_true)
    return K.mean(squared_error, axis=-1)

# Path to the model saved by the training cell
model_path = 'model.h5'

# Load the model, resolving the 'mse' name to the function defined in this cell
model = load_model(model_path, custom_objects={'mse': mse})

# Test data directories
# NOTE(review): these are the same directories the training cell reads, so the
# numbers below measure fit on the training data, not generalisation.
test_data_dir = 'DataSetFeatures'
test_label_dir = 'Rating'

# Only consider .npy files: os.listdir also returns hidden files and
# checkpoint folders, which would break np.load or shift the pairing.
# Sorting both lists is what pairs a feature file with its label file.
test_data_files = sorted(name for name in os.listdir(test_data_dir) if name.endswith('.npy'))
test_label_files = sorted(name for name in os.listdir(test_label_dir) if name.endswith('.npy'))

# zip() would silently drop the surplus files and hide a mis-paired dataset.
if len(test_data_files) != len(test_label_files):
    raise ValueError(
        f"Found {len(test_data_files)} feature files in {test_data_dir!r} but "
        f"{len(test_label_files)} label files in {test_label_dir!r}; they must match one-to-one."
    )

# Load every (features, label) pair, remembering the feature file name
test_matrixOfVideos = []
test_popularity_labels = []
test_file_names = []

for test_data_file, test_label_file in zip(test_data_files, test_label_files):
    test_features = np.load(os.path.join(test_data_dir, test_data_file))
    test_label = np.load(os.path.join(test_label_dir, test_label_file))

    test_file_names.append(test_data_file)
    test_matrixOfVideos.append(test_features)
    test_popularity_labels.append(test_label)

# Zero-pad at the end so every sequence has the length of the longest one
test_padded = pad_sequences(test_matrixOfVideos, dtype='float32', padding='post', truncating='post')
test_popularity_labels = np.array(test_popularity_labels)

# Repeat each label along axis 1 (once per padded timestep) and add a trailing
# axis so the labels line up with the per-timestep predictions.
# Assumes each loaded label is 1-D so the stacked array is 2-D -- TODO confirm.
test_popularity_labels = np.repeat(test_popularity_labels, test_padded.shape[1], axis=1)
test_popularity_labels = np.expand_dims(test_popularity_labels, axis=-1)

# Evaluate the model on the test data
test_loss = model.evaluate(test_padded, test_popularity_labels)
print("Test Loss:", test_loss)

# Make predictions on the test set
test_predictions = model.predict(test_padded)

# Mean Absolute Error over all entries (padded timesteps included)
test_mae = np.mean(np.abs(test_predictions - test_popularity_labels))
print("Test Mean Absolute Error:", test_mae)

# Print predictions and original labels for each test video.
# File names are expected to look like matrix_of_video_<video>_<window>.npy;
# the dot is escaped so it only matches a literal '.'.
file_name_pattern = re.compile(r"matrix_of_video_(\d+)_(\d+)\.npy")
existing_video_id = ""
existing_window_id = ""
for i in range(len(test_predictions)):
    match = file_name_pattern.match(test_file_names[i])

    if match:
        video_id = int(match.group(1))
        window_id = int(match.group(2))

        existing_window_id = window_id
        # Header lines are printed only when a new video starts.
        if existing_video_id != video_id:
            existing_video_id = video_id
            print("Video ID:", video_id)
            print(f"Window ID: {window_id}")
    print(f"Video {i} - Prediction: {test_predictions[i][0]}, Original Label: {test_popularity_labels[i][0][0]}")
