In [2]:
from facetorch import FaceAnalyzer
from omegaconf import OmegaConf
from torch.nn.functional import cosine_similarity
from typing import Dict
import operator
import torchvision
import os
import operator
from torch.nn.functional import cosine_similarity
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import pickle5 as pickle
from tqdm import tqdm


  from .autonotebook import tqdm as notebook_tqdm


Initialize the analyzer

In [3]:

# Load the YAML configuration and build the face analyzer from its
# `analyzer` section. `cfg` and `analyzer` are used as globals by the
# functions defined further down in this notebook.
path_config="gpu.config.yml"
cfg = OmegaConf.load(path_config)
analyzer = FaceAnalyzer(cfg.analyzer)

# Warm-up: run the analyzer once on a single sample frame before the real
# workload (presumably so one-time start-up cost is paid here - TODO confirm).
path_img_input="./test_error_1/frame_0.png"
response=None
# Image data is not returned, tensors are included. The result is stored in
# `response`; no later cell visible in this notebook reads that global.
response = analyzer.run(
        path_image=path_img_input,
        batch_size=cfg.batch_size,
        fix_img_size=cfg.fix_img_size,
        return_img_data=False,
        include_tensors=True,
    )

Define functions

In [4]:
def generate_string(input_dir):
    """Build the heatmap title for a frame directory.

    The last component of ``input_dir`` is expected to look like
    ``3_0_3_video_2_framerate_3_faces``: the first three underscore-separated
    tokens form the video name and the seventh token is the frame rate.

    Args:
        input_dir: '/'-separated path of the frame directory. Trailing
            slashes are ignored.

    Returns:
        A string of the form
        ``"Similarity Matrix, <video_name>, fps=<frame_rate>"``.

    Raises:
        IndexError: if the directory name has fewer than seven
            underscore-separated tokens.
    """
    # Strip trailing slashes first: with "dir/" the last component would be
    # the empty string and the frame-rate lookup below would fail.
    last_part = input_dir.rstrip('/').split('/')[-1]
    tokens = last_part.split('_')

    video_name = "_".join(tokens[:3])
    frame_rate = tokens[6]

    return f"Similarity Matrix, {video_name}, fps={frame_rate}"



In [5]:
def get_response_dict(input_dir):
    """Compute face embeddings for the PNG frames in ``input_dir`` and cache them.

    Embeddings already stored in ``<input_dir>/responses_dict.pkl`` are reused;
    only PNG files that are not yet keys of that cache are sent through the
    global ``analyzer`` (configured by the global ``cfg``). For each analysed
    frame the embedding logits of the first detected face are kept; frames
    without a detected face are reported and skipped.

    The merged result (cached + new), ordered by the number formed from the
    digits in each key, is written back to ``responses_dict.pkl`` so that the
    functions that read that file can see the new embeddings.

    Args:
        input_dir: directory containing the ``.png`` frames.

    Returns:
        dict mapping image file name to the embedding logits of its first
        detected face.

    Raises:
        ValueError: if a key contains no digits (the ordering step cannot
            derive a frame number from it).
    """
    responses_dict_path = os.path.join(input_dir, 'responses_dict.pkl')

    # Start from the cached embeddings when a cache exists. Without a cache
    # (or if the reader returns None) fall back to an empty dict, so the
    # filter below never touches an undefined or None object.
    # NOTE: the cache is a pickle; only load files you produced yourself.
    responses_dict = {}
    if os.path.exists(responses_dict_path):
        responses_dict = dict(read_responses_dict(input_dir) or {})

    # Only frames without a cached embedding need to be analysed. Frames in
    # which no face was found are never cached, so they are retried each run.
    image_files = [
        file for file in os.listdir(input_dir)
        if file.endswith(".png") and file not in responses_dict
    ]

    for image_file in tqdm(image_files):
        image_path = os.path.join(input_dir, image_file)

        response = analyzer.run(
            path_image=image_path,
            batch_size=cfg.batch_size,
            fix_img_size=cfg.fix_img_size,
            return_img_data=cfg.return_img_data,
            include_tensors=cfg.include_tensors,
        )
        if len(response.faces) == 0:
            print(f"No face detected in {image_file}")
            continue
        # Keep only the embedding of the first detected face.
        responses_dict[image_file] = response.faces[0].preds["embed"].logits

    # Order entries by the integer obtained from all digits in the key.
    responses_dict = dict(
        sorted(
            responses_dict.items(),
            key=lambda item: int(''.join(filter(str.isdigit, item[0]))),
        )
    )

    # Persist the merged dictionary: callers in this notebook ignore the
    # return value and read responses_dict.pkl instead.
    with open(responses_dict_path, 'wb') as file:
        pickle.dump(responses_dict, file)

    return responses_dict

def read_responses_dict(input_dir):
    """Load the pickled responses dictionary stored in ``input_dir``.

    Looks for ``responses_dict.pkl`` directly inside ``input_dir``. If the
    file is missing, a notice is printed and ``None`` is returned; otherwise
    the unpickled object is returned unchanged.
    """
    pickle_path = os.path.join(input_dir, 'responses_dict.pkl')

    if os.path.exists(pickle_path):
        with open(pickle_path, 'rb') as handle:
            return pickle.load(handle)

    print("responses_dict.pkl file does not exist in the input directory.")
    return None


In [6]:


def get_similarity_matrix(input_dir, compare_dir=None):
    """Plot and save a cosine-similarity heatmap between frame embeddings.

    Embeddings are read from ``responses_dict.pkl`` in ``input_dir``. Without
    ``compare_dir`` every frame is compared with every frame of the same
    directory and the figure is saved as ``similarity_matrix.png``. With
    ``compare_dir`` the frames are compared with the embeddings stored in
    ``compare_dir`` and the figure is saved as
    ``similarity_matrix_compare_neutral.png``. Both files are written into
    ``input_dir``.

    If the output file for the requested mode already exists the function
    returns immediately; delete the PNG to force regeneration.

    Args:
        input_dir: directory holding ``responses_dict.pkl``; its name is also
            used to build the plot title via ``generate_string``.
        compare_dir: optional directory holding the ``responses_dict.pkl`` to
            compare against.

    Returns:
        None. The result is the PNG written to ``input_dir``.
    """
    comparing = compare_dir is not None
    if comparing:
        output_name = 'similarity_matrix_compare_neutral.png'
    else:
        output_name = 'similarity_matrix.png'
    output_path = os.path.join(input_dir, output_name)

    # Skip work only when the figure for *this* mode already exists.
    if os.path.exists(output_path):
        return

    responses_dict_1 = read_responses_dict(input_dir)
    responses_dict_2 = read_responses_dict(compare_dir) if comparing else responses_dict_1

    # read_responses_dict returns None when the pickle is missing, and an
    # empty dict cannot be pivoted, so there is nothing to plot in either case.
    if not responses_dict_1 or not responses_dict_2:
        print(f"No embeddings available for {input_dir}; skipping similarity matrix.")
        return

    # One record per ordered pair of keys.
    records = [
        (key_1, key_2, cosine_similarity(value_1, value_2, dim=0).item())
        for key_1, value_1 in responses_dict_1.items()
        for key_2, value_2 in responses_dict_2.items()
    ]
    similarity_df = pd.DataFrame(records, columns=['key_1', 'key_2', 'similarity'])

    # Replace each key by the first run of digits it contains, so that rows
    # and columns are ordered numerically rather than lexicographically.
    for column in ('key_1', 'key_2'):
        similarity_df[column] = (
            similarity_df[column].str.extract(r'(\d+)', expand=False).astype(int)
        )

    similarity_matrix_df = (
        similarity_df
        .sort_values(['key_1', 'key_2'])
        .pivot(index='key_1', columns='key_2', values='similarity')
    )

    title = generate_string(input_dir)
    if comparing:
        title += ", compared to neutral baseline"

    fig, ax = plt.subplots(figsize=(10, 8))
    sns.heatmap(similarity_matrix_df, cmap='coolwarm', ax=ax)
    ax.set_title(title)
    ax.set_xlabel('frames')
    ax.set_ylabel('frames')
    fig.savefig(output_path)
    # Release the figure; when called in a loop, open figures would otherwise
    # pile up in memory. The saved PNG is the output of this function.
    plt.close(fig)
    return
    


In [7]:

def remove_talking_frames(input_dir, last_time_above_threshold, frame_rate=30):
    """Drop cached embeddings of frames that occur before a given time.

    ``<input_dir>/responses_dict.pkl`` is loaded, every entry whose frame time
    is earlier than ``last_time_above_threshold`` is removed, and the file is
    overwritten with the remaining entries.

    Keys must look like ``<prefix>_<frame_index>.<ext>`` (for example
    ``frame_12.png``); the frame time in seconds is
    ``frame_index / frame_rate``.

    Args:
        input_dir: directory containing ``responses_dict.pkl``.
        last_time_above_threshold: cut-off time in seconds; frames strictly
            earlier than this are removed.
        frame_rate: number of frame indices per second. Defaults to 30, the
            value that was previously hard-coded.
            NOTE(review): the directory names used in this notebook mention a
            different frame rate; confirm which rate the frame indices in the
            file names are counted at.

    Returns:
        None. The pickle file is rewritten in place.

    Raises:
        ValueError: if ``frame_rate`` is not positive.
        FileNotFoundError: if ``responses_dict.pkl`` does not exist.
    """
    if frame_rate <= 0:
        raise ValueError("frame_rate must be positive")

    responses_dict_path = os.path.join(input_dir, 'responses_dict.pkl')

    # Load the cached embeddings (a pickle: only load files you created).
    with open(responses_dict_path, 'rb') as file:
        responses_dict = pickle.load(file)

    def _frame_time(key):
        # "frame_12.png" -> 12 / frame_rate
        return int(key.split('_')[1].split('.')[0]) / frame_rate

    remaining = {
        key: value
        for key, value in responses_dict.items()
        if not _frame_time(key) < last_time_above_threshold
    }

    # Overwrite the cache with the filtered entries.
    with open(responses_dict_path, 'wb') as file:
        pickle.dump(remaining, file)

    return




Test with one episode

In [None]:
import os
import re
import wave
# Frame directory of the single episode used for this test run.
path_to_video_dir = "/media/qihan/CORSAIR/Test/Data_clean/User_3/video_2/3_0_3_video_2_framerate_3_faces"

# Specify the path to the audio file: the "_video_2_framerate_<n>_faces"
# suffix becomes "_audio.wav", then the remaining "video_2" becomes "audio".
path_to_audio_file = (
    re.sub(r'_video_2_framerate_\d+_faces', '_audio.wav', path_to_video_dir)
    .replace('video_2', 'audio')
)

# Open the audio file
def get_auio_length(path_to_audio_file):
    """Return the duration of a WAV file in seconds.

    The duration is the number of audio frames in the file divided by the
    file's frame rate, both read through the ``wave`` module.
    """
    with wave.open(path_to_audio_file, 'rb') as wav_reader:
        total_frames = wav_reader.getnframes()
        frames_per_second = wav_reader.getframerate()

    return total_frames / frames_per_second

# The total length of the audio recording (in seconds) is used as the
# cut-off time (presumably speech lasts for the whole recording - TODO confirm).
last_speech_time = get_auio_length(path_to_audio_file)
print(last_speech_time)
# Drop the cached embeddings of frames earlier than the cut-off, then plot
# the similarity matrix of the remaining frames. This relies on
# responses_dict.pkl already existing in path_to_video_dir.
remove_talking_frames(path_to_video_dir, last_speech_time)
get_similarity_matrix(path_to_video_dir)







Batch process all episodes

In [None]:

super_dir = "/media/qihan/CORSAIR/Test/Data_clean/User_3/video_2"

# Keep only the entries of super_dir that are directories.
subfolders = [
    entry
    for entry in os.listdir(super_dir)
    if os.path.isdir(os.path.join(super_dir, entry))
]

# Process every subfolder, showing a progress bar.
for subfolder in tqdm(subfolders):
    input_dir = os.path.join(super_dir, subfolder)

    # Derive the audio file that belongs to this frame directory.
    path_to_audio_file = (
        re.sub(r'_video_2_framerate_\d+_faces', '_audio.wav', input_dir)
        .replace('video_2', 'audio')
    )
    print(path_to_audio_file)

    # The audio length is used as the cut-off time for this episode.
    last_speech_time = get_auio_length(path_to_audio_file)
    print(last_speech_time)

    # Embed the frames, drop those before the cut-off, then plot.
    get_response_dict(input_dir)
    remove_talking_frames(input_dir, last_speech_time)
    get_similarity_matrix(input_dir)


In [15]:
import os
import pickle
# Merge the per-episode embedding caches of the neutral baseline into one
# dictionary and save it as <input_dir>/responses_dict.pkl.
input_dir = "/media/qihan/CORSAIR/Test/Data_clean/User_3/neutral_baseline"
big_dict = {}

# Sorted so that the running-counter keys assigned below are the same on
# every run (os.listdir returns entries in arbitrary order).
subfolders = sorted(
    folder for folder in os.listdir(input_dir)
    if os.path.isdir(os.path.join(input_dir, folder))
)

for subfolder in subfolders:
    responses_dict_path = os.path.join(input_dir, subfolder, 'responses_dict.pkl')

    # Subfolders without a cache contribute nothing.
    if not os.path.exists(responses_dict_path):
        continue

    # NOTE: pickle files can execute code on load; only load your own files.
    with open(responses_dict_path, 'rb') as file:
        responses_dict = pickle.load(file)

    # Re-key every embedding with a running counter ("1", "2", ...). The
    # original per-folder keys are dropped, so entries from different
    # subfolders can never collide and every key is purely numeric.
    for value in responses_dict.values():
        big_dict[str(len(big_dict) + 1)] = value

# Save the merged dictionary next to the subfolders.
pickle_file_path = os.path.join(input_dir, 'responses_dict.pkl')
with open(pickle_file_path, 'wb') as file:
    pickle.dump(big_dict, file)



Compare with neutral baseline

In [8]:
# Compare every episode directory under input_dir with the merged neutral
# baseline embeddings stored in compare_dir.
input_dir = "/media/qihan/CORSAIR/Test/Data_clean/User_3/video_2"
compare_dir = "/media/qihan/CORSAIR/Test/Data_clean/User_3/neutral_baseline"

for subfolder in tqdm(os.listdir(input_dir)):
    folder = os.path.join(input_dir, subfolder)
    # Only directories can hold a responses_dict.pkl; skip stray files so one
    # of them cannot abort the whole run.
    if not os.path.isdir(folder):
        continue
    get_similarity_matrix(folder, compare_dir=compare_dir)


100%|██████████| 84/84 [00:00<00:00, 1281.57it/s]


In [37]:

path_to_csv = "/media/qihan/CORSAIR/Test/Data_clean/User_3/catalog.csv"

import pandas as pd
import json
import csv

# Read the catalog: the first column of every row holds a JSON object from
# which 'error_type' and 'index' are read.
# The file is opened read-only ('wr' is not a valid mode) and with
# newline='' as the csv module requires.
with open(path_to_csv, mode='r', newline='') as file:
    csvFile = csv.reader(file)

    task_cnt = 0  # not used yet
    for lines in csvFile:
        # csv.reader yields an empty list for blank lines; nothing to parse.
        if not lines:
            continue

        json_string = lines[0]
        data = json.loads(json_string)
        error_type = data.get('error_type')
        index = data.get('index')

        if error_type == "No Error":
            # TODO: read each entry from the pkl file and collect them
            # (this step was left unfinished).
            pass

        print(f"Index: {index}, error: {error_type}")
        






Index replacement completed.
