In [None]:
# Install the third-party packages this notebook imports:
# OpenCV (headless build, imported as `cv2`), pandas, and matplotlib.
# `%pip` (rather than `!pip`) installs into the environment of the running kernel.
%pip install opencv-python-headless
%pip install pandas
%pip install matplotlib

### Retrieve video data from Kaggle

1. Go to the NFL Impact Detection Kaggle competition data [page](https://www.kaggle.com/competitions/nfl-impact-detection/data)
2. Under the `Data Explorer` section, download `train_labels.csv`
3. Click on the `train` directory and pick any pair of `endzone` and `sideline` videos from the same game play (for this exercise, we'll be using `57583_000082_Endzone.mp4` and `57583_000082_Sideline.mp4`)
4. `train_labels.csv` and the videos will be downloaded as `zip` files, so we'll have to unzip them
5. Place the downloaded videos and `train_labels.csv` in the directory where `fuse_and_visualize_multiview_impacts.ipynb` is located
6. Run the following cell to unzip all the artifacts.

In [None]:
# Extract the label file and both camera-view videos from their zip archives.
# The archive names are relative, so the .zip files must be in the current working directory.
!unzip train_labels.csv.zip
!unzip 57583_000082_Sideline.mp4.zip
!unzip 57583_000082_Endzone.mp4.zip

In [None]:
import os
import cv2
import subprocess
from IPython.display import Video, display
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt


In [None]:
import pandas as pd
import numpy as np


def prep_data(df):
    """Add a ``game_play`` key column built from ``gameKey`` and ``playID``.

    The key has the form ``"<gameKey>_<playID>"`` with ``playID`` left-padded
    with zeros to at least six characters (e.g. ``57583_000082``).

    Note: the column is written onto ``df`` itself; the same (mutated) frame
    is returned so the function can be used with ``DataFrame.pipe``.
    """
    game_key_text = df['gameKey'].astype('str')
    play_id_text = df['playID'].astype('str').str.zfill(6)
    df['game_play'] = game_key_text + '_' + play_id_text
    return df


def dedup_view(df, windows):
    """Pair Endzone and Sideline labels of one play that describe the same impact.

    Rows of ``df`` are split by ``view``. Each Endzone row (processed in
    ascending frame order) is matched with the first not-yet-claimed Sideline
    row that has the same ``label`` and ``impactType`` and whose frame lies in
    the inclusive range ``[frame - windows // 2, frame + windows // 2 + 1]``.
    NOTE(review): the range reaches one frame further forward than backward;
    this is kept exactly as it was -- confirm whether that is intended.

    Parameters
    ----------
    df : pandas.DataFrame
        Labels with the columns ``frame, left, width, top, height, video,
        game_play, label, view, impactType``. It is not modified.
    windows : int
        Width (in frames) of the matching window around the Endzone frame.

    Returns
    -------
    pandas.DataFrame
        The unmatched Sideline rows followed by all Endzone rows. The
        per-view columns are prefixed ``Sideline_`` / ``Endzone_``; matched
        Endzone rows carry the partner's ``Sideline_*`` values and
        ``is_dup == True``; every other row has ``is_dup == False``.
    """
    df = df.sort_values(by='frame')
    view_columns = ['frame', 'left', 'width', 'top', 'height', 'video']
    common_columns = ['game_play', 'label', 'view', 'impactType']
    label_cleaned = df[view_columns + common_columns]

    sideline_column_rename = {col: 'Sideline_' + col for col in view_columns}
    endzone_column_rename = {col: 'Endzone_' + col for col in view_columns}
    sideline_columns = list(sideline_column_rename.values())

    # One frame per view. A non-inplace rename returns a new object, so the
    # column assignments below never write into a slice of `label_cleaned`.
    label_endzone = label_cleaned.query('view == "Endzone"').rename(columns=endzone_column_rename)
    label_sideline = label_cleaned.query('view == "Sideline"').rename(columns=sideline_column_rename)

    label_sideline['is_dup'] = False
    # Placeholder Sideline_* columns on the Endzone frame. Non-numeric columns
    # (the video file name) start as object dtype so that writing a string
    # into them later is not an incompatible-dtype upcast.
    for col in sideline_columns:
        if pd.api.types.is_numeric_dtype(label_sideline[col]):
            label_endzone[col] = np.nan
        else:
            label_endzone[col] = np.full(len(label_endzone), np.nan, dtype=object)
    label_endzone['is_dup'] = False

    half_window = windows // 2
    for index, row in label_endzone.iterrows():
        frame = row['Endzone_frame']
        candidates = label_sideline[
            (label_sideline['label'] == row['label'])
            & (label_sideline['Sideline_frame'] >= frame - half_window)
            & (label_sideline['Sideline_frame'] <= frame + half_window + 1)
            & (label_sideline['is_dup'] == False)
            & (label_sideline['impactType'] == row['impactType'])
        ]

        if len(candidates) > 0:
            match = candidates.iloc[0]
            # Single-step .loc writes: chained `frame[col].loc[i] = v` does not
            # update the frame when pandas Copy-on-Write is active.
            label_sideline.loc[candidates.index[0], 'is_dup'] = True
            for col in sideline_columns:
                label_endzone.loc[index, col] = match[col]
            label_endzone.loc[index, 'is_dup'] = True

    # Matched Sideline rows now live on their Endzone partner, so only the
    # unmatched Sideline rows are kept alongside all Endzone rows.
    not_dup_sideline = label_sideline[label_sideline['is_dup'] == False]
    final_output = pd.concat([not_dup_sideline, label_endzone])
    return final_output


def fuse_df(raw_df, windows):
    """Fuse Endzone/Sideline labels play by play and restore numeric keys.

    ``raw_df`` is split on its ``game_play`` column, ``dedup_view`` is applied
    to every play, and the per-play results are concatenated. ``gameKey`` and
    ``playID`` are then re-derived as integers from the ``game_play`` string
    (``"<gameKey>_<playID>"``).

    Parameters
    ----------
    raw_df : pandas.DataFrame
        Labels containing a ``game_play`` column plus the columns
        ``dedup_view`` reads.
    windows : int
        Matching window in frames, forwarded to ``dedup_view``.

    Returns
    -------
    pandas.DataFrame
        Concatenated ``dedup_view`` outputs with ``gameKey`` and ``playID``
        columns added. An input without any plays yields an empty frame with
        the same columns instead of raising.
    """
    # A single groupby pass replaces one full-frame `query` scan per play;
    # sort=False keeps plays in order of first appearance.
    outputs = [
        dedup_view(play_df, windows)
        for _, play_df in raw_df.groupby('game_play', sort=False)
    ]
    if not outputs:
        # pd.concat([]) raises; run the helper on an empty slice instead so
        # the (empty) result still carries the fused column layout.
        outputs.append(dedup_view(raw_df.iloc[0:0], windows))

    output_df = pd.concat(outputs)
    key_parts = output_df['game_play'].map(lambda game_play: game_play.split('_'))
    output_df['gameKey'] = key_parts.map(lambda parts: int(parts[0]))
    output_df['playID'] = key_parts.map(lambda parts: int(parts[1]))

    return output_df


In [None]:
# Load the Kaggle labels, add the `game_play` key, and keep only the rows
# flagged as impacts (impact == 1).
ground_truth = (
    pd.read_csv('train_labels.csv')
    .pipe(prep_data)
    .query('impact == 1')
)
# Pair up Endzone/Sideline impacts using a 30-frame matching window.
fused_df = fuse_df(ground_truth, windows=30)


In [None]:
# Show the fused impact rows that belong to game 57583.
fused_df[fused_df['gameKey'] == 57583]

In [None]:
def _draw_box(canvas, left, top, width, height, color, y_offset=0):
    """Draw a 3px rectangle outline on ``canvas``, shifted down by ``y_offset`` pixels."""
    x0 = int(left)
    y0 = int(top) + y_offset
    cv2.rectangle(canvas, (x0, y0), (x0 + int(width), y0 + int(height)), color, thickness=3)


def overlay_impacts(ez_vid_path: str,
                    sl_vid_path: str,
                    gt_df: pd.DataFrame,
                    freeze_impacts=True,
                    freeze_frame_count: int = 60):
    """Render a stacked Sideline/Endzone video with impact boxes drawn on it.

    Each output frame is the Sideline frame on top and the Endzone frame
    below it (the Endzone frame is resized to the Sideline frame size when
    they differ). For every frame index:

    * impacts present in both views (``is_dup == True``) whose
      ``Sideline_frame`` equals the index get a red box in each panel and a
      white line connecting the two boxes;
    * only when the frame has no such paired impact, single-view impacts
      (``is_dup == False``) on that frame get a yellow box in their own panel.

    Parameters
    ----------
    ez_vid_path : str
        Endzone video path. Its file name must start with
        ``<gameKey>_<playID>_``; both values are parsed from it and used to
        select rows of ``gt_df`` and to name the output file.
    sl_vid_path : str
        Sideline video path for the same play.
    gt_df : pandas.DataFrame
        Fused labels with ``gameKey``, ``playID``, ``is_dup``, ``view`` and
        the ``Sideline_*`` / ``Endzone_*`` frame and box columns.
    freeze_impacts : bool, default True
        When true, every frame that shows an impact is repeated so the video
        appears to pause on it.
    freeze_frame_count : int, default 60
        Number of extra copies written for a frozen frame.

    Returns
    -------
    str
        Path of the written video, ``"<gameKey>_<playID>_output.mp4"`` in the
        current working directory.

    Raises
    ------
    IOError
        If either input video cannot be opened.
    AssertionError
        If the two videos report different frame rates.
    """
    VIDEO_CODEC = "MP4V"

    name_parts = os.path.basename(ez_vid_path).split('_')
    game_key, play_id = name_parts[0], name_parts[1]

    output_path = f"{game_key}_{play_id}_output.mp4"

    # OpenCV colors are in BGR order.
    RED = (0, 0, 255)
    WHITE = (255, 255, 255)
    YELLOW = (0, 255, 255)

    # Select this play's labels once instead of re-filtering on every frame.
    play_df = gt_df[(gt_df['gameKey'] == int(game_key)) & (gt_df['playID'] == int(play_id))]
    paired_impacts = play_df[play_df['is_dup'] == True]
    single_impacts = play_df[play_df['is_dup'] == False]
    sl_only_impacts = single_impacts[single_impacts['view'] == 'Sideline']
    ez_only_impacts = single_impacts[single_impacts['view'] == 'Endzone']

    ez_vid = cv2.VideoCapture(ez_vid_path)
    sl_vid = cv2.VideoCapture(sl_vid_path)
    output_video = None
    try:
        if not ez_vid.isOpened():
            raise IOError(f"Cannot open endzone video: {ez_vid_path}")
        if not sl_vid.isOpened():
            raise IOError(f"Cannot open sideline video: {sl_vid_path}")

        # get meta data from endzone (ez) and sideline (sl) videos
        ez_total_frame_number = ez_vid.get(cv2.CAP_PROP_FRAME_COUNT)
        ez_fps = ez_vid.get(cv2.CAP_PROP_FPS)

        sl_total_frame_number = sl_vid.get(cv2.CAP_PROP_FRAME_COUNT)
        sl_width = int(sl_vid.get(cv2.CAP_PROP_FRAME_WIDTH))
        sl_height = int(sl_vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
        sl_fps = sl_vid.get(cv2.CAP_PROP_FPS)

        assert ez_fps == sl_fps

        # Endzone frames are resized to the Sideline size before stacking, so
        # the canvas is one Sideline width wide and two Sideline heights tall.
        output_video = cv2.VideoWriter(output_path,
                                       cv2.VideoWriter_fourcc(*VIDEO_CODEC),
                                       ez_fps, (sl_width, 2 * sl_height))

        # stop at the end of the shorter video
        total_frame_number = int(min(ez_total_frame_number, sl_total_frame_number))

        for frame_cnt in range(total_frame_number):
            ez_ret, ez_frame = ez_vid.read()
            sl_ret, sl_frame = sl_vid.read()
            if not (ez_ret and sl_ret):
                break

            h1, w1 = sl_frame.shape[:2]
            if ez_frame.shape[:2] != (h1, w1):  # resize images if they're different
                ez_frame = cv2.resize(ez_frame, (w1, h1))

            # Sideline on top, Endzone below: Endzone boxes are shifted down by h1.
            both = np.concatenate((sl_frame, ez_frame), axis=0)
            frame_has_impact = False

            duplicates = paired_impacts[paired_impacts['Sideline_frame'] == frame_cnt]
            if len(duplicates) > 0:
                frame_has_impact = True
                for duplicate in duplicates.itertuples(index=False):
                    _draw_box(both, duplicate.Sideline_left, duplicate.Sideline_top,
                              duplicate.Sideline_width, duplicate.Sideline_height, RED)
                    _draw_box(both, duplicate.Endzone_left, duplicate.Endzone_top,
                              duplicate.Endzone_width, duplicate.Endzone_height, RED,
                              y_offset=h1)
                    cv2.line(
                        both,
                        (int(duplicate.Sideline_left), int(duplicate.Sideline_top)),
                        (int(duplicate.Endzone_left), int(duplicate.Endzone_top) + h1),
                        WHITE,
                        thickness=4,
                    )
            else:
                # No paired impact on this frame: draw single-view impacts,
                # each from its own row.
                sl_impacts = sl_only_impacts[sl_only_impacts['Sideline_frame'] == frame_cnt]
                for impact in sl_impacts.itertuples(index=False):
                    frame_has_impact = True
                    _draw_box(both, impact.Sideline_left, impact.Sideline_top,
                              impact.Sideline_width, impact.Sideline_height, YELLOW)

                ez_impacts = ez_only_impacts[ez_only_impacts['Endzone_frame'] == frame_cnt]
                for impact in ez_impacts.itertuples(index=False):
                    frame_has_impact = True
                    _draw_box(both, impact.Endzone_left, impact.Endzone_top,
                              impact.Endzone_width, impact.Endzone_height, YELLOW,
                              y_offset=h1)

            # Title at the top-left; frame counter at the bottom-right of each panel.
            img_name = f"Game key: {game_key}, Play ID: {play_id}, Frame: {frame_cnt}"
            cv2.putText(both, img_name, (30, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, WHITE, thickness=2)
            cv2.putText(both, str(frame_cnt), (w1 - 75, h1 - 20), cv2.FONT_HERSHEY_SIMPLEX, 1, WHITE, thickness=2)
            cv2.putText(both, str(frame_cnt), (w1 - 75, 2 * h1 - 20), cv2.FONT_HERSHEY_SIMPLEX, 1, WHITE, thickness=2)

            output_video.write(both)

            if frame_has_impact and freeze_impacts:
                # Repeat the frame so playback appears to pause on the impact.
                for _ in range(freeze_frame_count):
                    output_video.write(both)
    finally:
        # Always free the decoders and finalize the output file, even on error.
        ez_vid.release()
        sl_vid.release()
        if output_video is not None:
            output_video.release()

    return output_path

In [None]:
# Render the annotated two-view video for play 57583_000082; the cell's
# result is the path of the written file.
overlay_impacts(
    ez_vid_path='57583_000082_Endzone.mp4',
    sl_vid_path='57583_000082_Sideline.mp4',
    gt_df=fused_df,
)

## Download the video and visualize the results!