# Facial Feature Detection with OpenFace



In [None]:
import os
from os.path import exists, join, basename, splitext

################# Need to revert back to CUDA 10.0 ##################
# Thanks to http://aconcaguasci.blogspot.com/2019/12/setting-up-cuda-100-for-mxnet-on-google.html
# Uninstall the current CUDA version (packages, leftovers and unused deps).
!apt-get --purge remove cuda nvidia* libnvidia-*
!dpkg -l | grep cuda- | awk '{print $2}' | xargs -n1 dpkg --purge
!apt-get remove cuda-*
!apt autoremove
!apt-get update

# Download the CUDA 10.0 repository package (skipped if already downloaded).
!wget  --no-clobber https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64/cuda-repo-ubuntu1804_10.0.130-1_amd64.deb
# Register the CUDA repository and its signing key, then install CUDA 10.0.
!dpkg -i cuda-repo-ubuntu1804_10.0.130-1_amd64.deb
!sudo apt-key adv --fetch-keys https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64/7fa2af80.pub
!apt-get update
!apt-get install cuda-10-0
# Solve the libcurand.so.10 error by adding the NVIDIA machine-learning repo.
!wget --no-clobber http://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64/nvidia-machine-learning-repo-ubuntu1804_1.0.0-1_amd64.deb
# -nc, --no-clobber: skip downloads that would download to existing files.
!apt install ./nvidia-machine-learning-repo-ubuntu1804_1.0.0-1_amd64.deb
!apt-get update
####################################################################

git_repo_url = 'https://github.com/TadasBaltrusaitis/OpenFace.git'
# Repository name without the ".git" suffix, i.e. "OpenFace".
project_name = splitext(basename(git_repo_url))[0]
# Shallow-clone OpenFace into ./OpenFace
!git clone -q --depth 1 $git_repo_url

# Install a newer CMake because of CUDA 10 (unpacked over /usr/local).
!wget -q https://cmake.org/files/v3.13/cmake-3.13.0-Linux-x86_64.tar.gz
!tar xfz cmake-3.13.0-Linux-x86_64.tar.gz --strip-components=1 -C /usr/local

# Get a newer GCC toolchain
!sudo apt-get update
!sudo apt-get install build-essential
!sudo apt-get install g++-8

# Added 5/15/2022, thanks to @weskhoo: replace the old NVIDIA signing key.
# NOTE(review): the new key is fetched from the ubuntu1604 repo over http,
# while the packages above come from the ubuntu1804 repo — confirm intended.
!sudo apt-key del 7fa2af80
!sudo apt-key adv --fetch-keys http://developer.download.nvidia.com/compute/cuda/repos/ubuntu1604/x86_64/3bf863cc.pub

# Install python dependencies (youtube-dl is used below to fetch the video).
!pip install -q youtube-dl

# Finally, download the models and build/install OpenFace
!cd OpenFace && bash ./download_models.sh && sudo bash ./install.sh

## Detect facial expressions on a test video

We are going to detect facial features on the following Youtube video:

In [None]:
from IPython.display import YouTubeVideo

# Replace YOUTUBE_ID with the ID of your group's video (the value after
# "watch?v=" in the URL); the download cell below reuses this variable.
YOUTUBE_ID = 'XtA6FQz8BHQ'

# Embed the video in the notebook output as a preview.
YouTubeVideo(YOUTUBE_ID)

Download the above youtube video, cut the first 10 seconds and do the face detection & feature extraction on that clip. This takes about a minute or two. Instead of `FaceLandmarkVidMulti` you may also use `FeatureExtraction` to extract features of a single face or `FaceLandmarkImg` to extract features on a face image. See full description of the arguments [here](https://github.com/TadasBaltrusaitis/OpenFace/wiki/Command-line-arguments).

In [None]:
# Remove any previously downloaded video.
!rm -rf youtube.mp4
# Download the YouTube video with the given ID (best MP4 video stream).
!youtube-dl -f 'bestvideo[ext=mp4]' --output "youtube.%(ext)s" https://www.youtube.com/watch?v=$YOUTUBE_ID
# Cut the first 10 seconds into video.mp4
!ffmpeg -y -loglevel info -i youtube.mp4 -t 10 video.mp4
# Clear any previous outputs.
!rm -rf processed
# Run multi-face landmark detection on these 10 seconds; results go to ./processed
!./OpenFace/build/bin/FaceLandmarkVidMulti -f video.mp4 -out_dir processed
# Convert the annotated result video into MP4
!ffmpeg -y -loglevel info -i processed/video.avi output.mp4

In [None]:
# Process my own video instead of the YouTube clip.
# Cut the first 10 seconds of emotions.mov into video2.mov
!ffmpeg -y -loglevel info -i emotions.mov -t 10 video2.mov

# Run single-face landmark detection on the trimmed clip.
# NOTE(review): the binary path here is ./bin/, whereas the cell above uses
# ./OpenFace/build/bin/ — confirm the working directory this is run from.
!./bin/FaceLandmarkVid -f 'video2.mov' #-out_dir
# Convert to MP4.
# NOTE(review): this converts the trimmed input (video2.mov), not a processed
# OpenFace output file — confirm that is what should be visualised.
!ffmpeg -y -loglevel info -i video2.mov output2.mp4

Finally, visualize the result:

In [None]:
def show_local_mp4_video(file_name, width=640, height=480):
  """Return an inline HTML5 video player for a local MP4 file.

  The file is read fully into memory and embedded in the page as a
  base64 ``data:`` URI, so the player works without serving the file.

  Args:
    file_name: Path of the video file to embed.
    width: Player width in pixels.
    height: Player height in pixels.

  Returns:
    An ``IPython.display.HTML`` object; it renders when it is the last
    expression of a notebook cell.

  Raises:
    OSError: If ``file_name`` cannot be opened or read.
  """
  import base64
  from IPython.display import HTML

  # Read through a context manager so the file handle is always closed;
  # the previous io.open(...).read() left the handle to the garbage collector.
  with open(file_name, 'rb') as video_file:
    video_encoded = base64.b64encode(video_file.read())
  return HTML(data='''<video width="{0}" height="{1}" alt="test" controls>
                        <source src="data:video/mp4;base64,{2}" type="video/mp4" />
                      </video>'''.format(width, height, video_encoded.decode('ascii')))

show_local_mp4_video('/content/output2.mp4', width=960, height=720)

# Postprocess faces

In [None]:
import os
import pandas as pd

# Define the folder path containing your CSV files
folder_path = ''

# Function to threshold the data and save to a new CSV file
def threshold_and_save(file_path, threshold=0.70):
    """Keep only the rows of an OpenFace CSV whose confidence is high enough.

    Spaces are removed from the column names, rows with
    ``confidence < threshold`` are dropped, and the remaining rows are
    written to a CSV with the same base name in the current working
    directory. Files that are missing, empty, lack a ``confidence`` column
    or have no row passing the threshold are skipped with a message.

    Args:
        file_path: Path of the CSV file to clean.
        threshold: Minimum confidence (inclusive) a row needs to be kept.

    Returns:
        None. The cleaned data is written to disk as a side effect.
    """
    # A missing path used to raise from os.path.getsize; skip it like the
    # other unusable inputs so one bad entry does not stop a batch run.
    if not os.path.isfile(file_path):
        print(f"Skipping missing file: {file_path}")
        return

    if os.path.getsize(file_path) == 0:
        print(f"Skipping empty file: {file_path}")
        return

    # A file holding only whitespace is non-empty on disk but has nothing to
    # parse, which pandas reports as EmptyDataError.
    try:
        df = pd.read_csv(file_path, encoding='latin1')
    except pd.errors.EmptyDataError:
        print(f"Skipping empty file: {file_path}")
        return

    if df.empty:
        print(f"DataFrame is empty after reading, skipping file: {file_path}")
        return

    # Remove spaces in column names so columns can be addressed as
    # 'confidence' rather than ' confidence'.
    df.columns = [col.replace(" ", "") for col in df.columns]

    if 'confidence' not in df.columns:
        print(f"No 'confidence' column, skipping file: {file_path}")
        return

    # Convert 'confidence' to float in case it was parsed as text.
    df['confidence'] = df['confidence'].astype(float)
    df_clean = df[df['confidence'] >= threshold]

    if df_clean.empty:
        print(f"Empty DataFrame after thresholding, skipping file: {file_path}")
        return

    # Save under the same base name in the current working directory.
    # NOTE(review): if the input already lives in the working directory this
    # overwrites it — confirm that is intended.
    file_name = os.path.basename(file_path)
    clean_file_path = os.path.join('', file_name)
    df_clean.to_csv(clean_file_path, index=False)

# Loop through files in the folder and threshold each CSV.
# Files starting with "._" are skipped even when they end in ".csv".
# NOTE(review): folder_path is set to '' above as a placeholder; set it to a
# real directory before running, since os.listdir('') fails.
for file_name in os.listdir(folder_path):
    if file_name.endswith('.csv') and not file_name.startswith('._'):
        file_path = os.path.join(folder_path, file_name)
        threshold_and_save(file_path)


In [None]:
import os
import csv
import pandas as pd

# Define the folder containing the CSV files
csv_folder = ""

# Define the output CSV file
output_csv_file = ""

# Initialize the output data list with header
output_data = [['Filename', 'neutral', 'anger', 'disgust', 'fear', 'happiness', 'sadness', 'surprise', 'contempt', 'pain', 'unknown']]

# Initialize an empty DataFrame to store the combined data
combined_data = pd.DataFrame()


# Collect the first data row of every CSV file in the folder.
# Files starting with "._" are skipped, as in the other cells of this
# notebook; they end in ".csv" but are not meant to be parsed as data.
# The listing is sorted so the output row order is reproducible.
csv_files = sorted(
    file for file in os.listdir(csv_folder)
    if file.endswith(".csv") and not file.startswith("._")
)

first_rows = []
for csv_file in csv_files:
    csv_path = os.path.join(csv_folder, csv_file)

    # Read only the first data row of the file
    df = pd.read_csv(csv_path, nrows=1, encoding='ISO-8859-1')

    # Put the source filename in the first column
    df.insert(0, 'Filename', csv_file)

    first_rows.append(df)

# Concatenate once at the end: concatenating inside the loop re-copies all
# previously collected rows on every iteration.
combined_data = pd.concat(first_rows, ignore_index=True) if first_rows else pd.DataFrame()

# Write the combined data to the output CSV file
combined_data.to_csv(output_csv_file, index=False)

print("Data from all files has been combined and saved to", output_csv_file)


## Determine the percentage of frames in which each emotion occurs, instead of raw counts

In [None]:
import pandas as pd

# Input table with per-emotion counts and the destination for the percentages
input_csv_file = ""  # Replace with the path to your CSV file
output_csv_file = ""  # Replace with the desired output file path

# Columns holding the per-emotion counts
emotions = ['neutral','anger', 'disgust', 'fear', 'happiness', 'sadness', 'surprise', 'contempt', 'pain', 'unknown']

# The input is expected to be semicolon-separated
df = pd.read_csv(input_csv_file, delimiter=';')

# Row-wise total over all emotion columns
total_emotions = df[emotions].sum(axis=1)

# Replace each count by its share of the row total, in percent
df = df.assign(**{emotion: df[emotion] / total_emotions * 100 for emotion in emotions})

# Write the converted table to the output file
df.to_csv(output_csv_file, index=False)

print("Percentage data has been calculated and saved to", output_csv_file)


## threshold based on likelihood of it being a face

In [None]:
# NOTE(review): this cell uses `re`, `plt` and `sns`, none of which is
# imported in any visible cell before this point — presumably
# `import re`, `import matplotlib.pyplot as plt` and `import seaborn as sns`
# are expected to have been run; confirm.
# NOTE(review): `df` is whatever the previously executed cell left behind; it
# must be an OpenFace output table with `confidence`, `frame`, `face_id` and
# `AUxx_r` columns (column names without spaces) — confirm it is loaded.

# Keep only rows whose face-detection confidence is at least 0.75
df_clean = df[df.confidence>=.75]
# Plot all Action Unit intensity time series (columns named like "AU01_r").
au_regex_pat = re.compile(r'^AU[0-9]+_r$')
au_columns = df.columns[df.columns.str.contains(au_regex_pat)]
print("List of AU columns:", au_columns)
# 6 x 3 grid: room for up to 18 AU columns, with shared axes across panels.
f,axes = plt.subplots(6, 3, figsize=(10,12), sharex=True, sharey=True)
axes = axes.flatten()
for au_ix, au_col in enumerate(au_columns):
    # One line per face_id in each panel
    sns.lineplot(x='frame', y=au_col, hue='face_id', data=df_clean, ax=axes[au_ix])
    axes[au_ix].set(title=au_col, ylabel='Intensity')
    axes[au_ix].legend(loc=5)
plt.suptitle("AU intensity predictions by time for each face", y=1.02)
plt.tight_layout()

We could also compare how synchronized the individuals are with one another during the interaction by using a simple Pearson correlation.

In [None]:
# Let's compare how much AU12 (smiling) activity occurs at similar times across people.
# The pivot yields one column per face_id indexed by frame; .corr() then gives
# the pairwise correlation between faces.
df_clean.pivot(index='frame', columns='face_id', values='AU12_r').corr()

* AU1: Inner Brow Raiser - Associated with surprise or fear.
* AU2: Outer Brow Raiser - Associated with surprise or fear.
* AU4: Brow Lowerer - Associated with anger or sadness.
* AU6: Cheek Raiser - Associated with happiness or fear.
* AU7: Lid Tightener - Associated with fear or anger.
* AU9: Nose Wrinkler - Associated with disgust.
* AU10: Upper Lip Raiser - Associated with disgust or sadness.
* AU12: Lip Corner Puller (Smile) - Associated with happiness.
* AU14: Dimpler - Associated with happiness.
* AU15: Lip Corner Depressor - Associated with sadness.
* AU17: Chin Raiser - Associated with sadness or determination.
* AU20: Lip Stretcher - Associated with happiness or surprise.
* AU23: Lip Tightener - Associated with fear or determination.
* AU24: Lip Pressor - Associated with sadness or anger.
* AU25: Lips Part - Associated with surprise or fear.
* AU26: Jaw Drop - Associated with surprise.

In [None]:
# Between-face correlation, one Action Unit at a time.
# Intensity columns of the AUs to analyse
au_columns = ['AU01_r', 'AU02_r', 'AU04_r', 'AU06_r', 'AU07_r', 'AU09_r', 'AU10_r', 'AU12_r',
              'AU14_r', 'AU15_r', 'AU17_r', 'AU20_r', 'AU23_r']

for au in au_columns:
    # Frames as rows, one column per face, values = intensity of this AU
    per_face_intensity = df_clean.pivot(index='frame', columns='face_id', values=au)
    correlation_matrix = per_face_intensity.corr()
    print(f"Correlation matrix for {au}:")
    print(correlation_matrix)
    print('\n')


In [None]:
# For all features at the same time: pivot every AU column listed in
# `au_columns` (defined in the previous cell) per face_id, then correlate
# all resulting (AU, face_id) columns with each other.
correlation_matrix = df_clean.pivot(index='frame', columns='face_id', values=au_columns).corr()

# Print the correlation matrix
print(correlation_matrix)

# Gaze directions.

In [None]:
# One column of plots per detected face: gaze scatter on top, gaze angles
# over time below.
face_ids = df_clean.face_id.unique()
# squeeze=False keeps `axes` two-dimensional even when only one face is
# present; with the default, a 2x1 grid comes back as a 1-D array and
# axes[0][faces_ix] fails.
f,axes = plt.subplots(2, len(face_ids), figsize=(10,5), squeeze=False)
for faces_ix, face_id in enumerate(face_ids[::-1]):
  face_rows = df_clean.query(f'face_id=={face_id}')
  face_rows.plot.scatter(x='gaze_angle_x', y='gaze_angle_y', ax=axes[0][faces_ix])
  axes[0][faces_ix].scatter(0,0, marker='x', color = 'k') # draw origin.
  axes[0][faces_ix].set(xlim=[-2,2], ylim=[-2,2], title=f'Gaze movement of face_id=={face_id}')
  face_rows[['gaze_angle_x', 'gaze_angle_y']].plot(ax=axes[1][faces_ix])
  axes[1][faces_ix].set(ylim=[-1.5,1.5], xlabel='Frame Number', ylabel="Radians")
plt.tight_layout()
plt.show()

# Track Nodding Feature

In [None]:
!pip install opencv-python

In [None]:
import cv2
import numpy as np
import os
import csv

# Define the folder containing the CSV files
csv_folder = "/content/openface"

# Euclidean distance between two 2-D points
def distance(x, y):
    """Return the straight-line distance between points ``x`` and ``y``.

    Both arguments are indexable; only their first two elements are used.
    """
    import math
    dx = x[0] - y[0]
    dy = x[1] - y[1]
    return math.sqrt(dx**2 + dy**2)

# Parameters for lucas kanade optical flow
lk_params = dict(winSize=(15, 15), maxLevel=2, criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))

# Function to get coordinates
def get_coords(p1):
    """Return the first point stored in ``p1`` as an ``(x, y)`` tuple of ints.

    Two layouts are accepted: a triple-nested one where the point is
    ``p1[0][0]``, and a double-nested one where the point is ``p1[0]``.
    Coordinates are truncated towards zero by ``int``.

    Args:
        p1: Nested sequence or array holding at least one 2-D point.

    Returns:
        Tuple ``(x, y)`` of ints.
    """
    try:
        return int(p1[0][0][0]), int(p1[0][0][1])
    except (TypeError, IndexError):
        # Indexing a scalar one level too deep raises TypeError for Python
        # numbers and IndexError for NumPy scalars: the input is in the
        # double-nested layout. Other errors are no longer swallowed here.
        return int(p1[0][0]), int(p1[0][1])

# Define font and text color
font = cv2.FONT_HERSHEY_SIMPLEX

# Define movement thresholds
max_head_movement = 20
movement_threshold = 50
gesture_threshold = 175

# Find the face in the image
def find_face(frame_gray):
    """Detect a face in ``frame_gray`` and return a tracking point for it.

    For the first detection returned by the cascade, a rectangle is drawn on
    the module-level ``frame`` and a point at the horizontal centre, one
    third down from the top of the box, is returned.

    Returns:
        ``(frame, p0, face_center)`` for the first detected face, where
        ``p0`` is ``face_center`` wrapped in a float32 array, or
        ``(frame, None, None)`` when no face is detected.
    """
    # NOTE(review): `face_cascade` is not defined in any visible cell —
    # presumably a cv2.CascadeClassifier created elsewhere; confirm.
    # NOTE(review): `frame` is not a parameter; the function reads and draws
    # on the global `frame` set by the calling loop.
    faces = face_cascade.detectMultiScale(frame_gray, 1.3, 5)
    for (x, y, w, h) in faces:
        cv2.rectangle(frame, (x, y), (x+w, y+h), (255, 0, 0), 2)
        face_center = x + w/2, y + h/3
        p0 = np.array([[face_center]], np.float32)
        # Return inside the loop: only the first detection is used.
        return frame, p0, face_center
    return frame, None, None

# Initialize gesture variables
gesture = False
x_movement = 0
y_movement = 0
gesture_show = 60  # number of frames a gesture is shown

# Define the dimensions of your image frames (replace these with actual values)
width = 640
height = 480
channels = 3  # Assuming BGR images, set to 1 for grayscale images

# Loop through all CSV files in the folder and accumulate nod/shake counts.
# NOTE(review): this cell references `cap`, `old_gray`, `cv2_imshow` and `out`,
# none of which is defined in any visible cell; it cannot run as shown.
total_nods = 0
total_shakes = 0
csv_files = [file for file in os.listdir(csv_folder) if file.endswith(".csv")]
for csv_file in csv_files:
    csv_path = os.path.join(csv_folder, csv_file)

    # Open the CSV file and read all rows into memory
    with open(csv_path, "r") as file:
        csv_reader = csv.reader(file)
        data = list(csv_reader)

    frame_num = 0
    p0 = None
    face_found = False

    # Flag to skip the first row (header row)
    first_row_skipped = False

    for row in data:
        # Skip the first row (header row) if it hasn't been skipped yet
        if not first_row_skipped:
            first_row_skipped = True
            continue

        frame_num += 1

        # Each row is interpreted as the flattened pixel values of one frame.
        # NOTE(review): presumably the files in csv_folder are OpenFace
        # feature tables rather than raw pixel dumps — confirm the input.
        pixel_values = [int(float(pixel)) for pixel in row]  # Convert the row data to integers

        # Check that the row has exactly one value per pixel channel
        expected_size = width * height * channels
        if len(pixel_values) != expected_size:
            print(f"Error: Invalid number of pixels in row {frame_num}. Expected {expected_size} pixels, but got {len(pixel_values)} pixels.")
            continue

        # Reshape the flat pixel values into a 3-D image array.
        # NOTE(review): the shape is (width, height, channels); OpenCV images
        # are usually (height, width, channels) — confirm the axis order.
        frame = np.array(pixel_values, dtype=np.uint8).reshape(width, height, channels)

        frame_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)  # Convert the frame to grayscale

        # Keep looking for a face until one is found; p0 is its tracking point
        if not face_found:
            frame, p0, face_center = find_face(frame_gray)
            if p0 is not None:
                face_found = True

        if not face_found:
            continue

        if p0 is not None:
            # NOTE(review): `cap` is undefined here, and reading from it would
            # replace the frame built from the CSV row above.
            ret, frame = cap.read()
            # Track the point from the previous grayscale frame to this one.
            # NOTE(review): `old_gray` is never assigned in this cell.
            p1, st, err = cv2.calcOpticalFlowPyrLK(old_gray, frame_gray, p0, None, **lk_params)
            cv2.circle(frame, get_coords(p1), 4, (0, 0, 255), -1)
            cv2.circle(frame, get_coords(p0), 4, (255, 0, 0))

            # Accumulate the absolute x/y displacement between p0 and p1
            a, b = get_coords(p0), get_coords(p1)
            x_movement += abs(a[0] - b[0])
            y_movement += abs(a[1] - b[1])

            # Overlay the running totals while no gesture is being shown
            text = 'x_movement: ' + str(x_movement)
            if not gesture:
                cv2.putText(frame, text, (50, 50), font, 0.8, (0, 0, 255), 2)
            text = 'y_movement: ' + str(y_movement)
            if not gesture:
                cv2.putText(frame, text, (50, 100), font, 0.8, (0, 0, 255), 2)

            # Horizontal movement above the threshold counts as 'No' (shake),
            # vertical as 'Yes' (nod); the vertical check runs last and wins
            # when both thresholds are exceeded.
            if x_movement > gesture_threshold:
                gesture = 'No'
            if y_movement > gesture_threshold:
                gesture = 'Yes'
            if gesture and gesture_show > 0:
                cv2.putText(frame, 'Gesture Detected: ' + gesture, (50, 50), font, 1.2, (0, 0, 255), 3)
                gesture_show -= 1
            # Once the gesture has been displayed for its full duration,
            # count it and reset the accumulators.
            if gesture_show == 0:
                if gesture == 'No':
                    total_shakes += 1
                elif gesture == 'Yes':
                    total_nods += 1

                gesture = False
                x_movement = 0
                y_movement = 0
                gesture_show = 60  # number of frames a gesture is shown

            # The tracked point becomes the reference for the next frame
            p0 = p1

            # NOTE(review): `cv2_imshow` and `out` are undefined in this cell.
            cv2_imshow(frame)
            out.write(frame)
            cv2.waitKey(1)

# Save the total nods and shakes to a new CSV file
output_csv_path = "nods_shakes_count.csv"
with open(output_csv_path, 'w', newline='') as csvfile:
    csv_writer = csv.writer(csvfile)
    csv_writer.writerow(["Nods", "Shakes"])
    csv_writer.writerow([total_nods, total_shakes])

cv2.destroyAllWindows()



# Calculate head nods/shakes for the openface files

Calculates the total head movements per file and saves them in one output CSV file. This is more of an overview and not necessarily useful for analysis.

In [None]:
import os
import pandas as pd
import numpy as np


def calculate_head_gestures(data_file, window_size=30, threshold=0.75):
    """Count frames with high pitch (nod) and yaw (shake) activity per face.

    The absolute frame-to-frame change of the head rotation is summed over
    a rolling window. Frames whose rolling sum reaches the ``threshold``
    quantile of that file's rolling sums are flagged, and the flagged
    frames are counted per ``face_id``.

    Args:
        data_file: Path to an OpenFace output CSV whose column names contain
            no spaces (``pose_Rx``, ``pose_Ry``, ``face_id``).
        window_size: Number of rows in the rolling window.
        threshold: Quantile (0-1) of the rolling sums a frame must reach to
            be flagged.

    Returns:
        Tuple ``(head_nods_count, head_shakes_count)`` of Series indexed by
        ``face_id`` holding the number of flagged frames.
    """
    df = pd.read_csv(data_file, delimiter=',', encoding='latin-1')

    # Absolute frame-to-frame rotation change. OpenFace reports pitch as
    # pose_Rx and yaw as pose_Ry (pose_Rz is roll), so shakes use pose_Ry.
    # NOTE(review): diff() runs over consecutive rows regardless of face_id;
    # for multi-face files the rows of different faces are presumably
    # interleaved — confirm whether a per-face diff is needed.
    df['pitch_distance'] = df['pose_Rx'].diff().abs()
    df['yaw_distance'] = df['pose_Ry'].diff().abs()

    # Total movement inside the rolling window (previously hard-coded to 30
    # rows, ignoring the window_size argument)
    df['pitch_rolling_sum'] = df['pitch_distance'].rolling(window=window_size, min_periods=1).sum()
    df['yaw_rolling_sum'] = df['yaw_distance'].rolling(window=window_size, min_periods=1).sum()

    # Per-file cut-offs (previously hard-coded to the 0.75 quantile)
    pitch_threshold = df['pitch_rolling_sum'].quantile(threshold)
    yaw_threshold = df['yaw_rolling_sum'].quantile(threshold)

    # Frames at or above the cut-off count as nod / shake activity
    head_nods = df[df['pitch_rolling_sum'] >= pitch_threshold]
    head_shakes = df[df['yaw_rolling_sum'] >= yaw_threshold]

    # Number of flagged frames per face
    head_nods_count = head_nods.groupby('face_id').size()
    head_shakes_count = head_shakes.groupby('face_id').size()

    return head_nods_count, head_shakes_count

def count_head_gestures_per_speaker(data_file, speaker_id, speaker_talk_time, framerate=44100):
    """Count head nods and shakes of one speaker within a talking interval.

    Args:
        data_file: Path passed on to ``calculate_head_gestures``.
        speaker_id: Value compared (as int) with the ``face_id`` column.
        speaker_talk_time: Pair ``(start, end)`` compared inclusively with
            the ``timestamp`` column.
        framerate: Forwarded to ``calculate_head_gestures``.

    Returns:
        Tuple ``(num_head_nods, num_head_shakes)``.
    """
    # NOTE(review): calculate_head_gestures in this cell accepts no `framerate`
    # argument, so this call raises TypeError. It also returns per-face count
    # Series, which have no 'face_id' or 'timestamp' columns to filter on
    # below. This function is not called anywhere in the visible notebook.
    head_nods, head_shakes = calculate_head_gestures(data_file, framerate=framerate)

    # Filter head gestures for the specified speaker and during their talking time
    speaker_head_nods = head_nods[(head_nods['face_id'] == int(speaker_id)) & (head_nods['timestamp'] >= speaker_talk_time[0]) & (head_nods['timestamp'] <= speaker_talk_time[1])]
    speaker_head_shakes = head_shakes[(head_shakes['face_id'] == int(speaker_id)) & (head_shakes['timestamp'] >= speaker_talk_time[0]) & (head_shakes['timestamp'] <= speaker_talk_time[1])]

    # Count the number of head nods and shakes during the speaker's talking time
    num_head_nods = len(speaker_head_nods)
    num_head_shakes = len(speaker_head_shakes)

    return num_head_nods, num_head_shakes

def extract_speaker_talk_times(transcription_file):
    """Collect the talking intervals of every speaker in a transcription CSV.

    The file needs ``speaker``, ``start`` and ``end`` columns. The labels
    'SPEAKER_00' and 'SPEAKER_01' are mapped to 0 and 1; other labels are
    kept as they are.

    Returns:
        Dict mapping each speaker id to a list of ``(start, end)`` tuples in
        file order. Rows without a speaker contribute no interval.
    """
    df_transcription = pd.read_csv(transcription_file, delimiter=',')

    # Map the two known speaker labels to the numeric ids 0 and 1
    df_transcription['speaker'] = df_transcription['speaker'].replace({'SPEAKER_00': 0, 'SPEAKER_01': 1})

    # Start every distinct speaker value with an empty interval list
    speaker_talk_times = {}
    for known_speaker in df_transcription['speaker'].unique():
        speaker_talk_times[known_speaker] = []

    # Append each segment to the list of the speaker it belongs to
    for _, segment in df_transcription.iterrows():
        speaker_id = segment['speaker']
        if pd.isnull(speaker_id):
            continue
        interval = (segment['start'], segment['end'])
        speaker_talk_times[speaker_id].append(interval)

    return speaker_talk_times

# Function to process a single file and return its totals
def process_file(file_path):
    """Return a one-row DataFrame with the total nod/shake counts of a file.

    Args:
        file_path: Path of the OpenFace CSV to analyse.

    Returns:
        DataFrame with the columns 'File Name', 'Total Head Nods' and
        'Total Head Shakes'.
    """
    head_nods_count, head_shakes_count = calculate_head_gestures(file_path)

    # File name up to the first dot
    file_name = os.path.basename(file_path).split('.')[0]

    # calculate_head_gestures returns per-face counts; sum them so each
    # "Total" cell holds one number instead of a whole Series object.
    df_results = pd.DataFrame({
        'File Name': [file_name],
        'Total Head Nods': [int(head_nods_count.sum())],
        'Total Head Shakes': [int(head_shakes_count.sum())]
    })

    return df_results

# Example usage to loop through a folder and process each file
folder_path = '/Volumes/My Passport/openface/threshold/csvfiles/'
file_list = os.listdir(folder_path)

per_file_results = []
for file_name in file_list:
    # Skip "._" sidecar files as the other cells in this notebook do; they
    # end in ".csv" but are not meant to be parsed as data.
    if file_name.endswith(".csv") and not file_name.startswith('._'):
        file_path = os.path.join(folder_path, file_name)
        per_file_results.append(process_file(file_path))

# Concatenate once instead of re-copying the growing frame on every file;
# fall back to an empty frame when the folder holds no CSV files.
df_final_results = pd.concat(per_file_results) if per_file_results else pd.DataFrame()

# Save the final results to a new CSV file
output_file_path = '/Volumes/My Passport/openface/threshold/output_file.csv'
df_final_results.to_csv(output_file_path, index=False)


save all head movements per file - needed for combination with speaker id

In [None]:
import os
import pandas as pd
import numpy as np

def _collect_gesture_segments(rows, movement):
    """Turn flagged frames into one record per run of consecutive rows."""
    if rows.empty:
        return []

    # A new segment starts wherever the row number does not directly follow
    # the previous flagged row; the cumulative sum labels each run.
    segment_ids = rows.index.to_series().diff().ne(1).cumsum()

    records = []
    for _, segment in rows.groupby(segment_ids):
        records.append({
            'start_time': segment['timestamp'].iloc[0],
            'end_time': segment['timestamp'].iloc[-1],
            'movement': movement,
            'face_id': segment['face_id'].iloc[0],
        })
    return records


def calculate_head_gestures(data_file, window_size=30, threshold=0.75):
    """Extract head nod and shake segments from an OpenFace output file.

    The absolute frame-to-frame change of pitch and yaw is summed over a
    rolling window. Frames whose rolling sum reaches the ``threshold``
    quantile of that file are flagged, and every run of consecutive flagged
    rows becomes one gesture.

    Args:
        data_file: Path to an OpenFace output CSV whose column names contain
            no spaces (``pose_Rx``, ``pose_Ry``, ``timestamp``, ``face_id``).
        window_size: Number of rows in the rolling window.
        threshold: Quantile (0-1) of the rolling sums a frame must reach to
            be flagged.

    Returns:
        DataFrame with the columns ``start_time``, ``end_time``,
        ``movement`` ('nod' or 'shake') and ``face_id``; all nods come
        first, then all shakes. The columns are present even when no
        gesture is found.
    """
    df = pd.read_csv(data_file, delimiter=',', encoding='latin-1')

    # Absolute frame-to-frame rotation change. OpenFace reports pitch as
    # pose_Rx and yaw as pose_Ry (pose_Rz is roll), so shakes use pose_Ry.
    # NOTE(review): diff() and the segment detection work on consecutive rows
    # regardless of face_id; for multi-face files confirm whether the
    # analysis should be done per face.
    df['pitch_distance'] = df['pose_Rx'].diff().abs()
    df['yaw_distance'] = df['pose_Ry'].diff().abs()

    # Total movement inside the rolling window
    df['pitch_rolling_sum'] = df['pitch_distance'].rolling(window=window_size, min_periods=1).sum()
    df['yaw_rolling_sum'] = df['yaw_distance'].rolling(window=window_size, min_periods=1).sum()

    # Per-file cut-offs for what counts as a gesture
    pitch_threshold = df['pitch_rolling_sum'].quantile(threshold)
    yaw_threshold = df['yaw_rolling_sum'].quantile(threshold)

    head_nods = df[df['pitch_rolling_sum'] >= pitch_threshold]
    head_shakes = df[df['yaw_rolling_sum'] >= yaw_threshold]

    # The earlier np.split call was given index labels where it expects
    # positions, so separate runs were not split apart; grouping by run id
    # yields one record per run.
    head_gestures = (
        _collect_gesture_segments(head_nods, 'nod')
        + _collect_gesture_segments(head_shakes, 'shake')
    )

    # Fix the column set so an empty result still produces a CSV header
    head_gestures_df = pd.DataFrame(
        head_gestures, columns=['start_time', 'end_time', 'movement', 'face_id']
    )

    return head_gestures_df

def process_file(file_path):
    """Detect the head gestures of one file and store them as a CSV.

    The result is written to the movements folder as
    ``<name>_head_movements.csv``, where ``<name>`` is the input file name
    up to its first dot.
    """
    gestures = calculate_head_gestures(file_path)

    # Part of the base name before the first dot
    file_name = os.path.basename(file_path).partition('.')[0]

    output_file_path = f'/Volumes/My Passport/openface/threshold/movements/{file_name}_head_movements.csv'
    gestures.to_csv(output_file_path, index=False)

# Example usage: write one *_head_movements.csv per thresholded OpenFace file
folder_path = '/Volumes/My Passport/openface/threshold/csvfiles/'
file_list = os.listdir(folder_path)

for file_name in file_list:
    # Only CSV files; names starting with "._" are skipped
    if file_name.endswith(".csv") and not file_name.startswith('._'):
        file_path = os.path.join(folder_path, file_name)
        process_file(file_path)


## plot the head movements

In [None]:
import os
import pandas as pd
import matplotlib.pyplot as plt

# Function to plot the head movements
def plot_head_movements(file_path, max_seconds=5 * 60):
    """Plot where head nods and shakes start during the beginning of a file.

    Args:
        file_path: CSV with at least the columns ``movement`` ('nod' or
            'shake') and ``start_time``.
        max_seconds: Length of the plotted window from time 0, in the units
            of ``start_time``; defaults to the first 5 minutes.

    Returns:
        None. The figure is displayed with ``plt.show()``.
    """
    head_gestures = pd.read_csv(file_path)

    # Only gestures that start inside the plotted window are drawn
    in_window = head_gestures[head_gestures['start_time'] <= max_seconds]

    # Set up the plot
    plt.figure(figsize=(12, 6))
    plt.title('Head Movements')
    plt.xlabel('Time (seconds)')
    plt.yticks([])

    # Nods as blue dots and shakes as red dots, all on one horizontal line
    for movement, color, label in (('nod', 'blue', 'Head Nods'),
                                   ('shake', 'red', 'Head Shakes')):
        rows = in_window[in_window['movement'] == movement]
        plt.scatter(rows['start_time'], [0] * len(rows), color=color, label=label, marker='o', s=10)

    # Limit the x-axis to the plotted window
    plt.xlim(0, max_seconds)

    # Display the plot with a legend
    plt.legend()
    plt.show()

# Example usage: plot the head movements of the first output file in the folder
output_folder_path = '/Volumes/My Passport/openface/threshold/movements/'
# Keep only real CSV outputs (no "._" sidecar files) and sort them so that
# "first" is well defined; os.listdir returns entries in arbitrary order.
output_file_list = sorted(
    name for name in os.listdir(output_folder_path)
    if name.endswith('.csv') and not name.startswith('._')
)

# Index 0 is the first file; the previous [1] picked the second entry and
# raised IndexError when the folder held a single file.
if output_file_list:
    output_file_path = os.path.join(output_folder_path, output_file_list[0])
    plot_head_movements(output_file_path)


## Match head movement with listening/speaking times - transcription files

In [None]:
import pandas as pd

# Load transcription file and head movement file
transcription_file_path = '/Volumes/My Passport/whisper_take2/output/1025/segments1025 sessie 1.wav.csv'
head_movement_file_path = '/Volumes/My Passport/openface/threshold/movements/1025 sessie 1_head_movements.csv'

transcription_df = pd.read_csv(transcription_file_path)
head_movement_df = pd.read_csv(head_movement_file_path)

# Speakers whose segment contains a given point in time
def find_speaker_for_movement(movement_time, transcription_df):
    """Return the speakers of all segments that contain ``movement_time``.

    A segment matches when ``start <= movement_time <= end``. The result
    follows row order and may contain the same speaker more than once.
    """
    return [
        segment['speaker']
        for _, segment in transcription_df.iterrows()
        if segment['start'] <= movement_time <= segment['end']
    ]

# Find the corresponding speaker for each head movement
head_movement_df['speakers'] = head_movement_df['start_time'].apply(lambda x: find_speaker_for_movement(x, transcription_df))

# Function to check if speaker 1 or 0 is in the list of speakers for each head movement
def check_speaker(speakers):
    """Reduce the speakers active during a head movement to a single code.

    Speakers may be given as the numeric ids 0 / 1 or as the raw
    transcription labels 'SPEAKER_00' / 'SPEAKER_01'. The transcription
    file is read without converting the labels, so matching only the
    numeric ids classified every movement as -1.

    Args:
        speakers: List of speaker ids or labels.

    Returns:
        1 if speaker 1 is present (also when speaker 0 is present too),
        0 if only speaker 0 is present, -1 otherwise.
    """
    if 1 in speakers or 'SPEAKER_01' in speakers:
        return 1
    elif 0 in speakers or 'SPEAKER_00' in speakers:
        return 0
    else:
        return -1

# Classify every head movement once (1, 0 or -1), then count per class
movement_labels = head_movement_df['speakers'].apply(check_speaker)
count_within_speaker1 = len(head_movement_df[movement_labels == 1])
count_within_speaker0 = len(head_movement_df[movement_labels == 0])
count_outside_speaking_time = len(head_movement_df[movement_labels == -1])


# Report the three counts
print(f"Head movements within speaking time of Speaker 1: {count_within_speaker1}")
print(f"Head movements within speaking time of Speaker 0: {count_within_speaker0}")
print(f"Head movements outside any speaking time: {count_outside_speaking_time}")


In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Assuming you have 'transcription_df' and 'head_movement_df' loaded correctly

# Which single speaker (if any) is talking during a head movement
def get_active_speaker(row):
    """Return the label of the only speaker overlapping a head movement.

    Uses the module-level ``transcription_df``. A segment overlaps when it
    ends at or after the movement's ``start_time`` and starts at or before
    its ``end_time``. Returns 'SPEAKER_00' or 'SPEAKER_01' when exactly one
    of them overlaps, otherwise 'BOTH_OR_NONE'.
    """
    overlapping = (row['start_time'] <= transcription_df['end']) & (row['end_time'] >= transcription_df['start'])
    overlapping_speakers = set(transcription_df.loc[overlapping, 'speaker'])

    speaker_00_overlap = 'SPEAKER_00' in overlapping_speakers
    speaker_01_overlap = 'SPEAKER_01' in overlapping_speakers

    # Both or neither talking: no single active speaker
    if speaker_00_overlap == speaker_01_overlap:
        return 'BOTH_OR_NONE'
    return 'SPEAKER_00' if speaker_00_overlap else 'SPEAKER_01'

# Add a column to 'head_movement_df' with the single active speaker of each
# movement ('SPEAKER_00', 'SPEAKER_01' or 'BOTH_OR_NONE')
head_movement_df['overlap_with_speaker'] = head_movement_df.apply(get_active_speaker, axis=1)

# Set up the plot for speaker speaking times
plt.figure(figsize=(12, 6))
plt.title('Speaker Speaking Times')
plt.xlabel('Time')
plt.ylabel('Speaker')
plt.yticks([0, 1], ['Speaker 0', 'Speaker 1'])

# Plot each transcription segment as a horizontal bar from start to end
# (blue for 'SPEAKER_00', red for every other speaker).
# NOTE(review): y is the raw `speaker` value while the ticks above are set at
# the numeric positions 0 and 1 — confirm the bars land on the intended rows.
for _, row in transcription_df.iterrows():
    plt.barh(y=row['speaker'], width=row['end'] - row['start'], left=row['start'], height=0.5, color='blue' if row['speaker'] == 'SPEAKER_00' else 'red')

# Plot green dots at y=0.5 for head movements that overlap exactly one speaker
speaker_head_movements = head_movement_df[head_movement_df['overlap_with_speaker'] != 'BOTH_OR_NONE']
plt.scatter(speaker_head_movements['start_time'], [0.5] * len(speaker_head_movements), color='green', marker='o', s=50)

# Display the plot
plt.show()


## check which speaker during a head movement -listening/speaking

In [None]:
import os
import pandas as pd

# Which single speaker (if any) is talking during a head movement
def get_active_speaker(row, transcription_df):
    """Return a code for the only speaker overlapping a head movement.

    A transcription segment overlaps when it ends at or after the movement's
    ``start_time`` and starts at or before its ``end_time``.

    Returns:
        '0' if only 'SPEAKER_00' overlaps, '1' if only 'SPEAKER_01'
        overlaps, and '2' if both or neither of them do.
    """
    overlapping = (row['start_time'] <= transcription_df['end']) & (row['end_time'] >= transcription_df['start'])
    overlapping_speakers = set(transcription_df.loc[overlapping, 'speaker'])

    speaker_00_overlap = 'SPEAKER_00' in overlapping_speakers
    speaker_01_overlap = 'SPEAKER_01' in overlapping_speakers

    # Both or neither talking: no single active speaker
    if speaker_00_overlap == speaker_01_overlap:
        return '2'
    return '0' if speaker_00_overlap else '1'

# Folder paths for head movements and transcriptions
head_movement_folder = '/Volumes/My Passport/openface/threshold/movements/'
transcription_folder = '/Volumes/My Passport/output 3/transcripts_firstbatch/'

# Loop through head movement CSV files
for head_movement_file in os.listdir(head_movement_folder):
    if head_movement_file.endswith('.csv') and not head_movement_file.startswith('._'):
        # Derive the matching transcription file name:
        # "<name>_head_movements.csv" -> "segments<name>.wav.csv"
        file_name = os.path.splitext(head_movement_file)[0]
        file_name_without_movement = file_name.replace('_head_movements', '') + '.wav.csv'
        transcription_file = os.path.join(transcription_folder, 'segments' + file_name_without_movement)

        if not os.path.isfile(transcription_file):
            # No transcription for this recording: report it and move on
            print(f"Transcription file not found for: {head_movement_file}")
            continue

        # Read head movement and transcription CSV files into pandas dataframes
        head_movement_df = pd.read_csv(os.path.join(head_movement_folder, head_movement_file))
        try:
            transcription_df = pd.read_csv(transcription_file)
        except (pd.errors.ParserError, pd.errors.EmptyDataError, UnicodeDecodeError) as err:
            # Only unreadable or malformed transcription files are skipped; a
            # bare except here also hid interrupts and unrelated bugs.
            print('error wrong datatype', transcription_file, err)
            continue

        # Add the 'speaker' column: '0', '1' or '2' (both/none) per movement
        head_movement_df['speaker'] = head_movement_df.apply(get_active_speaker, axis=1, transcription_df=transcription_df)

        # Write the updated head movement dataframe back to the same CSV file
        head_movement_df.to_csv(os.path.join(head_movement_folder, head_movement_file), index=False)

## summarize the counts per file per speaker and movement

In [None]:
import os
import pandas as pd

# Folder path for CSV files
folder_path = '/Volumes/My Passport/openface/threshold/movements/'

# One dict of counts per processed file
results = []

# Loop through CSV files in the folder (sorted for a reproducible row order).
for file_name in sorted(os.listdir(folder_path)):
    # Skip non-CSV entries and "._" sidecar files, as the other cells do
    if not file_name.endswith('.csv') or file_name.startswith('._'):
        continue

    # Read the CSV file into a pandas DataFrame
    df = pd.read_csv(os.path.join(folder_path, file_name), encoding='latin-1')

    # Files that have not been annotated with a speaker yet are skipped
    if 'face_id' not in df.columns or 'speaker' not in df.columns:
        continue

    # Counts for each face_id (0 and 1 if available)
    counts = {'filename': file_name}

    for face_id in [0, 1]:
        face_df = df[df['face_id'] == face_id]
        if face_df.empty:
            continue

        # Movements of this face, split by movement type and by which
        # speaker (0 or 1) was talking at the time. Same column names and
        # order as before: nod/speaker 0, nod/speaker 1, shake/0, shake/1.
        for movement in ('nod', 'shake'):
            for speaker in (0, 1):
                matches = (face_df['movement'] == movement) & (face_df['speaker'] == speaker)
                counts[f'{movement}_count_face_id_{face_id}_speaker_{speaker}'] = int(matches.sum())

    # Append the counts to the results list
    results.append(counts)

# Create a new DataFrame using the results list
counts_df = pd.DataFrame(results)

# Display the new DataFrame
print(counts_df)

# Save the DataFrame to a new CSV file
output_file_path = 'output_counts_file.csv'
counts_df.to_csv(output_file_path, index=False)
