# Audiovisual Analysis of Courtroom Videos

For the last part of our workshop, we will cover audiovisual analysis of courtroom data. The basic intuition underlying audio and video should feel familiar from your explorations of more standard machine learning and text analysis. In this part, we will see some of the standard parts of the audiovisual toolkit and preview some advances made possible by large language models.

In [None]:
# import pandas
import pandas as pd

# sklearn
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import confusion_matrix, classification_report, ConfusionMatrixDisplay
from sklearn.svm import SVC


# opencv
import cv2
from PIL import Image

# librosa
import librosa
import librosa.display

# gdown
import gdown

# import moviepy
import moviepy.editor as mp

# import pydub
from pydub import AudioSegment
from pydub.playback import play

# pyaudio and wave
import pyaudio  
import wave  

# numpy
import numpy as np

# plotting
import matplotlib.pyplot as plt  
import seaborn as sns


# misc
import os
import random
import sys
import imutils
from imutils import paths
import shutil
import base64
import requests
import re
import json 
from fuzzywuzzy import fuzz
from fuzzywuzzy import process
import pickle

# deep learning
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten, Conv2D, MaxPooling2D
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.utils import to_categorical  # encodes labels as one-hot vectors
from tensorflow.keras.preprocessing.image import ImageDataGenerator


# whisper and soundfile
import whisper
import soundfile as sf

# pyannote
from pyannote.audio import Pipeline
from pyannote.core import Segment, Timeline, Annotation
from pyannote.core import notebook

# face emotion recognition
from fer import Video
from fer import FER

# openai
import openai

## Audio Feature Extraction

We will start with audio. Our goal is to use audio to predict who is speaking and, ultimately, whether the way they speak predicts how they will vote on a case. As with text data, the problem is that raw audio cannot be readily analyzed statistically: it first needs to be transformed into <b>features</b> that can be used to train a machine learning model. Also like text, audio is high-dimensional, which makes analysis computationally expensive. In this part, we will see some techniques for extracting audio features and analyzing them.

### Separate Audio from Video

Our first step is to separate an audio clip from its underlying video. Computer vision and computational audio analysis use different techniques and have different "units" of analysis. Whereas computer vision works on one image (frame) at a time, audio analysis generally looks at longer segments of time. Let's start by extracting the audio from an example video:

In [None]:
# load a video clip and extract audio
video_path = "11-71541 Ara Hovanesyan v. Eric Holder, Jr.-QEvMv81TIQM.mp4"
audio_path = "11-71541 Ara Hovanesyan v. Eric Holder, Jr.-QEvMv81TIQM.wav"

# load the video and write its audio track to a .wav file
clip = mp.VideoFileClip(video_path)
clip.audio.write_audiofile(audio_path)


Next we'll go ahead and load the audio. `librosa.load()` returns two objects: a time series representing the audio, and the "sample rate," the number of samples per second of audio. By default, `librosa.load()` resamples the audio to 22,050 Hz and converts it to mono.

In [None]:
y, sr = librosa.load(audio_path, res_type='kaiser_fast')
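
To make the sample rate concrete, the sketch below builds a synthetic one-second tone with NumPy (a stand-in for real audio, not taken from our clip) and shows how the number of samples, the sample rate, and the duration relate. The names `sr_demo`, `tone`, and `recovered_duration` are illustrative only.

```python
import numpy as np

sr_demo = 22050                      # samples per second (librosa's default rate)
duration_s = 1.0                     # hypothetical clip length in seconds

# Time stamp of each sample, then a 440 Hz sine wave sampled at those times
t = np.arange(int(sr_demo * duration_s)) / sr_demo
tone = 0.5 * np.sin(2 * np.pi * 440 * t)

# Duration is always the number of samples divided by the sample rate
recovered_duration = len(tone) / sr_demo
print(len(tone), recovered_duration)
```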

Now let's look at the first 10 minutes. What do we see?

In [None]:
# Extract the first 10 minutes (600 seconds) of samples
first_ten_minutes = librosa.time_to_samples(600, sr=sr)
intro = y[:first_ten_minutes]

# Plot the waveform
plt.figure(figsize=(10, 4))
librosa.display.waveshow(intro, sr=sr, max_points=50000)
plt.title('Waveform')
plt.tight_layout()
plt.show()

We do see that there are different amplitudes at various parts of the audio. Without more context, it is hard to say what is happening exactly though.
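
One way to put numbers on "different amplitudes" is to compute the root-mean-square (RMS) energy of short frames. The sketch below does this in plain NumPy on synthetic noise; the function name `frame_rms` and the frame sizes are illustrative choices. `librosa.feature.rms` offers a similar calculation, though it pads the signal by default, so its values will differ slightly.

```python
import numpy as np

def frame_rms(signal, frame_length=2048, hop_length=512):
    """Root-mean-square energy of each (non-padded) frame of a 1-D signal."""
    n_frames = 1 + (len(signal) - frame_length) // hop_length
    starts = hop_length * np.arange(n_frames)
    return np.array([np.sqrt(np.mean(signal[s:s + frame_length] ** 2)) for s in starts])

# Synthetic example: one second of quiet noise followed by one second of loud noise
rng = np.random.default_rng(0)
sr_demo = 22050
quiet = 0.01 * rng.standard_normal(sr_demo)
loud = 0.5 * rng.standard_normal(sr_demo)
rms = frame_rms(np.concatenate([quiet, loud]))
print(rms[:3], rms[-3:])
```

Frames with a higher RMS correspond to louder stretches of the recording.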

### Mel-Frequency Cepstral Coefficients 

You likely hear dozens or even hundreds of different voices each day. How do you distinguish between all of those different voices? While you're not aware of it, you distinguish between voices by using features such as pitch, tone, and timbre. Mel-Frequency Cepstral Coefficients (MFCC) are a set of features commonly used in audio processing, especially in speech recognition. At a high level, they summarize a sound's frequency content on the "mel" scale, a scale of pitch that mimics how humans perceive differences between frequencies. MFCCs are computed by:

1. Pre-Emphasis: A high-pass filter is applied to the audio signal to amplify high-frequency components. This balances the frequency spectrum and improves the signal-to-noise ratio.

2. Framing: The continuous audio signal is divided into overlapping frames, typically 20-40 milliseconds long. This allows for analysis of short-term spectral properties of the audio signal.

3. Windowing: Each frame is multiplied by a Hamming window to minimize discontinuities at the edges. This helps reduce spectral leakage when transforming the signal to the frequency domain.

4. Fast Fourier Transform (FFT): The windowed frames are converted from the time domain to the frequency domain using FFT. This results in a representation of the signal in terms of its frequency components.

5. Mel-Scale Filter Bank: The frequency domain signal is passed through a series of triangular filters spaced according to the Mel scale. The Mel scale is a perceptual scale of pitches judged by listeners to be equal in distance from one another, with a linear spacing below 1 kHz and a logarithmic spacing above 1 kHz. This step approximates the human ear's response to different frequencies.

6. Logarithm of Power Spectrum: The power of each Mel-scaled frequency component is computed and transformed using a logarithm. This step simulates the human ear's logarithmic perception of loudness.

7. Discrete Cosine Transform (DCT): The log-Mel power spectrum is then transformed using DCT to obtain the MFCCs. The DCT decorrelates the coefficients and packs the most significant features into the lower-order coefficients, which helps in reducing the dimensionality of the feature set.
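
The steps above can be sketched in plain NumPy as follows. This is a minimal illustration, not `librosa`'s implementation: the pre-emphasis coefficient (0.97), frame sizes, 26 filters, and 13 coefficients are conventional but assumed choices, and `librosa.feature.mfcc` differs in several details, so the numbers will not match it exactly.

```python
import numpy as np

def hz_to_mel(f):
    return 2595.0 * np.log10(1.0 + f / 700.0)

def mel_to_hz(m):
    return 700.0 * (10.0 ** (m / 2595.0) - 1.0)

def mfcc_sketch(signal, sr, n_fft=512, frame_ms=25, hop_ms=10, n_mels=26, n_mfcc=13):
    # 1. Pre-emphasis: boost high frequencies
    emphasized = np.append(signal[0], signal[1:] - 0.97 * signal[:-1])
    # 2. Framing: overlapping 25 ms frames every 10 ms
    frame_len = int(sr * frame_ms / 1000)
    hop = int(sr * hop_ms / 1000)
    n_frames = 1 + (len(emphasized) - frame_len) // hop
    idx = np.arange(frame_len)[None, :] + hop * np.arange(n_frames)[:, None]
    frames = emphasized[idx]
    # 3. Windowing: taper each frame with a Hamming window
    frames = frames * np.hamming(frame_len)
    # 4. FFT: power spectrum of each frame
    power = np.abs(np.fft.rfft(frames, n=n_fft, axis=1)) ** 2 / n_fft
    # 5. Mel filter bank: triangular filters evenly spaced on the mel scale
    mel_points = np.linspace(hz_to_mel(0), hz_to_mel(sr / 2), n_mels + 2)
    bins = np.floor((n_fft + 1) * mel_to_hz(mel_points) / sr).astype(int)
    fbank = np.zeros((n_mels, n_fft // 2 + 1))
    for m in range(1, n_mels + 1):
        left, center, right = bins[m - 1], bins[m], bins[m + 1]
        for k in range(left, center):
            fbank[m - 1, k] = (k - left) / (center - left)
        for k in range(center, right):
            fbank[m - 1, k] = (right - k) / (right - center)
    mel_energy = power @ fbank.T
    # 6. Logarithm of the mel-band energies
    log_mel = np.log(mel_energy + 1e-10)
    # 7. DCT (type II): keep the first n_mfcc coefficients
    n = np.arange(n_mels)
    basis = np.cos(np.pi * np.outer(np.arange(n_mfcc), 2 * n + 1) / (2 * n_mels))
    return log_mel @ basis.T

# Synthetic one-second 440 Hz tone (not courtroom audio)
sr_demo = 16000
t = np.arange(sr_demo) / sr_demo
coeffs = mfcc_sketch(np.sin(2 * np.pi * 440 * t), sr_demo)
print(coeffs.shape)  # (number of frames, number of coefficients)
```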

That's a lot of steps! The end result is that MFCCs provide a compact representation of the audio signal that retains essential information relevant to human perception, making them highly effective for tasks such as speech and speaker recognition. Lucky for us, we don't need to implement each of these steps. The `librosa` library does it all for us. The code below computes and plots a mel spectrogram, the mel-scaled representation from which MFCCs are derived; we will extract the MFCCs themselves a little later with `librosa.feature.mfcc()`:

In [None]:
# Generate the Mel spectrogram
spec = librosa.feature.melspectrogram(y=y, sr=sr)

# Display the spectrogram
plt.figure(figsize=(10, 4))
librosa.display.specshow(librosa.power_to_db(spec, ref=np.max), y_axis='mel', x_axis='time', sr=sr)
plt.colorbar(format='%+2.0f dB')
plt.title('Mel spectrogram')
plt.tight_layout()
plt.show()


Y-Axis (Frequency): The vertical axis represents frequency in the Mel scale. Lower frequencies are at the bottom, and higher frequencies are at the top. The Mel scale spaces frequencies in a way that reflects human auditory perception, with finer resolution at lower frequencies.

X-Axis (Time): The horizontal axis represents time. As you move from left to right, you are looking at the progression of the audio signal over time.

Color Intensity: The colors in the spectrogram represent the amplitude (power) of the signal at each frequency and time point.

- Black/Dark Colors: These indicate lower amplitude or quieter parts of the signal.
- Purple to Blue: Intermediate amplitude levels.
- Orange to Yellow: Higher amplitude or louder parts of the signal.

The color bar on the right side of the spectrogram shows the scale of amplitude in decibels (dB), with darker colors indicating lower decibels and brighter colors indicating higher decibels.

By interpreting the mel spectrogram, you can identify patterns in the frequency domain over time, such as the presence of certain tones, rhythms, and other acoustic features. This visualization is particularly useful for analyzing speech signals, musical notes, and other audio phenomena, providing insights into the structure and content of the audio data.
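
The decibel values on the color bar come from a logarithmic rescaling of power. The sketch below reproduces the idea behind `librosa.power_to_db(..., ref=np.max)` in NumPy on a few made-up power values; `power_to_db_sketch` is an illustrative name, and the 80 dB floor mirrors `librosa`'s default `top_db`.

```python
import numpy as np

def power_to_db_sketch(S, top_db=80.0, amin=1e-10):
    """Convert power values to decibels relative to the largest value in S."""
    S = np.maximum(S, amin)                    # avoid taking the log of zero
    db = 10.0 * np.log10(S / S.max())          # the loudest cell becomes 0 dB
    return np.maximum(db, db.max() - top_db)   # clip anything quieter than -top_db

power = np.array([1.0, 0.1, 0.001, 1e-12])     # hypothetical power values
db_values = power_to_db_sketch(power)
print(db_values)
```

Each factor of ten in power corresponds to 10 dB, which is why the scale tops out at 0 dB and runs into negative values.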

### Harmonic-Percussive Separation

Sounds can also contain <b>harmonic</b> and <b>percussive</b> components. While these components are usually used in analyzing music, human speech also has harmonic and percussive components. We can separate the harmonic and percussive parts of an audio stream as follows:

In [None]:
data_h, data_p = librosa.effects.hpss(y)
spec_h = librosa.feature.melspectrogram(y=data_h, sr=sr)
spec_p = librosa.feature.melspectrogram(y=data_p, sr=sr)
db_spec_h = librosa.power_to_db(spec_h,ref=np.max)
db_spec_p = librosa.power_to_db(spec_p,ref=np.max)

We can visualize the mel-frequencies of harmonic components like this:

In [None]:
librosa.display.specshow(db_spec_h,y_axis='mel', x_axis='s', sr=sr)
plt.colorbar();

And the percussive components like this:

In [None]:
librosa.display.specshow(db_spec_p,y_axis='mel', x_axis='s', sr=sr)
plt.colorbar();

And the amplitudes of both together like this:

In [None]:
# Top panel: the original (mono) waveform
fig, ax = plt.subplots(nrows=2, sharex=True, sharey=True)
librosa.display.waveshow(y, sr=sr, ax=ax[0])
ax[0].set(title='Original waveform')
ax[0].label_outer()

# Bottom panel: harmonic and percussive components overlaid
# (data_h and data_p were computed with librosa.effects.hpss above)
librosa.display.waveshow(data_h, sr=sr, alpha=0.25, ax=ax[1])
librosa.display.waveshow(data_p, sr=sr, color='r', alpha=0.5, ax=ax[1])
ax[1].set(title='Harmonic + Percussive')
plt.show()

### Chroma

We can further analyze pitch by looking at the "chroma." Chroma features describe how much energy falls into each of the twelve pitch classes within an octave (e.g., C, C#, D). We can extract the chroma and visualize it like this (don't run this because it will take a long time):

In [None]:
"""
chroma = librosa.feature.chroma_cqt(y=data_h, sr=sr)
plt.figure(figsize=(20,8))
librosa.display.specshow(chroma, sr=sr, x_axis='time', y_axis='chroma', vmin=0, vmax=1)
plt.title('Ara Hovanesyan v. Eric Holder, Jr.: Chroma Spectrogram')
plt.colorbar()
"""

Looking at just the first 30 seconds:

In [None]:
first_thirty_seconds = librosa.time_to_samples(30, sr=sr)
intro = y[:first_thirty_seconds]
intro_harm = librosa.effects.harmonic(intro)
intro_chroma = librosa.feature.chroma_cqt(y=intro_harm, sr=sr)
plt.figure(figsize=(20,8))
#plt.title('Ara Hovanesyan v. Eric Holder, Jr.: Chroma Spectrogram (First 30 seconds)')
librosa.display.specshow(intro_chroma, sr=sr, x_axis='s', y_axis='chroma', )
plt.colorbar();

In this plot we can see the pitch class and intensity across the first 30 seconds of the video. Again, while these are primarily useful for analyzing music, they can sometimes be helpful for speaker identification as well.
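
To see what a "pitch class" is, the sketch below maps a few frequencies to their nearest note name using the standard convention that A4 = 440 Hz is MIDI note 69. The function name `pitch_class` and the example frequencies are illustrative.

```python
import math

PITCH_CLASSES = ["C", "C#", "D", "D#", "E", "F", "F#", "G", "G#", "A", "A#", "B"]

def pitch_class(freq_hz):
    """Map a frequency to its nearest pitch class (A4 = 440 Hz = MIDI note 69)."""
    midi_note = round(69 + 12 * math.log2(freq_hz / 440.0))
    return PITCH_CLASSES[midi_note % 12]

for f in (220.0, 261.63, 440.0):
    print(f, pitch_class(f))
```

Notes an octave apart, such as 220 Hz and 440 Hz, fall into the same pitch class, which is why a chroma plot has only twelve rows.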

Putting it all together, we can extract all of these audio features with just a few lines of code. We can also extract spectral contrast (related to timbre) and tonnetz (tonal centroid) features:

In [None]:
# Sound feature engineering
mel = librosa.feature.melspectrogram(y=y, sr=sr)
mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=40)
# chroma and spectral contrast expect a linear-frequency (STFT) spectrogram,
# so we compute them from the waveform rather than from the mel spectrogram
chroma = librosa.feature.chroma_stft(y=y, sr=sr)
contrast = librosa.feature.spectral_contrast(y=y, sr=sr)
tonnetz = librosa.feature.tonnetz(y=librosa.effects.harmonic(y), sr=sr)
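
Each of these features is a matrix with one column per time frame. A common way to turn them into a single fixed-length vector per clip is to average over time and concatenate. The sketch below shows this on random stand-in arrays (not real features) whose row counts follow `librosa`'s defaults; all variable names are illustrative.

```python
import numpy as np

rng = np.random.default_rng(0)
n_frames = 200  # hypothetical number of time frames in a clip

# Synthetic stand-ins with the row counts librosa returns by default
mfccs_demo = rng.standard_normal((40, n_frames))     # n_mfcc=40
chroma_demo = rng.random((12, n_frames))             # 12 pitch classes
contrast_demo = rng.random((7, n_frames))            # 6 frequency bands + 1
tonnetz_demo = rng.standard_normal((6, n_frames))    # 6 tonal centroid dimensions

# Average each feature over time, then join everything into one vector
feature_vector = np.concatenate([
    m.mean(axis=1) for m in (mfccs_demo, chroma_demo, contrast_demo, tonnetz_demo)
])
print(feature_vector.shape)
```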

#### Challenge: Differences Between Speakers?

Listen to the clip or watch the video. Try listening to the attorney speaking, then listen to the judges asking questions. Try clipping the soundfile to separate the attorney from the judges. Do their features look different at all? For example, do some people speak more harmonically or percussively than others? More loudly or more quietly?

### Audio Classification

By themselves, audio features are not too helpful in a long video. However, the key is that by decomposing human voices into numerical features, they become usable by machine learning models. How does this work? Let's take a look at two clips, one of Judge Wardlaw speaking and one of Judge Fletcher. What do you notice about their mel spectrograms?

In [None]:
# Load the audio file
audio_path = "data/audio_clips/wardlaw_seg1.wav"
y, sr = librosa.load(audio_path, res_type='kaiser_fast')

# Generate the Mel spectrogram
spec = librosa.feature.melspectrogram(y=y, sr=sr)

# Display the spectrogram
plt.figure(figsize=(10, 4))
librosa.display.specshow(librosa.power_to_db(spec, ref=np.max), y_axis='mel', x_axis='time', sr=sr)
plt.colorbar(format='%+2.0f dB')
plt.title('Mel spectrogram - Wardlaw')
plt.tight_layout()
plt.show()

In [None]:
# Load the audio file
audio_path = "data/audio_clips/Fletcher_seg1.wav"
y, sr = librosa.load(audio_path, res_type='kaiser_fast')

# Generate the Mel spectrogram
spec = librosa.feature.melspectrogram(y=y, sr=sr)

# Display the spectrogram
plt.figure(figsize=(10, 4))
librosa.display.specshow(librosa.power_to_db(spec, ref=np.max), y_axis='mel', x_axis='time', sr=sr)
plt.colorbar(format='%+2.0f dB')
plt.title('Mel spectrogram - Fletcher')
plt.tight_layout()
plt.show()

A visual inspection suggests that Judge Wardlaw is a bit louder than Judge Fletcher, and speaks in a higher pitch. Can we train a machine learning model to uncover these kinds of patterns at scale? First let's set the path where our pre-labeled audio clips are:

In [None]:
# set the path to the folder containing audio clips
path = "./data/audio_clips/"

Next let's define a function that extracts the MFCCs from an audio clip and averages them over time:

In [None]:
# load the audio files and extract features
def extract_features(file):
    X, sample_rate = librosa.load(file, res_type='kaiser_fast') 
    mfccs = np.mean(librosa.feature.mfcc(y=X, sr=sample_rate, n_mfcc=40).T,axis=0)
    return mfccs

Then get the MFCCs for each audio clip in our data and put them into a dataframe:

In [None]:
# loop through the audio files and extract features
data = []
for subdir, dirs, files in os.walk(path):
    for file in files:
        try:
            label = file.split("_")[0]
            file_path = os.path.join(subdir, file)
            features = extract_features(file_path)
            data.append([file_path, features, label])
        except Exception as e:
            print("Error encountered while parsing file: ", file)
            print(e)

# convert the data into a pandas dataframe
df = pd.DataFrame(data, columns=['file_path', 'feature', 'label'])
df.head()

Now let's go ahead and train a model. For audiovisual analysis, neural networks are often among the best-performing methods. We can start by doing our standard train/test split:

In [None]:
# convert the labels into numerical values
le = LabelEncoder()
df['label'] = le.fit_transform(df['label'])

# class names in the same order as the encoded integers (used for plot labels later)
classes = le.classes_

# split the data into training and testing sets
X_train, X_val, y_train, y_val = train_test_split(np.array(df['feature'].tolist()), 
                                                    np.array(df['label'].tolist()), 
                                                    test_size=0.2, 
                                                    random_state=42,
                                                   stratify = np.array(df['label'].tolist()))

# Convert labels to one-hot encoded vectors
y_train_enc = to_categorical(y_train)
y_val_enc = to_categorical(y_val)
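
To see what label encoding and one-hot encoding produce, here is a NumPy-only sketch on a handful of made-up labels. `np.unique` sorts the class names, which is the same ordering `LabelEncoder` uses; the variable names are illustrative.

```python
import numpy as np

labels = np.array(["Wardlaw", "Fletcher", "Owens", "Fletcher"])  # hypothetical labels

# Sorted unique classes and an integer code for each label
classes_demo, codes = np.unique(labels, return_inverse=True)

# One row per label, with a single 1 in the column of its class
one_hot = np.eye(len(classes_demo))[codes]
print(classes_demo, codes)
print(one_hot)
```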

We then set up our neural network. A neural network is a type of supervised learning model, loosely inspired by neurons in the brain, that maps inputs to outputs. While there is a lot to know about neural networks, here are a few key things:

1. A neural net starts with an "input" layer that will have as many nodes as features in your dataset
2. It will also have an "output" layer that will be the final prediction
3. In between these two layers are "hidden" layers, whose learned weights are hard to interpret directly. At a high level, training a neural network involves multiple passes over the training data ("epochs") in which the model learns how to map the input layer to the output layer. After each batch of predictions, the model measures its errors, sends that information back through the network ("backpropagation"), and updates the weights of its layers accordingly. Training networks with many hidden layers this way is known as "deep learning."

We can set up a deep learning model to classify audio segments as follows:

In [None]:
# Define the model architecture
model = Sequential()
# first hidden layer; input_shape tells Keras how many features each clip has
model.add(Dense(256, activation='relu', input_shape=(X_train.shape[1],)))
model.add(Dropout(0.3))

# add a second hidden layer
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.3))

# add the output layer: one node per speaker class
model.add(Dense(len(le.classes_), activation='softmax'))
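
Under the hood, each `Dense` layer is a matrix multiplication followed by an activation function. The NumPy sketch below runs one forward pass through a tiny network with random, untrained weights; the layer sizes and variable names are hypothetical, and dropout is omitted because it is only active during training.

```python
import numpy as np

def relu(x):
    return np.maximum(x, 0.0)

def softmax(x):
    e = np.exp(x - x.max(axis=1, keepdims=True))  # subtract the max for numerical stability
    return e / e.sum(axis=1, keepdims=True)

rng = np.random.default_rng(0)
n_features, n_hidden, n_classes = 40, 8, 3   # hypothetical sizes

# Randomly initialized weights and zero biases (training would adjust these)
W1, b1 = rng.standard_normal((n_features, n_hidden)) * 0.1, np.zeros(n_hidden)
W2, b2 = rng.standard_normal((n_hidden, n_classes)) * 0.1, np.zeros(n_classes)

X_demo = rng.standard_normal((5, n_features))   # 5 fake clips
hidden = relu(X_demo @ W1 + b1)                 # like Dense(..., activation='relu')
probs = softmax(hidden @ W2 + b2)               # like Dense(..., activation='softmax')
print(probs.sum(axis=1))
```

The softmax output gives one probability per class for each clip, and each row sums to 1.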

Once we have defined the model architecture, we are ready to train it. What happens to our validation accuracy as training proceeds?

In [None]:
# Compile the model
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# Train the model
history = model.fit(X_train, y_train_enc, validation_data=(X_val, y_val_enc), epochs=50, batch_size=32)

In [None]:
def plot_confusion_matrix(cm, classes,
                          normalize=False,
                          title='Confusion matrix',
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    import itertools

    # normalize first so the colors and the printed numbers agree
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

    fig = plt.figure(figsize=(8, 8))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.tight_layout()
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.show()

In [None]:
# make predictions on the validation set
y_pred = model.predict(X_val)
y_pred_classes = np.argmax(y_pred, axis=1)

# create the confusion matrix (rows: true labels, columns: predicted labels)
cm = confusion_matrix(y_val, y_pred_classes, labels=range(len(le.classes_)))

# le.classes_ lists the class names in the same order as the encoded labels
plot_confusion_matrix(cm, classes=le.classes_,
                      title='Confusion matrix')
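
A confusion matrix can also be summarized numerically. The sketch below computes per-class recall and precision, plus overall accuracy, from a made-up 3-class matrix (not our model's results); the variable names are illustrative.

```python
import numpy as np

# Hypothetical 3-class confusion matrix: rows are true labels, columns are predictions
cm_demo = np.array([[8, 1, 1],
                    [0, 9, 1],
                    [2, 0, 8]])

true_positives = np.diag(cm_demo)
recall = true_positives / cm_demo.sum(axis=1)      # share of each true class that was found
precision = true_positives / cm_demo.sum(axis=0)   # share of each predicted class that is right
accuracy = true_positives.sum() / cm_demo.sum()
print(recall, precision, accuracy)
```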

Wow, that's really good! We can then save the model:

In [None]:
# save the trained model to a file
model.save("/models/audio_classification_example.h5")

### Optional Challenge: Neural Networks

We only used MFCCs and got great results, but this won't always be the case! Experiment with adding additional features (for example, chroma or spectral contrast) to see how the model performs.

## Transcription

### Diarization

Now that we have a way to classify speakers, we can start to analyze each speaker throughout a video. We can start with transcribing each speaker using OpenAI's `whisper` model. Before transcribing the video, though, we first need to separate the speakers in the audio track. This process is known as <b>diarization</b>. Diarization systems typically split the audio into short segments, compute acoustic features or learned embeddings for each one, and cluster segments that sound like the same speaker. Luckily, this is easy to do with `pyannote`. First we initialize a diarization pipeline. If you haven't already, make sure you sign up for access on Hugging Face to get an authorization token [here](https://huggingface.co/pyannote/speaker-diarization-3.1).

In [None]:
with open("pyannote_token.txt", "r") as file:
    hg_token = file.read().replace("\n", "")

pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1",
                                    use_auth_token=hg_token)

Now let's see how diarization works on the first five minutes of audio. Let's clip the audio file to the first five minutes:

In [None]:
# Load the .wav file
input_file = "11-71541 Ara Hovanesyan v. Eric Holder, Jr.-QEvMv81TIQM.wav"
audio = AudioSegment.from_wav(input_file)

# Take the first five minutes (300 seconds)
five_minute = 300 * 1000  # pydub works in milliseconds
five_minute_audio = audio[:five_minute]

# Export the first five minutes as a new .wav file
output_file = "output_five_minute.wav"
five_minute_audio.export(output_file, format="wav")

print(f"The first five minutes of audio have been saved as {output_file}")

Now we apply the diarization pipeline to the audio file, then print out the result. This code takes a while to run, so don't actually run it. The results have already been saved for you!

In [None]:
# apply pretrained pipeline
diarization = pipeline("output_five_minute.wav")

# print the result
for turn, _, speaker in diarization.itertracks(yield_label=True):
    print(f"start={turn.start:.1f}s stop={turn.end:.1f}s speaker_{speaker}")

# dump the diarization output to disk using RTTM format
#with open("output_five_minute.rttm", "w") as rttm:
#    diarization.write_rttm(rttm)

Now let's visualize the speaker turns. We can reload our `rttm` file that contains the diarization tracks.

In [None]:
# Path to the RTTM file
rttm_file_path = 'output_five_minute.rttm'

# Function to read RTTM file and return an Annotation object
def read_rttm(file_path):
    annotation = Annotation()
    with open(file_path, 'r') as file:
        for line in file:
            fields = line.strip().split()
            start_time = float(fields[3])
            duration = float(fields[4])
            speaker_id = fields[7]
            segment = Segment(start_time, start_time + duration)
            annotation[segment] = speaker_id
    return annotation

# Read the RTTM file
diarization = read_rttm(rttm_file_path)
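
For reference, each RTTM line is a space-separated record in which the fourth field is the turn's start time, the fifth is its duration, and the eighth is the speaker label. The sketch below parses a few made-up lines (the file name, times, and speakers are invented) and totals the speaking time per speaker using only the standard library.

```python
from collections import defaultdict

# Hypothetical RTTM lines (file name, times, and speakers are made up)
sample_rttm = """\
SPEAKER demo 1 0.50 4.25 <NA> <NA> SPEAKER_00 <NA> <NA>
SPEAKER demo 1 4.75 2.00 <NA> <NA> SPEAKER_01 <NA> <NA>
SPEAKER demo 1 6.75 3.25 <NA> <NA> SPEAKER_00 <NA> <NA>
"""

talk_time = defaultdict(float)
for line in sample_rttm.splitlines():
    fields = line.split()
    if not fields or fields[0] != "SPEAKER":
        continue                      # skip blank or non-speaker lines
    duration = float(fields[4])       # fifth field: turn duration in seconds
    talk_time[fields[7]] += duration  # eighth field: speaker label

print(dict(talk_time))
```

Totals like these are one quick way to check who dominates a stretch of the argument.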

We can then plot the speaker turns over the first five minutes and print each segment:

In [None]:
# Display the diarization result
notebook.crop = Segment(0, 300) 
fig, ax = plt.subplots()
notebook.plot_annotation(diarization, ax=ax, time=True, legend=True)
plt.show()

# Print the contents for inspection
for segment, _, speaker in diarization.itertracks(yield_label=True):
    print(f"Speaker: {speaker}, Start: {segment.start:.2f}, End: {segment.end:.2f}")

Let's look at the speaker turns in the overall audio track. What do you notice about who is speaking the most in the first and second half of the audio? We won't run the code to create the diarization because this would take a while, so let's skip right to looking at the results:

In [None]:
"""
import matplotlib.pyplot as plt

# apply pretrained pipeline
diarization = pipeline("audio.wav")

# dump the diarization output to disk using RTTM format
with open("audio.rttm", "w") as rttm:
    diarization.write_rttm(rttm)
"""

In [None]:
# Read the RTTM file
diarization = read_rttm('audio.rttm')

# create plot
fig, ax = plt.subplots(figsize=(10, 5))

# define color map
color_map = {
    "SPEAKER_04": "blue",
    "SPEAKER_00": "green",
    "SPEAKER_01": "red",
    "SPEAKER_03": "purple",
    "SPEAKER_02": "orange",
}

# plot speaker turns
for turn, _, speaker in diarization.itertracks(yield_label=True):
    start = turn.start
    end = turn.end
    duration = end - start
    ax.barh(y=speaker, width=duration, left=start, height=0.5, color=color_map[speaker])

# set plot labels and title
ax.set_xlabel("Time (seconds)")
ax.set_ylabel("Speaker")
ax.set_title("Speaker turns")

plt.show()

### Speaker Classification

We're now ready to classify each speaker segment! First let's load the neural network we trained earlier:

In [None]:
# load the saved model
model = tf.keras.models.load_model("/models/audio_classification_example.h5")

def extract_features(file):
    X, sample_rate = librosa.load(file, res_type='kaiser_fast') 
    mfccs = np.mean(librosa.feature.mfcc(y=X, sr=sample_rate, n_mfcc=40).T,axis=0)
    return mfccs

Let's extract the MFCCs for each speaker:

In [None]:
start_times = []
end_times = []
speakers = []
features = []

audio_file, sample_rate = librosa.load('audio.wav', res_type='kaiser_fast')

for turn, _, speaker in diarization.itertracks(yield_label=True):

    # Get the start and end times of the track
    start_times.append(turn.start)
    end_times.append(turn.end)
    speakers.append(speaker)

# Loop through each segment and extract the audio data
for i, (start_time, end_time) in enumerate(zip(start_times, end_times)):
    start_frame = int(start_time * sample_rate)
    end_frame = int(end_time * sample_rate)
    segment = audio_file[start_frame:end_frame]
    mfccs = np.mean(librosa.feature.mfcc(y=segment, sr=sample_rate, n_mfcc=40).T,axis=0)
    features.append(mfccs)
    #librosa.output.write_wav(f"segment_{i+1}.wav", segment, sr)

clips = pd.DataFrame({'start': start_times, 'end': end_times,
                     'features': features,
                     'speakers': speakers})

Then predict the speaker for each track, using a 0.9 confidence threshold:

In [None]:
processed_audio = np.array(clips['features'].tolist())

# Predict the classes for the input data
y_pred_prob = model.predict(processed_audio)

# Define the threshold probability below which the predicted class will be changed
threshold = 0.9

y_preds = []
# Loop through each predicted probability vector
for i in range(len(y_pred_prob)):
    
    # Get the predicted class and probability
    y_pred = np.argmax(y_pred_prob[i])
    prob = y_pred_prob[i][y_pred]
    
    # If the predicted probability is below the threshold, mark the speaker as unknown
    if prob < threshold:
        y_pred = 9  # placeholder code for "unknown" (mapped to a label below)
    
    y_preds.append(y_pred)

clips['speaker_pred'] = y_preds
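
The same thresholding can be written without an explicit loop. The sketch below applies it to a small, made-up probability matrix with `np.where`; the names `probs_demo`, `threshold_demo`, and `unknown_code` are illustrative, and 9 is the same placeholder code used in the loop above.

```python
import numpy as np

# Hypothetical softmax outputs for three segments and four classes
probs_demo = np.array([[0.95, 0.02, 0.02, 0.01],
                       [0.40, 0.35, 0.15, 0.10],
                       [0.03, 0.02, 0.03, 0.92]])
threshold_demo = 0.9
unknown_code = 9   # placeholder code for low-confidence predictions

best_class = probs_demo.argmax(axis=1)   # most likely class per segment
best_prob = probs_demo.max(axis=1)       # probability of that class
preds = np.where(best_prob < threshold_demo, unknown_code, best_class)
print(preds)
```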

Now let's plot it! We know that the two most common speakers are probably the attorneys, so we can take them out of our classification and then plot:

In [None]:
# Define the mapping from encoded class to speaker name
mapping = {0: 'Fletcher', 2: 'Owens', 3: 'Wardlaw', 9: 'unknown'}

# Apply the mapping to the 'speaker_pred' column
clips['speaker_pred'] = clips['speaker_pred'].map(mapping)

# Get the two diarization speakers with the most turns (assumed to be the attorneys)
most_common = clips['speakers'].value_counts().nlargest(2)

mask = clips['speakers'].isin(most_common.index)
clips.loc[mask, 'speaker_pred'] = 'attorney'
clips['speaker_pred'] = clips['speaker_pred'].fillna('unknown')

# create plot
fig, ax = plt.subplots(figsize=(10, 5))

# define color map
color_map = {
    "attorney": "blue",
    "Owens": "red",
    "Fletcher": "purple",
    "Wardlaw": "orange",
    'unknown': 'gray',
}

# plot speaker turns
for start, end, speaker in zip(clips['start'],
                               clips['end'],
                               clips['speaker_pred']):
    duration = end - start
    ax.barh(y=speaker, width=duration, left=start, height=0.5, color=color_map[speaker])

# set plot labels and title
ax.set_xlabel("Time (seconds)")
ax.set_ylabel("Speaker")
ax.set_title("Speaker turns")

plt.show()

### Transcribe

Now we're ready to create a transcript! First let's load the `whisper` transcription model:

In [None]:
model = whisper.load_model("base")

We can then loop through our speaker segments and transcribe each part. Let's see how this works on a short sample:

In [None]:
clips[0:2]

In [None]:
results = []
for start, end in zip(clips['start'][0:2],
                      clips['end'][0:2]):
    start_frame = int(start * sample_rate)
    end_frame = int(end * sample_rate)
    segment = audio_file[start_frame:end_frame]
    # write the segment at the rate it was loaded at, then transcribe the file
    sf.write('temp.wav', segment, sample_rate, 'PCM_24')
    result = model.transcribe("temp.wav")
    results.append(result['text'])
    
results

That doesn't seem great. Why not? `whisper` tends to work better with longer audio tracks, since very short clips give the model little context. This is an important limitation to keep in mind in your own projects.
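
One more thing to check when slicing audio by time stamps is that the sample rate used to write a clip matches the rate it was loaded at. The arithmetic sketch below uses hypothetical turn times to show how a mismatch changes the playback length; 22050 Hz is `librosa`'s default loading rate, and 48000 Hz stands in for a mismatched rate.

```python
sr_loaded = 22050          # rate the audio was loaded at (librosa's default)
sr_written = 48000         # a mismatched rate passed when writing the file

start_s, end_s = 12.0, 22.0                      # hypothetical speaker turn
start_idx = int(start_s * sr_loaded)             # first sample of the turn
end_idx = int(end_s * sr_loaded)                 # one past the last sample
n_samples = end_idx - start_idx

true_duration = n_samples / sr_loaded            # how long the speech actually lasts
playback_duration = n_samples / sr_written       # how long the mislabeled file plays
print(true_duration, round(playback_duration, 2))
```

A ten-second turn written at the wrong rate plays back in under five seconds, which makes speech sound sped up and can hurt transcription quality.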

In [None]:
"""
results = []
for start, end, speaker in zip(clips['start'],
                               clips['end'],
                               clips['speaker_pred']):
    start_frame = int(start * sample_rate)
    end_frame = int(end * sample_rate)
    segment = audio_file[start_frame:end_frame]
    # write the segment at the rate it was loaded at, then transcribe the file
    sf.write('temp.wav', segment, sample_rate, 'PCM_24')
    result = model.transcribe("temp.wav")
    results.append(result['text'])
    
clips['transcription'] = results
"""

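Alternatively, we could transcribe the entire audio file first and then assign speakers to the transcript segments. To prepare for that, we first merge consecutive turns by the same predicted speaker into a single row: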

In [None]:
# collect the simplified rows in a list (DataFrame.append was removed in pandas 2.0)
rows = []
i = 0
while i < len(clips):
    # get speaker and start time of the current row
    speaker_pred = clips.loc[i, 'speaker_pred']
    start_time = clips.loc[i, 'start']

    # find the index of the last row with the same speaker
    j = i + 1
    while j < len(clips) and clips.loc[j, 'speaker_pred'] == speaker_pred:
        j += 1

    # get the end time of the last row with the same speaker
    end_time = clips.loc[j - 1, 'end']

    # add the simplified row to the list
    rows.append({'start': start_time, 'end': end_time, 'speaker_pred': speaker_pred})

    # move to the next row with a different speaker
    i = j

# create a new dataframe from the simplified rows
new_df = pd.DataFrame(rows, columns=['start', 'end', 'speaker_pred'])
print(new_df)


Let's go ahead and transcribe the first 5 minutes:

In [None]:
full_transcript = model.transcribe('output_five_minute.wav')

We can then match the transcript segments to our diarization timestamps:

In [None]:
for segment in full_transcript['segments']:
    # initialize speaker to None
    speaker_pred = None
    
    # iterate over the simplified rows in new_df
    for _, row in new_df.iterrows():
        # if clip's end time is less than or equal to a row's end time
        if segment['end'] <= row['end']:
            # set speaker to the row's speaker value
            speaker_pred = row['speaker_pred']
            break
    
    # add speaker key to clip dictionary
    segment['speaker_pred'] = speaker_pred
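
The rule above assigns each transcript segment to the first speaker turn that ends at or after the segment's end. An alternative, shown here as a sketch on made-up data rather than as the method used in this workshop, is to pick the turn with the largest time overlap. The helper `overlap` and the example turns and segments are hypothetical.

```python
def overlap(a_start, a_end, b_start, b_end):
    """Length in seconds of the overlap between two intervals (0 if disjoint)."""
    return max(0.0, min(a_end, b_end) - max(a_start, b_start))

# Hypothetical speaker turns and transcript segments
turns = [
    {"start": 0.0, "end": 6.0, "speaker_pred": "attorney"},
    {"start": 6.0, "end": 9.0, "speaker_pred": "Wardlaw"},
    {"start": 9.0, "end": 20.0, "speaker_pred": "attorney"},
]
segments_demo = [
    {"start": 0.5, "end": 5.0, "text": "May it please the court."},
    {"start": 5.5, "end": 9.0, "text": "Counsel, what is the standard of review?"},
]

for seg in segments_demo:
    # choose the turn that shares the most time with this segment
    best = max(turns, key=lambda t: overlap(seg["start"], seg["end"], t["start"], t["end"]))
    seg["speaker_pred"] = best["speaker_pred"]

print([(s["speaker_pred"], s["text"]) for s in segments_demo])
```

Overlap-based matching can be more forgiving when transcript and diarization boundaries do not line up exactly.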

Let's see what our transcript looks like:

In [None]:
for d in full_transcript['segments']:
    print(f"start: {d['start']}, end: {d['end']}, speaker_pred: {d['speaker_pred']}, text: {d['text']}")

Listen to the video. How does the transcript match up to the actual audio? Do we capture who is talking when, and what they're saying exactly?

In [None]:
with open('./data/Ara Hovanesyan v. Eric Holder.txt', 'w') as f:
    for d in full_transcript['segments']:
        f.write(f"start: {d['start']}, end: {d['end']}, speaker_pred: {d['speaker_pred']}, text: {d['text']}\n")

In [None]:
# Create a DataFrame
df = pd.DataFrame(full_transcript['segments'])

# Select the relevant columns
df_selected = df[['start', 'end', 'text', 'speaker_pred']]

# Display the DataFrame
df_selected.head()

In [None]:
"""
full_transcript_all = model.transcribe('audio.wav')

for segment in full_transcript_all['segments']:
    # initialize speaker to None
    speaker_pred = None
    
    # iterate over the simplified rows in new_df
    for _, row in new_df.iterrows():
        # if the segment's end time is less than or equal to a row's end time
        if segment['end'] <= row['end']:
            # set speaker to the row's speaker value
            speaker_pred = row['speaker_pred']
            break
    
    # add speaker key to the segment dictionary
    segment['speaker_pred'] = speaker_pred

# write the labeled transcript to a text file (after speakers have been assigned)
with open('./data/Ara Hovanesyan v. Eric Holder full.txt', 'w') as f:
    for d in full_transcript_all['segments']:
        f.write(f"start: {d['start']}, end: {d['end']}, speaker_pred: {d['speaker_pred']}, text: {d['text']}\n")
    
# Create a DataFrame
df_all = pd.DataFrame(full_transcript_all['segments'])

# Select the relevant columns
df_all_selected = df_all[['start', 'end', 'text', 'speaker_pred']]

# Save the DataFrame
df_all_selected.to_csv('Ara Hovanesyan v. Eric Holder full transcript.csv')
"""

#### Challenge: Text Analysis of Transcripts

Using some of the text analysis techniques that we covered earlier this week, see if you can uncover any interesting insights from the transcript! Do different speakers express different sentiments? Do the two attorneys favor different words?

In [None]:
df_all_selected = pd.read_csv('Ara Hovanesyan v. Eric Holder full transcript.csv')
df_all_selected.head()
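
As one possible starting point for the challenge, the sketch below counts the most common words for each predicted speaker. It is only an illustration: `top_words_by_speaker`, the tiny `STOP_WORDS` set, and the sample rows are all made up here, and a real analysis would use a fuller stop word list and the actual transcript.

```python
import re
from collections import Counter, defaultdict

# A tiny, hypothetical stop word list; a real analysis would use a fuller one
STOP_WORDS = {"the", "a", "an", "of", "to", "and", "is", "that", "it", "in", "your"}

def top_words_by_speaker(rows, n=3):
    """Count the most common non-stop words for each predicted speaker."""
    counts = defaultdict(Counter)
    for row in rows:
        words = re.findall(r"[a-z']+", row["text"].lower())
        counts[row["speaker_pred"]].update(w for w in words if w not in STOP_WORDS)
    return {speaker: counter.most_common(n) for speaker, counter in counts.items()}

# Hypothetical rows in the same shape as the transcript DataFrame
rows = [
    {"speaker_pred": "SPEAKER_00", "text": "The petition is timely, Your Honor."},
    {"speaker_pred": "SPEAKER_01", "text": "Is the petition timely under the statute?"},
    {"speaker_pred": "SPEAKER_00", "text": "The petition was filed in time."},
]
print(top_words_by_speaker(rows))
```

To run this on the real transcript, pass `df_all_selected.to_dict("records")` as `rows`, after dropping any rows whose text is missing.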

## Computer Vision Analysis

Now that we're done with audio analysis, let's turn to video. The main difference between computational audio analysis and computer vision is that computer vision uses a <i>frame</i> as its basic unit. Videos are essentially lots of pictures stacked one after another. We therefore start with one picture, and then scale up to an entire video. Let's start by extracting the first frame from our video:

In [None]:
# Path to the video file
video_path = "11-71541 Ara Hovanesyan v. Eric Holder, Jr.-QEvMv81TIQM.mp4"
# Path to save the extracted frame
output_image_path = 'first_frame.jpg'

# Frame number to extract
frame_number = 0

# Open the video file
cap = cv2.VideoCapture(video_path)

# Check if the video was opened successfully
if not cap.isOpened():
    print("Error: Could not open video.")
else:
    # Set the video position to the desired frame number
    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
    
    # Read the first frame
    ret, frame = cap.read()

    if ret:
        # Convert the frame from BGR to RGB format
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        
        # Convert the frame to a PIL Image
        image = Image.fromarray(frame_rgb)
        
        # Save the image
        image.save(output_image_path)
        print(f"First frame saved as {output_image_path}")
        
        # Display the image (optional)
        image.show()
    else:
        print("Error: Could not read frame.")

    # Release the video capture object
    cap.release()
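
Scaling from one frame to a whole video usually means sampling frames at a fixed interval rather than reading every one. The sketch below turns a frame rate and duration into a list of frame numbers; `sample_frame_numbers` is a hypothetical helper, and the frame rate and duration are example values rather than properties of the workshop video.

```python
import math

def sample_frame_numbers(fps, duration_seconds, every_seconds=1.0):
    """Return the frame number nearest to each sampling timestamp."""
    # Number of timestamps 0, every_seconds, 2 * every_seconds, ... before the end
    count = math.ceil(duration_seconds / every_seconds)
    return [int(round(k * every_seconds * fps)) for k in range(count)]

# Hypothetical video: 30 frames per second, sampled once per second for 5 seconds
print(sample_frame_numbers(fps=30, duration_seconds=5))
```

Each number in the result can be used as `frame_number` in the cell above, with the frame rate read from `cap.get(cv2.CAP_PROP_FPS)`.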


### Face Detection

Next, we need to detect the human faces in a frame. We will use OpenCV's pre-trained Haar cascade classifier, which is specialized for finding frontal faces in an image. We load it and run it on the top half of the frame like this:

In [None]:
# Load the image
image_path = "first_frame.jpg"
image = cv2.imread(image_path)

# Get the dimensions of the image
(h, w) = image.shape[:2]

# Crop the image to the top half
top_half = image[:h//2, :]

# Load the Haar Cascade for face detection
haar_cascade_path = cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
face_cascade = cv2.CascadeClassifier(haar_cascade_path)

# Convert the cropped image to grayscale
gray_top_half = cv2.cvtColor(top_half, cv2.COLOR_BGR2GRAY)

# Perform face detection
faces = face_cascade.detectMultiScale(gray_top_half, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30), flags=cv2.CASCADE_SCALE_IMAGE)

# Draw bounding boxes around detected faces
for (x, y, w, h) in faces:
    cv2.rectangle(top_half, (x, y), (x + w, y + h), (0, 0, 255), 2)

# Display the output
plt.imshow(cv2.cvtColor(top_half, cv2.COLOR_BGR2RGB))
plt.axis("off")
plt.show()

# Save the output image with bounding boxes
output_path = 'output_image.jpg'
cv2.imwrite(output_path, top_half)

# Check the number of faces detected
num_faces_detected = len(faces)
num_faces_detected


### Face Classification

Now that we have detected the faces, we need to identify <i>who</i> is in each bounding box. There are different options for training a model - here we will use a combination of neural networks and a Support Vector Classifier (SVC). A pre-trained face detector locates the face in each training image, a pre-trained embedding network converts that face into a 128-dimensional vector, and the SVC is trained on those vectors to predict which person each face belongs to.

First let's split the "patches" directory that contains all of our images into a training and validation set:

In [None]:
random.seed(10)

# The path to the folder containing the subfolders of images
folder_path = './patches/'

# The percentage of images to use for validation (e.g. 10% = 0.1)
validation_percentage = 0.1

# Create a list of the subfolders in the folder
subfolders = [f.name for f in os.scandir(folder_path) if f.is_dir()]

# Create a dictionary to map the subfolder names to their corresponding label
labels = {name: i for i, name in enumerate(subfolders)}

# Create empty lists for the training and validation images and labels
train_images = []
train_labels = []
validation_images = []
validation_labels = []

# Iterate over the subfolders
for subfolder in subfolders:
    # Get the path to the subfolder
    subfolder_path = os.path.join(folder_path, subfolder)

    # Get the list of image files in the subfolder
    image_files = [f.name for f in os.scandir(subfolder_path) if f.is_file()]

    # Shuffle the list of image files
    random.shuffle(image_files)

    # Get the number of images to use for validation
    num_validation_images = int(len(image_files) * validation_percentage)

    # Split the images into training and validation sets
    validation_images += [os.path.join(subfolder_path, f) for f in image_files[:num_validation_images]]
    train_images += [os.path.join(subfolder_path, f) for f in image_files[num_validation_images:]]

    # Create the corresponding labels for the training and validation sets
    validation_labels += [labels[subfolder]] * num_validation_images
    train_labels += [labels[subfolder]] * (len(image_files) - num_validation_images)

# Print the number of images in the training and validation sets
print(f'Number of training images: {len(train_images)}')
print(f'Number of validation images: {len(validation_images)}')

# The list of file paths to copy
#file_paths = ['path/to/file1', 'path/to/file2', 'path/to/file3']

# The path to the new folder
train_folder_path = './train'

# Create the new folder if it doesn't already exist
if not os.path.exists(train_folder_path):
    os.makedirs(train_folder_path)

# Iterate over the list of file paths
for file_path in train_images:
    # Get the directory of the file
    dir_path = os.path.dirname(file_path)

    # Create the directory in the new folder if it doesn't already exist
    dest_dir_path = os.path.join(train_folder_path, dir_path)
    if not os.path.exists(dest_dir_path):
        os.makedirs(dest_dir_path)

    # Copy the file to the new folder
    shutil.copy(file_path, dest_dir_path)
    
# The path to the new folder
val_folder_path = './val'

# Create the new folder if it doesn't already exist
if not os.path.exists(val_folder_path):
    os.makedirs(val_folder_path)

# Iterate over the list of file paths
for file_path in validation_images:
    # Get the directory of the file
    dir_path = os.path.dirname(file_path)

    # Create the directory in the new folder if it doesn't already exist
    dest_dir_path = os.path.join(val_folder_path, dir_path)
    if not os.path.exists(dest_dir_path):
        os.makedirs(dest_dir_path)

    # Copy the file to the new folder
    shutil.copy(file_path, dest_dir_path)

Now let's train an SVC model:

In [None]:
def train_recognition_model(train_dir='train'):
    # load our serialized face detector from disk
    print("[INFO] loading face detector...")
    protoPath = "./recognizers/face_detection_model/deploy.prototxt"
    modelPath = "./recognizers/face_detection_model/res10_300x300_ssd_iter_140000.caffemodel"
    detector = cv2.dnn.readNetFromCaffe(protoPath, modelPath)
    embedder = cv2.dnn.readNetFromTorch('./recognizers/openface.nn4.small2.v1.t7')

    print("[INFO] quantifying faces...")
    imagePaths = list(paths.list_images(train_dir))
    knownEmbeddings = []
    knownNames = []
    total = 0

    for (i, imagePath) in enumerate(imagePaths):
        # extract the person name from the image path
        name = imagePath.split(os.path.sep)[-2]

        # load the image and get its dimensions
        image = cv2.imread(imagePath)
        (h, w) = image.shape[:2]

        # construct a blob from the image
        imageBlob = cv2.dnn.blobFromImage(
            cv2.resize(image, (300, 300)), 1.0, (300, 300),
            (104.0, 177.0, 123.0), swapRB=False, crop=False)

        # apply OpenCV's deep learning-based face detector to localize faces in the input image
        detector.setInput(imageBlob)
        detections = detector.forward()

        # ensure at least one detection was returned (detections has shape (1, 1, N, 7))
        if detections.shape[2] > 0:
            # find the detection with the largest probability
            i = np.argmax(detections[0, 0, :, 2])
            confidence = detections[0, 0, i, 2]

            # ensure that the detection meets the minimum confidence threshold
            if confidence > .85:
                # compute the (x, y)-coordinates of the bounding box for the face
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")

                # extract the face ROI and grab the ROI dimensions
                face = image[startY:endY, startX:endX]
                (fH, fW) = face.shape[:2]

                # ensure the face width and height are sufficiently large
                if fW < 20 or fH < 20:
                    continue

                # construct a blob for the face ROI, then pass the blob through our face embedding model to obtain the 128-d quantification of the face
                faceBlob = cv2.dnn.blobFromImage(face, 1.0 / 255,
                    (96, 96), (0, 0, 0), swapRB=True, crop=False)
                embedder.setInput(faceBlob)
                vec = embedder.forward()

                # add the name of the person + corresponding face embedding to their respective lists
                knownNames.append(name)
                knownEmbeddings.append(vec.flatten())
                total += 1

    # serialize the facial embeddings + names to disk
    print("[INFO] serializing {} encodings...".format(total))
    data = {"embeddings": knownEmbeddings, "names": knownNames}
    with open('./recognizers/embeddings.pickle', "wb") as f:
        f.write(pickle.dumps(data))

    # load the face embeddings
    print("[INFO] loading face embeddings...")
    with open('./recognizers/embeddings.pickle', "rb") as f:
        data = pickle.load(f)
    le = LabelEncoder()
    labels = le.fit_transform(data["names"])

    # train the model using GridSearchCV to find the best parameters
    print("[INFO] training model with GridSearchCV...")
    param_grid = {'C': [0.1, 1, 10, 100], 'kernel': ['linear', 'rbf']}
    model = GridSearchCV(SVC(probability=True), param_grid, cv=10)
    model.fit(data["embeddings"], labels)

    # save the best model
    print(f"[INFO] best model parameters: {model.best_params_}")
    recognizer = model.best_estimator_

    # write the actual face recognition model to disk
    with open('./recognizers/recognizer.pickle', "wb") as f:
        f.write(pickle.dumps(recognizer))

    # write the label encoder to disk
    with open('./recognizers/le.pickle', "wb") as f:
        f.write(pickle.dumps(le))

    return recognizer, le

# Now call this function before entering the loop
recognizer, le = train_recognition_model()
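
One detail in the function above deserves a closer look: the bounding box is scaled to pixels and then used directly to slice the image. If the detector returns normalized coordinates slightly outside the 0 to 1 range (an assumption here, not something verified on this data), a negative start index wraps around in NumPy slicing and the face crop can come out empty. The sketch below shows one way to clamp the box first; `scale_and_clamp_box` is a hypothetical helper.

```python
def scale_and_clamp_box(norm_box, width, height):
    """Convert a normalized (x1, y1, x2, y2) box to pixel coordinates inside the image."""
    x1, y1, x2, y2 = norm_box
    # Scale to pixels, then clamp so slicing never uses negative or oversized indices
    start_x = max(0, min(width, int(x1 * width)))
    start_y = max(0, min(height, int(y1 * height)))
    end_x = max(0, min(width, int(x2 * width)))
    end_y = max(0, min(height, int(y2 * height)))
    return start_x, start_y, end_x, end_y

# Hypothetical detection that spills slightly past the left and bottom edges
print(scale_and_clamp_box((-0.05, 0.25, 0.5, 1.1), width=200, height=100))
```

The returned values could stand in for `(startX, startY, endX, endY)` before the face ROI is extracted.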


And let's see how we did:

In [None]:
def evaluate_model(val_dir='val', recognizer=None, le=None):
    # load our serialized face detector from disk
    print("[INFO] loading face detector...")
    protoPath = "./recognizers/face_detection_model/deploy.prototxt"
    modelPath = "./recognizers/face_detection_model/res10_300x300_ssd_iter_140000.caffemodel"
    detector = cv2.dnn.readNetFromCaffe(protoPath, modelPath)
    embedder = cv2.dnn.readNetFromTorch('./recognizers/openface.nn4.small2.v1.t7')

    imagePaths = list(paths.list_images(val_dir))
    true_labels = []
    predicted_labels = []

    for imagePath in imagePaths:
        # Extract the person name from the image path
        name = imagePath.split(os.path.sep)[-2]
        
        # Load the image and get its dimensions
        image = cv2.imread(imagePath)
        (h, w) = image.shape[:2]

        # Construct a blob from the image
        imageBlob = cv2.dnn.blobFromImage(
            cv2.resize(image, (300, 300)), 1.0, (300, 300),
            (104.0, 177.0, 123.0), swapRB=False, crop=False)

        # Apply OpenCV's deep learning-based face detector to localize faces in the input image
        detector.setInput(imageBlob)
        detections = detector.forward()

        # Ensure at least one detection was returned (detections has shape (1, 1, N, 7))
        if detections.shape[2] > 0:
            # Find the detection with the largest probability
            i = np.argmax(detections[0, 0, :, 2])
            confidence = detections[0, 0, i, 2]

            # Ensure that the detection with the largest probability also
            # meets our minimum probability test (thus helping filter out
            # weak detections)
            if confidence > .85:
                # Compute the (x, y)-coordinates of the bounding box for the face
                box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                (startX, startY, endX, endY) = box.astype("int")

                # Extract the face ROI and grab the ROI dimensions
                face = image[startY:endY, startX:endX]
                (fH, fW) = face.shape[:2]

                # Ensure the face width and height are sufficiently large
                if fW < 20 or fH < 20:
                    continue

                # Construct a blob for the face ROI, then pass the blob
                # through our face embedding model to obtain the 128-d
                # quantification of the face
                faceBlob = cv2.dnn.blobFromImage(face, 1.0 / 255,
                    (96, 96), (0, 0, 0), swapRB=True, crop=False)
                embedder.setInput(faceBlob)
                vec = embedder.forward()

                # Classify faces with most likely prediction
                preds = recognizer.predict_proba(vec)[0]
                j = np.argmax(preds)
                predicted_name = le.classes_[j]

                true_labels.append(name)
                predicted_labels.append(predicted_name)

    # Convert true and predicted labels to encoded labels
    true_labels_encoded = le.transform(true_labels)
    predicted_labels_encoded = le.transform(predicted_labels)

    # Generate confusion matrix
    cm = confusion_matrix(true_labels_encoded, predicted_labels_encoded)
    report = classification_report(true_labels_encoded, predicted_labels_encoded, target_names=le.classes_)

    # Display the confusion matrix
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=le.classes_, yticklabels=le.classes_)
    plt.ylabel('Actual')
    plt.xlabel('Predicted')
    plt.title('Confusion Matrix')
    plt.show()

    print("Classification Report:\n", report)

# Evaluate the model using the validation set
evaluate_model(val_dir='val', recognizer=recognizer, le=le)

This is pretty good! Though we can already see some issues that might occur as we scale: a relatively low recall for Judge Owens and a relatively low precision for Judge Fletcher, for example. In practice, we would train with many more images to boost these numbers, but this takes time so we will not do it for now.
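
To make the two terms concrete: precision for a judge is the share of faces predicted as that judge that really are that judge, and recall is the share of that judge's faces that the model found. The sketch below computes both from a confusion matrix laid out like the one above (rows are actual, columns are predicted). The function name, labels, and counts are invented for illustration and are not the workshop's results.

```python
def precision_recall(cm, labels):
    """Compute per-class precision and recall from a confusion matrix.

    Rows are actual classes and columns are predicted classes.
    """
    results = {}
    for k, label in enumerate(labels):
        true_pos = cm[k][k]
        predicted_as_k = sum(row[k] for row in cm)   # column total
        actually_k = sum(cm[k])                      # row total
        precision = true_pos / predicted_as_k if predicted_as_k else 0.0
        recall = true_pos / actually_k if actually_k else 0.0
        results[label] = (precision, recall)
    return results

# Hypothetical counts for illustration only
labels = ["JudgeA", "JudgeB"]
cm = [[8, 2],
      [1, 9]]
for label, (p, r) in precision_recall(cm, labels).items():
    print(f"{label}: precision={p:.2f}, recall={r:.2f}")
```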

### Face Emotion Recognition

Now that we know who is in the frame, we can start extracting other features about each judge. One thing we might be interested in is judges' emotional states throughout a video. Are they happy or sad? Bored? Angry? One way to do this might be to train a custom neural network to identify these emotions. For now, we will use another pre-trained model though. Here we will use the `FER` library: 

In [None]:
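# FER is not among the imports at the top of the notebook
# (the package is typically installed with `pip install fer`)
from fer import FER

# Location of the video file to process
location_videofile = "11-71541 Ara Hovanesyan v. Eric Holder, Jr.-QEvMv81TIQM.mp4"

# Build the emotion detector, using MTCNN for face detection
face_detector = FER(mtcnn=True)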

In [None]:
# Load the image using OpenCV
image_path = "first_frame.jpg"
image = cv2.imread(image_path)

# Initialize the FER detector
detector = FER(mtcnn=True)

# Detect emotions in the image
emotions = detector.detect_emotions(image)

# Draw bounding boxes and emotions on the image
for emotion in emotions:
    box = emotion["box"]
    (x, y, w, h) = box
    cv2.rectangle(image, (x, y), (x + w, y + h), (255, 0, 0), 2)
    
    emotion_text = max(emotion["emotions"], key=emotion["emotions"].get)
    cv2.putText(image, emotion_text, (x, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 0, 0), 2)

# Convert the image from BGR to RGB format
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

# Display the image with matplotlib
plt.figure(figsize=(10, 10))
plt.imshow(image_rgb)
plt.axis('off')
plt.show()
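
The cell above labels each face with whichever emotion has the highest score, even when several scores are nearly tied. A small variation is to report a label only when the top score clears a threshold. The sketch below assumes score dictionaries shaped like `emotion["emotions"]`; `dominant_emotion`, the 0.5 threshold, and the sample scores are hypothetical.

```python
def dominant_emotion(scores, min_score=0.5):
    """Return the top-scoring emotion, or 'uncertain' if no score is high enough."""
    label = max(scores, key=scores.get)
    return label if scores[label] >= min_score else "uncertain"

# Hypothetical score dictionaries for two faces
confident = {"angry": 0.02, "happy": 0.81, "neutral": 0.15, "sad": 0.02}
ambiguous = {"angry": 0.30, "happy": 0.28, "neutral": 0.32, "sad": 0.10}
print(dominant_emotion(confident))
print(dominant_emotion(ambiguous))
```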


### Pose

Body language also conveys information about how judges may be reacting to what is going on in the courtroom. There are several models that are useful for this task, but many, such as `AlphaPose` and `OpenPose`, are difficult to install. We will use a lightweight version of `OpenPose`, loaded through OpenCV, that detects individual body parts and records their positions.

In [None]:
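# This cell refers to OpenCV as `cv`, while the top of the notebook imports it as `cv2`
import cv2 as cv

# Define the body parts and pose pairs for visualization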
BODY_PARTS = { "Nose": 0, "Neck": 1, "RShoulder": 2, "RElbow": 3, "RWrist": 4,
               "LShoulder": 5, "LElbow": 6, "LWrist": 7, "RHip": 8, "RKnee": 9,
               "RAnkle": 10, "LHip": 11, "LKnee": 12, "LAnkle": 13, "REye": 14,
               "LEye": 15, "REar": 16, "LEar": 17, "Background": 18 }

POSE_PAIRS = [ ["Neck", "RShoulder"], ["Neck", "LShoulder"], ["RShoulder", "RElbow"],
               ["RElbow", "RWrist"], ["LShoulder", "LElbow"], ["LElbow", "LWrist"],
               ["Neck", "RHip"], ["RHip", "RKnee"], ["RKnee", "RAnkle"], ["Neck", "LHip"],
               ["LHip", "LKnee"], ["LKnee", "LAnkle"], ["Neck", "Nose"], ["Nose", "REye"],
               ["REye", "REar"], ["Nose", "LEye"], ["LEye", "LEar"] ]

# Load the OpenPose model
net = cv.dnn.readNetFromTensorflow("recognizers/graph_opt.pb")

# Load the MobileNet-SSD model
ssd_net = cv.dnn.readNetFromCaffe("recognizers/body_detection_model/deploy.prototxt", 
                                  "recognizers/body_detection_model/mobilenet_iter_73000.caffemodel")

# Parameters
image_width = 600
image_height = 600
threshold = 0.1  # Lowered threshold to capture more keypoints

# Load the image
img = cv.imread('first_frame.jpg', cv.IMREAD_UNCHANGED)
photo_height, photo_width = img.shape[:2]

# Detect persons using MobileNet-SSD
blob = cv.dnn.blobFromImage(img, 0.007843, (300, 300), 127.5)
ssd_net.setInput(blob)
detections = ssd_net.forward()

print(f"Detected {detections.shape[2]} objects")

# Loop over the detections
for i in range(detections.shape[2]):
    confidence = detections[0, 0, i, 2]
    if confidence > 0.7:
        class_id = int(detections[0, 0, i, 1])
        # Check if the detected object is a person (class_id == 15 for MobileNet-SSD)
        if class_id == 15:
            x1 = int(detections[0, 0, i, 3] * photo_width)
            y1 = int(detections[0, 0, i, 4] * photo_height)
            x2 = int(detections[0, 0, i, 5] * photo_width)
            y2 = int(detections[0, 0, i, 6] * photo_height)
            cv.rectangle(img, (x1, y1), (x2, y2), (255, 0, 0), 2)
            print(f"Person detected at x1: {x1}, y1: {y1}, x2: {x2}, y2: {y2}")

            # Process the detected person region with OpenPose
            body_roi = img[y1:y2, x1:x2]
            body_blob = cv.dnn.blobFromImage(body_roi, 1.0, (image_width, image_height), (127.5, 127.5, 127.5), swapRB=True, crop=False)
            net.setInput(body_blob)
            out = net.forward()
            out = out[:, :19, :, :]

            points = []
            for j in range(len(BODY_PARTS)):
                heatMap = out[0, j, :, :]
                _, conf, _, point = cv.minMaxLoc(heatMap)
                x_coord = (point[0] * (x2 - x1)) / out.shape[3] + x1
                y_coord = (point[1] * (y2 - y1)) / out.shape[2] + y1
                if conf > threshold:
                    points.append((int(x_coord), int(y_coord)))
                    cv.ellipse(img, (int(x_coord), int(y_coord)), (5, 5), 0, 0, 360, (0, 255, 255), cv.FILLED)
                else:
                    points.append(None)

            for pair in POSE_PAIRS:
                partFrom = pair[0]
                partTo = pair[1]
                idFrom = BODY_PARTS[partFrom]
                idTo = BODY_PARTS[partTo]

                if points[idFrom] and points[idTo]:
                    cv.line(img, points[idFrom], points[idTo], (0, 255, 0), 3)
                    cv.ellipse(img, points[idFrom], (3, 3), 0, 0, 360, (0, 0, 255), cv.FILLED)
                    cv.ellipse(img, points[idTo], (3, 3), 0, 0, 360, (0, 0, 255), cv.FILLED)

# Display the image
cv.imshow("Pose Estimation", img)
cv.waitKey(0)
cv.destroyAllWindows()

# Save the image
cv.imwrite("openpose_example.png", img)
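
Once keypoints are recorded, they can be turned into simple posture features. As one illustration, the sketch below measures how far the line between the two shoulders tilts from horizontal. It assumes a `points` list indexed like the one built above, with `None` for missing parts; `shoulder_tilt_degrees` and the sample coordinates are hypothetical, and whether this angle is a meaningful signal for judges is an open question.

```python
import math

# Subset of the body part indices used above
BODY_PARTS = {"RShoulder": 2, "LShoulder": 5}

def shoulder_tilt_degrees(points, body_parts):
    """Angle of the line between the shoulders, in degrees from horizontal.

    Returns None when either shoulder was not detected.
    """
    right = points[body_parts["RShoulder"]]
    left = points[body_parts["LShoulder"]]
    if right is None or left is None:
        return None
    dx = left[0] - right[0]
    dy = left[1] - right[1]  # image y grows downward
    return math.degrees(math.atan2(dy, dx))

# Hypothetical keypoints: only the two shoulders were detected, and they are level
points = [None, None, (100, 200), None, None, (200, 200)]
print(shoulder_tilt_degrees(points, BODY_PARTS))
```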


### Putting it All Together

Now we're ready to put it all together! We can take our code so far and loop through each frame in a video to build a dataset. One way to organize such a large amount of data is in a Python dictionary. A dictionary is structured as a series of key-value pairs. For example, the key can be the frame number, and then the values can be information about each judge. Even better, the value can be another dictionary! So we can make keys for each judge, keys corresponding to each of our measures (emotion, body language, etc.), and values corresponding to each. Let's see how we would do this:

In [None]:
# Load the Haar Cascade for face detection
haar_cascade_path = cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
face_cascade = cv2.CascadeClassifier(haar_cascade_path)

# Load the face recognition model
embedder = cv2.dnn.readNetFromTorch('./recognizers/openface.nn4.small2.v1.t7')

# Load the face recognizer and label encoder
with open('./recognizers/recognizer.pickle', 'rb') as file:
    recognizer = pickle.load(file)

with open('./recognizers/le.pickle', 'rb') as file:
    le = pickle.load(file)

# List of valid class labels
valid_labels = ['KimWardlaw', 'JohnBOwens', 'WilliamFletcher']
valid_indices = [i for i, label in enumerate(le.classes_) if label in valid_labels]

# Open the video file
cap = cv2.VideoCapture('11-71541 Ara Hovanesyan v. Eric Holder, Jr.-QEvMv81TIQM.mp4')
fps = int(cap.get(cv2.CAP_PROP_FPS))
end_frame = min(30 * fps, int(cap.get(cv2.CAP_PROP_FRAME_COUNT)))

frame_dict = {}
frame_id = 0

while(frame_id < end_frame):
    ret, frame = cap.read()
    if not ret:
        break

    frame_id += 1
    (h, w) = frame.shape[:2]
    top_half = frame[:h//2, :]

    try:
        gray = cv2.cvtColor(top_half, cv2.COLOR_BGR2GRAY)
    except:
        break

    # Perform face detection
    faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30), flags=cv2.CASCADE_SCALE_IMAGE)

    class_ids = []
    confidences = []
    boxes = []
    emotions_list = []

    for (startX, startY, width, height) in faces:
        endX = startX + width
        endY = startY + height

        # Extract the face ROI
        face = top_half[startY:endY, startX:endX]
        (fH, fW) = face.shape[:2]
        if fW < 20 or fH < 20:
            continue

        # Construct image blob, 128-d representation of face
        faceBlob = cv2.dnn.blobFromImage(face, 1.0 / 255,
                                         (96, 96), (0, 0, 0), swapRB=True, crop=False)

        # Embed the 128-d representation
        embedder.setInput(faceBlob)
        vec = embedder.forward()

        # Classify faces with most likely prediction
        preds = recognizer.predict_proba(vec)[0]
        filtered_preds = np.full_like(preds, fill_value=-np.inf)
        filtered_preds[valid_indices] = preds[valid_indices]

        j = np.argmax(filtered_preds)
        proba = filtered_preds[j]
        name = le.classes_[j]

        # Placeholder for emotion detection (if available)
        emote_text = 'none'

        # Draw box, text, and emotion
        text = "{}: {:.2f}%, {}".format(name, proba * 100, emote_text)
        y = startY - 10 if startY - 10 > 10 else startY + 10
        cv2.rectangle(top_half, (startX, startY), (endX, endY), (0, 0, 255), 2)
        cv2.putText(top_half, text, (startX, y), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (0, 0, 255), 2)

        class_ids.append(name)
        confidences.append(float(proba))
        boxes.append([startX, startY, endX, endY])
        emotions_list.append(emote_text)

    # Summarize the frame once, after all of its faces have been processed,
    # keeping only the most confident detection for each judge
    unique_ids = []
    unique_probs = []
    unique_boxes = []
    unique_emotions = []

    for pred, prob, box, emotion in zip(class_ids, confidences, boxes, emotions_list):
        if pred in unique_ids:
            idx = unique_ids.index(pred)
            if prob > unique_probs[idx]:
                unique_ids[idx] = pred
                unique_probs[idx] = prob
                unique_boxes[idx] = box
                unique_emotions[idx] = emotion
        else:
            unique_ids.append(pred)
            unique_probs.append(prob)
            unique_boxes.append(box)
            unique_emotions.append(emotion)

    frame_info = {
        'frame': top_half,
        'class_ids': unique_ids,
        'confidences': unique_probs,
        'boxes': unique_boxes,
        'emotions': unique_emotions
    }

    frame_dict[frame_id] = frame_info

    # Show frame
    cv2.imshow("frame", top_half)
    key = cv2.waitKey(1) & 0xFF
    if key == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()
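
A nested dictionary is convenient to build frame by frame, but most analysis is easier on a flat table. The sketch below flattens a dictionary with the same keys as `frame_info` into one row per judge per frame. `frame_dict_to_rows` and the two-frame example are hypothetical, and the stored image is left out of the rows.

```python
def frame_dict_to_rows(frame_dict):
    """Flatten {frame_id: frame_info} into one row per detected judge per frame."""
    rows = []
    for frame_id, info in sorted(frame_dict.items()):
        for name, conf, box, emotion in zip(
            info["class_ids"], info["confidences"], info["boxes"], info["emotions"]
        ):
            rows.append({
                "frame": frame_id,
                "judge": name,
                "confidence": conf,
                "box": box,
                "emotion": emotion,
            })
    return rows

# Hypothetical two-frame example with the same keys as frame_info (image omitted)
example = {
    1: {"class_ids": ["KimWardlaw"], "confidences": [0.91],
        "boxes": [[10, 20, 60, 80]], "emotions": ["none"]},
    2: {"class_ids": ["KimWardlaw", "JohnBOwens"], "confidences": [0.88, 0.75],
        "boxes": [[12, 21, 62, 81], [200, 25, 250, 85]], "emotions": ["none", "none"]},
}
rows = frame_dict_to_rows(example)
print(len(rows))
```

Passing the result to `pd.DataFrame(...)` gives a table that can be grouped by judge or plotted over time.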


## GPT Approach

The above approach to video relies primarily on custom neural networks. But large language models have revolutionized more than just text! The latest models are also useful for analyzing images. The basic approach remains the same as before: we start by analyzing just one frame, then scale up to an entire video. The main difference is that instead of standard outputs from trained neural networks and similar methods, our outputs will be generated by GPT. There are advantages and disadvantages to this approach. The main advantage is that these models are <b>very</b> impressive. The disadvantage is that they generate a lot of unstructured data, and therefore require more cleanup on the backend.

### Names

With GPT, it can be helpful to break down steps so that we are not asking the model to do too many different things at once. Let's first see if we can identify the names of the judges in the first image. We begin by getting our API key ready:

In [None]:
# OpenAI API Key

with open("openai_key.txt", "r") as file:
    openai_key = file.read().replace("\n", "")

api_key = openai_key

Next we need to get our image ready. The OpenAI API does not allow us to upload an image directly (unlike the user interface). So first we need to encode it using `base64`:

In [None]:
# Function to encode the image
def encode_image(image_path):
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode('utf-8')

# Path to your image
image_path = "first_frame.jpg"

# Getting the base64 string
base64_image = encode_image(image_path)
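
To see what the encoding step produces, the sketch below wraps some bytes in a base64 data URL and then decodes them again. `to_data_url` is a hypothetical helper, and the bytes are a stand-in rather than a real image.

```python
import base64

def to_data_url(raw_bytes, mime_type="image/jpeg"):
    """Wrap raw bytes in a base64 data URL."""
    encoded = base64.b64encode(raw_bytes).decode("utf-8")
    return f"data:{mime_type};base64,{encoded}"

# Stand-in bytes rather than a real image file
raw = b"\xff\xd8\xff\xe0 not a real JPEG"
url = to_data_url(raw)

# Decoding the part after the comma recovers the original bytes exactly
decoded = base64.b64decode(url.split(",", 1)[1])
print(url.split(",", 1)[0])
print(decoded == raw)
```

Base64 represents every 3 bytes as 4 text characters, so the encoded string is about a third larger than the file it came from.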

Next let's prepare the API call. Notice the prompt here: "What are the names of the judges in this image? Look at the text in the black square that comes after the word Before: and return the results as a list like [Judge1, Judge2, Judge3]"

Notice how we use the fact that the names of the judges appear in the image as text - GPT vision models are quite good at optical character recognition!

In [None]:
headers = {
  "Content-Type": "application/json",
  "Authorization": f"Bearer {api_key}"
}

payload = {
  "model": "gpt-4o-mini",
  "messages": [
    {
      "role": "user",
      "content": [
        {
          "type": "text",
          "text": "What are the names of the judges in this image? Look at the text in the black square that comes after the word Before: and return the results as a list like [Judge1, Judge2, Judge3]"
        },
        {
          "type": "image_url",
          "image_url": {
            "url": f"data:image/jpeg;base64,{base64_image}"
          }
        }
      ]
    }
  ],
  "max_tokens": 300,
  "temperature": 0.0
}

Now let's make the API call and extract the content:

In [None]:
response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)

response_text = response.json()['choices']

response_content = response_text[0]['message']['content']
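
The indexing above assumes the request succeeded. When it fails (for example, because of an invalid key), the JSON body carries an `error` object instead of `choices`, and the lookup raises a `KeyError`. The sketch below is one way to surface the problem more clearly; `extract_content` and the two sample payloads are hypothetical.

```python
def extract_content(response_json):
    """Return the first message's text, or raise with the API's error message."""
    if "error" in response_json:
        message = response_json["error"].get("message", "unknown error")
        raise RuntimeError(f"API error: {message}")
    choices = response_json.get("choices", [])
    if not choices:
        raise RuntimeError("Response contained no choices")
    return choices[0]["message"]["content"]

# Hypothetical payloads shaped like a successful and a failed response
ok = {"choices": [{"message": {"content": "[Wardlaw, W. Fletcher, Owens]"}}]}
failed = {"error": {"message": "Incorrect API key provided"}}

print(extract_content(ok))
try:
    extract_content(failed)
except RuntimeError as exc:
    print(exc)
```

In the notebook, this would be called as `extract_content(response.json())`.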

And then write a regular expression to extract the names of the judges. Notice that this only works if we get the output we expect from GPT! With temperature at 0 this is a pretty safe bet, but not 100% guaranteed. 

In [None]:
# Find all items inside square brackets
judge_names = re.findall(r'\[([^]]*)\]', response_content)

if judge_names:
    # Split the first match by comma and strip spaces
    judge_names_list = [name.strip() for name in judge_names[0].split(',')]
else:
    judge_names_list = []

print(judge_names_list)
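
Because the model's formatting can drift, it may help to make the parsing step a little more defensive. The sketch below strips quotes around each item and returns an empty list when the bracketed list is missing or has an unexpected length. `parse_bracket_list` and its `expected` parameter are hypothetical, and the sample strings are invented.

```python
import re

def parse_bracket_list(text, expected=None):
    """Parse the first [a, b, c] list in text; return [] if missing or the wrong length."""
    match = re.search(r"\[([^\]]*)\]", text)
    if not match:
        return []
    # Strip whitespace and any quotes the model may wrap around each item
    items = [item.strip().strip("'\"") for item in match.group(1).split(",")]
    items = [item for item in items if item]
    if expected is not None and len(items) != expected:
        return []
    return items

print(parse_bracket_list('The judges are ["Wardlaw", "W. Fletcher", "Owens"].', expected=3))
print(parse_bracket_list("I could not read the text.", expected=3))
```

Splitting on commas still breaks for names that contain a comma themselves, so the length check acts as a signal to re-prompt rather than as a full fix.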

### Position

Next let's get each judge's position. Check out the prompt this time: we tell it which 3 judges are in the picture and also note that there is an attorney presenting an argument. We then ask GPT where each judge is and to structure the output like [Judge1 position1, Judge2 position2, Judge3 position3]. Again, we have to hope that GPT produces output the way we want!

In [None]:
headers = {
  "Content-Type": "application/json",
  "Authorization": f"Bearer {api_key}"
}

# Define the prompt
prompt = f"""
        The judges are identified as {judge_names_list[0]}, {judge_names_list[1]}, and {judge_names_list[2]}. An attorney is standing before them, presenting their argument. 
        
        Where is each judge located in the image? Structure your answer like [Judge1 position1, Judge2 position2, Judge3 position3]
        """

payload = {
  "model": "gpt-4o-mini",
  "messages": [
    {
      "role": "user",
      "content": [
        {
          "type": "text",
          "text": prompt
        },
        {
          "type": "image_url",
          "image_url": {
            "url": f"data:image/jpeg;base64,{base64_image}"
          }
        }
      ]
    }
  ],
  "max_tokens": 2000,
  "temperature": 0.0
}

response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)

response_text = response.json()['choices']

response_content = response_text[0]['message']['content']

# Find all items inside square brackets
judge_positions = re.findall(r'\[([^]]*)\]', response_content)

if judge_positions:
    # Split the first match by comma and strip spaces
    judge_positions = [position.strip() for position in judge_positions[0].split(',')]
else:
    judge_positions = []

print(judge_positions)

Now let's go ahead and match the judge names and positions in a dictionary that we will expand on later:

In [None]:
judges_dict = {'Judge1': {},
            'Judge2': {},
            'Judge3': {}}

# Populate the dictionary
for i, item in enumerate(judge_positions, 1):
    name, position = item.rsplit(' ', 1)  # Split from the right to separate name and position
    judges_dict[f'Judge{i}'] = {'name': name, 'position': position}

# Print the resulting dictionary
print(judges_dict)


But wait! Before we go on, let's take another look at the image. What do you notice about the order of the judges? Does it match the order of the names in the text at the bottom, or the list we created with GPT?

You might notice that the judges are sitting like [W. Fletcher, Wardlaw, Owens] but their names are ordered [Wardlaw, W. Fletcher, Owens]. So GPT cannot rely on just using the order in the text to place the judges. Despite this limitation, what do you notice about how it guessed each judge's position? Most likely, it got it right!

Why does this work? GPT is good at zero-shot and few-shot learning, meaning it can attempt a task with no examples or only a handful of them. Whereas traditional machine learning relies on a lot of labeled data to make accurate predictions, GPT benefits from having already been trained on a massive corpus of numbers, text, and images. It is therefore sometimes able to do things like infer who each judge in a frame is without explicit training examples.

### Features

Next, let's extract some features. Earlier we looked at facial emotion expression and extracted some preliminary features that could be used to estimate body posture, gaze, and so on. But would it be simpler to do this with GPT? What if we prompted GPT directly for features such as body language, facial expression, gestures, and gaze?

In [None]:
headers = {
  "Content-Type": "application/json",
  "Authorization": f"Bearer {api_key}"
}

# Define the prompt
prompt = f"""
    Analyze the following courtroom scene with three judges presiding over a case. 

    For each judge, provide the following details and structure the output as follows:

    - **Judge's Position**: [left, right, center]
    - **Body Language**:
    - **Facial Expression**:
    - **Gestures**: (hand movements, head nods, etc.) 
    - **Gaze**: (who or what they are looking at) 
    """

payload = {
  "model": "gpt-4o-mini",
  "messages": [
    {
      "role": "user",
      "content": [
        {
          "type": "text",
          "text": prompt
        },
        {
          "type": "image_url",
          "image_url": {
            "url": f"data:image/jpeg;base64,{base64_image}"
          }
        }
      ]
    }
  ],
  "max_tokens": 2000,
  "temperature": 0.0
}

response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)

Let's see what the output looks like:

In [None]:
print(response.json()['choices'][0]['message']['content'])

Looks like it structured the output pretty close to what we wanted, and gave us some interesting descriptions about what the judges in each position are doing! Let's go ahead and structure this output as a dictionary:

In [None]:
# Extract the response text
response_text = response.json()['choices'][0]['message']['content']

# Function to parse the response and structure it as a dictionary
def extract_features(text):
    # Regular expression for matching each judge's information in a flexible way
    judge_pattern = re.compile(
        r"(?:###\s*Judge\s+on\s+the\s+|-\s*\*\*Judge's\s+Position\*\*:\s*)(Left|Center|Right)\s*[:-]?\s*(?:\n\s*-\s*\*\*Judge's\s+Position\*\*:\s*\1\s*[:-]?)?\s*\n\s*-\s*\*\*Body\s+Language\*\*:\s*(.*?)\.\s*\n\s*-\s*\*\*Facial\s+Expression\*\*:\s*(.*?)\.\s*\n\s*-\s*\*\*Gestures\*\*:\s*(.*?)\.\s*\n\s*-\s*\*\*Gaze\*\*:\s*(.*?)\.",
        re.DOTALL | re.IGNORECASE
    )

    # Extracting judges' information
    judges = judge_pattern.findall(text)
    
    # Creating the dictionary to store extracted information
    courtroom_info = {}
    
    # Populating the dictionary with judges' information
    for judge in judges:
        judge_id, body_language, facial_expression, gestures, gaze = judge
        courtroom_info[judge_id.lower()] = {
            'Body Language': body_language.strip(),
            'Facial Expression': facial_expression.strip(),
            'Gestures': gestures.strip(),
            'Gaze': gaze.strip()
        }
    
    return courtroom_info

# Extracting the features
courtroom_info = extract_features(response_text)

# Printing the extracted information
print(json.dumps(courtroom_info, indent=2))
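
The single regular expression above only matches when GPT reproduces the layout almost exactly, including a period at the end of every field. As a hedged alternative, the sketch below reads the reply line by line instead. The name `parse_features_by_line` and the sample reply are hypothetical. The sketch assumes each judge's block starts with a line that mentions "judge" together with left, center, or right.

```python
import re

FIELDS = ("Body Language", "Facial Expression", "Gestures", "Gaze")

def parse_features_by_line(text):
    """Collect 'Field: value' lines under the most recent left/center/right heading."""
    info, current = {}, None
    for raw in text.splitlines():
        # Drop markdown decoration (#, *, leading dashes) before inspecting the line
        line = raw.replace("*", "").replace("#", "").strip(" -\t")
        lower = line.lower()
        # Field lines are checked first, so a gaze such as
        # "looking at the judge on the right" does not start a new block
        field = next((f for f in FIELDS if lower.startswith(f.lower())), None)
        if field:
            if current:
                info[current][field] = line.split(":", 1)[-1].strip()
            continue
        position = re.search(r"\b(left|center|right)\b", lower)
        if "judge" in lower and position:
            current = position.group(1)
            info.setdefault(current, {})
    return info

# Hypothetical reply, made up for illustration
sample = """### Judge on the Left
- **Body Language**: Leaning forward.
- **Gaze**: Looking at the judge on the right.

- **Judge's Position**: Center
- **Body Language**: Upright.
"""
print(parse_features_by_line(sample))
```

The trade-off is that this version keeps whatever fields it finds, so a judge may come back with only some of the four features filled in.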

And then update our original dictionary with this info:

In [None]:
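import copy

# Mapping function that returns a copy of the existing dictionary with features added
def add_features_to_existing(existing_dict, new_features):
    # Work on a copy so each call returns an independent dictionary
    # (otherwise every video frame would share, and overwrite, the same object)
    result = copy.deepcopy(existing_dict)
    if not new_features:
        return result  # Nothing was parsed from the response

    for judge, info in result.items():
        # Match position using fuzzy matching
        position = info.get('position', '').strip().lower()
        matched_position, score = process.extractOne(position, list(new_features.keys()), scorer=fuzz.partial_ratio)

        # If the match is strong enough, update the dictionary
        if score > 80:
            info.update(new_features[matched_position])

    return result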

# Updating the existing dictionary with the extracted features
updated_dict = add_features_to_existing(judges_dict, courtroom_info)

# Printing the updated dictionary
print(json.dumps(updated_dict, indent=2))
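
Fuzzy matching is one way to reconcile free-text positions such as "on the left" with the keys `left`, `center`, and `right`. Because there are only three possible positions, a plain keyword lookup is another option. This is a minimal sketch, not a replacement for the function above: `canonical_position` is a hypothetical helper, and the synonym list is an assumption about what GPT might write.

```python
import re

def canonical_position(text):
    """Map a free-text position to 'left', 'center', or 'right' (None if unclear)."""
    synonyms = {
        "left": ("left",),
        "center": ("center", "centre", "middle"),
        "right": ("right",),
    }
    # Split into lowercase words so punctuation such as "left." does not matter
    words = re.findall(r"[a-z]+", text.lower())
    hits = {canon for canon, keys in synonyms.items() if any(k in words for k in keys)}
    # Refuse to guess when no position, or more than one, is mentioned
    return hits.pop() if len(hits) == 1 else None

print(canonical_position("on the Left"))
print(canonical_position("left of center"))
```

Unlike a similarity score, this approach never matches on a near-miss spelling, which can be either a strength or a weakness depending on how messy the replies are.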

### Video

Now let's go ahead and try this on a video. We can start by creating a dictionary that will store all of our info:

In [None]:
video_dict = {'frame': {'id': 0, 
                       'frame_info': updated_dict}}

video_dict

For a video, we need to make some additional preparations. Here we'll define functions that resize frames, extract frames from a video (keeping only every nth frame so we don't waste money analyzing multiple frames within the same second), analyze a frame using our code for one image, and update the video dictionary with info from the new frame:

In [None]:
# Function to resize the frame
def resize_frame(frame, width, height):
    return cv2.resize(frame, (width, height))

# Function to extract frames from the video and resize them
def extract_frames(video_path, frame_rate, duration, width, height):
    video_capture = cv2.VideoCapture(video_path)
    frame_count = 0
    frames = []
    total_frames = int(duration * frame_rate)

    while video_capture.isOpened() and frame_count < total_frames:
        success, frame = video_capture.read()
        if not success:
            break

        if frame_count % frame_rate == 0:
            frame_id = int(frame_count / frame_rate)
            resized_frame = resize_frame(frame, width, height)
            frame_path = f"frame_{frame_id}.jpg"
            cv2.imwrite(frame_path, resized_frame)
            frames.append((frame_id, frame_path))

        frame_count += 1

    video_capture.release()
    return frames

def analyze_frame(frame_path):
    # Getting the base64 string
    base64_image = encode_image(frame_path)

    headers = {
      "Content-Type": "application/json",
      "Authorization": f"Bearer {api_key}"
    }

    # Define the prompt
    prompt = f"""
        Analyze the following courtroom scene with three judges presiding over a case. 

        For each judge, provide the following details and structure the output as follows:

        - **Judge's Position**: [left, right, center]
        - **Body Language**:
        - **Facial Expression**:
        - **Gestures**: (hand movements, head nods, etc.) 
        - **Gaze**: (who or what they are looking at) 
        """

    payload = {
      "model": "gpt-4o",
      "messages": [
        {
          "role": "user",
          "content": [
            {
              "type": "text",
              "text": prompt
            },
            {
              "type": "image_url",
              "image_url": {
                "url": f"data:image/jpeg;base64,{base64_image}"
              }
            }
          ]
        }
      ],
      "max_tokens": 2000,
      "temperature": 0.0
    }

    response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)
    response.raise_for_status()  # Raise an exception for HTTP errors
    return response.json()

def extract_number(filename):
    # Regular expression to find the number
    match = re.search(r'\d+', filename)
    
    if match:
        return match.group()
    else:
        return None

# Function to add a new frame to the video dictionary
def add_new_frame(video_dict, file_path, new_frame_info):
    # Extract the frame number
    frame_number = extract_number(file_path)
    
    if frame_number is not None:
        # Create the new frame entry
        new_frame_key = 'frame' + frame_number
        new_frame_id = int(frame_number)
        new_frame = {
            'id': new_frame_id,
            'frame_info': new_frame_info
        }
        
        # Update the video_dict with the new frame
        video_dict[new_frame_key] = new_frame
    else:
        print("No frame number found in the file path.")
    
    return video_dict
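
To see which frames `extract_frames` keeps, it helps to work through the arithmetic on its own. The sketch below is illustrative: `sampled_frame_indices` and its `every_seconds` parameter are hypothetical and not part of the notebook. It also rounds the frame rate, because a check like `frame_count % frame_rate == 0` would almost never be true for a non-integer rate such as 29.97. The actual rate of a file can be read with `video_capture.get(cv2.CAP_PROP_FPS)`.

```python
def sampled_frame_indices(fps, duration, every_seconds=1):
    """Indices of the frames kept when sampling one frame every `every_seconds`."""
    step = max(1, round(fps * every_seconds))  # frames between kept samples
    total = int(duration * fps)                # frames in the window we scan
    return list(range(0, total, step))

# 10 seconds at 30 fps: frames 0, 30, 60, ..., 270
print(sampled_frame_indices(30, 10))

# A non-integer frame rate still yields one frame per second after rounding
print(len(sampled_frame_indices(29.97, 10)))
```

Each kept frame becomes one API call, so the length of this list is also the number of requests the video loop will send.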

Now let's go ahead and process the first 10 seconds of our video:

In [None]:
# Path to the video file
video_path = "11-71541 Ara Hovanesyan v. Eric Holder, Jr.-QEvMv81TIQM.mp4"

# Extract frames from the video (first 10 seconds, one frame per second, assuming 30 fps)
frames = extract_frames(video_path, frame_rate=30, duration=10, width=640, height=480)

# Analyze each frame
for frame_id, frame_path in frames:
    try:
        response = analyze_frame(frame_path)
        response_text = response['choices'][0]['message']['content']
        analysis = extract_features(response_text)
        # Updating the existing dictionary with the extracted features
        analysis_dict = add_features_to_existing(judges_dict, analysis)
        add_new_frame(video_dict, frame_path, analysis_dict)
    except Exception as e:
        print(f"Error processing frame {frame_id}: {e}")
    finally:
        os.remove(frame_path)  # Clean up the frame file after processing
        
# Print the structured results
print("\nStructured Results as Python Dictionary:")
print(json.dumps(video_dict, indent=4))
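
The loop above prints an error and moves on when a frame fails. For transient failures such as rate limits or timeouts, one option is to retry the call a few times first. This is a minimal sketch under stated assumptions: `call_with_retries` is a hypothetical helper that is not part of the notebook, and the exponential backoff delays are arbitrary choices.

```python
import time

def call_with_retries(func, *args, attempts=3, base_delay=1.0, **kwargs):
    """Call func(*args, **kwargs), retrying on any exception with exponential backoff."""
    for attempt in range(attempts):
        try:
            return func(*args, **kwargs)
        except Exception as error:
            if attempt == attempts - 1:
                raise  # out of retries: let the caller's try/except handle it
            wait = base_delay * 2 ** attempt
            print(f"Attempt {attempt + 1} failed ({error}); retrying in {wait} seconds")
            time.sleep(wait)

# Demo with a stand-in function that fails twice before succeeding
calls = {"n": 0}

def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("temporary failure")
    return "ok"

print(call_with_retries(flaky, base_delay=0.01))
```

In the loop, `analyze_frame(frame_path)` could then be written as `call_with_retries(analyze_frame, frame_path)`, keeping the existing `try`/`except` as the last line of defense.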

And finally convert the dictionary to a `pandas` dataframe!

In [None]:
# Initialize an empty list to store rows
rows = []

# Iterate through each frame in the dictionary
for frame_key, frame_data in video_dict.items():
    frame_id = frame_data['id']
    frame_info = frame_data['frame_info']
    
    # Extract information for each judge and flatten the dictionary
    row = {'frame_id': frame_id}
    for judge_key, judge_data in frame_info.items():
        for info_key, info_value in judge_data.items():
            row[f"{judge_key}_{info_key}"] = info_value
    
    # Append the row to the list
    rows.append(row)

# Create a DataFrame from the list of rows
df = pd.DataFrame(rows)

# Display the DataFrame
df.head()

Wow! We now have data about what the judges are doing in the first 10 seconds of the video. Because the data is structured by judge and the values are text, we can use our standard computer science and statistics toolkit to analyze it!
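
As a small illustration of that kind of analysis, the sketch below counts how many frames mention a keyword in each judge's gaze description. The rows are toy data made up for illustration, shaped like the flattened rows above, and `count_keyword` is a hypothetical helper. With the real DataFrame, `df.to_dict("records")` produces rows in this shape.

```python
from collections import Counter

# Toy rows shaped like the DataFrame above; the text values are made up
rows = [
    {"frame_id": 0, "Judge1_Gaze": "Looking at the attorney", "Judge2_Gaze": "Looking down at notes"},
    {"frame_id": 1, "Judge1_Gaze": "Looking at the attorney", "Judge2_Gaze": "Looking at the attorney"},
    {"frame_id": 2, "Judge1_Gaze": "Looking down at documents", "Judge2_Gaze": "Looking at the attorney"},
]

def count_keyword(rows, suffix, keyword):
    """Count, per column ending in `suffix`, the frames whose text mentions `keyword`."""
    counts = Counter()
    for row in rows:
        for column, value in row.items():
            if column.endswith(suffix) and keyword in str(value).lower():
                counts[column] += 1
    return dict(counts)

print(count_keyword(rows, "_Gaze", "attorney"))
```

Simple keyword counts like this are a rough first pass; the same rows could feed any of the text analysis methods covered earlier.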

## Bonus Material

### Semantic Segmentation

### Color 

### Complexity