<a href="https://colab.research.google.com/github/bigfoot-888/Creative-Sports-Commentary-Generation-AI/blob/main/Sports_Commentary_Generation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Object Detection

In [None]:
# Clone the YOLOv5 Repo
# (shell command; creates a `yolov5/` directory under the current working directory)
!git clone https://github.com/ultralytics/yolov5
# Change the notebook's working directory into the clone so that the
# `requirements.txt` referenced below resolves; files written later with
# relative paths are created relative to this directory as well.
%cd yolov5
# Install the Python packages listed in the repository's requirements file
!pip install -r requirements.txt

import json
import torch
import cv2
import os
from IPython.display import Video, display

# Ask for a video through Colab's upload widget; the name of the first
# uploaded file becomes the input video path.
from google.colab import files
uploaded = files.upload()
video_path = list(uploaded)[0]

# Load the pretrained YOLOv5 "small" model through PyTorch Hub
model = torch.hub.load('ultralytics/yolov5', 'yolov5s')  # Using the small model

# Class labels exposed by the loaded model (used to name each detection)
class_labels = model.names

# Per-frame detections keyed by frame index; written to JSON after processing
tracking_data = {}

# Open the input video and fail early with a clear message when it is unreadable
output_path = "video_tracked.mp4"
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise RuntimeError(f"Could not open video file: {video_path}")

# The frame rate is needed both for the writer and to convert frames to seconds,
# so a missing/zero value is reported here instead of surfacing later as a
# ZeroDivisionError in the duration computation.
fps = cap.get(cv2.CAP_PROP_FPS)
if not fps > 0:
    cap.release()
    raise RuntimeError(f"Could not determine the frame rate of: {video_path}")

width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

# To get the duration of the video (in seconds) for later use
total_frames = cap.get(cv2.CAP_PROP_FRAME_COUNT)
duration = total_frames / fps

frame_index = 0
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break  # End of stream (or a read error)

        # OpenCV decodes frames as BGR while the YOLOv5 hub model expects RGB
        # for numpy input, so convert a copy for inference and keep the
        # original BGR frame for drawing and writing.
        results = model(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
        detections = results.xyxy[0].cpu().numpy()  # [xmin, ymin, xmax, ymax, conf, class]

        # Detections for this frame, in the order the model returned them
        frame_detections = []

        # Annotate the frame
        for det in detections:
            xmin, ymin, xmax, ymax, confidence, cls = det
            xmin, ymin, xmax, ymax = int(xmin), int(ymin), int(xmax), int(ymax)
            class_label = class_labels[int(cls)]
            confidence = float(confidence)  # numpy scalar -> JSON-serializable float

            # Store detection info in JSON format
            frame_detections.append({
                "class": class_label,
                "bbox": [xmin, ymin, xmax, ymax],
                "confidence": confidence
            })

            # Draw bounding box and label on the frame; the label baseline is
            # clamped so it stays visible for boxes touching the top edge.
            cv2.rectangle(frame, (xmin, ymin), (xmax, ymax), (255, 0, 0), 2)
            cv2.putText(frame, f'{class_label} {confidence:.2f}', (xmin, max(ymin - 10, 15)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

        tracking_data[frame_index] = frame_detections

        # Save the annotated frame
        out.write(frame)
        frame_index += 1
finally:
    # Always release the capture and writer so the output file is finalized
    # even when inference fails part-way through the video.
    cap.release()
    out.release()

# Persist the per-frame detections as pretty-printed JSON
json_output_path = "tracking_data.json"
with open(json_output_path, "w") as json_file:
    json_file.write(json.dumps(tracking_data, indent=4))

# Show the annotated video inline in the notebook output
display(Video(output_path, embed=True))


# Data Processing

In [None]:
import json
import math

# Read back the per-frame detections written by the object-detection step
with open("tracking_data.json", "r") as json_file:
    tracking_data = json.loads(json_file.read())

# Function to calculate the distance between two bounding boxes
def calculate_distance(bbox1, bbox2):
    """Return the Euclidean distance between the centers of two boxes.

    Each box is indexed as ``[xmin, ymin, xmax, ymax]``; the center is the
    midpoint of the two corners on each axis.
    """
    delta_x = (bbox2[0] + bbox2[2]) / 2 - (bbox1[0] + bbox1[2]) / 2
    delta_y = (bbox2[1] + bbox2[3]) / 2 - (bbox1[1] + bbox1[3]) / 2
    return math.sqrt(delta_x ** 2 + delta_y ** 2)

# Function to calculate movement direction
def calculate_direction(start_bbox, end_bbox, threshold=50):
    """Classify the movement between two boxes by their center displacement.

    Args:
        start_bbox: Box ``[xmin, ymin, xmax, ymax]`` at the start.
        end_bbox: Box ``[xmin, ymin, xmax, ymax]`` at the end.
        threshold: Displacement (same units as the box coordinates) that
            either axis must exceed for the movement to count as notable.
            Defaults to 50, the value previously hard-coded.

    Returns:
        A ``(direction, notable)`` tuple. ``direction`` is ``"left"`` or
        ``"right"`` when the horizontal displacement is strictly larger than
        the vertical one, otherwise ``"up"`` or ``"down"`` (a positive ``dy``
        is reported as ``"down"``; zero displacement is reported as ``"up"``).
        ``notable`` is True when ``abs(dx)`` or ``abs(dy)`` exceeds
        ``threshold``.
    """
    # Centers use floor division, matching the original integer-pixel behavior
    dx = (end_bbox[0] + end_bbox[2]) // 2 - (start_bbox[0] + start_bbox[2]) // 2
    dy = (end_bbox[1] + end_bbox[3]) // 2 - (start_bbox[1] + start_bbox[3]) // 2

    if abs(dx) > abs(dy):  # Horizontal movement dominates
        direction = "right" if dx > 0 else "left"
    else:  # Vertical movement dominates (ties included)
        direction = "down" if dy > 0 else "up"
    return direction, abs(dx) > threshold or abs(dy) > threshold

# Human-readable event sentences, appended in frame order
processed_information = []

# JSON object keys are strings, so sort them numerically before walking the frames
frame_keys = sorted(int(key) for key in tracking_data)
for position, frame_i in enumerate(frame_keys):
    current_objects = tracking_data[str(frame_i)]

    # The last frame has no successor: it cannot yield movement events, but its
    # same-frame interactions are still reported below.
    if position + 1 < len(frame_keys):
        next_objects = tracking_data[str(frame_keys[position + 1])]
    else:
        next_objects = []

    # Track movements
    for obj in current_objects:
        obj_class = obj["class"]
        current_bbox = obj["bbox"]

        # Match with the nearest object of the SAME class in the next frame.
        # Matching across classes (e.g. a person to a ball) would report a
        # jump between two different objects as movement.
        candidates = [nxt for nxt in next_objects if nxt["class"] == obj_class]
        if not candidates:
            continue
        best_match = min(
            candidates,
            key=lambda nxt: calculate_distance(current_bbox, nxt["bbox"]),
        )

        direction, notable = calculate_direction(current_bbox, best_match["bbox"])
        if notable:
            processed_information.append(f"Object {obj_class} at frame {frame_i} moved {direction}.")

    # Detect interactions within the same frame (each unordered pair once)
    for j, obj1 in enumerate(current_objects):
        for obj2 in current_objects[j + 1:]:
            # Calculate distance between objects
            distance = calculate_distance(obj1["bbox"], obj2["bbox"])
            if distance < 50:  # Interaction threshold
                processed_information.append(
                    f"Object {obj1['class']} came close to Object {obj2['class']} at frame {frame_i}."
                )

# Output processed information
for line in processed_information:
    print(line)

# Save processed information to a file, one event per line
with open("processed_data.txt", "w") as processed_data:
    processed_data.write("\n".join(processed_information))


# LLM Models

In [None]:
# Let the user pick the text-generation backend; anything other than the two
# supported names falls back to Llama.
whichLLM = input("Write either 'llama' or 'gptneo' as the model of choice: ").strip().lower()

if whichLLM not in ("llama", "gptneo"):
    print("Invalid choice, defaulting to Llama.")
    whichLLM = "llama"
elif whichLLM == "gptneo":
    print("You selected GPT-Neo.")
else:
    print("You selected Llama.")

In [None]:
!pip install transformers

from transformers import pipeline

if whichLLM == "gptneo":
    generator = pipeline('text-generation', model='EleutherAI/gpt-neo-125M', device=0)


In [None]:
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch

# Download and load the Llama checkpoint only when the user selected it.
if whichLLM == "llama":
    # Install the Hugging Face Hub client, which provides `huggingface-cli`
    !pip install huggingface-hub
    # Interactive login (prompts for an access token).
    # NOTE(review): presumably needed because the Llama repository is gated — confirm.
    !huggingface-cli login
    # Fetch the model files into /content, skipping everything under `original/`
    !huggingface-cli download --local-dir /content/Llama-3.2-1B-Instruct meta-llama/Llama-3.2-1B-Instruct  --exclude "original/*"

    # Path to the downloaded model
    checkpoint_dir = "/content/Llama-3.2-1B-Instruct"

    # Load the tokenizer and model from the checkpoint directory
    tokenizer = AutoTokenizer.from_pretrained(checkpoint_dir)

    # No device or dtype is specified here, so the library defaults apply
    llama_model = AutoModelForCausalLM.from_pretrained(checkpoint_dir)



# Commentary Generation, Audio and Video processing

In [None]:
!pip install pydub
!pip install gTTS
import time
import re
from gtts import gTTS
from pydub import AudioSegment


# Generates the audio with gtts
def generate_audio(text, filename):
    """Synthesize *text* as English speech with gTTS and save it to *filename*."""
    gTTS(text=text, lang='en').save(filename)

def parse_commentary_line_with_frame(line, frame_rate=None):
    """Extract the timestamp and event text from one processed-data line.

    Both event formats written by the data-processing step are recognized:
    ``"Object X at frame N moved <direction>."`` and
    ``"Object X came close to Object Y at frame N."``. The frame number may
    therefore be followed by a space or directly by the closing period.

    Args:
        line: One line of the processed-data file (a trailing newline is fine).
        frame_rate: Frames per second used to convert the frame number to
            seconds. When omitted, the module-level ``fps`` of the processed
            video is used.

    Returns:
        ``(timestamp_seconds, event_text)`` when a frame number is found,
        otherwise ``(None, None)``. ``event_text`` is the stripped line.
    """
    # `\b` (instead of a mandatory trailing space) lets "... at frame 12."
    # match as well as "... at frame 12 moved right."
    match = re.match(r".+? frame (\d+)\b", line)
    if match is None:
        return None, None

    if frame_rate is None:
        frame_rate = fps  # Frame rate captured when the video was opened

    frame_i = int(match.group(1))
    timestamp = frame_i / frame_rate  # Convert frame to time in seconds
    return timestamp, line.strip()

# Load the event sentences produced by the data-processing step
with open("processed_data.txt", "r") as file:
    commentary_lines = list(file)

# Bucket events by timestamp, keeping at most 4 events per bucket;
# lines that do not parse are skipped.
grouped_commentary = {}
for line in commentary_lines:
    timestamp, event = parse_commentary_line_with_frame(line)
    if timestamp is None:
        continue
    bucket = grouped_commentary.setdefault(timestamp, [])
    if len(bucket) < 4:
        bucket.append(event)


last_timestamp = None # Last timestamp
last_end_time = 0 # End time of last commentary generated
playback_time = 0 # Time it takes for a given generated commentary to play

# Spoken when a model yields no usable text, so the text-to-speech step is
# never handed a blank string.
fallback_commentary = "A lot is happening right now."

# Events that actually received an audio clip, keyed by timestamp; the keys are
# later used to rebuild each clip's file name.
selected_grouped_commentary = {}

# Generate audio for each group
for timestamp, events in grouped_commentary.items():
    # If the timestamp exceeds the duration of the video, stop
    if timestamp >= duration:
        break

    # Skip events that start before the previous commentary ends + 1 s of buffer
    if timestamp < last_end_time + 1:
        continue

    # Remove the "at frame X" part for each. A list (not a generator) is used
    # because the events are both joined below and stored afterwards.
    cleaned_events = [re.sub(r" at frame \d+", "", event).strip() for event in events]

    # Join all the events of the group in one
    commentary_text = f"At {timestamp:.2f} seconds: " + ", ".join(cleaned_events)

    # Create the prompt
    input_text = f"Generate short, creative and thrilling, sports-style commentary for the following event, including the names and actions of the objects involved, and make it sound dramatic and exciting: {commentary_text}"

    # Variables to store the outputs of the LLMs
    response = " "
    trimmed_response = ""

    if whichLLM == "llama":
        inputs = tokenizer(input_text, return_tensors="pt")
        input_length = inputs["input_ids"].size(1)  # Number of tokens in the input
        max_length = input_length + 12  # Prompt plus at most 12 generated tokens
        with torch.no_grad():
            outputs = llama_model.generate(
                **inputs,
                max_length=max_length,  # Limit the length of the output
                num_return_sequences=1,
                temperature=1,  # Control the randomness of the output
                top_p=0.9,  # Nucleus sampling
                do_sample=True
            )
        response = tokenizer.decode(outputs[0], skip_special_tokens=True)
        if response.startswith(input_text):
            trimmed_response = response[len(input_text):].strip()  # Remove prompt from the start
        else:
            trimmed_response = response  # If prompt isn't at the start, don't trim

    elif whichLLM == "gptneo":
        input_text = f"Generate short, creative and thrilling, sports-style commentary for the following event: {commentary_text}"

        # In case the model fails to generate anything, retry with a limit on the number of attempts
        max_retries = 3
        for _ in range(max_retries):
            response = generator(input_text, max_new_tokens=18, num_return_sequences=1, do_sample=True, temperature=1, top_p=0.9)
            if response and "generated_text" in response[0]:
                trimmed_response = response[0]["generated_text"][len(input_text):].strip()
            if trimmed_response:
                break

    # Either model may end up with nothing but whitespace; use the fallback
    # sentence so audio generation does not fail on empty text.
    if not trimmed_response.strip():
        trimmed_response = fallback_commentary

    print(f"Generating audio for: {trimmed_response}")

    # How much time there is left of video
    remaining_time = duration - timestamp

    # Save to a file
    filename = f"commentary_{int(timestamp * 100)}.mp3"
    generate_audio(trimmed_response, filename)

    # Calculate how long the audio is, and if it is too long for the remaining time, stop
    commentary_audio = AudioSegment.from_file(filename)
    playback_time = len(commentary_audio) / 1000  # pydub lengths are in milliseconds
    if playback_time > remaining_time:
        break

    last_end_time = timestamp + playback_time
    selected_grouped_commentary[timestamp] = cleaned_events

# Start from an empty track and append silence and commentary clips in order
combined_audio = AudioSegment.silent(duration=0)

last_end_time = 0  # Second at which the previously appended clip ends

for timestamp, events in selected_grouped_commentary.items():
    commentary_file = f"commentary_{int(timestamp * 100)}.mp3"

    # Pad with silence up to the clip's timestamp. Before any clip has been
    # placed the padding is always added; afterwards only when there is a
    # positive gap between the previous clip's end and this timestamp.
    gap_ms = (timestamp - last_end_time) * 1000
    if last_end_time == 0 or gap_ms > 0:
        combined_audio += AudioSegment.silent(duration=gap_ms)

    # Append the clip generated earlier for this timestamp
    commentary_audio = AudioSegment.from_file(commentary_file)
    combined_audio += commentary_audio

    last_end_time = timestamp + (len(commentary_audio) / 1000)

# Write the stitched commentary track
combined_audio.export("final_audio.mp3", format="mp3")




In [None]:
# Combine the audio and video
# Install the ffmpeg command-line tool (-y answers apt's prompts automatically)
!apt-get install -y ffmpeg
# Mux the annotated video with the commentary track:
#   -y            overwrite the output file if it already exists
#   -c:v copy     copy the video stream as-is (no re-encoding)
#   -c:a aac      encode the audio track as AAC
#   -strict experimental
#                 permit experimental codecs
#                 NOTE(review): likely unnecessary with current FFmpeg builds — confirm
!ffmpeg -y -i video_tracked.mp4 -i final_audio.mp3 -c:v copy -c:a aac -strict experimental final_video_with_audio.mp4



# Licensing & Attributions

This Colab notebook uses the following dependencies:

- **Llama 3.2** — governed by the Llama 3.2 Community License. Users must obtain the model separately.  
  - Attribution: “Built with Llama”  
  - License: https://github.com/meta-llama/llama-models/tree/main/models/llama3_2

- **gTTS** (Google Text-to-Speech), MIT License  
  - Copyright © 2014-2024 Pierre Nicolas Durette  
  - See https://github.com/pndurette/gTTS for details

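- **FFmpeg** — licensed under the LGPL v2.1 or later by default; builds that enable optional GPL components are covered by the GPL instead  
  - See https://ffmpeg.org/legal.html for details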

Other libraries (PyTorch, NumPy, pandas, Matplotlib, Transformers, etc.) are permissively licensed (BSD, MIT, Apache 2.0).  
By using this notebook, you agree to comply with all applicable third-party licenses.
