# 6 Visualise labelled data and remove artifacts

Code that lets us overlay each frame of a video with the outputs from the models, and create time series plots.

In [None]:
import os
import cv2
import matplotlib.pyplot as plt 
import numpy as np
import pandas as pd
import ultralytics.utils as ultrautils
import utils
import display
import json
import time

## Test data

In [None]:
# Add these to your imports
from src.config import PATH_CONFIG
from src.utils.notebook_utils import display_config_info, ensure_dir_exists

# Pull the input and output locations out of the shared project configuration
videos_in = PATH_CONFIG['videos_in']
data_out = PATH_CONFIG['data_out']

# Make sure the output directory is available; the message is printed only
# when the helper returns a truthy value
created_output_dir = ensure_dir_exists(data_out)
if created_output_dir:
    print(f"Created output directory: {data_out}")

# Show the resolved configuration in the notebook output
display_config_info(videos_in, data_out, "Processing Configuration")

# Name of the metadata spreadsheet
metadata_file = "_LookitLaughter.xlsx"

## Full data

In [None]:
# Locations of the full LookitLaughter data set, relative to this notebook.
# The shared prefixes are built once; the resulting paths are unchanged.
_two_levels_up = os.path.join("..", "..")
_full_data_root = os.path.join(_two_levels_up, "LookitLaughter.full.data")

videos_in = os.path.join(_two_levels_up, "LookitLaughter.full.videos")
temp_out = os.path.join(_full_data_root, "0_temp")
data_out = os.path.join(_full_data_root, "1_interim")
videos_out = os.path.join(_full_data_root, "2_final")

# Name of the metadata spreadsheet
metadata_file = "_LookitLaughter.xlsx"

In [1]:
# Load the table of processed videos for the data_out folder via the project
# helper (presumably a pandas DataFrame: later cells call .head() and
# .iterrows() on it - confirm against utils.getProcessedVideos).
processedvideos = utils.getProcessedVideos(data_out)
# Last expression of the cell, so the notebook displays the first rows.
processedvideos.head()

NameError: name 'utils' is not defined

## 6.1 Add annotations to all videos.

Generate annotated videos for all videos in the test set.

In [None]:
# When True, the annotated video is re-created even if one already exists.
forceAnnotation = False

for index, r in processedvideos.iterrows():

    videopath = os.path.join(videos_in, r["VideoID"])
    videoname = os.path.basename(r["VideoID"])
    try:
        # let's get all the annotations for this video
        kpts = utils.getKeyPoints(processedvideos, videoname)
        facedata = utils.getFaceData(processedvideos, videoname)
        speechdata = utils.getSpeechData(processedvideos, videoname)
    except FileNotFoundError as e:
        # A missing data file only skips this video; the loop carries on.
        print(f"Data error for {videoname}\n" + "Error: " + str(e))
        continue

    # Annotate when forced, when no annotated video is recorded, or when the
    # recorded file is no longer on disk. The `or` chain short-circuits, so
    # os.path.exists is never called with a null value.
    existingVideo = r["annotatedVideo"]
    needsAnnotation = (
        forceAnnotation
        or pd.isnull(existingVideo)
        or not os.path.exists(existingVideo)
    )
    if not needsAnnotation:
        print(f"Already processed {r['VideoID']}")
        continue

    print(f"Creating annotated video for {videoname}")
    annotatedVideo = display.createAnnotatedVideo(videopath, kpts, facedata, speechdata, temp_out, False)
    vidwithaudio = display.addSoundtoVideo(annotatedVideo, r["Audio.file"], videos_out)

    # Write only the two cells that changed. `r` is a per-row copy produced by
    # iterrows(), which does not preserve column dtypes, so assigning the whole
    # row back could silently alter the other columns.
    processedvideos.loc[index, "annotatedVideo"] = vidwithaudio
    processedvideos.loc[index, "annotated.when"] = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime())

#save the processedvideos dataframe
utils.saveProcessedVideos(processedvideos, data_out)

In [None]:
# Manual look at the face data of the video currently held in `videoname`
# (alternative to calling utils.getFaceData(processedvideos, videoname)).
isThisVideo = processedvideos["VideoID"] == videoname
videodata = processedvideos.loc[isThisVideo]

# No matching row means there is nothing to read for this video.
if len(videodata) == 0:
    raise FileNotFoundError(f"No face data file found for {videoname}")

print(f"We have a face data file for {videoname}")

# Take the path stored in the first matching row.
facesfile = videodata["Faces.file"].iloc[0]
print(facesfile)

# Last expression of the cell, so the notebook displays the loaded table.
pd.read_csv(facesfile)


# 6.2  Correct person labels in all videos.

Swap parent and child if these are wrong. Ignore other people in video.

In metadata, we have 

In [None]:


def keyPointsStdev(df, frames=None, people="all", bodypart="whole", threshold=0.5):
    """find the spread (standard deviation) of x and y values of keypoints

    For every (frame, person) pair that has rows in df, the keypoint columns
    are handed to stdevxys and the two values it returns are stored in the
    "std.x" and "std.y" columns of those rows.

    args:   df - dataframe of keypoints with "frame" and "person" columns;
                 it is modified in place
            frames - frames to include, None or empty means every frame in df
            people - people to include, "all" means every person in df
            bodypart - which bodypart to use, only "whole" is implemented
            threshold - value forwarded to stdevxys (presumably the minimum
                 keypoint confidence - confirm against stdevxys)
    returns:
            the same dataframe with "std.x" and "std.y" columns added
            (NaN for rows that were not processed)
    raises:
            NotImplementedError if bodypart is anything other than "whole"
    """

    if bodypart != "whole":
        raise NotImplementedError("Only whole body implemented for now")

    if frames is None or len(frames) == 0:
        frames = df["frame"].unique()

    # Compare against "all" only for strings: an array of names compared with
    # == gives an element-wise result whose truth value is ambiguous.
    if isinstance(people, str) and people == "all":
        people = df["person"].unique()

    # create new columns for the standard deviations
    df["std.x"] = np.nan
    df["std.y"] = np.nan

    for frame in frames:
        in_frame = df["frame"] == frame
        for person in people:
            # rows holding the keypoints for this person in this frame
            selected = in_frame & (df["person"] == person)
            if not selected.any():
                continue

            # columns 8..58 are read as the keypoint values and reshaped to
            # an n x 3 array of (x, y, conf)
            xyc = df.loc[selected].iloc[:, 8:59].to_numpy().reshape(-1, 3)

            # NOTE(review): stdevxys is not defined in the visible part of
            # this notebook - confirm where it comes from.
            stdx, stdy = stdevxys(xyc, threshold)

            # store in the std columns created above (the original wrote
            # these values to "cog.x"/"cog.y" by mistake)
            df.loc[selected, "std.x"] = stdx
            df.loc[selected, "std.y"] = stdy

    return df

In [None]:
def plot_sample_timeline(sample):
    """Draw a three-row timeline (speech, emotions, metadata) for a sample.

    Returns the matplotlib figure. When the sample has no "duration" entry in
    its metadata a message is printed and None is returned instead.
    """
    has_duration = hasattr(sample, "metadata") and "duration" in sample.metadata
    if not has_duration:
        print(f"No duration metadata for {sample.id}")
        return

    videoname = os.path.basename(sample.filepath)
    duration = sample.metadata["duration"]

    # One row per panel, all sharing the same time axis
    fig, (ax_speech, ax_emotion, ax_meta) = plt.subplots(
        3, 1, figsize=(12, 8), sharex=True
    )
    fig.suptitle(f"Timeline for {videoname}", fontsize=16)
    for axis in (ax_speech, ax_emotion, ax_meta):
        axis.set_xlim(0, duration)

    # --- Row 1: speech segments as shaded spans with their text ---
    ax_speech.set_title("Speech")
    ax_speech.set_yticks([])

    if hasattr(sample, "temporal_detections"):
        for segment in sample.temporal_detections.detections:
            start, end = segment.support
            label = segment.attrs.get("text", "")
            # Keep labels to at most 40 characters
            if len(label) > 40:
                label = label[:37] + "..."
            ax_speech.axvspan(start, end, alpha=0.3, color="blue")
            ax_speech.text(start, 0.5, label, fontsize=8, verticalalignment="center")

    # --- Row 2: emotions, one horizontal band per person ---
    ax_emotion.set_title("Emotions")

    if hasattr(sample, "emotion_detections"):
        # Vertical position of each person's band
        y_positions = {"Child": 0.7, "Adult": 0.3, "Unknown": 0.5}

        # Bucket the detections by person; other labels are ignored
        by_person = {name: [] for name in y_positions}
        for det in sample.emotion_detections.detections:
            bucket = by_person.get(det.attrs.get("person", "Unknown"))
            if bucket is not None:
                bucket.append(det)

        for person, person_dets in by_person.items():
            y_pos = y_positions[person]
            for det in person_dets:
                start, end = det.support
                emotion = det.attrs.get("emotion", "unknown")
                # Unlisted emotions are drawn in gray
                band_color = emotionColors.get(emotion, {}).get("color", "gray")
                ax_emotion.axvspan(
                    start,
                    end,
                    ymin=y_pos-0.15,
                    ymax=y_pos+0.15,
                    alpha=0.3,
                    color=band_color,
                )
                ax_emotion.text(start, y_pos, emotion, fontsize=8, verticalalignment="center")

        # Dashed guide line and right-aligned name for each person
        for person, y_pos in y_positions.items():
            ax_emotion.axhline(y=y_pos, color="black", linestyle="--", alpha=0.5)
            ax_emotion.text(
                duration,
                y_pos,
                person,
                fontsize=10,
                verticalalignment="center",
                horizontalalignment="right",
            )

    # --- Row 3: laugh flag, joke type and rating ---
    ax_meta.set_title("Metadata")
    ax_meta.set_yticks([])
    ax_meta.set_xlabel("Time (seconds)")

    # Full-width bar: green when the laugh flag is truthy, red otherwise
    laugh = getattr(sample, "LaughYesNo", None)
    if laugh is not None:
        if laugh:
            verdict, bar_color = "Yes", "green"
        else:
            verdict, bar_color = "No", "red"
        ax_meta.axhspan(0.4, 0.6, xmin=0, xmax=1, alpha=0.3, color=bar_color)
        ax_meta.text(
            duration/2,
            0.5,
            f"Laugh: {verdict}",
            fontsize=12,
            horizontalalignment="center",
            verticalalignment="center",
        )

    # Joke type and rating in a text box (axes coordinates)
    joke_type = getattr(sample, "JokeType", "Unknown")
    how_funny = getattr(sample, "HowFunny", "Unknown")
    txt = f"Joke: {joke_type}\nRating: {how_funny}"
    ax_meta.text(
        0.02,
        0.8,
        txt,
        transform=ax_meta.transAxes,
        fontsize=10,
        verticalalignment="top",
        bbox=dict(boxstyle="round", alpha=0.1),
    )

    plt.tight_layout()
    plt.subplots_adjust(top=0.9)
    return fig


# 7.2 Draw annotated timeline for a selected video

A group of visualisations to see what happens in a video. 

In each frame let's find the `centre of gravity` for each person (the average of all the high-confidence marker points). This is handy for time series visualisation. For example plotting the cog.x for each person over time shows how they move closer and further from each other. 

In [None]:
# Plot colour plus arousal and valence values for each emotion label.
emotionColors = dict(
    angry=dict(color="red", arousal=0.9, valence=-0.2),
    fear=dict(color="orange", arousal=0.2, valence=-0.9),
    happy=dict(color="yellow", arousal=0.2, valence=0.9),
    neutral=dict(color="grey", arousal=0, valence=0),
    sad=dict(color="blue", arousal=-0.2, valence=-0.9),
    surprise=dict(color="green", arousal=0.9, valence=0.2),
    disgust=dict(color="purple", arousal=-0.7, valence=-0.7),
)

# Person labels
who = ["child", "adult"]

In [None]:
# Switches selecting which timeline panels are drawn.
plotCoGrav = True
plotStDev = True
plotSpeech = True
plotEmotions = True

#numerical sum of boolean flags = number of subplot rows
subplots = sum([plotCoGrav, plotStDev, plotSpeech, plotEmotions])

# NOTE(review): `session`, `dataset` and `calcs` are not defined or imported in
# the visible part of this notebook - confirm an earlier cell provides them.
if len(session.selected) == 0:
    print("No video selected")
    # exit() is a helper meant for the interactive interpreter; raising
    # SystemExit stops the cell without relying on it.
    raise SystemExit

VideoID = dataset[session.selected[0]]["VideoID"]
keypoints = utils.readKeyPointsFromCSV(processedvideos,VideoID)
FPS = utils.getVideoProperty(processedvideos, VideoID, "FPS")
xmax = keypoints["frame"].max()
# Every panel is labelled and limited in seconds, so frame numbers are divided
# by FPS before plotting. Speech segment start/end values are plotted as they
# are (presumably already in seconds - confirm against the speech data).
xmaxSeconds = xmax/FPS
#this bit of pandas magic calculates average x and y for all the rows.
keypoints[["cogx","cogy"]] = keypoints.apply(lambda row: calcs.rowcogs(row.iloc[8:59]), axis=1, result_type='expand')
keypoints[["stdx","stdy"]] = keypoints.apply(lambda row: calcs.rowstds(row.iloc[8:59]), axis=1, result_type='expand')

#going to add a subplot for each of the above flags
plt.figure(figsize=(20, 5*subplots))
plt.suptitle("Video Time Line Plots")
pltidx = 0

# The centre-of-gravity and standard-deviation panels differ only in the
# column they plot and in their y label.
seriesPanels = [
    (plotCoGrav, "cogx", "Horizontal Position"),
    (plotStDev, "stdx", "Horizontal Spread (std. dev.)"),
]
for enabled, column, ylabel in seriesPanels:
    if not enabled:
        continue
    ax = plt.subplot(subplots, 1, pltidx + 1)
    pltidx += 1
    ax.set_xlabel("Time (seconds)")
    ax.set_ylabel(ylabel)
    ax.set_xlim(0, xmaxSeconds)
    child = keypoints[keypoints["person"]=="child"]
    adult = keypoints[keypoints["person"]=="adult"]
    #child's values over time (frame number converted to seconds)
    childplot = ax.plot(child["frame"]/FPS, child[column], c="red", alpha=0.5)
    #adult's values over time
    adultplot = ax.plot(adult["frame"]/FPS, adult[column], c="blue", alpha=0.5)
    #add legend
    ax.legend(['child', 'adult'], loc='upper left')

if plotSpeech:
    ax2 = plt.subplot(subplots, 1, pltidx + 1)
    pltidx += 1
    ax2.set_xlabel("Time (seconds)")
    ax2.set_ylabel("Identified Speech")
    # Set the x range even when there is no speech data, so the panel stays
    # aligned with the others.
    ax2.set_xlim(0, xmaxSeconds)
    speechjson = utils.getSpeechData(processedvideos,VideoID)
    if speechjson is not None:
        segments = speechjson["segments"]
        nsegs = len(segments)
        # avoid identical lower/upper limits when there are no segments
        ax2.set_ylim(0, max(nsegs, 1))
        #let's plot the speech segments as boxes, first segment at the top
        #label each one with the text
        for idx, seg in enumerate(segments):
            top = nsegs - idx
            #filled rectangle from start to end time, one unit high
            ax2.fill([seg["start"], seg["end"], seg["end"], seg["start"]], [top - 1, top - 1, top, top], 'r', alpha=0.5)
            ax2.text(seg["start"], top - .5, seg["text"])

if plotEmotions:
    ax3 = plt.subplot(subplots, 1, pltidx + 1)
    pltidx += 1
    ax3.set_xlabel("Time (seconds)")
    ax3.set_ylim(0, 2)
    ax3.set_xlim(0, xmaxSeconds)
    emotions = utils.getFaceData(processedvideos,VideoID)
    emotions["ticker"] = 1
    # one row of markers per value of the "index" column (0 and 1)
    for index in range(2):
        ems = emotions[emotions["index"]==index]
        # key gives the emotion name, data gives the actual values (also labels)
        for key, data in ems.groupby('emotion'):
            # unknown emotion names fall back to gray, as in plot_sample_timeline
            emotionColor = emotionColors.get(key, {}).get("color", "gray")
            #plot scatter plot of emotion occurrences
            ax3.scatter(data["frame"]/FPS, data["ticker"] + index, label=key, c=emotionColor, alpha=0.5, s=100)

    #show legend with emotion colours, one entry per emotion even when the
    #same emotion was plotted for both index values
    handles, labels = ax3.get_legend_handles_labels()
    uniqueEntries = dict(zip(labels, handles))
    if uniqueEntries:
        ax3.legend(list(uniqueEntries.values()), list(uniqueEntries.keys()), loc='best')



plt.show()


